#Data Engineering - Building an ETL Pipeline

###In this project, we build an ETL pipeline for a dataset containing salary information for employees in San Francisco

The raw, untouched data has been placed at "/FileStore/tables/Salaries.csv". We first check that the file exists.

In [0]:
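def pathExists(path):
  """Return True if the DBFS path exists, False otherwise."""
  try:
    dbutils.fs.ls(path)
    return True
  except Exception:
    # dbutils.fs.ls raises when the path cannot be listed
    return False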

pathExists("/FileStore/tables/Salaries.csv")

In [0]:
display(dbutils.fs.ls("/FileStore/tables/Salaries.csv"))

path,name,size
dbfs:/FileStore/tables/Salaries.csv,Salaries.csv,16257213


We store the source and landing paths in variables so they can be reused in later cells.

In [0]:
filePath = "/FileStore/tables/Salaries.csv"
landingPath = '/FileStore/SalaryPipeline/'


##Ingesting data from the source into a new DataFrame using batch loading
Now we ingest the raw, untouched data from the source path into a new layer.

In [0]:
# Batch-read the CSV, taking column names from the header row and inferring column types
sal_df1 = (
  spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(filePath)
)

In [0]:
display(sal_df1)

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Notes,Agency,Status
1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,
2,GARY JIMENEZ,CAPTAIN III (POLICE DEPARTMENT),155966.02,245131.88,137811.38,,538909.28,538909.28,2011,,San Francisco,
3,ALBERT PARDINI,CAPTAIN III (POLICE DEPARTMENT),212739.13,106088.18,16452.6,,335279.91,335279.91,2011,,San Francisco,
4,CHRISTOPHER CHONG,WIRE ROPE CABLE MAINTENANCE MECHANIC,77916.0,56120.71,198306.9,,332343.61,332343.61,2011,,San Francisco,
5,PATRICK GARDNER,"DEPUTY CHIEF OF DEPARTMENT,(FIRE DEPARTMENT)",134401.6,9737.0,182234.59,,326373.19,326373.19,2011,,San Francisco,
6,DAVID SULLIVAN,ASSISTANT DEPUTY CHIEF II,118602.0,8601.0,189082.74,,316285.74,316285.74,2011,,San Francisco,
7,ALSON LEE,"BATTALION CHIEF, (FIRE DEPARTMENT)",92492.01,89062.9,134426.14,,315981.05,315981.05,2011,,San Francisco,
8,DAVID KUSHNER,DEPUTY DIRECTOR OF INVESTMENTS,256576.96,0.0,51322.5,,307899.46,307899.46,2011,,San Francisco,
9,MICHAEL MORRIS,"BATTALION CHIEF, (FIRE DEPARTMENT)",176932.64,86362.68,40132.23,,303427.55,303427.55,2011,,San Francisco,
10,JOANNE HAYES-WHITE,"CHIEF OF DEPARTMENT, (FIRE DEPARTMENT)",285262.0,0.0,17115.73,,302377.73,302377.73,2011,,San Francisco,


During ingestion we also need to record metadata, so we add columns for the ingestion time and ingestion date:

In [0]:
from pyspark.sql.functions import current_date, current_timestamp
sal_df1 = sal_df1.withColumn('Ingest_time', current_timestamp())
sal_df1 = sal_df1.withColumn('Ingest_date', current_date())
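
The `Ingest_time` values displayed in this notebook differ from cell to cell, which suggests the timestamp expression is re-evaluated each time the DataFrame is computed rather than fixed when `withColumn` is called. As a plain-Python illustration of the idea (this is not Spark code, and `add_ingest_metadata` and its `now` parameter are hypothetical names), the sketch below stamps every record of one batch with a single captured timestamp so that all rows share the same metadata.

```python
from datetime import datetime, timezone

def add_ingest_metadata(rows, now=None):
    """Return copies of the rows with Ingest_time and Ingest_date added."""
    # Capture the timestamp once so every row in the batch gets the same value.
    if now is None:
        now = datetime.now(timezone.utc)
    return [{**row, "Ingest_time": now, "Ingest_date": now.date()} for row in rows]

batch = [
    {"Id": 1, "EmployeeName": "NATHANIEL FORD"},
    {"Id": 2, "EmployeeName": "GARY JIMENEZ"},
]
stamped = add_ingest_metadata(batch)
# Both rows carry the same timestamp, and the input rows are left unchanged.
print(stamped[0]["Ingest_date"], stamped[0]["Ingest_time"] == stamped[1]["Ingest_time"])
```

Passing `now` explicitly makes the stamp reproducible, which is convenient when the same batch has to be re-processed or tested.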

In [0]:
display(sal_df1)

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Notes,Agency,Status,Ingest_time,Ingest_date
1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
2,GARY JIMENEZ,CAPTAIN III (POLICE DEPARTMENT),155966.02,245131.88,137811.38,,538909.28,538909.28,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
3,ALBERT PARDINI,CAPTAIN III (POLICE DEPARTMENT),212739.13,106088.18,16452.6,,335279.91,335279.91,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
4,CHRISTOPHER CHONG,WIRE ROPE CABLE MAINTENANCE MECHANIC,77916.0,56120.71,198306.9,,332343.61,332343.61,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
5,PATRICK GARDNER,"DEPUTY CHIEF OF DEPARTMENT,(FIRE DEPARTMENT)",134401.6,9737.0,182234.59,,326373.19,326373.19,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
6,DAVID SULLIVAN,ASSISTANT DEPUTY CHIEF II,118602.0,8601.0,189082.74,,316285.74,316285.74,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
7,ALSON LEE,"BATTALION CHIEF, (FIRE DEPARTMENT)",92492.01,89062.9,134426.14,,315981.05,315981.05,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
8,DAVID KUSHNER,DEPUTY DIRECTOR OF INVESTMENTS,256576.96,0.0,51322.5,,307899.46,307899.46,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
9,MICHAEL MORRIS,"BATTALION CHIEF, (FIRE DEPARTMENT)",176932.64,86362.68,40132.23,,303427.55,303427.55,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20
10,JOANNE HAYES-WHITE,"CHIEF OF DEPARTMENT, (FIRE DEPARTMENT)",285262.0,0.0,17115.73,,302377.73,302377.73,2011,,San Francisco,,2021-05-20T05:06:56.992+0000,2021-05-20


In [0]:
sal_df1.printSchema()

We write the DataFrame to the temporary landing table, partitioned by ingestion date.

In [0]:
# Append to the Delta landing path, one partition per ingestion date
(
  sal_df1
  .write
  .format("delta")
  .mode("append")
  .partitionBy("Ingest_date")
  .save(landingPath)
)

In [0]:
display(dbutils.fs.ls(landingPath))

path,name,size
dbfs:/FileStore/SalaryPipeline/Ingest_date=2021-05-19/,Ingest_date=2021-05-19/,0
dbfs:/FileStore/SalaryPipeline/Ingest_date=2021-05-20/,Ingest_date=2021-05-20/,0
dbfs:/FileStore/SalaryPipeline/_delta_log/,_delta_log/,0


Now we register the landing table in the metastore.

In [0]:
spark.sql(
"""
DROP TABLE IF EXISTS salary_tracker_landing
""")

spark.sql(
f"""
CREATE TABLE salary_tracker_landing
USING DELTA
LOCATION "{landingPath}"
"""
)

In [0]:
%sql
select * from salary_tracker_landing 

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Notes,Agency,Status,Ingest_time,Ingest_date
1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
2,GARY JIMENEZ,CAPTAIN III (POLICE DEPARTMENT),155966.02,245131.88,137811.38,,538909.28,538909.28,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
3,ALBERT PARDINI,CAPTAIN III (POLICE DEPARTMENT),212739.13,106088.18,16452.6,,335279.91,335279.91,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
4,CHRISTOPHER CHONG,WIRE ROPE CABLE MAINTENANCE MECHANIC,77916.0,56120.71,198306.9,,332343.61,332343.61,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
5,PATRICK GARDNER,"DEPUTY CHIEF OF DEPARTMENT,(FIRE DEPARTMENT)",134401.6,9737.0,182234.59,,326373.19,326373.19,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
6,DAVID SULLIVAN,ASSISTANT DEPUTY CHIEF II,118602.0,8601.0,189082.74,,316285.74,316285.74,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
7,ALSON LEE,"BATTALION CHIEF, (FIRE DEPARTMENT)",92492.01,89062.9,134426.14,,315981.05,315981.05,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
8,DAVID KUSHNER,DEPUTY DIRECTOR OF INVESTMENTS,256576.96,0.0,51322.5,,307899.46,307899.46,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
9,MICHAEL MORRIS,"BATTALION CHIEF, (FIRE DEPARTMENT)",176932.64,86362.68,40132.23,,303427.55,303427.55,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19
10,JOANNE HAYES-WHITE,"CHIEF OF DEPARTMENT, (FIRE DEPARTMENT)",285262.0,0.0,17115.73,,302377.73,302377.73,2011,,San Francisco,,2021-05-19T06:24:32.158+0000,2021-05-19


In [0]:
sal_df1.count()

In [0]:
display(sal_df1)

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Notes,Agency,Status,Ingest_time,Ingest_date
1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
2,GARY JIMENEZ,CAPTAIN III (POLICE DEPARTMENT),155966.02,245131.88,137811.38,,538909.28,538909.28,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
3,ALBERT PARDINI,CAPTAIN III (POLICE DEPARTMENT),212739.13,106088.18,16452.6,,335279.91,335279.91,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
4,CHRISTOPHER CHONG,WIRE ROPE CABLE MAINTENANCE MECHANIC,77916.0,56120.71,198306.9,,332343.61,332343.61,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
5,PATRICK GARDNER,"DEPUTY CHIEF OF DEPARTMENT,(FIRE DEPARTMENT)",134401.6,9737.0,182234.59,,326373.19,326373.19,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
6,DAVID SULLIVAN,ASSISTANT DEPUTY CHIEF II,118602.0,8601.0,189082.74,,316285.74,316285.74,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
7,ALSON LEE,"BATTALION CHIEF, (FIRE DEPARTMENT)",92492.01,89062.9,134426.14,,315981.05,315981.05,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
8,DAVID KUSHNER,DEPUTY DIRECTOR OF INVESTMENTS,256576.96,0.0,51322.5,,307899.46,307899.46,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
9,MICHAEL MORRIS,"BATTALION CHIEF, (FIRE DEPARTMENT)",176932.64,86362.68,40132.23,,303427.55,303427.55,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20
10,JOANNE HAYES-WHITE,"CHIEF OF DEPARTMENT, (FIRE DEPARTMENT)",285262.0,0.0,17115.73,,302377.73,302377.73,2011,,San Francisco,,2021-05-20T05:07:54.354+0000,2021-05-20


Before we move this data into the next layer, it has to be cleaned and transformed.
We do a quick analysis, then add or remove columns and fill in missing data where required.

In [0]:
sal_df1.count()

The data will almost certainly contain some null values. We are not going to perform EDA while building the ETL pipeline; we simply keep the clean records and pass them on.

In [0]:
#Notes column is completely empty, so we can drop it.
sal_cleaned_df = sal_df1.drop('Notes')

In [0]:
#Now we keep only the rows where Status is not null. It should be either PT or FT
sal_cleaned_df = sal_cleaned_df.filter("Status IS NOT NULL")
display(sal_cleaned_df)

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Agency,Status,Ingest_time,Ingest_date
110532,David Shinn,Deputy Chief 3,129150.01,0.0,342802.63,38780.04,471952.64,510732.68,2014,San Francisco,PT,2021-05-20T05:07:56.380+0000,2021-05-20
110533,Amy P Hart,Asst Med Examiner,318835.49,10712.95,60563.54,89540.23,390111.98,479652.21,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110534,William J Coaker Jr.,Chief Investment Officer,257340.0,0.0,82313.7,96570.66,339653.7,436224.36,2014,San Francisco,PT,2021-05-20T05:07:56.380+0000,2021-05-20
110535,Gregory P Suhr,Chief of Police,307450.04,0.0,19266.72,91302.46,326716.76,418019.22,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110536,Joanne M Hayes-White,"Chief, Fire Department",302068.0,0.0,24165.44,91201.66,326233.44,417435.1,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110537,Ellen G Moffatt,Asst Med Examiner,270222.04,6009.22,67956.2,71580.48,344187.46,415767.94,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110538,John L Martin,Dept Head V,311298.55,0.0,0.0,89772.32,311298.55,401070.87,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110539,Harlan L Kelly-Jr,Executive Contract Employee,310161.02,0.0,0.0,88823.51,310161.02,398984.53,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110540,Samson Lai,"Battalion Chief, Fire Suppress",179464.14,128685.99,27334.83,59876.9,335484.96,395361.86,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20
110541,David L Franklin,Asst Chf of Dept (Fire Dept),201566.88,97907.38,29916.28,64599.59,329390.54,393990.13,2014,San Francisco,FT,2021-05-20T05:07:56.380+0000,2021-05-20


In [0]:
cleaned_salary_unmasked_pd = sal_cleaned_df.toPandas()

In [0]:
#Let's check the row count of the cleaned data
sal_cleaned_df.count()

In [0]:
#We store the rows with a missing Status in a separate DataFrame
sal_dirty_df = sal_df1.filter("Status IS NULL")
display(sal_dirty_df)

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Notes,Agency,Status,Ingest_time,Ingest_date
1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
2,GARY JIMENEZ,CAPTAIN III (POLICE DEPARTMENT),155966.02,245131.88,137811.38,,538909.28,538909.28,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
3,ALBERT PARDINI,CAPTAIN III (POLICE DEPARTMENT),212739.13,106088.18,16452.6,,335279.91,335279.91,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
4,CHRISTOPHER CHONG,WIRE ROPE CABLE MAINTENANCE MECHANIC,77916.0,56120.71,198306.9,,332343.61,332343.61,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
5,PATRICK GARDNER,"DEPUTY CHIEF OF DEPARTMENT,(FIRE DEPARTMENT)",134401.6,9737.0,182234.59,,326373.19,326373.19,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
6,DAVID SULLIVAN,ASSISTANT DEPUTY CHIEF II,118602.0,8601.0,189082.74,,316285.74,316285.74,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
7,ALSON LEE,"BATTALION CHIEF, (FIRE DEPARTMENT)",92492.01,89062.9,134426.14,,315981.05,315981.05,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
8,DAVID KUSHNER,DEPUTY DIRECTOR OF INVESTMENTS,256576.96,0.0,51322.5,,307899.46,307899.46,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
9,MICHAEL MORRIS,"BATTALION CHIEF, (FIRE DEPARTMENT)",176932.64,86362.68,40132.23,,303427.55,303427.55,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
10,JOANNE HAYES-WHITE,"CHIEF OF DEPARTMENT, (FIRE DEPARTMENT)",285262.0,0.0,17115.73,,302377.73,302377.73,2011,,San Francisco,,2021-05-20T05:08:04.247+0000,2021-05-20
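
The clean/dirty split above can be sketched in plain Python as follows. This is only an illustration under stated assumptions: `split_by_status` and `VALID_STATUSES` are hypothetical names, rows are modelled as dictionaries, and the sketch enforces the stricter rule from the code comment (Status must be PT or FT) rather than the `IS NOT NULL` filter the notebook actually applies.

```python
VALID_STATUSES = {"PT", "FT"}  # assumption: the only acceptable Status values

def split_by_status(rows):
    """Partition rows into (clean, dirty) according to their Status value."""
    clean, dirty = [], []
    for row in rows:
        # A missing key, None, or any other value sends the row to the dirty list.
        if row.get("Status") in VALID_STATUSES:
            clean.append(row)
        else:
            dirty.append(row)
    return clean, dirty

rows = [
    {"Id": 1, "EmployeeName": "NATHANIEL FORD", "Status": None},
    {"Id": 110532, "EmployeeName": "David Shinn", "Status": "PT"},
    {"Id": 110533, "EmployeeName": "Amy P Hart", "Status": "FT"},
]
clean, dirty = split_by_status(rows)
print(len(clean), len(dirty))
```

Every input row ends up in exactly one of the two lists, so the two counts always add up to the original row count. That is a cheap sanity check to run after the split.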


Now we write the clean data to a new table.

In [0]:
cleanedPath = '/FileStore/CleanedPipeline/'
maskedcleanedPath = '/FileStore/MaskedCleanedPipeline/'

In [0]:
(
  sal_cleaned_df
  .write
  .format("delta")
  .mode("append")
  .partitionBy("Ingest_time")
  .save(cleanedPath)
)

In [0]:
spark.sql(
"""
DROP TABLE IF EXISTS cleaned_salary_tracker
"""
)
spark.sql(
f"""
CREATE TABLE cleaned_salary_tracker
USING DELTA
LOCATION "{cleanedPath}"
"""
)

In [0]:
%sql
select * from cleaned_salary_tracker

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Agency,Status,Ingest_time,Ingest_date
110532,David Shinn,Deputy Chief 3,129150.01,0.0,342802.63,38780.04,471952.64,510732.68,2014,San Francisco,PT,2021-05-19T11:47:11.617+0000,2021-05-19
110533,Amy P Hart,Asst Med Examiner,318835.49,10712.95,60563.54,89540.23,390111.98,479652.21,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110534,William J Coaker Jr.,Chief Investment Officer,257340.0,0.0,82313.7,96570.66,339653.7,436224.36,2014,San Francisco,PT,2021-05-19T11:47:11.617+0000,2021-05-19
110535,Gregory P Suhr,Chief of Police,307450.04,0.0,19266.72,91302.46,326716.76,418019.22,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110536,Joanne M Hayes-White,"Chief, Fire Department",302068.0,0.0,24165.44,91201.66,326233.44,417435.1,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110537,Ellen G Moffatt,Asst Med Examiner,270222.04,6009.22,67956.2,71580.48,344187.46,415767.94,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110538,John L Martin,Dept Head V,311298.55,0.0,0.0,89772.32,311298.55,401070.87,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110539,Harlan L Kelly-Jr,Executive Contract Employee,310161.02,0.0,0.0,88823.51,310161.02,398984.53,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110540,Samson Lai,"Battalion Chief, Fire Suppress",179464.14,128685.99,27334.83,59876.9,335484.96,395361.86,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19
110541,David L Franklin,Asst Chf of Dept (Fire Dept),201566.88,97907.38,29916.28,64599.59,329390.54,393990.13,2014,San Francisco,FT,2021-05-19T11:47:11.617+0000,2021-05-19


#Data Masking
Next we perform data masking, which means hiding sensitive information. Here we treat the name column as sensitive and mask it. We keep separate records for the masked and unmasked versions.

In [0]:
cln_sal_pd_df = sal_cleaned_df.toPandas()

We apply a simple lambda function that hides the name only partially: the last four characters stay visible and every other character is replaced with "*".

In [0]:
cln_sal_pd_df['EmployeeName'] = cln_sal_pd_df['EmployeeName'].apply(lambda x: x[-4:].rjust(len(x), "*"))
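
As a sketch of the same masking rule in a standalone, testable form, the helper below wraps the lambda's logic in a function. The name `mask_name` and its `visible` and `fill` parameters are hypothetical, and the `None` handling is an added assumption, since the lambda above would raise on a missing name.

```python
def mask_name(name, visible=4, fill="*"):
    """Replace all but the last `visible` characters of `name` with `fill`."""
    if name is None:
        # Assumption: missing names are passed through unchanged.
        return None
    if visible <= 0:
        # name[-0:] would return the whole string, so handle this case explicitly.
        return fill * len(name)
    # Keep the tail and left-pad with the fill character to the original length.
    return name[-visible:].rjust(len(name), fill)

print(mask_name("David Shinn"))  # → *******hinn
print(mask_name("Amy P Hart"))   # → ******Hart
print(mask_name("Lai"))          # → Lai
```

The last call shows a limitation of this rule: a name with four or fewer characters is not masked at all, so a stricter policy may be needed for short values.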

In [0]:
cln_sal_pd_df.tail(5)

Unnamed: 0,Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Agency,Status,Ingest_time,Ingest_date
38114,148646,************lson,Human Services Technician,0.0,0.0,0.0,0.0,0.0,0.0,2014,San Francisco,PT,2021-05-20 05:08:27.554,2021-05-20
38115,148648,**********rson,Communications Dispatcher 2,0.0,0.0,0.0,0.0,0.0,0.0,2014,San Francisco,PT,2021-05-20 05:08:27.554,2021-05-20
38116,148649,*******lker,Custodian,0.0,0.0,0.0,0.0,0.0,0.0,2014,San Francisco,PT,2021-05-20 05:08:27.554,2021-05-20
38117,148650,*********lery,Custodian,0.0,0.0,0.0,0.0,0.0,0.0,2014,San Francisco,PT,2021-05-20 05:08:27.554,2021-05-20
38118,148654,*****opez,"Counselor, Log Cabin Ranch",0.0,0.0,-618.13,0.0,-618.13,-618.13,2014,San Francisco,PT,2021-05-20 05:08:27.554,2021-05-20


We convert the above Pandas Dataframe back into a Spark Dataframe.

In [0]:
masked_salary_df = spark.createDataFrame(cln_sal_pd_df)

In [0]:
display(masked_salary_df)

Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Agency,Status,Ingest_time,Ingest_date
110532,*******hinn,Deputy Chief 3,129150.01,0.0,342802.63,38780.04,471952.64,510732.68,2014,San Francisco,PT,2021-05-20T05:08:27.554+0000,2021-05-20
110533,******Hart,Asst Med Examiner,318835.49,10712.95,60563.54,89540.23,390111.98,479652.21,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110534,**************** Jr.,Chief Investment Officer,257340.0,0.0,82313.7,96570.66,339653.7,436224.36,2014,San Francisco,PT,2021-05-20T05:08:27.554+0000,2021-05-20
110535,**********Suhr,Chief of Police,307450.04,0.0,19266.72,91302.46,326716.76,418019.22,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110536,****************hite,"Chief, Fire Department",302068.0,0.0,24165.44,91201.66,326233.44,417435.1,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110537,***********fatt,Asst Med Examiner,270222.04,6009.22,67956.2,71580.48,344187.46,415767.94,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110538,*********rtin,Dept Head V,311298.55,0.0,0.0,89772.32,311298.55,401070.87,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110539,*************y-Jr,Executive Contract Employee,310161.02,0.0,0.0,88823.51,310161.02,398984.53,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110540,****** Lai,"Battalion Chief, Fire Suppress",179464.14,128685.99,27334.83,59876.9,335484.96,395361.86,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20
110541,************klin,Asst Chf of Dept (Fire Dept),201566.88,97907.38,29916.28,64599.59,329390.54,393990.13,2014,San Francisco,FT,2021-05-20T05:08:27.554+0000,2021-05-20


In [0]:
(
  masked_salary_df
  .write
  .format("delta")
  .mode("append")
  .partitionBy("Ingest_time")
  .save(maskedcleanedPath)
)

In [0]:
#Now we register the masked data as a Delta table in the metastore
spark.sql(
"""
DROP TABLE IF EXISTS masked_cleaned_salary_tracker
"""
)
spark.sql(
f"""
CREATE TABLE masked_cleaned_salary_tracker
USING DELTA
LOCATION "{maskedcleanedPath}"
"""
)

In [0]:
masked_cleaned_pd_df = masked_salary_df.toPandas()

In [0]:
cleaned_salary_unmasked_pd.head()

Unnamed: 0,Id,EmployeeName,JobTitle,BasePay,OvertimePay,OtherPay,Benefits,TotalPay,TotalPayBenefits,Year,Agency,Status,Ingest_time,Ingest_date
0,110532,David Shinn,Deputy Chief 3,129150.01,0.0,342802.63,38780.04,471952.64,510732.68,2014,San Francisco,PT,2021-05-20 05:07:58.717,2021-05-20
1,110533,Amy P Hart,Asst Med Examiner,318835.49,10712.95,60563.54,89540.23,390111.98,479652.21,2014,San Francisco,FT,2021-05-20 05:07:58.717,2021-05-20
2,110534,William J Coaker Jr.,Chief Investment Officer,257340.0,0.0,82313.7,96570.66,339653.7,436224.36,2014,San Francisco,PT,2021-05-20 05:07:58.717,2021-05-20
3,110535,Gregory P Suhr,Chief of Police,307450.04,0.0,19266.72,91302.46,326716.76,418019.22,2014,San Francisco,FT,2021-05-20 05:07:58.717,2021-05-20
4,110536,Joanne M Hayes-White,"Chief, Fire Department",302068.0,0.0,24165.44,91201.66,326233.44,417435.1,2014,San Francisco,FT,2021-05-20 05:07:58.717,2021-05-20


We now have two cleaned versions of the data: a masked one and an unmasked one.

Before the data is sent to the data scientists, unit testing is usually performed to ensure its quality.
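
A minimal sketch of what such data-quality tests might look like is given below in plain Python. The function name `check_quality` and the specific rules are assumptions for illustration: unique `Id` values, a `Status` of PT or FT, and `TotalPayBenefits` equal to `TotalPay + Benefits`, a relationship that holds in the sample rows shown above but is not stated by the dataset itself.

```python
import math

VALID_STATUSES = {"PT", "FT"}  # assumption: the only acceptable Status values

def check_quality(rows):
    """Return a list of problems found; an empty list means every check passed."""
    problems = []
    seen_ids = set()
    for row in rows:
        row_id = row["Id"]
        if row_id in seen_ids:
            problems.append(f"duplicate Id {row_id}")
        seen_ids.add(row_id)
        if row["Status"] not in VALID_STATUSES:
            problems.append(f"Id {row_id}: unexpected Status {row['Status']!r}")
        # Compare with a one-cent tolerance to absorb floating-point rounding.
        expected = row["TotalPay"] + row["Benefits"]
        if not math.isclose(row["TotalPayBenefits"], expected, abs_tol=0.01):
            problems.append(f"Id {row_id}: TotalPayBenefits != TotalPay + Benefits")
    return problems

sample = [
    {"Id": 110532, "Status": "PT", "TotalPay": 471952.64,
     "Benefits": 38780.04, "TotalPayBenefits": 510732.68},
    {"Id": 110533, "Status": "FT", "TotalPay": 390111.98,
     "Benefits": 89540.23, "TotalPayBenefits": 479652.21},
]
print(check_quality(sample))  # → []
```

Returning a list of problems rather than raising on the first failure lets a single run report every issue in a batch.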

The data above is the clean record that data analysts and data scientists can use for research purposes.
Further analysis can be done in Python or with tools such as Power BI or Tableau.
The data is then loaded into a SQL server or a cloud storage service so that analysts and scientists can pull it from there.