In [0]:
%run ./setAccess

In [0]:
# create the container (if missing) and every ETL-layer directory listed in `dirs`
# (`container`, `storageAccountName` and `dirs` are expected to come from ./setAccess)
accountRoot = "abfss://" + container + "@" + storageAccountName + ".dfs.core.windows.net/"

# With this ABFS flag on, initializing the filesystem for the container root creates the
# container when it does not exist. Reset it in `finally` so a failed listing does not
# leave the flag enabled for the rest of the Spark session.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
try:
    dbutils.fs.ls(accountRoot)
finally:
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

# `dirName` (not `dir`) so the builtin dir() is not shadowed in the notebook namespace.
for dirName in dirs.values():
    dbutils.fs.mkdirs(accountRoot + dirName)

In [0]:
# build a star-schema database in the Data Lake's "curated" layer from a .json config file
from pyspark.sql.types import *

configFileName = "airlines.json"
accountRoot = "abfss://" + container + "@" + storageAccountName + ".dfs.core.windows.net/"

# One Row per configured target column; each Row is read for "targetBase" (target table),
# "targetName", "type" and "nullable" below.
config = (spark.read
          .format("json")
          .option("multiline", "true")
          .load(accountRoot + dirs["config"] + configFileName)
          .collect()
         )

# config "type" name -> Spark SQL type; extend here when the config needs more types.
mapTypes = {
    "Integer": IntegerType(),
    "Long": LongType(),
    "String": StringType(),
    "Double": DoubleType(),
    "Boolean": BooleanType(),
    "Date": DateType(),
    "Timestamp": TimestampType(),
}

# Group column definitions by target table in one pass. A dict keeps first-appearance
# order, so the facts-table column order follows the config file. A set would make it
# depend on Python's per-process string hashing, so it could differ between runs.
columnsByDatabase = {}
for conf in config:
    confDict = conf.asDict()
    columnsByDatabase.setdefault(confDict["targetBase"], []).append(confDict)
databases = list(columnsByDatabase)

# create empty dimension tables: an "ID" column followed by the configured columns
for database, columnDefs in columnsByDatabase.items():
    fields = [StructField("ID", LongType(), True)]
    for columnDef in columnDefs:
        typeName = columnDef["type"]
        if typeName not in mapTypes:
            raise ValueError(
                "Unsupported type '{}' for column '{}' in {}; expected one of {}".format(
                    typeName, columnDef["targetName"], configFileName, sorted(mapTypes)))
        fields.append(StructField(columnDef["targetName"], mapTypes[typeName], columnDef["nullable"]))
    schema = StructType(fields)
    df = spark.createDataFrame(data=[], schema=schema)
    # "header" is a CSV option and has no effect on parquet, so it is not set.
    (df.write
        .format("parquet")
        .mode("overwrite")
        .save(accountRoot + dirs["lvl3"] + database + ".parquet"))

# create empty facts table: its own "ID" plus one "<table>_ID" column per dimension table
schema = StructType(
    [StructField("ID", LongType(), True)]
    + [StructField(database + "_ID", LongType(), True) for database in databases]
)
df = spark.createDataFrame(data=[], schema=schema)
# facts table is named after the config file minus its extension ("airlines.json" -> "airlines")
factsTableName = configFileName.rsplit(".", 1)[0]
(df.write
     .format("parquet")
     .mode("overwrite")
     .save(accountRoot + dirs["lvl3"] + factsTableName + ".parquet"))

In [0]:
# create some random test files, using first 10 files from '/databricks-datasets/airlines/'
from pyspark.sql.types import StructType
import random
import string
from itertools import chain
from pyspark.sql.functions import create_map,lit,col

airlinesDir = "/databricks-datasets/airlines/"

# Only part-00000 starts with a header row, so the schema is inferred from it once.
header = (spark.read
          .format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .load(airlinesDir + "part-00000")
)
schema = header.schema

# With header=true Spark skips the first line of every file it reads. Applied to the
# header-less part files, that would silently drop their first data row. So part-00000
# is read with a header and part-00001..part-00009 without one, then the two are unioned.
# Both reads use the same explicit schema, so a positional union is safe.
data = (spark.read
        .format("csv")
        .option("header", "true")
        .schema(schema)
        .load(airlinesDir + "part-00000")
        .union(spark.read
               .format("csv")
               .option("header", "false")
               .schema(schema)
               .load(airlinesDir + "part-0000[1-9]"))
       )

# Ten equally weighted random splits (seed 1 for reproducibility), one per test file.
files = data.randomSplit([1.0] * 10, 1)
def createFiles(files,range_,fileNamePrefix,header):
    """Write selected DataFrames as single CSV files into the ``dirs["test"]`` directory.

    Spark always writes a directory of part files. So for each index ``i`` the DataFrame
    is coalesced to one partition and written to ``<prefix><i>.csv`` (a directory). That
    directory is renamed to ``<prefix><i>.csv_buff``. The single ``.csv`` part file inside
    it is moved to ``<prefix><i>.csv``, and the buffer directory is removed.

    Args:
        files: indexable collection of Spark DataFrames (here the ``randomSplit`` result).
        range_: iterable of indices into ``files`` to write.
        fileNamePrefix: file-name prefix; the index and ``.csv`` are appended to it.
        header: value passed to the CSV writer's ``header`` option ("true"/"false").

    Raises:
        FileNotFoundError: if no ``.csv`` part file is found in the buffer directory.
    """
    testDir = "abfss://" + container + "@" + storageAccountName + ".dfs.core.windows.net/" + dirs["test"]
    for i in range_:
        targetPath = testDir + fileNamePrefix + str(i) + ".csv"
        bufferPath = targetPath + "_buff"
        # create single-csv-files, not folders with 'parts'
        (files[i].coalesce(1).write
            .format("csv")
            .option("header", header)
            .mode("overwrite")
            .save(targetPath)
        )
        # move the Spark output folder aside so the final file can take its name
        dbutils.fs.mv(targetPath, bufferPath, recurse=True)
        # entry[0] is the listed path; skip non-CSV entries such as _SUCCESS markers
        partFiles = [entry[0] for entry in dbutils.fs.ls(bufferPath) if entry[0].endswith(".csv")]
        if not partFiles:
            raise FileNotFoundError("no .csv part file written under " + bufferPath)
        dbutils.fs.mv(partFiles[0], targetPath)
        dbutils.fs.rm(bufferPath, recurse=True)
# files[0..3] are written unchanged as the well-formed test files
createFiles(files, range(4), "airlines_testFile-", "true")

# corrupted files creation:
# files[4] -> random number (1-15) of distinct random columns dropped
random.seed(1)
file4Columns = files[4].columns
numDropped = min(random.randint(1, 15), len(file4Columns))
# random.sample draws without replacement, so exactly `numDropped` columns are removed
# (random.choices could pick the same column twice and drop fewer than intended).
files[4] = files[4].drop(*random.sample(file4Columns, k=numDropped))

# files[5] -> random number (1-15) of random columns renamed to random strings
def getRandString(length):
    """Return a string of `length` characters drawn from ASCII letters.

    Each character comes from one `random.choice` call on the module-level `random`
    generator, so the result is reproducible after `random.seed(...)`.
    """
    letters = string.ascii_letters
    picked = []
    for _ in range(length):
        picked.append(random.choice(letters))
    return "".join(picked)

# Sample without replacement so exactly that many *original* columns get new names.
# Repeated random.choice calls could hit the same, or an already renamed, column again.
file5Columns = files[5].columns
numRenamed = min(random.randint(1, 15), len(file5Columns))
for columnName in random.sample(file5Columns, k=numRenamed):
    files[5] = files[5].withColumnRenamed(columnName, getRandString(10))

# files[6] -> change 'YES'/'NO' in 'IsDepDelayed' column to 'Y'/'N' (assuming: final database is expecting 'YES'/'NO' format)
mapping = {"YES": "Y", "NO": "N"}
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
# NOTE(review): values not present in `mapping` are missing-key lookups and presumably
# become null (non-ANSI mode) -- confirm that is acceptable for this corruption.
files[6] = files[6].withColumn("IsDepDelayed", mapping_expr.getItem(col("IsDepDelayed")))

createFiles(files, range(4, 7), "airlines_testFile-", "true")

# files[7] -> file without header
createFiles(files, range(7, 8), "airlines_testFile-", "false")

# files[8] -> duplicate of the (already corrupted) files[6]
files[8] = files[6]
createFiles(files, range(8, 9), "airlines_testFile-", "true")

# files[9] -> badly named file ("airlin_" instead of "airlines_")
createFiles(files, range(9, 10), "airlin_testFile-", "true")

In [0]:
# Copy one of the files generated in the previous cell from the test-source directory
# (dirs["test"]) into the raw layer (dirs["lvl1"]).
# Available names: "airlines_testFile-X.csv" for X = 0..8, plus "airlin_testFile-9.csv".
# Corrupted inputs: X = 4..8, and 9 (bad file name).
copiedTestFileName = "airlines_testFile-0.csv"

accountRoot = "abfss://" + container + "@" + storageAccountName + ".dfs.core.windows.net/"
sourcePath = accountRoot + dirs["test"] + copiedTestFileName
destinationPath = accountRoot + dirs["lvl1"] + copiedTestFileName
dbutils.fs.cp(sourcePath, destinationPath, recurse=True)