## Mounting your blob storage source and sink

In [0]:
%python
storage_account_name = 'testtech'
# NOTE(review): the key is blank in the notebook; supply it at run time (e.g. from a
# secret store) instead of committing the real value here.
storage_account_key = ''
container_name_source = 'bronzelayer'
container_name_destination = 'silverlayer'

# Spark configuration entry that authenticates against the storage account.
# The same key/value pair is also handed to every mount below.
account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(account_key_conf, storage_account_key)

# dbutils.fs.mount raises when the mount point is already in use, which made this
# cell fail on every re-run. Collect the existing mount points so they can be skipped.
already_mounted = {mount.mountPoint for mount in dbutils.fs.mounts()}

# Mount the source container first, then the destination container.
for container_name in (container_name_source, container_name_destination):
    mount_point = f"/mnt/{container_name}"
    if mount_point in already_mounted:
        continue
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={account_key_conf: storage_account_key},
    )

print('both Blob containers has been mounted')

both Blob containers has been mounted


## Unzipping each file in the Bronze Layer container and saving it to the Silver Layer container

In [0]:
# List the contents of the mounted source container; as the last expression in the
# cell, the returned FileInfo list is what the notebook displays.
dbutils.fs.ls(f"/mnt/{container_name_source}")

[FileInfo(path='dbfs:/mnt/bronzelayer/2009q2.zip', name='2009q2.zip', size=144894, modificationTime=1744090065000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2009q3.zip', name='2009q3.zip', size=3544077, modificationTime=1744090066000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2009q4.zip', name='2009q4.zip', size=4050938, modificationTime=1744090066000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2010q1.zip', name='2010q1.zip', size=5311282, modificationTime=1744090067000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2010q2.zip', name='2010q2.zip', size=3994236, modificationTime=1744090067000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2010q3.zip', name='2010q3.zip', size=13745950, modificationTime=1744090068000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2010q4.zip', name='2010q4.zip', size=14679781, modificationTime=1744090069000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2011q1.zip', name='2011q1.zip', size=19615544, modificationTime=1744090070000),
 FileInfo(path='dbfs:/mnt/bronzelayer/2011q2.zip', name='2011q

In [0]:
# Import necessary libraries
import os
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col

# Tab-delimited files expected inside every archive, in processing order, mapped to
# the columns that hold yyyyMMdd strings and must be converted to real dates.
date_columns_by_table = {
    "sub": ["filed", "period"],
    "tag": [],
    "num": ["ddate"],
    "pre": [],
}

# The scratch directory has two spellings for the same location:
#   * temp_dbfs_path  - DBFS path, used by dbutils.fs and spark.read
#   * temp_local_path - local /dbfs path, used by Python's zipfile
# Passing the /dbfs spelling to dbutils.fs (as this cell used to) addresses a different
# directory, so the extracted files were never actually removed.
temp_dbfs_path = f"/mnt/{container_name_source}/temp_folder"
temp_local_path = f"/dbfs{temp_dbfs_path}"

# Keep only the zip archives found in the source container
zip_files = [f for f in dbutils.fs.ls(f"/mnt/{container_name_source}") if f.name.endswith('.zip')]

for zip_file in zip_files:
    # Destination folder is named after the archive without its extension;
    # splitext keeps any additional dots in the archive name intact.
    new_folder_name = os.path.splitext(zip_file.name)[0]
    destination_dir = f"/mnt/{container_name_destination}/{new_folder_name}"
    dbutils.fs.mkdirs(destination_dir)

    # Start from an empty scratch directory so files left by an earlier archive
    # (or an earlier failed run) cannot be picked up by mistake.
    dbutils.fs.rm(temp_dbfs_path, recurse=True)
    dbutils.fs.mkdirs(temp_dbfs_path)
    try:
        # Open the archive by its real name rather than rebuilding the name from parts
        with zipfile.ZipFile(f"/dbfs/mnt/{container_name_source}/{zip_file.name}", 'r') as zip_ref:
            zip_ref.extractall(temp_local_path)

        for table_name, date_columns in date_columns_by_table.items():
            # Read '<table>.txt' as a tab-delimited file with a header row
            table_df = (
                spark.read
                .option("delimiter", "\t")
                .option("header", "true")
                .csv(f"{temp_dbfs_path}/{table_name}.txt")
            )
            for date_column in date_columns:
                table_df = table_df.withColumn(date_column, to_date(col(date_column), "yyyyMMdd"))
            # The write runs immediately, so the scratch files are no longer needed afterwards
            table_df.write.mode("overwrite").parquet(f"{destination_dir}/{table_name}.parquet")
    finally:
        # Always remove the scratch directory, even when extraction or a write fails
        dbutils.fs.rm(temp_dbfs_path, recurse=True)

## Save as CSV to the target container (OPTIONAL)

In [0]:
import os
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col

# Tab-delimited files expected inside every archive, in processing order, mapped to
# the columns that hold yyyyMMdd strings and must be converted to real dates.
date_columns_by_table = {
    "sub": ["filed", "period"],
    "tag": [],
    "num": ["ddate"],
    "pre": [],
}

# The scratch directory has two spellings for the same location:
#   * temp_dbfs_path  - DBFS path, used by dbutils.fs and spark.read
#   * temp_local_path - local /dbfs path, used by Python's zipfile
# Passing the /dbfs spelling to dbutils.fs (as this cell used to) addresses a different
# directory, so the extracted files were never actually removed.
temp_dbfs_path = f"/mnt/{container_name_source}/temp_folder"
temp_local_path = f"/dbfs{temp_dbfs_path}"

# Keep only the zip archives found in the source container
zip_files = [f for f in dbutils.fs.ls(f"/mnt/{container_name_source}") if f.name.endswith('.zip')]

for zip_file in zip_files:
    # Destination folder is named after the archive without its extension;
    # splitext keeps any additional dots in the archive name intact.
    new_folder_name = os.path.splitext(zip_file.name)[0]
    destination_dir = f"/mnt/{container_name_destination}/{new_folder_name}"
    dbutils.fs.mkdirs(destination_dir)

    # Start from an empty scratch directory so files left by an earlier archive
    # (or an earlier failed run) cannot be picked up by mistake.
    dbutils.fs.rm(temp_dbfs_path, recurse=True)
    dbutils.fs.mkdirs(temp_dbfs_path)
    try:
        # Open the archive by its real name rather than rebuilding the name from parts
        with zipfile.ZipFile(f"/dbfs/mnt/{container_name_source}/{zip_file.name}", 'r') as zip_ref:
            zip_ref.extractall(temp_local_path)

        for table_name, date_columns in date_columns_by_table.items():
            # Read '<table>.txt' as a tab-delimited file with a header row
            table_df = (
                spark.read
                .option("delimiter", "\t")
                .option("header", "true")
                .csv(f"{temp_dbfs_path}/{table_name}.txt")
            )
            for date_column in date_columns:
                table_df = table_df.withColumn(date_column, to_date(col(date_column), "yyyyMMdd"))
            # Write the table back out as CSV (with a header row) in the destination folder
            (
                table_df.write
                .mode("overwrite")
                .option("header", "true")
                .csv(f"{destination_dir}/{table_name}.csv")
            )
    finally:
        # Always remove the scratch directory, even when extraction or a write fails
        dbutils.fs.rm(temp_dbfs_path, recurse=True)

## Unmounting the blob storage containers

In [0]:
# Release both mount points: the source container first, then the destination container.
source_mount_point = f"/mnt/{container_name_source}"
destination_mount_point = f"/mnt/{container_name_destination}"
dbutils.fs.unmount(source_mount_point)
dbutils.fs.unmount(destination_mount_point)

/mnt/bronzelayer has been unmounted.
/mnt/silverlayer has been unmounted.


True

## Creating Delta Tables for our Data

In [0]:
%sql
-- Delta tables for the four data sets loaded by this notebook.
-- IF NOT EXISTS keeps the cell re-runnable: without it every statement fails
-- as soon as the tables exist from a previous run.

-- Submissions table: one row per adsh
CREATE TABLE IF NOT EXISTS Submissions (
    adsh STRING NOT NULL,
    cik BIGINT,
    name STRING NOT NULL,
    sic INT,
    countryba STRING,
    stprba STRING,
    cityba STRING,
    zipba STRING,
    bas1 STRING,
    bas2 STRING,
    baph STRING,
    countryma STRING,
    stprma STRING,
    cityma STRING,
    zipma STRING,
    mas1 STRING,
    mas2 STRING,
    countryinc STRING,
    stprinc STRING,
    ein STRING,
    former STRING,
    changed DATE,
    afs STRING,
    wksi BOOLEAN NOT NULL,
    fye STRING,
    form STRING,
    period DATE,
    fy INT,
    fp STRING,
    filed DATE NOT NULL,
    accepted TIMESTAMP NOT NULL,
    prevrpt BOOLEAN NOT NULL,
    detail BOOLEAN NOT NULL,
    instance STRING NOT NULL,
    nciks INT,
    aciks STRING
) USING DELTA;

-- Tags table: identified by (tag, version)
CREATE TABLE IF NOT EXISTS Tags (
    tag STRING NOT NULL,
    version STRING NOT NULL,
    custom INT NOT NULL,
    abstract BOOLEAN NOT NULL,
    datatype STRING,
    iord STRING,
    crdr STRING,
    tlabel STRING,
    doc STRING
) USING DELTA;

-- Numbers table: identified by (adsh, tag, version, ddate, qtrs, uom)
CREATE TABLE IF NOT EXISTS Numbers (
    adsh STRING NOT NULL,
    tag STRING NOT NULL,
    version STRING NOT NULL,
    ddate DATE NOT NULL,
    qtrs INT NOT NULL,
    uom STRING NOT NULL,
    coreg STRING,
    value DECIMAL(28,4),
    footnote STRING
) USING DELTA;

-- Presentations table: identified by (adsh, report, line)
CREATE TABLE IF NOT EXISTS Presentations (
    adsh STRING NOT NULL,
    report INT NOT NULL,
    line INT NOT NULL,
    stmt STRING,
    inpth BOOLEAN NOT NULL,
    rfile STRING,
    tag STRING,
    version STRING,
    plabel STRING
) USING DELTA;

## Adding constraints and keys

In [0]:
%sql
-- Declare the key relationships between the four tables.
-- The primary keys are added first because every foreign key below references one of them.
-- NOTE(review): Databricks documents PRIMARY KEY / FOREIGN KEY constraints as informational
-- rather than enforced -- confirm before relying on them for data integrity.
-- NOTE(review): these statements are not re-runnable as written; a second run presumably
-- fails because the constraint names already exist -- confirm.

-- Add primary keys
ALTER TABLE Submissions ADD CONSTRAINT pk_submissions PRIMARY KEY (adsh);
ALTER TABLE Tags ADD CONSTRAINT pk_tags PRIMARY KEY (tag, version);
ALTER TABLE Numbers ADD CONSTRAINT pk_numbers PRIMARY KEY (adsh, tag, version, ddate, qtrs, uom);
ALTER TABLE Presentations ADD CONSTRAINT pk_presentations PRIMARY KEY (adsh, report, line);

-- Add foreign keys
-- Numbers and Presentations each point at Submissions (by adsh) and at Tags (by tag, version).
ALTER TABLE Numbers ADD CONSTRAINT fk_numbers_submissions 
    FOREIGN KEY (adsh) REFERENCES Submissions(adsh);
ALTER TABLE Numbers ADD CONSTRAINT fk_numbers_tags 
    FOREIGN KEY (tag, version) REFERENCES Tags(tag, version);
ALTER TABLE Presentations ADD CONSTRAINT fk_presentations_submissions 
    FOREIGN KEY (adsh) REFERENCES Submissions(adsh);
ALTER TABLE Presentations ADD CONSTRAINT fk_presentations_tags 
    FOREIGN KEY (tag, version) REFERENCES Tags(tag, version);

# Moving the unzipped Parquet files to the delta table
## Mount the container with the parquet files

In [0]:
%python
storage_account_name = 'testtech'
# NOTE(review): the key is blank in the notebook; supply it at run time (e.g. from a
# secret store) instead of committing the real value here.
storage_account_key = ''
container_name_destination = 'silverlayer'

# Spark configuration entry that authenticates against the storage account.
# The same key/value pair is also handed to the mount below.
account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(account_key_conf, storage_account_key)

# dbutils.fs.mount raises when the mount point is already in use, so only mount when
# the destination container is not mounted yet; this keeps the cell re-runnable.
destination_mount_point = f"/mnt/{container_name_destination}"
if not any(mount.mountPoint == destination_mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://{container_name_destination}@{storage_account_name}.blob.core.windows.net",
        mount_point=destination_mount_point,
        extra_configs={account_key_conf: storage_account_key},
    )

print('Destination container has been mounted')

Destination container has been mounted


## Enforce the database schema and push to the Delta tables

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# Create SparkSession
# getOrCreate() hands back the already-running session when there is one, so this
# rebinds `spark` rather than necessarily starting a new session.
spark = SparkSession.builder.appName("ParquetToDelta").getOrCreate()

# Define the root directory where your Blob Storage is mounted
# (the destination container's mount point; process_all_quarters lists its entries)
root_directory = f"/mnt/{container_name_destination}"

# Define schemas for each table
# Submissions: the nullable flag (third argument) mirrors the NOT NULL declarations of the
# Submissions table DDL, so the two definitions no longer contradict each other.
# process_quarter_folder only reads each field's name and dataType when casting, so the
# flags document the table contract rather than change how the data is loaded.
submissions_schema = StructType([
    StructField("adsh", StringType(), False), # Not nullable
    StructField("cik", LongType(), True), # Nullable
    StructField("name", StringType(), False), # Not nullable
    StructField("sic", IntegerType(), True), # Nullable
    StructField("countryba", StringType(), True), # Nullable
    StructField("stprba", StringType(), True), # Nullable
    StructField("cityba", StringType(), True), # Nullable
    StructField("zipba", StringType(), True), # Nullable
    StructField("bas1", StringType(), True), # Nullable
    StructField("bas2", StringType(), True), # Nullable
    StructField("baph", StringType(), True), # Nullable
    StructField("countryma", StringType(), True), # Nullable
    StructField("stprma", StringType(), True), # Nullable
    StructField("cityma", StringType(), True), # Nullable
    StructField("zipma", StringType(), True), # Nullable
    StructField("mas1", StringType(), True), # Nullable
    StructField("mas2", StringType(), True), # Nullable
    StructField("countryinc", StringType(), True), # Nullable
    StructField("stprinc", StringType(), True), # Nullable
    StructField("ein", StringType(), True), # Nullable
    StructField("former", StringType(), True), # Nullable
    StructField("changed", DateType(), True), # Nullable
    StructField("afs", StringType(), True), # Nullable
    StructField("wksi", BooleanType(), False), # Not nullable
    StructField("fye", StringType(), True), # Nullable
    StructField("form", StringType(), True), # Nullable
    StructField("period", DateType(), True), # Nullable
    StructField("fy", IntegerType(), True), # Nullable
    StructField("fp", StringType(), True), # Nullable
    StructField("filed", DateType(), False), # Not nullable
    StructField("accepted", TimestampType(), False), # Not nullable
    StructField("prevrpt", BooleanType(), False), # Not nullable
    StructField("detail", BooleanType(), False), # Not nullable
    StructField("instance", StringType(), False), # Not nullable
    StructField("nciks", IntegerType(), True), # Nullable
    StructField("aciks", StringType(), True) # Nullable
])

# Tags: built field by field; the third argument of add() is the nullable flag.
# The first four columns are required, the descriptive columns may be null.
tags_schema = (
    StructType()
    .add("tag", StringType(), False)
    .add("version", StringType(), False)
    .add("custom", IntegerType(), False)
    .add("abstract", BooleanType(), False)
    .add("datatype", StringType(), True)
    .add("iord", StringType(), True)
    .add("crdr", StringType(), True)
    .add("tlabel", StringType(), True)
    .add("doc", StringType(), True)
)

# Numbers: the six key columns are declared NOT NULL in the Numbers table DDL, so they are
# marked non-nullable here as well (previously every field was flagged nullable).
# process_quarter_folder only reads each field's name and dataType when casting, so the
# flags document the table contract rather than change how the data is loaded.
numbers_schema = StructType([
    StructField("adsh", StringType(), False), # Not nullable
    StructField("tag", StringType(), False), # Not nullable
    StructField("version", StringType(), False), # Not nullable
    StructField("ddate", DateType(), False), # Not nullable
    StructField("qtrs", IntegerType(), False), # Not nullable
    StructField("uom", StringType(), False), # Not nullable
    StructField("coreg", StringType(), True), # Nullable
    StructField("value", DecimalType(28,4), True), # Nullable
    StructField("footnote", StringType(), True) # Nullable
])

# Presentations: nullable flags now mirror the Presentations table DDL, where only adsh,
# report, line and inpth are NOT NULL (report was flagged nullable and the optional
# text columns were flagged required).
# process_quarter_folder only reads each field's name and dataType when casting, so the
# flags document the table contract rather than change how the data is loaded.
presentations_schema = StructType([
    StructField("adsh", StringType(), False), # Not nullable
    StructField("report", IntegerType(), False), # Not nullable
    StructField("line", IntegerType(), False), # Not nullable
    StructField("stmt", StringType(), True), # Nullable
    StructField("inpth", BooleanType(), False), # Not nullable
    StructField("rfile", StringType(), True), # Nullable
    StructField("tag", StringType(), True), # Nullable
    StructField("version", StringType(), True), # Nullable
    StructField("plabel", StringType(), True) # Nullable
])


# Load the Parquet outputs found in one quarter folder into their Delta tables
def process_quarter_folder(folder_path):
    """Append every recognised Parquet folder under ``folder_path`` to its Delta table.

    Each entry returned by ``dbutils.fs.ls(folder_path)`` is matched by name. A
    recognised entry is read with ``spark.read.parquet``, every column named in the
    matching schema is cast to that schema's data type, and the result is appended
    to the corresponding table. Any other entry is reported and skipped.

    Args:
        folder_path: Path of the folder to scan.
    """
    # Entry name as listed by dbutils.fs.ls -> (schema to cast to, table to append to)
    targets = {
        'pre.parquet/': (presentations_schema, 'Presentations'),
        'num.parquet/': (numbers_schema, 'Numbers'),
        'sub.parquet/': (submissions_schema, 'Submissions'),
        'tag.parquet/': (tags_schema, 'Tags'),
    }

    for entry in dbutils.fs.ls(folder_path):
        target = targets.get(entry.name)
        if target is None:
            print(f"Skipping file {entry.name} as it doesn't match any known pattern")
            continue

        schema, table_name = target
        frame = spark.read.parquet(entry.path)
        # Cast one column at a time, in schema order, to the declared data type
        for field in schema.fields:
            frame = frame.withColumn(field.name, col(field.name).cast(field.dataType))
        frame.write.mode("append").saveAsTable(table_name)

# Walk every entry under the root directory and load it
def process_all_quarters():
    """Run ``process_quarter_folder`` on each entry listed under ``root_directory``.

    The entry's name is printed before it is processed, so the output shows which
    folder is being loaded.
    """
    for entry in dbutils.fs.ls(root_directory):
        print(f"Processing folder: {entry.name}")
        process_quarter_folder(entry.path)

# Enable automatic schema merging in Delta Lake
# NOTE(review): with auto-merge on, columns that exist in a Parquet folder but not in the
# target table are presumably added to the table instead of being rejected -- confirm this
# is intended, given that the section is about enforcing the schema.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Kick off the load: every entry under root_directory is processed in listing order.
print(f"Starting to process all quarter folders from {root_directory}")
process_all_quarters()

# Only reached when process_all_quarters() returned without raising.
print("Data upload completed for all quarters.")

In [0]:
%sql
-- Show the rows currently stored in the tags table
SELECT *
FROM tags;

In [0]:
# Release the destination container's mount point now that the load has finished.
dbutils.fs.unmount(f"/mnt/{container_name_destination}")

## Create Database tables in Snowflake

In [0]:
%sql
-- Reference videos ('#' is not SQL comment syntax, so the links are now '--' comments):
-- https://www.youtube.com/watch?v=9PBvVeCQi0w WATCH THIS
-- https://www.youtube.com/watch?v=VOJ54hu2e2Q
-- https://www.youtube.com/watch?v=CUR6rKrIEGc

-- NOTE(review): USE DATABASE / USE SCHEMA and the TEXT / NUMERIC types look like Snowflake
-- dialect; this cell is presumably meant to be run in Snowflake rather than through the
-- notebook's %sql magic -- confirm.
USE DATABASE FINANCIAL;
USE SCHEMA ALLL;

-- IF NOT EXISTS keeps the script re-runnable once the tables have been created.

-- Submissions: one row per adsh
CREATE TABLE IF NOT EXISTS Submissions (
    adsh TEXT PRIMARY KEY,
    cik BIGINT,
    name TEXT NOT NULL,
    sic INT,
    countryba TEXT,
    stprba TEXT,
    cityba TEXT,
    zipba TEXT,
    bas1 TEXT,
    bas2 TEXT,
    baph TEXT,
    countryma TEXT,
    stprma TEXT,
    cityma TEXT,
    zipma TEXT,
    mas1 TEXT,
    mas2 TEXT,
    countryinc TEXT,
    stprinc TEXT,
    ein TEXT,
    former TEXT,
    changed DATE,
    afs TEXT,
    wksi BOOLEAN NOT NULL,
    fye TEXT,
    form TEXT,
    period DATE,
    fy INT,
    fp TEXT,
    filed DATE NOT NULL,
    accepted TIMESTAMP NOT NULL,
    prevrpt BOOLEAN NOT NULL,
    detail BOOLEAN NOT NULL,
    instance TEXT NOT NULL,
    nciks INT,
    aciks TEXT
);

-- Tags: identified by (tag, version)
CREATE TABLE IF NOT EXISTS Tags (
    tag TEXT NOT NULL,
    version TEXT NOT NULL,
    custom INT NOT NULL,
    abstract BOOLEAN NOT NULL,
    datatype TEXT,
    iord TEXT,
    crdr TEXT,
    tlabel TEXT,
    doc TEXT,
    PRIMARY KEY (tag, version)
);

-- Numbers: references Submissions (adsh) and Tags (tag, version)
CREATE TABLE IF NOT EXISTS Numbers (
    adsh TEXT NOT NULL,
    tag TEXT NOT NULL,
    version TEXT NOT NULL,
    ddate DATE NOT NULL,
    qtrs INT NOT NULL,
    uom TEXT NOT NULL,
    coreg TEXT,
    value NUMERIC(28,4),
    footnote TEXT,
    PRIMARY KEY (adsh, tag, version, ddate, qtrs, uom),
    FOREIGN KEY (adsh) REFERENCES Submissions(adsh),
    FOREIGN KEY (tag, version) REFERENCES Tags(tag, version)
);

-- Presentations: references Submissions (adsh) and Tags (tag, version)
CREATE TABLE IF NOT EXISTS Presentations (
    adsh TEXT NOT NULL,
    report INT NOT NULL,
    line INT NOT NULL,
    stmt TEXT,
    inpth BOOLEAN NOT NULL,
    rfile TEXT,
    tag TEXT,
    version TEXT,
    plabel TEXT,
    PRIMARY KEY (adsh, report, line),
    FOREIGN KEY (adsh) REFERENCES Submissions(adsh),
    FOREIGN KEY (tag, version) REFERENCES Tags(tag, version)
);


## Push to Snowflake

In [0]:
# Connection settings handed to the Snowflake connector on every write.
# NOTE(review): the password is blank in the notebook; provide it at run time rather
# than committing the real value here.
sfOptions = dict(
    sfURL="MPAGWCW-OX41008.snowflakecomputing.com",
    sfDatabase="FINANCIAL",
    sfSchema="ALLL",
    sfWarehouse="COMPUTE_WH",
    sfUser="MICHAELGEORGE",
    sfPassword="",
)

# Resolve all four Delta tables up front, so a missing table fails before anything is
# written. Insertion order is the write order: Submissions, Tags, Presentations, Numbers.
delta_frames = {
    table_name: spark.read.format("delta").table(table_name)
    for table_name in ("Submissions", "Tags", "Presentations", "Numbers")
}

# Overwrite the Snowflake table of the same name with the contents of each Delta table.
for table_name, frame in delta_frames.items():
    (
        frame.write
        .format("snowflake")
        .options(**sfOptions)
        .option("dbtable", table_name)
        .mode("overwrite")
        .save()
    )
