In [1]:
import sys
sys.path.append('../Storage')

import pgConn
import PostgresSQL_table_queries
import CloudStorage

import pandas as pd
from datetime import datetime

In [2]:
# Open a connection configured with the "historical" table name, then load
# the stock price rows into `df` (the frame every later cell works from).
pg_conn = pgConn.PgConn("historical")
df = pg_conn.get_stocks_prices()

Connection to the database successful!
Table name set to: historical


In [3]:
# Preview the first rows to sanity-check the loaded columns.
df.head()

Unnamed: 0,ref,book,date,open,high,low,close,adj_close,volume
0,https://finance.yahoo.com,tusd-btc,2023-08-09,3.4e-05,3.4e-05,3.4e-05,3.4e-05,3.4e-05,102207
1,https://finance.yahoo.com,tusd-btc,2023-08-08,3.4e-05,3.4e-05,3.4e-05,3.4e-05,3.4e-05,74850
2,https://finance.yahoo.com,tusd-btc,2023-08-07,3.4e-05,3.4e-05,3.4e-05,3.4e-05,3.4e-05,34264
3,https://finance.yahoo.com,tusd-btc,2023-08-06,3.4e-05,3.4e-05,3.4e-05,3.4e-05,3.4e-05,29283
4,https://finance.yahoo.com,tusd-btc,2023-08-05,3.4e-05,3.4e-05,3.4e-05,3.4e-05,3.4e-05,78352


In [4]:
# Non-null count per column; equal counts mean no column has missing values.
df.count()

ref          81263
book         81263
date         81263
open         81263
high         81263
low          81263
close        81263
adj_close    81263
volume       81263
dtype: int64

In [5]:
# Check how the 'date' column is stored: the date filters defined below either
# match it as text or convert it with pd.to_datetime, so its dtype matters.
date_column_type = df['date'].dtype
print("Type of values in 'date' column:", date_column_type)

Type of values in 'date' column: object


In [6]:
class DataETL():
    """Container for the notebook's ETL helpers around one price DataFrame.

    The nested classes group the steps by role: ``Export`` (push to S3),
    ``Ingestion`` (read back from S3), ``Process`` (de-duplicate / filter)
    and ``Transform`` (placeholder).
    """

    def __init__(self, dataframe):
        """Keep a reference to ``dataframe`` as ``self.df``."""
        self.df = dataframe

    class Export():
        """Uploads a DataFrame to S3 through ``CloudStorage``."""

        def __init__(self, dataframe):
            """Create the cloud storage provider and remember the frame to export."""
            self.cloudProvider = CloudStorage.CloudStorageProvider()
            self.df = dataframe

        def export_stocks_to_s3(self, bucket_name, prefix_path):
            """Create ``bucket_name`` and upload ``self.df`` under ``prefix_path``.

            Delegates to ``create_bucket`` and ``upload_dataframe_with_timestamp``
            of the AWS storage object; the folder layout is decided by that helper.
            """
            aws_storage = self.cloudProvider.AWS()
            aws_storage.create_bucket(bucket_name)
            aws_storage.upload_dataframe_with_timestamp(self.df, bucket_name, prefix_path)

        def export_stocks_to_s3_full_file(self, bucket_name, prefix_path, filename):
            """Upload ``self.df`` as one CSV via ``upload_dataframe_to_csv``.

            Unlike ``export_stocks_to_s3`` this does not create the bucket first.
            """
            aws_storage = self.cloudProvider.AWS()
            aws_storage.upload_dataframe_to_csv(self.df, bucket_name, filename, prefix_path)

    class Ingestion():
        """Reads previously exported data back from S3 through ``CloudStorage``."""

        def __init__(self, dataframe):
            """Remember ``dataframe`` and create the cloud storage provider."""
            self.df = dataframe
            self.cloudProvider = CloudStorage.CloudStorageProvider()

        def get_full_data_csv_file(self, bucket_name, prefix_path):
            """Return whatever ``get_csv_from_specific_folder`` yields for the given location."""
            aws_storage = self.cloudProvider.AWS()
            return aws_storage.get_csv_from_specific_folder(bucket_name, prefix_path)

    class Process():
        """De-duplication and filtering helpers for a frame with ``book`` and ``date`` columns."""

        def __init__(self, dataframe):
            """Keep a reference to ``dataframe`` as ``self.df``."""
            self.df = dataframe

        def getData(self):
            """Replace ``self.df`` with ``pg_conn.get_financial_news()``.

            Relies on the notebook-level ``pg_conn`` connection being defined.
            """
            # The original signature lacked `self`, so any call raised NameError.
            self.df = pg_conn.get_financial_news()

        def filter_by_current_date(self, df=None):
            """Return the rows whose ``date`` starts with today's ``YYYY-MM-DD``.

            Args:
                df: frame to filter; defaults to ``self.df`` so existing
                    zero-argument calls keep working.
            """
            frame = self.df if df is None else df
            today_prefix = datetime.today().strftime('%Y-%m-%d')
            # Compare on the text form so date/datetime values match as well as
            # plain strings (`.str` on non-string values would yield NaN masks).
            is_today = frame['date'].astype(str).str.startswith(today_prefix)
            return frame[is_today]

        def filter_by_date_range(self, df, start_date, end_date):
            """Return rows with ``start_date <= date <= end_date`` (both inclusive).

            The returned frame carries ``date`` converted with ``pd.to_datetime``;
            the frame passed in is left untouched.
            """
            start_date = pd.to_datetime(start_date)
            end_date = pd.to_datetime(end_date)

            # assign() builds a new frame instead of overwriting the caller's column.
            converted = df.assign(date=pd.to_datetime(df['date']))
            in_range = converted['date'].between(start_date, end_date)
            return converted[in_range]

        def filter_by_book(self, df, book=None):
            """Return the rows of ``df`` whose ``book`` equals ``book``.

            With ``book=None`` the frame is returned unfiltered (previously this
            path raised UnboundLocalError).
            """
            if book is None:
                return df
            return df[df['book'] == book]

        def get_unique_by_date(self, df):
            """Return ``df`` sorted by ``date`` descending with one row per (book, date)."""
            df_sorted = df.sort_values(by='date', ascending=False)
            # Keeps the first row encountered for each (book, date) pair.
            return df_sorted.drop_duplicates(subset=['book', 'date'])

    class Transform():
        """Placeholder for text transformations."""

        @staticmethod
        def extractStopWords():
            """Not implemented yet."""
            # staticmethod keeps `Transform.extractStopWords()` working and also
            # makes the call valid on an instance.
            pass

In [7]:
# Filter switches: they are checked in the order below and the first one set
# to True is applied; with all three False the de-duplicated frame is kept.
FILTER_BY_CURRENT_DATE = False
FILTER_BY_RANGE_DATE = True
FILTER_BY_BOOK_DATE = False
# TODO 2019-2017
etl = DataETL(df)
etl_process = etl.Process(etl.df)
uniqued_df = etl_process.get_unique_by_date(etl_process.df)
processed_df = uniqued_df

if FILTER_BY_CURRENT_DATE:
    # filter_by_current_date() reads the frame held by its Process instance,
    # so wrap the de-duplicated rows in a fresh Process instead of passing
    # them as an argument (which raised TypeError).
    processed_df = etl.Process(uniqued_df).filter_by_current_date()
elif FILTER_BY_RANGE_DATE:
    start_date = '2024-05-03'
    end_date = '2024-05-04'
    processed_df = etl_process.filter_by_date_range(uniqued_df, start_date, end_date)
elif FILTER_BY_BOOK_DATE:
    # NOTE(review): this branch used to repeat the current-date call; it now
    # filters by book as the flag name suggests. Confirm the intended book.
    book = 'btc-usd'
    processed_df = etl_process.filter_by_book(uniqued_df, book)

# Jupyter only renders a bare expression when it is the last top-level
# statement, so the preview lives here rather than inside each branch.
processed_df.head()

In [8]:
# Destination and options for exporting the filtered rows.
bucket_name = "test-financial-stocks-bucket"
prefix_path = "stocks/crypto"
post_full_csv = False

# Date-stamped name used only for the optional single-file export.
now = datetime.now()
filename = f"{now.year}-{now.month:02}-{now.day:02}_full_record"

etl_export = etl.Export(processed_df)
etl_export.export_stocks_to_s3(bucket_name, prefix_path)
if post_full_csv:
    etl_export.export_stocks_to_s3_full_file(bucket_name, prefix_path, filename)

Bucket 'test-financial-stocks-bucket' created successfully.
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/atom-usd/2024/05/04/20240504-atom-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/trx-usd/2024/05/04/20240504-trx-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/ldo-usd/2024/05/04/20240504-ldo-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/xlm-usd/2024/05/04/20240504-xlm-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/paxg-usd/2024/05/04/20240504-paxg-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/avax-usd/2024/05/04/20240504-avax-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/btc-usd/2024/05/04/20240504-btc-usd.csv'
Data uploaded to S3 bucket 'test-financial-st

Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/tigres-usd/2024/05/03/20240503-tigres-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/ape-usd/2024/05/03/20240503-ape-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/matic-usd/2024/05/03/20240503-matic-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/snx-usd/2024/05/03/20240503-snx-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/axs-usd/2024/05/03/20240503-axs-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/yfi-usd/2024/05/03/20240503-yfi-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/sushi-usd/2024/05/03/20240503-sushi-usd.csv'
Data uploaded to S3 bucket 'test-financial-stocks-bucket' under folder 'stocks/crypto/dydx-usd/20

In [9]:
# Switches for reading the exported data back; df_from_file stays None
# unless both are enabled.
ingest_data = True
get_full_file = True
df_from_file = None

if ingest_data:
    etl_ingestion = etl.Ingestion(etl.df)
    bucket_name = "test-financial-stocks-bucket"
    prefix_path = "stocks/crypto/"
    # Placeholder path components; nothing in this cell reads them.
    year = mont = day = hour = minute = ''
    if get_full_file:
        df_from_file = etl_ingestion.get_full_data_csv_file(bucket_name, prefix_path)

In [13]:
# A bare expression nested inside an `if` block is never rendered by Jupyter;
# a conditional expression as the cell's last statement is. When nothing was
# ingested the value is None, which produces no output.
df_from_file.head() if df_from_file is not None else None