## Databricks Autoloader

In [None]:
import logging

from pyspark.sql.functions import col, element_at, input_file_name, size, split
from pyspark.sql.types import (
    DateType,
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Root logging setup for the notebook: INFO and above, each record prefixed
# with a timestamp, the logger name and the level.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger used by the cells below.
logger = logging.getLogger(__name__)


Below I will create a function that contains the autoloader logic. This code will be reused to load different files into Unity Catalog using Auto Loader.

In [None]:
def autoloader(
        datasource,
        format,
        checkpoint,
        table_name,
        schema,
        file,
        delimiter="|",
):
    """Incrementally load files from ``datasource`` into a Delta table.

    Reads the directory as a stream with Databricks Auto Loader
    (``cloudFiles``), replaces the full source path of each row with just the
    file name, and writes the result to ``table_name``.

    Args:
        datasource: Directory to scan for input files.
        format: Underlying file format passed to ``cloudFiles.format``
            (for example ``"csv"``).
        checkpoint: Directory used both as the Auto Loader schema location
            and as the streaming checkpoint location.
        table_name: Target table the stream is written to.
        schema: Schema applied to the files while reading.
        file: Substring of the file names to load; used to build the
            ``*{file}*`` glob filter.
        delimiter: Field separator passed to the reader. Defaults to ``"|"``
            for pipe-delimited files; pass ``None`` to leave the reader's
            default separator untouched.

    Returns:
        The streaming query handle returned by ``toTable``.
    """
    logger.info("start reading data using readstream")

    reader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", format)
        .option("cloudFiles.schemaLocation", checkpoint)  # where autoloader keeps its schema metadata
        .option("cloudFiles.includeExistingFiles", "true")  # also pick up files already in the path
        .option("cloudFiles.useIncrementalListing", "true")  # use incremental listing to detect new files
        .option("recursiveFileLookup", "true")  # descend into sub-directories
        .option("header", "true")
        .option("pathGlobFilter", f"*{file}*")  # only load files whose name contains `file`
    )
    if delimiter is not None:
        reader = reader.option("delimiter", delimiter)

    query = reader.schema(schema).load(datasource)

    # Keep only the file name (last path segment) of the source file. Taking
    # the last element instead of a fixed index keeps this correct no matter
    # how deep the file sits under `datasource`.
    # NOTE(review): input_file_name() may be unavailable on some Unity Catalog
    # cluster modes; `_metadata.file_name` is the alternative - confirm.
    path_parts = split(input_file_name(), "/")
    query = query.withColumn("input_file_name", element_at(path_parts, -1))

    load = (
        query.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)  # process everything available, then stop
        .toTable(table_name)
    )

    return load

## Load file using the above autoloader function 

The file we are going to autoload contains 5 columns: "FIRST_NAME", "LAST_NAME", "USERID", "USER_EMAIL", and "PHONE_NUMBER". The file is pipe-delimited, so the autoloader must read it with a pipe (`|`) delimiter.

Let's set the schema for the file we are going to autoload. See the autoloader function for how the schema is applied while the data is read with readStream.

In [None]:
# Schema of the pipe-delimited user file; every column is nullable.
schema = StructType(
    [
        StructField("FIRST_NAME", StringType(), True),
        StructField("LAST_NAME", StringType(), True),
        StructField("USERID", StringType(), True),
        StructField("USER_EMAIL", StringType(), True),
        # Phone numbers are identifiers, not quantities: a 32-bit integer
        # cannot hold most 10-digit numbers and would drop leading zeros and
        # formatting characters, so the column is kept as a string.
        StructField("PHONE_NUMBER", StringType(), True),
    ]
)

Since we are loading the data into the Databricks Unity Catalog, we will use "em_catalog" as the catalog name and "em_schema" as the schema name. The files to load have names containing em_user and are located in a Databricks volume, for example /Volumes/em_catalog/em_schema/files.

For our checkpoint location, we created a directory at /Volumes/em_catalog/em_schema/checkpoint; inside this directory we create the checkpoint location for em_user.

When the autoloader runs for the first time, it scans the directory for matching file names and loads all the files present in that directory. If a new file lands in that directory, the autoloader will load only the new data. This works because of the checkpoint: the checkpoint is metadata that keeps track of the files that have already been loaded. Therefore, only new files will be loaded on subsequent runs of the autoloader.

In [None]:
# Target location in Unity Catalog: <catalog>.<schema>.<file>_inbound
catalog = "em_catalog"
schema_name = "em_schema"
file = "em_user"
table_name = f"{catalog}.{schema_name}.{file}_inbound"

# Unity Catalog volume paths start with /Volumes (capital V).
checkpoint = f"/Volumes/{catalog}/{schema_name}/checkpoint/{file}"
data_source = f"/Volumes/{catalog}/{schema_name}/files"

# Keyword arguments keep checkpoint and table_name from being swapped
# against the autoloader signature.
load = autoloader(
    datasource=data_source,
    format="csv",
    checkpoint=checkpoint,
    table_name=table_name,
    schema=schema,
    file=file,
)