### Reading Data in JSON Format

In [0]:
# JSON sources have no header row and infer their schema when none is given, so the
# CSV-only 'header' and 'inferSchema' options are not passed. multiLine=False means
# the file is read as one JSON record per line.
df_json = spark.read.format('json').option('multiLine', False).load('/Volumes/workspace/default/big_data/drivers.json')

In [0]:
df_json.display()

### Reading Data

In [0]:
dbutils.fs.ls('/Volumes/workspace/default/big_data/')

In [0]:
df = spark.read.format('csv').option('inferSchema', True).option('header', True).load('/Volumes/workspace/default/big_data/BigMart Sales.csv')

In [0]:
df.display()

### Schema Definition

In [0]:
df.printSchema()

#### DDL Schema

In [0]:
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING,
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING,
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE
'''

In [0]:
df = spark.read.format('csv').schema(my_ddl_schema).option('header', True).load('/Volumes/workspace/default/big_data/BigMart Sales.csv')

In [0]:
df.printSchema()

#### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
my_struct_schema = StructType([
                                StructField('Item_Identifier', StringType(), True),
                                StructField('Item_Weight', StringType(), True),
                                StructField('Item_Fat_Content', StringType(), True),
                                StructField('Item_Visibility', StringType(), True),
                                StructField('Item_Type', StringType(), True),
                                StructField('Item_MRP', StringType(), True),
                                StructField('Outlet_Identifier', StringType(), True),
                                StructField('Outlet_Establishment_Year', StringType(), True),
                                StructField('Outlet_Size', StringType(), True),
                                StructField('Outlet_Location_Type', StringType(), True),
                                StructField('Outlet_Type', StringType(), True),
                                StructField('Item_Outlet_Sales', StringType(), True)
])

In [0]:
df = spark.read.format('csv').schema(my_struct_schema).option('header', True).load('/Volumes/workspace/default/big_data/BigMart Sales.csv')

In [0]:
df.printSchema()

In [0]:
df = spark.read.format('csv').option('inferSchema', True).option('header', True).load('/Volumes/workspace/default/big_data/BigMart Sales.csv')

In [0]:
df.printSchema()

### Data Transformation

#### SELECT Statement

In [0]:
df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content').display()

In [0]:
df.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content')).display()

#### Alias

In [0]:
df.select(col('Item_Identifier').alias('Item_ID')).display()

#### Filter

In [0]:
df.filter(col('Item_Fat_Content') == 'Regular').display()

In [0]:
df.filter( (col('Item_Type') == 'Soft Drinks') & (col('Item_Weight') < 10) ).display()

In [0]:
df.filter( (col("Outlet_Size").isNull()) & (col("Outlet_Location_Type").isin("Tier 1", "Tier 2")) ).display()

#### Column Renamed

In [0]:
# Rename Single Column
df.withColumnRenamed('Item_Weight', 'Item_Wt').display()

In [0]:
# Rename Multiple Columns
df.withColumnsRenamed({"Item_Identifier": "Item_ID", "Item_Weight": "Item_Wt"}).display()

#### withColumn

In [0]:
# Add New Column
df = df.withColumn('flag', lit('new'))

df.display()

In [0]:
# Add Result Column
df.withColumn('Multiply', col("Item_MRP")*col("Item_Weight")).display()

In [0]:
# Modify Exist Columns
df.withColumn("Item_Fat_Content", regexp_replace(col("Item_Fat_Content"), 'Regular', 'Reg')).display()

In [0]:
# Replace several values within a single column by chaining the replacements:
# first 'Regular' -> 'Reg', then 'Low Fat' -> 'LF' on the result.
fat_content_expr = regexp_replace(col("Item_Fat_Content"), "Regular", "Reg")
fat_content_expr = regexp_replace(fat_content_expr, "Low Fat", "LF")
df.withColumn("Item_Fat_Content", fat_content_expr).display()

#### Type Casting

In [0]:
# Cast Item_Weight to a string column and keep the result in df.
# NOTE: from here on, sorting by Item_Weight orders the values as text
# (e.g. "9.3" sorts after "10.0"), not as numbers.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))

In [0]:
df.printSchema()

#### Sort

In [0]:
df.sort(col('Item_Weight').desc()).display()

In [0]:
df.sort(col('Item_Visibility').asc()).display()

In [0]:
df.sort(["Item_Weight", "Item_Visibility"], ascending=[0,0]).display()

In [0]:
df.sort(["Item_Weight", "Item_Visibility"], ascending=[0,1]).display()

#### Limit

In [0]:
df.limit(10).display()

#### Drop

In [0]:
df.drop("Item_Weight").display()

In [0]:
df.drop("Item_Weight", "Item_Visibility").display()

#### Drop Duplicates

In [0]:
df.dropDuplicates().display()

In [0]:
df.drop_duplicates(subset=["Item_Type"]).display()

In [0]:
df.distinct().display()

#### UNION and UNION By Name

In [0]:
data1 = [('1', 'MAR'), ('2', 'CAM')]
schema1 = 'id STRING, name STRING'
df1 = spark.createDataFrame(data1, schema1)

data2 = [('3', 'ALG'), ('4', 'NGR')]
schema2 = 'id STRING, name STRING'
df2 = spark.createDataFrame(data2, schema2)

In [0]:
df1.display()

In [0]:
df2.display()

##### UNION

In [0]:
df1.union(df2).display()

In [0]:
data1_1 = [ ('MAR', '1'), ('CAM', '2')]
schema1_1 = 'name STRING, id STRING'

df1_1 = spark.createDataFrame(data1_1, schema1_1)

df1_1.display()

In [0]:
df1_1.union(df2).display()

In [0]:
df1_1.unionByName(df2).display()

#### STRING Function

##### initcap()

In [0]:
df.select(initcap(col("Item_Type")).alias('initcap_Item_Type')).display()

##### upper()

In [0]:
df.select(upper(col("Item_Type")).alias('upper_Item_Type')).display()

##### lower()

In [0]:
df.select(lower(col("Item_Type")).alias('lower_Item_Type')).display()

#### Date Function

##### current_date()

In [0]:
df = df.withColumn('current_date', current_date())

df.display()

##### date_add()

In [0]:
df = df.withColumn('week_after', date_add(col('current_date'), 7))
df.display()

##### date_sub()

In [0]:
# date_sub() moves current_date back by 7 days; the column name is spelled
# 'week_before' (it was previously misspelled 'week_befor').
df.withColumn('week_before', date_sub(col("current_date"), 7)).display()

In [0]:
df = df.withColumn('week_before', date_add(col("current_date"), -7))

df.display()

##### DateDiff()

In [0]:
df = df.withColumn('DateDiff', datediff("week_after", "current_date"))

df.display()

##### date_format()

In [0]:
df = df.withColumn('week_before', date_format('week_before', 'dd-MMM-yyyy'))

df.display()


#### Handling NULL Values

##### Drop NULL Values

In [0]:
# Drop the records that have null values in all columns
df.dropna('all').display()

In [0]:
# Drop the records that have a null value in any of the columns
df.dropna('any').display()

In [0]:
# Drop the records that have a null value in the specified columns
df.dropna(subset=["Outlet_Size"]).display()

##### Filling NULL Values

In [0]:
# Replace NULL values in every string column of the dataset
df.fillna('Not Availables').display()

In [0]:
# Replace NULL values in a specific column
df.fillna("Not Availables", subset=["Outlet_Size"]).display()

#### Split and Indexing

##### Split()

In [0]:
df.withColumn("Outlet_Type", split("Outlet_Type", ' ')).display()

##### Indexing

In [0]:
df.withColumn("Outlet_Type", split("Outlet_Type", ' ')[1]).display()

#### Explode

In [0]:
df_exp = df.withColumn("Outlet_Type", split("Outlet_Type", " "))

df_exp.display()

In [0]:
df_exp.withColumn("Outlet_Type", explode("Outlet_Type")).display()

In [0]:
df_exp.withColumn("Flag_Type", array_contains("Outlet_Type", "Type1")).display()

#### Group By

In [0]:
df.groupBy("Item_Type").agg(sum("Item_MRP").alias("MRP_Total")).display()

In [0]:
df.groupBy("Item_Type").agg(avg("Item_MRP").alias("MRP_Average")).display()

#### Collect_List

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string, book string'

df_book = spark.createDataFrame(data,schema)

df_book.display()

In [0]:
df_book.groupBy("user").agg(collect_list("book").alias("Books")).display()

#### Pivot

In [0]:
df.groupBy("Item_Type").pivot("Outlet_Size").agg(avg("Item_MRP").alias("MRP_Average")).display()

#### When-Otherwise

In [0]:
df = df.withColumn('Veg_Flag', when(col("Item_Type")=='Meat', 'Non_Veg').otherwise('Veg'))

df.display()

In [0]:
# Classify vegetarian items by price. '>=' keeps an Item_MRP of exactly 100 in the
# 'Veg_expensive' bucket; with a strict '>' such rows matched neither branch and
# fell through to 'Non-Veg' even though Veg_Flag was 'Veg'.
df.withColumn('Veg_Exp_Flag', when( ((col("Veg_Flag")== 'Veg') & (col("Item_MRP") < 100)), 'Veg_Inexpensive')\
                        .when( ((col("Veg_Flag")== 'Veg') & (col("Item_MRP") >= 100)), 'Veg_expensive')\
                        .otherwise('Non-Veg')).display()

In [0]:
# Same classification without the redundant outer parentheses. '>=' keeps an
# Item_MRP of exactly 100 in 'Veg_expensive' instead of letting a vegetarian item
# fall through to 'Non-Veg'.
df.withColumn('Veg_Exp_Flag', when( (col("Veg_Flag")== 'Veg') & (col("Item_MRP") < 100), 'Veg_Inexpensive')\
                        .when( (col("Veg_Flag")== 'Veg') & (col("Item_MRP") >= 100), 'Veg_expensive')\
                        .otherwise('Non-Veg')).display()

#### JOINS

In [0]:
dataa1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaa1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataa1,schemaa1)

dataa2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaa2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataa2,schemaa2)

In [0]:
df1.display()

In [0]:
df2.display()

##### Inner Join

In [0]:
# When the join column has the same name in both DataFrames, we can join on the column name alone, without referencing either DataFrame
df1.join(df2, "dept_id", "inner").display()

In [0]:
df1.join(df2, df1["dept_id"]==df2["dept_id"], "inner").display()

##### Left Join

In [0]:
df1.join(df2, "dept_id", "left").display()

##### Right Join

In [0]:
df1.join(df2, "dept_id", "right").display()

##### Anti Join

In [0]:
df1.join(df2, "dept_id", "anti").display()

In [0]:
df2.join(df1, "dept_id", "anti").display()

#### Window Function

In [0]:
df.display()

In [0]:
from pyspark.sql.window import Window

##### Row_Number

In [0]:
df.withColumn('RowNbrCol', row_number().over(Window.orderBy("Item_Identifier"))).display()

##### Rank vs Dense_Rank

In [0]:
# Both rankings use the same window: Item_Identifier in descending order.
# rank() leaves gaps after ties, dense_rank() does not.
id_desc_window = Window.orderBy(col("Item_Identifier").desc())
ranked_df = df.withColumn("RankCol", rank().over(id_desc_window))
ranked_df = ranked_df.withColumn("DenseRankCol", dense_rank().over(id_desc_window))
ranked_df.display()

##### Cumulative Sum

In [0]:
# With only orderBy, the window's default frame is RANGE from unboundedPreceding to
# currentRow, so all rows sharing the same Item_Type receive the same running total.
# The column name is spelled 'CumulativeSum' (previously 'CumulativeSume').
df.withColumn("CumulativeSum", sum("Item_MRP").over(Window.orderBy("Item_Type"))).display()

In [0]:
df.withColumn("CumulativeSum", sum("Item_MRP").over(Window.orderBy("Item_Type").rowsBetween(Window.unboundedPreceding, Window.currentRow))).display()

In [0]:
df.withColumn("TotalSum", sum("Item_MRP").over(Window.orderBy("Item_Type").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))).display()

#### USER DEFINED FUNCTIONS (UDF)

In [0]:
# step 1:
def func_square(x):
    """Return the square of *x*, or None when *x* is None.

    When this function is wrapped as a Spark UDF, SQL NULLs arrive as None;
    passing them through avoids a TypeError from ``None * None``.
    """
    if x is None:
        return None
    return x * x

In [0]:
# step 2
# Declare the return type explicitly: without it udf() defaults to StringType and
# the squared value would come back as text rather than a number.
# NOTE(review): assumes the UDF is applied to a double column such as Item_MRP — confirm.
my_udf = udf(func_square, DoubleType())

In [0]:
# step 3
df.withColumn("Square_MRP", my_udf("Item_MRP")).display()

#### Data Writing

##### CSV

In [0]:
df.write.format("csv").option("header", "true").save("/Volumes/workspace/default/big_data/data.csv")

In [0]:
df_read = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/workspace/default/big_data/data.csv")

df_read.display()

##### Append

In [0]:

# Append to the existing CSV output. The header is written on the appended files
# too: the read-back uses header=true, which treats the first line of every file
# as a header, so header-less files would silently lose their first data row.
df.write.format("csv").mode("append") \
    .option("header", "true") \
    .option("path", "/Volumes/workspace/default/big_data/data.csv") \
    .save()

In [0]:
df_read1 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/workspace/default/big_data/data.csv")

df_read1.display()

##### OverWrite

In [0]:
df.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .option("path", "/Volumes/workspace/default/big_data/data.csv") \
    .save()

df_read1 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/workspace/default/big_data/data.csv")

df_read1.display()

##### Error

In [0]:
df.write.format("csv") \
    .mode("error") \
    .option("header", "true") \
    .option("path", "/Volumes/workspace/default/big_data/data.csv") \
    .save()

##### Ignore

In [0]:
# mode("ignore") writes nothing when the target path already exists. If the path is
# missing and the write does happen, the header is kept so that reading the output
# with header=true does not consume a data row as the header.
df.write.format("csv") \
    .mode("ignore") \
    .option("header", "true") \
    .option("path", "/Volumes/workspace/default/big_data/data.csv") \
    .save()

df_read1 = spark.read \
                .format("csv") \
                .option("header", "true") \
                .option('inferSchema', "true") \
                .load("/Volumes/workspace/default/big_data/data.csv")

df_read1.display()

##### Parquet

In [0]:
# Write the DataFrame as Parquet and read it back.
# Parquet files carry their own schema and have no header row, so the CSV-only
# 'header' and 'inferSchema' options are not passed to the writer or the reader.
# NOTE(review): this overwrites the CSV output directory with Parquet files even
# though the path still ends in 'data.csv' — consider a dedicated Parquet path.
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "/Volumes/workspace/default/big_data/data.csv") \
    .save()

df_read1 = spark.read \
                .format("parquet") \
                .load("/Volumes/workspace/default/big_data/data.csv")

df_read1.display()

##### Table

In [0]:
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("big_mart_sales1")

In [0]:
df_recup = spark.table("big_mart_sales1")

df_recup.display()

#### Spark SQL

##### Create Temp View

In [0]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView keeps this cell re-runnable; createTempView raises an
# error when 'my_view' already exists in the session.
df.createOrReplaceTempView("my_view")

In [0]:
%sql

SELECT * FROM my_view WHERE Item_Fat_Content = "LF"

In [0]:
df_sql = spark.sql("SELECT * FROM my_view WHERE Item_Fat_Content = 'LF'")

df_sql.display()