### Data Reading JSON

In [0]:
# Load the drivers JSON file with one JSON record per line (multiline off).
# The CSV-only "header" and "inferschema" options were dropped: the JSON
# reader does not use them and always infers the schema from the data.
df_json = (
    spark.read.format("json")
    .option("multiline", False)
    .load("/Volumes/filestore/bigmartsales/data/drivers.json")
)

In [0]:
# Render the DataFrame loaded from drivers.json in the notebook output.
df_json.display()

### Data Reading CSV

In [0]:
# Load the BigMart sales CSV with the header and inferSchema options enabled.
df = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("/Volumes/filestore/bigmartsales/data/BigMart Sales.csv")
)

### Schema Definition

In [0]:
# Print the schema of the CSV that was read with inferSchema enabled.
df.printSchema()

### DDL Schema

In [0]:
# Schema for the BigMart sales CSV written as a DDL string: one
# "column_name type" pair per column, comma-separated.
my_ddl_schema='''
  Item_Identifier string,
  Item_Weight double,
  Item_Fat_Content string,
  Item_Visibility double,
  Item_Type string,
  Item_MRP double,
  Outlet_Identifier string,
  Outlet_Establishment_Year integer,
  Outlet_Size string,
  Outlet_Location_Type string,
  Outlet_Type string,
  Item_Outlet_Sales double
'''

In [0]:
# Re-read the sales CSV, applying the DDL schema string instead of inference.
df = spark.read.load(
    "/Volumes/filestore/bigmartsales/data/BigMart Sales.csv",
    format="csv",
    schema=my_ddl_schema,
    header=True,
)

In [0]:
# Print the schema after reading with the DDL schema string.
df.printSchema()

### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# The same BigMart schema expressed as a StructType; every field is nullable.
my_struct_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("Item_Identifier", StringType()),
            ("Item_Weight", DoubleType()),
            ("Item_Fat_Content", StringType()),
            ("Item_Visibility", DoubleType()),
            ("Item_Type", StringType()),
            ("Item_MRP", DoubleType()),
            ("Outlet_Identifier", StringType()),
            ("Outlet_Establishment_Year", IntegerType()),
            ("Outlet_Size", StringType()),
            ("Outlet_Location_Type", StringType()),
            ("Outlet_Type", StringType()),
            ("Item_Outlet_Sales", DoubleType()),
        )
    ]
)

In [0]:
# Re-read the sales CSV, this time applying the StructType schema.
df = (
    spark.read.format("csv")
    .schema(my_struct_schema)
    .option("header", True)
    .load("/Volumes/filestore/bigmartsales/data/BigMart Sales.csv")
)

In [0]:
# Print the schema after reading with the StructType schema.
df.printSchema()

### SELECT

In [0]:
# Keep only the identifier, weight and fat-content columns.
df_sel = df.select("Item_Identifier", "Item_Weight", "Item_Fat_Content")

### Alias


In [0]:
# Select Item_Identifier and expose it under the name Item_Id.
df.select(df["Item_Identifier"].alias("Item_Id"))

### FILTER

### Scenario-1

In [0]:
# Rows whose fat content is exactly "Regular".
df.where(col("Item_Fat_Content") == "Regular")

### Scenario-2


In [0]:
# Soft drinks lighter than 10. The column is spelled Item_Type, as in the
# schema, so the filter does not rely on case-insensitive name resolution.
df.filter(
    (col("Item_Type") == "Soft Drinks") & (col("Item_Weight") < 10)
)

### Scenario-3


In [0]:
# Rows in Tier 1 or Tier 2 locations whose Outlet_Size is null.
df.where(
    col("Outlet_Size").isNull()
    & col("Outlet_Location_Type").isin("Tier 1", "Tier 2")
)

In [0]:
# Rename Item_Weight to Item_Wt; the result is not assigned, so df is unchanged.
df.withColumnRenamed(existing="Item_Weight", new="Item_Wt")

In [0]:
# Add a constant column "flag" holding the literal "new" on every row.
df = df.withColumn(colName="flag", col=lit("new"))

In [0]:
# Derive "multiply" as Item_Weight times Item_MRP; the result is not assigned.
df.withColumn("multiply", df["Item_Weight"] * df["Item_MRP"])

In [0]:
# Abbreviate fat-content labels: "Regular" -> "reg", then "Low Fat" -> "LF".
(
    df.withColumn(
        "Item_Fat_Content",
        regexp_replace(
            regexp_replace(col("Item_Fat_Content"), "Regular", "reg"),
            "Low Fat",
            "LF",
        ),
    )
    .display()
)

In [0]:
# Cast Item_Weight to double. The result is not assigned, so df keeps its
# current schema.
df.withColumn("Item_Weight", col("Item_Weight").cast("double"))

In [0]:
# Print the current schema of df.
df.printSchema()

In [0]:
# Order rows by Item_Weight, largest first.
df.orderBy(desc("Item_Weight")).display()

In [0]:
# Order by Item_Weight then Item_Visibility, both descending.
(
    df.orderBy("Item_Weight", "Item_Visibility", ascending=[False, False])
    .display()
)

In [0]:
# Show at most 10 rows of df.
df.limit(10).display()

In [0]:
# Show df without the Item_Visibility and Item_MRP columns.
(
    df.drop("Item_Visibility", "Item_MRP")
    .display()
)

In [0]:
# Remove rows that are duplicated across every column.
df.drop_duplicates().display()

In [0]:
# Keep one row per distinct Item_Type value.
df.dropDuplicates(["Item_Type"]).display()

In [0]:
# Show the distinct rows of df.
df.distinct().display()

In [0]:
# Two small frames with the same columns in opposite order:
# df1 is (name, id) and df2 is (id, name).
data1 = [
    ('kad', '1'),
    ('sid', '2'),
]
schema1 = 'name STRING, id STRING'
df1 = spark.createDataFrame(data=data1, schema=schema1)

data2 = [
    ('3', 'rahul'),
    ('4', 'jas'),
]
schema2 = 'id STRING, name STRING'
df2 = spark.createDataFrame(data=data2, schema=schema2)


In [0]:
# NOTE(review): rebinds data1 after df1 was already built from its earlier
# value; no visible later cell reads this list — confirm whether it is needed.
data1=[("1","Kid")]

In [0]:
# Show df1, whose columns are (name, id).
df1.display()

In [0]:
# Show df2, whose columns are (id, name).
df2.display()

In [0]:
# Append df2's rows to df1 with a plain union. The two frames list their
# columns in opposite order, so compare this output with unionByName.
(
    df1.union(df2)
    .display()
)

In [0]:
# NOTE(review): identical to the previous cell's union — likely a leftover duplicate.
df1.union(df2).display()

In [0]:
# Append df2's rows to df1 using unionByName instead of union.
(
    df1.unionByName(df2)
    .display()
)

In [0]:
# Lower-case Item_Type. The alias now says "Lower" to match the lower() call;
# it previously read "Upper_Item_Type".
df.select(lower("Item_Type").alias("Lower_Item_Type")).display()

In [0]:
# Add a Current_date column populated by current_date().
df = df.withColumn(colName="Current_date", col=current_date())
df.display()


In [0]:
# Week_after = Current_date plus 7 days.
df = df.withColumn("Week_after", date_add(col("Current_date"), 7))
df.display()

In [0]:
# Week_before = Current_date minus 7 days.
df = df.withColumn("Week_before", date_sub(col("Current_date"), 7))
df.display()

In [0]:
# Days between Week_after and Current_date. The column is referenced as
# "Week_after", exactly as it was created, instead of the lower-cased
# "week_after" that only resolved through case-insensitive lookup.
df = df.withColumn("date_diff", date_diff("Week_after", "Current_date"))
df.display()

In [0]:
# Replace Week_before with its dd-MM-yyyy formatted text.
df = df.withColumn(
    "Week_before",
    date_format(col("Week_before"), "dd-MM-yyyy"),
)
df.display()

In [0]:
# Drop rows using how="all".
df.dropna(how="all").display()

In [0]:
# Drop rows using how="any".
df.dropna(how="any").display()

In [0]:
# Drop rows with a null, checking only the Outlet_Size column.
df.dropna(subset=["Outlet_Size"]).display()

In [0]:
# Fill nulls with the text "Not Available".
df.na.fill("Not Available").display()

In [0]:
# Fill nulls with "Not available", restricted to the Outlet_Size column.
df.na.fill("Not available", subset=["Outlet_Size"]).display()

In [0]:
# Split Outlet_Type on single spaces into an array column.
(
    df.withColumn("Outlet_Type", split(col("Outlet_Type"), " "))
    .display()
)

In [0]:
# Split Outlet_Type on spaces and keep the element at index 1.
(
    df.withColumn(
        "Outlet_Type",
        split(col("Outlet_Type"), " ").getItem(1),
    )
    .display()
)

In [0]:
# df_exp holds Outlet_Type as an array of its space-separated parts;
# explode then emits one row per array element.
df_exp = df.withColumn("Outlet_Type", split(col("Outlet_Type"), " "))
(
    df_exp.withColumn("Outlet_Type", explode(col("Outlet_Type")))
    .display()
)

In [0]:
# Flag rows whose Outlet_Type array contains the element "Type1".
(
    df_exp.withColumn(
        "Type1_Encounter",
        array_contains(col("Outlet_Type"), "Type1"),
    )
    .display()
)

In [0]:
# Show df in its current state.
df.display()

In [0]:
# Total Item_MRP per Item_Type.
df.groupBy("Item_Type").sum("Item_MRP").display()

In [0]:
# Average Item_MRP per Item_Type.
df.groupBy("Item_Type").avg("Item_MRP").display()

In [0]:
# Total Item_MRP per (Item_Type, Outlet_Size) pair, named Total_MRP.
(
    df.groupby("Item_Type", "Outlet_Size")
    .agg(sum(col("Item_MRP")).alias("Total_MRP"))
    .display()
)

In [0]:
# Sum and average of Item_MRP per (Item_Type, Outlet_Size) pair.
(
    df.groupby("Item_Type", "Outlet_Size")
    .agg(
        sum(col("Item_MRP")).alias("Total_Sum"),
        avg(col("Item_MRP")).alias("Avg_Sum"),
    )
    .display()
)

In [0]:
# Small (user, book) sample frame for the collect_list example.
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]
schema = 'user string, book string'

df_book = spark.createDataFrame(data=data, schema=schema)
df_book.display()

In [0]:
# Gather each user's books into a single list column.
(
    df_book.groupby("user")
    .agg(collect_list(col("book")))
    .display()
)

### Pivot

In [0]:
# Show df before pivoting.
df.display()

In [0]:
# One row per Item_Type, pivoted on Outlet_Size, with average Item_MRP as
# the aggregated value.
(
    df.groupby("Item_Type")
    .pivot("Outlet_Size")
    .agg(avg(col("Item_MRP")))
    .display()
)

In [0]:
# Veg_Flag is "Non_Veg" when Item_Type is "Meat" and "Veg" otherwise.
df = df.withColumn(
    "Veg_Flag",
    when(col("Item_Type") == "Meat", "Non_Veg").otherwise("Veg"),
)

In [0]:
# Show df with the new Veg_Flag column.
df.display()

In [0]:
# Label each row by diet flag and price band:
#   Veg and Item_MRP > 100   -> "Veg_Expensive"
#   Veg and Item_MRP <= 100  -> "Veg_Inexpensive"
#   anything else            -> "Non_Veg"
# The second branch uses <= so that a Veg item priced at exactly 100 is no
# longer caught by neither comparison and labelled "Non_Veg".
# NOTE(review): a Veg row with a null Item_MRP still falls through to
# "Non_Veg" — confirm whether that is acceptable for this data.
df.withColumn(
    "Veg_Exp_Flag",
    when(
        (col("Veg_Flag") == "Veg") & (col("Item_MRP") > 100), "Veg_Expensive"
    )
    .when(
        (col("Veg_Flag") == "Veg") & (col("Item_MRP") <= 100), "Veg_Inexpensive"
    )
    .otherwise("Non_Veg")
).display()

In [0]:
# Employee rows (emp_id, emp_name, dept_id) for the join examples.
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
df1 = spark.createDataFrame(data=dataj1, schema=schemaj1)

# Department rows (dept_id, department). 'd04' has no employee above and
# 'd06' has no department here.
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
schemaj2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(data=dataj2, schema=schemaj2)

In [0]:
# Inner join of employees and departments on dept_id.
(
    df1.join(other=df2, on=df1["dept_id"] == df2["dept_id"], how="inner")
    .display()
)

In [0]:
# Left join of employees and departments on dept_id.
(
    df1.join(other=df2, on=df1["dept_id"] == df2["dept_id"], how="left")
    .display()
)

In [0]:
# Right join of employees and departments on dept_id.
(
    df1.join(other=df2, on=df1["dept_id"] == df2["dept_id"], how="right")
    .display()
)

In [0]:
# Anti join of employees against departments on dept_id.
(
    df1.join(other=df2, on=df1["dept_id"] == df2["dept_id"], how="anti")
    .display()
)


### Window Function

In [0]:
from pyspark.sql.window import Window

In [0]:
# Number the rows in Item_Identifier order.
(
    df.withColumn(
        "rowcol",
        row_number().over(Window.orderBy(col("Item_Identifier"))),
    )
    .display()
)

In [0]:

# rank and dense_rank over the same window: Item_Identifier, descending.
(
    df.withColumn(
        'rank',
        rank().over(Window.orderBy(desc('Item_Identifier'))),
    )
    .withColumn(
        'denseRank',
        dense_rank().over(Window.orderBy(desc('Item_Identifier'))),
    )
    .display()
)
     

In [0]:
# Running total of Item_MRP in Item_Type order, over the frame from the first
# row up to and including the current row.
(
    df.withColumn(
        "CumSum",
        sum(col("Item_MRP")).over(
            Window.orderBy("Item_Type").rowsBetween(
                start=Window.unboundedPreceding,
                end=Window.currentRow,
            )
        ),
    )
    .display()
)

### USER DEFINED FUNCTIONS

In [0]:
def func(x):
    """Return ``x`` multiplied by itself, or ``None`` when ``x`` is ``None``.

    The function is wrapped as a UDF and applied to ``Item_MRP``, which the
    schema declares nullable, so a missing value is passed through instead
    of raising ``TypeError``.
    """
    if x is None:
        return None
    return x * x

In [0]:
# Register func as a UDF with an explicit double return type. Without one,
# udf() defaults to a string result, so the squared value would come back as
# text. Item_MRP is a double column, so func returns a Python float.
my_udf = udf(func, DoubleType())

In [0]:
# Apply the UDF to Item_MRP and show the result as a "Square" column.
(
    df.withColumn("Square", my_udf(col("Item_MRP")))
    .display()
)

### Data Writing

In [0]:
# Write df as CSV to data2.csv with no explicit save mode.
df.write.save(
    "/Volumes/filestore/bigmartsales/data/data2.csv",
    format="csv",
)

In [0]:
# Write df as CSV to data2.csv in append mode.
df.write.save(
    path="/Volumes/filestore/bigmartsales/data/data2.csv",
    format="csv",
    mode="append",
)

In [0]:
# Write df as CSV to data1.csv in overwrite mode.
(
    df.write.format("csv")
    .mode("overwrite")
    .option("path", "/Volumes/filestore/bigmartsales/data/data1.csv")
    .save()
)



In [0]:
# Write df as CSV to data1.csv in overwrite mode (same write as the cell above).
df.write.save(
    path="/Volumes/filestore/bigmartsales/data/data1.csv",
    format="csv",
    mode="overwrite",
)

In [0]:

# Write df as Parquet in overwrite mode. The target is now data1.parquet:
# the previous path, data1.csv, is the directory the CSV cells above write
# to, so this cell replaced that CSV output with Parquet files.
(
    df.write.format('parquet')
    .mode('overwrite')
    .option('path', "/Volumes/filestore/bigmartsales/data/data1.parquet")
    .save()
)

### TABLE

In [0]:
# Save df as the table my_table in delta format, in overwrite mode.
df.write.saveAsTable('my_table', format='delta', mode='overwrite')

### Spark SQL

In [0]:
# Register df as the temporary view "my_file" for the SQL cells.
# createOrReplaceTempView keeps the cell re-runnable; createTempView raises
# once a view with that name already exists.
df.createOrReplaceTempView("my_file")

In [0]:
%sql
-- Low Fat rows from the my_file view. Uses = and a single-quoted string
-- literal, matching the spark.sql() query in the next cell.
select * from my_file where Item_Fat_Content = 'Low Fat'

In [0]:
# Run the Low Fat query through spark.sql and keep the result as a DataFrame.
df_sql = spark.sql(
    "select * from my_file where Item_Fat_Content = 'Low Fat'"
)

In [0]:
# Show the rows returned by the SQL query.
df_sql.display()