In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import boto3
from io import StringIO, BytesIO
import os
from dotenv import load_dotenv
from etl.polygon.source_polygon import SourcePolygonConnector

In [2]:
'''load_dotenv('protected.env')
POLYGON_API_KEY = os.environ.get('POLYGON_API_KEY')'''
POLYGON_API_KEY = 'BTZo73LLIZlgL0yOWLQbvGlQOoMpW3Un'

In [3]:
Polygon = SourcePolygonConnector(POLYGON_API_KEY)

In [5]:
stocks = Polygon.get_stocks('2024-05-01', '2024-05-03', ['AAPL', 'TSLA'])

MaxRetryError: HTTPSConnectionPool(host='api.polygon.io', port=443): Max retries exceeded with url: /v2/aggs/ticker/AAPL/range/4/hour/2024-05-01/2024-05-03?limit=50000 (Caused by ResponseError('too many 429 error responses'))

In [None]:
stocks

## Getting/Updating/Exporting Meta File

In [2]:
def return_date_list(bucket, arg_date, src_format, meta_key):
    min_date = datetime.strptime(arg_date, src_format).date() - timedelta(days=1)
    today = datetime.today().date()
    try:
        df_meta = read_csv_to_df(bucket, meta_key)
        dates = [(min_date + timedelta(days=x)) for x in range(0, (today-min_date).days + 1)]
        src_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
        dates_missing = set(dates[1:]) - src_dates
        if dates_missing:
            min_date = min(set(dates[1:]) - src_dates) - timedelta(days=1)
            return_dates = [date.strftime(src_format) for date in dates if date >= min_date]
            return_min_date = arg_date
        else:
            return_dates = []
            return_min_date = datetime(2200, 1, 1).date()
    except bucket.session.client('s3').execptions.NoSuchKey:
        return_dates = [(min_date + timedelta(days=x)).strftime(src_format) for x in range(0, (today-min_date).days + 1)]
        return_min_date = (min_date + timedelta(days=1)).strftime(src_format)
    return return_min_date, return_dates

In [3]:
def read_csv_to_df(bucket, key, decoding = 'utf-8', sep = ','):
    csv_obj = bucket.Object(key=key).get().get('Body').read().decode(decoding)
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=sep)
    return df

In [4]:
def df_to_s3_csv(df, bucket, key):
    out_buffer = StringIO()
    df.to_csv(out_buffer, index=False)
    bucket.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

In [5]:
def update_meta_file(bucket, meta_key, date_list):
    df_new = pd.DataFrame(columns=['source_date', 'datetime_of_processing'])
    df_new['source_date'] = date_list
    df_new['datetime_of_processing'] = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    df_old = read_csv_to_df(bucket, meta_key)
    df_all = pd.concat([df_old, df_new])
    df_to_s3_csv(df_all, bucket, meta_key)
    return True

## Extract

In [6]:
def get_stocks(tickers, start_date, end_date, client, timespan = 'hour'):
    stock_objects = {}
    for stock in tickers:
        stock_objects[stock] = client.list_aggs(ticker=stock, multiplier=4, timespan=timespan,
                                                        from_=start_date, to=end_date, limit=50000)
    return stock_objects

In [None]:
stock = get_stocks('*', '2024-05-01', '2024-05-03', )

In [7]:
def dict_to_df(dict):
    dataframes = []
    for ticker, data in dict.items():
        df = pd.DataFrame(data).assign(ticker=ticker)
        dataframes.append(df)
    return pd.concat(dataframes, ignore_index=True)

In [8]:
def extract(tickers, client, date_list):
    if date_list:
        dictStocks = get_stocks(tickers, date_list[0], date_list[-1], client)
        df = dict_to_df(dictStocks)
        return df

## Transform

In [9]:
def timestamp_to_datetime(df):
    if 'timestamp' in df.columns:
        df['date_time'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df
    else: 
        print('No timestamp column')

In [10]:
def drop_columns(df, columns):
    col = columns
    return df[col]

In [11]:
def clean_df(df):
    df = df.dropna()
    df = df.drop_duplicates()
    df['date_time'] = pd.to_datetime(df['date_time'])
    df = df.astype({'open': 'float', 'close': 'float', 'high': 'float', 'low': 'float', 'transactions': 'int'})
    return df

In [12]:
def add_col_returns(df):
    df['periodic_return'] = df.groupby('ticker')['close'].pct_change()*100
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(subset=['periodic_return'], inplace=True)
    return df

In [13]:
def make_weekly_aggregated(df):
    df.set_index('date_time', inplace=True)
    weekly_aggregated = df.groupby('ticker').resample('W-MON').agg({
        'open': 'first',
        'close': 'last',
        'high': 'max',
        'low': 'min',
        'transactions': 'sum',
        'periodic_return': 'std'  # This calculates the standard deviation of the 4-hourly returns for weekly volatility
    }).rename(columns={'periodic_return': 'pct_volatility'}).reset_index()
    return weekly_aggregated

In [14]:
def add_col_change(weekly_aggregated):
    weekly_aggregated['weekly_pct_change'] = weekly_aggregated.groupby('ticker')['close'].pct_change()*100
    weekly_aggregated['weekly_pct_change'].fillna(0, inplace=True)
    return weekly_aggregated

In [15]:
def transform(df, columns):
    dt_cng_df = timestamp_to_datetime(df)
    dp_col_df = drop_columns(dt_cng_df, columns)
    cleaned_df = clean_df(dp_col_df)
    rtn_add_df = add_col_returns(cleaned_df)
    weekly_aggregated = make_weekly_aggregated(rtn_add_df)
    return weekly_aggregated
    

## Load

In [16]:
def df_to_s3(weekly_aggregated, bucket_target, key):
    out_buffer = BytesIO()
    weekly_aggregated.to_parquet(out_buffer, index=False)
    bucket_target.put_object(Body=out_buffer.getvalue(), Key=key)
    return True

In [17]:
def load(df, bucket, trg_key, trg_format, meta_key, date_list, src_format):
    key = trg_key + datetime.today().strftime(src_format) + trg_format
    df_to_s3(df, bucket, key)
    update_meta_file(bucket, meta_key, date_list)

In [18]:
def run_etl(tickers, client, trg_bucket, date_list, columns, arg_date, trg_key, src_format, trg_format, meta_key):
    raw_df = extract(tickers, client, date_list)
    if raw_df:
        cleaned_df = transform(raw_df, columns)
        extract_date_list = [date for date in date_list if date >= arg_date]
        load(cleaned_df, trg_bucket, trg_key, trg_format, meta_key, extract_date_list, src_format)

In [19]:
def main():
    tickers = ['AAPL', 'TSLA']
    client = RESTClient(POLYGON_API_KEY)
    arg_date = '2024-04-14'
    src_format = "%Y-%m-%d"
    trg_format = '.parquet'
    trg_key = 'polygon_weekly_report_'
    columns = ['ticker', 'open', 'close', 'low', 'high', 'transactions', 'date_time']
    
    
    # Init
    s3 = boto3.resource('s3')
    meta_key = 'meta_file.csv'
    bucket_target = s3.Bucket('adaptivesharks-test-etl-target')
    
    # Run Application
    extract_date, date_list = return_date_list(bucket_target, arg_date, src_format, meta_key)
    run_etl(tickers, client, bucket_target, date_list, columns, extract_date, trg_key, src_format, trg_format, meta_key)
    

In [20]:
# Run
main()

ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().