In [1]:
# if you run into an error you may need to run this twice
!pip install boto3
!pip install aws-kinesis-agg
!pip install s3fs

Collecting boto3
  Downloading boto3-1.34.144-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.35.0,>=1.34.144 (from boto3)
  Downloading botocore-1.34.144-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.2-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.34.144-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0mta [36m0:00:01[0m
[?25hDownloading botocore-1.34.144-py3-none-any.whl (12.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m30.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.10.2-py3-none-any.whl (82 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.7/

**Before running the cells below, create an `aws.cfg` file and upload it to the same directory as this notebook.**

aws.cfg example:

```
[AWS]
aws_access_key_id = youraccesskey
aws_secret_access_key = yoursecretkey
region_name = us-east-1
```
Replace the placeholder values with your own access key, secret key, and region.

In [2]:
import boto3
import json
import time
import pandas as pd
import configparser

# Read AWS credentials from the aws.cfg file in the working directory.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check that list so a missing file fails with a
# clear message instead of an opaque KeyError on the lookups below.
if not config.read('aws.cfg'):
    raise FileNotFoundError(
        "aws.cfg was not found in the working directory; create it with an "
        "[AWS] section containing aws_access_key_id, aws_secret_access_key "
        "and region_name"
    )

aws_access_key_id = config['AWS']['aws_access_key_id']
aws_secret_access_key = config['AWS']['aws_secret_access_key']
region_name = config['AWS']['region_name']




In [3]:
# Create the Kinesis client, authenticating with the key pair and region
# that were read from the config file.
kinesis_client = boto3.client(
    service_name='kinesis',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)


In [4]:
def produce(stream_name, data, partition_key):
    """Send one record to a Kinesis data stream as a JSON document.

    Args:
        stream_name: Name of the target Kinesis stream.
        data: Mapping of field names to values. ``pd.Timestamp`` values are
            serialized as ISO-8601 strings; the mapping itself is not modified.
        partition_key: Partition key passed to ``put_record``.

    Returns:
        The ``put_record`` response on success, or ``None`` when any step
        fails (the error is printed rather than raised).
    """
    try:
        # Build a JSON-serializable copy instead of rewriting the caller's
        # dict in place, so the caller keeps its original Timestamp values.
        payload = {
            key: value.isoformat() if isinstance(value, pd.Timestamp) else value
            for key, value in data.items()
        }
        # Push the record onto the stream with put_record.
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(payload),
            PartitionKey=partition_key,
        )
        return response
    except Exception as e:
        print(f"Error producing record: {e}")
        return None

def stream_data(df, stream_name, batch_size=5, delay_seconds=2):
    """Replay a DataFrame into a Kinesis stream in small, paced batches.

    Each row is sent individually through ``produce``, using the string form
    of its ``tpep_pickup_datetime`` value as the partition key. After every
    batch a progress line is printed, and the function pauses before starting
    the next batch.

    Args:
        df: DataFrame to stream; must contain a ``tpep_pickup_datetime`` column.
        stream_name: Name of the target Kinesis stream.
        batch_size: Number of rows sent between pauses (default 5).
        delay_seconds: Pause between batches, in seconds (default 2).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    total = len(df)
    for start in range(0, total, batch_size):
        records = df.iloc[start:start + batch_size].to_dict(orient='records')
        delivered = 0
        for record in records:
            # Use the pickup timestamp as the partition key.
            partition_key = str(record['tpep_pickup_datetime'])

            # produce() returns None when the send failed, so count only the
            # records that actually went out.
            if produce(stream_name, record, partition_key) is not None:
                delivered += 1
        print(f"Sent {delivered} records to Kinesis")

        # Pace the stream, but do not wait after the final batch.
        if delay_seconds > 0 and start + batch_size < total:
            time.sleep(delay_seconds)


In [6]:
!pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl (39.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m39.9/39.9 MB[0m [31m28.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: pyarrow
Successfully installed pyarrow-17.0.0


In [7]:
if __name__ == "__main__":
    # Target Kinesis stream for the replayed trips.
    stream_name = 'input-stream'

    # Load the yellow-taxi parquet file from S3, authenticating with the key
    # pair read from the config file, then stream its rows to Kinesis.
    df = pd.read_parquet(
        's3://techcatalyst-public/yellow_tripdata_2024-01.parquet',
        storage_options={'key': aws_access_key_id, 'secret': aws_secret_access_key},
    )
    stream_data(df, stream_name)

Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis
Sent 5 records to Kinesis


KeyboardInterrupt: 