### Data Reading JSON

In [0]:
# Read drivers.json as line-delimited JSON (multiLine disabled).
# The CSV-only 'header' and 'inferSchema' options were dropped: the JSON
# reader does not use them and always infers the schema when none is given.
df_json = (
    spark.read
    .format('json')
    .option('multiLine', False)
    .load('/Volumes/workspace/default/spark_volume/drivers.json')
)

In [0]:
df_json.display()

### Data Reading

In [0]:
# Load the BigMart sales CSV: first row is the header, column types inferred.
df = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/workspace/default/spark_volume/BigMart Sales.csv')
)

In [0]:
df.display()

### Schema Definition

In [0]:
df.printSchema()

### DDL SCHEMA

In [0]:
# DDL-formatted schema string (column name followed by its type) that is
# passed to the CSV reader via .schema() in the next cell.
# NOTE(review): Item_Weight is declared as string here while the other
# numeric-looking columns use double/integer — confirm this is intentional.
my_ddl_schema='''
                Item_Identifier string,
                Item_Weight string,
                Item_Fat_Content string,
                Item_Visibility double,
                Item_Type string,
                Item_MRP double,
                Outlet_Identifier string,
                Outlet_Establishment_Year integer,
                Outlet_Size string,
                Outlet_Location_Type string,
                Outlet_Type string,
                Item_Outlet_Sales double
                '''

In [0]:
# Re-read the CSV, applying the DDL schema string instead of inferring types.
ddl_reader = spark.read.format('csv').schema(my_ddl_schema).option('header', True)
df = ddl_reader.load('/Volumes/workspace/default/spark_volume/BigMart Sales.csv')


In [0]:
df.display()

In [0]:
df.printSchema()


### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Explicit StructType schema for the BigMart sales CSV.
# - The last field is named Item_Outlet_Sales (it was misspelled
#   'Item_Outlet_Sale'), matching the DDL schema defined earlier.
# - Numeric columns carry numeric types instead of StringType; Item_Weight is
#   a double because later cells compare, multiply and sort on it, and a
#   string column would be ordered lexicographically.
my_strct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', DoubleType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', DoubleType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', IntegerType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', DoubleType(), True),
])

In [0]:
# Re-read the CSV, applying the StructType schema instead of inferring types.
struct_reader = spark.read.format('csv').schema(my_strct_schema).option('header', True)
df = struct_reader.load('/Volumes/workspace/default/spark_volume/BigMart Sales.csv')

In [0]:
df.printSchema()


### SELECT

In [0]:
# Select columns by name. display() only renders and returns None, so the
# projection is stored in df_sel first and displayed in a separate step.
df_sel = df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')
df_sel.display()

In [0]:
# Same projection using Column objects. display() only renders and returns
# None, so the projection is stored in df_sel first and displayed afterwards.
df_sel = df.select(
    col('Item_Identifier'),
    col('Item_Weight'),
    col('Item_Fat_Content'),
)
df_sel.display()

### ALIAS

In [0]:
# Project Item_Identifier under the alias Item_ID.
item_id = col('Item_Identifier').alias('Item_ID')
df.select(item_id).display()

### FILTER

#### Scenario - 1

In [0]:
# Keep only rows whose fat content is exactly 'Regular'.
is_regular = col('Item_Fat_Content') == 'Regular'
df.filter(is_regular).display()

#### Scenario - 2

In [0]:
# Soft drinks with an item weight below 10 (both conditions must hold).
is_soft_drink = col('Item_Type') == 'Soft Drinks'
is_under_10 = col('Item_Weight') < 10
df.filter(is_soft_drink & is_under_10).display()

#### Scenario - 3

In [0]:
# Rows with no outlet size recorded, restricted to Tier 1 / Tier 2 locations.
size_missing = col('Outlet_Size').isNull()
in_tier_1_or_2 = col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
df.filter(size_missing & in_tier_1_or_2).display()

### withColumnRenamed

In [0]:
# Show the data with Item_Weight renamed to Item_Wt (df itself is unchanged).
display(df.withColumnRenamed('Item_Weight', 'Item_Wt'))

### withColumn


#### Scenario - 1

In [0]:
# Add a constant column 'flag' holding the literal 'new' on every row.
new_flag = lit('new')
df.withColumn('flag', new_flag).display()

In [0]:
# Add 'multiply' as the product of Item_Weight and Item_MRP.
weight_times_mrp = col('Item_Weight') * col("Item_MRP")
df.withColumn('multiply', weight_times_mrp).display()


#### Scenario - 2

In [0]:
# Abbreviate the fat-content labels in place: "Regular" -> "Reg", then
# "Low Fat" -> "Lf". The target column name uses the exact casing of the
# existing column (it was written 'Item_Fat_content'), so the column is
# overwritten under its original name regardless of case-sensitivity settings.
(
    df
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Regular", "Reg"))
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Low Fat", "Lf"))
    .display()
)



### Type Casting

In [0]:
# Cast Item_Weight to a string column and print the resulting schema.
weight_as_string = col('Item_Weight').cast(StringType())
df_cast = df.withColumn('Item_Weight', weight_as_string)
df_cast.printSchema()

### Sort/orderBy


#### Scenario - 1

In [0]:
# Order rows by Item_Weight, largest first.
weight_desc = col('Item_Weight').desc()
df.orderBy(weight_desc).display()


#### Scenario - 2 

In [0]:
# Order rows by Item_Weight, smallest first.
weight_asc = col("Item_Weight").asc()
df.orderBy(weight_asc).display()

#### Scenario - 3 


In [0]:
# Order by Item_Weight descending, then Item_MRP ascending.
df.sort(['Item_Weight', 'Item_MRP'], ascending=[False, True]).display()


### Limit

In [0]:
# Show at most the first 10 rows.
display(df.limit(10))


### DROP

#### Scenario - 1

In [0]:
# Drop a single column and show what is left.
df_drop = df.drop('Item_Visibility')
display(df_drop)


#### Scenario - 2

In [0]:
# Drop several columns in one call and show what is left.
df_drop_multiple = df.drop(*['Item_Visibility', 'Item_Type'])
display(df_drop_multiple)


### DROP_DUPLICATES


#### Scenario - 1 

In [0]:
# Remove rows that are duplicates across all columns.
df_drop_duplicates = df.drop_duplicates()
display(df_drop_duplicates)


#### Scenario - 2

In [0]:
# Keep one row per distinct Item_Type value.
df_drop_duplicates_2 = df.dropDuplicates(['Item_Type'])
display(df_drop_duplicates_2)



In [0]:
# Show the distinct rows of df.
display(df.distinct())


### UNION AND UNION BYNAME

#### Preparing Dataframes

In [0]:
# Two small frames with the same column order (id, name) for the union demo.
schema1 = 'id string, name string'
schema2 = 'id string, name string'

data1 = [('1', 'kad'), ('2', 'sid')]
data2 = [('3', 'rahul'), ('4', 'jas')]

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

In [0]:
df1.display()

In [0]:
df2.display()


### union

In [0]:
# Append df2's rows to df1's (columns are matched by position).
display(df1.union(df2))

#### unionByName

In [0]:
# Rebuild the two frames with different column orders — df1 is (name, id),
# df2 is (id, name) — for the unionByName demo.
schema1 = 'name string, id string'
schema2 = 'id string, name string'

data1 = [('kad', '1'), ('sid', '2')]
data2 = [('3', 'rahul'), ('4', 'jas')]

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

In [0]:
df1.display()

In [0]:
df2.display()

In [0]:
# Append df2's rows to df1's, matching columns by name rather than position.
display(df1.unionByName(df2))


### String Functions

#### InitCap()

In [0]:
from pyspark.sql.functions import *
# Reload the BigMart sales CSV with an inferred schema for the sections below.
df = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/workspace/default/spark_volume/BigMart Sales.csv')
)

In [0]:
# Show Item_Type next to its initcap / lower / upper variants in one result,
# instead of keeping two of the three calls as commented-out code.
df.select(
    'Item_Type',
    initcap('Item_Type').alias('initcap_item_type'),
    lower('Item_Type').alias('lower_item_type'),
    upper('Item_Type').alias('upper_item_type'),
).display()


### Date Functions


#### Current_Date

In [0]:
# Add the current date to every row as 'curr_date'.
df = df.withColumn('curr_date', current_date())
display(df)

#### Date_Add

In [0]:
# 'week_after' is curr_date shifted forward by 7 days.
seven_days_later = date_add('curr_date', 7)
df = df.withColumn('week_after', seven_days_later)
display(df)


#### Date_Sub

In [0]:
# Preview 'week_before' as curr_date shifted back by 7 days (df is not reassigned).
seven_days_earlier = date_sub('curr_date', 7)
df.withColumn('week_before', seven_days_earlier).display()

In [0]:
# Same shift expressed with date_add and a negative offset; this time the
# new column is kept on df.
minus_seven_days = date_add('curr_date', -7)
df = df.withColumn('week_before', minus_seven_days)
display(df)

### DateDiff

In [0]:
# Days between curr_date and week_after. datediff(end, start) returns
# end - start, so the later date goes first to get a positive 7; the
# arguments were previously reversed, which produced -7.
df = df.withColumn('datediff', datediff('week_after', 'curr_date'))
df.display()


### Date_Format()



In [0]:
# Overwrite 'week_before' with its dd-MM-yyyy formatted text.
formatted_week_before = date_format('week_before', 'dd-MM-yyyy')
df = df.withColumn('week_before', formatted_week_before)
display(df)

### Handling Nulls


### Dropping Nulls

In [0]:
# Drop only the rows in which every column is null.
df.dropna(how='all').display()

In [0]:
# Drop every row that has at least one null column.
df.dropna(how='any').display()

In [0]:
# Drop rows with a null Outlet_Size. The column name uses the exact casing
# of the column (it was written 'Outlet_size') so the lookup does not depend
# on case-insensitive name resolution.
df.dropna(subset=['Outlet_Size']).display()

#### Filling Nulls

In [0]:
# Replace nulls with the string 'NotAvailable' (no column subset given).
df.na.fill('NotAvailable').display()

In [0]:
# Replace nulls in Outlet_Size only. The fill value's spelling is corrected
# (it was 'Not Avaialable').
df.fillna('Not Available', subset=['Outlet_Size']).display()

### Split and Indexing

#### SPLIT


In [0]:
# Replace Outlet_Type with the array produced by splitting it on spaces.
outlet_type_parts = split('Outlet_Type', ' ')
df.withColumn('Outlet_Type', outlet_type_parts).display()


#### Indexing

In [0]:
# Replace Outlet_Type with the element at index 1 of its space-split array.
second_part = split('Outlet_Type', ' ')[1]
df.withColumn('Outlet_Type', second_part).display()


### Explode

In [0]:
# df_exp keeps Outlet_Type as an array of its space-separated parts; the
# explode and array_contains cells below build on it.
outlet_type_array = split('Outlet_Type', ' ')
df_exp = df.withColumn('Outlet_Type', outlet_type_array)
display(df_exp)

In [0]:
# Turn each element of the Outlet_Type array into its own row. The source
# column name uses the exact casing of the column (it was written
# 'Outlet_type').
df_exp.withColumn('Outlet_Type', explode('Outlet_Type')).display()


### Array_Contains

In [0]:
df_exp.display()

In [0]:
# Flag rows whose Outlet_Type array contains the element 'Type1'.
has_type1 = array_contains('Outlet_Type', 'Type1')
df_exp.withColumn('Type1_flag', has_type1).display()

### GROUP_BY

#### Scenario - 1

In [0]:
# Total Item_MRP per Item_Type.
by_item_type = df.groupBy('Item_Type')
by_item_type.agg(sum('Item_MRP')).display()

#### Scenario - 2

In [0]:
# Average Item_MRP per Item_Type.
by_item_type = df.groupBy('Item_Type')
by_item_type.agg(avg('Item_MRP')).display()


#### Scenario - 3

In [0]:
# Total Item_MRP per (Item_Type, Outlet_Size) combination.
by_type_and_size = df.groupBy('Item_Type', 'Outlet_Size')
by_type_and_size.agg(sum('Item_MRP')).display()

#### Scenario - 4

In [0]:
# Total and average Item_MRP per (Item_Type, Outlet_Size) combination.
by_type_and_size = df.groupBy('Item_Type', 'Outlet_Size')
by_type_and_size.agg(sum('Item_MRP'), avg('Item_MRP')).display()

### Collect_List

In [0]:
# Small (user, book) frame used by the collect_list demo below.
schema = 'user string, book string'
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]

df_book = spark.createDataFrame(data, schema)
display(df_book)

In [0]:
# Gather each user's books into a single list per user.
books_per_user = collect_list('book')
df_book.groupBy('user').agg(books_per_user).display()

### PIVOT

In [0]:
from pyspark.sql.functions import * 
# One row per Item_Type, one column per Outlet_Size value, each cell holding
# the average Item_MRP.
pivoted = df.groupBy('Item_Type').pivot('Outlet_Size')
pivoted.agg(avg('Item_MRP')).display()

### WHEN-OTHERWISE

#### Scenario - 1

In [0]:
from pyspark.sql.functions import *
# Reload the BigMart sales CSV with an inferred schema for the when/otherwise demos.
df = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/workspace/default/spark_volume/BigMart Sales.csv')
)

In [0]:
# veg_flag is 'Non-Veg' when Item_Type is 'Meat' and 'Veg' for everything else.
veg_flag = when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg')
df_wo = df.withColumn('veg_flag', veg_flag)

#### Scenario - 2

In [0]:
# Classify Veg items by price: Item_MRP below 100 is 'Veg_Inexpensive',
# 100 and above is 'Veg_Expensive'. The second branch uses >= so a Veg item
# priced at exactly 100 no longer falls through to 'Non-Veg'.
# Everything else (non-Veg rows, and Veg rows with a null Item_MRP) still
# lands in the otherwise() branch.
is_veg = col('veg_flag') == 'Veg'
df_wo.withColumn(
    'veg_exp_flag',
    when(is_veg & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when(is_veg & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non-Veg'),
).display()

### JOINS

In [0]:
# Employee and department frames for the join demos. They share dept_id;
# 'd06' exists only on the employee side and 'd04' only on the department side.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
schemaj2 = 'dept_id STRING, department STRING'

dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]

df1 = spark.createDataFrame(dataj1, schemaj1)
df2 = spark.createDataFrame(dataj2, schemaj2)


In [0]:
df1.display()

In [0]:
df2.display()

#### Inner Join

In [0]:
# Inner join of employees and departments on dept_id.
same_dept = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=same_dept, how='inner').display()

#### Left Join

In [0]:
# Left join of employees and departments on dept_id.
same_dept = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=same_dept, how='left').display()

#### Right Join

In [0]:
# Right join of employees and departments on dept_id.
same_dept = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=same_dept, how='right').display()

#### Anti Join

In [0]:
# Anti join of employees against departments on dept_id.
same_dept = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=same_dept, how='anti').display()

### Window Functions

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Reload the BigMart sales CSV with an inferred schema for the window-function demos.
df = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/workspace/default/spark_volume/BigMart Sales.csv')
)

In [0]:
# Number the rows in Item_Identifier order.
by_identifier = Window.orderBy('Item_Identifier')
df.withColumn('rowCol', row_number().over(by_identifier)).display()

#### Rank vs Dense Rank

In [0]:
# Add rank and dense_rank side by side, both over the same window ordered by
# Item_Identifier descending, and keep the new columns on df.
by_identifier_desc = Window.orderBy(col('Item_Identifier').desc())
df = (
    df
    .withColumn('rank', rank().over(by_identifier_desc))
    .withColumn('denserank', dense_rank().over(by_identifier_desc))
)
display(df)

### Cumulative Sum

In [0]:
# Sum of Item_MRP over a window ordered by Item_type, with no explicit frame.
by_item_type = Window.orderBy('Item_type')
df.withColumn('cumsum', sum('Item_MRP').over(by_item_type)).display()

In [0]:
# Same sum, with the frame spelled out as rows from the start of the window
# up to and including the current row.
running_rows = Window.orderBy('Item_type').rowsBetween(
    Window.unboundedPreceding,
    Window.currentRow,
)
df.withColumn('cumsum', sum('Item_MRP').over(running_rows)).display()

### User Defined Functions

In [0]:
def my_func(x):
    """Return the square of *x*, or None when *x* is None.

    The None guard matters when this runs as a Spark UDF: a null cell is
    passed in as None, and ``None * None`` would raise TypeError.
    """
    if x is None:
        return None
    return x * x

In [0]:
# Wrap my_func as a Spark UDF with an explicit double return type. Without
# a return type udf() defaults to string, so the squared value would come
# back as text.
# NOTE(review): assumes the input column is a double (Item_MRP under the
# inferred schema) — confirm before reusing on integer columns.
my_udf = udf(my_func, 'double')

In [0]:
# Apply the UDF to Item_MRP and show the result as 'mynewcol'.
mrp_via_udf = my_udf('Item_MRP')
df.withColumn('mynewcol', mrp_via_udf).display()

### Data Writing

#### csv

In [0]:
# Write df as CSV to the volume; no save mode is set.
df.write.csv("/Volumes/workspace/default/spark_volume/data")

#### append

In [0]:
# Write df as CSV using the 'append' save mode.
df.write.csv("/Volumes/workspace/default/spark_volume/data", mode='append')

#### overwrite

In [0]:
# Write df as CSV using the 'overwrite' save mode.
df.write.csv("/Volumes/workspace/default/spark_volume/data", mode='overwrite')

#### error

In [0]:
# Write df as CSV using the 'error' save mode.
df.write.csv("/Volumes/workspace/default/spark_volume/data", mode='error')

#### ignore

In [0]:
# Write df as CSV using the 'ignore' save mode.
df.write.csv("/Volumes/workspace/default/spark_volume/data", mode='ignore')

#### parquet

In [0]:
# Write df as Parquet into its own directory. The earlier cells write CSV
# files to .../data; appending Parquet files to that same path would leave a
# directory with two file formats mixed together.
(
    df.write
    .format('parquet')
    .mode('append')
    .save("/Volumes/workspace/default/spark_volume/data_parquet")
)

#### Table

In [0]:
# Save df as the Delta table 'my_table' using the 'overwrite' save mode.
df.write.saveAsTable('my_table', format='delta', mode='overwrite')

### SparkSQL

In [0]:
# Register df as the temp view 'my_view' for the SQL cells below.
# createOrReplaceTempView keeps the cell re-runnable; createTempView raises
# once a view with that name already exists.
df.createOrReplaceTempView('my_view')

In [0]:
%sql

SELECT * FROM my_view;

In [0]:
%sql
SELECT * FROM my_view WHERE Item_Fat_Content='Low Fat';

In [0]:
# Run the Low Fat query from Python; the result is an ordinary DataFrame.
low_fat_query = "select * from my_view where Item_Fat_Content='Low Fat'"
df_sql = spark.sql(low_fat_query)

In [0]:
df_sql.display()