#read csv file

In [0]:
df_csv = spark.read.format("csv").load("/FileStore/global2ksample.csv", header=True, inferSchema=True)
#to limit and see first 3 rows
df_csv.limit(3).display()
#filter the records whose priority_rank equals 2
df_csv.where("priority_rank=='2'").display()
#display all column names; print() is required because a bare expression in the
#middle of a cell is evaluated and discarded (only a cell's last expression is rendered)
print(df_csv.columns)
#change column type using select: the result holds only the selected (cast) column
from pyspark.sql.types import IntegerType
df_csv_new = df_csv.select(df_csv.priority_rank.cast(IntegerType()))
df_csv_new.printSchema()

#change column type in place with withColumn, which replaces the existing priority_rank column
df_csv = df_csv.withColumn("priority_rank",df_csv["priority_rank"].cast(IntegerType()))
df_csv.printSchema()


#read JSON file

In [0]:
#a JSON document whose records span several lines must be read with multiLine enabled
df_json = spark.read.json("/FileStore/test_response.json", multiLine=True)
#show the first 3 rows
df_json.limit(3).display()
#print the inferred schema
df_json.printSchema()

#create a table partitioned by year (derived from the current date)

In [0]:
#add a load-date column and a YEAR column derived from it; YEAR is the partition key
from pyspark.sql.functions import *
df_csv = (
    df_csv
    .withColumn("created_date", current_date())
    .withColumn("YEAR", date_format("created_date", 'yyyy'))
)

df_csv.display()
df_csv.printSchema()

#write a Delta table partitioned by YEAR, replacing whatever the table held before
(
    df_csv.write
    .format('delta')
    .mode('overwrite')
    .partitionBy('YEAR')
    .saveAsTable("test_partition_table")
)

In [0]:
%fs ls /user/hive/warehouse/test_partition_table

In [0]:
%sql
-- YEAR is written as date_format(current_date(), 'yyyy'), i.e. the year in which the
-- table was last overwritten, so a hard-coded literal such as '2022' matches no rows
-- once that year has passed. Filter on the current year instead, formatted as a
-- string to match the YEAR column's type.
select * from test_partition_table where YEAR = date_format(current_date(), 'yyyy')

#handle a data file whose rows have a variable number of columns

In [0]:
#let's create a file whose rows have a varying number of columns;
#overwrite=True so re-running the cell does not fail because the file already exists
dbutils.fs.put("/variable_column.csv","""1,sree
1,sreejish,data,engineer
1,sreejsh,data""", overwrite=True)

In [0]:
#load as plain text: each line becomes one row held in a single string column, "value"
df1 = spark.read.format("text").load("/variable_column.csv")
df1.display()

In [0]:
#turn each comma-separated line into an array column; the text reader's only column,
#"value", is not carried over, so splittable_column is the sole remaining column
from pyspark.sql.functions import split
df1 = df1.select(split("value", ",").alias("splittable_column"))
df1.display()

In [0]:
#find the maximum number of fields in any row using the size function.
#The functions module is imported under an alias (instead of `import size, max`)
#so Python's builtin max() is not shadowed for the rest of the notebook.
from pyspark.sql import functions as F

#compute the maximum once (a single Spark job) and reuse it; `or 0` covers an empty
#file, where the aggregate is null and range(None) would raise TypeError
max_cols = df1.select(F.max(F.size("splittable_column"))).first()[0] or 0
print(max_cols)

#add col0..col{max_cols-1} in one select instead of one withColumn per loop iteration.
#df1 holds only splittable_column at this point (text read + split); naming it
#explicitly means re-running this cell replaces the colN columns instead of duplicating them.
#NOTE(review): rows with fewer fields get null in the trailing columns only while
#out-of-range array indexing returns null (ANSI mode off) -- same as the original loop.
df1 = df1.select(
    "splittable_column",
    *[df1["splittable_column"][i].alias("col" + str(i)) for i in range(max_cols)],
)
df1.drop("splittable_column").display()

#skip first few rows from file

In [0]:
#three preamble lines, then a header row and the data rows;
#overwrite=True so re-running the cell does not fail because the file already exists
dbutils.fs.put("/skiprows.csv","""line1
line2
line3
user,role,exp
sree,databricks,2
sree,Azure,3
sree,GCP,1""", overwrite=True)


In [0]:
rdd = sc.textFile("/skiprows.csv")
#pair every line with its 0-based position, keep the lines from position 3 onward
#(dropping the three preamble lines), then split each kept line on commas
rdd_final = (
    rdd.zipWithIndex()
    .filter(lambda line_and_pos: line_and_pos[1] >= 3)
    .map(lambda line_and_pos: line_and_pos[0].split(","))
)



In [0]:
#the first row left after skipping the preamble is the header ("user,role,exp");
#fetch only that row instead of collecting the whole RDD to the driver.
#Named `header` (not `col`) so pyspark's col() function is not shadowed.
header = rdd_final.first()
#drop the header by position (index 0) rather than by value, so a data row that
#happens to equal the header is not discarded as well
data_rows = rdd_final.zipWithIndex().filter(lambda row_and_pos: row_and_pos[1] > 0).keys()

data_rows.toDF(header).show()

#remove duplicates from a DataFrame

In [0]:
#read the sample CSV using its header row; inferSchema is not set, so every column
#(including priority_rank) comes back as a string
df_dup = spark.read.format("csv").option("header", True).load("/FileStore/global2ksample.csv")

In [0]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

#keep one row per country: the one with the highest priority_rank.
#orderBy(...).dropDuplicates(...) does not reliably do this, because Spark does not
#guarantee which duplicate dropDuplicates keeps. row_number() over a window picks
#the top row per country explicitly (ties on priority_rank are still broken arbitrarily).
#df_dup is read without inferSchema, so priority_rank is a string; cast it so the
#ordering is numeric (10 > 9) rather than lexicographic ("9" > "10").
by_country = Window.partitionBy("country").orderBy(col("priority_rank").cast("int").desc())
(
    df_dup.withColumn("_rn", row_number().over(by_country))
    .filter(col("_rn") == 1)
    .drop("_rn")
    .display()
)