## Mount Azure Blob Storage to Databricks

In [None]:
storage_account = "onpremtocloudsto"
raw_container = "raw"
access_key = "REMOVED"

dbutils.fs.mount(
    source = f"wasbs://{raw_container}@{storage_account}.blob.core.windows.net",
    mount_point = "/mnt/raw_data",
    extra_configs = {f"fs.azure.account.key.{storage_account}.blob.core.windows.net": access_key}
)




Out[5]: True

In [None]:
dbutils.fs.ls('/mnt/raw')

Out[6]: [FileInfo(path='dbfs:/mnt/raw/dbo.layoffs.txt', name='dbo.layoffs.txt', size=245238, modificationTime=1743769785000)]

## Importing Libraries

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *



## Reading Data

In [None]:
df_layoffs=spark.read.format('csv')\
                      .option('inferschema',True)\
                      .option('Header',True) \
                      .load('/mnt/raw_data')



In [None]:
df_layoffs.display()

company,location,industry,total_laid_off,percentage_laid_off,date,stage,country,funds_raised_millions
Atlassian,Sydney,Other,500.0,0.05,2023-03-06T00:00:00.000+0000,Post-IPO,Australia,210.0
SiriusXM,New York City,Media,475.0,0.08,2023-03-06T00:00:00.000+0000,Post-IPO,United States,525.0
Alerzo,Ibadan,Retail,400.0,,2023-03-06T00:00:00.000+0000,Series B,Nigeria,16.0
UpGrad,Mumbai,Education,120.0,,2023-03-06T00:00:00.000+0000,Unknown,India,631.0
Loft,Sao Paulo,Real Estate,340.0,0.15,2023-03-03T00:00:00.000+0000,Unknown,Brazil,788.0
Embark Trucks,SF Bay Area,Transportation,230.0,0.7,2023-03-03T00:00:00.000+0000,Post-IPO,United States,317.0
Lendi,Sydney,Real Estate,100.0,,2023-03-03T00:00:00.000+0000,Unknown,Australia,59.0
UserTesting,SF Bay Area,Marketing,63.0,,2023-03-03T00:00:00.000+0000,Acquired,United States,152.0
Airbnb,SF Bay Area,,30.0,,2023-03-03T00:00:00.000+0000,Post-IPO,United States,6400.0
Accolade,Seattle,Healthcare,,,2023-03-03T00:00:00.000+0000,Post-IPO,United States,458.0


In [None]:
df_layoffs.printSchema()

root
 |-- company: string (nullable = true)
 |-- location: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- total_laid_off: string (nullable = true)
 |-- percentage_laid_off: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- stage: string (nullable = true)
 |-- country: string (nullable = true)
 |-- funds_raised_millions: string (nullable = true)



In [None]:


df_layoffs = df_layoffs.select(
    col("company"),
    col("location"),
    col("industry"),
    col("total_laid_off").cast("integer").alias("total_laid_off"),
    col("percentage_laid_off").cast("double").alias("percentage_laid_off"),
    col("date").cast("date").alias("date"),
    col("stage"),
    col("country"),
    col("funds_raised_millions").cast("integer").alias("funds_raised_millions")
)

In [None]:
df_layoffs.display()

company,location,industry,total_laid_off,percentage_laid_off,date,stage,country,funds_raised_millions
Atlassian,Sydney,Other,500.0,0.05,2023-03-06,Post-IPO,Australia,210.0
SiriusXM,New York City,Media,475.0,0.08,2023-03-06,Post-IPO,United States,525.0
Alerzo,Ibadan,Retail,400.0,,2023-03-06,Series B,Nigeria,16.0
UpGrad,Mumbai,Education,120.0,,2023-03-06,Unknown,India,631.0
Loft,Sao Paulo,Real Estate,340.0,0.15,2023-03-03,Unknown,Brazil,788.0
Embark Trucks,SF Bay Area,Transportation,230.0,0.7,2023-03-03,Post-IPO,United States,317.0
Lendi,Sydney,Real Estate,100.0,,2023-03-03,Unknown,Australia,59.0
UserTesting,SF Bay Area,Marketing,63.0,,2023-03-03,Acquired,United States,152.0
Airbnb,SF Bay Area,,30.0,,2023-03-03,Post-IPO,United States,6400.0
Accolade,Seattle,Healthcare,,,2023-03-03,Post-IPO,United States,458.0


In [None]:
from pyspark.sql.functions import col, sum as spark_sum

null_counts = df_layoffs.select([
    spark_sum(col(c).isNull().cast("integer")).alias(c)
    for c in df_layoffs.columns
])

display(null_counts)

company,location,industry,total_laid_off,percentage_laid_off,date,stage,country,funds_raised_millions
0,0,3,702,760,1,0,0,203


In [None]:
from pyspark.sql.functions import col, when, isnull

# To Define column groups
text_columns = ["company", "location", "industry", "stage", "country"]
numeric_columns = ["total_laid_off", "funds_raised_millions"]
percentage_column = ["percentage_laid_off"]

# Replace nulls in TEXT columns with "Unknown"
for column in text_columns:
    df_layoffs = df_layoffs.withColumn(
        column,
        when(isnull(col(column)), "Unknown").otherwise(col(column))
    )

# Replace nulls in NUMERIC columns with 0 (fixed parentheses)
for column in numeric_columns + percentage_column:
    df_layoffs = df_layoffs.withColumn(
        column,
        when(isnull(col(column)), 0).otherwise(col(column)).cast("double")
    )

In [None]:
df_layoffs.display()

company,location,industry,total_laid_off,percentage_laid_off,date,stage,country,funds_raised_millions
Atlassian,Sydney,Other,500.0,0.05,2023-03-06,Post-IPO,Australia,210.0
SiriusXM,New York City,Media,475.0,0.08,2023-03-06,Post-IPO,United States,525.0
Alerzo,Ibadan,Retail,400.0,0.0,2023-03-06,Series B,Nigeria,16.0
UpGrad,Mumbai,Education,120.0,0.0,2023-03-06,Unknown,India,631.0
Loft,Sao Paulo,Real Estate,340.0,0.15,2023-03-03,Unknown,Brazil,788.0
Embark Trucks,SF Bay Area,Transportation,230.0,0.7,2023-03-03,Post-IPO,United States,317.0
Lendi,Sydney,Real Estate,100.0,0.0,2023-03-03,Unknown,Australia,59.0
UserTesting,SF Bay Area,Marketing,63.0,0.0,2023-03-03,Acquired,United States,152.0
Airbnb,SF Bay Area,Unknown,30.0,0.0,2023-03-03,Post-IPO,United States,6400.0
Accolade,Seattle,Healthcare,0.0,0.0,2023-03-03,Post-IPO,United States,458.0


In [None]:
# To Save your final DataFrame
df_layoffs.write.parquet("/dbfs/FileStore/layoffs_processed.parquet")

In [None]:
df_layoffs.display()

company,location,industry,total_laid_off,percentage_laid_off,date,stage,country,funds_raised_millions,month_name
Atlassian,Sydney,Other,500.0,0.05,2023-03-06,Post-IPO,Australia,210.0,March
SiriusXM,New York City,Media,475.0,0.08,2023-03-06,Post-IPO,United States,525.0,March
Alerzo,Ibadan,Retail,400.0,0.0,2023-03-06,Series B,Nigeria,16.0,March
UpGrad,Mumbai,Education,120.0,0.0,2023-03-06,Unknown,India,631.0,March
Loft,Sao Paulo,Real Estate,340.0,0.15,2023-03-03,Unknown,Brazil,788.0,March
Embark Trucks,SF Bay Area,Transportation,230.0,0.7,2023-03-03,Post-IPO,United States,317.0,March
Lendi,Sydney,Real Estate,100.0,0.0,2023-03-03,Unknown,Australia,59.0,March
UserTesting,SF Bay Area,Marketing,63.0,0.0,2023-03-03,Acquired,United States,152.0,March
Airbnb,SF Bay Area,Unknown,30.0,0.0,2023-03-03,Post-IPO,United States,6400.0,March
Accolade,Seattle,Healthcare,0.0,0.0,2023-03-03,Post-IPO,United States,458.0,March


In [None]:

df_layoffs = (df_layoffs
    .withColumn("year", year(col("date")))
    .withColumn("month_num", month(col("date")))
    .withColumn("month_name", date_format(col("date"), "MMMM"))
)

column_order = ["date", "year", "month_num", "month_name"] + \
               [c for c in df_layoffs.columns if c not in ["date", "year", "month_num", "month_name"]]

df_layoffs = df_layoffs.select(column_order)


display(df_layoffs)

date,year,month_num,month_name,company,location,industry,total_laid_off,percentage_laid_off,stage,country,funds_raised_millions,month
2023-03-06,2023.0,3.0,March,Atlassian,Sydney,Other,500.0,0.05,Post-IPO,Australia,210.0,3.0
2023-03-06,2023.0,3.0,March,SiriusXM,New York City,Media,475.0,0.08,Post-IPO,United States,525.0,3.0
2023-03-06,2023.0,3.0,March,Alerzo,Ibadan,Retail,400.0,0.0,Series B,Nigeria,16.0,3.0
2023-03-06,2023.0,3.0,March,UpGrad,Mumbai,Education,120.0,0.0,Unknown,India,631.0,3.0
2023-03-03,2023.0,3.0,March,Loft,Sao Paulo,Real Estate,340.0,0.15,Unknown,Brazil,788.0,3.0
2023-03-03,2023.0,3.0,March,Embark Trucks,SF Bay Area,Transportation,230.0,0.7,Post-IPO,United States,317.0,3.0
2023-03-03,2023.0,3.0,March,Lendi,Sydney,Real Estate,100.0,0.0,Unknown,Australia,59.0,3.0
2023-03-03,2023.0,3.0,March,UserTesting,SF Bay Area,Marketing,63.0,0.0,Acquired,United States,152.0,3.0
2023-03-03,2023.0,3.0,March,Airbnb,SF Bay Area,Unknown,30.0,0.0,Post-IPO,United States,6400.0,3.0
2023-03-03,2023.0,3.0,March,Accolade,Seattle,Healthcare,0.0,0.0,Post-IPO,United States,458.0,3.0


In [None]:

duplicate_count = df_layoffs.count() - df_layoffs.distinct().count()
print(f"Total duplicate rows: {duplicate_count}")

Total duplicate rows: 0


In [None]:

spark.conf.set(
    "fs.azure.account.key.onpremtocloudsto.blob.core.windows.net",
    "REMOVED"
)


(df_layoffs.write
    .format("parquet")
    .option("header", "true")
    .mode("overwrite")
    .save("wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs")
)

In [None]:
display(dbutils.fs.ls("wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/"))

path,name,size,modificationTime
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_SUCCESS,_SUCCESS,0,1743928642000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_committed_2087610143358017104,_committed_2087610143358017104,210,1743928640000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_committed_2428823201101023082,_committed_2428823201101023082,112,1743927085000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_committed_8766842119838399893,_committed_8766842119838399893,210,1743928555000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_started_2087610143358017104,_started_2087610143358017104,0,1743928635000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_started_2428823201101023082,_started_2428823201101023082,0,1743927080000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_started_8766842119838399893,_started_8766842119838399893,0,1743928551000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/part-00000-tid-2087610143358017104-d3616d46-aed7-4928-9ead-e95f05572361-22-1-c000.snappy.parquet,part-00000-tid-2087610143358017104-d3616d46-aed7-4928-9ead-e95f05572361-22-1-c000.snappy.parquet,50523,1743928638000


In [None]:
# Create a local temporary copy in the same format
temp_path = "/tmp/layoffs_preview"
(df_layoffs.write
  .format("parquet")
  .option("header", "true")
  .mode("overwrite")
  .save(temp_path)
)


preview_df = spark.read.option("header", "true").parquet(temp_path)
display(preview_df)

date,year,month_num,month_name,company,location,industry,total_laid_off,percentage_laid_off,stage,country,funds_raised_millions,month
2023-03-06,2023.0,3.0,March,Atlassian,Sydney,Other,500.0,0.05,Post-IPO,Australia,210.0,3.0
2023-03-06,2023.0,3.0,March,SiriusXM,New York City,Media,475.0,0.08,Post-IPO,United States,525.0,3.0
2023-03-06,2023.0,3.0,March,Alerzo,Ibadan,Retail,400.0,0.0,Series B,Nigeria,16.0,3.0
2023-03-06,2023.0,3.0,March,UpGrad,Mumbai,Education,120.0,0.0,Unknown,India,631.0,3.0
2023-03-03,2023.0,3.0,March,Loft,Sao Paulo,Real Estate,340.0,0.15,Unknown,Brazil,788.0,3.0
2023-03-03,2023.0,3.0,March,Embark Trucks,SF Bay Area,Transportation,230.0,0.7,Post-IPO,United States,317.0,3.0
2023-03-03,2023.0,3.0,March,Lendi,Sydney,Real Estate,100.0,0.0,Unknown,Australia,59.0,3.0
2023-03-03,2023.0,3.0,March,UserTesting,SF Bay Area,Marketing,63.0,0.0,Acquired,United States,152.0,3.0
2023-03-03,2023.0,3.0,March,Airbnb,SF Bay Area,Unknown,30.0,0.0,Post-IPO,United States,6400.0,3.0
2023-03-03,2023.0,3.0,March,Accolade,Seattle,Healthcare,0.0,0.0,Post-IPO,United States,458.0,3.0


In [None]:
#verify the files in blob
blob_path = "wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs"
display(dbutils.fs.ls(blob_path))

path,name,size,modificationTime
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_SUCCESS,_SUCCESS,0,1743928642000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_committed_2087610143358017104,_committed_2087610143358017104,210,1743928640000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_committed_2428823201101023082,_committed_2428823201101023082,112,1743927085000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_committed_8766842119838399893,_committed_8766842119838399893,210,1743928555000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_started_2087610143358017104,_started_2087610143358017104,0,1743928635000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_started_2428823201101023082,_started_2428823201101023082,0,1743927080000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/_started_8766842119838399893,_started_8766842119838399893,0,1743928551000
wasbs://transformed@onpremtocloudsto.blob.core.windows.net/layoffs/part-00000-tid-2087610143358017104-d3616d46-aed7-4928-9ead-e95f05572361-22-1-c000.snappy.parquet,part-00000-tid-2087610143358017104-d3616d46-aed7-4928-9ead-e95f05572361-22-1-c000.snappy.parquet,50523,1743928638000


In [None]:
blob_df = spark.read.parquet(blob_path)
display(blob_df.limit(5))

date,year,month_num,month_name,company,location,industry,total_laid_off,percentage_laid_off,stage,country,funds_raised_millions,month
2023-03-06,2023,3,March,Atlassian,Sydney,Other,500.0,0.05,Post-IPO,Australia,210.0,3
2023-03-06,2023,3,March,SiriusXM,New York City,Media,475.0,0.08,Post-IPO,United States,525.0,3
2023-03-06,2023,3,March,Alerzo,Ibadan,Retail,400.0,0.0,Series B,Nigeria,16.0,3
2023-03-06,2023,3,March,UpGrad,Mumbai,Education,120.0,0.0,Unknown,India,631.0,3
2023-03-03,2023,3,March,Loft,Sao Paulo,Real Estate,340.0,0.15,Unknown,Brazil,788.0,3


In [None]:
# To Save your final DataFrame
df_layoffs.write.parquet("/dbfs/FileStore/layoffs_Final_processed.parquet")