# Diaspora Event SDK - Demo

[GitHub Repository](https://github.com/globus-labs/diaspora-event-sdk/tree/main)

[QuickStart Guide](https://github.com/globus-labs/diaspora-event-sdk/blob/main/docs/quickstart.md)

#### Available Methods to Web Service APIs

| Function and Arguments | Description |
|------------------------|-------------|
| **MSK Credential Management** | |
| `create_key()` | Revokes existing keys, generates a new key, and updates the token storage with the newly created key and the Diaspora endpoint. |
| **MSK Topic Management** | |
| `list_topics()` | Returns a list of topics currently registered under the user's account. |
| `register_topic(topic)` | Registers a new topic under the user's account with permissions to read, write, and describe the topic. |
| `unregister_topic(topic)` | Unregisters a topic from a user's account, but all existing events within the topic are unaffected. |
| `get_topic_configs(topic)` | Retrieves the current configurations for a registered topic. |
| `update_topic_configs(topic, configs)` | Updates the configurations for a registered topic. |
| `update_topic_partitions(topic, new_partitions)` | Increases the number of partitions for a given topic to the specified new partition count. |
| `reset_topic(topic)` | Deletes and recreates the topic, removing all messages and restoring the topic to the default configurations. |
| `register_topic_for_user(topic, user)` | Authorizes another user to access a registered topic under the invoker's account. |
| `unregister_topic_for_user(topic, user)` | Removes access permissions for another user from a registered topic under the invoker's account. |
| `list_topic_users(topic)` | Returns a list of users that have access to the topic. |
| **Lambda Function Management** | |
| `list_triggers()` | Retrieves a list of triggers created under the user's account, showing each trigger's configurations and UUID. |
| `create_trigger(topic, trigger, trigger_configs, invoke_configs)` | Creates a new trigger under the user's account with specific function and invocation configurations. |
| `delete_trigger(topic, trigger)` | Deletes a trigger and related AWS resources, while the associated topic remains unaffected.|
| `update_trigger(trigger_uuid, invoke_configs)` | Updates invocation configurations of an existing trigger, identified by its unique trigger UUID. |
| `list_log_streams(trigger)` | Lists the log streams of a trigger under the user's account. |
| `get_log_events(trigger, stream)` | Gets the events in a particular log stream of a trigger under the user's account. |

## Install the SDK and dependencies 

In [None]:
%pip install -e '.[kafka-python]'

In [None]:
import base64
import json
import os
import random
import time
from pprint import pprint

from diaspora_event_sdk import Client as GlobusClient
from diaspora_event_sdk import KafkaConsumer, KafkaProducer, block_until_ready

## Perform client login and print the user's OpenID

In [None]:
c = GlobusClient()
print("User's OpenID:", c.subject_openid)

## 1 Create a cluster authentication credential and verify cluster connection

### 1.1 Create and retrieve a cluster authentication credential

In [None]:
"""
    call the Web Service API and store credentials and endpoint address in a local token storage.
    Note: calling `create_key` on another machine invalidates the credential retrieved on this machine.
    expected return: {'access_key': ..., 'secret_key': ..., 'endpoint': ...}
"""
print(c.create_key())

### 1.2 Verify cluster connection

In [None]:
"""
    Internally, this method creates a producer and consumer using the retrieved credential.
    It blocks until it has produced a message and then consumed it.
    Note: it is normal to see a few error messages right after calling create_key() because the key is not ready yet.
    expected return: a truthy value once the credential is ready (the assert below fails otherwise)
"""

assert block_until_ready()

## 2 Demonstrate topic management APIs, SDK producing, and SDK consuming

### 2.1 Register a topic and list all topics registered.

In [None]:
"""
    Register a topic -- the user gets read, write, and describe access to it.
    expected return (first time): {"status": "success", "message": ...}
    expected return (subsequent): {"status": "no-op", "message": ...}
"""
topic = "topic" + c.subject_openid[-12:]
print(c.register_topic(topic))

"""
    List all topics that the user has access to.
    expected return: {"status": "success", "topics": [...]}
"""
print(c.list_topics())
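The calls above document their results as dictionaries with a `status` field (`"success"`, `"no-op"`, or an error). A minimal sketch of a guard that fails fast on any other status is shown below. The helper name `check_response` is hypothetical and not part of the SDK, and the sample dictionaries are stand-ins shaped like the expected returns, not real service output.

```python
def check_response(response, ok_statuses=("success", "no-op")):
    """Return the response unchanged if its status is acceptable, else raise."""
    status = response.get("status")
    if status not in ok_statuses:
        # Surface the service-provided message, if any, in the exception.
        raise RuntimeError(f"unexpected status {status!r}: {response.get('message')}")
    return response


# Stand-in dictionaries shaped like the documented expected returns.
print(check_response({"status": "success", "message": "topic registered"}))
print(check_response({"status": "no-op", "message": "topic already registered"}))
```

With a logged-in client, this could wrap a real call, for example `check_response(c.register_topic(topic))`.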

### 2.2 Get configurations of a registered topic

In [None]:
"""
    For a registered topic, get its configurations.
    If the topic is not registered (or registered by others), return an error message.
    Explanations of these configurations: https://kafka.apache.org/documentation/#topicconfigs
    expected return: { "status": "success", "configs": {...}}
"""
print(c.get_topic_configs(topic))

### 2.3 Update topic configurations

In [None]:
"""
    Update one or more topic configurations, if the topic has been registered.
    If the topic is not registered (or registered by others), return an error message.
    expected return: { "status": "success", "before": {...},  "after": {...}}
"""
configs = {'min.insync.replicas': 1}
print(c.update_topic_configs(topic, configs))

In [None]:
"""
    Adjust more than one configuration in a single update_topic_configs request.
    expected return: { "status": "success", "before": {...},  "after": {...}}
"""
configs = {
    'delete.retention.ms': 43200000,
    'retention.ms': 43200000
}
print(c.update_topic_configs(topic, configs))
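The value `43200000` used above is 12 hours expressed in milliseconds. A small sketch that derives such values from readable durations rather than hand-computed constants follows; the helper name `to_ms` is hypothetical.

```python
from datetime import timedelta


def to_ms(**duration):
    """Convert a duration given as timedelta keyword arguments to whole milliseconds."""
    return timedelta(**duration) // timedelta(milliseconds=1)


configs = {
    "delete.retention.ms": to_ms(hours=12),
    "retention.ms": to_ms(hours=12),
}
print(configs)  # {'delete.retention.ms': 43200000, 'retention.ms': 43200000}
```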

### 2.4 Update topic partitions (default=1)

In [None]:
"""
    Increase the number of partitions for a registered topic.
    If the topic is not registered, return an error. 
    If the new_partitions argument is no larger than the current number of partitions, return an error.
    expected return: { "status": "success" }
"""
print(c.update_topic_partitions(topic, 2))

### 2.4.2 Restore topic configs

In [None]:
"""
    Deletes and recreates the topic, removing all messages and restoring the topic to the default configurations while user access is not affected.
    If the topic is not registered, return an error. 
    Note: under repeated invocations, the topic may be marked for deletion but not actually deleted for a while. Wait and then call this method again.
    expected return: { "status": "success", "message": "topic deleted and re-created with default configs" }
"""
print(c.reset_topic(topic))

### 2.5 Grant/List/Revoke access of another user

In [None]:
"""
    Authorize another user to access the topic, if the topic has been registered by the invoking user.
    expected return (first time): { "status": "success", "message": ...}
    expected return (subsequent): { "status": "no-op", "message": ...}

"""
print(c.grant_user_access(topic, "e2a8169b-feef-4d56-8eba-ab12747bee04"))   # a virtual user

In [None]:
"""
    Returns a list of users that have access to the topic.
    expected return: { "status": "success", "users": [...]}
"""
print(c.list_topic_users(topic)) 

In [None]:
"""
    Revoke another user's access to the topic, if the topic has been registered by the invoking user.
    expected return (first time): { "status": "success", "message": ...}
    expected return (subsequent): { "status": "no-op", "message": ...}
"""
print(c.revoke_user_access(topic, "e2a8169b-feef-4d56-8eba-ab12747bee04"))  # a virtual user

In [None]:
producer = KafkaProducer()  
print(topic)

try:
    future = producer.send(
        topic, {'message': 'Synchronous message 1 from Diaspora SDK'})
    print(future.get(timeout=10))
    future = producer.send(
        topic, {'message': 'Synchronous message 2 from Diaspora SDK'})
    print(future.get(timeout=10))
except Exception as e:
    print(f"Failed to send message: {e}")

### 2.6 SDK Producing

In [None]:
"""
    Synchronously produce messages to a registered topic.
    expected return: 
        multiple RecordMetadata(...)
"""

producer = KafkaProducer()
future = producer.send(
    topic, {'message': 'Synchronous message 1 from Diaspora SDK'})
print(future.get(timeout=10))
future = producer.send(
    topic, {'message': 'Synchronous message 2 from Diaspora SDK'})
print(future.get(timeout=10))

In [None]:
"""
    Asynchronously produce batched messages to a registered topic.
    See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
    for more producer settings and usage.
    expected return: None
"""
producer = KafkaProducer()
producer.send(topic, {'message': 'Asynchronous message 3 from Diaspora SDK'})
producer.send(topic, {'message': 'Asynchronous message 4 from Diaspora SDK'})
producer.flush()

### 2.7 SDK Consuming

In [None]:
"""
    Consume produced messages from the beginning of the topic.
    The consumer exits in three seconds.
    If the topic has more than one partition, messages may arrive out of order.
    See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html 
    for more consumer settings and usage.
    expected return:
        multiple {'message': ...}
"""
consumer = KafkaConsumer(topic, auto_offset_reset='earliest')
start_time = time.time()
try:
    while True:
        messages = consumer.poll(timeout_ms=100)
        for tp, msgs in messages.items():
            for message in msgs:
                print(json.loads(message.value.decode("utf-8")))

        if time.time() - start_time > 3:
            # print("3 seconds have passed. Exiting...")
            break
finally:
    consumer.close()
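The time-bounded polling loop above can be factored into a reusable helper. The sketch below is one possible shape, not the SDK's own method: `drain` and `FakeConsumer` are hypothetical names, and `FakeConsumer` only imitates the `poll(timeout_ms=...)` call used above so the example runs without a cluster. It uses `time.monotonic()` for the deadline so that wall-clock adjustments do not affect the loop.

```python
import time


def drain(poll, duration_s=3.0, timeout_ms=100):
    """Call poll(timeout_ms=...) until duration_s elapses and return all records."""
    deadline = time.monotonic() + duration_s
    collected = []
    while time.monotonic() < deadline:
        # poll returns a mapping of partition -> list of records, as in the cell above.
        for _partition, records in poll(timeout_ms=timeout_ms).items():
            collected.extend(records)
    return collected


class FakeConsumer:
    """Hypothetical stand-in that hands out prepared batches, then empty results."""

    def __init__(self, batches):
        self._batches = list(batches)

    def poll(self, timeout_ms=100):
        time.sleep(timeout_ms / 1000)
        return self._batches.pop(0) if self._batches else {}


fake = FakeConsumer([{"partition-0": ["a", "b"]}, {"partition-1": ["c"]}])
print(drain(fake.poll, duration_s=0.3, timeout_ms=10))
```

With the real consumer, the same helper could be called as `drain(consumer.poll)` inside the `try`/`finally` block that closes the consumer.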

### 2.8 Unregister topic, list all topics

In [None]:
"""
    Unregister a topic (i.e., remove user access); existing events in the topic are unaffected.
    expected return (first time): { "status": "success", "message": ...}
    expected return (subsequent): { "status": "no-op", "message": ...}
"""
print(c.unregister_topic(topic))

In [None]:
"""
    List all topics that the user has access to.
    expected return: {"status": "success", "topics": [...]}
"""
print(c.list_topics())

## 3 Demonstrate trigger management APIs

### 3.0 Create a deployment package

In [None]:
trigger_package = f"{os.getcwd()}/my_deployment_package"  # an absolute path is required here
trigger_file = "lambda_function.py"
trigger_name_in_def = "lambda_handler"
# Create the package directory; do nothing if it already exists.
os.makedirs(trigger_package, exist_ok=True)

### 3.1 Save code to `trigger_package/trigger_file`

In [None]:
trigger_code = f"""import base64

def {trigger_name_in_def}(event, context):
    try:
        print('EVENT:')
        print(event)

        for partition, records in event['records'].items():
            for record in records:
                print("topic:", record['topic'],
                      "partition:", record['partition'],
                      "offset:", record['offset'],
                      "key:", record.get("key", "NOT-SET"),
                      "value:", base64.b64decode(record['value']))
    except Exception as e:
        print("ERROR:", e)
"""

with open(os.path.join(trigger_package, trigger_file), "w") as f:
    f.write(trigger_code)
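The handler above expects `event['records']` to map a partition key to a list of records whose `value` is base64-encoded. The sketch below exercises that shape locally, before deployment. The helper `decode_records` and the sample event are hypothetical; the sample contains only the fields the handler reads, and it assumes record values are JSON, as produced elsewhere in this notebook.

```python
import base64
import json


def decode_records(event):
    """Return the JSON-decoded values of all records in a trigger event."""
    decoded = []
    for _partition, records in event["records"].items():
        for record in records:
            # Values arrive base64-encoded; this assumes the payload is JSON.
            decoded.append(json.loads(base64.b64decode(record["value"])))
    return decoded


# Hypothetical event containing only the fields the handler above reads.
payload = json.dumps({"message": "hello"}).encode("utf-8")
sample_event = {
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 0,
                "value": base64.b64encode(payload).decode("utf-8"),
            }
        ]
    }
}
print(decode_records(sample_event))  # [{'message': 'hello'}]
```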

### 3.2 Zip the code at `trigger_file`

In [None]:
def get_zipped_code(lambda_function_package):
    print(f"Zipping {lambda_function_package}")
    os.system(f"cd {lambda_function_package} && zip -r {lambda_function_package}.zip .")
    with open(f"{lambda_function_package}.zip", "rb") as f:
        return base64.b64encode(f.read()).decode('utf-8')
zipped_code = get_zipped_code(trigger_package)
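The cell above shells out to the `zip` command, which may not be installed everywhere. As an alternative sketch, the standard-library `zipfile` module can build the archive in memory and base64-encode it. The helper name `zip_source_to_base64` is hypothetical, and it packages a single source string rather than a whole directory.

```python
import base64
import io
import zipfile


def zip_source_to_base64(filename, source):
    """Zip one source file in memory and return the archive as base64 text."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(filename, source)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


encoded = zip_source_to_base64(
    "lambda_function.py",
    "def lambda_handler(event, context):\n    return None\n",
)

# Round-trip check: decode the text and list the archive contents.
with zipfile.ZipFile(io.BytesIO(base64.b64decode(encoded))) as archive:
    print(archive.namelist())  # ['lambda_function.py']
```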

### 3.3 Trigger info

Note: one topic can be associated with multiple triggers.

`topic_name`: the topic to consume from

`trigger_name`: along with `topic_name`, used to identify and delete the trigger

`trigger_runtime`: a function runtime such as `python3.11` or `python3.12`

`trigger_handler`: `<Python file name without extension>.<function name>`, here `lambda_function.lambda_handler`

`zipped_code`: the zipped deployment package, base64-encoded


In [None]:

topic_name = "topic" + c.subject_openid[-12:]
trigger_name = f"lambda{random.randint(100, 999)}"
trigger_runtime = "python3.11"
trigger_handler = f"{trigger_file.split('.')[0]}.{trigger_name_in_def}"
print(c.register_topic(topic_name))
print()
print("topic name:\t\t", topic_name)
print("trigger name:\t\t", trigger_name)
print("trigger runtime:\t", trigger_runtime)
print("trigger handler:\t", trigger_handler)
print("zipped trigger code:\t", zipped_code)
print("length of the code:\t", len(zipped_code))

### 3.4 Trigger creation call

Note: the call blocks for a few seconds while waiting for the creation result or an error message.

Default values are listed in the tables below. Note that if the runtime is `python3.11` or `python3.12`, a layer with the Globus SDK and Diaspora SDK will be attached.

[Trigger parameter syntax (`Code`, etc.)](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html)

| Trigger Parameter | Default Value                      |
|--------------------|------------------------------------|
| Runtime            | python3.11                         |
| Handler            | lambda_function.lambda_handler     |
| Code               | {}                                 |
| Timeout            | 30                                 |
| MemorySize         | 128                                |
| Environment        | {}                                 |
| EphemeralStorage   | {'Size': 512}                      |
| Layers             | []                                 |



[Invocation parameter syntax (`FilterCriteria`, etc.)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax)

| Invocation Parameter              | Default Value |
|--------------------------------|---------------|
| Enabled                        | True          |
| BatchSize                      | 1             |
| FilterCriteria                 | {}            |
| MaximumBatchingWindowInSeconds | 500ms         |
| StartingPosition               | LATEST        |


In [None]:
"""
    Create a new trigger that responds to events in a registered topic.
    Note: the creation call takes around 10 seconds to return.
    Note: for Python 3.12 runtime, use 
        arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer312:1
        to enable the globus SDK in the trigger.
    expected return (first time): {"status": "success", "message": "Trigger creation started."}
    expected return (subsequent): {"status": "error", "message": ...}
"""

trigger_configs = {
    "Runtime": trigger_runtime,
    "Handler": trigger_handler,
    "Code": {'ZipFile': zipped_code},
    "Timeout": 3,
    "MemorySize": 128,
    "Environment": {},
    "EphemeralStorage": {'Size': 512},
    "Layers": ["arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1"]
}
invoke_configs = {
    "Enabled": True,
    "BatchSize": 1,
    "StartingPosition": "LATEST"
}
print(c.create_trigger(topic_name, trigger_name, trigger_configs, invoke_configs))

### 3.5 List created triggers


In [None]:
"""
    List all triggers created by the user.
    Note: the print function below highlights the trigger name, 
        handler name, uuid, and topic it taps on.
    expected return:
        trigger name: ... trigger handler name: ...  trigger uuid: ... trigger topic: ...
"""

for function in c.list_triggers()['triggers']:
    print("trigger name:", function['function_name'], "\n",
          "trigger handler name:", function['function_detail']['Configuration']['Handler'], "\n",
          "trigger uuid:", function['triggers'][0]['UUID'], "\n",
          "trigger topic:", function['triggers'][0]['Topics'][0], "\n",)

In [None]:
"""
    List all triggers created by the user.
    Note: the print function below highlights the trigger most recently created
    expected return: {'function_name': ..., 'function_detail': ..., 'triggers': [...]}
"""

for function in c.list_triggers()['triggers']:
    if function['function_name'] == trigger_name:
        pprint(function, sort_dicts=False)
        trigger_uuid = function['triggers'][0]['UUID']

### 3.6 Update trigger configurations

In [None]:
"""
    Update the invocation configuration -- add event filters
    Note: see the table above for other tunable configurations.
    expected return: {"status": "success", "before": {}, "after": {}}
"""


pattern1 = {"value": {"event_type": ["created"]}}
config1 = {
    "Enabled": True,
    "BatchSize": 123,
    "FilterCriteria": {"Filters": [{'Pattern': json.dumps(pattern1)}]},
    "MaximumBatchingWindowInSeconds": 42
}
print(c.update_trigger(trigger_uuid, config1))
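In the cell above, each entry of `FilterCriteria["Filters"]` carries its pattern as a JSON string, not as a nested dictionary. A small sketch that builds this structure from one or more pattern dictionaries follows; the helper name `make_filter_criteria` is hypothetical.

```python
import json


def make_filter_criteria(*patterns):
    """Wrap pattern dictionaries in the FilterCriteria structure used above."""
    # Each pattern must be serialized to a JSON string.
    return {"Filters": [{"Pattern": json.dumps(pattern)} for pattern in patterns]}


criteria = make_filter_criteria({"value": {"event_type": ["created"]}})
print(criteria)
```

The result could then be passed as the `"FilterCriteria"` entry of the invocation configuration.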

In [None]:
"""
    Update the invocation configuration -- remove event filters and change the batch size back.
    Note: see the table above for other tunable configurations.
    expected return: {"status": "success", "before": {}, "after": {}}
"""
config2 = {
    "BatchSize": 1,
    "FilterCriteria": {},
    "MaximumBatchingWindowInSeconds": 1
}
print(c.update_trigger(trigger_uuid, config2))

### 3.7 Produce events to invoke the trigger and verify invocations by inspecting the latest log stream

In [None]:
"""
    Synchronously produce messages to a registered topic to invoke triggers
    expected return: 
        multiple RecordMetadata(...)
"""

producer = KafkaProducer()
future = producer.send(
    topic_name, {'message': 'Synchronous message 3 from Diaspora SDK'})
print(future.get(timeout=10))
future = producer.send(
    topic_name, {'message': 'Synchronous message 4 from Diaspora SDK'})
print(future.get(timeout=10))

In [None]:
"""
    Get the list of log streams belonging to the trigger.
    Note: recent_log_stream_name may not contain logs of all trigger invocations,
      as some logs may exist in other streams.
    expected return:  {"status": "success", "streams": [...]}
"""

streams_response = c.list_log_streams(trigger_name)
print(streams_response)
recent_log_stream_name = streams_response['streams'][0]['logStreamName']
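The cell above takes the first entry of `streams` as the most recent one. If the ordering is not guaranteed, the newest stream can be selected explicitly. The sketch below assumes each entry carries CloudWatch-style `lastEventTimestamp` or `creationTime` fields; whether the web service returns them is an assumption, and the helper name `latest_stream_name` and the sample data are hypothetical.

```python
def latest_stream_name(streams):
    """Return the name of the stream with the newest timestamp, or None if empty."""
    if not streams:
        return None
    newest = max(
        streams,
        # Assumed fields: prefer lastEventTimestamp, then creationTime, else 0.
        key=lambda s: s.get("lastEventTimestamp", s.get("creationTime", 0)),
    )
    return newest["logStreamName"]


# Stand-in entries shaped like the assumed response.
sample_streams = [
    {"logStreamName": "stream-a", "lastEventTimestamp": 1000},
    {"logStreamName": "stream-b", "lastEventTimestamp": 5000},
    {"logStreamName": "stream-c", "creationTime": 2000},
]
print(latest_stream_name(sample_streams))  # stream-b
```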

In [None]:
"""
    Retrieve the events in a particular log stream.
    Note: this log stream may not contain logs of all trigger invocations,
      as some logs may exist in other streams.
    expected return:  {"status": "success", "events": [...]}
"""
print(c.get_log_events(trigger_name, recent_log_stream_name))

### 3.8 Trigger deletion call

In [None]:
"""
    Delete trigger by (topic_name, trigger_name)
    expected return: {"status": "success", "message": ...}
"""
print(c.delete_trigger(topic_name, trigger_name))

In [None]:
"""
    List all triggers created by the user.
    expected return (if all triggers are deleted): None 
    expected return (otherwise): {'function_name': ..., 'function_detail': ..., 'triggers': [...]}
"""
for function in c.list_triggers()['triggers']:
    print("trigger name:", function['function_name'], "\n",
          "trigger handler name:", function['function_detail']['Configuration']['Handler'], "\n",
          "trigger uuid:", function['triggers'][0]['UUID'], "\n",
          "trigger topic:", function['triggers'][0]['Topics'][0], "\n",)

### 3.9 Unregister topic

In [None]:
"""
    Unregister a topic (i.e., remove user access); existing events in the topic are unaffected.
    expected return (first time): { "status": "success", "message": ...}
    expected return (subsequent): { "status": "no-op", "message": ...}
"""
print(c.unregister_topic(topic_name))