### Magical Synergy: The Unbreakable Bond between Data Engineer and ETL

As a Data Engineer, your job is to be a wizard who manages and organizes the flow of information within a company or organization. Imagine you have a big pile of different kinds of toys scattered all over the place. ETL is like a magic process that takes all those toys, organizes them neatly into different boxes based on their types, and puts labels on each box so you can easily find what you need later. You, the magician, have that magical ability at your disposal.

Here is how the magic works:

1. **Extract**: You have the ability to gather data from different places like databases, spreadsheets, or even websites. It's like you can magically reach out and collect all the scattered pieces of information from different sources.

2. **Transform**: Now comes the exciting part! You use your magic skills to clean up and transform the data into a more useful and understandable format. It's like turning a bunch of jumbled letters into a clear and meaningful sentence. You can think of it as a cleanup that makes every later step easier.

3. **Load**: Once the data is all tidy and neat, you store it in a special place, like a data warehouse. It's like putting everything into labeled boxes, so anyone in the company can find what they need easily.
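The three steps above can be sketched in a few lines of plain Python before any Spark is involved. This is only a toy illustration: the CSV text, the `extract`/`transform`/`load` helpers, and the dictionary standing in for a warehouse are all made up for this example.

```python
import csv
import io

# Hypothetical raw input: messy names and one missing price
RAW_CSV = """name,price
 widget ,10.5
gadget,
Gizmo,7
"""

def extract(text):
    # Extract: read the raw rows from a source (here, an in-memory CSV string)
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    # Transform: trim and lowercase names, drop rows without a price, cast to float
    cleaned = []
    for row in rows:
        price = row["price"].strip()
        if not price:
            continue
        cleaned.append({"name": row["name"].strip().lower(), "price": float(price)})
    return cleaned

def load(rows, warehouse):
    # Load: store the tidy rows in a "labeled box" (a dict keyed by name)
    for row in rows:
        warehouse[row["name"]] = row["price"]
    return warehouse

warehouse = load(transform(extract(RAW_CSV)), {})
print(warehouse)  # {'widget': 10.5, 'gizmo': 7.0}
```

Each step only depends on the output of the previous one, which is the same shape the `FinanceETL` class follows later on.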

By doing all this, you make sure that everyone in the organization has access to accurate and well-organized data. This helps people make better decisions, understand trends, and find answers to important questions. You're like the hero behind the scenes, making sure the company's data is always ready to work its magic and help everyone succeed!

### Installing required libraries


In [1]:
!pip install pyspark -q
!pip install yfinance -q
!pip install yahoo_fin -q

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("finance").master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/05 08:02:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import functools
import time

# Color spectrum
COLOR_RED = '\033[91m'
COLOR_GREEN = '\033[92m'
COLOR_BLUE = '\033[94m'
COLOR_YELLOW = '\033[93m'
COLOR_MAGENTA = '\033[95m'
COLOR_CYAN = '\033[96m'
# Text styles
TEXT_BOLD = '\033[1m'
TEXT_UNDERLINE = '\033[4m'
# Reset all styles
COLOR_RESET = '\033[0m'

def time_func(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        time_taken = end_time - start_time
        print(f'Function {COLOR_MAGENTA + TEXT_BOLD}{func.__name__}{COLOR_RESET} elapsed time: {COLOR_CYAN + TEXT_BOLD}{time_taken * 1000:.3f}ms{COLOR_RESET}')
        print()
        return result 
    return wrapper
        
def log_function_call(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        arg_str = ', '.join([repr(arg) for arg in args] + [f"{k}={v!r}" for k, v in kwargs.items()])
        print(f"Calling {func.__name__}({arg_str})")
        result = func(*args, **kwargs)
        print(f"{func.__name__} returned {result!r}")
        return result
    return wrapper

# Example functions that use two different wrapper function styles
@time_func
def add(a, b):
    return a + b

@log_function_call
def greet(name):
    return f"Hello, {name}!"

# # Calling the functions
# add_result = add(5, 7)
# greet_message = greet("Alice")
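
The two wrapper styles differ in one practical way: `log_function_call` uses `functools.wraps`, while `time_func` does not. A small sketch of what that changes, using two stripped-down decorators (`plain_decorator` and `wrapped_decorator` are hypothetical names for this illustration only):

```python
import functools

def plain_decorator(func):
    # No functools.wraps: the returned function keeps the name "wrapper"
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def wrapped_decorator(func):
    # functools.wraps copies __name__, __doc__, etc. from func onto wrapper
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain_decorator
def add(a, b):
    """Add two numbers."""
    return a + b

@wrapped_decorator
def greet(name):
    """Greet someone by name."""
    return f"Hello, {name}!"

print(add.__name__, add.__doc__)      # wrapper None
print(greet.__name__, greet.__doc__)  # greet Greet someone by name.
```

Both decorated functions still return the right result; only the metadata seen by debuggers, logs, and `help()` differs.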

IMPORTANT DISCLAIMER: Normally, the ```load()``` function would store the transformed data in a data warehouse for later use. However, since I don't have a database or data warehouse available, I will change the loading step slightly and keep the data in a PySpark dataframe instead.
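
For comparison, here is a rough sketch of what a warehouse-style load could look like, using the standard-library `sqlite3` module as a stand-in for a real database. The table name `stock_metrics`, its reduced column set, and the sample rows are assumptions made for this illustration; they are not part of the pipeline in this notebook.

```python
import sqlite3

# In-memory SQLite database as a stand-in for a real warehouse (assumption)
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS stock_metrics (
        ticker     TEXT NOT NULL,
        date       TEXT NOT NULL,
        rsi        REAL,
        created_at TEXT NOT NULL,
        PRIMARY KEY (ticker, date)
    )
    """
)

def load(conn, rows):
    # INSERT OR REPLACE overwrites the row with the same (ticker, date) key,
    # so re-running the load keeps only the most recently written version.
    conn.executemany(
        "INSERT OR REPLACE INTO stock_metrics (ticker, date, rsi, created_at) "
        "VALUES (?, ?, ?, ?)",
        rows,
    )
    conn.commit()

# Made-up sample rows: the same key is loaded twice
load(conn, [("AAPL", "2023-08-04", 27.2, "2023-08-05 08:02:15")])
load(conn, [("AAPL", "2023-08-04", 27.2, "2023-08-05 08:02:32")])

stored = conn.execute(
    "SELECT ticker, date, created_at FROM stock_metrics"
).fetchall()
print(stored)  # [('AAPL', '2023-08-04', '2023-08-05 08:02:32')]
```

The primary key plays the same role here that the window over `ticker`, `date`, and `dt` plays in the PySpark `load()` method: one row per key, newest write wins.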

In [4]:
from datetime import datetime, timedelta

from pyspark.sql.window import Window 
import pyspark.sql.functions as F
import pyspark.sql.types as T

import yfinance as yf
import pandas as pd


class SchemaLibrary():
    def __init__(self):
        self.financial_schema = T.StructType([
            T.StructField("ticker", T.StringType(), nullable=False),
            T.StructField("date", T.DateType(), nullable=False),
            T.StructField("macd", T.DoubleType(), nullable=False),
            T.StructField("daily_rtn", T.DoubleType(), nullable=False),
            T.StructField("mavg_20_days", T.DoubleType(), nullable=False),
            T.StructField("vol_chg", T.DoubleType(), nullable=False),
            T.StructField("vol_shock", T.DoubleType(), nullable=False),
            T.StructField("sma", T.DoubleType(), nullable=False),
            T.StructField("rsi", T.DoubleType(), nullable=False),
            T.StructField("created_at", T.StringType(), nullable=False),
            T.StructField("dt", T.StringType(), nullable=False)
        ])
        
        self.log_schema = T.StructType([
            T.StructField("ticker", T.StringType(), nullable=False),
            T.StructField("dt", T.StringType(), nullable=True),
        ])


class FinanceUtility():
    def __init__(self):
        # Order by the lowercase 'date' column that transform() produces
        self.window_constraint = Window.partitionBy(F.col('ticker')).orderBy(F.asc(F.col('date')))
    
    # Calculate EMA 
    def calculate_ema(
        self,
        prices, 
        days, 
        smoothing=2.0
    ):
        # Set the first EMA value equal to the first close price
        ema = [prices[0]]  
        for price in prices[1:]:
            ema.append((price * (smoothing / (1 + days))) + ema[-1] * (1 - (smoothing / (1 + days))))
        return ema
    
    # Calculate MACD =12-Period EMA − 26-Period EMA
    def calculate_macd(
        self,
        prices, 
        short_period=12, 
        long_period=26, 
        smoothing=2.0
    ):
        ema_short_period = self.calculate_ema(prices, short_period, smoothing)
        ema_long_period = self.calculate_ema(prices, long_period, smoothing)
        return [x-y for x,y in zip(ema_short_period, ema_long_period)]
    
    def calculate_daily_return(
        self,
        sdf
    ):
        data_sdf = sdf.withColumn('prev_close', F.lag(F.col('close'),1).over(self.window_constraint))
        data_sdf = data_sdf.withColumn('daily_rtn', (F.col('close') - F.col('prev_close')) / F.col('prev_close'))  

        data_sdf = data_sdf.drop(*['prev_close'])
        return data_sdf

    def calculate_moving_average(
        self,
        sdf, 
        window=20
    ):
        moving_avg_name = f"mavg_{window}_days"
        data_sdf = sdf.withColumn(moving_avg_name, F.avg(F.col('close')).over(self.window_constraint.rowsBetween(-(window-1),0)))
        return data_sdf

    def calculate_volume_change(
        self,
        sdf
    ):
        data_sdf = sdf.withColumn('prev_volume', F.lag(F.col('volume'),1).over(self.window_constraint))
        data_sdf = data_sdf.withColumn('vol_chg', (F.col('volume') - F.col('prev_volume')) / F.col('prev_volume'))

        data_sdf = data_sdf.drop(*['prev_volume'])
        return data_sdf

    def calculate_volume_shock(
        self,
        sdf, 
        volume_threshold=10
    ):
        data_sdf = sdf.withColumn('vol_shock', F.when(F.col('vol_chg') > volume_threshold, 1).otherwise(0))
        return data_sdf

    def calculate_sma(
        self,
        sdf, 
        window=20
    ):
        window_constraint = self.window_constraint.rowsBetween(-window+1,0)
        return sdf.withColumn('sma', F.avg(F.col('close')).over(window_constraint))

    def calculate_rsi(
        self,
        sdf,
        window =14
    ):
        # Daily_Price_Change = Close(t) - Close(t-1)
        data_sdf = sdf.withColumn('daily_price_change', F.col('close') - F.lag(F.col('close'),1).over(self.window_constraint))

        # Gain(t) = max(0, Daily_Price_Change), Loss(t) = max(0, -Daily_Price_Change)
        data_sdf = data_sdf.withColumn(
            'gain', 
            F.when(F.col('daily_price_change') > 0, F.col('daily_price_change')).otherwise(0)
        )
        data_sdf = data_sdf.withColumn(
            "loss", 
            F.when(F.col("daily_price_change") < 0, F.abs(F.col("daily_price_change"))).otherwise(0)
        )

        # AG = (sum of Gain(t) for the last 'n' days) / 'n', AL = (sum of Loss(t) for the last 'n' days) / 'n'
        window_rsi_constraint = self.window_constraint.rowsBetween(-window+1,0)
        data_sdf = data_sdf.withColumn('avg_gain', F.avg("gain").over(window_rsi_constraint))
        data_sdf = data_sdf.withColumn('avg_loss', F.avg("loss").over(window_rsi_constraint))

        # RS = AG / AL
        data_sdf = data_sdf.withColumn('rs', F.col('avg_gain') / F.col('avg_loss')).fillna(0.0)

        # RSI = 100 - (100 / (1 + RS))
        data_sdf =  data_sdf.withColumn('rsi', 100 - (100 / (1 + F.col('rs'))))

        data_sdf = data_sdf.drop(*['daily_price_change','gain','loss','avg_gain','avg_loss','rs'])

        return data_sdf


class FinanceETL():
    def __init__(
        self, 
        ticker, 
        start_date, 
        end_date
    ):
        self.ticker = ticker
        self.start_date = start_date
        self.end_date = end_date
        
        self.utility = FinanceUtility()
        self.schema = SchemaLibrary()
        
    
    def extract(
        self
    ):
        '''
        Extract daily stock data from start to end date into pandas dataframe
        Data is downloaded from Yahoo Finance
        Return df and flag (1: empty dataframe, 0: otherwise)
        '''
        df = yf.download(
            self.ticker,
            start = self.start_date,
            end = self.end_date,
            interval = '1d',
            threads = True
        ).reset_index()
        
        return df, 1 if len(df) == 0 else 0
    
    def transform(
        self, 
        df
    ):
        """
        Transformation using Pandas
        1. Add Stock ticker (ex. AAPL, MSFT, etc.)
        2. Calculate MACD based on Close Price for each entry
        
        Transformation using PySpark
        1. Calculate Daily Return
        2. Calculate 20 Days Moving Average
        3. Calculate Volume Change
        4. Calculate Volume Shock based on Volume Change
        5. Calculate Simple Moving Average (SMA)
        6. Calculate RSI
        7. Add created_at and dt for partition purposes
        """
        # Pandas Dataframe Transformation
        df['ticker'] = self.ticker
        df['macd'] = self.utility.calculate_macd(df['Close'])
        
        # Spark Dataframe Transformation
        sdf = spark.createDataFrame(df).select(['ticker','Date','Close','Volume','macd'])
        sdf = sdf.withColumnRenamed('Close','close').withColumnRenamed('Volume','volume')
        sdf = sdf.withColumnRenamed('Date','date').withColumn('date', F.to_date(F.col('date')))
        
        sdf = self.utility.calculate_daily_return(sdf)
        sdf = self.utility.calculate_moving_average(sdf)
        sdf = self.utility.calculate_volume_change(sdf)
        sdf = self.utility.calculate_volume_shock(sdf)
        sdf = self.utility.calculate_sma(sdf)
        sdf = self.utility.calculate_rsi(sdf)
        
        sdf = sdf.drop(*['close','volume'])
        
        # Name the column 'created_at' so it matches financial_schema and the ordering used in load()
        sdf = sdf.withColumn('created_at', F.lit(datetime.now()))
        sdf = sdf.withColumn('dt', F.lit(datetime.now().strftime("%Y-%m-%d")))
        
        return sdf
    
    def load(
        self,
        data_sdf,
        sdf
    ):
        """
        Update Stock information
        Add Partition to make sure that we update based on the most recent data (latest)
        """
        # Add another condition to check whether the ticker is already inside the pyspark dataframe
        # If it already is, then we have to use the Window function; otherwise, we can union them
        # However, if this process still takes too much memory or time, then I have to revise the loading OR the overall code structure
        window_latest_info = Window.partitionBy(F.col('ticker'), F.col('date'), F.col('dt')).orderBy(F.desc(F.col('created_at')))
        updated_sdf = (
            data_sdf.unionAll(sdf)
            .withColumn('latest_ranking', F.row_number().over(window_latest_info))
            .filter(F.col('latest_ranking') == 1)
        ).drop('latest_ranking')
        
        return updated_sdf
    
    def etl_log_load(
        self,
        log_sdf,
        ticker,
        dt
    ):
        """
        Create a dataframe that logs every ETL execution, both successful and unsuccessful
        """
        sdf = spark.createDataFrame([(ticker,dt)], self.schema.log_schema)
        return log_sdf.union(sdf)
    
    @time_func
    def etl_execution(
        self,
        data_sdf,
        log_sdf
    ):
        """
        Execute the Extract, Transform, and Load steps
        """
        print(f'{COLOR_BLUE}Process: etl_execution has started at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} for {self.ticker}{COLOR_RESET}')
        df, is_empty = self.extract()
        
        if is_empty == 1:
            error_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f'{COLOR_RED}ERROR: etl process for {self.ticker} has failed at {error_datetime}{COLOR_RESET}')
            return data_sdf, self.etl_log_load(log_sdf, self.ticker, error_datetime)
        else:
            sdf = self.transform(df)
            print(f'{COLOR_GREEN}COMPLETE: etl process for {self.ticker}{COLOR_RESET}') 
            return self.load(data_sdf, sdf), self.etl_log_load(log_sdf, self.ticker, None)
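
To sanity-check the indicator formulas in `FinanceUtility`, here is a minimal pure-Python sketch of the EMA, MACD, and RSI arithmetic on tiny made-up price lists. The helper names (`ema`, `macd`, `rsi`) and the prices are hypothetical. The RSI treats a window with no losses as RS = 0, which is my reading of what the `fillna(0.0)` call ends up doing; that is an assumption about this notebook's behavior, not the conventional RSI definition.

```python
def ema(prices, days, smoothing=2.0):
    # EMA(t) = price(t) * alpha + EMA(t-1) * (1 - alpha), alpha = smoothing / (1 + days)
    alpha = smoothing / (1 + days)
    values = [prices[0]]  # seed with the first price
    for price in prices[1:]:
        values.append(price * alpha + values[-1] * (1 - alpha))
    return values

def macd(prices, short_period=12, long_period=26):
    # MACD = short-period EMA - long-period EMA, element by element
    short = ema(prices, short_period)
    long_ = ema(prices, long_period)
    return [s - l for s, l in zip(short, long_)]

def rsi(prices, window=14):
    # Day-over-day changes; the first day has no previous close, so use 0
    changes = [0.0] + [curr - prev for prev, curr in zip(prices, prices[1:])]
    gains = [max(c, 0.0) for c in changes]
    losses = [max(-c, 0.0) for c in changes]
    values = []
    for i in range(len(prices)):
        lo = max(0, i - window + 1)  # trailing window, shorter at the start
        n = i + 1 - lo
        avg_gain = sum(gains[lo:i + 1]) / n
        avg_loss = sum(losses[lo:i + 1]) / n
        # Assumption: no losses in the window -> RS treated as 0
        rs = avg_gain / avg_loss if avg_loss > 0 else 0.0
        values.append(100 - 100 / (1 + rs))
    return values

print(ema([10.0, 20.0, 30.0], days=3))  # [10.0, 15.0, 22.5]
print(macd([5.0, 6.0, 7.0, 8.0]))
print(rsi([1.0, 2.0, 3.0, 2.0]))
```

With `days=3` the smoothing factor is 2 / 4 = 0.5, so each EMA value is simply the midpoint between the new price and the previous EMA, which makes the first line easy to verify by hand.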

### Example ETL Execution

Now we will test the code. I have selected two stocks (AAPL and TSLA) plus one imaginary stock called DUMMY. "DUMMY" serves as a test case for a ticker that doesn't exist. The process must be able to capture this error, store the information, and report the error.
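
The control flow being tested can be sketched without Spark or a network call. In this simplified version, `run_etl`, `FAKE_SOURCE`, and the prices are made up for illustration; the real pipeline downloads from Yahoo Finance and logs into a PySpark dataframe instead of a list.

```python
from datetime import datetime

# Made-up stand-in for the data source; DUMMY is intentionally missing
FAKE_SOURCE = {"AAPL": [191.0, 182.0], "TSLA": [259.3, 253.9]}

def run_etl(ticker, fetch, log):
    rows = fetch(ticker)
    if not rows:
        # Failure: record the ticker together with the time of the failure
        failed_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log.append((ticker, failed_at))
        return []
    # Success: record the ticker with no failure timestamp
    log.append((ticker, None))
    return rows

log = []
data = {}
for ticker in ["AAPL", "TSLA", "DUMMY"]:
    rows = run_etl(ticker, lambda t: FAKE_SOURCE.get(t, []), log)
    if rows:
        data[ticker] = rows

failed = [ticker for ticker, failed_at in log if failed_at is not None]
print(sorted(data))  # ['AAPL', 'TSLA']
print(failed)        # ['DUMMY']
```

A non-null timestamp in the log marks a failed run, which is why the display cell filters the log on `dt` not being null.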

In [5]:
# Setup Schema and Empty Spark Dataframe for storing information
schema = SchemaLibrary()
financial_sdf = spark.createDataFrame([], schema.financial_schema)
log_sdf = spark.createDataFrame([], schema.log_schema)

# Setup start and end date for retrieving the information
current_time = datetime.now()
start_date = current_time - timedelta(days=200)  # Prior 200 days
end_date = current_time.strftime('%Y-%m-%d')

# Example Execution of ETL
for ticker in ['AAPL','TSLA','DUMMY']:
    etl = FinanceETL(ticker=ticker, start_date=start_date, end_date=end_date)
    financial_sdf, log_sdf = etl.etl_execution(financial_sdf, log_sdf)

# Display the result
financial_sdf.groupBy(F.col('ticker')).count().show()
log_sdf.filter(F.col('dt').isNotNull()).show()

Process: etl_execution has started at 2023-08-05 08:02:13 for AAPL
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for AAPL
Function etl_execution elapsed time: 2199.700ms

Process: etl_execution has started at 2023-08-05 08:02:16 for TSLA
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for TSLA
Function etl_execution elapsed time: 1086.315ms

Process: etl_execution has started at 2023-08-05 08:02:17 for DUMMY
[*********************100%***********************]  1 of 1 completed
ERROR: etl process for DUMMY has failed at 2023-08-05 08:02:17
Function etl_execution elapsed time: 260.550ms

+------+-----+
|ticker|count|
+------+-----+
|  AAPL|  139|
|  TSLA|  139|
+------+-----+





+------+-------------------+
|ticker|                 dt|
+------+-------------------+
| DUMMY|2023-08-05 08:02:17|
+------+-------------------+



                                                                                

In [6]:
# For display purposes
(
    financial_sdf
    .withColumn('disp_rank', F.row_number().over(Window.partitionBy(F.col('ticker')).orderBy(F.desc(F.col('date')))))
    .filter(F.col('disp_rank').isin([1]))
    .drop(*['disp_rank'])
).toPandas().head(n=3)

                                                                                

| | ticker | date | macd | daily_rtn | mavg_20_days | vol_chg | vol_shock | sma | rsi | created_at | dt |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AAPL | 2023-08-04 | 0.888196 | -0.04802 | 192.165001 | 0.891064 | 0.0 | 192.165001 | 27.203654 | 2023-08-05 08:02:15.567891 | 2023-08-05 |
| 1 | TSLA | 2023-08-04 | 0.475714 | -0.021055 | 269.26 | 0.017152 | 0.0 | 269.26 | 30.652676 | 2023-08-05 08:02:16.922907 | 2023-08-05 |


#### Testing another execution
Given the result from the previous ETL pipeline, I would like to see whether the same procedure can add a new stock or replace an existing stock with more recent data. Hence, I have selected two test stocks: AAPL (already loaded) and MSFT (new).
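
The "newest version wins" rule that `load()` applies with a window function can be sketched in plain Python. `keep_latest` and the sample rows are hypothetical; the timestamps are ISO-style strings, so comparing them as strings orders them chronologically.

```python
def keep_latest(existing, incoming):
    # One row per (ticker, date, dt); when keys collide, keep the larger created_at
    latest = {}
    for row in existing + incoming:
        key = (row["ticker"], row["date"], row["dt"])
        if key not in latest or row["created_at"] > latest[key]["created_at"]:
            latest[key] = row
    return list(latest.values())

# Made-up rows: AAPL is loaded twice, MSFT is new
existing = [
    {"ticker": "AAPL", "date": "2023-08-04", "dt": "2023-08-05", "created_at": "2023-08-05 08:02:15"},
    {"ticker": "TSLA", "date": "2023-08-04", "dt": "2023-08-05", "created_at": "2023-08-05 08:02:16"},
]
incoming = [
    {"ticker": "AAPL", "date": "2023-08-04", "dt": "2023-08-05", "created_at": "2023-08-05 08:02:32"},
    {"ticker": "MSFT", "date": "2023-08-04", "dt": "2023-08-05", "created_at": "2023-08-05 08:02:31"},
]

merged = keep_latest(existing, incoming)
by_ticker = {row["ticker"]: row["created_at"] for row in merged}
print(by_ticker)
```

If the procedure works as intended, the row count per ticker stays the same after a re-run and only `created_at` moves forward for the re-loaded ticker.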

In [7]:
# Repeat the ETL process specifically for MSFT and AAPL (again)
msft_etl = FinanceETL(ticker='MSFT', start_date=start_date, end_date=end_date)
financial_sdf, log_sdf = msft_etl.etl_execution(financial_sdf, log_sdf)

aapl_etl = FinanceETL(ticker='AAPL', start_date=start_date, end_date=end_date)
financial_sdf, log_sdf = aapl_etl.etl_execution(financial_sdf, log_sdf)

# Display the result
financial_sdf.groupBy(F.col('ticker')).count().show()
log_sdf.filter(F.col('dt').isNotNull()).show()

Process: etl_execution has started at 2023-08-05 08:02:31 for MSFT
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for MSFT
Function etl_execution elapsed time: 927.214ms

Process: etl_execution has started at 2023-08-05 08:02:32 for AAPL
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for AAPL
Function etl_execution elapsed time: 852.233ms

+------+-----+
|ticker|count|
+------+-----+
|  AAPL|  139|
|  TSLA|  139|
|  MSFT|  139|
+------+-----+





+------+-------------------+
|ticker|                 dt|
+------+-------------------+
| DUMMY|2023-08-05 08:02:17|
+------+-------------------+



                                                                                

In [8]:
# For display purposes for testing another execution
(
    financial_sdf
    .withColumn('disp_rank', F.row_number().over(Window.partitionBy(F.col('ticker')).orderBy(F.desc(F.col('date')))))
    .filter(F.col('disp_rank').isin([1]))
    .drop(*['disp_rank'])
).toPandas().head(n=3)

                                                                                

| | ticker | date | macd | daily_rtn | mavg_20_days | vol_chg | vol_shock | sma | rsi | created_at | dt |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AAPL | 2023-08-04 | 0.888196 | -0.04802 | 192.165001 | 0.891064 | 0.0 | 192.165001 | 27.203654 | 2023-08-05 08:02:32.623885 | 2023-08-05 |
| 1 | MSFT | 2023-08-04 | -1.654641 | 0.003429 | 339.874498 | 0.299884 | 0.0 | 339.874498 | 38.533268 | 2023-08-05 08:02:31.822584 | 2023-08-05 |
| 2 | TSLA | 2023-08-04 | 0.475714 | -0.021055 | 269.26 | 0.017152 | 0.0 | 269.26 | 30.652676 | 2023-08-05 08:02:16.922907 | 2023-08-05 |


### ETL Repair Kit

In [9]:
# Create ETL repair
class FinanceETLRepair():
    def __init__(
        self, 
        data, 
        log
    ):
        self.data = data
        self.log = log
        
    def extract_error_log(self):
        """
        Return PySpark dataframe of error ticker and download time
        """
        return self.log.filter(F.col('dt').isNotNull())
    
    def extract_ticker_list(self):
        """
        Return List of Distinct Stock ticker that failed to download from previous ETL process
        """
        log_sdf = self.extract_error_log()
        return log_sdf.select(['ticker']).distinct().toPandas()['ticker'].tolist()
    
    def single_repair_derror(
        self, 
        ticker,
        start_date, 
        end_date
    ):
        """
        Insert new data for single stock ticker
        """
        etl_finance = FinanceETL(ticker, start_date=start_date, end_date=end_date)
        self.data, self.log = etl_finance.etl_execution(self.data, self.log)
        return self.data, self.log
        
    def multiple_repair_derror(
        self, 
        ticker_list, 
        start_date, 
        end_date
    ):
        """
        Insert new data for multiple stock tickers (input in the format of a list)
        """
        for ticker in ticker_list:
            etl_finance = FinanceETL(ticker, start_date=start_date, end_date=end_date)
            self.data, self.log = etl_finance.etl_execution(self.data, self.log)
        return self.data, self.log

#### Disclaimer: testing the ETL repair kit

For testing purposes, I will replace the tickers in error_ticker_list with completely new ones: ```['CRM','UBER','IBM']```. I will also demonstrate that the ```single_repair_derror()``` function overwrites existing data when a single ticker needs its ETL execution run again, using MSFT as the test stock.
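
As a plain-Python sketch of the repair idea: collect the distinct tickers that have a failure timestamp in the log, then re-run the pipeline for each of them. `failed_tickers`, `repair`, and the `run_one` callback are hypothetical names, and the log entries are made up.

```python
def failed_tickers(log):
    # Distinct tickers with a non-null failure timestamp, in first-seen order
    seen = []
    for ticker, failed_at in log:
        if failed_at is not None and ticker not in seen:
            seen.append(ticker)
    return seen

def repair(tickers, run_one):
    # Re-run the pipeline once per ticker and collect the results
    return {ticker: run_one(ticker) for ticker in tickers}

# Made-up log: DUMMY failed twice, AAPL succeeded
log = [
    ("AAPL", None),
    ("DUMMY", "2023-08-05 08:02:17"),
    ("DUMMY", "2023-08-05 09:00:00"),
]

to_retry = failed_tickers(log)
results = repair(to_retry, lambda ticker: f"re-ran {ticker}")
print(to_retry)  # ['DUMMY']
print(results)   # {'DUMMY': 're-ran DUMMY'}
```

Deduplicating the ticker list first avoids re-downloading the same ticker several times when it failed on more than one run.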


In [10]:
# Execute the Repair process
etl_repair = FinanceETLRepair(financial_sdf, log_sdf)
error_log = etl_repair.extract_error_log()
error_ticker_list = etl_repair.extract_ticker_list() # ["DUMMY"]
error_ticker_list = ['CRM','UBER','IBM']

financial_sdf, log_sdf = etl_repair.single_repair_derror('MSFT', start_date, end_date)
financial_sdf, log_sdf= etl_repair.multiple_repair_derror(error_ticker_list, start_date, end_date)

financial_sdf.groupBy(F.col('ticker')).count().show()

Process: etl_execution has started at 2023-08-05 08:02:47 for MSFT
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for MSFT
Function etl_execution elapsed time: 747.654ms

Process: etl_execution has started at 2023-08-05 08:02:48 for CRM
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for CRM
Function etl_execution elapsed time: 1068.787ms

Process: etl_execution has started at 2023-08-05 08:02:49 for UBER
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for UBER
Function etl_execution elapsed time: 973.877ms

Process: etl_execution has started at 2023-08-05 08:02:50 for IBM
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for IBM
Function etl_execution elaps



+------+-----+
|ticker|count|
+------+-----+
|  AAPL|  139|
|  TSLA|  139|
|   IBM|  139|
|  UBER|  139|
|  MSFT|  139|
|   CRM|  139|
+------+-----+



                                                                                

In [11]:
# For display purposes for testing ETL repair kit
(
    financial_sdf
    .withColumn('disp_rank', F.row_number().over(Window.partitionBy(F.col('ticker')).orderBy(F.desc(F.col('date')))))
    .filter(F.col('disp_rank').isin([1]))
    .drop(*['disp_rank'])
).toPandas().head(n=6)

                                                                                

| | ticker | date | macd | daily_rtn | mavg_20_days | vol_chg | vol_shock | sma | rsi | created_at | dt |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AAPL | 2023-08-04 | 0.888196 | -0.04802 | 192.165001 | 0.891064 | 0.0 | 192.165001 | 27.203654 | 2023-08-05 08:02:32.623885 | 2023-08-05 |
| 1 | CRM | 2023-08-04 | 0.879956 | -0.0045 | 224.756499 | -0.129714 | 0.0 | 224.756499 | 26.498415 | 2023-08-05 08:02:48.857252 | 2023-08-05 |
| 2 | IBM | 2023-08-04 | 3.006501 | -0.001454 | 138.881499 | 0.068625 | 0.0 | 138.881499 | 91.254246 | 2023-08-05 08:02:50.759943 | 2023-08-05 |
| 3 | MSFT | 2023-08-04 | -1.654641 | 0.003429 | 339.874498 | 0.299884 | 0.0 | 339.874498 | 38.533268 | 2023-08-05 08:02:47.842017 | 2023-08-05 |
| 4 | TSLA | 2023-08-04 | 0.475714 | -0.021055 | 269.26 | 0.017152 | 0.0 | 269.26 | 30.652676 | 2023-08-05 08:02:16.922907 | 2023-08-05 |
| 5 | UBER | 2023-08-04 | 0.974281 | -0.015465 | 46.331 | 0.015478 | 0.0 | 46.331 | 48.730559 | 2023-08-05 08:02:49.781731 | 2023-08-05 |


## Running the ETL Pipeline on a larger set of stocks
There is a lot of data to store in the dataframe, so I split the list of stocks into smaller partitions and create a separate dataframe for each partition.
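
The split itself is simple arithmetic: divide the list length by the number of partitions and hand the remainder out one element at a time to the first partitions. A small sketch with the partition count as a parameter (`split_evenly` and `k` are hypothetical names, and 503 is just an example length):

```python
def split_evenly(items, k):
    # size = base length of each part, remainder = parts that get one extra item
    size, remainder = divmod(len(items), k)
    parts, start = [], 0
    for i in range(k):
        end = start + size + (1 if i < remainder else 0)
        parts.append(items[start:end])
        start = end
    return parts

parts = split_evenly(list(range(503)), 5)
sizes = [len(part) for part in parts]
print(sizes)  # [101, 101, 101, 100, 100]
```

Partition sizes never differ by more than one, and concatenating the parts gives back the original list in order.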

In [12]:
from yahoo_fin.stock_info import tickers_nasdaq, tickers_other
import random

class Stocks():
    def extract_nasdaq_tickers(self):
        return tickers_nasdaq()
    
    def extract_nasdaq100_df(self):
        df = pd.read_html('https://en.wikipedia.org/wiki/Nasdaq-100')[4]
        return df, df['Ticker'].tolist()
    
    def extract_sp500_df(self):
        df = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]
        return df, df['Symbol'].to_list()
    
    def extract_other_tickers(self):
        return tickers_other()
    
    def extract_all_tickers(self):
        return self.extract_nasdaq_tickers() + self.extract_other_tickers()

def split_into_partitions(arr):
    n = len(arr)
    partition_size = n // 5  # Size of each partition (5 partitions)
    remainder = n % 5  # Remainder elements after forming 5 partitions

    partitions = []
    start = 0
    for i in range(5):
        end = start + partition_size
        if i < remainder:
            end += 1
        partitions.append(arr[start:end])
        start = end

    return partitions

# Example usage:
sp500_df, sp500_stock_tickers = Stocks().extract_sp500_df()
splitted_result = split_into_partitions(sp500_stock_tickers)

def database_params_setup():
    schema = SchemaLibrary()
    sdf = spark.createDataFrame([], schema.financial_schema)
    log_sdf = spark.createDataFrame([], schema.log_schema)
    
    return sdf, log_sdf

# Create Database with dictionary as the structure
db = {}

# Setup start and end date for retrieving the information
current_time = datetime.now()
start_date = current_time - timedelta(days = 200) # Prior two hundred days
end_date = current_time.strftime('%Y-%m-%d')


# Execute the ETL for each stock for each partition
for index,partition in enumerate(splitted_result):
    print(f"PARTITION {index+1} BEGINS: ------>")
    sdf, log_sdf = database_params_setup()
    for ticker in random.sample(partition,10):
        etl = FinanceETL(ticker=ticker, start_date=start_date, end_date=end_date)
        sdf, log_sdf = etl.etl_execution(sdf, log_sdf)
    db[f'stock_{index+1}'] = {}
    db[f'stock_{index+1}']['sdf'] = sdf
    db[f'stock_{index+1}']['log_sdf'] = log_sdf

PARTITION 1 BEGINS: ------>
Process: etl_execution has started at 2023-08-05 08:03:05 for ARE
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for ARE
Function etl_execution elapsed time: 791.528ms

Process: etl_execution has started at 2023-08-05 08:03:06 for AEP
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for AEP
Function etl_execution elapsed time: 871.886ms

Process: etl_execution has started at 2023-08-05 08:03:07 for AME
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for AME
Function etl_execution elapsed time: 824.006ms

Process: etl_execution has started at 2023-08-05 08:03:08 for MO
[*********************100%***********************]  1 of 1 completed
COMPLETE: etl process for MO
Function et

In [13]:
schema = SchemaLibrary()
stock_sdf = spark.createDataFrame([], schema.financial_schema)

for key, value in db.items():
    stock_sdf = stock_sdf.unionAll(db[key]['sdf'].filter(F.col('date') == (current_time - timedelta(days=1)).strftime('%Y-%m-%d')))

stock_sdf.toPandas().head()

23/08/05 08:04:28 WARN DAGScheduler: Broadcasting large task binary with size 1604.4 KiB
                                                                                

| | ticker | date | macd | daily_rtn | mavg_20_days | vol_chg | vol_shock | sma | rsi | created_at | dt |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ADM | 2023-08-04 | 2.420522 | 0.006826 | 82.734 | 0.610674 | 0.0 | 82.734 | 74.094938 | 2023-08-05 08:03:10.462351 | 2023-08-05 |
| 1 | AEP | 2023-08-04 | -0.610782 | -0.009147 | 85.614999 | -0.215618 | 0.0 | 85.614999 | 32.565772 | 2023-08-05 08:03:07.372058 | 2023-08-05 |
| 2 | AES | 2023-08-04 | -0.141604 | -0.04891 | 21.4715 | 0.276678 | 0.0 | 21.4715 | 34.895846 | 2023-08-05 08:03:09.730769 | 2023-08-05 |
| 3 | AIG | 2023-08-04 | 1.018229 | -0.00736 | 59.7675 | -0.161527 | 0.0 | 59.7675 | 63.4064 | 2023-08-05 08:03:13.581748 | 2023-08-05 |
| 4 | ALL | 2023-08-04 | 0.457975 | -0.01338 | 108.904 | -0.443291 | 0.0 | 108.904 | 62.400267 | 2023-08-05 08:03:11.177286 | 2023-08-05 |
