In [0]:
import os #working with files and directories

raw_data_dir = "/Volumes/anime_warehouse/default/raw_files/raw_data"

# Everything directly under raw_data_dir; sub-folders are listed with a trailing "/".
files = dbutils.fs.ls(raw_data_dir)

for file_info in files:
    file_name = file_info.name

    # Only loose CSV files are moved. Directories ("name/") have no extension after
    # splitext, so sub-folders created on earlier runs are skipped automatically.
    # splitext strips only the final extension, e.g. "anime.csv.v2.csv" -> "anime.csv.v2".
    folder_name, extension = os.path.splitext(file_name)
    if extension.lower() != ".csv":
        continue

    # 1. Build the sub-folder path from the file name, e.g. "anime_genres.csv" -> raw_data/anime_genres
    folder_path = f"{raw_data_dir}/{folder_name}"

    # 2. Create the new sub-folder (mkdirs does not fail if it already exists)
    dbutils.fs.mkdirs(folder_path)

    # 3. Move the file into it.
    # NOTE(review): a re-uploaded file with the same name targets the same destination
    # path. Confirm how dbutils.fs.mv behaves when that file already exists. Auto Loader
    # also does not reprocess a path it has already ingested by default, so new uploads
    # should get unique file names.
    destination_path = f"{folder_path}/{file_name}"
    dbutils.fs.mv(file_info.path, destination_path)
    print(f"Moved {file_name} into {folder_path}")

print("Done")

* I wanted to automate creating a new folder for each new CSV loaded into my raw_data folder, so that when I run Auto Loader it won't treat all the CSVs as one dataset and lump them together.
  * Also, if I upload a new file into the anime folder, Auto Loader processes only that new file and won't get confused by files belonging to other tables. (Auto Loader tracks files, not rows, so the new upload needs a file name that hasn't been ingested before.)

In [0]:
import os 

# Single-table defaults for calling ingest_bronze() by hand for the anime table. The
# loop further down derives the same kind of values for every folder. The source must
# be the table's own sub-folder: pointing Auto Loader at raw_data/ itself could pick up
# the CSVs of every table. NOTE(review): confirm nothing relied on the old raw_data/ value.
table_name = "anime_warehouse.bronze.raw_anime"
source_path = "/Volumes/anime_warehouse/default/raw_files/raw_data/anime/"
schema_location = "/Volumes/anime_warehouse/default/raw_files/checkpoints/raw_anime"

def ingest_bronze(source_path, table_name, schema_location):
    """
    Ingest raw CSV files into a Delta Bronze table using Auto Loader.

    Runs as a one-shot batch (``trigger(availableNow=True)``). Every file not yet
    recorded in the checkpoint is processed, then the stream stops. This call blocks
    until that run has finished, so the success message is only printed once the data
    is written, and any ingestion error is raised here instead of being lost in a
    background stream.

    Args:
        source_path (str): Directory holding the raw CSV files for ONE table.
        table_name (str): Target table name (e.g. "catalog.schema.table").
        schema_location (str): Directory where Auto Loader stores the inferred
            schema. The streaming checkpoint is kept in its "_checkpoint" sub-folder.

    Returns:
        StreamingQuery: the finished query (e.g. for inspecting ``lastProgress``).
    """
    query = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", schema_location)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        # Treat a doubled quote ("") inside a quoted field as a literal quote character.
        .option("escape", '"')
        .load(source_path)
        .writeStream
        .option("checkpointLocation", f"{schema_location}/_checkpoint")
        # Let the Delta sink accept columns that Auto Loader adds to the stored schema
        # when a later file brings an extra column.
        .option("mergeSchema", "true")
        .trigger(availableNow=True)  # process everything available, then stop (batch-style)
        .toTable(table_name)
    )
    # toTable() only *starts* the stream and returns immediately; wait for it to finish.
    query.awaitTermination()
    print(f"Successfully ingested {table_name}")
    return query

raw_data_base = "/Volumes/anime_warehouse/default/raw_files/raw_data"
checkpoint_base = "/Volumes/anime_warehouse/default/raw_files/checkpoints"

# One sub-folder per source CSV (created by the first cell) -> one Bronze table each.
folders = dbutils.fs.ls(raw_data_base)

for f in folders:
    if not f.isDir():
        continue  # loose files at the top level are not part of any table folder

    folder_name = f.name.rstrip("/")  # directory entries are listed with a trailing "/"

    # Unquoted table identifiers only accept letters, digits and "_". Replace anything
    # else (e.g. "-" or " " carried over from the CSV file name) so toTable() can parse
    # the name. Names that are already valid, such as "anime_genres", are unchanged.
    table_suffix = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch == "_" else "_"
        for ch in folder_name
    )

    ingest_bronze(
        source_path=f.path,
        table_name=f"anime_warehouse.bronze.raw_{table_suffix}",
        schema_location=f"{checkpoint_base}/raw_{table_suffix}",
    )

print("Done")
    

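* The script for the actual ingestion, run after each CSV has been sorted into its own folder:
  * `cloudFiles` (Auto Loader): records in the checkpoint which files have already been ingested, so if I rerun, it only picks up new files.
  * `trigger(availableNow=True)`: processes every file available at start time, then stops (a one-time, batch-style run).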
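  * Schema location (`cloudFiles.schemaLocation`): keeps a history of the inferred schema. Normally, if a new file comes with an extra column, the Spark job would fail to maintain integrity. With a schema location, Auto Loader keeps a record of the previous schema and updates it with the new column. (In the default `addNewColumns` mode the stream stops once with an error after updating the stored schema, and the next run picks up the new column.)
  * `toTable()` automatically creates a Delta table for each CSV folder on the first run and appends to it on later runs.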

In [0]:
%sql
-- Spot-check the first rows of one Bronze table produced by the ingestion cell
SELECT *
FROM anime_warehouse.bronze.raw_anime_genres
LIMIT 20;