#WHAT IS DATA MUNGING?

The process of transforming and mapping data from its raw form into a tidy (usable) format, with the intent of making it more appropriate and valuable for downstream purposes such as further transformation/enrichment, egress/outbound delivery, analytics, data science/AI applications, and reporting.

#TYPES OF DATA MUNGING:


1. Passive munging - visual analysis of the data: what we can learn by inspecting it manually.<br>
2. Active munging - basic programmatic analysis of the data.

##1.PASSIVE DATA MUNGING

- The data is structured, with a comma separator (CSV)
- No header and no comments; a footer is present in the data
- Total columns = (number of separators + 1)
- Data quality
- >Null columns and null rows are present
- >Duplicate rows and duplicate keys
- >Format issues (age is not always numeric, e.g. 7-7)
- >Uniformity issues (Artist, artist)
- >Some rows have more or fewer columns than expected
- >e.g. 4000011,Francis,McNamara,47,Therapist,NewYork & 4000014,Beth,Woodard,65
- >Identification of data types
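The column-count check from the list above can also be done with a few lines of plain Python before Spark is involved. This is only a minimal sketch: the `sample` text and the `profile_column_counts` helper are hypothetical, the first row is made up, and the expected width of 5 is an assumption based on the column names used in this notebook.

```python
import csv
import io
from collections import Counter

# Hypothetical sample: the first row is made up, the other two are the
# examples quoted in the list above. This is not the real file.
sample = """4000001,Ann,Lee,30,Pilot
4000011,Francis,McNamara,47,Therapist,NewYork
4000014,Beth,Woodard,65
"""

expected_columns = 5  # assumption: id, fname, lname, age, profession


def profile_column_counts(text, expected):
    """Return (Counter of fields per row, list of rows that deviate)."""
    counts = Counter()
    deviating = []
    for row in csv.reader(io.StringIO(text)):
        if not row:  # skip completely blank lines
            continue
        counts[len(row)] += 1
        if len(row) != expected:
            deviating.append(row)
    return counts, deviating


counts, deviating = profile_column_counts(sample, expected_columns)
print(dict(counts))
for row in deviating:
    print(len(row), row)
```

Rows that land in `deviating` are the ones to look at before deciding on a read mode or a rejection strategy.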

In [0]:
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("we47_Bread_n_Butter2_Important_Application").getOrCreate()

In [0]:
raw_df1=spark.read.csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",inferSchema=True,header=False).toDF("id","fname","lname","age","profession")
raw_df1.show(20,False)
display(raw_df1.limit(20))
raw_df1.printSchema()

In [0]:
##very important
raw_df1.printSchema()
print(raw_df1.columns)
print(raw_df1.dtypes)
print(raw_df1.schema)

In [0]:
print("actual count of the data",raw_df1.count())
print("de-duplication count of the data",raw_df1.dropDuplicates().count())#Can remove duplicates based on all columns or selected columns
print("distinct count of the data",raw_df1.distinct().count())#Removes exact duplicate rows
print("de-duplicated given id column count",raw_df1.dropDuplicates(['id']).count())#removing duplicates based on the id column
display(raw_df1.describe())
display(raw_df1.summary())


##2.ACTIVE DATA MUNGING

1. Combining Data + Schema Evolution/Merging (Structuring)
2. Validation, Cleansing, Scrubbing - Cleansing (removal of unwanted datasets), Scrubbing (convert raw to tidy)
3. De-Duplication and levels of Standardization of data to bring it into a usable format (for data engineers/consumers)

###1.Combining Data + Schema Evolution/Merging (Structuring) - Preliminary Datamunging

In [0]:
struct1="id string, firstname string, lastname string, age string, profession string"
#1.single file
raw_df1=spark.read.csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",schema=struct1)
display(raw_df1.limit(20))
#2.multiple files with diff names
raw_df1=spark.read.csv(["/Volumes/workspace/default/volumewe47_datalake/custsmodified","/Volumes/workspace/default/volumewe47_datalake/custsmodifiedsample.txt"],schema=struct1)
display(raw_df1)


####A. COMBINING OR SCHEMA MERGING or SCHEMA MELTING of Data from different sources(union,unionByName,allowMissingColumns)

This approach works best when data with a varying structure keeps arriving from the source on a regular basis.

In [0]:
strt1="id string, firstname string, lastname string, age string, profession string"
rawdf1=spark.read.schema(strt1).csv(path=["/Volumes/workspace/default/volumewe47_datalake"],recursiveFileLookup=True,pathGlobFilter="custsmodified_N*")
strt2="id string, firstname string, age string, profession string,city string"
rawdf2=spark.read.schema(strt2).csv(path=["/Volumes/workspace/default/volumewe47_datalake"],recursiveFileLookup=True,pathGlobFilter="custsmodified_T*")
display(rawdf1)
display(rawdf2)


#####note:
The **UNION** function can also be used for merging/melting the datasets, but it matches columns by position, so use it only when both datasets have the same number of columns in the same order.

In [0]:
rawdf_merged=rawdf1.unionByName(rawdf2,allowMissingColumns=True)
display(rawdf_merged)
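To make the difference between a positional union and a name-based union concrete, here is a plain-Python model of what `unionByName(..., allowMissingColumns=True)` does to the two layouts used above. It is a sketch only: `union_by_name` is a hypothetical helper, the rows are made up, and it models the behaviour rather than calling Spark.

```python
def union_by_name(rows_a, cols_a, rows_b, cols_b):
    """Name-based union of two row sets; columns missing on one side become None."""
    # Keep the left-hand column order, then append columns only the right side has.
    merged_cols = list(cols_a) + [c for c in cols_b if c not in cols_a]
    merged = []
    for cols, rows in ((cols_a, rows_a), (cols_b, rows_b)):
        for row in rows:
            by_name = dict(zip(cols, row))
            merged.append(tuple(by_name.get(c) for c in merged_cols))
    return merged_cols, merged


cols1 = ["id", "firstname", "lastname", "age", "profession"]
cols2 = ["id", "firstname", "age", "profession", "city"]
rows1 = [("1", "Ann", "Lee", "30", "Pilot")]      # made-up row
rows2 = [("2", "Bob", "41", "Actor", "Chennai")]  # made-up row

merged_cols, merged_rows = union_by_name(rows1, cols1, rows2, cols2)
print(merged_cols)
for row in merged_rows:
    print(row)

# A positional union would instead line the second row up against the
# first layout, putting the age under "lastname":
positional = dict(zip(cols1, rows2[0]))
print(positional)
```

The positional result shows why a plain union is only safe when both sides share the same column order.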

####B. schema evolution

Schema evolution is the better fit when data with a varying structure arrives from the source at irregular, unpredictable intervals rather than on a regular schedule.

In [0]:
strt1="id string, firstname string, lastname string, age string, profession string"
rawdf1=spark.read.schema(strt1).csv(path=["/Volumes/workspace/default/volumewe47_datalake"],recursiveFileLookup=True,pathGlobFilter="custsmodified_N*")
strt2="id string, firstname string, age string, profession string,city string"
rawdf2=spark.read.schema(strt2).csv(path=["/Volumes/workspace/default/volumewe47_datalake"],recursiveFileLookup=True,pathGlobFilter="custsmodified_T*")

In [0]:
rawdf1.write.orc("/Volumes/workspace/default/volumewe47_datalake/targetorc1/",mode='overwrite')#Useful for future over the time
rawdf2.write.orc("/Volumes/workspace/default/volumewe47_datalake/targetorc1/",mode='append')

In [0]:
rawdf_evolved=spark.read.orc("/Volumes/workspace/default/volumewe47_datalake/targetorc1/",mergeSchema=True)
display(rawdf_evolved)

###2. Validation, Cleansing, Scrubbing - Cleansing (removal of unwanted datasets), Scrubbing (convert raw to tidy)

In [0]:
from pyspark.sql.types import *

####method-1:

In [0]:
#strt1="id int, firstname string, lastname string, age int, profession string"
#method-1:permissive with all rows with respective nulls
strt11=StructType([StructField("id",IntegerType(),True),StructField("firstname",StringType(),True),StructField("lastname",StringType(),True),StructField("age",IntegerType(),True),StructField("profession",StringType(),True)])
dfmethod1=spark.read.schema(strt11).csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",mode="PERMISSIVE",header=False)
#We set values to null wherever there is a data format mismatch (cutting the muddy portion off the potato - scrubbing)
display(dfmethod1)
dfmethod1.printSchema()
print("entire count of data",dfmethod1.count())
print("after scrubbing, count of data",len(dfmethod1.collect()))

####method-2:

In [0]:
#method2 - drop malformed rows
dfmethod2=spark.read.schema(strt11).csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",mode="dropMalformed",header=False)
#We remove the entire row wherever there is a data format mismatch (number of columns, data type mismatch) (throwing away the entire potato - cleansing)
display(dfmethod2)
print("entire count of data",dfmethod2.count())
print("after cleansing, count of data",len(dfmethod2.collect()))

In [0]:
dfmethod2.collect()

####method-3: BEST OPTION

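Methods 1 and 2 both enforce a strict, typed schema at read time: method 1 in PERMISSIVE mode and method 2 in DROPMALFORMED mode.<BR>
The challenge with both is unnoticed data loss: at the column level in method 1 (mismatched values become null) and at the row level in method 2 (mismatched rows are dropped).<BR>
Method 3 reads every column as a string first, so nothing is lost at read time, which makes it the best approach for active data munging.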
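The trade-off between the three read strategies can be illustrated with a simplified plain-Python model. It is not Spark's actual CSV parser: `to_int`, `permissive`, `drop_malformed`, and `all_strings` are hypothetical helpers, the rows are made up, and only the `id` and `age` columns are treated as typed.

```python
def to_int(value):
    """Return int(value), or None when the text is not a valid integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


# Made-up rows: one clean, one with a bad age, one with a bad id.
raw_rows = [
    ["4000001", "Ann", "Lee", "30", "Pilot"],
    ["4000002", "Bob", "Ray", "7-7", "Actor"],
    ["ten", "Cid", "Roy", "41", "IT"],
]


def permissive(rows):
    # Method 1 model: keep every row, null out values that fail the cast.
    return [[to_int(r[0]), r[1], r[2], to_int(r[3]), r[4]] for r in rows]


def drop_malformed(rows):
    # Method 2 model: drop any row where a typed column fails the cast.
    return [r for r in permissive(rows) if r[0] is not None and r[3] is not None]


def all_strings(rows):
    # Method 3 model: keep the raw text so nothing is lost before munging.
    return [list(r) for r in rows]


print(permissive(raw_rows))      # "7-7" and "ten" are silently replaced by None
print(drop_malformed(raw_rows))  # only the clean row survives
print(all_strings(raw_rows))     # every original value is still available
```

With the string-only read, values such as `7-7` and `ten` remain visible and can be standardized later instead of disappearing at read time.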

In [0]:
dfmethod3=spark.read.csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",mode="PERMISSIVE",header=False,inferSchema=True)
print(dfmethod3.schema)

In [0]:
strt11=StructType([StructField("id",StringType(),True),StructField("firstname",StringType(),True),StructField("lastname",StringType(),True),StructField("age",StringType(),True),StructField("profession",StringType(),True)])
dfmethod3=spark.read.schema(strt11).csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",mode="PERMISSIVE",header=False)
display(dfmethod3)
dfmethod3.printSchema()
print("entire count of data",dfmethod3.count())
print("after cleansing, count of data",len(dfmethod3.collect()))

#####Rejection strategy

In [0]:
strt11=StructType([StructField("id",IntegerType(),True),StructField("firstname",StringType(),True),StructField("lastname",StringType(),True),StructField("age",IntegerType(),True),StructField("profession",StringType(),True),StructField("corruptdata",StringType())])
dfmethod3=spark.read.schema(strt11).csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",mode="PERMISSIVE",header=False,columnNameOfCorruptRecord="corruptdata")
display(dfmethod3)
print("entire count of data",dfmethod3.count())
df_reject=dfmethod3.where("corruptdata is not null")
display(df_reject)

In [0]:
df_reject.drop("corruptdata").write.mode("overwrite").csv("/Volumes/workspace/default/volumewe47_datalake/rejects/")
print("Data to reject or update the source",len(dfmethod3.where("corruptdata is not null").collect()))

#####cleansing stage

Can be achieved using the drop function under na (na.drop)

In [0]:
#actual DF count before doing any cleansing activity
strt11=StructType([StructField("id",StringType(),True),StructField("firstname",StringType(),True),StructField("lastname",StringType(),True),StructField("age",StringType(),True),StructField("profession",StringType(),True)])
dfmethod3=spark.read.schema(strt11).csv("/Volumes/workspace/default/volumewe47_datalake/custsmodified",mode="PERMISSIVE",header=False)
display(dfmethod3.take(15))
print("Actual DF count",len(dfmethod3.collect()))

In [0]:
#applying cleansing strategies
cleansed_df1=dfmethod3.na.drop(how="any")#drop the row, if any one column in our df row contains null
cleansed_df1=dfmethod3.na.drop(how="any",subset=["id","age"])#drop the row, if any one column id/age contains null
print("cleansed any DF count",len(cleansed_df1.collect()))
display(cleansed_df1.take(15))

In [0]:
cleansed_df2=dfmethod3.na.drop(how="all")#drop the row, if all the columns in our df row contains null
cleansed_df2=dfmethod3.na.drop(how="all",subset=["id","profession"])#drop the row, if all the columns (id,profession) in our df row contains null
print("cleansed all DF count",len(cleansed_df2.collect()))
display(cleansed_df2.take(15))

In [0]:
#Threshold - keep a row only if at least this many columns (within the subset) are not null
cleansed_df3=dfmethod3.na.drop(thresh=4,subset=["id","firstname","age","profession"])#only allowing null at lastname column
print("cleansed threshold DF count",len(cleansed_df3.collect()))
display(cleansed_df3.take(15))
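The `how`, `thresh`, and `subset` options are easy to mix up, so the following plain-Python sketch models the rule that decides whether a row is kept. `na_drop` here is a hypothetical stand-in that mimics the documented behaviour of `DataFrame.na.drop` on a list of dicts, and the rows are made up.

```python
def na_drop(rows, how="any", thresh=None, subset=None):
    """Keep or drop dict rows by null count, modelled on DataFrame.na.drop."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh      # thresh overrides how
        elif how == "any":
            keep = non_null == len(cols)   # drop if any column is null
        elif how == "all":
            keep = non_null > 0            # drop only if every column is null
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept


# Made-up rows covering the interesting cases.
r1 = {"id": "1", "firstname": "Ann", "lastname": "Lee", "age": "30", "profession": "Pilot"}
r2 = {"id": "2", "firstname": "Bob", "lastname": None, "age": "41", "profession": "Actor"}
r3 = {"id": None, "firstname": "Cid", "lastname": "Roy", "age": "41", "profession": None}
r4 = {"id": None, "firstname": None, "lastname": None, "age": None, "profession": None}
rows = [r1, r2, r3, r4]

print(len(na_drop(rows, how="any")))
print(len(na_drop(rows, how="all")))
print(len(na_drop(rows, how="any", subset=["id", "age"])))
print(len(na_drop(rows, thresh=4, subset=["id", "firstname", "age", "profession"])))
```

With `thresh=4` over a four-column subset, a row survives only when all four of those columns are populated, which is why `lastname` is the only column allowed to be null.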

In [0]:
#Before scrubbing, take the cleansed data and remove rows where id is null or the entire row is null
#Finally, apply the cleansing that best suits our current data
cleansed_df=cleansed_df3.na.drop(subset=["id"]).na.drop(how="all")
cleansed_df=cleansed_df.na.drop(subset=["firstname","lastname"],how='all')
print("Final cleansed DF",len(cleansed_df.collect()))
display(cleansed_df.take(15))
     

#####scrubbing stage

Can be achieved using 2 functions under na (na.fill & na.replace)

In [0]:
scrubbed_df1=cleansed_df.na.fill("na",subset=["firstname","lastname"]).na.fill("not provided",subset=["profession"])
scrubbed_df2=scrubbed_df1.na.replace("IT","Information Technologies",subset=["profession"]).na.replace("Pilot","Aircraft Pilot",subset=["profession"])
display(scrubbed_df2.take(20))

In [0]:
#or
#Find and Replace functionality
dict1={"IT":"Information Technologies","Pilot":"Aircraft Pilot","Actor":"Celebrity"}
scrubbed_df=scrubbed_df1.na.replace(dict1,subset=["profession"])
#scrubbed_df2=scrubbed_df1.na.replace("IT","Information Technologies",subset=["profession"]).na.replace("Pilot","Aircraft Pilot",subset=["profession"])
print("scrubbed DF",len(scrubbed_df.collect()))
display(scrubbed_df.take(15))
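As a rough model of the two scrubbing functions, the plain-Python sketch below fills nulls in selected columns and then replaces whole values through a lookup dictionary. `na_fill` and `na_replace` are hypothetical helpers that imitate `na.fill` and `na.replace` on a list of dicts, and the rows are made up.

```python
def na_fill(rows, value, subset):
    """Replace None with value, but only in the listed columns."""
    return [
        {c: (value if c in subset and v is None else v) for c, v in row.items()}
        for row in rows
    ]


def na_replace(rows, mapping, subset):
    """Swap whole values found in mapping, but only in the listed columns."""
    return [
        {c: (mapping.get(v, v) if c in subset else v) for c, v in row.items()}
        for row in rows
    ]


# Made-up rows for illustration.
people = [
    {"firstname": "Ann", "lastname": None, "profession": "IT"},
    {"firstname": "Bob", "lastname": "Ray", "profession": None},
    {"firstname": "Cid", "lastname": "Roy", "profession": "IT Support"},
]

filled = na_fill(people, "na", subset=["firstname", "lastname"])
filled = na_fill(filled, "not provided", subset=["profession"])
scrubbed = na_replace(filled, {"IT": "Information Technologies"}, subset=["profession"])
for row in scrubbed:
    print(row)
```

The replacement matches the complete value, so `IT Support` is left unchanged while `IT` is expanded.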

###3. Standardization, De-Duplication and Replacement / Deletion of Data to make it in a usable format

####Standardization1 - Column Enrichment (Addition of columns)

In [0]:
from pyspark.sql.functions import lit,initcap

lit() is used to create a column with a constant or literal value so it can be added to a Spark DataFrame.

In [0]:
standard_df1=scrubbed_df.withColumn("sourcesystem",lit("Retail")) 
display(standard_df1.take(15))

####Standardization2 - Column Uniformity

In [0]:
#DSL
display(standard_df1.groupBy("profession").count())

In [0]:
#Declarative lang
standard_df1.createOrReplaceTempView("view1")
display(spark.sql("select profession,count(1) from view1 group by profession"))

In [0]:
standard_df2=standard_df1.withColumn("profession",initcap("profession"))
display(standard_df2.take(15))
display(standard_df2.groupBy("profession").count())

####Standardization3 - Format Standardization

In [0]:
from pyspark.sql.functions import col,regexp_replace#explicit imports avoid shadowing Python built-ins such as sum and max
cid_standardization={"one":"1","two":"2","ten":"10"}#We can think of using GenAI here later
standard_df3=standard_df2.na.replace(cid_standardization,subset=["id"])#Using munging feature for standardizing the data
standard_df3=standard_df3.withColumn("age",regexp_replace("age","-",""))
display(standard_df3.take(15))

####Standardization4 - Type Standardization

**TEST-CODE:**

In [0]:
#To learn/test what a function does:
#create a dummy DataFrame and apply the function to it
spark.sql("select '12a3' as col1").where("col1 not rlike '[a-zA-Z]'").show()

In [0]:
standard_df3.printSchema()#still the datatype of 'id' column is string

In [0]:
standard_df3=standard_df3.where("id not rlike '[a-zA-Z]'")#Removed the string data in the id column
display(standard_df3.where("id='trailer_data:end of file'"))
standard_df4=standard_df3.withColumn("age",col("age").cast("int")).withColumn("id",col("id").cast("long"))
standard_df4.printSchema()
display(standard_df4.take(15))

####Standardization5 - Naming Standardization

In [0]:
standard_df5=standard_df4.withColumnRenamed("id","custid")
standard_df5.printSchema()
display(standard_df5.take(15))

####Standardization6 - Reorder Standardization

In [0]:
standard_df6=standard_df5.select("custid","age","firstname","lastname","profession","sourcesystem")
standard_df6.printSchema()
display(standard_df6.take(15))

####DeDuplication - De-Duplication and removal of non-prioritized data

Duplicates are eliminated at the record level, at the column (key) level, and on a priority basis at the column level (for example, keeping the row with the higher age).

In [0]:
standard_df6.where("custid in (4000001,4000003)").show()#show() returns None, so do not assign its result

In [0]:
dedup_df1=standard_df6.distinct()#Eliminating Record level duplicates
dedup_df2=dedup_df1.dropDuplicates(subset=["custid"])#Keeps one row per custid (which one is not guaranteed) and drops the rest, with no priority given to age or any other column
display(dedup_df2)

In [0]:
dedup_df4=dedup_df1.dropDuplicates(["custid","age","firstname","lastname"])#keep the DataFrame; show() returns None
dedup_df4.show()
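The cells above cover record-level and key-level de-duplication, but not the priority-based case (keeping the row with the higher age for each key). The plain-Python sketch below shows that selection rule. `dedup_keep_highest` and `_rank` are hypothetical helpers, the rows are made up, and treating a null age as the lowest priority is an assumption.

```python
def _rank(value):
    # Assumption: a missing priority ranks lower than any real value.
    return float("-inf") if value is None else value


def dedup_keep_highest(rows, key, priority):
    """Keep one row per key, preferring the largest non-null priority value."""
    best = {}
    for row in rows:
        k = row[key]
        current = best.get(k)
        if current is None or _rank(row[priority]) > _rank(current[priority]):
            best[k] = row
    return list(best.values())


# Made-up rows with duplicate custid values.
customers = [
    {"custid": 4000001, "age": 55, "firstname": "Ann"},
    {"custid": 4000001, "age": 60, "firstname": "Ann"},
    {"custid": 4000003, "age": None, "firstname": "Cid"},
    {"custid": 4000003, "age": 45, "firstname": "Cid"},
]

for row in dedup_keep_highest(customers, key="custid", priority="age"):
    print(row)
```

In PySpark the same rule is commonly expressed with a window partitioned by `custid`, ordered by `age` descending, and a `row_number()` filter that keeps the first row per partition.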