### Data Reading Json

In [0]:
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Read the drivers file as line-delimited JSON (one JSON object per line).
# "header" and "inferSchema" are CSV reader options and have no effect on the
# JSON source, so they are omitted; the JSON reader infers the schema itself
# when no explicit schema is supplied.
df_json = (
    spark.read.format("json")
    .option("multiLine", False)  # each line is a complete JSON record
    .load("/FileStore/tables/drivers.json")
)

In [0]:
df_json.display()

### Data reading

In [0]:
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Load the BigMart sales CSV; the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    "/FileStore/tables/BigMart_Sales.csv",
    inferSchema=True,
    header=True,
)

In [0]:
df.display()

### Schema definition

In [0]:
df.printSchema()

### DDL Schema

Changed `Item_Weight` from DOUBLE to STRING by supplying a custom schema.

In [0]:
# DDL-style schema string for BigMart_Sales.csv: one "name TYPE" entry per
# column, comma separated. Item_Weight is declared STRING here on purpose
# (the markdown note above says it was changed from double to string).
my_ddl_schema = """
                    Item_Identifier STRING,
                    Item_Weight STRING, 
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE, 
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING, 
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING ,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING, 
                    Item_Outlet_Sales DOUBLE 
                  """

In [0]:
# Re-read the CSV, this time applying the DDL schema string instead of
# letting Spark infer the column types.
df = spark.read.csv(
    "/FileStore/tables/BigMart_Sales.csv",
    schema=my_ddl_schema,
    header=True,
)

In [0]:
df.display()

In [0]:
df.printSchema()

### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Explicit StructType schema for BigMart_Sales.csv with every column read as
# a nullable string.
#
# The CSV reader binds file columns to schema fields by position, so the
# schema must list all 12 columns in file order (the same order as
# my_ddl_schema). Item_Type was previously missing, which shifted every
# column after Item_Visibility one place to the left.
my_strct_schema = StructType(
    [
        StructField("Item_Identifier", StringType(), True),
        StructField("Item_Weight", StringType(), True),
        StructField("Item_Fat_Content", StringType(), True),
        StructField("Item_Visibility", StringType(), True),
        StructField("Item_Type", StringType(), True),
        StructField("Item_MRP", StringType(), True),
        StructField("Outlet_Identifier", StringType(), True),
        StructField("Outlet_Establishment_Year", StringType(), True),
        StructField("Outlet_Size", StringType(), True),
        StructField("Outlet_Location_Type", StringType(), True),
        StructField("Outlet_Type", StringType(), True),
        StructField("Item_Outlet_Sales", StringType(), True),
    ]
)

In [0]:
# Read the CSV using the StructType schema defined above; the header row is
# consumed as column names rather than data.
df = (
    spark.read.schema(my_strct_schema)
    .option("header", True)
    .csv("/FileStore/tables/BigMart_Sales.csv")
)

In [0]:
# Switch back to an inferred schema so the numeric columns are typed as
# numbers for the transformations that follow.
df = spark.read.load(
    "/FileStore/tables/BigMart_Sales.csv",
    format="csv",
    inferSchema=True,
    header=True,
)

In [0]:
df.printSchema()

In [0]:
df.display()

### Select

In [0]:
df.select(col("Item_Identifier")).display()

In [0]:
df.select("Item_Identifier", "Item_Type").display()

In [0]:
df.select(col("Item_Identifier"), col("Item_Type")).display()

### Alias

In [0]:
df.select(col("Item_Identifier").alias("Item_ID")).display()

### Filter

In [0]:
# Keep only dairy items. The column is spelled "Item_Type" exactly as in the
# file header so the lookup also works when case-sensitive resolution is on.
df.filter(col("Item_Type") == "Dairy").display()

In [0]:
df.filter((col("Item_Type") == "Meat") & (col("Item_Weight") >= 8)).display()

In [0]:
# Rows whose outlet size is unknown AND whose outlet sits in Tier 1 or Tier 2.
size_missing = col("Outlet_Size").isNull()
in_tier_1_or_2 = col("Outlet_Location_Type").isin("Tier 1", "Tier 2")
df.filter(size_missing & in_tier_1_or_2).display()

### Column rename

In [0]:
df.withColumnRenamed("Item_Weight", "Item_Wt").display()

### WithColumn

In [0]:
# Add a constant column "Flag" holding the literal "New" on every row.
# The result is assigned back to df, so later cells see the new column.
df = df.withColumn("Flag", lit("New"))
df.display()

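Assigning the result back to `df` (as in the cell above) saves the new column in the DataFrame; without the assignment the change is only displayed.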

In [0]:
df.withColumn("Multiply", col("Item_Weight") * col("Item_MRP")).display()

In [0]:
# Abbreviate the fat-content labels in two passes over the same column:
# first "Regular" -> "Reg", then "Low Fat" -> "Lf".
regular_to_reg = regexp_replace(col("Item_Fat_Content"), "Regular", "Reg")
low_fat_to_lf = regexp_replace(col("Item_Fat_Content"), "Low Fat", "Lf")
df_reg = df.withColumn("Item_Fat_Content", regular_to_reg)
df_reg.withColumn("Item_Fat_Content", low_fat_to_lf).display()

### TypeCasting

In [0]:
# Cast Item_Weight to a string column and show the resulting schema.
weight_as_string = col("Item_Weight").cast("string")
df.withColumn("Item_Weight", weight_as_string).printSchema()

### Sorting/Ordering

In [0]:
df.sort(col("Item_Identifier").asc()).display()

In [0]:
df.sort(col("Item_Identifier").desc()).display()

In [0]:
# Sort by identifier, then weight, both descending. Column names match the
# file header's casing and the directions are real booleans rather than 0/1.
df.sort(["Item_Identifier", "Item_Weight"], ascending=[False, False]).display()

In the `ascending` list, 0 means False (descending) and 1 means True (ascending).

In [0]:
# Sort by identifier descending, then weight ascending. Column names match
# the file header's casing and the directions are real booleans, not 0/1.
df.sort(["Item_Identifier", "Item_Weight"], ascending=[False, True]).display()

### Limit

In [0]:
df.limit(10).display()

In [0]:
# The three heaviest items: order by weight descending, keep the first 3 rows.
df.orderBy(desc("Item_Weight")).limit(3).display()

### Drop

In [0]:
df.drop("Item_Weight").display()

In [0]:
df.drop("Item_Weight", "Item_MRP").display()

### Drop Duplicates

In [0]:
df.dropDuplicates().display()

In [0]:
df.distinct().display()

In [0]:
df.drop_duplicates(["Item_Type"]).display()

Writing the `subset=` keyword explicitly is optional; the list of columns can also be passed positionally.

In [0]:
df.drop_duplicates(subset=["Item_Type", "Item_Visibility"]).display()

### Union / Union By Name

In [0]:
# Three small DataFrames used to compare union() with unionByName().
# df1 and df2 share the column order (Id, Name).
data1 = [("1", "abc"), ("2", "def")]
sc1 = "Id STRING , Name String"

df1 = spark.createDataFrame(data1, sc1)

data2 = [("3", "ghi"), ("4", "jkl")]
sc2 = "Id STRING , Name String"

df2 = spark.createDataFrame(data2, sc2)

# df3 holds the same two columns but in the opposite order (Name, Id).
data3 = [("mno", "5"), ("pqr", "6")]
sc3 = "Name STRING , Id String"

df3 = spark.createDataFrame(data3, sc3)


df1.display()
df2.display()
df3.display()

In [0]:
df1.union(df2).display()
df2.union(df1).display()

In [0]:
# Union of frames whose columns are in a different order (df1: Id, Name;
# df3: Name, Id).
# NOTE(review): union() is documented to match columns by position, not by
# name, so names and ids presumably end up mixed in the same column here.
df1.union(df3).display()
df3.union(df1).display()

In [0]:
# Same combination done with unionByName(), which (per its name and the
# PySpark docs) aligns columns by name, so the differing column order of df3
# is not a problem.
df1.unionByName(df3).display()
df3.unionByName(df2).display()

### String Functions

In [0]:
# Reload the sales CSV from scratch (header row as column names, inferred
# types) so the following sections start from the original columns.
df = spark.read.options(inferSchema=True, header=True).csv(
    "/FileStore/tables/BigMart_Sales.csv"
)

from pyspark.sql.types import *
from pyspark.sql.functions import *

### initcap() / lower() / upper()

In [0]:
df.select(initcap("Item_Type")).display()
df.select(lower("Item_Type")).display()
df.select(upper("Item_Type")).display()

### Date Functions

#### current_date() / date_add() / date_sub() / datediff() / date_format()

In [0]:
df = df.withColumn("Cur_date", current_date())
df.display()

In [0]:
df = df.withColumn("weeek_after", date_add("Cur_date", 7))
df.display()

In [0]:
df.withColumn("weeek_before", date_sub("Cur_date", 7)).display()

In [0]:
df = df.withColumn("weeek_before", date_add("Cur_date", -7))
df.display()

In [0]:
df = df.withColumn("date_diff", datediff("weeek_after", "weeek_before"))
df.display()

In [0]:
df.withColumn("weeek_before", date_format("weeek_before", "dd-MM-yyyy")).display()

### Handling Nulls

#### dropna() / fillna()

In [0]:
# Three dropna() calls: the default, how="all" and how="any".
# NOTE(review): per the PySpark docs the default is how="any" (drop a row if
# any column is null), while "all" drops a row only when every column is
# null, so the first and third calls should give the same result.
df.dropna().display()
df.dropna("all").display()
df.dropna("any").display()

In [0]:
df.dropna(subset=["Outlet_Size"]).display()

In [0]:
df.fillna("Not available").display()

In [0]:
df.fillna("Not available", subset=["Outlet_Size"]).display()

#### Split and Index

In [0]:
# Split Outlet_Type on single spaces; the column becomes an array of words.
df.withColumn("Outlet_Type", split("Outlet_Type", " ")).display()  # gives raw list

# Same split, but keep only the element at index 1 (the second word).
df.withColumn("Outlet_Type", split("Outlet_Type", " ")[1]).display()

### Explode

In [0]:
df_exp = df.withColumn("Outlet_Type", split("Outlet_Type", " "))
df_exp.display()

In [0]:
df_exp.withColumn("Outlet_Type", explode("Outlet_Type")).display()

explode() splits one row into several rows, one for each element of the array in the given column.

#### array_contains()

In [0]:
df_exp.withColumn("Outlet_Type", array_contains("Outlet_Type", "Type1")).display()

returns a boolean value (either true or false)

### groupBy

In [0]:
# Total Item_MRP per Item_Type, first with the auto-generated column name and
# then with an explicit alias. `sum` here is pyspark.sql.functions.sum
# (brought in by the wildcard import), not Python's builtin sum.
df.groupBy("Item_Type").agg(sum("Item_MRP")).display()
df.groupBy("Item_Type").agg(sum("Item_MRP").alias("Total_MRP")).display()

In [0]:
df.groupBy("Item_Type").agg(avg("Item_MRP")).display()
df.groupBy("Item_Type").agg(avg("Item_MRP").alias("MRP_Avg")).display()

In [0]:
df.groupBy("Item_Type", "Outlet_Size").agg(sum("Item_MRP").alias("Total_MRP")).display()

In [0]:
df.groupBy("Item_Type", "Outlet_Size").agg(sum("Item_MRP"), avg("Item_MRP")).display()
df.groupBy("Item_Type", "Outlet_Size").agg(
    sum("Item_MRP").alias("Sum_MRP"), avg("Item_MRP").alias("Avg_MRP")
).display()

In [0]:
df.dropna().groupBy("Item_Type", "Outlet_Size").agg(
    sum("Item_MRP").alias("Sum_MRP"), avg("Item_MRP").alias("Avg_MRP")
).display()

df.fillna("Not available").groupBy("Item_Type", "Outlet_Size").agg(
    sum("Item_MRP").alias("Sum_MRP"), avg("Item_MRP").alias("Avg_MRP")
).display()

### Collect List

In [0]:
# Tiny (user, book) dataset for the collect_list() example; some users
# appear on more than one row.
data = [
    ("user1", "book1"),
    ("user1", "book2"),
    ("user2", "book2"),
    ("user2", "book4"),
    ("user3", "book1"),
]

# DDL schema string: both columns are strings.
schema = "user string, book string"

df_book = spark.createDataFrame(data, schema)

df_book.display()

In [0]:
df_book.groupBy("user").agg(collect_list("book")).display()

In [0]:
df.select("Item_Type", "Outlet_Size", "Item_MRP").display()

### Pivot

In [0]:
# Pivot: one row per Item_Type, one column per distinct Outlet_Size value,
# each cell holding the average Item_MRP for that combination.
# No dropna() is applied here, so rows with a null Outlet_Size take part too.
df.groupBy("Item_Type").pivot("Outlet_Size").agg(
    avg("Item_MRP").alias("Average_MRP")
).display()

In [0]:
df.dropna().groupBy("Item_Type").pivot("Outlet_Size").agg(
    avg("Item_MRP").alias("Average_MRP")
).display()

In [0]:
df.groupBy("Item_Type").pivot("Outlet_Size").agg(
    avg("Item_MRP").alias("Average_MRP"), sum("Item_MRP").alias("Total_MRP")
).display()

### When-Otherwise

In [0]:
# Tag every row as "Non_Veg" when the item type is Meat, otherwise "Veg".
# The result replaces df so the next cells can use Veg_flag.
is_meat = col("Item_Type") == "Meat"
df = df.withColumn("Veg_flag", when(is_meat, "Non_Veg").otherwise("Veg"))

In [0]:
df.display()

In [0]:
# Classify each row by vegetarian flag and price band:
#   Veg and Item_MRP <  100 -> "Veg_inexpensive"
#   Veg and Item_MRP >= 100 -> "Veg_expensive"
#   everything else         -> "Non_veg"
# The upper band uses >= so a vegetarian item priced at exactly 100 is no
# longer dropped into the "Non_veg" fallback.
df.withColumn(
    "Veg_Expense",
    when((col("Veg_flag") == "Veg") & (col("Item_MRP") < 100), "Veg_inexpensive")
    .when((col("Veg_flag") == "Veg") & (col("Item_MRP") >= 100), "Veg_expensive")
    .otherwise("Non_veg"),
).display()

### Joins

In [0]:
# Employee rows (emp_id, emp_name, dept_id). Note that d06 has no matching
# department below and two employees share d03.
dataj1 = [
    ("1", "gaur", "d01"),
    ("2", "kit", "d02"),
    ("3", "sam", "d03"),
    ("4", "tim", "d03"),
    ("5", "aman", "d05"),
    ("6", "nad", "d06"),
]

schemaj1 = "emp_id STRING, emp_name STRING, dept_id STRING"

# Reuses the name df1 from the union section.
df1 = spark.createDataFrame(dataj1, schemaj1)

# Department rows (dept_id, department). d04 has no employee above.
dataj2 = [
    ("d01", "HR"),
    ("d02", "Marketing"),
    ("d03", "Accounts"),
    ("d04", "IT"),
    ("d05", "Finance"),
]

schemaj2 = "dept_id STRING, department STRING"

# Reuses the name df2 from the union section.
df2 = spark.createDataFrame(dataj2, schemaj2)

df1.display()
df2.display()

In [0]:
# Cartesian product of employees and departments. crossJoin() states the
# intent explicitly instead of relying on join() with no condition, which is
# an implicit cross join and is rejected when cross joins are disabled.
df1.crossJoin(df2).display()

# Inner join: only employees whose dept_id exists in the department table.
df1.join(df2, df1["dept_id"] == df2["dept_id"], "inner").display()

In [0]:
df1.join(df2, df1["dept_id"] == df2["dept_id"], "left").display()

In [0]:
df1.join(df2, df1["dept_id"] == df2["dept_id"], "right").display()

In [0]:
# Anti join on dept_id.
# NOTE(review): "anti" is Spark's left-anti join, which should keep only the
# df1 rows that have no matching dept_id in df2 — confirm against the output.
df1.join(df2, df1["dept_id"] == df2["dept_id"], "anti").display()

### Window Functions

In [0]:
from pyspark.sql.window import Window

##### row_number() assigns a unique, sequential id to each row, in the order of the chosen column

In [0]:
# Number the rows 1..N in Item_Identifier order. The window has an orderBy
# but no partitionBy, so the whole DataFrame forms a single window.
# NOTE(review): an unpartitioned window presumably pulls all rows into one
# partition; fine for this small dataset, worth confirming for large ones.
df.withColumn("Row_Id", row_number().over(Window.orderBy("Item_Identifier"))).display()

##### rank() and dense_rank()

In [0]:
df.withColumn(
    "rank", rank().over(Window.orderBy(col("Item_Identifier").desc()))
).display()

In [0]:
df.withColumn(
    "dense_rank", dense_rank().over(Window.orderBy(col("Item_Identifier").desc()))
).display()

df.withColumn(
    "dense_rank", dense_rank().over(Window.orderBy(col("Item_Identifier").asc()))
).display()

asc() and desc() set the sort order of the column that the ranking is compared against.

In [0]:
df.withColumn(
    "rank", rank().over(Window.orderBy(col("Item_Identifier").desc()))
).withColumn(
    "denseRank", dense_rank().over(Window.orderBy(col("Item_Identifier").desc()))
).display()

#### cumulative sum

In [0]:
# Sum of Item_MRP over a window ordered by Item_Type, with no explicit frame.
# NOTE(review): with an orderBy and no frame Spark's documented default is
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so all rows with the
# same Item_Type presumably get the same value; the next cell uses
# rowsBetween for a true row-by-row running total.
df.withColumn("CumSum", sum("Item_MRP").over(Window.orderBy("Item_Type"))).display()

In [0]:
df.withColumn(
    "CumSum",
    sum("Item_MRP").over(
        Window.orderBy("Item_Type").rowsBetween(
            Window.unboundedPreceding, Window.currentRow
        )
    ),
).display()

In [0]:
# The frame runs from the first row to the last row of the window, so every
# row receives the sum of Item_MRP over the whole window. Despite the column
# name "CumSum", this is a grand total, not a running total.
df.withColumn(
    "CumSum",
    sum("Item_MRP").over(
        Window.orderBy("Item_Type").rowsBetween(
            Window.unboundedPreceding, Window.unboundedFollowing
        )
    ),
).display()

In [0]:
df.withColumn(
    "CumSum",
    sum("Item_MRP").over(
        Window.orderBy("Item_Type").rowsBetween(
            Window.currentRow, Window.unboundedFollowing
        )
    ),
).display()

### User Defined Functions(UDF)

In [0]:
def my_func(x):
    """Return the square of ``x``.

    Intended to be wrapped as a Spark UDF, where a null cell arrives as
    ``None``; in that case ``None`` is returned so the null is passed through
    instead of raising ``TypeError``.

    Args:
        x: A number, or ``None``.

    Returns:
        ``x * x``, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return x * x

# Wrap my_func as a Spark UDF with an explicit DoubleType return type.
# Without a return type udf() defaults to StringType, which would hand the
# squared value back as text instead of a number.
# NOTE(review): DoubleType expects my_func to return a Python float, which
# holds when the input column is a double (as Item_MRP is when inferred).
my_udf = udf(my_func, DoubleType())

In [0]:
# Apply the UDF to Item_MRP and show the result in a new column "NewCol".
mrp_squared = my_udf(col('Item_MRP'))
df.withColumn('NewCol', mrp_squared).display()

### DATA Writing

##### CSV ( append / overwrite /error / ignore)

In [0]:
# Write df as CSV using the writer's default save mode.
df.write.csv('/FileStore/tables/CSV/data.csv')



In [0]:
# Append df to whatever is already stored at the CSV path.
df.write.csv('/FileStore/tables/CSV/data.csv', mode='append')

In [0]:
# Append again, this time handing the target path straight to save().
df.write.mode('append').format('csv').save('/FileStore/tables/CSV/data.csv')

In [0]:
# Replace any existing data at the CSV path with the current df.
df.write.save(
    path='/FileStore/tables/CSV/data.csv',
    format='csv',
    mode='overwrite',
)

In [0]:
# Write with save mode 'error'.
# NOTE(review): this mode is documented to raise when the target already
# exists; earlier cells write to this same path, so this cell presumably
# fails on purpose to demonstrate the mode.
df.write.format('csv').mode('error').option('path','/FileStore/tables/CSV/data.csv').save()

In [0]:
# Write with save mode 'ignore'.
# NOTE(review): this mode is documented to skip the write silently when the
# target already exists, so with the earlier writes in place this is
# presumably a no-op.
df.write.format('csv').mode('ignore').option('path','/FileStore/tables/CSV/data.csv').save()

#### PARQUET and saveAsTable

In [0]:
# Write df as Parquet to its own location. The previous target was the CSV
# output directory ('/FileStore/tables/CSV/data.csv'), so overwrite mode
# replaced the CSV files with Parquet files under a misleading ".csv" path.
df.write.format('parquet').mode('overwrite').option('path','/FileStore/tables/PARQUET/data.parquet').save()

In [0]:
# Save df as a Parquet-format table named My_Table, replacing the table if it
# already exists, then display df itself (the in-memory DataFrame, not the
# table read back).
df.write.format('parquet').mode('overwrite').saveAsTable('My_Table')
df.display()

### SPARK SQL

In [0]:
# Register df as the temporary view "my_view" for the SQL cells below.
# createOrReplaceTempView keeps this cell re-runnable; createTempView raises
# once the view already exists in the session.
df.createOrReplaceTempView('my_view')

In [0]:
%sql

select * from my_view where Item_Fat_Content = 'Regular'

In [0]:
# Run the same query as the %sql cell through the Python API and show the
# resulting DataFrame.
regular_items_query = "select * from my_view where Item_Fat_Content = 'Regular'"
df_sql = spark.sql(regular_items_query)

df_sql.display()