# Variable columns: reading a CSV whose rows have different numbers of fields

In [0]:
# Sample CSV: a five-column header followed by rows that carry fewer fields.
ragged_with_header_csv = """
id,name,loc,age,sex
1,ravi,bangalore,33,m
2,raj,chennai
3,mohan
4,prasad,hyderabad,35
5,sridhar,chennai
"""
# Third positional argument (True) is the overwrite flag of dbutils.fs.put.
dbutils.fs.put("/scenario_files/dynamicolumns.csv", ragged_with_header_csv, True)

In [0]:
# Treat the first non-empty line as the header; fields missing from short rows
# show up empty in the displayed result.
df = spark.read.option("header", True).csv("/scenario_files/dynamicolumns.csv")
display(df)

id,name,loc,age,sex
1,ravi,bangalore,33.0,m
2,raj,chennai,,
3,mohan,,,
4,prasad,hyderabad,35.0,
5,sridhar,chennai,,


In [0]:
# Headerless sample: rows hold between one and six comma-separated fields.
ragged_no_header_csv = """1,ravi,bangalore
2,raj,chennai,33,m
3,mohan
4,prasad,hyderabad,35,m,787878987
5,sridhar,chennai
"""
# Third positional argument (True) is the overwrite flag of dbutils.fs.put.
dbutils.fs.put("/scenario_files/dynamicolumns_withoutheader.csv", ragged_no_header_csv, True)

In [0]:
# Plain CSV read of the headerless file. The displayed result has only three
# columns (_c0.._c2), so the extra fields on the longer rows are lost.
df1 = spark.read.format("csv").load("/scenario_files/dynamicolumns_withoutheader.csv")
display(df1)

_c0,_c1,_c2
1,ravi,bangalore
2,raj,chennai
3,mohan,
4,prasad,hyderabad
5,sridhar,chennai


In [0]:
# Load the same file as raw text instead: one row per line, in a single
# string column (referenced as "value" by the next cell).
df1 = spark.read.format("text").load("/scenario_files/dynamicolumns_withoutheader.csv")

In [0]:
from pyspark.sql.functions import split,length,col,max,size
# Break every raw line on the comma delimiter into an array column.
comma_separated_fields = split(col("value"), ",")
df3 = df1.select(comma_separated_fields.alias("splitted_col"))

In [0]:
# Show each array next to its element count; the largest count decides how
# many columns have to be generated.
df3.select(col('splitted_col'), size(col('splitted_col'))).show(truncate=False)

In [0]:
# Check how many columns the data will produce: the longest array over all rows.
# `max` here is pyspark.sql.functions.max (imported above), not the Python builtin.
df3.agg(max(size('splitted_col'))).first()[0]

In [0]:
# The widest row determines how many columns are generated.
widest_row_size = df3.select(max(size('splitted_col'))).collect()[0][0]
for i in range(widest_row_size):
    # Add one column per array position: column0, column1, ...
    df3 = df3.withColumn('column' + str(i), df3["splitted_col"][i])
# The array column is redundant once its items live in their own columns.
df3 = df3.drop("splitted_col")
df3.show()

In [0]:
# `splitted_col` is dropped by the column-generation cell above, so indexing it
# unconditionally raises an AnalysisException on a top-to-bottom run. Only add the
# extra column while the array column (and the loop index `i`) are still available.
if "splitted_col" in df3.columns:
    df3 = df3.withColumn('col' + str(i), df3["splitted_col"][i])
df3.show()
    

# Skip the first rows (junk lines that precede the real header)

In [0]:
# Sample file: three junk lines, then the real header and five data rows.
junk_prefixed_csv = """sampleline
smapleline2
sampleline3
id,name,location
1,ravi,bangalore
2,raj,chennai
3,prasad,pune
4,mahesh,hyderabad
5,sridhar,mumbai
"""
# Third positional argument (True) is the overwrite flag of dbutils.fs.put.
dbutils.fs.put("/scenario_files/empty_header.csv", junk_prefixed_csv, True)

In [0]:
# Naive read: the first physical line ("sampleline") is taken as the header,
# so the displayed result has a single column and the real fields are lost.
df = spark.read.option("header", True).csv("/scenario_files/empty_header.csv")
display(df)

sampleline
smapleline2
sampleline3
id
1
2
3
4
5


In [0]:
# Read the file line by line as an RDD so leading lines can be dropped by position.
junk_prefixed_path = "/scenario_files/empty_header.csv"
rdd = sc.textFile(junk_prefixed_path)
rdd.collect()

In [0]:
 # Pair each line with its position, keep positions above 2, then split on commas.
 rdd.zipWithIndex().filter(lambda line_pos: line_pos[1] > 2).map(lambda line_pos: line_pos[0].split(','))

In [0]:
# Attach a position to every line, drop positions 0-2 (the three junk lines),
# and split what remains into lists of fields.
indexed_lines = rdd.zipWithIndex()
kept_lines = indexed_lines.filter(lambda line_pos: line_pos[1] >= 3)
final_rdd = kept_lines.map(lambda line_pos: line_pos[0].split(','))
final_rdd.collect()

In [0]:
# Take the header row (first remaining record) as the list of column names.
# first() fetches a single record instead of collecting the whole RDD to the
# driver just to read element 0.
columns = final_rdd.first()
columns

In [0]:
# The first record is the header; keep every record that differs from it.
column = final_rdd.first()
data = final_rdd.filter(lambda record: record != column)
data.collect()

In [0]:
# Build a DataFrame from the data rows, named by the extracted header, and print it.
spark.createDataFrame(data, columns).show()

# Drop duplicates (keep the most recently updated row per id)

In [0]:
# Sample data with several rows per id and different updated_date values.
# Pass overwrite=True (third argument) like every other put in this notebook;
# without it the cell fails on re-run because the file already exists.
dbutils.fs.put("/scenarios/duplicates.csv","""id,name,loc,updated_date
1,ravi,bangalore,2021-01-01
1,ravi,chennai,2022-02-02
1,ravi,Hyderabad,2022-06-10
2,Raj,bangalore,2021-01-01
2,Raj,chennai,2022-02-02
3,Raj,Hyderabad,2022-06-10
4,Prasad,bangalore,2021-01-01
5,Mahesh,chennai,2022-02-02
4,Prasad,Hyderabad,2022-06-10
""",True)

In [0]:
# Read the sample with a header row and let Spark infer the column types,
# then print the inferred schema and show the rows.
duplicates_reader = spark.read.option("header", True).option("inferSchema", True)
df = duplicates_reader.csv("/scenarios/duplicates.csv")
df.printSchema()
display(df)

id,name,loc,updated_date
1,ravi,bangalore,2021-01-01
1,ravi,chennai,2022-02-02
1,ravi,Hyderabad,2022-06-10
2,Raj,bangalore,2021-01-01
2,Raj,chennai,2022-02-02
3,Raj,Hyderabad,2022-06-10
4,Prasad,bangalore,2021-01-01
5,Mahesh,chennai,2022-02-02
4,Prasad,Hyderabad,2022-06-10


In [0]:
from pyspark.sql.functions import col
# Sort newest-first, then keep one row per id.
# NOTE(review): dropDuplicates is not documented to keep the first row of a
# preceding sort — confirm before relying on this; ranking with row_number over
# a window partitioned by id is the deterministic alternative.
newest_first = df.orderBy(col("updated_date").desc())
display(newest_first.dropDuplicates(["id"]))

id,name,loc,updated_date
1,ravi,Hyderabad,2022-06-10
2,Raj,chennai,2022-02-02
3,Raj,Hyderabad,2022-06-10
4,Prasad,Hyderabad,2022-06-10
5,Mahesh,chennai,2022-02-02


In [0]:
# Import only the names used here: a wildcard import of pyspark.sql.functions
# shadows Python builtins such as max/min/sum/round and hides what the cell needs.
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Number the rows of each id from newest to oldest updated_date,
# so rowid 1 marks the latest record for that id.
newest_first_per_id = Window.partitionBy("id").orderBy(col("updated_date").desc())
df = df.withColumn("rowid", row_number().over(newest_first_per_id))

In [0]:
# Latest record per id: the rows ranked first within their id.
df_uniq = df.where("rowid=1")

In [0]:
# Superseded records: every row ranked after the first within its id.
df_baddata = df.where("rowid>1")

In [0]:
# Show the rows kept by the rowid = 1 filter (one row per id).
display(df_uniq)

id,name,loc,updated_date,rowid
1,ravi,Hyderabad,2022-06-10,1
2,Raj,chennai,2022-02-02,1
3,Raj,Hyderabad,2022-06-10,1
4,Prasad,Hyderabad,2022-06-10,1
5,Mahesh,chennai,2022-02-02,1


# Read all files from a nested folder into a PySpark DataFrame

In [0]:
from pyspark.sql.functions import input_file_name
# recursiveFileLookup makes the reader descend into sub-folders of the path.
nested_csv_reader = spark.read.format("csv").options(recursiveFileLookup="true", header="true")
df1 = nested_csv_reader.load("dbfs:/FileStore/customers/")
# Add the source file location of every row before displaying.
df1_with_source = df1.withColumn("filename", input_file_name())
display(df1_with_source)

# How to find the years that have a 53rd week in PySpark (extract week number 53)

In [0]:
# One row per 1 January from 1999 through 2049, paired with the preceding year number.
jan_first_rows = [(y - 1, f"01/01/{y}") for y in range(1999, 2050)]
df = spark.createDataFrame(jan_first_rows, ["year_num", "year"])

In [0]:
from pyspark.sql.functions import to_date,weekofyear
# to_date parses the dd/MM/yyyy string into a date;
# weekofyear returns the week number that date falls in.
df = df.withColumn("date", to_date("year", "dd/MM/yyyy"))
df = df.withColumn("week_number", weekofyear("date"))
# Keep only the 1 January dates that fall in week 53.
display(df.where("week_number=53"))

year_num,year,date,week_number
1998,01/01/1999,1999-01-01,53
2004,01/01/2005,2005-01-01,53
2009,01/01/2010,2010-01-01,53
2015,01/01/2016,2016-01-01,53
2020,01/01/2021,2021-01-01,53
2026,01/01/2027,2027-01-01,53
2032,01/01/2033,2033-01-01,53
2037,01/01/2038,2038-01-01,53
2043,01/01/2044,2044-01-01,53
2048,01/01/2049,2049-01-01,53


# How to get the number of rows in each partition of a PySpark DataFrame

In [0]:
# Load the airlines sample dataset (header row present) and preview it.
df_airlines = spark.read.option("header", True).csv("/databricks-datasets/asa/airlines")
df_airlines.show()

In [0]:
from pyspark.sql.functions import spark_partition_id
# Tag every row with the id of the Spark partition it belongs to.
df_airlines = df_airlines.withColumn("PART_ID",spark_partition_id())

In [0]:
# Row count per partition id.
rows_per_partition = df_airlines.groupBy("PART_ID").count()
display(rows_per_partition)

PART_ID,count
16,1311826
0,7453215
1,7009728
2,7141922
3,7140596
4,7129270
5,6488540
6,5967780
7,5683047
8,5527884


# How to get the number of rows from each file in a PySpark DataFrame

In [0]:
# Read every CSV file in the airlines folder, using each file's first line as header.
df_airlines = spark.read.option("header", True).csv("/databricks-datasets/asa/airlines/*.csv")


In [0]:
from pyspark.sql.functions import input_file_name
# Record, for every row, the path of the file it was read from.
df_airlines = df_airlines.withColumn("FILE_NAME",input_file_name())

In [0]:
# Row count per source file.
rows_per_file = df_airlines.groupBy("FILE_NAME").count()
display(rows_per_file)

FILE_NAME,count
dbfs:/databricks-datasets/asa/airlines/2007.csv,7453215
dbfs:/databricks-datasets/asa/airlines/2008.csv,7009728
dbfs:/databricks-datasets/asa/airlines/2006.csv,7141922
dbfs:/databricks-datasets/asa/airlines/2005.csv,7140596
dbfs:/databricks-datasets/asa/airlines/2004.csv,7129270
dbfs:/databricks-datasets/asa/airlines/2003.csv,6488540
dbfs:/databricks-datasets/asa/airlines/2001.csv,5967780
dbfs:/databricks-datasets/asa/airlines/2000.csv,5683047
dbfs:/databricks-datasets/asa/airlines/1999.csv,5527884
dbfs:/databricks-datasets/asa/airlines/1997.csv,5411843


# How to add a generated surrogate key (sequence or hash based) as a column in a DataFrame. #pyspark

1)df.withColumn("ID_KEY",monotonically_increasing_id())\
2) df.withColumn("MD5_KEY",md5(col("EMPNO").cast("string")))\
3) df.withColumn("CRC32_KEY",crc32(col("EMPNO").cast("string")))\
4)df.withColumn("SHA2_KEY",sha2(col("EMPNO").cast("string"),256))\
5)df.withColumn("ROW_NUMBER",row_number().over(Window.partitionBy(lit('')).orderBy(lit(''))))\

# How to handle duplicate-column errors in a Delta table

In [0]:
# When joining, take the shared column from only one side (left or right) so the result has no duplicate column names.

# How to handle a double delimiter or multiple delimiters in PySpark

In [0]:
# Sample file whose fields are separated by a two-character "||" delimiter.
double_pipe_csv = """id||name||loc
1||ravi||Bangalore
2||Raj||Chennai
3||Mahesh||Hyderabad
4||Prasad||Chennai
5||Sridhar||Pune
"""
# Third positional argument (True) is the overwrite flag of dbutils.fs.put.
dbutils.fs.put("/schenarios/double_pipe.csv", double_pipe_csv, True)

In [0]:
# Pass the two-character delimiter as the CSV separator.
double_pipe_reader = spark.read.option("header", True).option("sep", "||")
df = double_pipe_reader.csv("/schenarios/double_pipe.csv")
display(df)

id,name,loc
1,ravi,Bangalore
2,Raj,Chennai
3,Mahesh,Hyderabad
4,Prasad,Chennai
5,Sridhar,Pune


In [0]:
# Sample file: comma-separated columns whose last field packs four pipe-separated marks.
multi_sep_csv = """id,name,loc,marks
1,ravi,Bangalore,35|45|55|65
2,Raj,Chennai,35|45|55|65
3,Mahesh,Hyderabad,35|45|55|65
4,Prasad,Chennai,35|45|55|65
5,Sridhar,Pune,35|45|55|65
"""
# Third positional argument (True) is the overwrite flag of dbutils.fs.put.
dbutils.fs.put("/scenarios/multi_sep.csv", multi_sep_csv, True)

In [0]:
# A normal comma-separated read leaves the pipe-joined marks in one string column.
df_multi = spark.read.option("header", True).csv("/scenarios/multi_sep.csv")
display(df_multi)

id,name,loc,marks
1,ravi,Bangalore,35|45|55|65
2,Raj,Chennai,35|45|55|65
3,Mahesh,Hyderabad,35|45|55|65
4,Prasad,Chennai,35|45|55|65
5,Sridhar,Pune,35|45|55|65


In [0]:
%sql
-- split takes a regular-expression pattern, so the pipe is escaped to match a literal "|".
select split("1|2|3|4","\\|")

"split(1|2|3|4, \|, -1)"
"List(1, 2, 3, 4)"


In [0]:
from pyspark.sql.functions import split,col
# Split the marks string on a literal pipe ("[|]" is a one-character regex class).
df_multi = df_multi.withColumn("marks_split", split(col("marks"), "[|]"))
# Copy array positions 0..3 into SUB1..SUB4.
for subject_no in range(1, 5):
    df_multi = df_multi.withColumn("SUB" + str(subject_no), col("marks_split")[subject_no - 1])
# Remove the helper array and the original packed column.
df_multi = df_multi.drop("marks_split", "marks")
display(df_multi)

id,name,loc,SUB1,SUB2,SUB3,SUB4
1,ravi,Bangalore,35,45,55,65
2,Raj,Chennai,35,45,55,65
3,Mahesh,Hyderabad,35,45,55,65
4,Prasad,Chennai,35,45,55,65
5,Sridhar,Pune,35,45,55,65


# Q1: Convert to date format and extract the year from it
# to_date("datecol","dd-MM-yyyy") --> convert to date format
# date_format("datecol","yyyy") --> get the year from the date