Auto Loader is a Databricks utility that automatically and efficiently ingests new files as they arrive in cloud storage.

In [0]:
# Pre-create one landing folder per day (2010/12/01 through 2010/12/07) on the volume
landing_root = "/Volumes/dev/bronze/landing/autoloader_input/2010/12"
for day_of_month in range(1, 8):
    # Zero-pad the day so the folder names are 01, 02, ..., 07
    dbutils.fs.mkdirs(f"{landing_root}/{day_of_month:02d}")

In [0]:
# Make sure the checkpoint directory exists on the volume.
# The checkpoint is what lets files from cloud storage be managed and
# processed incrementally.
dbutils.fs.mkdirs(
    "/Volumes/dev/bronze/landing/checkpoint/autoloader"
)

In [0]:
# Drop the first three daily CSVs into their matching nested day folders
sample_data_dir = "/databricks-datasets/definitive-guide/data/retail-data/by-day"
input_month_dir = "/Volumes/dev/bronze/landing/autoloader_input/2010/12"
for day_of_month in (1, 2, 3):
    day_label = f"{day_of_month:02d}"
    dbutils.fs.cp(
        f"{sample_data_dir}/2010-12-{day_label}.csv",  # source file for that day
        f"{input_month_dir}/{day_label}",  # destination day folder
    )

In [0]:
# Ingest with Auto Loader
# There are 2 modes of file detection:
# 1. Directory Listing - Uses API calls to detect new files
# 2. File Notification - Uses Notification and Queue services on cloud account (requires elevated cloud permissions)
# By default, option 1 is selected

df = (
    spark
    .readStream  # Auto Loader is consumed like any other Structured Streaming source
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")  # Format of the incoming files
    .option("pathGlobFilter", "*.csv")  # Only pick up files with a .csv extension
    .option("header", "true")  # First line of each file is the header
    # Schema evolution mode. "addNewColumns" is the default behavior: the first
    # run that encounters new columns fails, and the rerun picks up the evolved
    # schema. Alternatives: "rescue" (new columns and their data land in the
    # _rescued_data column), "none" (schema changes are ignored) and
    # "failOnNewColumns" (the stream fails if a new column is added).
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaHints", "Quantity int, UnitPrice double")  # Type hints applied on top of the inferred schema
    .option("cloudFiles.schemaLocation", "/Volumes/dev/bronze/landing/checkpoint/autoloader/1/")  # Where the schema used for schema evolution is stored
    .load("/Volumes/dev/bronze/landing/autoloader_input/*/")  # Directs Auto Loader to read from this location recursively
)

# Re-running this won't read the same file again thanks to checkpointing

In [0]:
from pyspark.sql.functions import col

# Starts the stream; with the availableNow trigger it stops by itself once the write is done
(
    df
    # Record the source file name for each row, taken from the hidden _metadata column
    .withColumn("__file", col("_metadata.file_name"))
    .writeStream
    .outputMode("append")  # New rows are appended to the target
    .trigger(availableNow=True)  # Process what is available in batches instead of running continuously
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/Volumes/dev/bronze/landing/checkpoint/autoloader/1/")  # Where stream progress is tracked
    .toTable("dev.bronze.invoice_al_1")  # Target table
)

In [0]:
%sql
-- Number of ingested records per source file
SELECT
  __file,
  COUNT(*) AS num_records
FROM dev.bronze.invoice_al_1
GROUP BY __file;

In [0]:
# Land two additional daily CSVs (the 5th and the 6th) in their day folders
sample_data_dir = "/databricks-datasets/definitive-guide/data/retail-data/by-day"
input_month_dir = "/Volumes/dev/bronze/landing/autoloader_input/2010/12"
for day_label in ("05", "06"):
    dbutils.fs.cp(
        f"{sample_data_dir}/2010-12-{day_label}.csv",  # source file for that day
        f"{input_month_dir}/{day_label}",  # destination day folder
    )