### Diaspora AP Demo
Adapted from https://github.com/globus/globus-jupyter-notebooks/blob/master/Automation_Using_Globus_Flows.ipynb

In [3]:
"""Use Diaspora AP in Globus Flows."""

from __future__ import annotations

import json
import time
import uuid

import globus_sdk
import globus_sdk.scopes
from diaspora_event_sdk import Client as GlobusClient

# ID of this tutorial notebook as registered with Globus Auth
CLIENT_ID = 'f794186b-f330-4595-b6c6-9c9d3e903e47'

In [4]:
# Do a native app authentication flow to get tokens that allow us
# to interact with the Globus Flows service

scopes = [
    'openid',
    'profile',
    'email',
    globus_sdk.FlowsClient.scopes.manage_flows,
    globus_sdk.FlowsClient.scopes.run_manage,
]
native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
native_auth_client.oauth2_start_flow(requested_scopes=scopes)
print(f'Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}')

auth_code = input('Authorization Code: ')
response = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)

tokens = response.by_resource_server
print(json.dumps(tokens, indent=2))

flows_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens['flows.globus.org']['access_token'],
)
flows_client = globus_sdk.FlowsClient(authorizer=flows_authorizer)

Login Here:

https://auth.globus.org/v2/oauth2/authorize?client_id=f794186b-f330-4595-b6c6-9c9d3e903e47&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=openid+profile+email+https%3A%2F%2Fauth.globus.org%2Fscopes%2Feec9b274-0c81-4334-bdc2-54e90e689b9a%2Fmanage_flows+https%3A%2F%2Fauth.globus.org%2Fscopes%2Feec9b274-0c81-4334-bdc2-54e90e689b9a%2Frun_manage&state=_default&response_type=code&code_challenge=39kTLNfYi7jeB7t90-Wnyot8djEoZB1MKbdplOVZ94c&code_challenge_method=S256&access_type=online
{
  "auth.globus.org": {
    "scope": "openid profile email",
    "access_token": "Ag70Mb8Gnrpe8k4djMKvlbNb42d9VpaXM0gW5YVyparqDG6WbEiWCvPGdez88xm7OGxvP7jgGvwaQnfnbyY11i6MQmvSGE1nVSmnMp7",
    "refresh_token": null,
    "token_type": "Bearer",
    "expires_at_seconds": 1719592569,
    "resource_server": "auth.globus.org"
  },
  "flows.globus.org": {
    "scope": "https://auth.globus.org/scopes/eec9b274-0c81-4334-bdc2-54e90e689b9a/manage_flows https://auth.globus.org/scopes/ee

In [5]:
# Create an Auth client so we can look up identities
auth_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens['auth.globus.org']['access_token'],
)
ac = globus_sdk.AuthClient(authorizer=auth_authorizer)

# Get the user's primary identity
primary_identity = ac.oauth2_userinfo()
identity_id = primary_identity['sub']

print(f"Username: {primary_identity['preferred_username']}")
print(f'ID: {identity_id}')

Username: ravescovi@globusid.org
ID: 95278182-10a1-11e6-9c7e-7b385f033313


### Select a Topic

In [30]:
c = GlobusClient()
print(c)
print("User's OpenID:", c.subject_openid)
topic = 'topic' + c.subject_openid[-12:]
print(c.register_topic(topic))
print(c.list_topics())
print('Topic to produce/consume:', topic)

<diaspora_event_sdk.sdk.client.Client object at 0x7f78c2e30ee0>
User's OpenID: 95278182-10a1-11e6-9c7e-7b385f033313
{
  "status": "no-op",
  "message": "Principal 95278182-10a1-11e6-9c7e-7b385f033313 already has access."
}
{
  "status": "success",
  "topics": [
    "topic7b385f033313",
    "wei_diaspora"
  ]
}
Topic to produce/consume: topic7b385f033313


### 2.1.1 Produce Messages to AP without msg keys

In [7]:
action_url = 'https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/'

flow_definition1 = {
    'Comment': 'Publish messages to Diaspora Event Fabric',
    'StartAt': 'PublishMessages',
    'States': {
        'PublishMessages': {
            'Comment': 'Send messages to a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'msgs.$': '$.input.msgs',
            },
            'ResultPath': '$.PublishMessages',
            'End': True,
        },
    },
}

In [8]:
flow_title = f'Diapora-AP-Flow-{str(uuid.uuid4())[:4]}'
flow = flows_client.create_flow(
    title=flow_title,
    definition=flow_definition1,
    # definition=flow_definition2,
    input_schema={},
)
flow_id = flow['id']
flow_scope = globus_sdk.SpecificFlowClient(flow_id).scopes.make_mutable('user')
print(f"Successfully created flow: '{flow_title} (ID: {flow_id})")
print(f'View the flow in the Web App: https://app.globus.org/flows/{flow_id}')

Successfully created flow: 'Diapora-AP-Flow-2954 (ID: 8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0)
View the flow in the Web App: https://app.globus.org/flows/8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0


In [9]:
if flow_id not in tokens:
    # Do a native app authentication flow and get tokens that
    # include the newly deployed flow scope
    native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
    native_auth_client.oauth2_start_flow(requested_scopes=flow_scope)
    print(f'Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}')

    # Authenticate and come back with your authorization code;
    # paste it into the prompt below.
    auth_code = input('Authorization Code: ')
    token_response = native_auth_client.oauth2_exchange_code_for_tokens(
        auth_code,
    )

    # Save the new token in a place where the flows client can retrieve it.
    tokens[flow_id] = token_response.by_resource_server[flow_id]

    # These are the saved scopes for the flow
    print(json.dumps(tokens, indent=2))

Login Here:

https://auth.globus.org/v2/oauth2/authorize?client_id=f794186b-f330-4595-b6c6-9c9d3e903e47&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2F8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0%2Fflow_8886c9d2_7746_4bb8_9ad9_58dcc6bad4b0_user&state=_default&response_type=code&code_challenge=bR5EVZI4_XsHbMdw9GPkzpVEkjT4OLqaviW_my3P1Co&code_challenge_method=S256&access_type=online
{
  "auth.globus.org": {
    "scope": "openid profile email",
    "access_token": "Ag70Mb8Gnrpe8k4djMKvlbNb42d9VpaXM0gW5YVyparqDG6WbEiWCvPGdez88xm7OGxvP7jgGvwaQnfnbyY11i6MQmvSGE1nVSmnMp7",
    "refresh_token": null,
    "token_type": "Bearer",
    "expires_at_seconds": 1719592569,
    "resource_server": "auth.globus.org"
  },
  "flows.globus.org": {
    "scope": "https://auth.globus.org/scopes/eec9b274-0c81-4334-bdc2-54e90e689b9a/manage_flows https://auth.globus.org/scopes/eec9b274-0c81-4334-bdc2-54e90e689b9a/run_manage",
    "access_token": "AgyWkOmyV

In [10]:
flow_input1 = {
    'input': {
        'action': 'produce',
        'topic': topic,
        'msgs': [
            {'content': 'hello world1'},
            {'content': 'hello world2'},
            {'content': 'hello world3'},
        ],
    },
}

In [11]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input1,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/0f09301f-abd5-4be8-bc83-d1db906a7f63
Flow run started with ID: 0f09301f-abd5-4be8-bc83-d1db906a7f63 - Status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "0f09301f-abd5-4be8-bc83-d1db906a7f63",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:37:36.461617+00:00",
  "start_time": "2024-06-26T16:38:00.659420+00:00",
  "completion_time": "2024-06-26T16:38:04.038000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "msgs": [
          {
            "content": "hello world1"
          },
          {
            "content": "hello world2"
          },
          {
            "content": "hello world3"
          }
        ],
        "topic": "topic7b385f03

### 2.2.1 Produce Messages to AP with a Single Key

In [12]:
flow_definition2 = {
    'Comment': 'Publish messages to Diaspora Event Fabric',
    'StartAt': 'PublishMessages',
    'States': {
        'PublishMessages': {
            'Comment': 'Send messages to a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'msgs.$': '$.input.msgs',
                'keys.$': '$.input.keys',
            },
            'ResultPath': '$.PublishMessages',
            'End': True,
        },
    },
}

In [13]:
flows_client.update_flow(flow_id, definition=flow_definition2, input_schema={})

GlobusHTTPResponse({"id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "definition": {"Comment": "Publish messages to Diaspora Event Fabric", "StartAt": "PublishMessages", "States": {"PublishMessages": {"Comment": "Send messages to a specified topic in Diaspora", "Type": "Action", "ActionUrl": "https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/", "Parameters": {"action.$": "$.input.action", "topic.$": "$.input.topic", "msgs.$": "$.input.msgs", "keys.$": "$.input.keys"}, "ResultPath": "$.PublishMessages", "End": true}}}, "input_schema": {}, "globus_auth_scope": "https://auth.globus.org/scopes/8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0/flow_8886c9d2_7746_4bb8_9ad9_58dcc6bad4b0_user", "synchronous": false, "log_supported": true, "types": ["Action"], "api_version": "1.0", "title": "Diapora-AP-Flow-2954", "subtitle": "", "description": "", "keywords": [], "principal_urn": "urn:globus:auth:identity:8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "globus_auth_username": "8886c9d2-77

In [14]:
flow_input2a = {
    'input': {
        'action': 'produce',
        'topic': topic,
        'msgs': [
            {'content': 'hello world1'},
            {'content': 'hello world2'},
            {'content': 'hello world3'},
        ],
        'keys': 'my-key-123',
    },
}

In [15]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input2a,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/ad07ba50-c1d9-458e-829c-13988bd66aaa
Flow run started with ID: ad07ba50-c1d9-458e-829c-13988bd66aaa - Status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "ad07ba50-c1d9-458e-829c-13988bd66aaa",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:38:24.334693+00:00",
  "start_time": "2024-06-26T16:38:25.910978+00:00",
  "completion_time": "2024-06-26T16:38:29.349000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "keys": "my-key-123",
        "msgs": [
          {
            "content": "hello world1"
          },
          {
            "content": "hello world2"
          },
          {
            "content": "hello world3"
          }
        ],
 

### 2.3.1 Produce Messages to AP with a List of Keys

In [16]:
flow_input2b = {
    'input': {
        'action': 'produce',
        'topic': topic,
        'msgs': [
            {'content': 'hello world1'},
            {'content': 'hello world2'},
            {'content': 'hello world3'},
        ],
        'keys': [
            'my-key-123',
            'my-key-456',
            'my-key-789',
        ],
    },
}

In [17]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input2b,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/bc580077-91d7-42e6-9a00-6b3c942974b6
Flow run started with ID: bc580077-91d7-42e6-9a00-6b3c942974b6 - Status: ACTIVE
Run status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "bc580077-91d7-42e6-9a00-6b3c942974b6",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:38:24.334693+00:00",
  "start_time": "2024-06-26T16:38:31.728486+00:00",
  "completion_time": "2024-06-26T16:38:37.326000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "keys": [
          "my-key-123",
          "my-key-456",
          "my-key-789"
        ],
        "msgs": [
          {
            "content": "hello world1"
          },
          {
            "content": "hello world2

### 3.1.1: Consume All Messages since the Earliest Available Message

In [18]:
flow_definition3 = {
    'Comment': 'Consume messages to Diaspora Event Fabric',
    'StartAt': 'ConsumeMessages',
    'States': {
        'ConsumeMessages': {
            'Comment': 'Receive messages from a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
            },
            'ResultPath': '$.ConsumeMessages',
            'End': True,
        },
    },
}

flows_client.update_flow(flow_id, definition=flow_definition3, input_schema={})

GlobusHTTPResponse({"id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "definition": {"Comment": "Consume messages to Diaspora Event Fabric", "StartAt": "ConsumeMessages", "States": {"ConsumeMessages": {"Comment": "Receive messages from a specified topic in Diaspora", "Type": "Action", "ActionUrl": "https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/", "Parameters": {"action.$": "$.input.action", "topic.$": "$.input.topic"}, "ResultPath": "$.ConsumeMessages", "End": true}}}, "input_schema": {}, "globus_auth_scope": "https://auth.globus.org/scopes/8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0/flow_8886c9d2_7746_4bb8_9ad9_58dcc6bad4b0_user", "synchronous": false, "log_supported": true, "types": ["Action"], "api_version": "1.0", "title": "Diapora-AP-Flow-2954", "subtitle": "", "description": "", "keywords": [], "principal_urn": "urn:globus:auth:identity:8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "globus_auth_username": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0@clients.auth.globus.o

In [19]:
flow_input3a = {
    'input': {'action': 'consume', 'topic': topic},
}

In [20]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input3a,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/9af9adb3-f9f1-4c5b-b2b2-c92034a62728
Flow run started with ID: 9af9adb3-f9f1-4c5b-b2b2-c92034a62728 - Status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "9af9adb3-f9f1-4c5b-b2b2-c92034a62728",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:38:42.716116+00:00",
  "start_time": "2024-06-26T16:38:43.762699+00:00",
  "completion_time": "2024-06-26T16:38:47.813000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "topic": "topic7b385f033313",
        "action": "consume"
      },
      "ConsumeMessages": {
        "label": null,
        "status": "SUCCEEDED",
        "details": {
          "topic7b385f033313-0": [
            {
              "key": n

### 3.1.2: Consume Messages since a Timestamp

In [21]:
flow_definition4 = {
    'Comment': 'Consume messages to Diaspora Event Fabric',
    'StartAt': 'ConsumeMessages',
    'States': {
        'ConsumeMessages': {
            'Comment': 'Receive messages from a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'ts.$': '$.input.ts',
            },
            'ResultPath': '$.ConsumeMessages',
            'End': True,
        },
    },
}

flows_client.update_flow(flow_id, definition=flow_definition4, input_schema={})

GlobusHTTPResponse({"id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "definition": {"Comment": "Consume messages to Diaspora Event Fabric", "StartAt": "ConsumeMessages", "States": {"ConsumeMessages": {"Comment": "Receive messages from a specified topic in Diaspora", "Type": "Action", "ActionUrl": "https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/", "Parameters": {"action.$": "$.input.action", "topic.$": "$.input.topic", "ts.$": "$.input.ts"}, "ResultPath": "$.ConsumeMessages", "End": true}}}, "input_schema": {}, "globus_auth_scope": "https://auth.globus.org/scopes/8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0/flow_8886c9d2_7746_4bb8_9ad9_58dcc6bad4b0_user", "synchronous": false, "log_supported": true, "types": ["Action"], "api_version": "1.0", "title": "Diapora-AP-Flow-2954", "subtitle": "", "description": "", "keywords": [], "principal_urn": "urn:globus:auth:identity:8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "globus_auth_username": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0

In [22]:
flow_input4a = {
    'input': {'action': 'consume', 'topic': topic, 'ts': 1715930522000},
}

In [23]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input4a,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/87ec99e1-9a2c-439b-a2fc-29539992b3e2
Flow run started with ID: 87ec99e1-9a2c-439b-a2fc-29539992b3e2 - Status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "87ec99e1-9a2c-439b-a2fc-29539992b3e2",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:38:49.273998+00:00",
  "start_time": "2024-06-26T16:38:50.611322+00:00",
  "completion_time": "2024-06-26T16:38:55.273000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "ts": 1715930522000,
        "topic": "topic7b385f033313",
        "action": "consume"
      },
      "ConsumeMessages": {
        "label": null,
        "status": "SUCCEEDED",
        "details": {
          "topic7b385f033313-0": [
       

### 3.2.1: Consume Messages with Group ID

In [24]:
flow_definition5 = {
    'Comment': 'Consume messages to Diaspora Event Fabric',
    'StartAt': 'ConsumeMessages',
    'States': {
        'ConsumeMessages': {
            'Comment': 'Receive messages from a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'group_id.$': '$.input.group_id',
            },
            'ResultPath': '$.ConsumeMessages',
            'End': True,
        },
    },
}

flows_client.update_flow(flow_id, definition=flow_definition5, input_schema={})

GlobusHTTPResponse({"id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "definition": {"Comment": "Consume messages to Diaspora Event Fabric", "StartAt": "ConsumeMessages", "States": {"ConsumeMessages": {"Comment": "Receive messages from a specified topic in Diaspora", "Type": "Action", "ActionUrl": "https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/", "Parameters": {"action.$": "$.input.action", "topic.$": "$.input.topic", "group_id.$": "$.input.group_id"}, "ResultPath": "$.ConsumeMessages", "End": true}}}, "input_schema": {}, "globus_auth_scope": "https://auth.globus.org/scopes/8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0/flow_8886c9d2_7746_4bb8_9ad9_58dcc6bad4b0_user", "synchronous": false, "log_supported": true, "types": ["Action"], "api_version": "1.0", "title": "Diapora-AP-Flow-2954", "subtitle": "", "description": "", "keywords": [], "principal_urn": "urn:globus:auth:identity:8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "globus_auth_username": "8886c9d2-7746-4bb8-9ad9-

In [25]:
flow_input5a = {
    'input': {
        'action': 'consume',
        'topic': topic,
        'group_id': 'my-group-1234',
    },
}
print(flow_input5a)

{'input': {'action': 'consume', 'topic': 'topic7b385f033313', 'group_id': 'my-group-1234'}}


In [26]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input5a,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/4143484c-bb82-4c88-a0a4-f95dbcfe1751
Flow run started with ID: 4143484c-bb82-4c88-a0a4-f95dbcfe1751 - Status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "4143484c-bb82-4c88-a0a4-f95dbcfe1751",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:38:56.317002+00:00",
  "start_time": "2024-06-26T16:38:57.355546+00:00",
  "completion_time": "2024-06-26T16:39:08.592000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "topic": "topic7b385f033313",
        "action": "consume",
        "group_id": "my-group-1234"
      },
      "ConsumeMessages": {
        "label": null,
        "status": "SUCCEEDED",
        "details"

### 3.3.1 Consume Messages with Filters (a msg is returned if filter1 OR filter2 is met; within a filter, all conditions must be met)

Can use with `ts` or `group_id` or both. More examples see the other notebook.

In [27]:
flow_definition6 = {
    'Comment': 'Consume messages to Diaspora Event Fabric',
    'StartAt': 'ConsumeMessages',
    'States': {
        'ConsumeMessages': {
            'Comment': 'Receive messages from a specified topic in Diaspora',
            'Type': 'Action',
            'ActionUrl': action_url,
            'Parameters': {
                'action.$': '$.input.action',
                'topic.$': '$.input.topic',
                'filters.$': '$.input.filters',
            },
            'ResultPath': '$.ConsumeMessages',
            'End': True,
        },
    },
}

flows_client.update_flow(flow_id, definition=flow_definition6, input_schema={})

GlobusHTTPResponse({"id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "definition": {"Comment": "Consume messages to Diaspora Event Fabric", "StartAt": "ConsumeMessages", "States": {"ConsumeMessages": {"Comment": "Receive messages from a specified topic in Diaspora", "Type": "Action", "ActionUrl": "https://diaspora-action-provider.ml22sevubfnks.us-east-1.cs.amazonlightsail.com/", "Parameters": {"action.$": "$.input.action", "topic.$": "$.input.topic", "filters.$": "$.input.filters"}, "ResultPath": "$.ConsumeMessages", "End": true}}}, "input_schema": {}, "globus_auth_scope": "https://auth.globus.org/scopes/8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0/flow_8886c9d2_7746_4bb8_9ad9_58dcc6bad4b0_user", "synchronous": false, "log_supported": true, "types": ["Action"], "api_version": "1.0", "title": "Diapora-AP-Flow-2954", "subtitle": "", "description": "", "keywords": [], "principal_urn": "urn:globus:auth:identity:8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0", "globus_auth_username": "8886c9d2-7746-4bb8-9ad9-58

In [28]:
flow_input6a = {
    'input': {
        'action': 'consume',
        'topic': topic,
        'filters': [
            {'Pattern': {'value': {'content': [{'prefix': 'hello world1'}]}}},
            {
                'Pattern': {
                    'value': {
                        'content': [
                            {'prefix': 'hello', 'suffix': 'world2'},
                        ],
                    },
                },
            },
        ],
    },
}
print(flow_input6a)

{'input': {'action': 'consume', 'topic': 'topic7b385f033313', 'filters': [{'Pattern': {'value': {'content': [{'prefix': 'hello world1'}]}}}, {'Pattern': {'value': {'content': [{'prefix': 'hello', 'suffix': 'world2'}]}}}]}}


In [29]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
print(tokens[flow_id]['access_token'])
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Diaspora AP Flow by {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
    body=flow_input6a,
    label=run_label,
    tags=['tutorial', 'diaspora'],
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print('This flow can be monitored in the Web App:')
print(f'https://app.globus.org/runs/{run_id}')
print(f'Flow run started with ID: {run_id} - Status: {run_status}')

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

# Run completed
print(json.dumps(run.data, indent=2))

Ag8z1xvrK8JoBWrbEqKdyzOXQxmEgxO9XkEp33n5b3oDobvQdzf8C4y5XmxNMoEp3MJVrndGMjOYz8uyEQzNlIVp5Xd
This flow can be monitored in the Web App:
https://app.globus.org/runs/0791b989-4861-4462-985e-c19537c51086
Flow run started with ID: 0791b989-4861-4462-985e-c19537c51086 - Status: ACTIVE
Run status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "0791b989-4861-4462-985e-c19537c51086",
  "flow_id": "8886c9d2-7746-4bb8-9ad9-58dcc6bad4b0",
  "flow_title": "Diapora-AP-Flow-2954",
  "flow_last_updated": "2024-06-26T16:39:13.282655+00:00",
  "start_time": "2024-06-26T16:39:14.272138+00:00",
  "completion_time": "2024-06-26T16:39:18.889000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "topic": "topic7b385f033313",
        "action": "consume",
        "filters": [
          {
            "Pattern": {
              "value": {
                "content": [
                  {
                    "prefix": 