# Streams - Consuming data via REST API

Where environment restrictions or IT policies rule out the default streaming protocols, this notebook shows an alternative way to consume content from Streams: pulling messages from Google Cloud Pub/Sub subscriptions through a conventional REST API.

The notebook covers obtaining the streaming credentials, exchanging them for a bearer token, and then pulling, processing and acknowledging messages.

In this notebook...
* [Dependencies and Initialisation](#dependencies-and-initialisation)
* [Authentication](#authentication)
* [Pulling and Processing Messages](#pulling-and-processing-messages)
* [Acknowledging Messages](#acknowledging-messages)
* [Next Steps](#next-steps)

## Dependencies and Initialisation

This notebook expects `FACTIVA_USERKEY` and `FACTIVA_SUBSCRIPTIONID` to be set in the `.env` file, which is read when `load_dotenv()` runs. See the [Configuration notebook](0.2_configuration.ipynb) for more details.

In [1]:
import json
import os
import requests
import datetime
import jwt          # pip install PyJWT
import base64

from dotenv import load_dotenv
load_dotenv()

STREAM_CRED_URL = 'https://api.dowjones.com/sns-accounts/streaming-credentials'

USERKEY = os.environ['FACTIVA_USERKEY']
REQ_DEFAULT_HEADERS = {
    'user-key': USERKEY,
    'content-type': "application/json",
    'cache-control': "no-cache",
    'X-API-VERSION': "3.0"
}
SUBSCRIPTIONID = os.environ['FACTIVA_SUBSCRIPTIONID']
AUTHZ_URL = 'https://oauth2.googleapis.com/token'

## Authentication

Loads the streaming credentials from the API and extracts details needed for Google Cloud Pub/Sub requests.

In [2]:
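resp = requests.get(STREAM_CRED_URL, headers=REQ_DEFAULT_HEADERS)
resp.raise_for_status()  # Fail early on an invalid user key or a service error
# The 'streaming_credentials' attribute is itself a JSON-encoded string
streaming_credentials = json.loads(resp.json()['data']['attributes']['streaming_credentials'])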

private_key_id = streaming_credentials['private_key_id']
private_key = streaming_credentials['private_key']
client_email = streaming_credentials['client_email']
project_id = streaming_credentials['project_id']
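
The credentials arrive as a JSON string nested inside the JSON response, so they are decoded twice. The following is a minimal sketch of that step with a basic sanity check, assuming the response shape used in the cell above; `parse_streaming_credentials`, `REQUIRED_KEYS` and the placeholder values are hypothetical and not part of the notebook.

```python
import json

# Fields the rest of the notebook reads from the credentials (per the cell above)
REQUIRED_KEYS = ('private_key_id', 'private_key', 'client_email', 'project_id')


def parse_streaming_credentials(response_body: dict) -> dict:
    """Decode the nested JSON string and check that the expected fields exist."""
    raw = response_body['data']['attributes']['streaming_credentials']
    credentials = json.loads(raw)
    missing = [key for key in REQUIRED_KEYS if key not in credentials]
    if missing:
        raise KeyError(f"Missing credential fields: {missing}")
    return credentials


# Placeholder response body, for illustration only
sample_body = {
    'data': {'attributes': {'streaming_credentials': json.dumps({
        'private_key_id': 'placeholder-key-id',
        'private_key': 'placeholder-private-key',
        'client_email': 'placeholder@example.com',
        'project_id': 'placeholder-project',
    })}}
}
credentials = parse_streaming_credentials(sample_body)
print(credentials['project_id'])
```

Checking the fields up front turns a later, harder-to-read `KeyError` in the token-building step into an error that names the missing fields.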

Builds a signed JWT from the streaming credentials. This JWT is the authentication token request sent to Google in the next step.

In [3]:
iat_dt = datetime.datetime.now()
iat = int(iat_dt.timestamp())
exp_dt = iat_dt + datetime.timedelta(seconds=3600)
exp = int(exp_dt.timestamp())

payload = {
    'iss': client_email,
    'scope': "https://www.googleapis.com/auth/cloud-platform https://www.googleapis.com/auth/pubsub",
    'aud': "https://oauth2.googleapis.com/token",
    'iat': iat,
    'exp': exp
}

additional_headers = {
    'kid': private_key_id
}

# Sign the claims with the service account private key (RS256)
authn_token = jwt.encode(payload, private_key, headers=additional_headers, algorithm="RS256")
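
The claim set can also be produced by a small helper, which makes the one-hour lifetime explicit and easy to test. This is only a sketch using the standard library: `build_jwt_claims` and the sample email are hypothetical, while the scope and audience values are copied from the cell above.

```python
import time

TOKEN_AUDIENCE = 'https://oauth2.googleapis.com/token'
TOKEN_SCOPE = ('https://www.googleapis.com/auth/cloud-platform '
               'https://www.googleapis.com/auth/pubsub')


def build_jwt_claims(client_email, lifetime_seconds=3600, now=None):
    """Return the claims for the token request; `now` is a Unix timestamp."""
    issued_at = int(time.time() if now is None else now)
    return {
        'iss': client_email,
        'scope': TOKEN_SCOPE,
        'aud': TOKEN_AUDIENCE,
        'iat': issued_at,
        # Google accepts an expiry of at most one hour after 'iat'
        'exp': issued_at + lifetime_seconds,
    }


# Fixed timestamp so the result is reproducible
claims = build_jwt_claims('placeholder@example.com', now=1_700_000_000)
print(claims['exp'] - claims['iat'])
```

The returned dictionary can be passed to `jwt.encode` in place of `payload`.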

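Exchanges the authentication token at the Google OAuth 2.0 token endpoint for a bearer token. The full response is stored in `jwt_token`, and its `access_token` field is used in the Pub/Sub requests that follow.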

In [4]:
authz_payload = {
    'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
    'assertion': authn_token
}

resp = requests.post(AUTHZ_URL, data=authz_payload)
resp.raise_for_status()  # Fail early if the token exchange was rejected
jwt_token = resp.json()
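
The token response includes an `expires_in` field (in seconds) next to `access_token`, so a long-running consumer needs to request a new token before the current one lapses. A minimal sketch of such a check, assuming the time the token was obtained is recorded separately; `token_is_fresh`, `margin_seconds` and the sample response are hypothetical.

```python
import time


def token_is_fresh(token_response, obtained_at, margin_seconds=60, now=None):
    """Return True while the token is still valid, minus a safety margin."""
    now = time.time() if now is None else now
    expires_at = obtained_at + token_response.get('expires_in', 0)
    return now < expires_at - margin_seconds


# Placeholder token response, for illustration only
sample_token = {'access_token': 'placeholder', 'expires_in': 3599, 'token_type': 'Bearer'}
fresh = token_is_fresh(sample_token, obtained_at=1000, now=2000)
stale = token_is_fresh(sample_token, obtained_at=1000, now=4590)
print(fresh, stale)
```

When the check returns `False`, the two authentication cells can be run again to obtain a new token.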

## Pulling and Processing Messages

Constants for the message consumption section.

In [None]:
PUBSUB_HEADERS = { 'Authorization': f"Bearer {jwt_token['access_token']}" }
PULL_BODY = { 'maxMessages': 10 }
PULL_URL = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{SUBSCRIPTIONID}:pull"
ACK_URL = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{SUBSCRIPTIONID}:acknowledge"
INFO_URL = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{SUBSCRIPTIONID}"

Defines a custom message-handling function and a function that acknowledges a list of messages.

`process_message`: Function to process a news message. The structure of the message data depends on the action.
- **ADD**: Reports a new article ID (AN) that must be processed as a new record.
- **REP**: Reports a modified AN. Changes can affect metadata or content (title, snippet or body). If no versions are stored, simply overwrite the existing message, or insert it as a new item if it was not previously received (an upsert operation in some DB engines).
- **DEL**: Reports an AN that needs to be deleted from the database, or soft-deleted by marking it as deleted and filtering it out of end-user queries. If a hard delete is applied, make sure to report the deletion to the [Acknowledge Deletes API](https://developer.dowjones.com/site/docs/factiva_apis/factiva_analytics_apis/factiva_acknowledge_deletes_api/index.gsp).
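
The three actions map onto insert, upsert and delete operations. The sketch below illustrates this with an in-memory dictionary keyed by AN standing in for a database. `apply_action`, `store` and the sample messages are hypothetical, and the action values are assumed to be the lowercase forms of the codes listed above (`add`, `rep`, `del`).

```python
def apply_action(store: dict, message: dict, soft_delete: bool = True) -> None:
    """Apply one news message to `store`, a dict keyed by article ID (AN)."""
    action = message['action'].lower()  # assumed values: 'add', 'rep', 'del'
    an = message['an']
    if action in ('add', 'rep'):
        # Without version history, ADD and REP both behave as an upsert:
        # insert the article, or overwrite it if it was already received.
        store[an] = {**message, 'deleted': False}
    elif action == 'del':
        if soft_delete:
            # Keep the record but flag it so end-user queries can filter it out
            if an in store:
                store[an]['deleted'] = True
        else:
            # Hard delete: the deletion also has to be reported separately
            store.pop(an, None)
    else:
        raise ValueError(f"Unexpected action: {message['action']}")


store = {}
apply_action(store, {'action': 'add', 'an': 'SAMPLE-AN', 'title': 'First version'})
apply_action(store, {'action': 'rep', 'an': 'SAMPLE-AN', 'title': 'Second version'})
apply_action(store, {'action': 'del', 'an': 'SAMPLE-AN'})
print(store['SAMPLE-AN']['title'], store['SAMPLE-AN']['deleted'])
```

A real implementation would replace the dictionary operations with the equivalent statements of the target database.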

In [6]:
def process_message(message):
    # Action values follow the ADD / REP / DEL codes described above
    if message['action'] == 'add':
        # Process as a new item
        print(f"[Action: {message['action'].upper()}] AN: {message['an']}, Title: {message['title']}")
    elif message['action'] == 'rep':
        # Process as an existing item or eventually as a new item
        print(f"[Action: {message['action'].upper()}] AN: {message['an']}, Title: {message['title']}")
    elif message['action'] == 'del':
        # Apply a hard or soft delete
        print(f"[Action: {message['action'].upper()}] AN: {message['an']}")


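def acknowledge_messages(ack_ids: list):
    # Acknowledge a batch of messages so Pub/Sub does not deliver them again
    ack_payload = {
        'ackIds': ack_ids
    }
    resp = requests.post(ACK_URL, headers=PUBSUB_HEADERS, json=ack_payload)
    # A successful acknowledge returns HTTP 200 with an empty JSON object
    if resp.status_code == 200 and resp.json() == {}:
        print("--- ACK Success ---")
    else:
        print(f"--- ACK Failed (HTTP {resp.status_code}) ---")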

Pulls, processes and acknowledges messages in batches until the subscription returns no more messages.

In [11]:
while True:
    pull_response = requests.post(PULL_URL, headers=PUBSUB_HEADERS, json=PULL_BODY)
    # When nothing is returned, the response body has no 'receivedMessages' key
    encoded_messages = pull_response.json().get('receivedMessages', [])
    if not encoded_messages:
        break
    ack_ids = []
    for encoded_message in encoded_messages:
        # The message data is base64-encoded JSON
        encoded_data = encoded_message['message']['data']
        pubsub_message = base64.b64decode(encoded_data)
        pubsub_dict = json.loads(pubsub_message)
        news_message = pubsub_dict['data'][0]['attributes']
        process_message(news_message)
        ack_ids.append(encoded_message['ackId'])
    acknowledge_messages(ack_ids)
print("*** No more messages to process ***")

[Action: ADD] AN: DJDN000020241113ekbd005jj, Title: Press Release: RGC Resources, Inc. Reports 2024 Earnings
--- ACK Success ---
[Action: ADD] AN: DJDN000020241113ekbd005jk, Title: CBOT Dow Jones Index Futures Close - Nov 13
[Action: ADD] AN: DJDN000020241113ekbd005h7, Title: Press Release: Pershing Square Holdings, Ltd. Releases Regular Weekly Net Asset Value and Year-To-Date Return As Of 12 November 2024
[Action: ADD] AN: DJDN000020241113ekbd005ee, Title: Canadian Canola Board Crush Margin Close - Nov 13
[Action: ADD] AN: DJDN000020241113ekbd005jl, Title: Former Pentagon Official on UFOs: 'We Are Not Alone' -- WSJ
[Action: ADD] AN: DJDN000020241113ekbd005ef, Title: *S&PGR Upgrades Pactiv Evergreen To 'BB-'; Outlook Positive
[Action: ADD] AN: DJDN000020241113ekbd005jm, Title: New York City Votes to End One of Renters' Biggest Complaints: the Broker Fee -- Update
[Action: ADD] AN: DJDN000020241113ekbd005eg, Title: Press Release: SBC Medical Group Holdings Inc. Reports Third Quarter 202
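
The decoding step inside the loop can be tried offline. The sketch below builds a sample entry in the shape the loop reads (`ackId`, then `message.data` as base64-encoded JSON whose `data[0].attributes` holds the article) and decodes it again; `decode_received_message` and all sample values are hypothetical.

```python
import base64
import json


def decode_received_message(received_message: dict):
    """Return (ack_id, attributes) for one entry of 'receivedMessages'."""
    raw = base64.b64decode(received_message['message']['data'])
    envelope = json.loads(raw)
    return received_message['ackId'], envelope['data'][0]['attributes']


# Placeholder article wrapped the same way the loop expects it
sample_envelope = {
    'data': [{'attributes': {'action': 'add', 'an': 'SAMPLE-AN', 'title': 'Sample title'}}]
}
sample_received = {
    'ackId': 'sample-ack-id',
    'message': {
        'data': base64.b64encode(json.dumps(sample_envelope).encode('utf-8')).decode('ascii')
    },
}

ack_id, attributes = decode_received_message(sample_received)
print(ack_id, attributes['an'])
```

Separating the decoding from the HTTP calls makes the message handling testable without a live subscription.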

## Next Steps

* Create a [Snapshot Extraction](1.6_snapshot_extraction.ipynb)
* Check out [Account Statistics](1.1_account_statistics.ipynb)