Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ add_subdirectory(tests/unit)

add_subdirectory(samples/PubSub)

add_subdirectory(samples/Jobs)

add_subdirectory(samples/ShadowDelta)

add_subdirectory(samples/Discovery EXCLUDE_FROM_ALL)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ The Device SDK provides functionality to create and maintain a MQTT Connection.
### Thing Shadow
This SDK implements the specific protocol for Thing Shadows to retrieve, update and delete Thing Shadows adhering to the protocol that is implemented to ensure correct versioning and support for client tokens. It abstracts the necessary MQTT topic subscriptions by automatically subscribing to and unsubscribing from the reserved topics as needed for each API call. Inbound state change requests are automatically signalled via a configurable callback.

### Jobs
This SDK also implements the Jobs protocol to interact with the AWS IoT Jobs service. The IoT Job service manages deployment of IoT fleet wide tasks such as device software/firmware deployments and updates, rotation of security certificates, device reboots, and custom device specific management tasks. For additional information please see the [Jobs developer guide](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html).

<a name="design"></a>
## Design Goals of this SDK
The C++ SDK was specifically designed for devices that are not resource constrained and required advanced features such as Message queueing, multi-threading support and the latest language features
Expand Down
7 changes: 6 additions & 1 deletion include/ResponseCode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ namespace awsiotsdk {

// Discovery Response Parsing Error Codes

DISCOVER_RESPONSE_UNEXPECTED_JSON_STRUCTURE_ERROR = -1200 ///< Discover Response Json is missing expected keys
DISCOVER_RESPONSE_UNEXPECTED_JSON_STRUCTURE_ERROR = -1200, ///< Discover Response Json is missing expected keys

// Jobs Error Codes

JOBS_INVALID_TOPIC_ERROR = -1300 ///< Jobs invalid topic
};

/**
Expand Down Expand Up @@ -314,6 +318,7 @@ namespace awsiotsdk {
const util::String DISCOVER_ACTION_SERVER_ERROR_STRING("Server returned unknown error while performing the discovery action");
const util::String DISCOVER_ACTION_REQUEST_OVERLOAD_STRING("The discovery action is overloading the server, try again after some time");
const util::String DISCOVER_RESPONSE_UNEXPECTED_JSON_STRUCTURE_ERROR_STRING("The discover response JSON is incomplete ");
const util::String JOBS_INVALID_TOPIC_ERROR_STRING("Invalid jobs topic");

/**
* Takes in a Response Code and returns the appropriate error/success string
Expand Down
219 changes: 219 additions & 0 deletions include/jobs/Jobs.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

/**
* @file Jobs.hpp
* @brief
*
*/

#pragma once

#include "mqtt/Client.hpp"

namespace awsiotsdk {
class Jobs {
public:
// Disabling default and copy constructors.
Jobs() = delete; // Delete Default constructor
Jobs(const Jobs &) = delete; // Delete Copy constructor
Jobs(Jobs &&) = default; // Default Move constructor
Jobs &operator=(const Jobs &) & = delete; // Delete Copy assignment operator
Jobs &operator=(Jobs &&) & = default; // Default Move assignment operator

/**
* @brief Create factory method. Returns a unique instance of Jobs
*
* @param p_mqtt_client - mqtt client
* @param qos - QoS
* @param thing_name - Thing name
* @param client_token - Client token for correlating messages (optional)
*
* @return std::unique_ptr<Jobs> pointing to a unique Jobs instance
*/
static std::unique_ptr<Jobs> Create(std::shared_ptr<MqttClient> p_mqtt_client,
mqtt::QoS qos,
const util::String &thing_name,
const util::String &client_token = util::String());

enum JobExecutionTopicType {
JOB_UNRECOGNIZED_TOPIC = 0,
JOB_GET_PENDING_TOPIC,
JOB_START_NEXT_TOPIC,
JOB_DESCRIBE_TOPIC,
JOB_UPDATE_TOPIC,
JOB_NOTIFY_TOPIC,
JOB_NOTIFY_NEXT_TOPIC,
JOB_WILDCARD_TOPIC
};

enum JobExecutionTopicReplyType {
JOB_UNRECOGNIZED_TOPIC_TYPE = 0,
JOB_REQUEST_TYPE,
JOB_ACCEPTED_REPLY_TYPE,
JOB_REJECTED_REPLY_TYPE,
JOB_WILDCARD_REPLY_TYPE
};

enum JobExecutionStatus {
JOB_EXECUTION_STATUS_NOT_SET = 0,
JOB_EXECUTION_QUEUED,
JOB_EXECUTION_IN_PROGRESS,
JOB_EXECUTION_FAILED,
JOB_EXECUTION_SUCCEEDED,
JOB_EXECUTION_CANCELED,
JOB_EXECUTION_REJECTED,
/***
* Used for any status not in the supported list of statuses
*/
JOB_EXECUTION_UNKNOWN_STATUS = 99
};

/**
* @brief GetJobTopic
*
* This function creates a job topic based on the provided parameters.
*
* @param topicType - Jobs topic type
* @param replyType - Topic reply type (optional)
* @param jobId - Job id, can be $next to indicate next queued or in process job, can also be omitted if N/A
*
* @return nullptr on error, unique_ptr pointing to a topic string if successful
*/
std::unique_ptr<Utf8String> GetJobTopic(JobExecutionTopicType topicType,
JobExecutionTopicReplyType replyType = JOB_REQUEST_TYPE,
const util::String &jobId = util::String());

/**
* @brief SendJobsQuery
*
* Send a query to the Jobs service using the provided mqtt client
*
* @param topicType - Jobs topic type for type of query
* @param jobId - Job id, can be $next to indicate next queued or in process job, can also be omitted if N/A
*
* @return ResponseCode indicating status of publish request
*/
ResponseCode SendJobsQuery(JobExecutionTopicType topicType,
const util::String &jobId = util::String());

/**
* @brief SendJobsStartNext
*
* Call Jobs start-next API to start the next pending job execution and trigger response
*
* @param statusDetails - Status details to be associated with started job execution (optional)
*
* @return ResponseCode indicating status of publish request
*/
ResponseCode SendJobsStartNext(const util::Map<util::String, util::String> &statusDetailsMap = util::Map<util::String, util::String>());

/**
* @brief SendJobsDescribe
*
* Send request for job execution details
*
* @param jobId - Job id, can be $next to indicate next queued or in process job, can also
* be omitted to request all pending and in progress job executions
* @param executionNumber - Specific execution number to describe, omit to match latest
* @param includeJobDocument - Flag to indicate whether response should include job document
*
* @return ResponseCode indicating status of publish request
*/
ResponseCode SendJobsDescribe(const util::String &jobId = util::String(),
int64_t executionNumber = 0, // set to 0 to ignore
bool includeJobDocument = true);

/**
* @brief SendJobsUpdate
*
* Send update for specified job
*
* @param jobId - Job id associated with job execution to be updated
* @param status - New job execution status
* @param statusDetailsMap - Status details to be associated with job execution (optional)
* @param expectedVersion - Optional expected current job execution number, error response if mismatched
* @param executionNumber - Specific execution number to update, omit to match latest
* @param includeJobExecutionState - Include job execution state in response (optional)
* @param includeJobDocument - Include job document in response (optional)
*
* @return ResponseCode indicating status of publish request
*/
ResponseCode SendJobsUpdate(const util::String &jobId,
JobExecutionStatus status,
const util::Map<util::String, util::String> &statusDetailsMap = util::Map<util::String, util::String>(),
int64_t expectedVersion = 0, // set to 0 to ignore
int64_t executionNumber = 0, // set to 0 to ignore
bool includeJobExecutionState = false,
bool includeJobDocument = false);

/**
* @brief CreateJobsSubscription
*
* Create a Jobs Subscription instance
*
* @param p_app_handler - Application Handler instance
* @param p_app_handler_data - Data to be passed to application handler. Can be nullptr
* @param topicType - Jobs topic type to subscribe to (defaults to JOB_WILDCARD_TOPIC)
* @param jobId - Job id, can be $next to indicate next queued or in process job, can also be omitted if N/A
* @param replyType - Topic reply type (optional, defaults to JOB_REQUEST_TYPE which omits the reply type in the subscription)
*
* @return shared_ptr Subscription instance
*/
std::shared_ptr<mqtt::Subscription> CreateJobsSubscription(mqtt::Subscription::ApplicationCallbackHandlerPtr p_app_handler,
std::shared_ptr<mqtt::SubscriptionHandlerContextData> p_app_handler_data,
JobExecutionTopicType topicType = JOB_WILDCARD_TOPIC,
JobExecutionTopicReplyType replyType = JOB_REQUEST_TYPE,
const util::String &jobId = util::String());
protected:
std::shared_ptr<MqttClient> p_mqtt_client_;
mqtt::QoS qos_;
util::String thing_name_;
util::String client_token_;

/**
* @brief Jobs constructor
*
* Create Jobs object storing given parameters in created instance
*
* @param p_mqtt_client - mqtt client
* @param qos - QoS
* @param thing_name - Thing name
* @param client_token - Client token for correlating messages (optional)
*/
Jobs(std::shared_ptr<MqttClient> p_mqtt_client,
mqtt::QoS qos,
const util::String &thing_name,
const util::String &client_token);

static bool BaseTopicRequiresJobId(JobExecutionTopicType topicType);
static const util::String GetOperationForBaseTopic(JobExecutionTopicType topicType);
static const util::String GetSuffixForTopicType(JobExecutionTopicReplyType replyType);
static const util::String GetExecutionStatus(JobExecutionStatus status);
static util::String Escape(const util::String &value);
static util::String SerializeStatusDetails(const util::Map<util::String, util::String> &statusDetailsMap);

util::String SerializeJobExecutionUpdatePayload(JobExecutionStatus status,
const util::Map<util::String, util::String> &statusDetailsMap = util::Map<util::String, util::String>(),
int64_t expectedVersion = 0,
int64_t executionNumber = 0,
bool includeJobExecutionState = false,
bool includeJobDocument = false);
util::String SerializeDescribeJobExecutionPayload(int64_t executionNumber = 0,
bool includeJobDocument = true);
util::String SerializeStartNextPendingJobExecutionPayload(const util::Map<util::String, util::String> &statusDetailsMap = util::Map<util::String, util::String>());
util::String SerializeClientTokenPayload();
};
}
58 changes: 46 additions & 12 deletions network/WebSocket/WebSocketConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
#define X_AMZ_DATE "X-Amz-Date"
#define X_AMZ_EXPIRES "X-Amz-Expires"
#define X_AMZ_SECURITY_TOKEN "X-Amz-Security-Token"
#define X_AMZ_CUSTOMAUTHORIZER_NAME "X-Amz-CustomAuthorizer-Name"
#define X_AMZ_CUSTOMAUTHORIZER_SIGNATURE "X-Amz-CustomAuthorizer-Signature"
#define SIGNING_KEY "AWS4"
#define LONG_DATE_FORMAT_STR "%Y%m%dT%H%M%SZ"
#define SIMPLE_DATE_FORMAT_STR "%Y%m%d"
Expand Down Expand Up @@ -99,6 +101,11 @@ namespace awsiotsdk {
bool server_verification_flag)
: openssl_connection_(endpoint, endpoint_port, root_ca_location, tls_handshake_timeout, tls_read_timeout,
tls_write_timeout, server_verification_flag) {
custom_authorizer_name_.clear();
custom_authorizer_signature_.clear();
custom_authorizer_token_name_.clear();
custom_authorizer_token_.clear();

endpoint_ = endpoint;
endpoint_port_ = endpoint_port;
root_ca_location_ = root_ca_location;
Expand All @@ -125,6 +132,21 @@ namespace awsiotsdk {
wss_frame_write_ = std::unique_ptr<wslay_frame_iocb>(new wslay_frame_iocb());
}

WebSocketConnection::WebSocketConnection(util::String endpoint, uint16_t endpoint_port, util::String root_ca_location,
std::chrono::milliseconds tls_handshake_timeout,
std::chrono::milliseconds tls_read_timeout,
std::chrono::milliseconds tls_write_timeout,
util::String custom_authorizer_name, util::String custom_authorizer_signature,
util::String custom_authorizer_token_name, util::String custom_authorizer_token,
bool server_verification_flag)
: WebSocketConnection(endpoint, endpoint_port, root_ca_location, "", "", "", "", tls_handshake_timeout, tls_read_timeout,
tls_write_timeout, false) {
custom_authorizer_name_ = custom_authorizer_name;
custom_authorizer_signature_ = custom_authorizer_signature;
custom_authorizer_token_name_ = custom_authorizer_token_name;
custom_authorizer_token_ = custom_authorizer_token;
}

ResponseCode WebSocketConnection::ConnectInternal() {
// Init Tls
ResponseCode rc = openssl_connection_.Initialize();
Expand Down Expand Up @@ -563,17 +585,12 @@ namespace awsiotsdk {
}

ResponseCode WebSocketConnection::WssHandshake() {
ResponseCode rc;
util::OStringStream stringStream;

// Assuming:
// 1. Ssl socket is ready to do read/write.

// Create canonical query string
util::String canonical_query_string;
canonical_query_string.reserve(CANONICAL_QUERY_BUF_LEN);
ResponseCode rc = InitializeCanonicalQueryString(canonical_query_string);
if (ResponseCode::SUCCESS != rc) {
return rc;
}

// Create Wss handshake Http request
// -> Generate Wss client key
char client_key_buf[WSS_CLIENT_KEY_MAX_LEN + 1];
Expand All @@ -583,15 +600,32 @@ namespace awsiotsdk {
return rc;
}

// -> Assemble Wss Http request
util::OStringStream stringStream;
stringStream << "GET /mqtt?" << canonical_query_string << " " << HTTP_1_1 << "\r\n"
<< "Host: " << endpoint_ << "\r\n"
if (custom_authorizer_name_.empty()) {
// Create canonical query string
util::String canonical_query_string;
canonical_query_string.reserve(CANONICAL_QUERY_BUF_LEN);
rc = InitializeCanonicalQueryString(canonical_query_string);
if (ResponseCode::SUCCESS != rc) {
return rc;
}

// -> Assemble Wss Http request
stringStream << "GET /mqtt?" << canonical_query_string << " " << HTTP_1_1 << "\r\n";
} else {
// -> Assemble Wss Http request
stringStream << "GET /mqtt " << HTTP_1_1 << "\r\n"
<< X_AMZ_CUSTOMAUTHORIZER_NAME << ": " << custom_authorizer_name_ << "\r\n"
<< X_AMZ_CUSTOMAUTHORIZER_SIGNATURE << ": " << custom_authorizer_signature_ << "\r\n"
<< custom_authorizer_token_name_ << ": " << custom_authorizer_token_ << "\r\n";
}

stringStream << "Host: " << endpoint_ << "\r\n"
<< "Connection: " << UPGRADE << "\r\n"
<< "Upgrade: " << WEBSOCKET << "\r\n"
<< "Sec-WebSocket-Version: " << SEC_WEBSOCKET_VERSION_13 << "\r\n"
<< "sec-websocket-key: " << client_key_buf << "\r\n"
<< "Sec-WebSocket-Protocol: " << MQTT_PROTOCOL << "\r\n\r\n";

util::String request_string = stringStream.str();

// Send out request
Expand Down
Loading