### Data Reading





In [0]:
# Load the BigMart sales CSV, letting Spark infer column types and
# treating the first line as the header row.
df = (
    spark.read.format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/workspace/first_schema/big_data/BigMart Sales.csv')
)


In [0]:
# Render the loaded DataFrame.
df.display()

### Data Reading (JSON)

In [0]:
# Read the drivers JSON file. multiLine=False means Spark expects one JSON
# record per line (JSON Lines).
# The JSON source always infers its schema and has no header concept, so the
# CSV-only 'inferSchema' / 'header' options are not passed: they were silently
# ignored and only suggested a behavior that does not exist for JSON.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/Volumes/workspace/first_schema/big_data/drivers.json')
)


In [0]:
# Render the JSON-backed DataFrame.
df_json.display()

### Schema Definition


In [0]:
# Print the column names and types that were inferred from the CSV (as a tree).
df.printSchema()

### DDL Schema

In [0]:
# DDL-style schema string for the BigMart CSV. It is passed to .schema() when
# the file is re-read, so these types are used instead of inferred ones.
# NOTE(review): Item_Weight is declared STRING even though later cells compare
# it with a number (< 10), multiply it, and sort it; those cells rely on Spark's
# implicit casting (and a string sort is lexicographic). Consider DOUBLE if a
# numeric column is intended — confirm against the data.
my_ddl_schema = '''
                   Item_Identifier STRING,
                   Item_Weight STRING,
                   Item_Fat_Content STRING,
                   Item_Visibility DOUBLE,
                   Item_Type STRING,
                   Item_MRP DOUBLE,
                   Outlet_Identifier STRING,
                   Outlet_Establishment_Year INTEGER,
                   Outlet_Size STRING,
                   Outlet_Location_Type STRING,
                   Outlet_Type STRING,
                   Item_Outlet_Sales DOUBLE
                '''

In [0]:
# Re-read the CSV, this time applying the explicit DDL schema instead of
# inferring types.
df = (
    spark.read.format('csv')
    .option('header', True)
    .schema(my_ddl_schema)
    .load('/Volumes/workspace/first_schema/big_data/BigMart Sales.csv')
)

In [0]:
# Render the DataFrame read with the DDL schema.
display(df)

In [0]:
# Confirm the DDL schema was applied: print each column with its declared type.
df.printSchema()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Print the schema again; importing the types/functions modules does not change it.
df.printSchema()

### SELECT

In [0]:
# select() also accepts a single list of column names.
display(df.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']))

In [0]:
# Same projection, but built from Column objects via col().
display(df.select(*[col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')]))

### ALIAS

In [0]:
# Project Item_Identifier under the shorter output name Item_ID.
display(df.select(col('Item_Identifier').alias('Item_ID')))

### FILTER

### SCENARIO - Fetch records with regular fat content

In [0]:
# Keep only rows whose fat content is exactly 'Regular'.
display(df.filter(col('Item_Fat_Content') == 'Regular'))

### SCENARIO - Fetch records where item type is 'Soft Drinks' and item weight is less than 10

In [0]:
# Soft drinks weighing less than 10; two chained filters behave like a logical AND.
display(
    df.filter(col('Item_Type') == 'Soft Drinks')
      .filter(col('Item_Weight') < 10)
)

### Scenario - Fetch the data with tier in (Tier 1 or Tier 2) and outlet size is null

In [0]:
# Rows with no Outlet_Size located in Tier 1 or Tier 2 (chained filters = AND).
display(
    df.filter(col('Outlet_Size').isNull())
      .filter(col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
)

### withColumnRenamed

In [0]:
# Show the data with Item_Weight renamed to Item_WT (df itself is unchanged).
display(df.withColumnRenamed('Item_Weight', 'Item_WT'))

### withColumn


### Creating new column

In [0]:
# Add a constant column: every row gets the literal string 'new'.
df = df.withColumn('flag', lit('new'))

In [0]:
# Render df including the new 'flag' column.
display(df)

In [0]:
# Derived column: Item_Weight times Item_MRP (shown only, not saved to df).
display(df.withColumn('multiply', col('Item_Weight') * col('Item_MRP')))

### Modifying existing column values

In [0]:
# Abbreviate fat-content labels: 'Regular' -> 'Reg', then 'Low Fat' -> 'Lf'.
display(
    df.withColumn('Item_Fat_Content', regexp_replace('Item_Fat_Content', 'Regular', 'Reg'))
      .withColumn('Item_Fat_Content', regexp_replace('Item_Fat_Content', 'Low Fat', 'Lf'))
)

### Type Casting

In [0]:
# Type casting: Item_Weight is declared STRING in my_ddl_schema, so casting it
# to StringType would change nothing. Cast it to DoubleType so the column
# becomes numeric and the later sorts on Item_Weight order by value rather
# than lexicographically.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(DoubleType()))

In [0]:
# Check the column types after the cast above.
df.printSchema()

### Sort Data

### Scenario 1

In [0]:
# Heaviest items first.
display(df.sort(desc('Item_Weight')))

### Scenario 2

In [0]:
# Least visible items first.
display(df.sort(asc('Item_Visibility')))

### Scenario 3

In [0]:
# Sort by Item_Weight, then Item_Visibility, both descending.
display(df.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, False]))

### Scenario 4

In [0]:
# Item_Weight descending, ties broken by Item_Visibility ascending.
display(df.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, True]))

### Limit

In [0]:
# Show at most 10 rows.
display(df.limit(10))

### Dropping Columns

### Scenario 1

In [0]:
# View df without the Item_Visibility column.
display(df.drop('Item_Visibility'))

### Scenario 2

In [0]:
# drop() accepts several column names at once.
display(df.drop('Item_Visibility', 'Item_Type'))

### Drop_Duplicates

In [0]:
# Remove rows that are identical across every column.
display(df.dropDuplicates())

### Scenario 2 - Drop duplicates based on the Item_Type column

In [0]:
# Keep one row per Item_Type value (which row survives is not specified).
display(df.dropDuplicates(['Item_Type']))

In [0]:
# distinct() is equivalent to dropDuplicates() with no subset.
display(df.distinct())

### Union and Union by Name

### Preparing DataFrames

In [0]:
# Two small DataFrames with the same column order (id, name) for the union demos.
schema1 = 'id STRING,name STRING'
schema2 = 'id STRING, name STRING'

data1 = [('1', 'kad'), ('2', 'sid')]
data2 = [('3', 'rahul'), ('4', 'jas')]

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)


In [0]:
display(df1)

In [0]:
display(df2)

In [0]:
# union() stacks rows by column position; both frames are (id, name) here.
display(df1.union(df2))

In [0]:
# Same rows as the first df1, but with the columns declared in the opposite
# order (name, id) to show how union() and unionByName() differ.
schema1 = 'name STRING, id STRING'
data1 = [('kad', '1'), ('sid', '2')]
df1 = spark.createDataFrame(data1, schema1)
display(df1)

In [0]:
# union() matches by position, not name: df1 is (name, id) and df2 is (id, name),
# so names and ids land in each other's columns.
display(df1.union(df2))

In [0]:
# unionByName() aligns columns by name, so the rows line up correctly.
display(df1.unionByName(df2))

### String Function

### InitCap() - Like Excel's PROPER function: it makes the first letter of each word upper case.

In [0]:
# Capitalize the first letter of each word in Item_Type.
display(df.select(initcap('Item_Type')))

### Date functions

### Current_Date() - It returns today's date

In [0]:
# Stamp every row with today's date.
df = df.withColumn('curr_date', current_date())
display(df)


### Date_add()

In [0]:
# Date seven days after curr_date.
df = df.withColumn('week_after', date_add('curr_date', 7))
display(df)

### Date_Diff() - Number of days between two dates

In [0]:
# Number of days from curr_date to week_after.
df = df.withColumn('datediff', date_diff('week_after', 'curr_date'))
display(df)

### Date_Format()

In [0]:
# Replace week_after with its 'dd-MM-yyyy' string rendering (the column becomes a string).
df = df.withColumn('week_after', date_format('week_after', 'dd-MM-yyyy'))
display(df)

### Handling Nulls

### Dropping nulls

In [0]:
    df.dropna('all').display()

In [0]:
# how='any': a row is dropped as soon as at least one of its columns is null.
display(df.dropna(how='any'))

In [0]:
# Only rows with a null Outlet_Size are dropped; nulls elsewhere are kept.
display(df.dropna(subset=['Outlet_Size']))

### Fill NA Values


### Scenario 1 - Replace nulls in all columns that match the fill value's type (a string value fills string columns only)

In [0]:
# A string fill value is applied to string columns only; nulls in other column types stay null.
display(df.fillna('NotAvailable'))

### Scenario 2 - Replace NA values for particular columns

In [0]:
# Fill nulls in Outlet_Size only.
display(df.fillna('NotAvailable', subset=['Outlet_Size']))

### Split and Indexing

In [0]:
# Split Outlet_Type on single spaces into an array of words.
display(df.withColumn('Outlet_Type', split('Outlet_Type', ' ')))

### Indexing

In [0]:
# Keep only the second word (array index 1) of Outlet_Type.
display(df.withColumn('Outlet_Type', split('Outlet_Type', ' ').getItem(1)))

### Explode - Turns each element of an array (list) column into its own row

In [0]:
# Array version of Outlet_Type, reused by the explode / array_contains cells.
df_exp = df.withColumn('Outlet_Type', split('Outlet_Type', ' '))
display(df_exp)

In [0]:
# One output row per element of the Outlet_Type array.
display(df_exp.withColumn('Outlet_Type', explode('Outlet_Type')))

### Array_Contains

In [0]:
# Boolean flag: does the Outlet_Type array contain the word 'Type1'?
display(df_exp.withColumn('Type1_flag', array_contains('Outlet_Type', 'Type1')))

### Group By

In [0]:
# Total Item_MRP per Item_Type (sum here is pyspark.sql.functions.sum).
display(df.groupBy('Item_Type').agg(sum('Item_MRP')))

### Get Average MRP

In [0]:
# Average Item_MRP per Item_Type.
display(df.groupBy('Item_Type').agg(avg('Item_MRP')))

### Find total MRP of each item type by outlet size

In [0]:
# Total Item_MRP for each (Item_Type, Outlet_Size) pair.
display(df.groupBy('Item_Type', 'Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')))

In [0]:
# Several aggregates in one agg() call: total and average Item_MRP per group.
display(
    df.groupBy('Item_Type', 'Outlet_Size').agg(
        sum('Item_MRP').alias('Total_MRP'),
        avg('Item_MRP').alias('Avg_Price'),
    )
)

In [0]:
# Small frame with null ids, used to see how groupBy treats null keys.
Schema_Q = 'id INT, value INT'
data_Q = [(1, 100), (2, 200), (None, 300), (None, 400)]
df_Q = spark.createDataFrame(data_Q, Schema_Q)
display(df_Q)

In [0]:
# Sum of value per id.
display(df_Q.groupBy('id').agg(sum('value').alias('Sum')))

### Collect_List - It is similar to STRING_AGG in SQL Server, but it returns an array instead of a concatenated string

In [0]:
# (user, book) pairs for the collect_list demo.
schema = 'user STRING, book STRING'
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]
df_book = spark.createDataFrame(data, schema)
display(df_book)

In [0]:
# One row per user with an array of all their books.
display(df_book.groupBy('user').agg(collect_list('book')))

### Pivot

In [0]:
# One row per Item_Type, one column per Outlet_Size value, cells = average Item_MRP.
display(df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')))

### When Otherwise - It is similar to a CASE WHEN expression in SQL Server

In [0]:
# Label an item 'Non-Veg' when Item_Type is 'Meat', otherwise 'Veg'.
# NOTE(review): only 'Meat' counts as non-veg, and a null Item_Type also ends
# up 'Veg'; extend the condition if other item types should be non-veg.
df = df.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)

In [0]:
display(df)

In [0]:
# Classify items by veg flag and price.
# The second branch uses >= so Veg items priced exactly 100 are labelled
# 'Veg-expensive'. With a strict > they matched neither branch and fell
# through to 'Non-Veg'. Rows with a null Item_MRP still reach otherwise().
df.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg-inexpensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg-expensive')
    .otherwise('Non-Veg'),
).display()

### Joins

In [0]:
# Employees (df1) and departments (df2) for the join demos. d06 has no
# department row and d04 has no employee, so the join types differ visibly.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit',  'd02'),
    ('3', 'sam',  'd03'),
    ('4', 'tim',  'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad',  'd06'),
]

schemaj2 = 'dept_id STRING, department STRING'
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]

df1 = spark.createDataFrame(dataj1, schemaj1)
df2 = spark.createDataFrame(dataj2, schemaj2)

In [0]:
display(df1)

In [0]:
display(df2)

### Inner Join

In [0]:
# Matching rows only; keep all df1 columns plus df2's department name.
display(
    df1.join(df2, df1['dept_id'] == df2['dept_id'], 'inner')
       .select(df1['*'], df2['department'])
)

### Left Join

In [0]:
# Every df1 row; df2 columns are null where no dept_id matches.
display(df1.join(df2, df1['dept_id'] == df2['dept_id'], 'left'))

### Right Join

In [0]:
# Every df2 row; df1 columns are null where no employee matches.
display(df1.join(df2, df1['dept_id'] == df2['dept_id'], 'right'))

### Anti Join

In [0]:
# df1 rows that have no matching dept_id in df2 (only df1's columns are returned).
display(df1.join(df2, df1['dept_id'] == df2['dept_id'], 'anti'))

### Window function

### Row_Number

In [0]:
display(df.limit(100))

In [0]:
from pyspark.sql.window import Window

In [0]:
# Sequential number for each row, ordered by Item_Identifier across the whole DataFrame.
display(df.withColumn('rowCol', row_number().over(Window.orderBy('Item_Identifier'))))

### Rank

In [0]:
# rank() leaves gaps after ties, dense_rank() does not; both order by
# Item_Identifier descending over the whole DataFrame.
display(
    df.withColumn('rank', rank().over(Window.orderBy(desc('Item_Identifier'))))
      .withColumn('denseRank', dense_rank().over(Window.orderBy(desc('Item_Identifier'))))
)

### Cumulative Sum

In [0]:
# Running totals of Item_MRP.
# Ordering only by Item_Type (or, inside a partitionBy('Item_Type') window, by
# the partition key itself) makes every row in a group tie. The row order, and
# therefore each intermediate running total, is then arbitrary and can change
# between runs. Item_Identifier / Outlet_Identifier are added as tie-breakers.
# NOTE(review): assumes that pair identifies a row uniquely — confirm against the data.
df = (
    df.withColumn(
        'CumSum',
        sum('Item_MRP').over(
            Window.orderBy('Item_Type', 'Item_Identifier', 'Outlet_Identifier')
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
    .withColumn(
        'CumSumPartition',
        sum('Item_MRP').over(
            Window.partitionBy('Item_Type')
            .orderBy('Item_Identifier', 'Outlet_Identifier')
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
)

In [0]:
display(df)

### Cumulative Sum with Unbounded Following - It gives the total sum over all the rows.

In [0]:
# The frame spans the entire window, so every row gets the grand total of Item_MRP.
display(
    df.withColumn(
        'TotalSum',
        sum('Item_MRP').over(
            Window.orderBy('Item_Type')
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        ),
    )
)

### User Defined Function (UDF)

### Step 1 - Define a Python function

In [0]:
def my_func(x):
    """Return the square of ``x``, or ``None`` when ``x`` is ``None``.

    Spark passes SQL NULL values to Python UDFs as ``None``. Returning ``None``
    keeps the result NULL instead of raising ``TypeError`` (``None * None``)
    and failing the Spark task.
    """
    if x is None:
        return None
    return x * x

### Step 2 - Convert it into a PySpark UDF

In [0]:
# Wrap my_func as a Spark UDF. Without an explicit returnType, udf() defaults
# to StringType and the squared MRP would come back as text. Item_MRP is
# DOUBLE in my_ddl_schema, so my_func returns floats: declare DoubleType.
my_udf = udf(my_func, DoubleType())

In [0]:
# Apply the UDF row by row to Item_MRP.
display(df.withColumn('mynewcol', my_udf('Item_MRP')))

In [0]:
# The running-total columns are no longer needed.
df = df.drop('CumSum', 'CumSumPartition')
display(df)

### Data Writing

#### CSV