### Data Reading JSON

In [0]:
# Load the drivers JSON file into a DataFrame.
# 'inferSchema' and 'header' are CSV reader options; the JSON source does not
# use them (it always infers the schema when none is supplied), so they are
# omitted here. multiLine=False means each line holds one JSON record.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/Volumes/pyspask-data-catalog/pyspark-schema/pyspark-stage/drivers.json')
)

In [0]:
# Render the DataFrame loaded from the JSON file in the notebook output.
df_json.display()

### Data Reading

In [0]:
# List the files available in the staging volume directory.
dbutils.fs.ls('/Volumes/pyspask-data-catalog/pyspark-schema/pyspark-stage/')

In [0]:
# Read the BigMart sales CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    '/Volumes/pyspask-data-catalog/pyspark-schema/pyspark-stage/BigMart Sales.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Print a plain-text preview of the DataFrame.
df.show()

In [0]:
# Render the CSV-backed DataFrame in the notebook output.
df.display()

### Schema Definition

In [0]:
# Print the column names and the types produced by schema inference.
df.printSchema()

### DDL SCHEMA

In [0]:
# DDL-style schema string for the BigMart sales CSV: one "name type" entry per
# column, comma-separated. Item_Weight is declared as string here.
my_ddl_schema = '\n' + ',\n'.join([
    'Item_Identifier string',
    'Item_Weight string',
    'Item_Fat_Content string',
    'Item_Visibility double',
    'Item_Type string',
    'Item_MRP double',
    'Outlet_Identifier string',
    'Outlet_Establishment_Year int',
    'Outlet_Size string',
    'Outlet_Location_Type string',
    'Outlet_Type string',
    'Item_Outlet_Sales double',
])

In [0]:
# Re-read the CSV, this time applying the explicit DDL schema instead of
# inferring column types; the first row is still treated as the header.
df = spark.read.csv(
    '/Volumes/pyspask-data-catalog/pyspark-schema/pyspark-stage/BigMart Sales.csv',
    schema=my_ddl_schema,
    header=True,
)

In [0]:
# Render the DataFrame that was read with the DDL schema.
df.display()

In [0]:
# Print the schema to confirm the DDL-declared column types were applied.
df.printSchema()

### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# StructType schema for the BigMart sales CSV: every column is declared as a
# nullable string.
my_strct_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'Item_Identifier',
        'Item_Weight',
        'Item_Fat_Content',
        'Item_Visibility',
        'Item_Type',
        'Item_MRP',
        'Outlet_Identifier',
        'Outlet_Establishment_Year',
        'Outlet_Size',
        'Outlet_Location_Type',
        'Outlet_Type',
        'Item_Outlet_Sales',
    )
])

In [0]:
# Re-read the CSV using the StructType schema; the first row is the header.
df = (
    spark.read.format('csv')
    .schema(my_strct_schema)
    .option('header', True)
    .load('/Volumes/pyspask-data-catalog/pyspark-schema/pyspark-stage/BigMart Sales.csv')
)

In [0]:
# Print the schema to confirm the StructType definition was applied.
df.printSchema()

In [0]:
# Reload the CSV with inferred column types; this replaces the DataFrame that
# was read with the explicit schema.
df = (
    spark.read.format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/pyspask-data-catalog/pyspark-schema/pyspark-stage/BigMart Sales.csv')
)

In [0]:
# Print the schema of the DataFrame reloaded with inferred types.
df.printSchema()

### SELECT

In [0]:
# Render the full DataFrame before selecting columns from it.
df.display()

In [0]:
# Project three columns by name and render the result.
df.select(
    'Item_Identifier',
    'Item_Weight',
    'Item_Fat_Content',
).display()

### ALIAS

In [0]:
# Select Item_Identifier and expose it under the name Item_ID.
df.select(df["Item_Identifier"].alias('Item_ID')).display()

### FILTER/WHERE

In [0]:
# Render the unfiltered DataFrame for comparison with the filtered results.
df.display()

#### Scenario - 1

In [0]:
# Keep only the rows whose fat content is exactly 'Regular'.
df.where(df['Item_Fat_Content'] == 'Regular').display()

#### Scenario - 2

In [0]:
# Keep soft drinks that weigh less than 10; both conditions must hold.
(
    df.where(col('Item_Type') == 'Soft Drinks')
    .where(col('Item_Weight') < 10)
    .display()
)

#### Scenario - 3

In [0]:
# Keep rows located in Tier 1 or Tier 2 whose outlet size is missing.
(
    df.where(col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
    .where(col("Outlet_Size").isNull())
    .display()
)

### withColumnRenamed

In [0]:
# Show the DataFrame with Item_Weight renamed to Item_Wt (df itself is unchanged).
df.withColumnRenamed(existing='Item_Weight', new='Item_Wt').display()

### withColumn

#### Scenario - 1

In [0]:
# Add a constant column 'flag' holding the literal 'new' and keep it in df.
df = df.withColumn(colName='flag', col=lit('new'))

In [0]:
# Render the DataFrame, which now includes the constant 'flag' column.
df.display()

In [0]:
# Show a derived column 'multiply' = Item_Weight * Item_MRP (df itself is unchanged).
df.withColumn('multiply', df["Item_Weight"] * df["Item_MRP"]).display()

#### Scenario - 2

In [0]:
# Abbreviate the fat-content labels: "Regular" -> "Reg" first, then
# "Low Fat" -> "Lf" on the result.
df.withColumn(
    'Item_Fat_Content',
    regexp_replace(
        regexp_replace(col("Item_Fat_Content"), "Regular", "Reg"),
        "Low Fat",
        "Lf",
    ),
).display()

### Type Casting

In [0]:
# Cast Item_Weight to a string column and keep the result in df.
# NOTE: from here on, comparisons and sorts on Item_Weight are string-based
# unless the column is cast back to a numeric type.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))

In [0]:
# Print the schema to confirm Item_Weight is now a string column.
df.printSchema()

### Sort/orderBy

#### Scenario - 1

In [0]:
# Sort heaviest-first. The Type Casting step stored Item_Weight as a string,
# and string ordering is lexicographic ('9.3' would rank above '19.2'), so the
# sort key is cast back to double to get a numeric ordering.
df.sort(col("Item_Weight").cast('double').desc()).display()

#### Scenario - 2

In [0]:
# Order the rows by Item_Visibility, smallest first.
df.orderBy("Item_Visibility", ascending=True).display()

#### Scenario - 3

In [0]:
# Sort by Item_Weight, then Item_Visibility, both descending.
# Item_Weight was stored as a string by the Type Casting step, so it is cast
# back to double here; otherwise the ordering would be lexicographic.
df.sort(
    col('Item_Weight').cast('double').desc(),
    col('Item_Visibility').desc(),
).display()

#### Scenario - 4

In [0]:
# Sort by Item_Weight descending, breaking ties by Item_Visibility ascending.
# Item_Weight was stored as a string by the Type Casting step, so it is cast
# back to double here; otherwise the ordering would be lexicographic.
df.sort(
    col('Item_Weight').cast('double').desc(),
    col('Item_Visibility').asc(),
).display()

### Limit

In [0]:
# Render at most 10 rows of the DataFrame.
df.limit(10).display()

### DROP

#### Scenario - 1

In [0]:
# Show the DataFrame without Item_Visibility, passing the column as a Column object.
df.drop(col("Item_Visibility")).display()

#### Scenario - 2

In [0]:
# Show the DataFrame without two columns, passing the column names as strings.
df.drop('Item_Visibility','Item_Type').display()

### DROP_DUPLICATES

#### Scenario - 1

In [0]:
# Remove rows that are duplicated across all columns and render the result.
df.dropDuplicates().display()

#### Scenario - 2

In [0]:
# Keep a single row per distinct Item_Type value.
df.dropDuplicates(['Item_Type']).display()

### DISTINCT

In [0]:
# Render only the distinct rows of the DataFrame.
df.distinct().display()

### UNION and UNION BY NAME

#### Preparing DataFrames

In [0]:
# Two small DataFrames with the same (id, name) column layout, used by the
# union examples.
schema1 = 'id STRING, name STRING'
data1 = [('1', 'kid'), ('2', 'sid')]
df1 = spark.createDataFrame(data1, schema1)

schema2 = 'id STRING, name STRING'
data2 = [('3', 'rahul'), ('4', 'jas')]
df2 = spark.createDataFrame(data2, schema2)

In [0]:
# Render the first sample DataFrame.
df1.display()

In [0]:
# Render the second sample DataFrame.
df2.display()

### Union

In [0]:
# Append df2's rows to df1's; both share the (id, name) column order here.
df1.union(df2).display()

In [0]:
# Rebuild df1 with its columns swapped to (name, id) so that its layout no
# longer matches df2.
schema1 = 'name STRING, id STRING'
data1 = [('kid', '1'), ('sid', '2')]

df1 = spark.createDataFrame(data1, schema1)

In [0]:
# Render df1 with its new (name, id) column order.
df1.display()

In [0]:
# df1 is now (name, id) while df2 is still (id, name). union() pairs columns
# by position, not by name, so df2's ids land under 'name' and its names under 'id'.
df1.union(df2).display()

### Union by Name

In [0]:
# unionByName() aligns columns by name, so the differing column order is handled.
df1.unionByName(df2).display()

### String Functions

#### Initcap()

In [0]:
# Show Item_Type with the first letter of each word capitalised.
df.select(initcap(col('Item_Type'))).display()

#### Lower

In [0]:
# Show Item_Type converted to lower case.
df.select(lower(col('Item_Type'))).display()

#### Upper

In [0]:
# Show Item_Type converted to upper case, under the name Upper_Item_Type.
upper_item_type = upper(col('Item_Type')).alias('Upper_Item_Type')
df.select(upper_item_type).display()

### DATE FUNCTIONS

#### Current_Date

In [0]:
# Add a 'curr_date' column holding the current date, keep it in df, and render.
df = df.withColumn(colName='curr_date', col=current_date())
df.display()

#### Date_Add()

In [0]:
# Add 'week_after' = curr_date plus 7 days, keep it in df, and render.
df = df.withColumn('week_after', date_add(col('curr_date'), 7))
df.display()

#### Date_Sub()

In [0]:
# Add 'week_before' = curr_date minus 7 days using date_sub, then render.
df = df.withColumn('week_before', date_sub(col('curr_date'), 7))
df.display()

In [0]:
# Same 'week_before' column, computed with date_add and a negative offset.
df = df.withColumn('week_before', date_add(col('curr_date'), -7))
df.display()

#### DATEDIFF

In [0]:
# Add 'datediff' = number of days from curr_date to week_after, then render.
df = df.withColumn('datediff', datediff(col('week_after'), col('curr_date')))
df.display()

#### Date_Format()

In [0]:
# Overwrite 'week_before' with its text form in the 'dd-MM-yyyy' pattern, then render.
df = df.withColumn('week_before', date_format(col('week_before'), 'dd-MM-yyyy'))
df.display()

### Handling nulls

#### Dropping nulls

In [0]:
# how='all': drop only the rows in which every column is null.
df.dropna(how='all').display()

In [0]:
# how='any': drop every row that has a null in at least one column.
df.dropna(how='any').display()

In [0]:
# Drop only the rows whose Outlet_Size is null.
df.na.drop(subset=['Outlet_Size']).display()

In [0]:
# Render df again; the dropna calls above did not reassign it.
df.display()

#### Filling Nulls

In [0]:
# Replace nulls with 'NotAvailable'; a string fill value applies to string columns.
df.na.fill('NotAvailable').display()

In [0]:
# Replace nulls with 'NotAvailable' in the Outlet_Size column only.
df.na.fill('NotAvailable', subset=['Outlet_Size']).display()

### SPLIT and Indexing

#### SPLIT

In [0]:
# Show Outlet_Type split on single spaces into an array of words.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')).display()

#### Indexing

In [0]:
# Show only the second word (array index 1) of the space-split Outlet_Type.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ').getItem(1)).display()

### EXPLODE

In [0]:
# Build df_exp, in which Outlet_Type is an array of its space-separated words,
# and render it.
df_exp = df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' '))
df_exp.display()

In [0]:
# Emit one output row per element of the Outlet_Type array.
# NOTE(review): the exploded values go into a new column named 'Outlet Type'
# (with a space), so the original array column is kept alongside it; confirm
# this is intended rather than overwriting 'Outlet_Type'.
df_exp.withColumn('Outlet Type',explode('Outlet_Type')).display()

### Array_Contains

In [0]:
# Render df_exp, whose Outlet_Type column holds arrays.
df_exp.display()

In [0]:
# Flag the rows whose Outlet_Type array contains the element 'Type1'.
df_exp.withColumn(
    'Type1_flag',
    array_contains(col('Outlet_Type'), 'Type1'),
).display()

### Group_By

#### Scenario - 1

In [0]:
# Render df before aggregating it.
df.display()

In [0]:
# Total Item_MRP per Item_Type. `sum` is the Spark aggregate brought in by the
# wildcard import from pyspark.sql.functions, not Python's built-in.
(
    df.groupBy('Item_Type')
    .agg(sum(col('Item_MRP')).alias('Sum_Item_MRP'))
    .display()
)

#### Scenario - 2

In [0]:
# Average Item_MRP per Item_Type.
(
    df.groupBy('Item_Type')
    .agg(avg(col('Item_MRP')).alias('Avg_Item_MRP'))
    .display()
)

#### Scenario - 3

In [0]:
# Total Item_MRP for each (Item_Type, Outlet_Size) combination.
(
    df.groupBy('Item_Type', 'Outlet_Size')
    .agg(sum(col('Item_MRP')).alias('Total_MRP'))
    .display()
)

#### Scenario - 4

In [0]:
# Total and average Item_MRP for each (Item_Type, Outlet_Size) combination.
(
    df.groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum(col('Item_MRP')).alias('Total_MRP'),
        avg(col('Item_MRP')).alias('Average_MRP'),
    )
    .display()
)

### Collect_List

In [0]:
# Sample (user, book) pairs for the collect_list example.
schema = 'user string, book string'
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]

df_book = spark.createDataFrame(data, schema)
df_book.display()

In [0]:
# Gather each user's books into a single list-valued column.
(
    df_book.groupBy('user')
    .agg(collect_list(col('book')))
    .display()
)

### Pivot

In [0]:
# One row per Item_Type and one column per Outlet_Size value; each cell holds
# the average Item_MRP for that pair.
(
    df.groupBy('Item_Type')
    .pivot('Outlet_Size')
    .agg(avg(col('Item_MRP')))
    .display()
)

### When-Otherwise

#### Scenario - 1

In [0]:
# Add 'veg_flag': 'Non-Veg' when Item_Type is exactly 'Meat', 'Veg' for every
# other row. The result is kept in df and rendered.
veg_flag_expr = when(df['Item_Type'] == 'Meat', 'Non-Veg').otherwise('Veg')
df = df.withColumn('veg_flag', veg_flag_expr)
df.display()

#### Scenario - 2

In [0]:
# Label each row by item type and price:
#   non-meat with Item_MRP < 100  -> 'Veg_Inexpensive'
#   non-meat with Item_MRP >= 100 -> 'Veg_Expensive'
#   everything else               -> 'Non_Veg'
# The second branch uses >= so a non-meat item priced at exactly 100 is not
# dropped into the 'Non_Veg' fallback.
df.withColumn(
    'veg_exp_flag',
    when((col('Item_Type') != 'Meat') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('Item_Type') != 'Meat') & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non_Veg'),
).display()

### Joins