Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Amazon SQS Connector #1

Merged
merged 19 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

target/
ballerina.conf
8 changes: 8 additions & 0 deletions Ballerina.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[project]
org-name = "wso2"
version = "0.1.4"
authors = ["WSO2"]
keywords = ["amazon", "sqs", "mq"]
repository = "https://github.com/wso2-ballerina/module-amazonsqs"
license = "Apache-2.0"

37 changes: 37 additions & 0 deletions amazonsqs/Module.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Connects to Amazon SQS service.

# Module Overview

## Compatibility
| Ballerina Language Version
| --------------------------
| 0.990.3

## Sample

```ballerina
import ballerina/config;
import ballerina/io;
import wso2/amazonrekn;
import wso2/amazoncommons;

amazonsqs:Configuration configuration = {
accessKey: config:getAsString("ACCESS_KEY_ID"),
secretKey: config:getAsString("SECRET_ACCESS_KEY"),
region: config:getAsString("REGION"),
accountNumber: config:getAsString("ACCOUNT_NUMBER")
};

amazonsqs:Client sqsClient = new(config);

public function main() {
map<string> attributes = {};
attributes["VisibilityTimeout"] = "400";
attributes["FifoQueue"] = "true";

string|error response = sqsClient->createQueue("demo.fifo", attributes);
if(response is string && response.hasPrefix("http")) {
log:printInfo("Created queue: \n" + response);
}
}
```
197 changes: 197 additions & 0 deletions amazonsqs/amazonsqs_connector.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/crypto;
import ballerina/encoding;
import ballerina/http;
import ballerina/internal;
import ballerina/io;
import ballerina/system;
import ballerina/time;
import wso2/amazoncommons;
import ballerina/log;


# Object to initialize the connection with Amazon SQS.
#
# + accessKey - The Amazon API access key
# + secretKey - The Amazon API secret key
# + region - The Amazon API Region
# + acctNum - The account number of the SQS service
public type Client client object {

http:Client clientEp;
string accessKey;
string secretKey;
string region;
string acctNum;
string host;

public function __init(Configuration config) {
self.accessKey = config.accessKey;
self.secretKey = config.secretKey;
self.acctNum = config.accountNumber;
self.region = config.region;
self.host = SQS_SERVICE_NAME + "." + self.region + "." + AMAZON_HOST;
self.clientEp = new("https://" + self.host);
Maninda marked this conversation as resolved.
Show resolved Hide resolved
}

public remote function createQueue(string queueName, map<string> attributes) returns string|error;

public remote function sendMessage(string messageBody, string queueResourcePath, map<string> attributes)
returns OutboundMessage|error;

public remote function receiveMessage(string queueResourcePath, map<string> attributes)
returns InboundMessage[]|error;

public remote function deleteMessage(string queueResourcePath, string receiptHandle) returns boolean|error;

};

# Creates a new queue in SQS
Maninda marked this conversation as resolved.
Show resolved Hide resolved
#
# + queueName - Name of the queue to be created
# + attributes - Other attribute parameters
# + return - If success, URL of the created queue, else returns error
public remote function Client.createQueue(string queueName, map<string> attributes) returns string|error{

string amzTarget = "AmazonSQSv20121105.CreateQueue";
string endpoint = "/";
string payload = "";
payload = payload + "Action=CreateQueue";
payload = payload + "&Version=2012-11-05";
payload = payload + "&QueueName=" + queueName;
int attributeNumber = 1;
foreach var (k, v) in attributes {
payload = payload + "&Attribute." + attributeNumber + ".Name=" + k;
payload = payload + "&Attribute." + attributeNumber + ".Value=" + v;
attributeNumber = attributeNumber + 1;
}

http:Request request = check generatePOSTRequest(self.accessKey, self.secretKey, self.host, amzTarget,
endpoint, self.region, payload);
var httpResponse = self.clientEp->post(endpoint, request);
json|error response = handleResponse(httpResponse);
if(response is json){
Maninda marked this conversation as resolved.
Show resolved Hide resolved
return jsonToCreatedQueueUrl(response);
} else {
return response;
}

}

# Send a new message to a SQS queue
#
# + messageBody - Message body string to be sent
# + queueResourcePath - Resource path to the queue from the host address. e.g.: /610968236798/myQueue.fifo
# + attributes - Non-mandatory parameters for sending a message
# + return - If success, details of the sent message, else returns error
public remote function Client.sendMessage(string messageBody, string queueResourcePath, map<string> attributes)
returns OutboundMessage|error {

Maninda marked this conversation as resolved.
Show resolved Hide resolved
string amzTarget = "AmazonSQSv20121105.SendMessage";
string payload = "";
payload = payload + "Action=SendMessage";
payload = payload + "&MessageBody=" + check http:encode(messageBody, "UTF-8");

int attributeNumber = 1;
foreach var (k, v) in attributes {
payload = payload + "&" + k + "=" + v;
attributeNumber = attributeNumber + 1;
}

http:Request request = check generatePOSTRequest(self.accessKey, self.secretKey, self.host, amzTarget,
queueResourcePath, self.region, payload);
var httpResponse = self.clientEp->post(queueResourcePath, request);
json|error response = handleResponse(httpResponse);
if(response is json){
return jsonToOutboundMessage(response);
} else {
return response;
}

}

# Receive the message(s) from the queue
Maninda marked this conversation as resolved.
Show resolved Hide resolved
#
# + queueResourcePath - Resource path to the queue from the host address. e.g.: /610968236798/myQueue.fifo
# + attributes - Non-mandatory parameters for receiving a message
# + return - If success, details of the received message, else returns error
public remote function Client.receiveMessage(string queueResourcePath, map<string> attributes)
returns InboundMessage[]|error {

string amzTarget = "AmazonSQSv20121105.ReceiveMessage";
string payload = "";
payload = payload + "&Action=ReceiveMessage";
int attributeNumber = 1;
foreach var (k, v) in attributes {
payload = payload + "&" + k + "=" + v;
attributeNumber = attributeNumber + 1;
}

http:Request request = check generatePOSTRequest(self.accessKey, self.secretKey, self.host, amzTarget,
queueResourcePath, self.region, payload);
var httpResponse = self.clientEp->post(queueResourcePath, request);
json|error response = handleResponse(httpResponse);
if(response is json){
return jsonToInboundMessages(response);
} else {
return response;
}

}

# Delete the message(s) from the queue for the given receiptHandle
Maninda marked this conversation as resolved.
Show resolved Hide resolved
#
# + queueResourcePath - Resource path to the queue from the host address. e.g.: /610968236798/myQueue.fifo
# + receiptHandle - Receipt Handle parameter for the message(s) to be deleted
# + return - Whether the message(s) were successfully deleted or whether an error occurred
public remote function Client.deleteMessage(string queueResourcePath, string receiptHandle) returns boolean|error {

string amzTarget = "AmazonSQSv20121105.DeleteMessage";
string receiptHandleEncoded = check http:encode(receiptHandle, "UTF-8");
string payload = "";
payload = payload + "Action=DeleteMessage";
payload = payload + "&ReceiptHandle=" + receiptHandleEncoded;

http:Request request = check generatePOSTRequest(self.accessKey, self.secretKey, self.host,
amzTarget, queueResourcePath, self.region, payload);
var httpResponse = self.clientEp->post(queueResourcePath, request);
json|error response = handleResponse(httpResponse);
if (response is json) {
if (response.DeleteMessageResponse != null) {
return true;
} else {
return false;
}
} else {
return response;
}

Maninda marked this conversation as resolved.
Show resolved Hide resolved
}

# Configuration provided for the client
#
# + accessKey - accessKey of Amazon Account
# + secretKey - secretKey of Amazon Account
# + region - region of SQS Queue
# + accountNumber - account number of the SQS queue
public type Configuration record {
string accessKey;
string secretKey;
string region;
string accountNumber;
};
52 changes: 52 additions & 0 deletions amazonsqs/amazonsqs_constants.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

const SQS_SERVICE_NAME = "sqs";
const SQS_CONTENT_TYPE = "application/x-www-form-urlencoded";

const HOST = "Host";
const X_AMZ_CONTENT_SHA256 = "X-Amz-Content-Sha256";
const UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD";
const AMAZONSQS_ERROR_CODE = "(wso2/amazonsqs)AmazonsqsError";
const AMAZON_HOST = "amazonaws.com";

const ISO8601_BASIC_DATE_FORMAT = "yyyyMMdd'T'HHmmss'Z'";
const SHORT_DATE_FORMAT = "yyyyMMdd";
const X_AMZ_DATE = "X-Amz-Date";
const X_AMZ_SECURITY_TOKEN = "X-Amz-Security-Token";
const UTF_8 = "UTF-8";
const CONTENT_TYPE = "Content-Type";
const AWS4_HMAC_SHA256 = "AWS4-HMAC-SHA256";
const SERVICE_NAME = "sqs";
const TERMINATION_STRING = "aws4_request";
const AWS4 = "AWS4";
const CREDENTIAL = "Credential";
const SIGNED_HEADER = " SignedHeaders";
const SIGNATURE = " Signature";
const AUTHORIZATION = "Authorization";
const PUT = "PUT";
const POST = "POST";
const GET = "GET";

const string STATUS_CODE = "status code";
const string COLON_SYMBOL = ":";
const string SEMICOLON_SYMBOL = ";";
const string WHITE_SPACE = " ";
const string MESSAGE = "message";
const string NEW_LINE = "\n";
const string ERROR = "error";

const string EMPTY_STRING = "";
Loading