ATTAK is a full-stack framework for applications built on auto-scaling components.
We aim to be platform agnostic, beginning with support for Amazon Web Services and then Google Cloud.
Companies like Amazon and Google create amazing building block services, but they don't put much focus on tools combining them. ATTAK blends many services together to create a fully featured application framework, and offers tools for complex local simulation, multi-function unit testing, and much more. It handles the setup for permissions, event triggers, etc., and lets you focus on your application rather than platform specifics.
Pre-alpha.
Roadmap:
- Alpha AWS compatibility summer 2017
- Alpha Google compatibility late 2017
Readme and other documentation may be inaccurate or incomplete.
npm install -g attak
Generate a simple boilerplate project by running:
attak init
Rename .env.example to .env and put in values as appropriate for your deploy. The required fields are:
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_ROLE_ARN
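Before simulating or deploying, it can help to confirm that the three required fields are actually set. The snippet below is a minimal sketch, not part of ATTAK: `missingFields` is a hypothetical helper, and it assumes the values from .env have already been loaded into `process.env` by whatever mechanism you use.

```javascript
// Hypothetical helper, not an ATTAK API: lists required fields that are unset.
var REQUIRED_FIELDS = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_ROLE_ARN']

function missingFields(env) {
  return REQUIRED_FIELDS.filter(function(name) {
    return !env[name]
  })
}

// Assumes the .env values are already present in process.env
var missing = missingFields(process.env)
if (missing.length > 0) {
  console.log('Missing .env values: ' + missing.join(', '))
}
```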
Visit the ATTAK UI and run the simulation with the command displayed, which will look like:
attak simulate -i [simulation id]
If you want to run the topology without the UI debugger, simply run:
attak simulate
ATTAK will deploy all processors and streams in the topology.
attak deploy
- Simple hello world - Emit some text and reverse it
- HTTP Endpoint - Basic request handler
- Static Hosting - Configure static hosting and federated auth
- Scheduled Events - Trigger functions on a schedule
- Dynamic processor/stream definitions - Example of dynamic definitions
- Chat Room - Chat room with Google auth
- Stream GitHub events - Monitor the GitHub Events API for updates and process the results
A topology is a structure that defines the features of your application. It declares the resources required (serverless functions, message queues, static hosting, authentication, etc.), and ATTAK will automatically set up the necessary permissions and connections.
Note: An ATTAK project is meant to be a node package, so we will require the project directory, and index.js (or whatever is specified in package.json as main) will be loaded as your topology.
Every topology needs a name, but all other parameters are optional. They include:
- processors - event handler functions
- schedule - trigger processors periodically
- streams - connections between processors
- static - static hosting configuration
- api - handle HTTP requests
At its core, a topology is a description of one or more processors and the connections between them. Here's an example of a very simple topology. Processors is a key-value map between processor name and processor definition, and streams is an array of processor connections.
module.exports = {
name: 'attak-example',
processors: {
reverse: './processors/reverse',
hello_world_spout: './processors/hello_world'
},
streams: [
['hello_world_spout', 'reverse']
]
}
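The two processor files referenced in this topology are not shown here. As a rough sketch of what they might contain, the block below defines stand-ins for both and wires them together by hand. The topic name `text`, the shape of the event passed to `reverse`, and the `fakeContext` object are all assumptions made for illustration; in a real project ATTAK delivers emitted data through the stream.

```javascript
// Sketch only: stand-ins for ./processors/hello_world and ./processors/reverse.
// The topic name 'text' and the event shape are assumptions.
var helloWorld = {
  handler: function(event, context, callback) {
    context.emit('text', 'hello world')
    callback()
  }
}

var reverse = {
  handler: function(event, context, callback) {
    var reversed = String(event).split('').reverse().join('')
    console.log(reversed)
    callback(null, reversed)
  }
}

// Hand-wired stand-in for the ['hello_world_spout', 'reverse'] stream:
// whatever the spout emits is passed straight to reverse.
var results = []
var fakeContext = {
  emit: function(topic, message) {
    reverse.handler(message, {emit: function() {}}, function(err, result) {
      results.push(result)
    })
  }
}

helloWorld.handler({}, fakeContext, function() {})
```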
ATTAK can build many different types of applications. For instance, here's a topology that sets up static hosting and an HTTP endpoint, but no streams.
module.exports = {
name: 'static-example',
static: './public',
api: 'requestHandler',
processors: {
requestHandler: function(event, context, callback) {
callback(null, "Amazing HTTP request response")
}
}
}
Processors are the basic unit of computation in ATTAK, and each one represents the configuration for a single process. A processor has a main "handler function" that is called in response to events, and it can be used to process data, emit events, invoke other processors, call other handler functions locally, or otherwise run code in response to events.
Handler functions are called with an event, context, and callback like other serverless frameworks. Unlike other frameworks, ATTAK handlers have context.emit(topic, message), which allows them to send out data across the topology. They may emit any number of events, and must call the callback when finished. Here's an example handler:
module.exports = {
handler: function(event, context, callback) {
console.log(event)
context.emit('output topic', 'hello ' + event.name)
callback()
}
}
Processors can emit any number of events on any number of topics. Events can optionally contain data of any type.
handler: function(event, context, callback) {
context.emit('processing started')
context.emit('got event', event)
// If emitting a lot, do it asynchronously
context.emit('frequent', 'event', function(err) {
console.log('done emitting')
// The handler must call the callback. We can emit
// errors if we have them
if (err) {
callback(err)
} else {
callback()
}
})
}
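To see what a handler emits without deploying it, one option is to call it with a stub context that records every emit. This is a minimal sketch under assumptions: `createFakeContext` is a hypothetical test double, not ATTAK's implementation, and it supports the optional third callback argument only because the example above uses it.

```javascript
// Hypothetical test double for the handler context; not ATTAK's implementation.
function createFakeContext() {
  var emitted = []
  return {
    emitted: emitted,
    emit: function(topic, data, done) {
      emitted.push({topic: topic, data: data})
      // Mirror the optional callback form of context.emit
      if (typeof done === 'function') {
        done(null)
      }
    }
  }
}

var handler = function(event, context, callback) {
  context.emit('processing started')
  context.emit('got event', event)
  context.emit('frequent', 'event', function(err) {
    callback(err)
  })
}

var context = createFakeContext()
var finished = false
handler({name: 'test'}, context, function(err) {
  finished = !err
})
console.log(context.emitted.length + ' events emitted')
```

Recording emits in a plain array keeps the check independent of any queueing service, at the cost of not exercising real delivery.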
Processor handler functions can be defined inline:
module.exports = {
processors: {
inlineProcessor: function(event, context, callback) {
// process the event...
event.total += 1
context.emit('topic name', {your: 'data'})
callback()
}
}
}
If all processors are in a single folder, processors can be set to the folder path.
module.exports = {
processors: './processors'
}
If we want to dynamically generate processors, we have two options.
We can define a function that will return a key-value map of processor name to processor definition:
module.exports = {
processors: function() {
// Build a map keyed by processor name
var processors = {}
for (var iProcessor=0; iProcessor<10; ++iProcessor) {
processors['processor' + iProcessor] = function(event, context, callback) {
// ...your processor logic...
}
}
return processors
}
}
Or we can define a function that takes a processor name and returns processor definitions:
module.exports = {
processor: function(name) {
return function(event, context, callback) {
// ...your processor logic...
}
}
}
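As an illustration of the name-based form, the sketch below returns a handler that closes over the requested name. The topic `handled`, the payload shape, and the `stubContext` object are assumptions for this example only; how ATTAK decides which names to request is not covered here.

```javascript
// Sketch: a name-based processor factory. The topic 'handled' and the
// payload shape are assumptions for illustration.
var topology = {
  name: 'dynamic-example',
  processor: function(name) {
    return function(event, context, callback) {
      context.emit('handled', {by: name, event: event})
      callback()
    }
  }
}

// Local check with a stub context (not ATTAK's runtime)
var seen = []
var stubContext = {
  emit: function(topic, message) {
    seen.push(message)
  }
}
topology.processor('worker3')({id: 1}, stubContext, function() {})
```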
The processor's callback argument is important for several reasons:
- It signals the end of asynchronous execution. Serverless functions (which processors run on) are billed by the millisecond. Calling the callback shuts down process execution, which also stops billing.
  Note: Emitting data is an asynchronous operation and takes some time. If a processor emits data and then immediately calls the callback, process execution will not halt until all emits are complete.
- It allows the handler function to report errors without crashing. Callbacks are one way to report errors. Error handling is an important topic, so it has its own section.
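Because the callback marks the end of execution, a handler with several exit paths can accidentally call it twice. One common guard is sketched below; `once` is a hypothetical helper written for this example, not something ATTAK provides.

```javascript
// Hypothetical helper, not part of ATTAK: ignore any call after the first.
function once(callback) {
  var called = false
  return function(err, result) {
    if (called) {
      return
    }
    called = true
    callback(err, result)
  }
}

var calls = []
var handler = function(event, context, callback) {
  var done = once(callback)
  if (!event.name) {
    done(new Error('event.name is required'))
  }
  // Without the guard, this line would report a second completion
  done(null, 'hello ' + event.name)
}

handler({}, {}, function(err, result) {
  calls.push(err ? err.message : result)
})
```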
Processor logs are recorded and stored according to the platform being used. For example, AWS Lambda logs can be found in AWS CloudWatch.
More logging features and details coming soon.
Debugging issues on a distributed topology is difficult. When a processor has an error we want to figure out what the stack/context/event data were, and we may even want to know the states of other previous processors.
ATTAK has a built-in error handling system: errors are recorded and retries are configurable.
By default, errors are not replayed, but rather recorded into an errors queue. By default the queue lasts for 24 hours, but it can be configured to persist into DynamoDB or other datastores.
Reported errors
The handler callback can be called with an error as the first parameter to report an error.
handler: function(event, context, callback) {
try {
throw new Error('purposefully caused error')
} catch(err) {
callback(err)
}
}
This will allow ATTAK to record the error and stop execution as fast as possible. Emits that were still processing before the error will be allowed to finish.
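Wrapping every handler body in try/catch gets repetitive, so one option is a small wrapper that turns synchronous exceptions into reported errors and attaches the triggering event for later debugging. This is a sketch under assumptions: `reportThrown` and the `err.event` property are hypothetical, and the wrapper does not catch errors thrown later inside asynchronous callbacks.

```javascript
// Hypothetical wrapper, not part of ATTAK: converts thrown exceptions into
// reported errors and attaches the triggering event.
function reportThrown(handler) {
  return function(event, context, callback) {
    try {
      handler(event, context, callback)
    } catch (err) {
      err.event = event
      callback(err)
    }
  }
}

var safeHandler = reportThrown(function(event, context, callback) {
  var parsed = JSON.parse(event.body) // throws on malformed input
  callback(null, parsed)
})

var reported = null
safeHandler({body: '{not json'}, {}, function(err) {
  reported = err
})
```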
Streams set up the flow of data between processors. Behind the scenes, a stream configures ATTAK to send emitted data through a distributed queue (Kinesis Streams, for example) to another processor.
Since streams are simply connections between two processors, they can be defined very simply as an array:
streams: [
// Simple stream from source to destination
['sourceProcessor', 'destProcessor'],
// There can be unlimited streams from a given processor
['sourceProcessor', 'otherProcessor'],
// There can also be unlimited streams into a given processor
['otherProcessor', 'finalProcessor'],
// Loops are fine too
['finalProcessor', 'sourceProcessor'],
// Listen for events on a specific topic
['finalProcessor', 'otherProcessor', 'finalProcessorTopic'],
]
Streams can also be defined as an object structure, which allows for more options to be defined:
module.exports = {
name: 'stream-example',
streams: [
{
from: 'processor1',
to: 'processor2',
shards: 50
}
]
}
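The array and object forms describe the same connection, so tooling that inspects a topology may find it easier to work with one shape. The sketch below converts the array form into the object form. `normalizeStream` is a hypothetical helper, and `topic` is an assumed field name for the third array element; only `from`, `to`, and `shards` appear in this README.

```javascript
// Sketch: convert ['from', 'to', 'topic'] into an object. `from` and `to`
// match the object format above; `topic` is an assumed field name.
function normalizeStream(stream) {
  if (Array.isArray(stream)) {
    var normalized = {from: stream[0], to: stream[1]}
    if (stream[2] !== undefined) {
      normalized.topic = stream[2]
    }
    return normalized
  }
  // Already in object form
  return stream
}

var streams = [
  ['sourceProcessor', 'destProcessor'],
  ['finalProcessor', 'otherProcessor', 'finalProcessorTopic'],
  {from: 'processor1', to: 'processor2', shards: 50}
].map(normalizeStream)
```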
Streams can be defined dynamically by providing a function for your topology's streams parameter. The function is called as the topology is loaded, and should return an array of stream definitions (in either array or object format):
streams: function() {
var streams = []
for (var iStream=0; iStream<NUM_PROCESSORS; ++iStream) {
streams.push({
to: `processor${iStream}`,
from: `processor${iStream == NUM_PROCESSORS - 1 ? 0 : iStream + 1}`
})
}
return streams
}
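When both processors and streams are generated, it is easy for a stream to name a processor that was never defined. The following sketch builds a small ring topology and checks it locally. `NUM_PROCESSORS = 3` is an assumed value, and `undefinedProcessors` is a hypothetical check, not an ATTAK API.

```javascript
var NUM_PROCESSORS = 3 // assumed value for illustration

var topology = {
  name: 'ring-example',
  processors: function() {
    var processors = {}
    for (var i = 0; i < NUM_PROCESSORS; ++i) {
      processors['processor' + i] = function(event, context, callback) {
        callback()
      }
    }
    return processors
  },
  streams: function() {
    var streams = []
    for (var i = 0; i < NUM_PROCESSORS; ++i) {
      streams.push({
        to: 'processor' + i,
        from: 'processor' + (i === NUM_PROCESSORS - 1 ? 0 : i + 1)
      })
    }
    return streams
  }
}

// Hypothetical check: names used by streams but missing from processors
function undefinedProcessors(topology) {
  var defined = topology.processors()
  var missing = []
  topology.streams().forEach(function(stream) {
    [stream.from, stream.to].forEach(function(name) {
      if (!(name in defined) && missing.indexOf(name) === -1) {
        missing.push(name)
      }
    })
  })
  return missing
}

console.log(undefinedProcessors(topology))
```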
ATTAK can trigger processors on a schedule. Schedules can be defined in cron-like patterns or by specifying a call frequency.
module.exports = {
name: 'scheduled',
schedule: [
{
name: 'every_2_minutes_cron',
type: 'cron',
value: '*/2 * * * ? *',
processor: 'onEvent'
},
{
name: 'every_minute_rate',
type: 'rate',
value: '1 minute',
processor: 'onEvent'
},
{
name: 'twice_a_day_rate',
type: 'rate',
value: '12 hours',
processor: 'onEvent'
}
],
processors: {
onEvent: function(event, context, callback) {
console.log("INSIDE SCHEDULED EVENT HANDLER", event)
callback(null, {ok: true})
}
}
}
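On AWS, scheduled rules are written as expressions of the form cron(...) or rate(...), and the type and value fields above resemble the two halves of such an expression. The sketch below combines them. It is an assumption that ATTAK maps schedule entries this way; `toScheduleExpression` is a hypothetical helper used only to show the syntax.

```javascript
// Assumption: ATTAK's actual mapping is not documented here. This only
// shows the AWS expression syntax that the type and value fields resemble.
function toScheduleExpression(entry) {
  if (entry.type !== 'cron' && entry.type !== 'rate') {
    throw new Error('Unknown schedule type: ' + entry.type)
  }
  return entry.type + '(' + entry.value + ')'
}

var expressions = [
  {name: 'every_2_minutes_cron', type: 'cron', value: '*/2 * * * ? *'},
  {name: 'every_minute_rate', type: 'rate', value: '1 minute'}
].map(toScheduleExpression)
```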
ATTAK can configure your files to be hosted statically on services like S3. It will also assist with setting up things like federated authentication (sending users through a Google authentication flow for AWS Cognito credentials, for instance).
Setting up basic static hosting is easy:
module.exports = {
// Uploads all files in public and makes them publicly viewable
static: './public'
}
This feature is under active development, but an example looks something like:
static: {
// Expose files inside the build directory
dir: './build',
permissions: {
// Setup permissions for the front end to invoke processor1
invoke: ['processor1']
},
auth: {
federated: {
// Setup a federated google auth flow using an api key
google: {
key: '...'
}
}
}
},
ATTAK processors can be used to handle HTTP requests. To do this, specify one of your processors in the api parameter:
module.exports = {
name: 'api-example',
api: 'requestHandler',
processors: {
requestHandler: function(event, context, callback) {
callback(null, {
body: 'ATTAK!',
statusCode: 200,
headers: {
'Content-Type': 'text/plain'
}
})
}
}
}
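A request handler can be exercised locally by looking it up through the api parameter and calling it directly. The sketch below returns a JSON body and invokes the handler with an empty event. The name `api-json-example` is made up for this example, and the shape of the real request event is not covered here.

```javascript
// Sketch: a JSON variant of the request handler, invoked locally.
var topology = {
  name: 'api-json-example',
  api: 'requestHandler',
  processors: {
    requestHandler: function(event, context, callback) {
      callback(null, {
        statusCode: 200,
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({ok: true})
      })
    }
  }
}

// Local invocation with an empty event (the real event shape is an unknown here)
var response = null
topology.processors[topology.api]({}, {}, function(err, result) {
  response = result
})
console.log(response.statusCode)
```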