#### This script ingests raw data from the file store and loads it into a silver Delta table for processing.
 - Ingest raw data from the file store
 - Write to the silver table

In [None]:
# Import required library
import pandas as pd

In [None]:
# Notebook parameters: each widget is declared with its default value and read
# back right away, so the widget name, its default and the variable that holds
# the value sit next to each other.

# path prefix that ingest_data combines with each file name for pandas.read_csv
dbutils.widgets.text("file_path", "/dbfs/FileStore/RawData/")
file_path = dbutils.widgets.get("file_path")

# location that ingest_data lists with dbutils.fs.ls to discover file names
dbutils.widgets.text("store_path", "dbfs:/FileStore/RawData/")
store_path = dbutils.widgets.get("store_path")

# catalog part of the target table name used by load_data
dbutils.widgets.text("catalog_name", "ai_ml_learning")
catalog_name = dbutils.widgets.get("catalog_name")

# schema part of the target table name used by load_data
dbutils.widgets.text("schema_name", "occupancy_project")
schema_name = dbutils.widgets.get("schema_name")

In [None]:
# Ingest data function
def ingest_data(store_path, file_path):
    """
    Ingest raw data from the file store into a single pandas DataFrame.

    Every entry listed under ``store_path`` is read with ``pandas.read_csv``
    from the matching location under ``file_path``; the resulting frames are
    stacked row-wise in listing order.

    Args:
        store_path: Location passed to ``dbutils.fs.ls`` to discover file names.
        file_path: Directory prefix joined with each file name before it is
            handed to ``pandas.read_csv``. A trailing slash is optional.

    Returns:
        pandas.DataFrame with the rows of all files and a fresh 0..n-1 index.

    Raises:
        Exception: If ``store_path`` cannot be listed (original error chained).
        ValueError: If the listing is empty, so there is nothing to ingest.
    """
    import os

    # get file names
    try:
        file_names = [entry.name for entry in dbutils.fs.ls(store_path)]
    except Exception as e:
        raise Exception(f"Files does not exist: {e}") from e

    # pd.concat([]) would fail with an opaque "No objects to concatenate";
    # report the real cause with the same exception type instead
    if not file_names:
        raise ValueError(f"No files found in {store_path}")

    # read data into pandas dataframes; os.path.join tolerates a prefix given
    # with or without a trailing slash
    frames = [pd.read_csv(os.path.join(file_path, name)) for name in file_names]

    # ignore_index gives the combined frame unique row labels instead of
    # repeating 0..k-1 once per input file
    return pd.concat(frames, ignore_index=True)

In [None]:
# save data to silver table
def load_data(df, catalog_name, schema_name):
    """
    Load ingested data into the silver table.

    The pandas DataFrame is converted with ``spark.createDataFrame`` and
    written with ``saveAsTable`` in ``overwrite`` mode to
    ``<catalog_name>.<schema_name>.silverTable``.

    Args:
        df: pandas DataFrame to persist.
        catalog_name: Catalog part of the target table name.
        schema_name: Schema part of the target table name.

    Returns:
        str: Confirmation message containing the fully qualified table name.

    Raises:
        Exception: If the conversion or the write fails (original error chained).
    """
    table_name = f"{catalog_name}.{schema_name}.silverTable"

    try:
        # convert dataframe to spark dataframe and save as table
        spark.createDataFrame(df).write.mode("overwrite").saveAsTable(table_name)
    except Exception as e:
        raise Exception(f"Error loading data: {e}") from e

    # f-string so the message carries the real table name rather than the
    # literal "{catalog_name}.{schema_name}" placeholders
    return f"Data saved to table {table_name}"

In [None]:
# run the pipeline end to end: read the raw files, then persist them;
# the load_data call stays last so the cell shows its confirmation message
df = ingest_data(store_path=store_path, file_path=file_path)
load_data(df=df, catalog_name=catalog_name, schema_name=schema_name)