#Create a Spark Session

In [0]:
from pyspark.sql import * 
spark=SparkSession.builder.master("local[2]").appName("AB APP").enableHiveSupport().getOrCreate()
print(type(spark))

<class 'pyspark.sql.session.SparkSession'>


###Create a directory for the data that is needed to be stored for BB2  

In [0]:

dbutils.fs.rm("dbfs:/dbfs/FileStore/BB2")

Out[2]: True

In [0]:
dbutils.fs.mkdirs("dbfs:/FileStore/BB2")

Out[3]: True

In [0]:
data_frame_1=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , sep=",",inferSchema=True,header=False).toDF("Id","Fname","Lname","Experience","Profession")
data_frame_1.show(3)

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows



###Query the 4000002 which is going to be dropped in next step  

In [0]:
data_frame_1.where("Id==4000002").show()
#The above is the mistyped data 
#the below is another way 

#data_frame_1.where("""rlike(Id,'[4000002]')==True""").show()
#The above rlike wont work beacuse 
"""
rlike(Id, '[4000002]') is interpreted as a character class pattern, matching any one of the characters 4, 0, 2 anywhere in the Id value.
"""
data_frame_1.where("Id rlike '^4000002$'").show()
"""
Explanation 
^ asserts the position at the start of the string.
4000002 is the exact sequence of digits to match.
$ asserts the position at the end of the string.
"""



+-------+-----+-----+----------+----------+
|     Id|Fname|Lname|Experience|Profession|
+-------+-----+-----+----------+----------+
|4000002|Paige| Chen|       7-7|     Actor|
+-------+-----+-----+----------+----------+

+-------+-----+-----+----------+----------+
|     Id|Fname|Lname|Experience|Profession|
+-------+-----+-----+----------+----------+
|4000002|Paige| Chen|       7-7|     Actor|
+-------+-----+-----+----------+----------+

Out[5]: '\nExplanation \n^ asserts the position at the start of the string.\n4000002 is the exact sequence of digits to match.\n$ asserts the position at the end of the string.\n'

In [0]:
data_frame_1.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Fname: string (nullable = true)
 |-- Lname: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Profession: string (nullable = true)



In [0]:
data_frame_1.describe
data_frame_1.count()

Out[7]: 10005

###In above the experience and and id is string we want integer so do schema 

In [0]:
from pyspark.sql.types import * 
custom_schema=StructType([StructField("Id" , IntegerType(),True ),StructField("Fname" , StringType(),True ),StructField("Lname" , StringType(),True ),StructField("Experience" , IntegerType(),True ),StructField("Profession" , StringType(),True )])
data_frame_2=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , sep=",",schema=custom_schema,header=False,mode="dropmalformated")
data_frame_2.show(3)
print(data_frame_2.count())

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

10005


In [0]:

data_frame_1.where("Id rlike '^4000002$'").show()
print(data_frame_1.where("Id rlike '^4000002$'").count())
data_frame_2.where("Id rlike '^4000002$'").show()


+-------+-----+-----+----------+----------+
|     Id|Fname|Lname|Experience|Profession|
+-------+-----+-----+----------+----------+
|4000002|Paige| Chen|       7-7|     Actor|
+-------+-----+-----+----------+----------+

1
+-------+-----+-----+----------+----------+
|     Id|Fname|Lname|Experience|Profession|
+-------+-----+-----+----------+----------+
|4000002|Paige| Chen|      null|     Actor|
+-------+-----+-----+----------+----------+



###Identify only the corrupted data alone and I need to analyse/log them or send it to our source system to get it corrected

In [0]:
data_frame_1.where("upper(Id)<>lower(ID)").show(4)
"""
upper(cid): Converts the values in the cid column to uppercase.
lower(cid): Converts the values in the cid column to lowercase.
<>: This is the "not equal to" operator in SQL


upper , lower function is designed to convert characters to uppercase. However, since 1, 2, 3, and 4 are numeric values, applying upper , lower to them would have no effect
"""

+--------------------+-----+--------+----------+----------+
|                  Id|Fname|   Lname|Experience|Profession|
+--------------------+-----+--------+----------+----------+
|                 ten|Elsie|Hamilton|        43|     Pilot|
|trailer_data:end ...| null|    null|      null|      null|
+--------------------+-----+--------+----------+----------+

Out[10]: '\nupper(cid): Converts the values in the cid column to uppercase.\nlower(cid): Converts the values in the cid column to lowercase.\n<>: This is the "not equal to" operator in SQL\n\n\nupper , lower function is designed to convert characters to uppercase. However, since 1, 2, 3, and 4 are numeric values, applying upper , lower to them would have no effect\n'

###The mistyped data in data_frame_1 without schema is 

In [0]:
data_frame_1.where("""rlike(Experience,'[-]')==True""").show()
"""
rlike(Experience, '[-]'): This uses the rlike function to check if the column Experience matches the regular expression pattern '[-]'.
"""

+-------+-----+-----+----------+----------+
|     Id|Fname|Lname|Experience|Profession|
+-------+-----+-----+----------+----------+
|4000002|Paige| Chen|       7-7|     Actor|
+-------+-----+-----+----------+----------+

Out[11]: "\nrlike(Experience, '[-]'): This uses the rlike function to check if the column Experience matches the regular expression pattern '[-]'.\n"

##Now removing the record with age < , > than 5 

##The below is a useless operation

In [0]:
from pyspark.sql.functions import *
print(data_frame_2.where(size(split("Id",","))!=5).count())


10005


In [0]:
from pyspark.sql.functions import *
data_frame_2.where(size(split("Id",","))!=5).show(3)
data_frame_2.where(size(split("Id",","))!=5).show(3)
print()


+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows




##show() understanding 

##Truncate understanding 
Truncation: Column values are truncated to 20 characters if they are longer than that, with the rest replaced by an ellipsis (...). 

In [0]:
# Show 10 rows with full column values
data_frame_1.show(3, truncate=False)
data_frame_1.show(3, truncate=True)


# Show 10 rows in vertical format
data_frame_1.show(3, vertical=True)
data_frame_1.show(3, vertical=False)

"""
Truncate Parameter:

truncate=False: No truncation, full values shown.
truncate=True: Truncates values to 20 characters.
Vertical Parameter:

vertical=True: Rows displayed vertically.
vertical=False: Rows displayed in the default tabular format.
"""

+-------+--------+-----+----------+----------+
|Id     |Fname   |Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|Apache  |Spark|11        |null      |
|4000001|Kristina|Chung|55        |Pilot     |
|4000001|Kristina|Chung|55        |Pilot     |
+-------+--------+-----+----------+----------+
only showing top 3 rows

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

-RECORD 0--------------
 Id         | 4000000  
 Fname      | Apache   
 Lname      | Spark    
 Experience | 11       
 Profession | null     
-RECORD 1--------------
 Id         | 4000001  
 Fname      | Kristina 
 Lname      | Chung    
 Experience | 55       
 Profession | Pilot    
-REC

###Now we are going to find the number of records droped 

###By using the culmNameOfCorruptRecord method 

In [0]:
custom_schema=StructType([StructField("Id" , IntegerType(),True ),StructField("Fname" , StringType(),True ),StructField("Lname" , StringType(),True ),StructField("Experience" , IntegerType(),True ),StructField("Profession" , StringType(),True ),StructField("malformed_data",StringType(),True)])
#data_frame_3=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , sep=",",schema=custom_schema,header=False,mode="premissive",columnNameOfCorruptRecord="malformed_data")  -> which is internally droppping the data 
data_frame_3=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , sep=",",schema=custom_schema,header=False,mode="permissive",columnNameOfCorruptRecord="malformed_data")
data_frame_3.show(3)
print("-----------------------")
data_frame_3.where("malformed_data != 'null' ").show(2343)
print("-------------------")
data_frame_3.createOrReplaceTempView("table1")
data_count=spark.sql("select * from table1 where malformed_data is  NULL ").count()
print(data_count)
data_count_1=spark.sql("select * from table1 where malformed_data is not NULL ")
data_count_1.show()
print(data_count_1.count())


"""
after_drop_no_of_record=spark.sql("select * from table1 where malformed_data is not NULL ")
print("_____________")
print(after_drop_no_of_record.count())
print("_____________")
after_drop_no_of_record.show(23)
print("+++++++++++++++++")
data_frame_3.where("malformed_data = 'null' ").select("Id","Fname" ,"Lname" ,"Experience","Profession").show(234234)
calc=data_frame_3.where("malformed_data == 'null' ")
print("CALC - > " , calc.count() )
"""

+-------+--------+-----+----------+----------+--------------+
|     Id|   Fname|Lname|Experience|Profession|malformed_data|
+-------+--------+-----+----------+----------+--------------+
|4000000|  Apache|Spark|        11|      null|          null|
|4000001|Kristina|Chung|        55|     Pilot|          null|
|4000001|Kristina|Chung|        55|     Pilot|          null|
+-------+--------+-----+----------+----------+--------------+
only showing top 3 rows

-----------------------
+-------+-------+--------+----------+----------+--------------------+
|     Id|  Fname|   Lname|Experience|Profession|      malformed_data|
+-------+-------+--------+----------+----------+--------------------+
|4000002|  Paige|    Chen|      null|     Actor|4000002,Paige,Che...|
|4000006|Patrick|    Song|        24|      null|4000006,Patrick,S...|
|   null|  Elsie|Hamilton|        43|     Pilot|ten,Elsie,Hamilto...|
|4000011|Francis|McNamara|        47| Therapist|4000011,Francis,M...|
|   null|   null|    null| 

In [0]:
temp=data_frame_3.where("malformed_data != 'null' ")
print(temp.count())

0


In [0]:
count1=data_frame_3.count()
print(count1)
count2=data_frame_3.where("malformed_data != 'null' ").count()
print(count2)
mal_formed_data=(count1)-(count2)
print(mal_formed_data)

10005
0
10005


###To check the partition 

In [0]:
print(data_frame_2.rdd.getNumPartitions())

1


###Take only the malfored data == NULL and write it some where 

In [0]:
corrupted_data_frame = data_frame_3.where("malformed_data is not NULL")
corrupted_data_frame.show()
print(corrupted_data_frame.count())

corrected_data_frame = data_frame_3.where("malformed_data is NULL")
corrected_data_frame.show()
dbutils.fs.mkdirs("dbfs:/FileStore/BB2/Curated_Data")
# corrected_data_frame.write.csv("dbfs:/FileStore/BB2/Curated_Data/Data")
print(corrected_data_frame.count())
corrected_data_frame.where("malformed_data is not NULL").show()


+-------+-------+--------+----------+----------+--------------------+
|     Id|  Fname|   Lname|Experience|Profession|      malformed_data|
+-------+-------+--------+----------+----------+--------------------+
|4000002|  Paige|    Chen|      null|     Actor|4000002,Paige,Che...|
|4000006|Patrick|    Song|        24|      null|4000006,Patrick,S...|
|   null|  Elsie|Hamilton|        43|     Pilot|ten,Elsie,Hamilto...|
|4000011|Francis|McNamara|        47| Therapist|4000011,Francis,M...|
|   null|   null|    null|      null|      null|trailer_data:end ...|
+-------+-------+--------+----------+----------+--------------------+

0
+-------+---------+---------+----------+--------------------+--------------+
|     Id|    Fname|    Lname|Experience|          Profession|malformed_data|
+-------+---------+---------+----------+--------------------+--------------+
|4000000|   Apache|    Spark|        11|                null|          null|
|4000001| Kristina|    Chung|        55|               Pilo

##Drop the malformed_data column

In [0]:
corrected_data_frame.show(3)
complete_crt_data = corrected_data_frame.drop("malformed_data")
complete_crt_data.show(3)

+-------+--------+-----+----------+----------+--------------+
|     Id|   Fname|Lname|Experience|Profession|malformed_data|
+-------+--------+-----+----------+----------+--------------+
|4000000|  Apache|Spark|        11|      null|          null|
|4000001|Kristina|Chung|        55|     Pilot|          null|
|4000001|Kristina|Chung|        55|     Pilot|          null|
+-------+--------+-----+----------+----------+--------------+
only showing top 3 rows

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows



##na.drop understanding 

In [0]:
custom_schema=StructType([StructField("Id" , IntegerType(),True ),StructField("Fname" , StringType(),True ),StructField("Lname" , StringType(),True ),StructField("Experience" , IntegerType(),True ),StructField("Profession" , StringType(),True )])

drop_understanding_df=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , sep=",",schema=custom_schema,header=False,mode="permissive")

##Drop all 

In [0]:
drop_all=drop_understanding_df.na.drop()#Drop all the records with any column contains null (by default how=any is applied)
drop_all.count()

Out[22]: 9911

##Drop record witl all column null  

In [0]:
drop_any=drop_understanding_df.na.drop(how="all")
drop_any.count()

Out[23]: 10003

###Drop the records with any one of column given as arg if any one contains null

In [0]:
drop_selected=drop_understanding_df.na.drop(how="any" , subset=["Id","Fname","Fname","Experience","Profession"])
drop_selected.count()

Out[24]: 9912

###Drop the records with any one of column given as arg if all contains null

In [0]:
drop_selected_all=drop_understanding_df.na.drop(how="all" , subset=["Id","Fname","Fname","Experience","Profession"])
drop_selected_all.count()

Out[25]: 10003

In [0]:
drop_selected.where("Id is  NULL ").show(5)
print("-------------------------")
drop_selected_all.where("Experience is NULL ").show(5)

+---+-----+-----+----------+----------+
| Id|Fname|Lname|Experience|Profession|
+---+-----+-----+----------+----------+
+---+-----+-----+----------+----------+

-------------------------
+-------+-------+-----+----------+----------+
|     Id|  Fname|Lname|Experience|Profession|
+-------+-------+-----+----------+----------+
|4000002|  Paige| Chen|      null|     Actor|
|4000010|Dolores| null|      null|      null|
+-------+-------+-----+----------+----------+



##Understanding the whole data in the Dataframe 

In [0]:
custom_schema=StructType([StructField("Id" , IntegerType(),True ),StructField("Fname" , StringType(),True ),StructField("Lname" , StringType(),True ),StructField("Experience" , IntegerType(),True ),StructField("Profession" , StringType(),True )])

complete_df=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , sep=",",schema=custom_schema,header=False,mode="permissive")

###By below we are getting the data is duplicated  

In [0]:
print(complete_df.count() - complete_df.distinct().count())

2


In [0]:
print(complete_df.count() - complete_df.dropDuplicates().count())#dropduplicates will drop the duplicate record and give only the ditinct record 

2


In [0]:
complete_df.where("Id is NULL and Fname is NULL and Lname is NULL and Experience is NULL and Profession is NULL ").show(4)

+----+-----+-----+----------+----------+
|  Id|Fname|Lname|Experience|Profession|
+----+-----+-----+----------+----------+
|null| null| null|      null|      null|
|null| null| null|      null|      null|
+----+-----+-----+----------+----------+



##Make the NULL to be changed to something 

In [0]:
first_cut_data=complete_df.na.fill(-1,subset=["Experience"])
second_cut_data=first_cut_data.na.fill("_Null_" , subset=["Fname" , "Lname","Profession"])
final_cut_data=second_cut_data.na.fill(0,subset=["Id"])

In [0]:
final_cut_data.where("Id = 0 and Experience = -1 and Fname is NULL ").show()
final_cut_data.where("Lname = '_Null_' ").show()
final_cut_data.where("Fname = '_Null_' and Lname = '_Null_' ").show()



+---+-----+-----+----------+----------+
| Id|Fname|Lname|Experience|Profession|
+---+-----+-----+----------+----------+
+---+-----+-----+----------+----------+

+-------+--------+------+----------+----------+
|     Id|   Fname| Lname|Experience|Profession|
+-------+--------+------+----------+----------+
|4000004|Gretchen|_Null_|        66|    _Null_|
|      0|  _Null_|_Null_|        -1|    _Null_|
|4000009| Malcolm|_Null_|        39|    Artist|
|4000010| Dolores|_Null_|        -1|    _Null_|
|      0|  _Null_|_Null_|        -1|    _Null_|
+-------+--------+------+----------+----------+

+---+------+------+----------+----------+
| Id| Fname| Lname|Experience|Profession|
+---+------+------+----------+----------+
|  0|_Null_|_Null_|        -1|    _Null_|
|  0|_Null_|_Null_|        -1|    _Null_|
+---+------+------+----------+----------+



####Drop duplicates retains only the first unique value and drop rest of all dups

### Distinct is used to do row level deduplication
### DropDuplicates is used to do row & column level deduplication

In [0]:
final_cut_data.dropDuplicates().count()

Out[33]: 10003

###column level deduplication on Id 

In [0]:
final_cut_data.dropDuplicates(subset=["Id"]).count()


Out[34]: 9998

###Drop Duplicates with prioritization 

In [0]:
less_experience_prioritize=final_cut_data.sort("Experience",ascending=True).dropDuplicates(subset=["Experience"])
less_experience_prioritize.show(33)

+-------+---------+---------+----------+--------------------+
|     Id|    Fname|    Lname|Experience|          Profession|
+-------+---------+---------+----------+--------------------+
|4000002|    Paige|     Chen|        -1|               Actor|
|4000000|   Apache|    Spark|        11|              _Null_|
|4000052|  Shirley|  Merritt|        21|            Reporter|
|4000054|   Judith|   Cooper|        22|           Economist|
|4000145|    Roger|    Hanna|        23|            Musician|
|4000006|  Patrick|     Song|        24|              _Null_|
|4000035|  Shelley|    Weeks|        25|            Reporter|
|4000012|    Sandy|   Raynor|        26|              Writer|
|4000026|   Marian|  Solomon|        27|              Lawyer|
|4000024| Franklin|     Vick|        28|              Dancer|
|4000097|  Brandon|    James|        29|            Musician|
|4000003|vaishnavi|santharam|        30|                  IT|
|4000046|    Louis|Rosenthal|        31|              _Null_|
|4000099

In [0]:
more_experience_prioritize=final_cut_data.sort("Experience",ascending=False).dropDuplicates(subset=["Experience"])
more_experience_prioritize.show(33)

+-------+---------+---------+----------+--------------------+
|     Id|    Fname|    Lname|Experience|          Profession|
+-------+---------+---------+----------+--------------------+
|4000002|    Paige|     Chen|        -1|               Actor|
|4000000|   Apache|    Spark|        11|              _Null_|
|4000052|  Shirley|  Merritt|        21|            Reporter|
|4000054|   Judith|   Cooper|        22|           Economist|
|4000145|    Roger|    Hanna|        23|            Musician|
|4000006|  Patrick|     Song|        24|              _Null_|
|4000035|  Shelley|    Weeks|        25|            Reporter|
|4000012|    Sandy|   Raynor|        26|              Writer|
|4000026|   Marian|  Solomon|        27|              Lawyer|
|4000024| Franklin|     Vick|        28|              Dancer|
|4000097|  Brandon|    James|        29|            Musician|
|4000003|vaishnavi|santharam|        30|                  IT|
|4000046|    Louis|Rosenthal|        31|              _Null_|
|4000099

#Ranking (Most Important ) 

In [0]:
custom_schema=StructType([StructField("Id" , IntegerType(),True ),StructField("Fname" , StringType(),True ),StructField("Lname" , StringType(),True ),StructField("Experience" , IntegerType(),True ),StructField("Profession" , StringType(),True )])
new_data_frame=spark.read.csv("dbfs:/FileStore/BB2/custsmodified" , schema=custom_schema , header=False )

##Ranking begins now 

####Old age customer 

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
desc_ranked_df=new_data_frame.select("*",row_number().over(Window.partitionBy("Id").orderBy(col("Experience").desc())).alias("RANK"))

###Explanation 
-row_number().over() is used to create a new column that assigns a unique sequential integer to rows within a window partition.
-Window.partitionBy("Id") defines the window partition, meaning rows will be grouped by the "Id" column.
-orderBy("Experience") specifies the order within each partition, meaning rows within each partition will be ordered by the "Experience" column.
-alias("RANK") names the new column "RANK".

In [0]:
desc_ranked_df.show(12)

+-------+---------+---------+----------+----------+----+
|     Id|    Fname|    Lname|Experience|Profession|RANK|
+-------+---------+---------+----------+----------+----+
|   null|    Karen|  Puckett|        74|    Lawyer|   1|
|   null|    Hazel|   Bender|        63|      null|   2|
|   null|    Elsie| Hamilton|        43|     Pilot|   3|
|   null|     null|     null|      null|      null|   4|
|   null|     null|     null|      null|      null|   5|
|4000000|   Apache|    Spark|        11|      null|   1|
|4000001| Kristina|    Chung|        55|     Pilot|   1|
|4000001| Kristina|    Chung|        55|     Pilot|   2|
|4000002|    Paige|     Chen|      null|     Actor|   1|
|4000003|  mohamed|    irfan|        41|        IT|   1|
|4000003|   Sherri|   Melton|        34|  Reporter|   2|
|4000003|vaishnavi|santharam|        30|        IT|   3|
+-------+---------+---------+----------+----------+----+
only showing top 12 rows



####Young age customer 

In [0]:
asc_ranked_df=new_data_frame.select("*" , row_number().over(Window.partitionBy("Id").orderBy("Experience")).alias("RANK"))

In [0]:
asc_ranked_df.show(12)

+-------+---------+---------+----------+----------+----+
|     Id|    Fname|    Lname|Experience|Profession|RANK|
+-------+---------+---------+----------+----------+----+
|   null|     null|     null|      null|      null|   1|
|   null|     null|     null|      null|      null|   2|
|   null|    Elsie| Hamilton|        43|     Pilot|   3|
|   null|    Hazel|   Bender|        63|      null|   4|
|   null|    Karen|  Puckett|        74|    Lawyer|   5|
|4000000|   Apache|    Spark|        11|      null|   1|
|4000001| Kristina|    Chung|        55|     Pilot|   1|
|4000001| Kristina|    Chung|        55|     Pilot|   2|
|4000002|    Paige|     Chen|      null|     Actor|   1|
|4000003|vaishnavi|santharam|        30|        IT|   1|
|4000003|   Sherri|   Melton|        34|  Reporter|   2|
|4000003|  mohamed|    irfan|        41|        IT|   3|
+-------+---------+---------+----------+----------+----+
only showing top 12 rows



###proper difference of the rank 

In [0]:
desc_ranked_df.where("Id = 4000003").show()

asc_ranked_df.where("Id = 4000003").show()


+-------+---------+---------+----------+----------+----+
|     Id|    Fname|    Lname|Experience|Profession|RANK|
+-------+---------+---------+----------+----------+----+
|4000003|  mohamed|    irfan|        41|        IT|   1|
|4000003|   Sherri|   Melton|        34|  Reporter|   2|
|4000003|vaishnavi|santharam|        30|        IT|   3|
+-------+---------+---------+----------+----------+----+

+-------+---------+---------+----------+----------+----+
|     Id|    Fname|    Lname|Experience|Profession|RANK|
+-------+---------+---------+----------+----------+----+
|4000003|vaishnavi|santharam|        30|        IT|   1|
|4000003|   Sherri|   Melton|        34|  Reporter|   2|
|4000003|  mohamed|    irfan|        41|        IT|   3|
+-------+---------+---------+----------+----------+----+



###Rank concept by SQL 

In [0]:
new_data_frame.createOrReplaceTempView("table1")
spark.sql("select * from table1").show(3);

spark.sql("""select * , row_number()over(partition by Id order by Experience) as RANK from table1""").show(3)
spark.sql("""select * from (select *, row_number()over(partition by Id order by Experience) as RANK from table1) where Id=4000003""").show(3)

spark.sql("""select * , row_number()over(partition by Id order by Experience desc ) as RANK from table1""").show(3)
spark.sql("""select * from (select *, row_number()over(partition by Id order by Experience desc ) as RANK from table1) where Id=4000003""").show(3)

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

+----+-----+--------+----------+----------+----+
|  Id|Fname|   Lname|Experience|Profession|RANK|
+----+-----+--------+----------+----------+----+
|null| null|    null|      null|      null|   1|
|null| null|    null|      null|      null|   2|
|null|Elsie|Hamilton|        43|     Pilot|   3|
+----+-----+--------+----------+----------+----+
only showing top 3 rows

+-------+---------+---------+----------+----------+----+
|     Id|    Fname|    Lname|Experience|Profession|RANK|
+-------+---------+---------+----------+----------+----+
|4000003|vaishnavi|santharam|        30|        IT|   1|
|4000003|   Sherri|   Melton|        34|  Reporter

###Summarize functions 

In [0]:
new_data_frame.describe().show()
new_data_frame.summary().show()


+-------+------------------+---------+---------+------------------+----------+
|summary|                Id|    Fname|    Lname|        Experience|Profession|
+-------+------------------+---------+---------+------------------+----------+
|  count|             10000|    10003|    10000|             10001|      9915|
|   mean|      4004999.4987|     null|     null| 48.55684431556844|      null|
| stddev|2886.8979293956204|     null|     null|15.549765059339597|      null|
|    min|           4000000|    Aaron|   Abbott|                11|Accountant|
|    max|           4009999|vaishnavi|santharam|                75|    Writer|
+-------+------------------+---------+---------+------------------+----------+

+-------+------------------+---------+---------+------------------+----------+
|summary|                Id|    Fname|    Lname|        Experience|Profession|
+-------+------------------+---------+---------+------------------+----------+
|  count|             10000|    10003|    10000|   

###Use of the above 
####Summary and describe help us understand at every column how many
####count of null values are there, mean(mid value) of a given int columns (understand the distribution of data)
####Percentail of the distribution of data

###Below 
#Data Structurizing - Combining Data (diff dir, subdir, filenames) +

In [0]:
dbutils.fs.mkdirs("/dbfs/FileStore/BB2/Multi_form_Data")
dbutils.fs.rm("/dbfs/FileStore/BB2/Multi_form_Data",True)
dbutils.fs.mkdirs("/FileStore/BB2/Multi_form_Data")
dbutils.fs.mkdirs("/FileStore/BB2/Multi_form_Data/Folder1")
dbutils.fs.mkdirs("/FileStore/BB2/Multi_form_Data/Folder2")



Out[45]: True

In [0]:
%fs cp dbfs:/FileStore/BB2/custsmodified /FileStore/BB2/Multi_form_Data/D1
%fs cp dbfs:/FileStore/BB2/custsmodified /FileStore/BB2/Multi_form_Data/Folder1/D2
%fs cp dbfs:/FileStore/BB2/custsmodified /FileStore/BB2/Multi_form_Data/Folder1/D3
%fs cp dbfs:/FileStore/BB2/custsmodified /FileStore/BB2/Multi_form_Data/Folder2/D4
%fs cp dbfs:/FileStore/BB2/custsmodified /FileStore/BB2/Multi_form_Data/Folder2/D5


In [0]:
dbutils.fs.cp("dbfs:/FileStore/BB2/custsmodified", "dbfs:/FileStore/BB2/Multi_form_Data/D1")
dbutils.fs.cp("dbfs:/FileStore/BB2/custsmodified", "dbfs:/FileStore/BB2/Multi_form_Data/Folder1/D2")
dbutils.fs.cp("dbfs:/FileStore/BB2/custsmodified", "dbfs:/FileStore/BB2/Multi_form_Data/Folder1/D3")
dbutils.fs.cp("dbfs:/FileStore/BB2/custsmodified", "dbfs:/FileStore/BB2/Multi_form_Data/Folder2/D4")
dbutils.fs.cp("dbfs:/FileStore/BB2/custsmodified", "dbfs:/FileStore/BB2/Multi_form_Data/Folder2/D5")


In [0]:
sample_data_fetch_pattern=spark.read.csv("dbfs:/FileStore/BB2/Multi_form_Data" , schema=custom_schema , header=False) 

###The below will fetch from any file // folder within that folder 

In [0]:
sample_data_fetch_pattern.show(3)
print("Number of records " , sample_data_fetch_pattern.count())

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

Number of records  10005


###The below will only fetch from folder / file with specific pattern 

#Reading from one path contains multiple pattern of files

In [0]:
sample_data_fetch_with_pattern=spark.read.csv("dbfs:/FileStore/BB2/Multi_form_Data" , schema=custom_schema , pathGlobFilter="D[1-3]")

In [0]:
sample_data_fetch_with_pattern.show(3)
print("Count->>>" , sample_data_fetch_with_pattern.count())

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

Count->>> 10005


#Combining Data - Reading from a multiple different paths contains multiple pattern of files

In [0]:
combined_fetch_records=spark.read.csv(["dbfs:/FileStore/BB2/custsmodified_1.txt","dbfs:/FileStore/BB2/Multi_form_Data/D1"] , schema=custom_schema , header=False , sep ="," )

In [0]:
combined_fetch_records.show(5)
combined_fetch_records.where("Id = 3000001").show(3)

+-------+--------+------+----------+----------+
|     Id|   Fname| Lname|Experience|Profession|
+-------+--------+------+----------+----------+
|4000000|  Apache| Spark|        11|      null|
|4000001|Kristina| Chung|        55|     Pilot|
|4000001|Kristina| Chung|        55|     Pilot|
|4000002|   Paige|  Chen|      null|     Actor|
|4000003|  Sherri|Melton|        34|  Reporter|
+-------+--------+------+----------+----------+
only showing top 5 rows

+-------+-----+-----+----------+----------+
|     Id|Fname|Lname|Experience|Profession|
+-------+-----+-----+----------+----------+
|3000001| John|  Doe|         5|  Engineer|
+-------+-----+-----+----------+----------+



#Merge 2 dataframe 

####The below combine the repeted record also 

#Union

In [0]:
# sample_data_fetch_pattern + combined_fetch_records 
combine_two_df=combined_fetch_records.union(sample_data_fetch_pattern)


In [0]:
combine_two_df.show(3)
print("Count - >>>>" , combine_two_df.count())


+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

Count - >>>> 20025


##union
####Column Order: The columns are combined based on their order in the DataFrames, not their names.
####Column Names: The column names must be identical in both DataFrames, including their order and type.
####Missing Columns: If the columns are not in the same order or there are missing columns, an error will occur.

#unionByName 

In [0]:
# sample_data_fetch_pattern + combined_fetch_records 
combined_by_unionByName= sample_data_fetch_pattern.unionByName(combined_fetch_records,allowMissingColumns=True)

In [0]:
combined_by_unionByName.show(3)
print("Count->>>" , combined_by_unionByName.count())

+-------+--------+-----+----------+----------+
|     Id|   Fname|Lname|Experience|Profession|
+-------+--------+-----+----------+----------+
|4000000|  Apache|Spark|        11|      null|
|4000001|Kristina|Chung|        55|     Pilot|
|4000001|Kristina|Chung|        55|     Pilot|
+-------+--------+-----+----------+----------+
only showing top 3 rows

Count->>> 20025


##unionByName
####Column Order: The columns are combined based on their names, not their order in the DataFrames.
####Column Names: The columns can be in a different order in each DataFrame, and unionByName will still combine them correctly.
####Missing Columns: With allowMissingColumns=True, unionByName can handle DataFrames with missing columns by filling the missing columns with null values.

#Schema Evolution (Structuring) - source data is evolving with different structure in one file 

In [0]:
dbutils.fs.mkdirs("/FileStore/BB2/schema_evolution")

Out[56]: True

In [0]:
day_1=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/day1.txt", schema=custom_schema , header=False , sep="," )
day_2=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/day2.txt", schema=custom_schema , header=False , sep=",")
day_3=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/day3.txt", schema=custom_schema , header=False , sep=",")
day_4=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/day4.txt", schema=custom_schema , header=False, sep="," )
day_5=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/day5.txt", schema=custom_schema , header=False, sep="," )
day_6=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/day6.txt", schema=custom_schema , header=False , sep=",")

day_1.show(3)
day_2.show(3)
day_3.show(3)
day_4.show(3)
day_5.show(3)
day_6.show(3)




+-------+--------+--------+----------+----------+
|     Id|   Fname|   Lname|Experience|Profession|
+-------+--------+--------+----------+----------+
|3000015|Isabella|   White|         1|    Farmer|
|3000016|    Noah|  Garcia|         2|  Engineer|
|3000017|  Olivia|Martinez|         3|   Teacher|
+-------+--------+--------+----------+----------+
only showing top 3 rows

+-------+------+--------+----------+----------+
|     Id| Fname|   Lname|Experience|Profession|
+-------+------+--------+----------+----------+
|3000036|   Ava|Martinez|        22| Architect|
|3000037|Sophia| Jackson|        23|  Designer|
|3000038| Logan|  Taylor|        24|     Pilot|
+-------+------+--------+----------+----------+
only showing top 3 rows

+-------+-----+------+----------+----------+
|     Id|Fname| Lname|Experience|Profession|
+-------+-----+------+----------+----------+
|3000056| Liam|Taylor|        42|   Teacher|
|3000057|  Ava| Brown|        43|    Doctor|
|3000058| Noah| White|        44|    Ar

In [0]:
dbutils.fs.mkdirs("/FileStore/BB2/schema_evolution/Complete_data")

Out[58]: True

In [0]:
day_1.write.mode("overwrite").csv("/FileStore/BB2/schema_evolution/Complete_data/Data", sep="~", header=True)
day_2.write.mode("append").csv("/FileStore/BB2/schema_evolution/Complete_data/Data", sep="~", header=True)
day_3.write.mode("append").csv("/FileStore/BB2/schema_evolution/Complete_data/Data", sep="~", header=True)
day_4.write.mode("append").csv("/FileStore/BB2/schema_evolution/Complete_data/Data", sep="~", header=True)
day_5.write.mode("append").csv("/FileStore/BB2/schema_evolution/Complete_data/Data", sep="~", header=True)
day_6.write.mode("append").csv("/FileStore/BB2/schema_evolution/Complete_data/Data", sep="~", header=True)

print("Count 1-->>" , day_1.count())
print("Count 2-->>" , day_2.count())
print("Count 3-->>" , day_3.count())
print("Count 4-->>" , day_4.count())
print("Count 5-->>" , day_5.count())
print("Count 6-->>" , day_6.count())

all_data_in_one_data_frame=spark.read.csv("dbfs:/FileStore/BB2/schema_evolution/Complete_data/Data",sep="~", header=True)
print("Complete Count -->>" , all_data_in_one_data_frame.count())


Count 1-->> 21
Count 2-->> 20
Count 3-->> 20
Count 4-->> 20
Count 5-->> 20
Count 6-->> 20
Complete Count -->> 121


In [0]:
all_data_in_one_data_frame.show(125)

+-------+---------+---------+----------+----------+
|     Id|    Fname|    Lname|Experience|Profession|
+-------+---------+---------+----------+----------+
|3000015| Isabella|    White|         1|    Farmer|
|3000016|     Noah|   Garcia|         2|  Engineer|
|3000017|   Olivia| Martinez|         3|   Teacher|
|3000018|     Liam|  Johnson|         4|    Doctor|
|3000019|     Emma|    Smith|         5|    Artist|
|3000020|    Mason| Anderson|         6|    Lawyer|
|3000021|   Sophia|   Thomas|         7|  Musician|
|3000022|  William|   Wilson|         8|      Chef|
|3000023|   Amelia| Martinez|         9| Architect|
|3000024|    James|  Jackson|        10|  Designer|
|3000025|Charlotte|   Taylor|        11|     Pilot|
|3000026| Benjamin|    Moore|        12|    Writer|
|3000027|   Elijah|    Davis|        13| Scientist|
|3000028|  Abigail|Hernandez|        14|  Mechanic|
|3000029|    Lucas| Anderson|        15|  Engineer|
|3000030|      Mia|   Taylor|        16|   Teacher|
|3000031|   

###For the above the union / unionByName is a costly approach 

#Below is the best approach 

###Preferred way to achive schema evolution is by using orc/parquet formats (day by day sturcture of data may change)

In [0]:
dbutils.fs.mkdirs("/FileStore/BB2/ORC_complete_data")

Out[61]: True

In [0]:
# dbutils.fs.rm("/FileStore/BB2/ORC_complete_data/Data",True) If data present already before working 
day_1.write.mode("overwrite").orc("/FileStore/BB2/ORC_complete_data/Data")
day_2.write.mode("append").orc("/FileStore/BB2/ORC_complete_data/Data",)
day_3.write.mode("append").orc("/FileStore/BB2/ORC_complete_data/Data")
day_4.write.mode("append").orc("/FileStore/BB2/ORC_complete_data/Data")
day_5.write.mode("append").orc("/FileStore/BB2/ORC_complete_data/Data")
day_6.write.mode("append").orc("/FileStore/BB2/ORC_complete_data/Data")

all_data_in_one_data_frame_2=spark.read.orc("/FileStore/BB2/ORC_complete_data/Data" , mergeSchema=True )

In [0]:
all_data_in_one_data_frame_2.show(4)

+-------+--------+--------+----------+----------+
|     Id|   Fname|   Lname|Experience|Profession|
+-------+--------+--------+----------+----------+
|3000015|Isabella|   White|         1|    Farmer|
|3000016|    Noah|  Garcia|         2|  Engineer|
|3000017|  Olivia|Martinez|         3|   Teacher|
|3000018|    Liam| Johnson|         4|    Doctor|
+-------+--------+--------+----------+----------+
only showing top 4 rows



In [0]:

print("Count 1 ->>>",all_data_in_one_data_frame.count())

print("Count 2 ->>>",all_data_in_one_data_frame_2.count())


Count 1 ->>> 121
Count 2 ->>> 121


#mergeSchema use :
Without mergeSchema=True, Spark expects all files or partitions to have exactly the same schema. If there are schema differences between files, Spark will throw an error unless they match exactly.

Wait for rafeeq ques to be clear for us  

#Data Preparation/Preprocessing/Validation (Cleansing & Scrubbing) - "
###Cleaning data to remove outliers and inaccuracies & Identifying and filling gaps ")
###We are going to use na function to achieve both cleansing and scrubbing
###Cleansing - uncleaned vessel will be thrown (na.drop is used for cleansing)


In [0]:
raw_data=spark.read.csv("dbfs:/FileStore/BB2/Multi_form_Data" , schema=custom_schema , header=False) 

In [0]:
raw_data.show(45)
print("Count->>" , raw_data.count())

+-------+---------+---------+----------+--------------------+
|     Id|    Fname|    Lname|Experience|          Profession|
+-------+---------+---------+----------+--------------------+
|4000000|   Apache|    Spark|        11|                null|
|4000001| Kristina|    Chung|        55|               Pilot|
|4000001| Kristina|    Chung|        55|               Pilot|
|4000002|    Paige|     Chen|      null|               Actor|
|4000003|   Sherri|   Melton|        34|            Reporter|
|4000003|  mohamed|    irfan|        41|                  IT|
|4000003|vaishnavi|santharam|        30|                  IT|
|4000004| Gretchen|     null|        66|                null|
|   null|    Karen|  Puckett|        74|              Lawyer|
|4000006|  Patrick|     Song|        24|                null|
|   null|    Elsie| Hamilton|        43|               Pilot|
|   null|    Hazel|   Bender|        63|                null|
|   null|     null|     null|      null|                null|
|4000009

In [0]:
drop_null_val_columns=raw_data.na.drop("all",subset=["Id" , "Fname","Lname","Experience","Profession"])
print(drop_null_val_columns.count())
drop_null_val_columns.where("Id is NULL").show() # Still the NULL values are there 

drop_null_val_columns=drop_null_val_columns.na.drop(how="any",subset=["Id" , "Fname","Lname","Experience","Profession"])
print(drop_null_val_columns.count())
drop_null_val_columns.where("Id is NULL").show() # Now no NULL values are there 
drop_null_val_columns.where("Fname is NULL").show() # Now no NULL values are there 
drop_null_val_columns.where("Lname is NULL").show() # Now no NULL values are there 
drop_null_val_columns.where("Experience is NULL").show() # Now no NULL values are there 
drop_null_val_columns.where("Profession is NULL").show() # Now no NULL values are there 

drop_using_threshold=drop_null_val_columns.na.drop(thresh=3,subset=["Id" , "Fname","Lname","Experience","Profession"]) 
# Col should not have min 3 non null values  
print(drop_using_threshold.count())

10003
+----+-----+--------+----------+----------+
|  Id|Fname|   Lname|Experience|Profession|
+----+-----+--------+----------+----------+
|null|Karen| Puckett|        74|    Lawyer|
|null|Elsie|Hamilton|        43|     Pilot|
|null|Hazel|  Bender|        63|      null|
+----+-----+--------+----------+----------+

9911
+---+-----+-----+----------+----------+
| Id|Fname|Lname|Experience|Profession|
+---+-----+-----+----------+----------+
+---+-----+-----+----------+----------+

+---+-----+-----+----------+----------+
| Id|Fname|Lname|Experience|Profession|
+---+-----+-----+----------+----------+
+---+-----+-----+----------+----------+

+---+-----+-----+----------+----------+
| Id|Fname|Lname|Experience|Profession|
+---+-----+-----+----------+----------+
+---+-----+-----+----------+----------+

+---+-----+-----+----------+----------+
| Id|Fname|Lname|Experience|Profession|
+---+-----+-----+----------+----------+
+---+-----+-----+----------+----------+

+---+-----+-----+----------+--------

###thresh=n: This means that a row must have at least n non-null values to be retained in the DataFrame.

##Below 
#Scrubbing the DF by Replacing prof with IT as data engineer and writer with editor

In [0]:
dictionary_to_replace={"Lawyer":"Hign court Lawyer" , "Police officer":"Police" }
after_replace_df=drop_using_threshold.na.replace(dictionary_to_replace,subset=["Profession"])
after_replace_df.where("Profession =='Police' or Profession=='Hign court Lawyer'").show()

+-------+----------+---------+----------+-----------------+
|     Id|     Fname|    Lname|Experience|       Profession|
+-------+----------+---------+----------+-----------------+
|4000026|    Marian|  Solomon|        27|Hign court Lawyer|
|4000033|       Tim|    Watts|        58|Hign court Lawyer|
|4000039|     Erica|     Hall|        33|           Police|
|4000065|    Eugene|   Graham|        52|           Police|
|4000090|     Patsy| Sinclair|        48|           Police|
|4000109|   Vincent|   Sumner|        31|Hign court Lawyer|
|4000130|      Toni|    Glass|        46|Hign court Lawyer|
|4000167|      Lynn|Robertson|        45|Hign court Lawyer|
|4000195|    Claire|  Pickett|        59|Hign court Lawyer|
|4000231|Marguerite|   Weiner|        45|           Police|
|4000233|      Alex|    Henry|        67|           Police|
|4000303|      Erin|    Finch|        54|           Police|
|4000333|   Kenneth|  Pickett|        28|           Police|
|4000346|     Diana|    Crane|        26

#1. Data Standardization : Data/Columns(name/type) - Column re-order/number of columns changes (add/remove/Replacement/Renaming)  to make it in a understandable/usable format
###

###Create a word count of the prefession col
##Use the withColumn 

In [0]:
after_adding_col=after_replace_df.withColumn("Size_of_col",lit("Size"))
after_adding_col.show(10)

+-------+---------+---------+----------+----------+-----------+
|     Id|    Fname|    Lname|Experience|Profession|Size_of_col|
+-------+---------+---------+----------+----------+-----------+
|4000001| Kristina|    Chung|        55|     Pilot|       Size|
|4000001| Kristina|    Chung|        55|     Pilot|       Size|
|4000003|   Sherri|   Melton|        34|  Reporter|       Size|
|4000003|  mohamed|    irfan|        41|        IT|       Size|
|4000003|vaishnavi|santharam|        30|        IT|       Size|
|4000011|  Francis| McNamara|        47| Therapist|       Size|
|4000012|    Sandy|   Raynor|        26|    Writer|       Size|
|4000013|   Marion|     Moon|        41| Carpenter|       Size|
|4000015|    Julia|    Desai|        49|  Musician|       Size|
|4000016|   Jerome|  Wallace|        52|Pharmacist|       Size|
+-------+---------+---------+----------+----------+-----------+
only showing top 10 rows



In [0]:
after_adding_col_1=after_adding_col.withColumn("Profession_Size",size(split("Profession"," ")))
after_adding_col_1.show(10)
after_adding_col_1.where("Profession_Size>1").show(3)


+-------+---------+---------+----------+----------+-----------+---------------+
|     Id|    Fname|    Lname|Experience|Profession|Size_of_col|Profession_Size|
+-------+---------+---------+----------+----------+-----------+---------------+
|4000001| Kristina|    Chung|        55|     Pilot|       Size|              1|
|4000001| Kristina|    Chung|        55|     Pilot|       Size|              1|
|4000003|   Sherri|   Melton|        34|  Reporter|       Size|              1|
|4000003|  mohamed|    irfan|        41|        IT|       Size|              1|
|4000003|vaishnavi|santharam|        30|        IT|       Size|              1|
|4000011|  Francis| McNamara|        47| Therapist|       Size|              1|
|4000012|    Sandy|   Raynor|        26|    Writer|       Size|              1|
|4000013|   Marion|     Moon|        41| Carpenter|       Size|              1|
|4000015|    Julia|    Desai|        49|  Musician|       Size|              1|
|4000016|   Jerome|  Wallace|        52|

In [0]:
after_adding_col_1=after_adding_col_1.withColumn("Upper_Prof",upper("Profession"))
after_adding_col_1.show(3)
print(after_adding_col_1.describe())


+-------+--------+------+----------+----------+-----------+---------------+----------+
|     Id|   Fname| Lname|Experience|Profession|Size_of_col|Profession_Size|Upper_Prof|
+-------+--------+------+----------+----------+-----------+---------------+----------+
|4000001|Kristina| Chung|        55|     Pilot|       Size|              1|     PILOT|
|4000001|Kristina| Chung|        55|     Pilot|       Size|              1|     PILOT|
|4000003|  Sherri|Melton|        34|  Reporter|       Size|              1|  REPORTER|
+-------+--------+------+----------+----------+-----------+---------------+----------+
only showing top 3 rows

DataFrame[summary: string, Id: string, Fname: string, Lname: string, Experience: string, Profession: string, Size_of_col: string, Profession_Size: string, Upper_Prof: string]


In [0]:
from pyspark.sql.functions import col
result_data_frame=after_adding_col_1.withColumn("Experience_Int",col("Experience").cast("int")).withColumn("Id_Int",col("Id").cast("int"))

In [0]:
result_data_frame.show(3)
dropped_casted_col=result_data_frame.drop("Experience","Id","Size_of_col")
dropped_casted_col.show(3)
ordered_casted_df=dropped_casted_col.select("Id_Int","Fname","Lname","Profession","Experience_Int","Profession_Size","Upper_Prof")
ordered_casted_df.show(4)

+-------+--------+------+----------+----------+-----------+---------------+----------+--------------+-------+
|     Id|   Fname| Lname|Experience|Profession|Size_of_col|Profession_Size|Upper_Prof|Experience_Int| Id_Int|
+-------+--------+------+----------+----------+-----------+---------------+----------+--------------+-------+
|4000001|Kristina| Chung|        55|     Pilot|       Size|              1|     PILOT|            55|4000001|
|4000001|Kristina| Chung|        55|     Pilot|       Size|              1|     PILOT|            55|4000001|
|4000003|  Sherri|Melton|        34|  Reporter|       Size|              1|  REPORTER|            34|4000003|
+-------+--------+------+----------+----------+-----------+---------------+----------+--------------+-------+
only showing top 3 rows

+--------+------+----------+---------------+----------+--------------+-------+
|   Fname| Lname|Profession|Profession_Size|Upper_Prof|Experience_Int| Id_Int|
+--------+------+----------+---------------+---

##SQL is more comfortable to achive all these Data Standardization
#Add, remove, replace, rearrange, renaming

#data munging completed

##Below 
#Data Enrichment (values)-> Add, Rename, merge(Concat), Split, Casting of Fields, Reformat, " 
#### Makes your data rich and detailed 

In [0]:
renamed_ordered_casted_df=ordered_casted_df.withColumnRenamed("Experience_Int","Experience").withColumnRenamed("Id_Int","Id")
renamed_ordered_casted_df.show(3)

+-------+--------+------+----------+----------+---------------+----------+
|     Id|   Fname| Lname|Profession|Experience|Profession_Size|Upper_Prof|
+-------+--------+------+----------+----------+---------------+----------+
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|
|4000003|  Sherri|Melton|  Reporter|        34|              1|  REPORTER|
+-------+--------+------+----------+----------+---------------+----------+
only showing top 3 rows



In [0]:
renamed_ordered_casted_df=renamed_ordered_casted_df.withColumn("Date_of_entry",current_date()).withColumn("Time_Stamp",current_timestamp())

###Concat 


In [0]:
finalized_df=renamed_ordered_casted_df.withColumn("Mail_Id",concat("Fname",lit("."),"Lname",lit("@gmail.com")))
finalized_df.show(3)
refinalized_df=finalized_df.withColumn("Month",month("Date_of_entry")).withColumn("Year",year("Date_of_entry"))
refinalized_df.show(3)

+-------+--------+------+----------+----------+---------------+----------+-------------+--------------------+--------------------+
|     Id|   Fname| Lname|Profession|Experience|Profession_Size|Upper_Prof|Date_of_entry|          Time_Stamp|             Mail_Id|
+-------+--------+------+----------+----------+---------------+----------+-------------+--------------------+--------------------+
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|   2024-07-12|2024-07-12 12:26:...|Kristina.Chung@gm...|
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|   2024-07-12|2024-07-12 12:26:...|Kristina.Chung@gm...|
|4000003|  Sherri|Melton|  Reporter|        34|              1|  REPORTER|   2024-07-12|2024-07-12 12:26:...|Sherri.Melton@gma...|
+-------+--------+------+----------+----------+---------------+----------+-------------+--------------------+--------------------+
only showing top 3 rows

+-------+--------+------+----------+----------+-----------

# Data Customization & Custom Processing (custom Business logics) -> Apply User defined functions and utils/functions/modularization/reusable functions & reusable framework creation 

#UDF 


In [0]:
python_function=lambda x:x.upper()
result=python_function("sdfsf sdfsdf")
print(result)

SDFSF SDFSDF


In [0]:
from pyspark.sql.functions import * 
py_func_to_udf=udf(python_function)
refinalized_df=refinalized_df.withColumn("Upper_F_NAME" , py_func_to_udf(col("Fname")))

##Advice
 #####use builtin/predefined functions primarily (dont go for udfs if already we have equivalent builtins) 

In [0]:
def function_exp(experience):
  if experience > 50 :
    return "Experienced"
  elif experience >25 and experience<50 :
    return "Professional"
  else :
    return "Begineer"
result=function_exp(9);
print("Result ->>>" , result )

result=function_exp(91);
print("Result ->>>" , result )

result=function_exp(29);
print("Result ->>>" , result )



Result ->>> Begineer
Result ->>> Experienced
Result ->>> Professional


In [0]:
from pyspark.sql.functions import * 
udf_age_cal=udf(function_exp)
refinalized_df=refinalized_df.withColumn("Industry_Exp" , udf_age_cal(col("Experience")))
after_drop_refinalized_df=refinalized_df.drop("Date_of_entry","Time_Stamp","Month","Year","Upper_F_NAME")
after_drop_refinalized_df.show(3)

+-------+--------+------+----------+----------+---------------+----------+--------------------+------------+
|     Id|   Fname| Lname|Profession|Experience|Profession_Size|Upper_Prof|             Mail_Id|Industry_Exp|
+-------+--------+------+----------+----------+---------------+----------+--------------------+------------+
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|Kristina.Chung@gm...| Experienced|
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|Kristina.Chung@gm...| Experienced|
|4000003|  Sherri|Melton|  Reporter|        34|              1|  REPORTER|Sherri.Melton@gma...|Professional|
+-------+--------+------+----------+----------+---------------+----------+--------------------+------------+
only showing top 3 rows



##udf at SQL query

####converting python function to UDF for using the udf in Domain Specific Language

In [0]:
spark.udf.register("sql_funciton",udf_age_cal)

after_drop_refinalized_df.createOrReplaceTempView("table1")

spark.sql("select *,sql_funciton(Experience) as Indus_Exp from table1").show(4)

+-------+--------+------+----------+----------+---------------+----------+--------------------+------------+------------+
|     Id|   Fname| Lname|Profession|Experience|Profession_Size|Upper_Prof|             Mail_Id|Industry_Exp|   Indus_Exp|
+-------+--------+------+----------+----------+---------------+----------+--------------------+------------+------------+
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|Kristina.Chung@gm...| Experienced| Experienced|
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|Kristina.Chung@gm...| Experienced| Experienced|
|4000003|  Sherri|Melton|  Reporter|        34|              1|  REPORTER|Sherri.Melton@gma...|Professional|Professional|
|4000003| mohamed| irfan|        IT|        41|              1|        IT|mohamed.irfan@gma...|Professional|Professional|
+-------+--------+------+----------+----------+---------------+----------+--------------------+------------+------------+
only showing top 4 rows


##Data CUSTOMIZATION (if we can't use existing function)- I HAVE A CUSTOM REQUIREMENT: PYTHON FUNC -> CONVERTED/REGISTERED UDF -> USED DSL/SQL respectively

## Go for UDFs if it is in evitable – Because the usage of UDFs will degrade the performance of your pipeline
##Drawback of creating UDF functions:
###1. Creating, testing, handling exception, validating is a timeconsuming and challenging process
###2. Important Drawback is Usage of UDFs will degrade the performace due to the following reasons
###  a.Custom functions are not by default serialize the data whereas builtin/predefined functions will serialze the data internally
###   b.Custom functions is a black box for Spark Optimizer and Spark (Catalyst) optimizer will not apply any optimization on the UDFs

In [0]:
refinalized_df.explain()
print("Some other DF where UDF is not applied \n\n")
ordered_casted_df.explain()
print("Some other DF where UDF is not applied \n\n")

raw_data.explain()

== Physical Plan ==
*(2) Project [Id#3823, Fname#3824, Lname#3825, Profession#4091, Experience#3826, size(split(Profession#4091,  , -1), true) AS Profession_Size#4160, upper(Profession#4091) AS Upper_Prof#4236, 2024-07-12 AS Date_of_entry#4782, 2024-07-12 12:26:15.031 AS Time_Stamp#4791, concat(Fname#3824, ., Lname#3825, @gmail.com) AS Mail_Id#4801, 7 AS Month#4858, 2024 AS Year#4870, pythonUDF0#5080 AS Upper_F_NAME#4938, pythonUDF1#5081 AS Industry_Exp#4953]
+- BatchEvalPython [<lambda>(Fname#3824)#4937, function_exp(Experience#3826)#4952], [pythonUDF0#5080, pythonUDF1#5081]
   +- *(1) Project [Id#3823, Fname#3824, Lname#3825, Experience#3826, CASE WHEN (Profession#3827 = Lawyer) THEN Hign court Lawyer WHEN (Profession#3827 = Police officer) THEN Police ELSE Profession#3827 END AS Profession#4091]
      +- *(1) Filter ((atleastnnonnulls(1, Id#3823, Fname#3824, Lname#3825, Experience#3826, Profession#3827) AND atleastnnonnulls(5, Id#3823, Fname#3824, Lname#3825, Experience#3826, Profes

###Go for UDFs if it is in evitable – Because the usage of UDFs will degrade the performance of your pipeline

#####'== Physical Plan == "Catalyst optimizer - Not applying pushedfilter"
#####PushedFilters: []

###Below 
#Core Curation/Pre Wrangling - Core Data Processing/Transformation (Level1) (Pre Wrangling) 

In [0]:
finalized_df_1=finalized_df.withColumn("Industry_Exp",when((col("Experience") > 50 ),lit("Experienced")).when((col("Experience")>25) &  (col("Experience")<50),lit("Professional")).otherwise(lit("Begineer")))
finalized_df_1.show(3)

+-------+--------+------+----------+----------+---------------+----------+-------------+--------------------+--------------------+------------+
|     Id|   Fname| Lname|Profession|Experience|Profession_Size|Upper_Prof|Date_of_entry|          Time_Stamp|             Mail_Id|Industry_Exp|
+-------+--------+------+----------+----------+---------------+----------+-------------+--------------------+--------------------+------------+
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|   2024-07-12|2024-07-12 12:26:...|Kristina.Chung@gm...| Experienced|
|4000001|Kristina| Chung|     Pilot|        55|              1|     PILOT|   2024-07-12|2024-07-12 12:26:...|Kristina.Chung@gm...| Experienced|
|4000003|  Sherri|Melton|  Reporter|        34|              1|  REPORTER|   2024-07-12|2024-07-12 12:26:...|Sherri.Melton@gma...|Professional|
+-------+--------+------+----------+----------+---------------+----------+-------------+--------------------+--------------------+------

##coalesce function (equivalent to na.fill function) to convert null to 0

In [0]:
finalized_df_2=finalized_df_1.drop("Upper_Prof","Date_of_entry","Time_Stamp")
finalized_df_3=finalized_df_2.withColumn("Experience_No_NULL" , coalesce("Experience",lit(0)))
finalized_df_4=finalized_df_3.drop("Experience")
finalized_df_4.show(3)
finalized_df_4.createOrReplaceTempView("table1")
after_sql_finalized_df_4=spark.sql("""select * , case when Experience_No_NULL > 50 then 'Experienced' when Experience_No_NULL>25 and Experience_No_NULL<50 then 'Professional' else 'Begineer' end as Industry_Exp  from table1""")
after_sql_finalized_df_4.show(3)
after_sql_finalized_df_4.describe


+-------+--------+------+----------+---------------+--------------------+------------+------------------+
|     Id|   Fname| Lname|Profession|Profession_Size|             Mail_Id|Industry_Exp|Experience_No_NULL|
+-------+--------+------+----------+---------------+--------------------+------------+------------------+
|4000001|Kristina| Chung|     Pilot|              1|Kristina.Chung@gm...| Experienced|                55|
|4000001|Kristina| Chung|     Pilot|              1|Kristina.Chung@gm...| Experienced|                55|
|4000003|  Sherri|Melton|  Reporter|              1|Sherri.Melton@gma...|Professional|                34|
+-------+--------+------+----------+---------------+--------------------+------------+------------------+
only showing top 3 rows

+-------+--------+------+----------+---------------+--------------------+------------+------------------+------------+
|     Id|   Fname| Lname|Profession|Profession_Size|             Mail_Id|Industry_Exp|Experience_No_NULL|Industry_

##Try group by on these data 

In [0]:
after_sql_finalized_df_4.select("*",row_number().over(Window.partitionBy("Experienced").orderBy(col("Experience_No_NULL"))).alias("RANK"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-725833244378901>:1[0m
[0;32m----> 1[0m [43mafter_sql_finalized_df_4[49m[38;5;241;43m.[39;49m[43mselect[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43m*[39;49m[38;5;124;43m"[39;49m[43m,[49m[43mrow_number[49m[43m([49m[43m)[49m[38;5;241;43m.[39;49m[43mover[49m[43m([49m[43mWindow[49m[38;5;241;43m.[39;49m[43mpartitionBy[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mExperienced[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43morderBy[49m[43m([49m[43mcol[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mExperience_No_NULL[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m)[49m[43m)[49m[38;5;241;43m.[39;49m[43malias[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mRANK[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m)[49m

File [0;

In [0]:
after_sql_finalized_df_4.select("*",row_number().over(Window.partitionBy("Id").orderBy(col("Experience_No_NULL"))).alias("RANK")).show(4)

+-------+---------+---------+----------+---------------+--------------------+------------+------------------+------------+----+
|     Id|    Fname|    Lname|Profession|Profession_Size|             Mail_Id|Industry_Exp|Experience_No_NULL|Industry_Exp|RANK|
+-------+---------+---------+----------+---------------+--------------------+------------+------------------+------------+----+
|4000001| Kristina|    Chung|     Pilot|              1|Kristina.Chung@gm...| Experienced|                55| Experienced|   1|
|4000001| Kristina|    Chung|     Pilot|              1|Kristina.Chung@gm...| Experienced|                55| Experienced|   2|
|4000003|vaishnavi|santharam|        IT|              1|vaishnavi.santhar...|Professional|                30|Professional|   1|
|4000003|   Sherri|   Melton|  Reporter|              1|Sherri.Melton@gma...|Professional|                34|Professional|   2|
+-------+---------+---------+----------+---------------+--------------------+------------+--------------

In [0]:
result=after_sql_finalized_df_4.groupBy("Experience_No_NULL").agg(avg("Experience_No_NULL").alias("AVG_AGE"))
result.describe
# print(type(result))
result.show(4)
after_sql_finalized_df_4.createOrReplaceTempView("table1")
spark.sql("""select Experience_No_NULL,avg(Experience_No_NULL)  from table1 group by Experience_No_NULL """).show(3)



+------------------+-------+
|Experience_No_NULL|AVG_AGE|
+------------------+-------+
|                31|   31.0|
|                65|   65.0|
|                53|   53.0|
|                34|   34.0|
+------------------+-------+
only showing top 4 rows

+------------------+-----------------------+
|Experience_No_NULL|avg(Experience_No_NULL)|
+------------------+-----------------------+
|                31|                   31.0|
|                65|                   65.0|
|                53|                   53.0|
+------------------+-----------------------+
only showing top 3 rows



In [0]:
after_sql_finalized_df_4.groupBy("Profession","Experience_No_NULL").agg(min("Experience_No_NULL").alias("min_exp"),max("Experience_No_NULL").alias("max_exp")).show()

+--------------------+------------------+-------+-------+
|          Profession|Experience_No_NULL|min_exp|max_exp|
+--------------------+------------------+-------+-------+
|            Designer|                60|     60|     60|
|    Childcare worker|                49|     49|     49|
|   Financial analyst|                31|     31|     31|
|              Dancer|                71|     71|     71|
|          Pharmacist|                50|     50|     50|
|Computer support ...|                73|     73|     73|
|        Statistician|                21|     21|     21|
|        Psychologist|                67|     67|     67|
|           Architect|                34|     34|     34|
|            Designer|                53|     53|     53|
|           Carpenter|                25|     25|     25|
|        Veterinarian|                48|     48|     48|
|              Writer|                52|     52|     52|
|              Writer|                38|     38|     38|
|       Social

##Data Wrangling (Analytical Functionalities) - Complete Data Curation/Processing/Transformation (Level2)  ->>
##"Joins (Lookup, Lookup & Enrichment, Denormalization (schema modeling)), Windowing, Analytical, set operations "
##"Summarization (joined/lookup/enriched/denormalized) 

#joins 

In [0]:
dbutils.fs.mkdirs("/FileStore/BB2/JOIN_Data")

Out[89]: True

In [0]:
from pyspark.sql.functions import *
count =0 
print("Inner Join................ ")
print("Count->>>>" , count)
df_left=spark.read.csv("dbfs:/FileStore/BB2/JOIN_Data/table1.txt" , sep="," , header=False).toDF("employee_id","employee_name","employee_age")
df_left.show(3)
count=count+1

print("Count->>>>" , count)
df_right=spark.read.csv("dbfs:/FileStore/BB2/JOIN_Data/table2.txt" , sep="," , header=False).toDF("employee_id","department","salary")
df_left.show(3)
count=count+1


# df_joined=df_left.join(df_right,on=col("employee_id") == col("employee_id") , how="inner")
# df_joined.show(3)
#pyspark.sql.utils.AnalysisException: Reference 'fname' is ambiguous, could be: fname, fname.

#Standard/Comprehensive/Complete way of writing join syntax
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="inner")
df_joined.show(3)
count=count+1

#If We are going with multiple join conditions, how do we handle
# df_joined=df_left.alias("l").join(df_right.alias("r"),on=[col("l.employee_id") == col("r.employee_id") & ],how="inner")
# df_joined.select("l.custid","l.lname","r.lname").show()

#Observations for inner join:
#returns only matching rows
#we need to use join conditions
#we don't have to mention the type of join, still inner/equi/natural join will happen


#Outer join DSL
#df_joined=df_left.alias("l").join(df_right.alias("r"),on=[col("l.custid")==col("r.cid")],how="leftouterjoin")
#Supported join types include: 'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer',
# 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'semi', 'leftanti', 'left_anti', 'anti', 'cross'

#Leftouter
#syntax
print("Left Join................ ")
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="left")
df_joined.show(3)
count=count+1

#or
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="left_outer")
df_joined.show(3)
count=count+1

#or
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="leftouter")
df_joined.show(3)
count=count+1

#Observations for left join:
#returns matching rows in the left and right df with values
#returns matching rows in the LEFT df with values and un matching values in the RIGHT df with nulls

#Right
print("Right Join................ ")
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="right")
df_joined.show(3)
count=count+1

#or
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="right_outer")
df_joined.show(3)
count=count+1

#or
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="rightouter")
df_joined.show(3)
count=count+1
#Observations for right join:
#returns matching rows in the left and right df with values
#returns matching rows in the RIGHT df with values and un matching values in the LEFT df with nulls

#Full
print("Full Join................ ")
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="full")
df_joined.show(3)
count=count+1

#Observations for full join:
#returns matching rows in the left and right df with values
#returns matching rows in the LEFT df with values and un matching values in the RIGHT df with nulls
#returns matching rows in the RIGHT df with values and un matching values in the LEFT df with nulls

#Special Joins (Optimized & Subject join)

#left Semi: Semi join will compare left df with right df and will return only the matching data of left df alone
#Semi join returns Same result we are getting in Inner also ? yes, but only left side data alone will be displayed
print("Left semi................")
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id")==col("r.employee_id"),how="semi")
#df_joined.select("l.custid","l.lname","r.cid","r.lname").show()
df_joined.show(3)
count=count+1

#left Anti: Anti join will compare left df with right df and will return only the UN matching data of left df alone
print("Left Join................ ")
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="ANTI")
df_joined.show(3)
count=count+1

#or
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="left_outer")
df_joined.show(3)
count=count+1

#or
print("Count->>>>" , count)
df_joined=df_left.alias("l").join(df_right.alias("r"),on=col("l.employee_id") == col("r.employee_id") , how="leftouter")
df_joined.show(3)
count=count+1

#Right Anti join:Swap the DFs
print("Count->>>>" , count)
df_joined=df_right.alias("r").join(df_left.alias("l"),on=col("l.employee_id") == col("r.employee_id") , how="left_outer")
df_joined.show(3)
count=count+1

#Self join DSL: Joining the dataset by itself is Self join, used for hierachical joining
#Interview Question: Have you used self join in your project?
#Yes, in the case of hirachical joins for identifying the customer who referred another customer to give referal offers






# df_self=df_raw1.where("custid in (4000011,4000012,4000013,4000014,4000015)").distinct().toDF("cid","fname","lname","age","prof")
# df_self1=df_self.withColumn("ref_cid",col("cid")-1)
# df_self1.alias("l").join(df_self1.alias("r"),on=[(col("l.cid")==col("r.ref_cid"))]).selectExpr("concat(r.fname,' is referred by ',l.fname)").show(10,False)

# #Cartesian Join (avoided) - returns the multiplication of the rows between df1 and df2 hence it is a cross product
# df_joined=df_left.alias("l").join(df_right.alias("r"))
# #Observations for Cartesian join:
# #returns every permutation and combination of rows
# #If we don't have join conditions to use directly

Inner Join................ 
Count->>>> 0
+-----------+-------------+------------+
|employee_id|employee_name|employee_age|
+-----------+-------------+------------+
|          1|         John|          45|
|          2|        Alice|          30|
|          3|          Bob|          25|
+-----------+-------------+------------+
only showing top 3 rows

Count->>>> 1
+-----------+-------------+------------+
|employee_id|employee_name|employee_age|
+-----------+-------------+------------+
|          1|         John|          45|
|          2|        Alice|          30|
|          3|          Bob|          25|
+-----------+-------------+------------+
only showing top 3 rows

Count->>>> 2
+-----------+-------------+------------+-----------+----------+------+
|employee_id|employee_name|employee_age|employee_id|department|salary|
+-----------+-------------+------------+-----------+----------+------+
|          1|         John|          45|          1|     Sales| 75000|
|          2|        Alic

In [0]:
dbutils.fs.mkdirs("/FileStore/BB2/Other_Data")

Out[101]: True