In [0]:
%sql
-- Set customer_catalog as the current catalog.
use catalog customer_catalog;

## Importing necessary PySpark DataFrame, functions and types for data processing

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import explode, col, cast, when, to_date, try_to_timestamp, explode_outer
from pyspark.sql.types import IntegerType, FloatType, DateType, DoubleType

## Read Function for json file format

In [0]:
# Disabled JSON-only draft of read_file. It is wrapped in a bare string literal,
# so nothing inside it is executed; the read_file defined later in this notebook
# is the version that is actually in use.
''' def read_file(layer, table_name):
    return spark.read.format("json") \
                     .option("multiline", "true") \
                     .table(f"customer_catalog.{layer}.{table_name}
'''

### Read function for any file format

This function reads a file from the specified layer and table name.
Parameters :
- layer : Data Layer [bronze, silver, gold]
- table_name : Name of the table
- fileFormat : format of the file [json, CSV, parquet, delta]
- header : if true, first row is header (default is False)
- inferSchema : if true, take schema from the file (default is false)
- multiLine : used for JSON file, (default is False)
- mode : [PERMISSIVE : puts the corrupt data in the corrupt-record column and sets all other columns to null, DROPMALFORMED : drops the rows that have corrupted data, FAILFAST : fails the read when the file has corrupted data]
(default is PERMISSIVE)
- columnnameofCorruptedRecord : name of the column which stores the corrupted data
- delimiter : used for CSV files, the symbol used to separate fields (default is ",")

The function returns a DataFrame that is read according to the given fileFormat.

In [0]:
def read_file(layer : str,
              table_name : str,
              fileFormat : str,
              header : bool = False,
              inferSchema : bool = False,
              multiLine : bool = False,
              mode : str = "PERMISSIVE",
              columnnameofCorruptedRecord : str = "corrupted_record",
              delimiter : str = ","
              ) -> DataFrame:
    """Read the table ``customer_catalog.<layer>.<table_name>`` into a DataFrame.

    The reader is built with ``spark.read.format(fileFormat)``, the options that
    belong to that format are applied, and the table is loaded with ``.table()``.
    Relies on a ``spark`` object that is not defined in this function (it must
    already exist in the notebook).

    Parameters
    ----------
    layer : schema part of the table name (e.g. bronze, silver, gold).
    table_name : name of the table inside that layer.
    fileFormat : one of "json", "csv", "parquet", "delta". Matched
        case-insensitively and with surrounding whitespace ignored.
    header : csv only, passed as the "header" option.
    inferSchema : csv only, passed as the "inferSchema" option.
    multiLine : json only, passed as the "multiLine" option.
    mode : accepted for interface compatibility; currently NOT passed to the reader.
    columnnameofCorruptedRecord : accepted for interface compatibility;
        currently NOT passed to the reader.
    delimiter : csv only, passed as the "delimiter" option.

    Returns
    -------
    DataFrame produced by ``reader.table(...)``.

    Raises
    ------
    ValueError : if fileFormat is not one of the supported formats. Previously
        this case only printed a message and returned None, which surfaced
        later as an unrelated error in the caller.
    """
    table_path = f"customer_catalog.{layer}.{table_name}"

    # Reader options that apply to each supported format.
    options_by_format = {
        "json": {"multiLine": multiLine},
        "csv": {"header": header, "inferSchema": inferSchema, "delimiter": delimiter},
        "parquet": {},
        "delta": {},
    }

    # Normalise so that "CSV", " csv " and "csv" all select the same branch.
    format_key = str(fileFormat).strip().lower()
    if format_key not in options_by_format:
        supported = ", ".join(options_by_format)
        raise ValueError(
            f"Invalid file format: {fileFormat!r}. Supported formats: {supported}"
        )

    reader = spark.read.format(format_key)
    for option_name, option_value in options_by_format[format_key].items():
        reader = reader.option(option_name, option_value)
    return reader.table(table_path)
    

## Function to flatten struct and array type column

In [0]:
"""
def flatten_struct(df, col_name, prefix=""):
    fields = df.schema[col_name].dataType.fields
    for field in fields:
        field_name = f"{prefix}{col_name}_{field.name}"
        if field.dataType.typeName() == "struct":
            df = flatten_struct(df.withColumn(field_name, col(f"{col_name}.{field.name}")), field_name)
        else:
            df = df.withColumn(field_name, col(f"{col_name}.{field.name}"))
    return df.drop(col_name)

df = read_file("bronze", "orders", "json")

while True :
    for name, dtype in df.dtypes:
       field = df.schema[name]
       if field.dataType.typeName() == "array":
           df = df.withColumn(name, explode(col(name)))
       elif field.dataType.typeName() == "struct":
           df = flatten_struct(df, name)
display(df)
"""

In [0]:
def flatten_struct(df : DataFrame,
                   col_name : str) -> DataFrame :
    """Replace the struct column ``col_name`` with one column per leaf field.

    Nested structs are walked recursively. Each leaf becomes a column named by
    joining the path with underscores, starting at ``col_name``
    (e.g. ``address.geo.lat`` -> ``address_geo_lat``). The original struct
    column is dropped afterwards.

    All leaf columns are added in a single ``withColumns`` projection instead of
    one ``withColumn`` call per field, which keeps the query plan small and
    avoids materialising intermediate struct columns (which could overwrite an
    unrelated existing column that happens to share the intermediate name).

    Parameters
    ----------
    df : DataFrame that contains ``col_name``.
    col_name : name of a struct-typed column in ``df``.

    Returns
    -------
    DataFrame without ``col_name`` and with the flattened leaf columns added.
    """
    # flat column name -> expression that extracts the leaf, in depth-first order
    leaf_columns = {}

    def collect_leaves(struct_type, name_prefix, parent_expr):
        for field in struct_type.fields:
            flat_name = f"{name_prefix}_{field.name}"
            # getField takes the field name literally instead of parsing a dotted path
            field_expr = parent_expr.getField(field.name)
            if field.dataType.typeName() == "struct":
                collect_leaves(field.dataType, flat_name, field_expr)
            else:
                leaf_columns[flat_name] = field_expr

    collect_leaves(df.schema[col_name].dataType, col_name, col(col_name))

    # An empty struct has nothing to add; it is simply dropped.
    if leaf_columns:
        df = df.withColumns(leaf_columns)
    return df.drop(col_name)

def flatten_column(df : DataFrame) -> DataFrame:
    """Flatten every array and struct column of ``df`` until none remain.

    Each pass takes a snapshot of the columns whose type is "array" or
    "struct" (in schema order) and processes them one by one:
    arrays are replaced by ``explode_outer`` of themselves, structs are
    expanded with ``flatten_struct``. Passes repeat because exploding or
    expanding a column can expose further nested columns.

    Returns the DataFrame once its schema has no array or struct column left.
    """
    def nested_fields(frame):
        # (column name, type name) for every column that still needs flattening
        found = []
        for schema_field in frame.schema.fields:
            kind = schema_field.dataType.typeName()
            if kind == "array" or kind == "struct":
                found.append((schema_field.name, kind))
        return found

    pending = nested_fields(df)
    while pending:
        for name, kind in pending:
            if kind == "struct":
                df = flatten_struct(df, name)
            else:
                # kind == "array": one output row per element, keeping empty/null arrays
                df = df.withColumn(name, explode_outer(col(name)))
        pending = nested_fields(df)

    return df

## Write function

This function writes the table in Delta format.
Parameters :
- layer : Data layer [bronze, silver, gold]
- table_name : name of the table
- mode : [append - adds the new rows after the existing data, overwrite - deletes the existing data and writes the new data, error - throws an error if the table already exists (default), ignore - does nothing if the table already exists]
- overwriteSchema - if true, it will replace the existing table schema with the new one (works with overwrite mode)
- mergeSchema - if true, it will merge the existing and new schema

In [0]:
def write_file(layer : str,
               table_name : str,
               mode : str,
               overwriteSchema : bool = False,
               mergeSchema : bool = False,
               df : "DataFrame" = None,
               ) -> None:
    """Write ``df`` as a Delta table named ``customer_catalog.<layer>.<table_name>``.

    Parameters
    ----------
    layer : schema part of the table name (e.g. bronze, silver, gold).
    table_name : name of the table inside that layer.
    mode : save mode handed to the writer (append, overwrite, error, ignore).
    overwriteSchema : passed as the "overwriteSchema" writer option.
    mergeSchema : passed as the "mergeSchema" writer option.
    df : the DataFrame to write. It is declared last and defaults to None only
        so that existing positional calls keep their meaning; a value is
        required in practice.

    Raises
    ------
    ValueError : if ``df`` is not supplied.

    Notes
    -----
    The previous body called ``spark.write``, but writers are obtained from a
    DataFrame (``df.write``), so there was nothing to write. It also passed the
    save mode through ``.option("mode", ...)``; the save mode is set with
    ``.mode(...)``.
    """
    if df is None:
        raise ValueError(
            "write_file needs the DataFrame to write: pass it as df=<DataFrame>"
        )

    writer = df.write.format("delta").mode(mode)
    writer = writer.option("overwriteSchema", overwriteSchema)
    writer = writer.option("mergeSchema", mergeSchema)
    writer.saveAsTable(f"customer_catalog.{layer}.{table_name}")