# Bronze Layer
In this notebook, we create the Bronze Layer of our data pipeline. The data has already been fetched from the APIs, so here we perform the initial cleanup:
* Insert the *ticker* column for each of the datasets
* Transform the date column from string to datetime
* Drop any data before 2019-01-01, as we want to focus on the last 5 years of data.
* Union all the data together in a single table


In [0]:
# Importing needed Pyspark functions
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, sum, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType

In [0]:
def transform_data(ticker, start_date='2019-01-01'):
    '''
    Load one raw dataset and apply the bronze-layer cleanup.

    Steps:
    (1) load the parquet dataset stored under /FileStore/tables/raw/<ticker>;
    (2) add a constant `ticker` column;
    (3) convert the `date` column to a date with F.to_date;
    (4) drop every row dated before `start_date`.

    Inputs:
    * ticker: str = code added to all observations; it is also the name of
      the raw folder that gets read
    * start_date: str = earliest date to keep (inclusive). Defaults to
      '2019-01-01'. Note that add_RSI applies its own cutoff, so pass the
      same value there if you change it here.

    Returns:
    * Spark DataFrame with columns (ticker, value, date) for the economic
      indicators INFLATION / REAL_GDP_PER_CAPITA / CPI, and
      (ticker, open, high, low, close, volume, date) for everything else.

    Assumes a `spark` session is already in scope; it is not created in the
    visible cells of this notebook.
    '''

    # File path to be loaded
    path = f'/FileStore/tables/raw/{ticker}'

    # (1) + (2) Every dataset is read the same way and tagged with its ticker
    df = spark.read.parquet(path).withColumn('ticker', F.lit(ticker))

    # The ETF "DJUSTL" was pulled from a different API, which names the date
    # column `datetime`. Align it with the other datasets.
    if ticker == "DJUSTL":
        df = df.withColumnRenamed('datetime', 'date')

    # Economic indices carry a single `value` column instead of OHLCV prices
    if ticker in ('INFLATION', 'REAL_GDP_PER_CAPITA', 'CPI'):
        value_columns = ['value']
    else:
        value_columns = ['open', 'high', 'low', 'close', 'volume']

    # (3) + (4) Cast the date and drop data before the cutoff.
    # Column order is kept as: ticker, value column(s), date.
    return (
        df
        .select('ticker', *value_columns, F.to_date('date').alias('date'))
        .filter(col('date') >= start_date)
    )

In [0]:
# Function to add RSI to the stock dataset
def add_RSI(df, ticker, start_date='2019-01-01'):
    '''Function to add an RSI column to a stock dataset.

    Loads the raw RSI series stored under /FileStore/tables/raw/RSI/<ticker>,
    converts its `date` column to a date, renames `value` to `RSI`, keeps the
    rows dated on or after `start_date`, and joins the result to `df` on
    `date`.

    inputs:
    df: Spark DataFrame = stock data containing a `date` column
        (presumably the output of transform_data - confirm against callers)
    ticker: str = stock code; selects which RSI folder is read
    start_date: str = earliest RSI date to keep (inclusive),
        defaults to '2019-01-01'

    returns:
    Spark DataFrame = `df` with an extra `RSI` column. The join is an inner
    join, so any date present on only one side is dropped from the result.

    Assumes a `spark` session is already in scope; it is not created in the
    visible cells of this notebook.
    '''

    # File path to be loaded
    path = f'/FileStore/tables/raw/RSI/{ticker}'
    rsi = spark.read.parquet(path)

    # Keep only the two columns needed for the join, with the date cast to a
    # date type and the cutoff applied before joining
    rsi = (
        rsi
        .select(
            F.to_date('date').alias('date'),
            col('value').alias('RSI'),
        )
        .filter(col('date') >= start_date)
    )

    # Add RSI to the dataset; joining on the column name keeps a single
    # `date` column in the output
    return df.join(rsi, on='date', how='inner')


In [0]:
# Stock tickers: each one goes through the price transform and the RSI join
names = ['CHTR' , 'CMCSA', 'T', 'TMUS', 'VZ']

# Build and persist the bronze dataset of every stock
for folder in names:
    path = f'/FileStore/tables/bronze/{folder}'
    stock = transform_data(ticker=folder)
    stock_bronze = add_RSI(stock, ticker=folder)
    (
        stock_bronze
        .coalesce(1)  # collapse to one partition before writing
        .write
        .format('parquet')
        .mode('overwrite')
        .save(path)
    )
    print(f'{folder} transformed')


# Economic indicators and the DJUSTL ETF: transformed only, no RSI join
indicators = ['INFLATION', 'REAL_GDP_PER_CAPITA', 'CPI', 'DJUSTL']

# Build and persist the bronze dataset of every indicator
for folder in indicators:
    path = f'/FileStore/tables/bronze/{folder}'
    df_ind = transform_data(ticker=folder)
    (
        df_ind
        .coalesce(1)  # collapse to one partition before writing
        .write
        .format('parquet')
        .mode('overwrite')
        .save(path)
    )
    print(f'{folder} transformed')

CHTR transformed
CMCSA transformed
T transformed
TMUS transformed
VZ transformed
INFLATION transformed
REAL_GDP_PER_CAPITA transformed
CPI transformed
DJUSTL transformed
