In [0]:
# Mount-point segment used to build data-lake paths of the form /mnt/<storage_account_name>/<layer>/...
storage_account_name: str = "bikesharedlake"

In [0]:
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import current_timestamp
from pyspark.sql.dataframe import *

In [0]:
def read_files(layer, file_name, file_format, col_names=None, schema=None):
    """
    Read a single file from the mounted data lake into a Spark DataFrame.

    The path read is /mnt/<storage_account_name>/<layer>/<file_name>.<file_format>.

    Parameter layer is the medallion layer: landingzone, bronze, silver, gold
    Parameter file_name is the relative path of the file, without extension
    Parameter file_format is the Spark data source name (e.g. "csv", "parquet");
        it is also used as the file extension
    Parameter col_names is an optional list of column names; when given, the loaded
        DataFrame's columns are renamed positionally via toDF
    Parameter schema is an optional schema applied while reading

    Returns the loaded DataFrame.
    Raises the exception Spark raised while reading, after printing a failure message.
    """

    # constructing full path
    full_path = f"/mnt/{storage_account_name}/{layer}/{file_name}.{file_format}"
    print('reading file from: ' + full_path)

    try:
        reader = spark.read.format(file_format)
        # The header option is only enabled when column names or a schema are supplied.
        # NOTE(review): a CSV read with neither will treat the header row as data —
        # confirm this asymmetry is intended.
        if col_names is not None or schema is not None:
            reader = reader.option("header", True)
        if schema is not None:
            reader = reader.schema(schema)
        df = reader.load(full_path)
        if col_names is not None:
            df = df.toDF(*col_names)
    except Exception:
        # Report, then propagate the real error: without a DataFrame there is
        # nothing meaningful to return.
        print("Failed to read file from:" + full_path)
        raise

    print("file successfully read from:" + full_path)
    return df


In [0]:
def df_add_columns(df, add_timestamp=False, add_filename=False):
    """
    Optionally append bronze-layer audit columns to a DataFrame.

    Parameter add_timestamp: when truthy, adds an "ingestion_date" column populated
        by Spark's current_timestamp()
    Parameter add_filename: when truthy, adds a "bronze_filename" column populated
        by Spark's input_file_name() (the name of the file being read)

    Returns the resulting DataFrame; with neither flag set, df itself is returned.
    """
    enriched = df

    if add_timestamp:
        enriched = enriched.withColumn("ingestion_date", current_timestamp())

    if add_filename:
        enriched = enriched.withColumn("bronze_filename", input_file_name())

    return enriched
 
# Attach df_add_columns as a DataFrame method so it can be chained fluently,
# e.g. df.add_columns(add_timestamp=True); the instance is passed as `df`.
setattr(DataFrame, "add_columns", df_add_columns)

In [0]:
def write_parquet_table(df, layer, table_name, raise_on_error=False):
    """
    Write DataFrame df to the mounted storage account as a folder of parquet files.

    The target path is /mnt/<storage_account_name>/<layer>/<table_name>; existing data
    at that path is replaced (mode 'overwrite', i.e. SCD type 1).

    Parameter df is the Spark DataFrame to write
    Parameter layer is the medallion layer folder to write into (e.g. bronze, silver, gold)
    Parameter table_name is the folder name of the parquet table within the layer
    Parameter raise_on_error: when True, a failed write is re-raised after the failure
        message is printed; the default (False) reports the failure and continues

    Returns True if the write succeeded, False if it failed and raise_on_error is False.
    """

    #defining full path to in storage account
    full_path = f"/mnt/{storage_account_name}/{layer}/{table_name}"
    # report the actual target layer instead of always saying "bronze"
    print(f'writing {layer} file: ' + full_path)

    try:
        #writing table folder with parquet files in overwrite mode for SCD1
        df.write.mode('overwrite').parquet(full_path)
    except Exception as err:
        # include the underlying error so the failure is diagnosable from the log
        print('Failed to write table: ' + full_path + f' ({err})')
        if raise_on_error:
            raise
        return False

    print('Table successfully written: ' + full_path)
    return True