In [1]:
from pyflink.datastream import StreamExecutionEnvironment
import requests
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
import json

In [2]:
from my_local_secrets import *

In [3]:
symbol = "IBM"

## Defining the API query params and function

In [4]:
def fetch_data(api_key, symbol):
    url = f"https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol={symbol}&interval=5min&apikey={api_key}"
    response = requests.get(url)
    data = json.loads(response.text)
    return data

## Preparing the pyFlink environment and setting the source for our collection of data

In [5]:
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

ds = env.from_collection([fetch_data(api_key, symbol)])


## Processing the json object and filter it

In [7]:
import itertools
## Keep only records with volume higher than 100k
def update_tel(data):
    all_values=[]
    for elt in data['Time Series (5min)']:
        all_values.append(list(itertools.chain(*[[elt], list(data['Time Series (5min)'][elt].values())])))
    return [item for item in all_values if int(item[5]) > 100000]
    
ds = ds.map(update_tel)

In [8]:
ds.print()

<pyflink.datastream.data_stream.DataStreamSink at 0x1180742e0>

### Prepare the DB part: create the table to use for data insertion

In [12]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
## Astra Configuration
cloud_config= {
  'secure_connect_bundle': secure_bundle_path
}
auth_provider = PlainTextAuthProvider(client_id, client_secret)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

session.execute(f'USE {db_keyspace}')

<cassandra.cluster.ResultSet at 0x16a1db400>

In [13]:
query = """
CREATE TABLE if not exists market_stock_data (
    date text,
    open float,
    high float,
    low float,
    close float,
    volume float,
    PRIMARY KEY (date)
)
"""
session.execute(query)

<cassandra.cluster.ResultSet at 0x16a0073a0>

## Use the AstraDB RestAPI for data insertion via pyFlink

In [26]:
def send_to_rest_api(data):
    url = f'https://{astra_id}-{astra_region}.apps.astra.datastax.com/api/rest/v2/keyspaces/{db_keyspace}/market_stock_data'
    print(url)
    headers = {'content-type': 'application/json', 'x-cassandra-token': token}
    for row in data: 
        response = requests.post(url, headers=headers, json=json.loads('{"date": "' + str(row[0]) + '", "open": "' + row[1] + '", "high": "' + row[2] + '", "low": "' + row[3] +  '", "close": "' + row[4] + '", "volume": "' + row[5] + '"}'), verify=False)  
        if response.status_code == 200 or response.status_code == 201:
            print("Data sent successfully to REST API")
        else:
            print(f"Failed to send data to REST API. Error: {response.text}")

# Create a DataStream from the collection
ds.map(lambda x: send_to_rest_api(x))

ds.print()

<pyflink.datastream.data_stream.DataStreamSink at 0x10a74d870>

## Execute our pyFlink graph

In [27]:
env.execute()

https://d5103402-d2c2-49ee-8406-0f7edcf01fed-us-east1.apps.astra.datastax.com/api/rest/v2/keyspaces/tf_keyspace/market_stock_data




Data sent successfully to REST API




Data sent successfully to REST API




Data sent successfully to REST API




Data sent successfully to REST API




Data sent successfully to REST API




Data sent successfully to REST API
4> [['2023-07-10 19:00:00', '132.9000', '132.9000', '132.7500', '132.7500', '293396'], ['2023-07-10 18:30:00', '132.9000', '132.9000', '132.9000', '132.9000', '293411'], ['2023-07-10 16:10:00', '132.9000', '132.9440', '132.9000', '132.9440', '293395'], ['2023-07-10 16:00:00', '132.9300', '133.0200', '132.6300', '133.0000', '632125'], ['2023-07-10 15:55:00', '132.8000', '132.9400', '132.7300', '132.9400', '227333'], ['2023-07-10 15:50:00', '132.6900', '132.8200', '132.6050', '132.8000', '113843']]


<pyflink.common.job_execution_result.JobExecutionResult at 0x1080e5db0>

### Check the data in the AstraDB table

In [28]:
import pandas as pd
pd.DataFrame(session.execute("select * from tf_keyspace.market_stock_data;"))

Unnamed: 0,date,close,high,low,open,volume
0,2023-07-10 16:00:00,133.0,133.020004,132.630005,132.929993,632125.0
1,2023-07-10 18:30:00,132.899994,132.899994,132.899994,132.899994,293411.0
2,2023-07-10 16:10:00,132.944,132.944,132.899994,132.899994,293395.0
3,2023-07-06 19:00:00,132.100006,132.160004,132.100006,132.160004,606639.0
4,2023-07-10 15:50:00,132.800003,132.820007,132.604996,132.690002,113843.0
5,2023-07-10 15:55:00,132.940002,132.940002,132.729996,132.800003,227333.0
6,2023-07-10 19:00:00,132.75,132.899994,132.75,132.899994,293396.0
