# **ETL NASDAQ and BTC daily data**

Library imports

In [0]:
import pandas as pd
import yfinance as fy
import logging
import sys
import json

Logging configuration

In [0]:
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Configure the root logger to emit INFO-and-above records to stdout so they
# appear in the cell output (basicConfig is a no-op if the root logger already
# has handlers).
logging.basicConfig(stream=sys.stdout, format=_LOG_FORMAT, level=logging.INFO)

# Named logger used by every cell of this ETL.
logger = logging.getLogger('financial_etl')

Initialize the dictionary that tracks the status of the ETL execution

In [0]:
# Run report for this execution. It is serialized with json.dumps and passed to
# dbutils.notebook.exit at the end of the notebook. Counters start at zero,
# lists start empty, and the status stays 'success' until a step marks it 'failed'.
etl_results = dict(
    status='success',
    total_companies=0,
    successful_downloads=0,
    failed_companies=[],
    errors=[],
    data_quality_issues=[],
    fatal_error='none',  # string sentinel (not None) expected in the JSON report
    successful_registration=[],
)

Load the list of companies to process from AWS S3

In [0]:
# Tickers to process. Stays empty if the configuration cannot be loaded, so the
# name is always defined for later cells.
bitcoin_related_companies = []
try:
    # Read the AWS credentials from the Databricks secret scope and hand them to
    # the S3A connector so Spark can read the bucket.
    access_key = dbutils.secrets.get(scope="aws-credentials", key="access-key")
    secret_key = dbutils.secrets.get(scope="aws-credentials", key="secret-key")

    spark.conf.set("fs.s3a.access.key", access_key)
    spark.conf.set("fs.s3a.secret.key", secret_key)

    # Configuration CSV listing the tickers to process in its `companys` column.
    companies_df = spark.read.option("header", "true").csv("s3a://data-lake-finan/config/lis-companys.csv")

    # Collect only the ticker column to the driver. Skip null/blank cells, and
    # drop duplicates while keeping first-seen order: a repeated ticker would be
    # downloaded twice and its columns would get pandas' _x/_y merge suffixes.
    raw_tickers = (row.companys for row in companies_df.select("companys").collect())
    bitcoin_related_companies = list(dict.fromkeys(
        ticker.strip() for ticker in raw_tickers if ticker and ticker.strip()
    ))

    etl_results['total_companies'] = len(bitcoin_related_companies)

    logger.info(f'We will process {len(bitcoin_related_companies)} companys')
except Exception as e:
    etl_results['status'] = 'failed'
    etl_results['fatal_error'] = str(e)
    # Log the fatal error like the other cells do, instead of failing silently.
    logger.error(f'Could not load the list of companies from S3: {e}')

### Functions that check data quality and transform the data

In [0]:
def has_wrong_columns(df : pd.DataFrame, required_columns : list) -> bool:
    """Return True if any of ``required_columns`` is missing from ``df``.

    Args:
        df: DataFrame whose column labels are checked.
        required_columns: Column labels that must all be present.

    Returns:
        True when at least one required column is absent. False otherwise,
        including when ``required_columns`` is empty.
    """
    # `any` stops at the first missing label, like the original early-exit loop;
    # membership on the columns Index is a hash lookup.
    return any(column not in df.columns for column in required_columns)

def check_has_NaN(df : pd.DataFrame):
    """Tell whether ``df`` holds at least one missing value anywhere.

    Missing means anything ``DataFrame.isna`` flags. Returns a NumPy boolean;
    an empty frame yields False.
    """
    # Flatten the boolean missing-value mask and reduce it in one step.
    missing_mask = df.isna().to_numpy()
    return missing_mask.any()
    
def modify_names(df : pd.DataFrame, company : str):
    """Suffix every column except ``Date`` with ``_<company>``.

    ``Date`` is left unchanged so it can serve as the shared join key when the
    per-company frames are merged side by side.

    Args:
        df: Per-company DataFrame. It is not modified.
        company: Ticker appended to every non-``Date`` column name.

    Returns:
        A new DataFrame with the renamed columns.
    """
    # Build the full mapping first and rename in a single pass. Renaming one
    # column at a time, in place, could rename a column twice when a new name
    # equals an existing label (e.g. 'A' -> 'A_x' while 'A_x' already exists).
    mapping = {col: f'{col}_{company}' for col in df.columns if col != 'Date'}
    return df.rename(columns=mapping)
    
def transformation_and_generation_new_variables(df_original : pd.DataFrame, company : str):
    """Derive range/variation metrics and downcast the price columns to float32.

    Works on a copy, so ``df_original`` is left untouched. Adds:
      * ``Range``         = High - Low
      * ``Range_pct``     = Range / Open * 100
      * ``Variation_pct`` = (Close - Open) / Open * 100

    Args:
        df_original: Frame with at least ``Open``, ``High``, ``Low`` and ``Close``.
        company: Ticker, used only in the log messages.

    Returns:
        The enriched copy.
    """
    enriched = df_original.copy()
    open_price = enriched['Open']
    # NOTE(review): an Open of 0 would make the *_pct columns inf/NaN.

    enriched['Range'] = enriched['High'] - enriched['Low']
    logger.info(f'The variable Range was generated for {company}')

    enriched['Range_pct'] = enriched['Range'] / open_price * 100
    logger.info(f'The variable Range_pct was generated for {company}')

    enriched['Variation_pct'] = (enriched['Close'] - open_price) / open_price * 100
    logger.info(f'The variable Variation_pct was generated for {company}')

    # Only these columns are downcast; any other column (e.g. Volume, Date)
    # keeps its dtype.
    float32_columns = ['Open', 'High', 'Low', 'Close', 'Range', 'Variation_pct', 'Range_pct']
    enriched[float32_columns] = enriched[float32_columns].astype('float32')
    logger.info(f'The data of {company} was optimized parsing to float32')
    return enriched

### Extraction of daily data for the configured NASDAQ companies

In [0]:
# Download window comes from the "period" notebook widget. A missing widget must
# still be recorded in etl_results; otherwise the notebook would crash before
# reporting its status.
try:
    period = dbutils.widgets.get("period")
except Exception as e:
    logger.error(f'Could not read the "period" widget: {e}')
    if etl_results['status'] != 'failed':
        etl_results['status'] = 'failed'
        etl_results['fatal_error'] = str(e)

df_economic_data = pd.DataFrame()
all_dataframes = []  # one cleaned, ticker-suffixed DataFrame per registered company
required_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']

if etl_results['status'] != 'failed':
    for company in bitcoin_related_companies:
        try:
            data = fy.download(company, period=period, interval='1d', auto_adjust=True)
            if data is None or data.empty:
                logger.warning(f'The company {company} failed during the download')
                etl_results['failed_companies'].append(company)
                etl_results['errors'].append(f'{company}: failed during the download')
                continue

            logger.info(f'The info of the {company} its successfully downloaded')
            etl_results['successful_downloads'] += 1

            # Depending on the yfinance version and options, single-ticker downloads
            # come back with (Price, Ticker) MultiIndex columns or with flat columns.
            # Drop the ticker level only when it exists; droplevel on flat columns raises.
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = data.columns.droplevel(1)
            data = data.reset_index()  # turn the date index into a regular 'Date' column

            if has_wrong_columns(data, required_columns):
                logger.warning(f'The data structure of the company {company} is wrong')
                etl_results['failed_companies'].append(company)
                etl_results['data_quality_issues'].append(f'The data of {company} has wrong structure')
                continue
            logger.info(f'{company} data structure its correct')

            # Validate only the columns that are kept; NaNs in columns that are
            # dropped anyway should not reject the company.
            clean_data = data[required_columns]
            if check_has_NaN(clean_data):
                logger.warning(f'The data of {company} has NULL values')
                etl_results['failed_companies'].append(company)
                etl_results['data_quality_issues'].append(f'The data of {company} has NULL values')
                continue
            logger.info(f'{company} has not any NULL object')

            clean_data = transformation_and_generation_new_variables(clean_data, company)
            # Suffix every non-Date column with the ticker so the frames can be merged on Date.
            clean_data = modify_names(clean_data, company)
            all_dataframes.append(clean_data)
            logger.info(f'The data of {company} its correctly cleaned, transformed and registered')
            etl_results['successful_registration'].append(company)
        except Exception as e:
            logger.error(f'Error with {company}: {e}')
            etl_results['failed_companies'].append(company)
            etl_results['errors'].append(f'{company}: {str(e)}')

    try:
        if not all_dataframes:
            raise ValueError('No data registered')
        # Outer-join every company frame on Date, starting from the first one.
        df_economic_data = all_dataframes[0]
        for company_df in all_dataframes[1:]:
            df_economic_data = df_economic_data.merge(company_df, on='Date', how='outer')
        logger.info(f'dataframe is correctyl unified, has {len(df_economic_data)} rows and {len(df_economic_data.columns)} columns')
    except Exception as e:
        etl_results['status'] = 'failed'
        etl_results['fatal_error'] = str(e)
        logger.error(f'A fatal error has ocurred during the data downloading {str(e)}')


In [0]:
# Preview the unified dataset in the notebook output using the notebook's
# `display` helper. This may be the empty DataFrame initialized in the
# extraction cell if no company data was registered.
display(df_economic_data)

### Load the data into the Data Lake

In [0]:
if etl_results['status'] != 'failed':
    try:
        spark_df = spark.createDataFrame(df_economic_data)
        # Append to the Delta table in S3. mergeSchema lets columns that are not
        # yet in the table (e.g. a newly configured ticker) extend its schema.
        # NOTE(review): append mode means re-running over an overlapping period
        # writes duplicate Date rows; consider a Delta MERGE keyed on Date.
        spark_df.write.format("delta").mode('append').option("mergeSchema", "true").save("s3a://data-lake-finan/data/")
        logger.info(f'{len(df_economic_data)} rows written to s3a://data-lake-finan/data/')
    except Exception as e:
        etl_results['status'] = 'failed'
        etl_results['fatal_error'] = str(e)
        # Log the failure like the extraction cell does, not only in etl_results.
        logger.error(f'A fatal error has occurred while writing to the Delta table: {e}')

# Return the run report as JSON so the orchestrator (Airflow) can capture it.
dbutils.notebook.exit(json.dumps(etl_results))