### 10-Minute Quick Demo of Diaspora Event SDK and Action Provider Features

#### SDK Features to Demo
| Section | SDK Features |
|---------|--------------|
| 1.1     | Register and List topics |
| 1.2     | List and update and topic configs |
| 1.3     | Produce and consume messages |
| 1.4     | Create and update triggers, get log events |

#### AP Features to Demo
| Section | AP Features |
|---------|-------------|
| 2.1     | Produce messages to AP with keys |
| 2.2     | Consume messages since a timestamp |

For a comprehensive list of SDK features, visit the [Diaspora SDK repository](https://github.com/globus-labs/diaspora-event-sdk/blob/main/DiasporaDemo.ipynb).

For a full list of AP features, refer to the [Diaspora Service documentation](https://haochenpan.github.io/diaspora-service/main/ap/examples/).

#### SDK requirement
`%pip install 'diaspora-event-sdk[kafka-python]'`

#### AP requirement (already satisfied through SDK requirements)
`%pip install globus-sdk`

### 0.1 Import libraries and print SDK versions

In [None]:
"""Import necessary libraries for Demo."""
from __future__ import annotations

import base64
import json
import os
import random
import string
import time
import uuid

import globus_sdk
import globus_sdk.scopes
from diaspora_event_sdk import block_until_ready
from diaspora_event_sdk import Client
from diaspora_event_sdk import KafkaConsumer
from diaspora_event_sdk import KafkaProducer
from diaspora_event_sdk.version import __version__ as diaspora_sdk_version
from globus_sdk import __version__ as globus_sdk_version

In [None]:
c = Client()
print('Globus SDK version:', globus_sdk_version)
print('Diaspora Event SDK version:', diaspora_sdk_version)
print("User's OpenID:", c.subject_openid)

### 0.2 Create a cluster authentication credential and verify cluster connection

In [None]:
print(c.create_key())

In [None]:
assert block_until_ready()

### 1.1 Register Topics

In [None]:
topic1 = 'topic-' + c.subject_openid[-12:]
print('Topic name:', topic1)
print(c.register_topic(topic1))

In [None]:
topic2_suffix = ''.join(random.choice(string.ascii_uppercase +
                                      string.digits) for _ in range(8))
topic2 = 'topic-' + topic2_suffix
print('Topic name:', topic2)
print(c.register_topic(topic2))

In [None]:
print(c.list_topics())

### 1.2 List and Update Topic Configs

In [None]:
print(c.get_topic_configs(topic2))

In [None]:
configs = {
    'delete.retention.ms': 43200000,
    'retention.ms': 43200000,
}
print(c.update_topic_configs(topic2, configs))

In [None]:
print(c.update_topic_partitions(topic2, 2))

### 1.3 Produce and Consume Messages

### 1.3.1 Synchronously produce messages to a registered topic.

In [None]:
producer = KafkaProducer()
future = producer.send(
    topic1, {'message': 'Synchronous message 1 from Diaspora SDK'})
print(future.get(timeout=10))
future = producer.send(
    topic1, {'message': 'Synchronous message 2 from Diaspora SDK'})
print(future.get(timeout=10))

### 1.3.2 Asynchronously produce batched messages to a registered topic.

In [None]:
producer = KafkaProducer()
producer.send(topic1, {'message': 'Asynchronous message 3 from Diaspora SDK'})
producer.send(topic1, {'message': 'Asynchronous message 4 from Diaspora SDK'})
producer.flush()

### 1.3.3 Consume messages from the beginning of the topic (need to interrupt). 

In [None]:
consumer = KafkaConsumer(topic1, auto_offset_reset='earliest')
for message in consumer:
    print(message)

## 1.4 Create and Update triggers, Get Execution Logs

### 1.4.1 Create a deployment package

In [None]:
trigger_package = f'{os.getcwd()}/my_deployment_package'
trigger_file = 'lambda_function.py'
trigger_name_in_def='lambda_handler'

os.system(f'mkdir {trigger_package}')

### 1.4.2 Save code to `trigger_package/trigger_file`

In [None]:
trigger_code = f'''import base64

def {trigger_name_in_def}(event, context):
    try:
        print('EVENT:')
        print(event)

        for partition, records in event['records'].items():
            for record in records:
                print("topic:", record['topic'],
                      "partition:", record['partition'],
                      "offset:", record['offset'],
                      "key:", record.get("key", "NOT-SET"),
                      "value:", base64.b64decode(record['value']))
    except Exception as e:
        print("ERROR:", e)
'''

with open(os.path.join(trigger_package, trigger_file), 'w') as f:
  f.write(trigger_code)

### 1.4.3 Zip the code in `trigger_file`

In [None]:
def get_zipped_code(lambda_function_package):  # noqa: D103
    print(f'Zipping {lambda_function_package}')
    os.system(f'cd {lambda_function_package} && zip -r {lambda_function_package}.zip .')  # noqa: E501
    with open(f'{lambda_function_package}.zip', 'rb') as f:
        return base64.b64encode(f.read()).decode('utf-8')


zipped_code = get_zipped_code(trigger_package)

### 1.4.4 Inspect trigger info

Note: one topic can be associated with multiple triggers

`topic_name`: which topic to consume from

`function_name`: along with topic_name, used to identify and delete the function

`function_runtime`: a function runtime like `python3.11` and `python3.12`

`function_handler`: py-file-name.function-name

`function_code_zipped`: serialized function code


In [None]:
topic_name = 'topic-' + c.subject_openid[-12:]
trigger_name = f'lambda{random.randint(100, 999)}'
trigger_runtime = 'python3.11'
trigger_handler = f'{trigger_file.split(".")[0]}.{trigger_name_in_def}'
print(c.register_topic(topic_name))
print()
print('topic name:\t\t', topic_name)
print('trigger name:\t\t', trigger_name)
print('trigger runtime:\t', trigger_runtime)
print('trigger handler:\t', trigger_handler)
print('zipped trigger code:\t', zipped_code)
print('length of the code:\t', len(zipped_code))

### 1.4.5 Create the trigger

Note: the call blocks for a few seconds to wait for creation results or error message.

Default values are listed in the table below, note that if the runtime is `python3.11` or `python3.12`, a layer with Globus SDK and Diaspora SDK will be attached.

[Trigger parameter syntax (`Code`, etc.)](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html)

| Trigger Parameter | Default Value                      |
|--------------------|------------------------------------|
| Runtime            | python3.11                         |
| Handler            | lambda_function.lambda_handler     |
| Code               | {}                                 |
| Timeout            | 30                                 |
| MemorySize         | 128                                |
| Environment        | {}                                 |
| EphemeralStorage   | {'Size': 512}                      |
| Layers             | []                                 |



[Invocation parameter syntax (`FilterCriteria`, etc.)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax)

| Invocation Parameter              | Default Value |
|--------------------------------|---------------|
| Enabled                        | True          |
| BatchSize                      | 1             |
| FilterCriteria                 | {}            |
| MaximumBatchingWindowInSeconds | 500ms         |
| StartingPosition               | LATEST        |


In [None]:
'''
    Create a new trigger that response to events in a registered topic.
    Note: the creation call takes around 10 seconds to return.
    Note: for Python 3.12 runtime, use 
        arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer312:1
        to enable the globus SDK in the trigger.
    expected return (first time): {"status": "success", "message": ...}
    expected return (subsequent): {"status": "error", "message": ...}
'''

trigger_configs = {
    'Runtime': trigger_runtime,
    'Handler': trigger_handler,
    'Code': {'ZipFile': zipped_code},
    'Timeout': 3,
    'MemorySize': 128,
    'Environment': {},
    'EphemeralStorage': {'Size': 512},
    'Layers': ['arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1'],  # noqa: E501
}
invoke_configs = {
    'Enabled': True,
    'BatchSize': 1,
    'StartingPosition': 'LATEST',
}
print(c.create_trigger(topic_name, trigger_name, 
                       trigger_configs, invoke_configs))

### 1.4.6 Produce events to invoke the trigger and verify invocations through inspecting the latest log stream

In [None]:
'''
    Synchronously produce messages to a registered topic to invoke triggers
    expected return: 
        multiple RecordMetadata(...)
'''

producer = KafkaProducer()
future = producer.send(
    topic_name, {'message': 'Synchronous message 3 from Diaspora SDK'})
print(future.get(timeout=10))
future = producer.send(
    topic_name, {'message': 'Synchronous message 4 from Diaspora SDK'})
print(future.get(timeout=10))

In [None]:
'''
    Get the list of log streams belong to the trigger.
    Note: recent_log_stream_name may not contain logs of all invocations,
      as some logs may exist in other streams.
    expected return:  {"status": "success", "streams": [...]}
'''

streams_response = c.list_log_streams(trigger_name)
print(streams_response)
recent_log_stream_name = streams_response['streams'][0]['logStreamName']

In [None]:
'''
    Retrieve the events in a particular log stream.
    Note: this log stream may not contain logs of all trigger invocations,
      as some logs may exist in other streams.
    expected return:  {"status": "success", "events": [...]}
'''
print(c.get_log_events(trigger_name, recent_log_stream_name))

### 1.4.7 Trigger deletion call

In [None]:
'''
    Delete trigger by (topic_name, trigger_name)
    expected return: {"status": "success", "message": ...}
'''
print(c.delete_trigger(topic_name, trigger_name))

In [None]:
'''
    List all triggered created by the user.
    expected return (if all triggers are deleted): None 
    expected return (otherwise): {'function_name': ..., 'function_detail': ..., 'triggers': [...]}
'''  # noqa: E501
for function in c.list_triggers()['triggers']:
    print('trigger name:', function['function_name'], '\n',
          'trigger handler name:', function['function_detail']['Configuration']['Handler'], '\n',  # noqa: E501
          'trigger uuid:', function['triggers'][0]['UUID'], '\n',
          'trigger topic:', function['triggers'][0]['Topics'][0], '\n')

### 1.4.8 Delete the local lambda package

In [None]:
os.system(f'rm -rf {trigger_package}')
os.system(f'rm {trigger_package}.zip')

## 2.1 Produce messages to AP with keys

In [None]:
# ID of this tutorial notebook as registered with Globus Auth
CLIENT_ID = 'f794186b-f330-4595-b6c6-9c9d3e903e47'

# Do a native app authentication flow to get tokens that allow us
# to interact with the Globus Flows service

scopes = [
    'openid',
    'profile',
    'email',
    globus_sdk.FlowsClient.scopes.manage_flows,
    globus_sdk.FlowsClient.scopes.run_manage,
]
native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
native_auth_client.oauth2_start_flow(requested_scopes=scopes)
print(f'Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}')

auth_code = input('Authorization Code: ')
response = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)

tokens = response.by_resource_server
print(json.dumps(tokens, indent=2))

flows_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens['flows.globus.org']['access_token'],
)
flows_client = globus_sdk.FlowsClient(authorizer=flows_authorizer)

In [None]:
# Create an Auth client so we can look up identities
auth_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens['auth.globus.org']['access_token'],
)
ac = globus_sdk.AuthClient(authorizer=auth_authorizer)

# Get the user's primary identity
primary_identity = ac.oauth2_userinfo()
identity_id = primary_identity['sub']

print(f"Username: {primary_identity['preferred_username']}")
print(f'ID: {identity_id}')
print('Topic to produce/consume:', topic1)

### 2.1.1 Select a Topic

In [None]:
action_url = 'https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/'

flow_definition = {
    'Comment': 'Publish messages to Diaspora Event Fabric',
    'StartAt': 'PublishMessages',
    'States': {
        'PublishMessages': {
            'Comment': 'Send messages to a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'msgs.$': '$.input.msgs',
                'keys.$': '$.input.keys',
            },
            'ResultPath': '$.PublishMessages',
            'End': True,
        },
    },
}

In [None]:
flow_title = f'Diapora-AP-Flow-{str(uuid.uuid4())[:4]}'
flow = flows_client.create_flow(
    title=flow_title,
    definition=flow_definition,
    # definition=flow_definition2,
    input_schema={},
)
flow_id = flow['id']
flow_scope = globus_sdk.SpecificFlowClient(flow_id).scopes.make_mutable('user')
print(f"Successfully created flow: '{flow_title} (ID: {flow_id})")
print(f'View the flow in the Web App: https://app.globus.org/flows/{flow_id}')

if flow_id not in tokens:
    # Do a native app authentication flow and get tokens that
    # include the newly deployed flow scope
    native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
    native_auth_client.oauth2_start_flow(requested_scopes=flow_scope)
    print(f'Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}')

    # Authenticate and come back with your authorization code;
    # paste it into the prompt below.
    auth_code = input('Authorization Code: ')
    token_response = native_auth_client.oauth2_exchange_code_for_tokens(
        auth_code,
    )

    # Save the new token in a place where the flows client can retrieve it.
    tokens[flow_id] = token_response.by_resource_server[flow_id]

    # These are the saved scopes for the flow
    print(json.dumps(tokens, indent=2))

In [None]:
flow_input = {
    'input': {
        'action': 'produce',
        'topic': topic1,
        'msgs': [
            {'content1': 'hello world1'},
            {'content2': 'hello world2'},
            {'content3': 'hello world3'},
        ],
        'keys': [
            'my-key-123',
            'my-key-456',
            'my-key-789',
        ],
    },
}

In [None]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

## 2.2 Consume messages since a timestamp

In [None]:
flow_definition_b = {
    'Comment': 'Consume messages to Diaspora Event Fabric',
    'StartAt': 'ConsumeMessages',
    'States': {
        'ConsumeMessages': {
            'Comment': 'Receive messages from a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'ts.$': '$.input.ts',
            },
            'ResultPath': '$.ConsumeMessages',
            'End': True,
        },
    },
}

flows_client.update_flow(flow_id,
                         definition=flow_definition_b,
                         input_schema={},
                         )

In [None]:
flow_input_b = {
    'input': {'action': 'consume', 'topic': topic1, 'ts': 1715930522000},
}

In [None]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input_b,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

## Clean up: Reset `topic1` and `topic2`, Unregister `topic2`

In [None]:
print(c.reset_topic(topic1))
print(c.reset_topic(topic2))
print(c.unregister_topic(topic2))

In [None]:
print(c.list_topics())