# Bronze layer

### Testing for valid URLs

In [None]:
#https://www.sec.gov/files/dera/data/financial-statement-data-sets/2011q2.zip

import requests
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient



headers = {
   "User-Agent": "jo boulement jo@gmx.at",
    "Accept-Encoding": "gzip, deflate" 
}

def generate_sec_urls(start_year = 2009, start_quarter = 1):
  urls = []
  year = start_year
  quarter = start_quarter

  while True:
    url = f"https://www.sec.gov/files/dera/data/financial-statement-data-sets/{year}q{quarter}.zip"
    print(f"checking URL: {url}")
    response = requests.get(url,headers= headers)
    print(f"status code: {response.status_code}")
    if response.status_code == 200:
      urls.append(url) 
      quarter += 1
      if quarter > 4:
        quarter = 1
        year +=1
    else:
      break
  
  return urls

valid_urls = generate_sec_urls()
valid_urls


checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q1.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q2.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q3.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q4.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q1.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q2.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q3.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q4.zip
status code: 200
checking URL: https://www.sec.gov/files/dera/data/financial-statement-data-sets/2011q1.zip
status code: 200
checking URL: https://www.se

['https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q1.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q2.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q3.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q4.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q1.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q2.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q3.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q4.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2011q1.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2011q2.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2011q3.zip',
 'https://www.sec.gov/files/dera/data/financial-statement-data-sets/2011q4.zip',
 'https://www.sec.gov/files/

### Pushing to Blob Storage Container

In [None]:
connection_string = ""
container_name = "testtech"

#blob client
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

def download_to_blob(url, blob_name):
  response = requests.get(url, headers=headers)
  if response.status_code == 200:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob = blob_name)
    blob_client.upload_blob(response.content, overwrite = True)
    print(f'file {blob_name} uploaded to blob storage')
  else:
    print(f'failed to download from {url}. Status code {response.status_code}')


def main():
  for url in valid_urls:
    blob_name = url.split('/')[-1]
    download_to_blob(url, blob_name)


if __name__ == "__main__":
    main()



file 2009q1.zip uploaded to blob storage
file 2009q2.zip uploaded to blob storage
file 2009q3.zip uploaded to blob storage
file 2009q4.zip uploaded to blob storage
file 2010q1.zip uploaded to blob storage
file 2010q2.zip uploaded to blob storage
file 2010q3.zip uploaded to blob storage
file 2010q4.zip uploaded to blob storage
file 2011q1.zip uploaded to blob storage
file 2011q2.zip uploaded to blob storage
file 2011q3.zip uploaded to blob storage
file 2011q4.zip uploaded to blob storage
file 2012q1.zip uploaded to blob storage
file 2012q2.zip uploaded to blob storage
file 2012q3.zip uploaded to blob storage
file 2012q4.zip uploaded to blob storage
file 2013q1.zip uploaded to blob storage
file 2013q2.zip uploaded to blob storage
file 2013q3.zip uploaded to blob storage
file 2013q4.zip uploaded to blob storage
file 2014q1.zip uploaded to blob storage
file 2014q2.zip uploaded to blob storage
file 2014q3.zip uploaded to blob storage
file 2014q4.zip uploaded to blob storage
file 2015q1.zip 

# Silver Layer

## Mount Blob storage on Databrick's DBFS


In [None]:
storage_account_name = "testtech"
storage_account_key = ''
container_name_source = "testtech"
container_name_dest = "newtesttech"


# Configure the spark context with the storage account key
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

# Mount your source container
dbutils.fs.mount(
  source = f"wasbs://{container_name_source}@{storage_account_name}.blob.core.windows.net",
  mount_point = f"/mnt/{container_name_source}",
  extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
)

# Mount your destination container
dbutils.fs.mount(
  source = f"wasbs://newtesttech@{storage_account_name}.blob.core.windows.net/",
  mount_point = "/mnt/newtesttech",
  extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
)

True

In [None]:
dbutils.fs.ls(f"/mnt/{container_name_source}")

[FileInfo(path='dbfs:/mnt/testtech/2009q2.zip', name='2009q2.zip', size=163510, modificationTime=1719415581000),
 FileInfo(path='dbfs:/mnt/testtech/2009q3.zip', name='2009q3.zip', size=2578176, modificationTime=1719415582000),
 FileInfo(path='dbfs:/mnt/testtech/2009q4.zip', name='2009q4.zip', size=2855181, modificationTime=1719415582000),
 FileInfo(path='dbfs:/mnt/testtech/2010q1.zip', name='2010q1.zip', size=3299076, modificationTime=1719415583000),
 FileInfo(path='dbfs:/mnt/testtech/2010q2.zip', name='2010q2.zip', size=2861043, modificationTime=1719415583000),
 FileInfo(path='dbfs:/mnt/testtech/temp_folder/', name='temp_folder/', size=0, modificationTime=1720402736000)]

## Unzip and move contents from one container to another after transformation

In [None]:
import os
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col



zip_files = dbutils.fs.ls(f"/mnt/{container_name_source}")
zip_files = [f for f in zip_files if f.name.endswith('.zip')]
for zip_file in zip_files:
    
    new_folder_name = zip_file.name.split('.')[0]
    dbutils.fs.mkdirs(f"/mnt/{container_name_dest}/{new_folder_name}")
    
    extract_path = f'/dbfs/mnt/{container_name_source}/temp_folder'
    working_dir = f'/dbfs/mnt/{container_name_source}/{new_folder_name}.zip'
    #dbutils.fs.ls(f"/mnt/{container_name_source}")
    dbutils.fs.mkdirs(extract_path)
    
    with zipfile.ZipFile(working_dir, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    
    
    sub_path = f'/mnt/{container_name_source}/temp_folder/sub.txt'
    sub_df = spark.read.option("delimiter", "\t").option("header", "true").csv(sub_path)
    sub_df = sub_df.withColumn("filed", to_date(col("filed"), "yyyyMMdd")).withColumn("period", to_date(col("period"), "yyyyMMdd"))
    sub_df.write.mode("overwrite").parquet(f'/mnt/{container_name_dest}/{new_folder_name}/sub.parquet')
    
    tag_path = f'/mnt/{container_name_source}/temp_folder/tag.txt'
    tag_df = spark.read.option("delimiter", "\t").option("header", "true").csv(tag_path)
    tag_df.write.mode("overwrite").parquet(f'/mnt/{container_name_dest}/{new_folder_name}/tag.parquet')
    
    
    num_path = f'/mnt/{container_name_source}/temp_folder/num.txt'
    num_df = spark.read.option("delimiter", "\t").option("header", "true").csv(num_path)
    num_df = num_df.withColumn("ddate", to_date(col("ddate"), "yyyyMMdd"))
    num_df.write.mode("overwrite").parquet(f'/mnt/{container_name_dest}/{new_folder_name}/num.parquet')
    
    
    pre_path = f'/mnt/{container_name_source}/temp_folder/pre.txt'
    pre_df = spark.read.option("delimiter", "\t").option("header", "true").csv(pre_path)
    pre_df.write.mode("overwrite").parquet(f'/mnt/{container_name_dest}/{new_folder_name}/pre.parquet')
    
    dbutils.fs.rm(extract_path, recurse=True)

# Unmount the containers
dbutils.fs.unmount(f"/mnt/{container_name_source}")
dbutils.fs.unmount(f"/mnt/{container_name_dest}")

/mnt/testtech has been unmounted.
/mnt/newtesttech has been unmounted.


True

# Gold Layer

## Create Delta tables

In [None]:
%sql
-- Submissions table
CREATE TABLE Submissions (
    adsh STRING NOT NULL,
    cik BIGINT,
    name STRING NOT NULL,
    sic INT,
    countryba STRING,
    stprba STRING,
    cityba STRING,
    zipba STRING,
    bas1 STRING,
    bas2 STRING,
    baph STRING,
    countryma STRING,
    stprma STRING,
    cityma STRING,
    zipma STRING,
    mas1 STRING,
    mas2 STRING,
    countryinc STRING ,
    stprinc STRING,
    ein STRING,
    former STRING,
    changed DATE,
    afs STRING,
    wksi BOOLEAN NOT NULL,
    fye STRING,
    form STRING,
    period DATE NOT NULL,
    fy INT ,
    fp STRING,
    filed DATE NOT NULL,
    accepted TIMESTAMP NOT NULL,
    prevrpt BOOLEAN NOT NULL,
    detail BOOLEAN NOT NULL,
    instance STRING NOT NULL,
    nciks INT,
    aciks STRING
) USING DELTA;

-- Tags table
CREATE TABLE Tags (
    tag STRING NOT NULL,
    version STRING NOT NULL,
    custom INT NOT NULL,
    abstract BOOLEAN NOT NULL,
    datatype STRING,
    iord STRING,
    crdr STRING,
    tlabel STRING,
    doc STRING
) USING DELTA;

-- Numbers table
CREATE TABLE Numbers (
    adsh STRING NOT NULL,
    tag STRING NOT NULL,
    version STRING NOT NULL,
    ddate DATE NOT NULL,
    qtrs INT NOT NULL,
    uom STRING NOT NULL,
    coreg STRING ,
    value DECIMAL(28,4),
    footnote STRING
) USING DELTA;

-- Presentations table
CREATE TABLE Presentations (
    adsh STRING NOT NULL,
    report INT NOT NULL,
    line INT NOT NULL,
    stmt STRING,
    inpth BOOLEAN NOT NULL,
    rfile STRING,
    tag STRING,
    version STRING,
    plabel STRING
) USING DELTA;

In [None]:
%sql
-- Drop the tables
 -- DROP TABLE IF EXISTS Presentations;
-- DROP TABLE IF EXISTS Numbers;
--DROP TABLE IF EXISTS Tags;
--DROP TABLE IF EXISTS Submissions;

## Add Primary and Foreign keys

In [None]:
%sql
-- Add primary keys
ALTER TABLE Submissions ADD CONSTRAINT pk_submissions PRIMARY KEY (adsh);
ALTER TABLE Tags ADD CONSTRAINT pk_tags PRIMARY KEY (tag, version);
ALTER TABLE Numbers ADD CONSTRAINT pk_numbers PRIMARY KEY (adsh, tag, version, ddate, qtrs, uom);
ALTER TABLE Presentations ADD CONSTRAINT pk_presentations PRIMARY KEY (adsh, report, line);

-- Add foreign keys
ALTER TABLE Numbers ADD CONSTRAINT fk_numbers_submissions 
    FOREIGN KEY (adsh) REFERENCES Submissions(adsh);
ALTER TABLE Numbers ADD CONSTRAINT fk_numbers_tags 
    FOREIGN KEY (tag, version) REFERENCES Tags(tag, version);
ALTER TABLE Presentations ADD CONSTRAINT fk_presentations_submissions 
    FOREIGN KEY (adsh) REFERENCES Submissions(adsh);
ALTER TABLE Presentations ADD CONSTRAINT fk_presentations_tags 
    FOREIGN KEY (tag, version) REFERENCES Tags(tag, version);



## Mount your storage container

In [None]:
storage_account_name = "testtech"
storage_account_key = ''
container_name_source = "testtech"
container_name_dest = "newtesttech"


# Configure the spark context with the storage account key
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

# Mount your destination container
dbutils.fs.mount(
  source = f"wasbs://newtesttech@{storage_account_name}.blob.core.windows.net/",
  mount_point = "/mnt/newtesttech",
  extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
)

True

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# Create SparkSession
spark = SparkSession.builder.appName("ParquetToDelta").getOrCreate()

# Define the mount point where your Blob Storage is mounted
root_directory = "/mnt/newtesttech"

# Define schemas for each table
submissions_schema = StructType([
    StructField("adsh", StringType(), False),
    StructField("cik", LongType(), True),
    StructField("name", StringType(), True),
    StructField("sic", IntegerType(), True),
    StructField("countryba", StringType(), False),
    StructField("stprba", StringType(), True),
    StructField("cityba", StringType(), False),
    StructField("zipba", StringType(), True),
    StructField("bas1", StringType(), True),
    StructField("bas2", StringType(), True),
    StructField("baph", StringType(), True),
    StructField("countryma", StringType(), True),
    StructField("stprma", StringType(), True),
    StructField("cityma", StringType(), True),
    StructField("zipma", StringType(), True),
    StructField("mas1", StringType(), True),
    StructField("mas2", StringType(), True),
    StructField("countryinc", StringType(), False),
    StructField("stprinc", StringType(), True),
    StructField("ein", StringType(), True),
    StructField("former", StringType(), True),
    StructField("changed", DateType(), True),
    StructField("afs", StringType(), True),
    StructField("wksi", BooleanType(), False),
    StructField("fye", StringType(), False),
    StructField("form", StringType(), False),
    StructField("period", DateType(), False),
    StructField("fy", IntegerType(), False),
    StructField("fp", StringType(), False),
    StructField("filed", DateType(), False),
    StructField("accepted", TimestampType(), False),
    StructField("prevrpt", BooleanType(), False),
    StructField("detail", BooleanType(), False),
    StructField("instance", StringType(), False),
    StructField("nciks", IntegerType(), False),
    StructField("aciks", StringType(), True)
])

tags_schema = StructType([
    StructField("tag", StringType(), False),
    StructField("version", StringType(), False),
    StructField("custom", IntegerType(), False),
    StructField("abstract", BooleanType(), False),
    StructField("datatype", StringType(), True),
    StructField("iord", StringType(), True),
    StructField("crdr", StringType(), True),
    StructField("tlabel", StringType(), True),
    StructField("doc", StringType(), True)
])

numbers_schema = StructType([
    StructField("adsh", StringType(), True),
    StructField("tag", StringType(), True),
    StructField("version", StringType(), True),
    StructField("ddate", DateType(), True),
    StructField("qtrs", IntegerType(), True),
    StructField("uom", StringType(), True),
    StructField("coreg", StringType(), True),
    StructField("value", DecimalType(28,4), True),
    StructField("footnote", StringType(), True)
])

presentations_schema = StructType([
    StructField("adsh", StringType(), False),
    StructField("report", IntegerType(), True),
    StructField("line", IntegerType(), False),
    StructField("stmt", StringType(), False),
    StructField("inpth", BooleanType(), False),
    StructField("rfile", StringType(), False),
    StructField("tag", StringType(), False),
    StructField("version", StringType(), False),
    StructField("plabel", StringType(), False)
])



# Function to process files in a specific quarter folder
def process_quarter_folder(folder_path):

    files = dbutils.fs.ls(folder_path)
    for file in files:
        if file.name == 'pre.parquet/':
            file_path = file.path
            df = spark.read.parquet(file_path)
            for field in presentations_schema.fields:
                df = df.withColumn(field.name, col(field.name).cast(field.dataType))
            df.write.mode("append").saveAsTable('Presentations')
        elif file.name == 'num.parquet/':
            file_path = file.path
            df = spark.read.parquet(file_path)
            for field in numbers_schema.fields:
                df = df.withColumn(field.name, col(field.name).cast(field.dataType))
            df.write.mode("append").saveAsTable('Numbers')

        elif file.name == 'sub.parquet/':
            file_path = file.path
            df = spark.read.parquet(file_path)
            for field in submissions_schema.fields:
                df = df.withColumn(field.name, col(field.name).cast(field.dataType))
            df.write.mode("append").saveAsTable('Submissions')

        elif file.name == 'tag.parquet/':
            file_path = file.path
            df = spark.read.parquet(file_path)
            for field in tags_schema.fields:
                df = df.withColumn(field.name, col(field.name).cast(field.dataType))
            df.write.mode("append").saveAsTable('Tags')
        else:
            print(f"Skipping file {file.name} as it doesn't match any known pattern")

# Function to process all quarter folders
def process_all_quarters():
    quarters = dbutils.fs.ls(root_directory)
    for quarter in quarters:
        print(f"Processing folder: {quarter.name}")
        process_quarter_folder(quarter.path)

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

print(f"Starting to process all quarter folders from {root_directory}")
process_all_quarters()

print("Data upload completed for all quarters.")

Starting to process all quarter folders from /mnt/newtesttech
Processing folder: 2009q2/
Processing folder: 2009q3/
Processing folder: 2009q4/
Processing folder: 2010q1/
Processing folder: 2010q2/
Data upload completed for all quarters.


In [None]:
dbutils.fs.unmount(f"/mnt/newtesttech")

/mnt/newtesttech has been unmounted.


True

## Simple Data Analysis in SQL

### Submission count of companies

In [None]:
%sql
Select name, COUNT(*) as submission_count
from Submissions
group by name
order by submission_count desc
limit 10;

name,submission_count
GENZYME CORP,6
SOUTHWEST AIRLINES CO,6
CHEVRON CORP,6
"LIBERTY GLOBAL, INC.",6
"CF INDUSTRIES HOLDINGS, INC.",6
CABLEVISION SYSTEMS CORP /NY,6
GENERAL ELECTRIC CO,6
ISSUER DIRECT CORP,5
FIRSTENERGY CORP,5
PAPA JOHNS INTERNATIONAL INC,5


###  Distribution of companies by country

In [None]:
%sql
SELECT countryba, COUNT(*) as company_count
FROM Submissions
GROUP BY countryba
ORDER BY company_count DESC;

countryba,company_count
US,1854
BM,19
CA,18
IE,15
CH,12
JP,11
BR,9
GB,7
IL,5
DE,4


### Distribution of companies by SIC code

In [None]:
%sql
SELECT sic, COUNT(*) as company_count
FROM Submissions
WHERE sic IS NOT NULL
GROUP BY sic
ORDER BY company_count DESC;

sic,company_count
1311,88
4911,73
6798,67
7372,57
2834,55
3674,49
6021,42
1381,39
6331,38
4841,38


### Most commonly used tags

In [None]:
%sql
SELECT t.tag, t.custom, COUNT(*) as usage_count
FROM Numbers n
JOIN Tags t ON n.tag = t.tag
GROUP BY t.tag, t.custom
ORDER BY usage_count DESC
LIMIT 20;

tag,custom,usage_count
ProfitLoss,1,245700
StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest,1,228564
NetIncomeLossAttributableToNoncontrollingInterest,1,169534
IncomeLossFromContinuingOperationsBeforeIncomeTaxes,1,102138
PrepaidExpensesAndOtherCurrentAssets,1,51198
CashAndCashEquivalentsAtCarryingValue,0,46500
IncomeLossFromContinuingOperationsIncludingPortionAttributableToNoncontrollingInterest,1,42891
ComprehensiveIncomeNetOfTaxIncludingPortionAttributableToNoncontrollingInterest,1,40575
IncomeTaxExpenseBenefit,0,37518
EarningsPerShareBasic,0,35400


### Most common report types

In [None]:
%sql
SELECT stmt, COUNT(*) as usage_count
FROM Presentations
GROUP BY stmt
ORDER BY usage_count DESC;

stmt,usage_count
BS,102534
CF,91943
IS,63832
EQ,35673
CP,17163
CI,8489
UN,2727


### Submission trends over time

In [None]:
%sql
SELECT EXTRACT(YEAR FROM filed) as year, EXTRACT(MONTH FROM filed) as month, COUNT(*) as filing_count
FROM Submissions
GROUP BY year, month
ORDER BY year, month;

year,month,filing_count
2009,4,8
2009,5,16
2009,6,2
2009,7,113
2009,8,287
2009,9,39
2009,10,152
2009,11,294
2009,12,40
2010,1,34
