In [6]:
from pyspark.sql.functions import col, input_file_name, current_timestamp, regexp_extract
import re

StatementMeta(, 22775446-3136-46d7-affe-7d1adc02af8d, 8, Finished, Available, Finished)

In [7]:
# Parameter cell (values may be overridden when the notebook is run with parameters):
#   topfolder - lakehouse folder whose sub-folders each hold the CSV files for one table
#   dbschema  - schema prefix used for every table name built below
topfolder: str = "Files/jd_sharepoint/MLB"
dbschema: str = "dbo"

StatementMeta(, 22775446-3136-46d7-affe-7d1adc02af8d, 9, Finished, Available, Finished)

In [8]:
# get list of sub-folders in topfolder; each one is loaded as a separate table below.
# notebookutils.fs.ls also returns plain files, which would otherwise be read as CSV "tables"
# (e.g. a stray README.txt becoming dbo.README_txt), so keep directories only.
foldernames = [entry for entry in notebookutils.fs.ls(topfolder) if entry.isDir]

StatementMeta(, 22775446-3136-46d7-affe-7d1adc02af8d, 10, Finished, Available, Finished)

In [9]:
# clean the column names (replace spaces with underscores) using this function...special chars will break parquet columns
def clean_column_name(name):
    """Replace every character that is not an ASCII letter or digit with ``_``.

    Used both for CSV header names (which become Delta/Parquet column names) and
    for folder names (which become table names), where spaces and characters such
    as ``,;{}()=`` are not allowed. Underscores are effectively preserved because
    they are replaced by ``_``. Each character maps to exactly one character, so
    the result has the same length as the input.

    Args:
        name: raw column or folder name.

    Returns:
        The cleaned name.
    """
    # Only [a-zA-Z0-9] survive. (The earlier pattern contained the accidental range
    # "0-T", which let ':;<=>?@' through unchanged.)
    return re.sub(r'[^a-zA-Z0-9]', '_', name)

StatementMeta(, 22775446-3136-46d7-affe-7d1adc02af8d, 11, Finished, Available, Finished)

In [10]:
# Load each sub-folder of CSV files into its own Delta table `<dbschema>.<folder>`,
# plus `<dbschema>.<folder>_bad` for rows Spark flags in the corrupt-record column.
for f in foldernames:
    tablename = dbschema + '.' + clean_column_name(f.name)
    tablepath = f.path
    print(f"Now processing {tablename}...")

    # read data from csv folder, infer the schema, be permissive and add a field indicating whether the data is corrupted.
    # inferSchema makes Spark scan the files at load time, so an unreadable/empty folder fails here,
    # before any existing table has been dropped.
    df = spark.read.format("csv") \
        .option("mode", "PERMISSIVE") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("columnNameOfCorruptRecord", "_corrupt_record") \
        .option("sep", ",") \
        .option("quote", "\"") \
        .option("escape", "\\") \
        .option("multiLine", "true") \
        .option("ignoreLeadingWhiteSpace", "true") \
        .option("ignoreTrailingWhiteSpace", "true") \
        .load(tablepath)

    # apply the cleaning function to all columns (special chars break Delta/Parquet column names)
    new_column_names = [clean_column_name(c) for c in df.columns]
    df = df.toDF(*new_column_names)

    # add metadata columns: load timestamp, and the "Files/...csv" part of each row's source file path
    df = df.withColumn("_date_ingested_utc", current_timestamp()) \
        .withColumn("source_file", input_file_name()) \
        .withColumn("_source_file", regexp_extract(col("source_file"), r"(Files\/.*\.csv)", 1)) \
        .drop("source_file")

    # drop the old tables only now that the source has been read successfully, so a failed read
    # leaves the previous load in place; dropping (rather than relying on overwrite alone) also
    # clears a stale _bad table and lets the inferred schema change between runs
    spark.sql(f"drop table if exists {tablename};")
    spark.sql(f"drop table if exists {tablename}_bad;")

    # safely handle corrupt rows only if the column exists
    # NOTE(review): with inferSchema and no explicit schema, Spark may not add this column at all,
    # so this branch may never run — confirm if corrupt-row capture is required.
    if "_corrupt_record" in df.columns:
        # Spark rejects queries over raw CSV that reference only the corrupt-record column, which is
        # what filter(...).count() reduces to after column pruning. Caching the full frame avoids that
        # and keeps the files from being re-scanned for count, show and the two writes.
        df = df.cache()
        bad_df = df.filter(col("_corrupt_record").isNotNull())
        bad_count = bad_df.count()
        print(f"Corrupt rows found: {bad_count}")
        bad_df.show(20, truncate=False)
        bad_df.write.format("delta").mode("overwrite").saveAsTable(f"{tablename}_bad")
    else:
        print("     No malformed rows were captured (or none occurred)")

    # write df to a table using overwrite (we will overwrite each time as new data is added to the sharepoint folder)
    (
        df
        .write.format("delta")
        .mode("overwrite")
        .saveAsTable(tablename)
    )

    # release cached data, if any (unpersist on an uncached DataFrame does nothing)
    df.unpersist()

    print(f"...{tablename} processed successfully!")

StatementMeta(, 22775446-3136-46d7-affe-7d1adc02af8d, 12, Submitted, Running, Running)

Now processing dbo.allstarfull...
     No malformed rows were captured (or none occurred)
...dbo.allstarfull processed successfully!
Now processing dbo.batting...
