<a href="https://colab.research.google.com/github/ankit-rathi/Data-Engineering-with-AWS/blob/main/Try_Kinesis_Stream.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install boto3 library
!pip install boto3

Collecting boto3
  Downloading boto3-1.35.40-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.36.0,>=1.35.40 (from boto3)
  Downloading botocore-1.35.40-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.3-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.35.40-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.1/139.1 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.35.40-py3-none-any.whl (12.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.6/12.6 MB[0m [31m25.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.10.3-py3-none-any.whl (82 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.6/82.6 kB[0m [31m936.2 kB/s[0m eta [36m0

In [2]:
# Mount google drive
from google.colab import drive
drive.mount('/content/drive')
import os

import pandas as pd

project_path = '/content/drive/My Drive/Personal'
os.chdir(project_path)

Mounted at /content/drive


In [11]:
# Import required libraries
import pandas as pd
import os
import boto3
import json
import time
from botocore.exceptions import ClientError

# Load AWS credentials from CSV
aws_keys_df = pd.read_csv('aws-rootkey.csv')

AWS_ACCESS_KEY_ID = aws_keys_df['Access_key_ID'][0]
AWS_SECRET_ACCESS_KEY = aws_keys_df['Secret_access_key'][0]
REGION_NAME = aws_keys_df['Region'][0]

# Initialize boto3 clients
s3_client = boto3.client('s3', region_name=REGION_NAME,
                         aws_access_key_id=AWS_ACCESS_KEY_ID,
                         aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

kinesis_client = boto3.client('kinesis', region_name=REGION_NAME,
                              aws_access_key_id=AWS_ACCESS_KEY_ID,
                              aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

s3_resource = boto3.resource('s3', region_name=REGION_NAME,
                             aws_access_key_id=AWS_ACCESS_KEY_ID,
                             aws_secret_access_key=AWS_SECRET_ACCESS_KEY)


# Step 1: Create an S3 bucket for logging
def create_s3_bucket(bucket_name):
    try:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': REGION_NAME}
        )
        print(f"S3 bucket '{bucket_name}' created successfully.")
    except ClientError as e:
        print(f"Error creating S3 bucket: {e}")

# Step 2: Create a Kinesis Data Stream
def create_kinesis_stream(stream_name, shard_count=1):
    try:
        kinesis_client.create_stream(
            StreamName=stream_name,
            ShardCount=shard_count
        )
        print(f"Kinesis stream '{stream_name}' created successfully.")

        # Wait for the stream to become active
        while True:
            response = kinesis_client.describe_stream(StreamName=stream_name)
            status = response['StreamDescription']['StreamStatus']
            if status == 'ACTIVE':
                print(f"Kinesis stream '{stream_name}' is active.")
                break
            time.sleep(2)
    except ClientError as e:
        print(f"Error creating Kinesis stream: {e}")

# Step 3: Push data to Kinesis stream
def put_records_to_kinesis(stream_name, records):
    for record in records:
        try:
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(record),
                PartitionKey="partitionkey"
            )
            print(f"Record {record} pushed to Kinesis.")
        except ClientError as e:
            print(f"Error pushing record to Kinesis: {e}")

# Step 4: Log streaming data to S3
def log_kinesis_to_s3(stream_name, bucket_name, log_file):
    try:
        shard_iterator_response = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId='shardId-000000000000',  # Default shard ID
            ShardIteratorType='TRIM_HORIZON'
        )
        shard_iterator = shard_iterator_response['ShardIterator']

        # Get records from the stream
        response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)
        records = response['Records']

        if records:
            # Log records to S3 as a JSON file
            log_data = [json.loads(record['Data']) for record in records]
            s3_client.put_object(
                Bucket=bucket_name,
                Key=log_file,
                Body=json.dumps(log_data),
                ContentType='application/json'
            )
            print(f"Logged {len(records)} records to S3 bucket '{bucket_name}'.")
        else:
            print("No records to log.")
    except ClientError as e:
        print(f"Error logging Kinesis data to S3: {e}")

# Step 5: Cleanup - Delete Kinesis stream and S3 bucket
def cleanup_resources(stream_name, bucket_name):
    try:
        # Delete Kinesis stream
        kinesis_client.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        print(f"Kinesis stream '{stream_name}' deleted successfully.")

        # Delete all objects in the S3 bucket
        bucket = s3_resource.Bucket(bucket_name)
        bucket.objects.all().delete()

        # Delete the S3 bucket
        s3_client.delete_bucket(Bucket=bucket_name)
        print(f"S3 bucket '{bucket_name}' deleted successfully.")
    except ClientError as e:
        print(f"Error cleaning up resources: {e}")

if __name__ == "__main__":
    # Define parameters
    bucket_name = 'kinesis-logging-bucket-' + str(int(time.time()))  # Make bucket name unique
    stream_name = 'my-kinesis-stream'
    log_file = 'kinesis-log.json'

    # Sample records to push to Kinesis
    records_to_push = [
        {"id": 1, "message": "Hello, this is record 1."},
        {"id": 2, "message": "Hello, this is record 2."},
        {"id": 3, "message": "Hello, this is record 3."}
    ]

    # Step 1: Create an S3 bucket
    create_s3_bucket(bucket_name)

    # Step 2: Create a Kinesis Data Stream
    create_kinesis_stream(stream_name)

    # Step 3: Push records to Kinesis
    put_records_to_kinesis(stream_name, records_to_push)

    # Step 4: Log Kinesis records to S3
    log_kinesis_to_s3(stream_name, bucket_name, log_file)

    # Step 5: Cleanup resources after testing
    cleanup_resources(stream_name, bucket_name)


S3 bucket 'kinesis-logging-bucket-1728974563' created successfully.
Error creating Kinesis stream: An error occurred (ResourceInUseException) when calling the CreateStream operation: Stream my-kinesis-stream under account 419441991443 already exists.
Record {'id': 1, 'message': 'Hello, this is record 1.'} pushed to Kinesis.
Record {'id': 2, 'message': 'Hello, this is record 2.'} pushed to Kinesis.
Record {'id': 3, 'message': 'Hello, this is record 3.'} pushed to Kinesis.
Logged 6 records to S3 bucket 'kinesis-logging-bucket-1728974563'.
Kinesis stream 'my-kinesis-stream' deleted successfully.
S3 bucket 'kinesis-logging-bucket-1728974563' deleted successfully.
