# PY08 - Stream Workers

## Overview

Macrometa GDN allows you to integrate streaming data and take appropriate actions. Most stream processing use cases involve collecting, analyzing, and integrating or acting on data generated during business activities by various sources.

| Stage | Description | 
|  :----:  |    :----:   |
| Collect | Receive or capture data from various data sources.| 
| Analyze | Analyze data to identify interesting patterns and extract information.| 
| Act | Take actions based on the findings. For example, running simple code, calling an external service, or triggering a complex integration.| 
| Integrate | Make processed data available to downstream consumers.| 

You can process streams to perform the following actions with your data:

- Transform data from one format to another. For example, from XML to JSON.
- Enrich data received from a specific source by combining it with databases and services.
- Correlate data by joining multiple streams to create an aggregate stream.
- Clean data by filtering it and by modifying the content in messages. For example, obfuscating sensitive information.
- Derive insights by identifying event patterns in data streams.
- Summarize data with time windows and incremental aggregations.
- Perform real-time ETL on collections, tailed files, and scraped HTTP endpoints.
- Integrate stream data and trigger actions based on it. This can be a single service request or a complex enterprise integration flow.
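To make the "clean data" item above concrete, here is a minimal plain-Python sketch of filtering and obfuscating events. It does not use GDN at all. The function names, the bpm thresholds, and the masking rule are assumptions chosen for illustration only.

```python
def mask_name(name):
    """Keep the first character and replace the rest with asterisks."""
    return name[:1] + "*" * (len(name) - 1) if name else name


def clean_events(events, min_bpm=30, max_bpm=220):
    """Drop readings outside a plausible range and mask the name field.

    The thresholds are hypothetical defaults, not values from this tutorial.
    """
    cleaned = []
    for event in events:
        if min_bpm <= event["bpm"] <= max_bpm:
            cleaned.append({"name": mask_name(event["name"]), "bpm": event["bpm"]})
    return cleaned


sample = [
    {"name": "Betty", "bpm": 72},
    {"name": "Bruno", "bpm": 0},   # implausible reading, filtered out
    {"name": "Andre", "bpm": 65},
]
print(clean_events(sample))
```

In a stream worker, the same idea would be expressed as a query with a filter condition and a transformed field rather than as Python code.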

In this tutorial, we build a simple stream worker that computes heart rate statistics per person, such as the minimum, maximum, and average bpm.

## Prerequisites

This tutorial assumes that:

- your tenant name is an email address
- your user password is xxxxx.

If you need to install pyC8, run the cell below. Otherwise, you may skip it.

In [None]:
!pip install pyC8

## 1. Importing Libraries & Define Variables

In [None]:
import json
from c8 import C8Client

# Variables - Queries
fed_url = "gdn.paas.macrometa.io"
email = "email"  # <-- Email goes here
password = "password!"  # <-- Password goes here
geo_fabric = "_system"
heart_rates_collection = "HeartRates"
heart_rates_statistics_collection = "HeartRateStatistics"

heart_rate_statistics_worker = "HeartRateStatisticsWorker"
mock_heart_rate_data_generator = "MockHeartRateDataGenerator"
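If you prefer not to keep credentials in the notebook, one possible approach is to read them from environment variables and fall back to the placeholder values above. This is only a sketch: the helper name and the variable names `GDN_URL`, `GDN_EMAIL`, `GDN_PASSWORD`, and `GDN_FABRIC` are hypothetical and not part of pyC8.

```python
import os


def load_gdn_settings(environ=os.environ):
    """Read connection settings from environment variables, with fallbacks.

    The environment variable names are assumptions made for this example.
    """
    return {
        "fed_url": environ.get("GDN_URL", "gdn.paas.macrometa.io"),
        "email": environ.get("GDN_EMAIL", "email"),
        "password": environ.get("GDN_PASSWORD", "password!"),
        "geo_fabric": environ.get("GDN_FABRIC", "_system"),
    }


settings = load_gdn_settings()
print("tenant: {}, geofabric: {}".format(settings["email"], settings["geo_fabric"]))
```

The returned values can then be assigned to `fed_url`, `email`, `password`, and `geo_fabric` before connecting.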

## 2. Connecting to GDN

In [None]:
# Initialize the C8 Data Fabric client.
print("\n ------- CONNECTION SETUP  ------")
print("tenant: {}, geofabric:{}".format(email, geo_fabric))
client = C8Client(protocol='https', host=fed_url, port=443,
                email=email, password=password,
                geofabric=geo_fabric)  

# Connect to the "_system" fabric as tenant admin.
# This returns an API wrapper for the "_system" fabric on your tenant.
# Note that the tenant should already exist.

tenant = client.tenant(email=email, password=password)

sys_fabric = tenant.useFabric('_system')

## 3. Creating Collections

In [None]:
# Create a new collection if it does not exist
print("\n 3. CREATE_COLLECTION")
if client.has_collection(heart_rates_collection):
    print("Collection {} exists".format(heart_rates_collection))
else:
    client.create_collection(name=heart_rates_collection)
    print("Collection {} Created!".format(heart_rates_collection))
    
if client.has_collection(heart_rates_statistics_collection):
    print("Collection {} exists".format(heart_rates_statistics_collection))
else:
    client.create_collection(name=heart_rates_statistics_collection)
    print("Collection {} Created!".format(heart_rates_statistics_collection))

**Note: For this to work, you need to enable Stream from the Macrometa Dashboard. To do this, open the dashboard, select Collections, find the collection named "HeartRates", and select "enable Stream".**
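The two collection checks in this step follow the same pattern, so they could be folded into a small helper. The sketch below assumes only the `has_collection` and `create_collection` calls already used in this step. The helper name `ensure_collection` is hypothetical.

```python
def ensure_collection(client, name):
    """Create the collection if it is missing; return True when it was created."""
    if client.has_collection(name):
        print("Collection {} exists".format(name))
        return False
    client.create_collection(name=name)
    print("Collection {} Created!".format(name))
    return True


# Usage with the variables defined earlier in this notebook:
# for name in (heart_rates_collection, heart_rates_statistics_collection):
#     ensure_collection(client, name)
```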

## 4. Validate Stream Application

### 4.1 Validating heart rate simulator definition

#### Option 1: Use the Mockaroo API to generate the mock heart rate data

NOTE: If you use the Mockaroo API, you need to sign up for Mockaroo and paste your API key into the cell below. You can find your API key here: https://www.mockaroo.com/myaccount

In [None]:
mockarooAPIKey = "XXXX"
dataGeneratorAppDefinition = '''
@App:name('MockHeartRateDataGenerator')
@App:description('Mock data generator by calling mockaroo api for heart rate')
@App:qlVersion('2')


CREATE TRIGGER HeartRateDataGeneratorTrigger WITH ( interval = 10 sec );

CREATE TABLE HeartRates (name string, bpm int);

CREATE SINK MockarooServiceCallSink WITH (type='http-call', sink.id='mockaroo-service', publisher.url='https://api.mockaroo.com/api/a6e130b0?count=10&key={}', map.type='json', method='GET') (triggered_time string);

CREATE SOURCE MockarooServiceResponseSink WITH (type='http-call-response', sink.id='mockaroo-service', map.type='json', http.status.code='200') (name string, bpm int);


INSERT INTO MockarooServiceCallSink
SELECT time:currentTimestamp() as triggered_time 
FROM HeartRateDataGeneratorTrigger;

-- Note: Consume data received from the external service
@info(name = 'ConsumeProcessedData')
INSERT INTO HeartRates
SELECT name, bpm
FROM MockarooServiceResponseSink;
'''.format(mockarooAPIKey)

#### Option 2: Use a custom stream worker to generate the heart rate data

In [None]:
dataGeneratorAppDefinition = '''
@App:name("MockHeartRateDataGenerator")
@App:qlVersion("2")

CREATE TRIGGER HeartRateDataGeneratorTrigger WITH ( interval = 10 sec );

CREATE TABLE HeartRates (name string, bpm int);


-- Note: Generating random bpm and name 
@info(name = 'ConsumeProcessedData')
INSERT INTO HeartRates
SELECT 
js:eval("['Vasili', 'Rivalee', 'Betty', 'Jennifer', 'Alane', 'Sarena', 'Bruno', 'Carolee', 'Emmott', 'Andre'][Math.floor(Math.random() * 10)]","string") as name,
js:eval("Math.floor(Math.random() * 40) + 40","int") as bpm
FROM HeartRateDataGeneratorTrigger;
'''
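For readers less familiar with the `js:eval` expressions in Option 2, the following plain-Python sketch produces readings with the same shape: a name picked at random from the same list and a bpm between 40 and 79. It is only a local illustration and is not used by the stream worker. The function name is hypothetical.

```python
import random

NAMES = ['Vasili', 'Rivalee', 'Betty', 'Jennifer', 'Alane',
         'Sarena', 'Bruno', 'Carolee', 'Emmott', 'Andre']


def mock_heart_rate(rng=random):
    """Return one mock reading with a random name and a bpm between 40 and 79."""
    return {
        "name": NAMES[rng.randrange(len(NAMES))],
        # Same range as Math.floor(Math.random() * 40) + 40
        "bpm": rng.randrange(40) + 40,
    }


print([mock_heart_rate() for _ in range(3)])
```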

### 4.2 Validating stream worker

In [None]:
statisticAppDefinition = '''
@App:name('HeartRateStatisticsWorker')
@App:description("Calculate the statistics for heart rates")
@App:qlVersion("2")

CREATE SOURCE HeartRates WITH (type = 'database', collection = "HeartRates", collection.type="doc", replication.type="global", map.type='json') (name string, bpm int);

CREATE TABLE HeartRateStatistics (eventTime long, name string, minBpm int, maxBpm int, avgBpm double);


INSERT INTO HeartRateStatistics
SELECT 
    eventTimestamp() as eventTime,
    name as name,
    min(bpm) as minBpm,
    max(bpm) as maxBpm,
    avg(bpm) as avgBpm
FROM HeartRates window sliding_time(1 min)
group by name
'''


print("\n 4. VALIDATE_STREAM_WORKERS ... region {}".format(fed_url))

print("--- Validating Stream Application Definition")
result = client.validate_stream_app(data=statisticAppDefinition)

print("--- Validated HeartRateStatisticsWorker Stream Application Definition {} ---".format(result))
result = client.validate_stream_app(data=dataGeneratorAppDefinition)

print("--- Validated MockHeartRateDataGenerator Stream Application Definition {} ---".format(result))
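To build intuition for what the statistics query computes, here is a rough plain-Python sketch of a one-minute sliding window grouped by name. It is a simplification under stated assumptions: timestamps are passed in explicitly in milliseconds, the window boundary handling is a guess, and the result is computed once for a given moment, whereas the stream worker produces results continuously as events arrive. The function name is hypothetical.

```python
from collections import defaultdict

WINDOW_MS = 60 * 1000  # 1 minute, mirroring sliding_time(1 min)


def sliding_stats(events, now_ms, window_ms=WINDOW_MS):
    """Compute min, max, and average bpm per name for events inside the window.

    events is an iterable of (timestamp_ms, name, bpm) tuples.
    """
    by_name = defaultdict(list)
    for ts, name, bpm in events:
        if now_ms - window_ms < ts <= now_ms:
            by_name[name].append(bpm)
    return {
        name: {"minBpm": min(v), "maxBpm": max(v), "avgBpm": sum(v) / len(v)}
        for name, v in by_name.items()
    }


events = [
    (0, "Betty", 60),       # older than one minute at now_ms=70000, so excluded
    (20000, "Betty", 70),
    (50000, "Betty", 50),
    (65000, "Bruno", 75),
]
print(sliding_stats(events, now_ms=70000))
```

The output fields use the same names as the `HeartRateStatistics` table so that the two are easy to compare.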

## 5. Save Stream Application

In [None]:
print("\n 5. CREATE_STREAM_WORKERS ... region {}".format(fed_url))
print("--- Creating Stream Application")
# By default, the stream app is created in the local region. Optionally, you can send dclist to deploy the stream app.
result = client.create_stream_app(data=statisticAppDefinition)

print("--- Created Stream Application {} ---".format(heart_rate_statistics_worker))
result = client.create_stream_app(data=dataGeneratorAppDefinition)

print("--- Created Stream Application {} ---".format(mock_heart_rate_data_generator))

## 6. Publish Stream Application

In [None]:
print("\n6. Activating Stream Workers")
client.activate_stream_app(heart_rate_statistics_worker, True)
client.activate_stream_app(mock_heart_rate_data_generator, True)
print("6. Activated Stream Workers")

## 7. Checking HeartRates documents using C8QL

In [None]:
# Wait about 1 minute after the previous step before running this query, because the stream worker uses a 1-minute sliding window.
cursor = client.execute_query('FOR doc IN HeartRates LIMIT 0, 5 RETURN doc')

docs = [document for document in cursor]

print(json.dumps(docs, indent=4))

## 8. Checking HeartRateStatistics documents using C8QL

In [None]:
cursor = client.execute_query('FOR doc IN HeartRateStatistics LIMIT 0, 5 RETURN doc')

docs = [document for document in cursor]

print(json.dumps(docs, indent=4))
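The queries in steps 7 and 8 return nothing until the stream workers have produced data. Instead of waiting a fixed minute, you could poll until documents appear. The sketch below relies only on the `execute_query` call used above. The helper name, the timeout, and the polling interval are assumptions.

```python
import time


def wait_for_documents(client, query, timeout_s=90, poll_s=5, sleep=time.sleep):
    """Run the query repeatedly until it returns documents or the timeout passes."""
    waited = 0
    while True:
        docs = [document for document in client.execute_query(query)]
        if docs or waited >= timeout_s:
            return docs
        sleep(poll_s)
        waited += poll_s


# Usage with the client created earlier in this notebook:
# docs = wait_for_documents(client, 'FOR doc IN HeartRateStatistics LIMIT 0, 5 RETURN doc')
# print(json.dumps(docs, indent=4))
```

The helper returns an empty list if no documents appear before the timeout, so the caller can decide how to handle that case.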

## 9. Delete StreamApp and Collections

In [None]:
print("\n9. DELETE_DATA")
client.delete_stream_app(heart_rate_statistics_worker)
client.delete_stream_app(mock_heart_rate_data_generator)
client.delete_collection(heart_rates_collection)
client.delete_collection(heart_rates_statistics_collection)
print("StreamApp and Collection deleted")
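If one of the deletes above fails, for example because a stream app was never created, the remaining resources are left behind. A possible workaround is to attempt every delete and report the failures at the end. This sketch uses only the `delete_stream_app` and `delete_collection` calls from this step. The helper name is hypothetical, and it catches a broad `Exception` because the exact exception types raised by pyC8 are not assumed here.

```python
def cleanup(client, stream_apps, collections):
    """Try every delete and collect failures instead of stopping at the first one."""
    failures = []
    for name in stream_apps:
        try:
            client.delete_stream_app(name)
        except Exception as exc:  # broad on purpose: exception types are not assumed
            failures.append((name, str(exc)))
    for name in collections:
        try:
            client.delete_collection(name)
        except Exception as exc:
            failures.append((name, str(exc)))
    return failures


# Usage with the variables defined earlier in this notebook:
# failures = cleanup(
#     client,
#     [heart_rate_statistics_worker, mock_heart_rate_data_generator],
#     [heart_rates_collection, heart_rates_statistics_collection],
# )
# print("Failures:", failures)
```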

## Section Completed!

TBC...