## Read Me
### This notebook reads files from a parameterized S3 location and list of file names, and saves them into the ingestion layer (1st-layer DBFS location/schema)

## Importing Required Packages

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
import logging
import time
import datetime

## Parameterize the storage location and file names (DRY principle) and define common variables

In [0]:
# dropdown() only registers the "cloud_location" widget (it returns None), so the
# selected bucket name is read back separately with widgets.get().
dbutils.widgets.dropdown("cloud_location", "merkle-de-interview-case-study", ["merkle-de-interview-case-study"])

storage_location_name = dbutils.widgets.get("cloud_location")

## Use the notebook's file name (last path segment) as the schema name for table namespaces
schema_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split("/")[-1]

## Metastore (catalog) name the schema and tables are created in
location_name = "hive_metastore"

## Base name for the pipeline logger configured further below
logger_name = "first_layer_pipeline_logger"


## Setting Up Loggers for Debugging

In [0]:
# Emit one record per severity on the root logger; records below the root
# logger's effective level are dropped, which the "not logged" texts refer to.
for emit, text in (
    (logging.debug, "Debug record and not logged"),
    (logging.info, "Info record and not logged"),
    (logging.warning, "Warning Logger"),
    (logging.error, "Error Issue Logger"),
    (logging.critical, "Critical Issue Logger"),
):
    emit(text)

ERROR:root:Error Issue Logger
CRITICAL:root:Critical Issue Logger


In [0]:
# Dedicated pipeline logger writing DEBUG-and-above records to a file on DBFS.
logger = logging.getLogger(f'{logger_name}_Data_Logger')

logger.setLevel(logging.DEBUG)

# %P (lowercase am/pm) is a glibc strftime extension and is not portable to every platform.
formatter = logging.Formatter(
    '%(asctime)s-%(name)s-%(levelname)s-%(funcName)s:%(message)s',
    datefmt='%d/%m/%Y %I:%M:%S%P',
)

# getLogger() returns the same logger object on every run of this cell, so handlers
# from a previous run are still attached. Remove and close them first; otherwise
# every record is written once per run and old file handles stay open.
for previous_handler in list(logger.handlers):
    if isinstance(previous_handler, logging.FileHandler):
        logger.removeHandler(previous_handler)
        previous_handler.close()

# mode='w' truncates the log file on every run of this cell.
logFileHandler = logging.FileHandler(f'/dbfs/FileStore/{logger_name}_logfile', mode='w')

logFileHandler.setFormatter(formatter)

logger.addHandler(logFileHandler)

## Defining the file names in the storage location and their matching DataFrame names

In [0]:
# With read access to the bucket, the file names could be discovered instead of hard-coded, e.g.
#   file_list = [f.name for f in dbutils.fs.ls(f"s3a://{storage_location_name}/de/")]
## Created file list as hard coded due to access unavailability to storage address
file_list = ["item.csv", "event.csv"]

# Derive one DataFrame/table name per file by dropping the extension: "item.csv" -> "item_df".
data_frame_names = [file_name.split(".")[0] + "_df" for file_name in file_list]

print(data_frame_names)

# Pair each file with its DataFrame name positionally. This leaves data_frame_names
# intact instead of consuming it with remove() while iterating over it.
df_file_dict = dict(zip(file_list, data_frame_names))

print(df_file_dict)

['item_df', 'event_df']
{'item.csv': 'item_df', 'event.csv': 'event_df'}


## Defining a function to read CSV files from the storage location

In [0]:
def read_csv_file(storage_location_name, dataframe_name, file_list=None):
    """
    Read a CSV file from ``s3a://<storage_location_name>/de/`` into a Spark DataFrame.

    Only entries of ``file_list`` ending in ``.csv`` (case-insensitive) are considered;
    other entries are skipped with a printed message. When several CSV files are
    given, the last one is read and returned.

    Args:
        storage_location_name (str): Bucket name; files are loaded from
            ``s3a://<storage_location_name>/de/<file_name>``.
        dataframe_name (str): Logical name of the DataFrame being built; used only in messages.
        file_list (list[str] | None): File names to consider. ``None`` means no files.

    Returns:
        pyspark.sql.DataFrame: The DataFrame read from the last CSV file in ``file_list``
        (header row used for column names, schema inferred, PERMISSIVE parse mode).

    Raises:
        ValueError: If ``file_list`` contains no CSV file.
    """
    csv_files = []
    for file_name in file_list or []:
        # Match on the extension, not a substring, so e.g. "csv_notes.txt" is not read as CSV.
        if file_name.lower().endswith(".csv"):
            csv_files.append(file_name)
        else:
            print(f"Skipping non-CSV file {file_name} in {storage_location_name} cloud storage location")

    if not csv_files:
        # Fail here with a clear message rather than handing the caller something
        # that is not a DataFrame and fails later at .select().
        raise ValueError(
            f"No CSV files exists in {storage_location_name} cloud storage location for {dataframe_name}"
        )

    # inferSchema makes Spark scan the file eagerly, so only the file that is
    # actually returned is read.
    return (
        spark.read.format("csv")
        .option("mode", "PERMISSIVE")
        .option("quote", '"')
        .option("escape", '"')
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .load(f"s3a://{storage_location_name}/de/{csv_files[-1]}")
    )

## Defining functions to create the schema and write DataFrames into managed tables

In [0]:
def create_schema(schema_name, location_name):
    """
    Create the schema ``<location_name>.<schema_name>`` unless it already exists.

    Args:
        schema_name (str): Name of the schema (database) to create.
        location_name (str): Catalog that should contain the schema.

    Returns:
        DataFrame: Whatever ``spark.sql`` returns for the
        ``CREATE SCHEMA IF NOT EXISTS`` statement.
    """
    qualified_schema = f"{location_name}.{schema_name}"
    statement = f"CREATE SCHEMA IF NOT EXISTS  {qualified_schema}"
    return spark.sql(statement)


def check_if_table_exists(schema_name, table_name, location_name="hive_metastore"):
    """
    Check whether the bronze managed table for ``table_name`` exists in the catalog.

    The name looked up is
    ``<location_name>.<schema_name>.<table_name>_bronze_layer_managed_table``.
    This is the same naming scheme ``write_to_managed_table`` writes to.

    Args:
        schema_name (str): Schema that should contain the table.
        table_name (str): Base table name; ``_bronze_layer_managed_table`` is appended.
        location_name (str, optional): Catalog to search, matching the ``location_name``
            that ``create_schema`` takes. Defaults to ``"hive_metastore"``.

    Returns:
        bool: True if the table exists, False otherwise.
    """
    return spark.catalog.tableExists(
        f"{location_name}.{schema_name}.{table_name}_bronze_layer_managed_table"
    )

def write_to_managed_table(df, table_name, schema_name, location_name, mode = "overwrite"):
    """
    Write a DataFrame to a Delta managed table in the first (bronze) layer.

    The target table is
    ``<location_name>.<schema_name>.<table_name>_bronze_layer_managed_table``.
    The schema is created first if it does not exist. The write always uses ``mode``:
    a missing table is created, and an existing one is overwritten or appended to
    according to ``mode``. Delta column mapping is requested by name.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to write to the table.
        table_name (str): Base name of the target table (suffix is appended).
        schema_name (str): The schema name of the target table.
        location_name (str): Catalog holding the schema and table.
        mode (str, optional): Spark save mode, e.g. "overwrite" (default) or "append".
    """
    full_table_name = f"{location_name}.{schema_name}.{table_name}_bronze_layer_managed_table"

    # Create the schema if it does not exist yet.
    create_schema(schema_name, location_name)

    # Check in the same catalog the schema was created in, not a hard-coded one.
    if spark.catalog.tableExists(full_table_name):
        print(f"Table exists on {full_table_name}")
        if mode == "overwrite":
            print(f"Overwriting all transactions on managed table {full_table_name}")
        elif mode == "append":
            print(f"Appending all transactions on managed table {full_table_name}")
        else:
            print(f"Writing with mode '{mode}' to managed table {full_table_name}")
    else:
        print(f"Writing to managed table {full_table_name}")

    (
        df.write.format("delta")
        .option("delta.columnMapping.mode", "name")
        .mode(mode)
        .saveAsTable(full_table_name)
    )

## Reading files from the S3 bucket and logging details

In [0]:
# Record the start of the S3 read in the pipeline log.
pipeline_label = "First_layer_pipeline"
message = f"{pipeline_label} data from S3 buckets read started"
logger.info(message)

INFO:first_layer_pipeline_logger_Data_Logger:First_layer_pipeline data from S3 buckets read started


In [0]:
def pipeline_first():
    """
    Ingest every file in ``df_file_dict`` into its first-layer (bronze) managed table.

    For each ``file name -> DataFrame name`` pair, the function:
    - reads the CSV from ``storage_location_name`` with ``read_csv_file``;
    - casts every column to string;
    - writes the result with ``write_to_managed_table`` into ``location_name.schema_name``;
    - logs the DataFrame's (rows, columns) shape.

    Returns:
        dict[str, pyspark.sql.DataFrame]: DataFrame name -> the string-cast DataFrame
        that was written, so the DataFrames stay reachable after the call.
    """
    # Plain locals instead of vars()[...]: writing into a function's locals() snapshot
    # only worked through a CPython implementation detail (and fails under PEP 667).
    bronze_dataframes = {}
    for file_name, df_name in df_file_dict.items():
        print(file_name, df_name)
        # Read the CSV file for this entry.
        raw_df = read_csv_file(storage_location_name, df_name, [file_name])
        # Cast all columns to string. Backticks allow names with spaces or dots, and any
        # embedded backtick is doubled so it stays escaped.
        # NOTE(review): read_csv_file enables inferSchema, so this cast yields the text of
        # the *inferred* value (e.g. "007" may become "7"); confirm that is intended here.
        string_df = raw_df.select(
            [col("`{}`".format(c.replace("`", "``"))).cast("string") for c in raw_df.columns]
        )
        # Write to the first layer on the configured schema and table.
        write_to_managed_table(string_df, df_name, schema_name, location_name)
        # Log the (rows, columns) shape of what was saved to the first layer.
        shape_df = (string_df.count(), len(string_df.columns))
        message = (
            f"First_layer_pipeline files read from s3 bucket completed. "
            f"Total {shape_df} rows & columns loaded into dataframe for {df_name} first layer schema write"
        )
        logger.info(message)
        bronze_dataframes[df_name] = string_df
    return bronze_dataframes

In [0]:
# Run the first-layer ingestion for every file -> DataFrame pair in df_file_dict.
pipeline_first()

item.csv item_df
Table exists on hive_metastore.first_layer_pipeline.item_df_bronze_layer_managed_table
Overwriting all transactions on managed table hive_metastore.first_layer_pipeline.item_df_bronze_layer_managed_table


INFO:first_layer_pipeline_logger_Data_Logger:First_layer_pipeline files read from s3 bucket completed. Total (2198, 7) rows & columns loaded into dataframe for item_df first layer schema write


event.csv event_df
Table exists on hive_metastore.first_layer_pipeline.event_df_bronze_layer_managed_table
Overwriting all transactions on managed table hive_metastore.first_layer_pipeline.event_df_bronze_layer_managed_table


INFO:first_layer_pipeline_logger_Data_Logger:First_layer_pipeline files read from s3 bucket completed. Total (853640, 4) rows & columns loaded into dataframe for event_df first layer schema write
