### Spark Initialization

In [None]:
from pyspark.sql import SparkSession
from IPython.display import display

# Initialize (or reuse) the SparkSession that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("Test PySpark Installation")
    .getOrCreate()
)

### Data Reading CSV

In [None]:
# Read the BigMart CSV: first row is the header, column types inferred from data.
csv_reader = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
)
df = csv_reader.load('BigMart_Sales.csv')

Viewing Data

In [None]:
# show() is an action: it triggers execution and prints the first 20 rows.
df.show(n=20)


### Data Reading Json

In [None]:
# JSON is self-describing, so Spark infers the schema on its own; 'header' and
# 'inferSchema' are CSV-reader options and have no effect here.
# The option name is 'multiLine' (the previous 'mulltiline' spelling was not a
# recognised option); False means one JSON record per line.
df_json=spark.read.format('json')\
                    .option('multiLine',False)\
                    .load('drivers.json')

In [None]:
# Preview the first 20 driver records.
df_json.show(n=20)

### SCHEMA - DDL and StructType()

The schema below is the one Spark produces when `inferSchema` is set to `True`.

In [None]:
# Print the column names and types Spark inferred for df (inferSchema=True above).
df.printSchema()

### Schema Definition

In [None]:
# DDL-string schema: comma-separated "<column> <TYPE>" pairs, presumably in the
# same order as the CSV's columns (TODO confirm against BigMart_Sales.csv).
# With a user-supplied schema (and enforceSchema left at its default) Spark
# names the columns from this string rather than from the file header, so the
# names must match what later cells select, e.g. 'Item_Fat_Content'.
my_ddl_schema='''
            Item_Identifier STRING,
            Item_Weight STRING,
            Item_Fat_Content STRING,
            Item_Visibility DOUBLE,
            Item_Type STRING,
            Item_MRP DOUBLE,
            Outlet_Identifier STRING,
            Outlet_Establishment_Year INT,
            Outlet_Size STRING,
            Outlet_Location_Type STRING,
            Outlet_Type STRING,
            Item_Outlet_Sales DOUBLE
            '''

In [None]:
# Re-read the CSV, enforcing the DDL schema instead of inferring one.
df = (
    spark.read.format('csv')
    .schema(my_ddl_schema)
    .option('header', True)
    .load('BigMart_Sales.csv')
)

In [None]:
# Inspect the rows and the schema that came from my_ddl_schema.
df.show(n=20)
df.printSchema()

### StructType() Schema

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Explicit StructType schema: one StructField(name, type, nullable) per column.
# Every column is read as a string here. Because these names replace the file
# header, they must match what later cells reference (e.g. 'Item_Fat_Content').
struct_schema=StructType([
    StructField('Item_Identifier',StringType(),True),
    StructField('Item_Weight',StringType(),True),
    StructField('Item_Fat_Content',StringType(),True),
    StructField('Item_Visibility',StringType(),True),
    StructField('Item_Type',StringType(),True),
    StructField('Item_MRP',StringType(),True),
    StructField('Outlet_Identifier',StringType(),True),
    StructField('Outlet_Establishment_Year',StringType(),True),
    StructField('Outlet_Size',StringType(),True),
    StructField('Outlet_Location_Type',StringType(),True),
    StructField('Outlet_Type',StringType(),True),
    StructField('Item_Outlet_Sales',StringType(),True)
])

In [None]:
# Re-read the CSV, this time applying the StructType schema.
df = (
    spark.read.format('csv')
    .schema(struct_schema)
    .option('Header', True)
    .load('BigMart_Sales.csv')
)

In [None]:
# Confirm the schema now comes from struct_schema (all StringType columns).
df.printSchema()

## SELECT - To select required columns

In [None]:
# Keep only the three columns of interest.
wanted_cols = ['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']
df_select = df.select(*wanted_cols)
df_select.show()

### COL

In [None]:
# Same projection, but passing Column objects built with col().
df.select(*[col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')]).show()

### Alias - alias() is a Column method, so the column must be a Column expression (e.g. col())

In [None]:
# alias() is a Column method, so wrap the name in col() before renaming.
item_id = col('Item_Identifier').alias('Item_ID')
df.select(item_id).show()

### FILTER/WHERE

1. Filter data with fat content=Regular
2. Slice the data with item type=Soft Drinks and weight<10
3. Fetch the rows where Outlet_Location_Type is Tier 1 or Tier 2 and Outlet_Size is null

In [None]:
# Look at the rows before filtering.
df.show(n=20)

In [None]:
# Scenario 1: keep rows whose Item_Fat_Content is exactly 'Regular'.
regular_only = col('Item_Fat_Content') == 'Regular'
df.filter(regular_only).show()

In [None]:
# Scenario 2: soft drinks lighter than 10. Comparisons are bound to names (or
# need parentheses) because & binds tighter than == and < in Python.
is_soft_drink = col('Item_Type') == 'Soft Drinks'
is_light = col('Item_Weight') < 10
df.filter(is_soft_drink & is_light).show()

In [None]:
# Scenario 3: Tier 1 or Tier 2 outlets whose Outlet_Size is missing.
# where() is an alias of filter().
size_missing = col('Outlet_Size').isNull()
in_tier_1_or_2 = col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
df.where(size_missing & in_tier_1_or_2).show()

### withColumnRenamed

In [None]:
# withColumnRenamed returns a new DataFrame; df itself keeps 'Item_Weight'.
renamed = df.withColumnRenamed('Item_Weight', 'Item_Wt')
renamed.show()

### withColumn

1. Add New Column
2. Modify Existing Column

In [None]:
# Scenario 1: add a constant column; lit() turns a Python value into a Column.
flag_col = lit("new")
df = df.withColumn('flag', flag_col)
df.show(n=20)

In [None]:
# Scenario 2: derive a new column from two existing ones. Column names use the
# schema's exact casing so this also works with spark.sql.caseSensitive=true.
df.withColumn('multiply',col('Item_Weight')*col('Item_MRP')).show()

Replace values in an existing column

In [None]:
# Modify an existing column in place: 'Regular' -> 'Reg', then 'Low Fat' -> 'LF'.
# Nesting the two regexp_replace calls is equivalent to chaining two withColumn
# calls on the same column.
fat_content = regexp_replace(col('Item_Fat_Content'), "Regular", "Reg")
fat_content = regexp_replace(fat_content, "Low Fat", "LF")
df.withColumn('Item_Fat_Content', fat_content).show()

### Type Casting

In [None]:
# Type casting: convert Item_Weight to a number. The StructType schema reads
# it as a string, so casting to StringType would be a no-op; as a double, the
# sorts below order weights numerically instead of lexically ('9.5' > '10').
df=df.withColumn('Item_Weight',col('Item_Weight').cast(DoubleType()))
df.printSchema()

### Sort/orderBy - Default is Asc

1. sort by Item Weight in desc
2. sort by Item_Visibility in Asc
3. sort by Item_weight and Item_Visibility in Desc

In [None]:
# 1. Heaviest items first (orderBy() is an alias of sort()).
df.orderBy(col('Item_Weight').desc()).show()

In [None]:
# 2. Least visible items first.
df.orderBy(col('Item_Visibility').asc()).show()

In [None]:
# 3. Both columns descending (same as ascending=[0, 0], where 0 means descending).
df.orderBy(col('Item_Weight').desc(), col('Item_Visibility').desc()).show()

In [None]:
# Mixed directions: Item_Weight descending, then Item_Visibility ascending.
df.orderBy(col('Item_Weight').desc(), col('Item_Visibility').asc()).show()

### Limit

In [None]:
# limit() is a transformation returning at most 5 rows; show() prints them.
first_five = df.limit(5)
first_five.show()


### DROP

In [None]:
# drop() returns a new DataFrame without the column; df itself is unchanged.
without_visibility = df.drop('Item_Visibility')
without_visibility.show()

In [None]:
# drop() accepts several column names at once.
columns_to_drop = ('Item_Visibility', 'Item_Type')
df.drop(*columns_to_drop).show()

### DROP_DUPLICATES

In [None]:
# Remove fully identical rows (drop_duplicates is the snake_case alias).
df.drop_duplicates().show()

In [None]:
# Keep a single row per distinct Item_Type.
df.dropDuplicates(['Item_Type']).show()

In [None]:
# distinct() returns the unique rows, like dropDuplicates() with no subset.
unique_rows = df.distinct()
unique_rows.show()

### UNION AND UNION BY NAME

In [None]:
# Two tiny DataFrames with identical (id, name) columns for the union demos.
schema1 = 'id STRING,name STRING'
data1 = [('1', 'kad'), ('2', 'sid')]
df1 = spark.createDataFrame(data1, schema1)

schema2 = 'id STRING,name STRING'
data2 = [('3', 'rahul'), ('4', 'jas')]
df2 = spark.createDataFrame(data2, schema2)




In [None]:
# First union input.
df1.show(n=20)

In [None]:
# Second union input.
df2.show(n=20)

UNION

In [None]:
# union() stacks rows by column POSITION, like SQL UNION ALL (no de-duplication).
stacked = df1.union(df2)
stacked.show()

In [None]:
# Rebuild df1 with its columns in the opposite order: (name, id).
schema1 = 'name STRING,id STRING'
data1 = [('kad', '1'), ('sid', '2')]
df1 = spark.createDataFrame(data1, schema1)
df1.show()

In [None]:
# Positional union now pairs df1.name with df2.id (and df1.id with df2.name),
# so names and ids end up mixed in the same column.
misaligned = df1.union(df2)
misaligned.show()

UNION BY NAME

In [None]:
# unionByName() matches columns by NAME, so the column order no longer matters.
by_name = df1.unionByName(df2)
by_name.show()

### STRING FUNCTIONS

INITCAP()

UPPER()

LOWER()

In [None]:
# Look at the Item_Type values before applying string functions.
df.show(n=20)

Initcap()

In [None]:
# initcap(): upper-case the first letter of each word.
df.select(initcap(col('Item_Type'))).show(truncate=False)

upper() and lower()

In [None]:
# upper() / lower() change the case of every character.
item_type = col('Item_Type')
df.select(upper(item_type).alias('Upper_Item_Type')).show(truncate=False)
df.select(lower(item_type)).show(truncate=False)

### DATE FUNCTIONS

CURRENT_DATE()
DATE_ADD()
DATE_SUB()

Current_Date

In [None]:
# Reload the raw CSV with inferred types so the date examples start from a clean df.
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('BigMart_Sales.csv')
)

In [None]:
# Add today's date as a column on every row.
df = df.withColumn('curr_date', current_date())
df.show(n=20)

Date_Add()

In [None]:
from pyspark.sql.functions import date_add
# Date seven days after curr_date.
df_add = df.withColumn('week_after', date_add(col('curr_date'), 7))
df_add.show(n=20)

In [None]:
# Date seven days before curr_date; date_add('curr_date', -7) gives the same value.
df = df_add.withColumn('week_before', date_sub(col('curr_date'), 7))
df.show(n=20)

### DATEDIFF

In [None]:
# Days between week_after and curr_date: datediff(end, start) = end - start,
# so this is -7. datediff is available in every Spark 3.x release, whereas the
# date_diff alias only exists from Spark 3.5.
df=df.withColumn('date_diff',datediff('curr_date','week_after'))
df.show()

### Date_Format

In [None]:

# Re-render week_before as a 'dd-MM-yyyy' string; the column type becomes STRING.
formatted = date_format(col('week_before'), 'dd-MM-yyyy')
df = df.withColumn('week_before', formatted)
df.show(n=20)

### Handling NULLS

1. Dropping Nulls
2. Filling Nulls

Dropping Nulls

In [None]:
# Drop a row only when ALL of its columns are null.
df.na.drop(how='all').show()

In [None]:
# Drop a row when ANY of its columns is null.
df.na.drop(how='any').show()

In [None]:
# Drop only the rows whose Outlet_Size is null.
df.dropna(subset=['Outlet_Size']).show()

Filling Nulls

In [None]:
# Replace nulls with '#NotAvailable'; a string fill value only touches string columns.
df.na.fill('#NotAvailable').show()

In [None]:
# Fill nulls in Outlet_Size only.
df.na.fill('NotAvailable', subset=['Outlet_Size']).show()

### SPLIT AND Indexing

In [None]:
# Look at Outlet_Type before splitting it.
df.show(n=20)

In [None]:
# split() on a space turns e.g. 'Supermarket Type1' into ['Supermarket', 'Type1'].
outlet_words = split(col('Outlet_Type'), ' ')
df.withColumn('Outlet_Type', outlet_words).show()

In [None]:
# Array indexing is zero-based: getItem(1) (same as [1]) keeps the second word.
second_word = split(col('Outlet_Type'), ' ').getItem(1)
df.withColumn('Outlet_Type', second_word).show()

### EXPLODE

In [None]:
# Keep the split array in its own DataFrame for the explode/array_contains demos.
outlet_words = split(col('Outlet_Type'), ' ')
df_exp = df.withColumn('Outlet_Type', outlet_words)
df_exp.show()

In [None]:
# explode() emits one output row per array element.
exploded = df_exp.withColumn('Outlet_Type', explode(col('Outlet_Type')))
exploded.show()

### ARRAY_CONTAINS

In [None]:
# Boolean flag: does the Outlet_Type array contain the element 'Type1'?
has_type1 = array_contains(col('Outlet_Type'), 'Type1')
df_exp.withColumn('Type1_flg', has_type1).show()

### GROUP_BY

In [None]:
# Total MRP per item type. 'sum' here is pyspark.sql.functions.sum: the
# wildcard import shadows Python's builtin sum.
total_mrp = sum('Item_MRP')
df.groupBy('Item_Type').agg(total_mrp).show()

In [None]:
# Average MRP per item type, labelled AVG.
avg_mrp = avg('Item_MRP').alias('AVG')
df.groupBy('Item_Type').agg(avg_mrp).show()

In [None]:
# Group on two keys and compute several aggregates in one pass.
mrp_aggs = [sum('Item_MRP').alias('SUM'), avg('Item_MRP').alias("AVG")]
df.groupBy('Item_Type', 'Outlet_Size').agg(*mrp_aggs).show()

### COLLECT_LIST

In [None]:
# Small (user, book) dataset for the collect_list demo.
schema = 'user string, book string'
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]
df_book = spark.createDataFrame(data, schema)
df_book.show()

In [None]:
# Gather every book for a user into a single array column.
books_per_user = collect_list(col('book'))
df_book.groupBy('user').agg(books_per_user).show()

### PIVOT

In [None]:
# Average MRP per item type, with one output column per distinct Outlet_Size.
# The column name uses the schema's exact casing so it also resolves when
# spark.sql.caseSensitive=true.
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).show()

### WHEN-OTHERWISE

Scenario 1 - if Item_Type is Meat, flag the item as Non-Veg; otherwise flag it as veg

In [None]:
# Scenario 1: 'Non-Veg' for Meat; every other row (including null types) is 'veg'.
is_meat = col('Item_Type') == 'Meat'
df = df.withColumn('veg_flag', when(is_meat, 'Non-Veg').otherwise('veg'))

Scenario 2 - if the item is veg and Item_MRP is below 100, it is Veg_Inexpensive; any other veg item is Veg_Expensive; non-veg items are Non_Veg

In [None]:
# Scenario 2: veg items under 100 MRP are Veg_Inexpensive, veg items at or
# above 100 are Veg_Expensive, everything else is Non_Veg. Using >= closes the
# gap where a veg item priced exactly 100 fell through to 'Non_Veg'.
is_veg = col('veg_flag')=='veg'
df.withColumn('veg_exp_flg',when(is_veg & (col('Item_MRP')<100),'Veg_Inexpensive') \
              .when(is_veg & (col('Item_MRP')>=100),'Veg_Expensive') \
              .otherwise('Non_Veg')).show()

### JOINS

- Inner Join
- Left Join
- Right Join
- Full Join
- Anti Join

In [None]:
# Employees (df1) and departments (df2) for the join demos. Department d06 has
# no row in df2 and d04 has no employees, so the join types give visibly
# different results.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
df1 = spark.createDataFrame(dataj1, schemaj1)

schemaj2 = 'dept_id STRING, department STRING'
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
df2 = spark.createDataFrame(dataj2, schemaj2)

In [None]:
# Inspect both join inputs.
for frame in (df1, df2):
    frame.show()

INNER JOIN

In [None]:
# Inner join: only employees whose dept_id has a matching department.
dept_match = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=dept_match, how='inner').show()

LEFT JOIN

In [None]:
# Left join: every employee; department columns are null where there is no match.
dept_match = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=dept_match, how='LEFT').show()

RIGHT JOIN

In [None]:
# Right join: every department; employee columns are null where there is no match.
dept_match = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=dept_match, how='right').show()

ANTI JOIN

In [None]:
# Anti join: employees with no matching department; only df1's columns are returned.
dept_match = df1['dept_id'] == df2['dept_id']
df1.join(df2, on=dept_match, how='anti').show()

### WINDOW FUNCTIONS

- ROW_NUMBER()
- RANK()
- DENSE_RANK()

ROW_NUMBER()

In [None]:
from pyspark.sql.window import Window

In [None]:
# Sequential number 1..N over all rows ordered by Item_Identifier. A window
# without partitionBy moves every row into a single partition.
by_item_id = Window.orderBy('Item_Identifier')
df.withColumn('rowCol', row_number().over(by_item_id)).show()

RANK()

In [None]:
# rank(): tied rows share a rank and the following rank is skipped (1, 1, 3, ...).
desc_by_item_id = Window.orderBy(col('Item_Identifier').desc())
df.withColumn('rank', rank().over(desc_by_item_id)).show()

Dense_Rank()

In [None]:
# rank() next to dense_rank(): dense_rank leaves no gaps after ties (1, 1, 2, ...).
desc_by_item_id = Window.orderBy(col('Item_Identifier').desc())
ranked = df.withColumn('rank', rank().over(desc_by_item_id))
ranked.withColumn('denserank', dense_rank().over(desc_by_item_id)).show()

### WINDOW FUNCTIONS

- Cumulative Sum

In [None]:
# Running total with the default frame. With orderBy and no explicit frame,
# Spark uses RANGE from the first row to the current row, so all rows sharing
# an Item_Type get the same cumulative value.
by_type = Window.orderBy('Item_Type')
df.withColumn('cum_sum', sum('Item_MRP').over(by_type)).show()

In [None]:
# Explicit ROWS frame: a row-by-row running total, even across tied Item_Type values.
running = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('cum_sum', sum('Item_MRP').over(running)).show()

In [None]:
# A frame spanning the whole window: every row gets the grand total of Item_MRP.
whole_frame = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('total_sum', sum('Item_MRP').over(whole_frame)).show()

### USER DEFINED FUNCTIONS

- Write a plain Python function
- Wrap it as a Spark UDF using udf()

In [None]:
def my_func(x):
    """Return the square of *x*, or None when *x* is None.

    Spark passes SQL NULLs into Python UDFs as None. Squaring None would raise
    TypeError inside the worker and fail the job, so NULL is propagated instead.
    """
    if x is None:
        return None
    return x*x

In [None]:
# Wrap my_func as a Spark UDF. Without a returnType, udf() defaults to
# StringType and the squares come back as strings; DoubleType keeps them
# numeric. NOTE(review): a DoubleType UDF may yield NULL if the function
# returns a Python int, so this assumes it is applied to double columns such
# as the inferred Item_MRP (TODO confirm the column's type).
my_udf=udf(my_func, DoubleType())

In [None]:
# Apply the Python UDF to every row's Item_MRP.
squared_mrp = my_udf(col('Item_MRP'))
df.withColumn('New_Col', squared_mrp).show()

### DATA WRITING

- CSV

In [None]:
# Write df as CSV, replacing whatever already exists at the target directory.
df.write.save('D:/Data Engineering/Pyspark/Ansh L/write/', format='csv', mode='overwrite')

### DATA WRITING MODES

- APPEND - TO APPEND DATA
- OVERWRITE - TO OVERWRITE DATA
- ERROR (the default, also called ERRORIFEXISTS) - throw an error if the output path already exists
- IGNORE - silently skip the write if the output path already exists

In [None]:
# append: add new CSV part files next to the existing ones.
df.write.save('D:/Data Engineering/Pyspark/Ansh L/write/', format='csv', mode='append')

In [None]:
# overwrite: delete the existing output, then write df.
df.write.save('D:/Data Engineering/Pyspark/Ansh L/write/', format='csv', mode='overwrite')

In [None]:
# error: raises because the earlier cells already wrote to this directory.
df.write.save('D:/Data Engineering/Pyspark/Ansh L/write/', format='csv', mode='error')

In [None]:
# ignore: because the target already exists, the write is silently skipped.
df.write.save('D:/Data Engineering/Pyspark/Ansh L/write/', format='csv', mode='Ignore')

### PARQUET FILE FORMAT

- COLUMNAR FILE FORMAT

In [None]:
# Write the same data as Parquet. overwrite replaces the CSV output previously
# written to this directory.
df.write.save('D:/Data Engineering/Pyspark/Ansh L/write/', format='parquet', mode='overwrite')

### Table

In [None]:
# Persist df as the table my_table, stored as Parquet, replacing any previous version.
df.write.saveAsTable('my_table', format='parquet', mode='overwrite')

### SPARK SQL

In [None]:
# Register df as a session-scoped SQL view. createOrReplaceTempView, unlike
# createTempView, does not fail when this cell is re-run and the view already exists.
df.createOrReplaceTempView('my_view')

In [None]:
# Query the temp view with SQL; the result is an ordinary (lazy) DataFrame.
view_query = """
select * from my_view"""
data2 = spark.sql(view_query)

In [None]:
# Print the first 20 rows returned by the SQL query.
data2.show(n=20)

In [None]:
# Filter inside SQL: only the 'Low Fat' items from the view.
low_fat_query = """
select * from my_view where Item_fat_content='Low Fat'
                """
spark.sql(low_fat_query).show()