In [135]:
import boto3
from botocore.exceptions import NoCredentialsError
import s3fs
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from io import StringIO, BytesIO
from datetime import datetime, timedelta
from botocore.client import Config

In [226]:
url = "s3://desiquant/data/candles/NIFTY50/EQ.parquet.gz"

s3_param = {
"endpoint_url": "https://cbabd13f6c54798a9ec05df5b8070a6e.r2.cloudflarestorage.com",
"key": "5c8ea9c516abfc78987bc98c70d2868a", 
"secret": "0cf64f9f0b64f6008cf5efe1529c6772daa7d7d0822f5db42a7c6a1e41b3cadf", 
"client_kwargs": {
    "region_name": "auto"
    },
}

In [301]:
region = s3_params["client_kwargs"]
endpoint_url= s3_params["endpoint_url"]
aws_access_key_id= s3_params["key"]
aws_secret_access_key= s3_params["secret"]
key = 'etl_nifty_report_' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'
src_bucket ='desiquant'
trg_bucket = 'etl-nifty50'
src_format = '%Y%m%d'
arg_date = '2023-01-10'
trg_key = 'etl_nifty_report_'
trg_format = '.parquet'

## Adapter Layer

In [282]:
def read_parq_to_df(bucket_name, key, s3_params):
    
     # Create an S3 client
    s3 = boto3.client('s3', **s3_params["client_kwargs"], endpoint_url=s3_params["endpoint_url"],
                      aws_access_key_id=s3_params["key"], aws_secret_access_key=s3_params["secret"])
 
    # List objects in the bucket with a specific prefix
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=key)

    # Extract object keys from the response
    object_keys = [obj['Key'] for obj in response.get('Contents', [])]
    
    response = s3.get_object(Bucket=bucket_name, Key=key)
    data_bytes = response['Body'].read()

    table = pq.read_table(BytesIO(data_bytes))
    df = table.to_pandas()
    return df

def write_df_to_s3(df, bucket_name: str,key: str):
    
    sts_client = boto3.client('sts')
    
    assumed_role = sts_client.assume_role(
    RoleArn='arn:aws:iam::211125758361:role/ETL_S3',
    RoleSessionName='SESSION_NAME')

    credentials = assumed_role['Credentials']

    s3 = boto3.client('s3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken'],
    region_name='ap-south-1')

    response = s3.list_buckets()
    
    # Convert DataFrame to PyArrow Table
    table = pa.Table.from_pandas(df)

    # Write Parquet file to an in-memory buffer
    buffer = BytesIO()
    pq.write_table(table, buffer)

    # Upload the buffer to S3
    s3 = boto3.client('s3')
    buffer.seek(0)  # Reset the buffer position to the beginning
    s3.upload_fileobj(buffer, bucket_name, key)

    #Print to confirm
    print(f"Data written to S3 bucket: {bucket_name}/{key}")

def read_from_s3(bucket_name: str, key: str):
    sts_client = boto3.client('sts')
    
    assumed_role = sts_client.assume_role(
    RoleArn='arn:aws:iam::211125758361:role/ETL_S3',
    RoleSessionName='SESSION_NAME')

    credentials = assumed_role['Credentials']

    s3 = boto3.client('s3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken'],
    region_name='ap-south-1')
    
    # Download the Parquet file from S3 into an in-memory buffer
    buffer = BytesIO()
    s3.download_fileobj(bucket_name, key, buffer)

    # Read the Parquet file from the buffer
    buffer.seek(0)  # Reset the buffer position to the beginning
    table = pq.read_table(buffer)

    # Convert PyArrow Table to Pandas DataFrame
    df = table.to_pandas()

    # Now, 'df' contains the data from the Parquet file
    return df

def return_objects(s3_url, s3_params, arg_date):
    arg_date = datetime.strptime(arg_date, '%Y-%m-%d')
    
   # Parse the S3 URL to get bucket name and prefix
    s3_url_parts = s3_url.split("/")
    bucket_name = s3_url_parts[2]
    prefix = "/".join(s3_url_parts[3:])

    # Initialize S3 client
    s3 = boto3.client(
        's3',
        aws_access_key_id=s3_params["key"],
        aws_secret_access_key=s3_params["secret"],
        endpoint_url=s3_params["endpoint_url"],
        region_name=s3_params["client_kwargs"]["region_name"]
    )

    try:
        # List objects in the bucket
        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

        # Extract object keys from the response
        object_keys = [obj['Key'] for obj in response.get('Contents', []) if datetime.strptime(obj['Key'].split('/')[3], '%Y-%m-%d') >= arg_date]

        return object_keys

    except NoCredentialsError:
        print("Credentials not available or not valid.")
        return []

## Application Layer

In [304]:
def extract(src_bucket, object_keys):
    df = pd.concat([read_parq_to_df(src_bucket, key = obj, s3_params=s3_params) for obj in object_keys], ignore_index=True)
    return df

def transform(df, arg_date):
    df = df[df['date'] >= arg_date]
    df.loc[:, 'date'] = df['date'].dt.date
    aggregations = {
    'open': 'mean',
    'high': 'max',
    'low': 'min',
    'close': 'mean',
    'volume': 'sum'
    }
    result_df = df.groupby('date').agg(aggregations)
    result_df.drop('volume',axis=1,inplace=True)
    result_df['prev_close'] = result_df['close'].shift(1)
    result_df['change_prev_closing_%'] = ((result_df['close'] - result_df['prev_close'])/result_df['prev_close'])*100
    return result_df

def load(trg_bucket, df, trg_key, trg_format):
    key = trg_key + datetime.today().strftime("%Y%m%d_%H%M%S") + trg_format
    write_df_to_s3(df,bucket_name, key)
    return True

def etl_report(src_bucket, trg_bucket, object_keys, arg_date, trg_key, trg_format):
    df = extract(src_bucket, object_keys)
    df = transform(df, arg_date)
    load(trg_bucket, df, trg_key, trg_format)
    return True

In [None]:
etl_report(src_bucket, trg_bucket, object_keys, arg_date, trg_key, trg_format)

In [308]:
response = s3.list_objects_v2(Bucket=src_bucket, Prefix= 'data/candles/')

# Extract object keys from the response
object_keys = [obj['Key'] for obj in response.get('Contents', [])]

AttributeError: 's3.ServiceResource' object has no attribute 'list_objects_v2'

In [None]:
ob = return_objects(s3_url, s3_params, arg_date)

In [287]:
df = extract(bucket_name = 'desiquant',object_keys = object_keys)
df

Unnamed: 0,date,open,high,low,close,volume
0,2021-08-26 09:15:00,42.9,42.9,42.9,42.9,0
1,2021-08-26 09:16:00,42.9,42.9,42.9,42.9,0
2,2021-08-26 09:17:00,42.9,42.9,42.9,42.9,0
3,2021-08-26 09:18:00,42.9,42.9,42.9,42.9,0
4,2021-08-26 09:19:00,42.9,42.9,42.9,42.9,0
...,...,...,...,...,...,...
329491,2021-10-26 09:15:00,340.5,340.5,340.5,340.5,0
329492,2021-10-27 09:15:00,340.5,340.5,340.5,340.5,0
329493,2021-10-28 09:15:00,340.5,340.5,340.5,340.5,0
329494,2021-10-28 09:16:00,0.0,0.0,340.5,340.5,0


In [296]:
df_1 = df[0:20]
df_1

Unnamed: 0,date,open,high,low,close,volume
0,2021-08-26 09:15:00,42.9,42.9,42.9,42.9,0
1,2021-08-26 09:16:00,42.9,42.9,42.9,42.9,0
2,2021-08-26 09:17:00,42.9,42.9,42.9,42.9,0
3,2021-08-26 09:18:00,42.9,42.9,42.9,42.9,0
4,2021-08-26 09:19:00,42.9,42.9,42.9,42.9,0
5,2021-08-26 09:20:00,42.9,42.9,42.9,42.9,0
6,2021-08-26 09:21:00,42.9,42.9,42.9,42.9,0
7,2021-08-26 09:22:00,42.9,42.9,42.9,42.9,0
8,2021-08-26 09:23:00,42.9,42.9,42.9,42.9,0
9,2021-08-26 09:24:00,42.9,42.9,42.9,42.9,0


In [300]:
df_1 = transform(df,arg_date='2021-09-02')
df_1

Unnamed: 0_level_0,open,high,low,close,prev_close,change_prev_closing_%
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2021-09-02,83.854064,438.8,21.7,85.030813,,
2021-09-03,41.118233,455.7,21.7,44.044267,85.030813,-48.201993
2021-09-06,216.161376,438.8,21.7,216.190079,44.044267,390.847267
2021-09-07,156.606205,455.7,21.7,164.157937,216.190079,-24.067775
2021-09-08,259.978312,455.7,21.7,262.648052,164.157937,59.997169
2021-09-09,41.592291,438.8,21.7,42.02793,262.648052,-83.998385
2021-09-13,179.478087,438.8,21.7,179.88676,42.02793,328.017184
2021-09-14,67.303945,455.7,10.35,69.25781,179.88676,-61.499218
2021-09-15,93.710627,455.7,21.7,96.414723,69.25781,39.211336
2021-09-16,99.801986,455.7,21.7,101.746188,96.414723,5.529721
