#Read data

In [0]:
# List the files uploaded to DBFS so the input paths used below can be checked.
tables_dir = '/FileStore/tables'
dbutils.fs.ls(tables_dir)

In [0]:
# Load the BigMart sales CSV: first row is the header, column types are inferred.
df = (
    spark.read
    .format('csv')
    .option("inferschema", True)
    .option("header", True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Plain-text preview: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

In [0]:
# Databricks rich (interactive table) output instead of df.show()'s plain text.
display(df)

## Read JSON file 

In [0]:
# Read line-delimited JSON (one object per line; multiline=False).
# Without an explicit schema the JSON reader always infers types, and 'header'
# is a CSV-only option, so neither inferschema nor header is set here.
df_json = (
    spark.read
    .format('json')
    .option("multiline", False)
    .load('/FileStore/tables/drivers.json')
)

In [0]:
# Show the parsed driver records.
display(df_json)

#Schema definition

In [0]:
# Print df's column names, data types and nullability as a tree.
df.printSchema()

##DDL schema

In [0]:
# Explicit schema as a DDL string: "<column> <TYPE>, ...". With an explicit schema the
# CSV columns are taken in file order, so this list must follow the file's column order.
# Item_Weight is DOUBLE (not STRING) because the notebook uses it numerically
# (multiplied by Item_MRP and sorted on).
my_ddl_schema='''
        Item_Identifier STRING,
        Item_Weight DOUBLE,
        Item_Fat_Content STRING,
        Item_Visibility DOUBLE,
        Item_Type STRING,
        Item_MRP DOUBLE,
        Outlet_Identifier STRING,
        Outlet_Establishment_Year INT,
        Outlet_Size STRING,
        Outlet_Location_Type STRING,
        Outlet_Type STRING,
        Item_Outlet_Sales DOUBLE

'''

In [0]:
# Re-read the CSV, applying my_ddl_schema instead of inferring types.
df = (
    spark.read
    .format('csv')
    .schema(my_ddl_schema)
    .option("header", True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Show the DataFrame loaded with the DDL schema.
display(df)

In [0]:
# Confirm the column types now come from my_ddl_schema rather than inference.
df.printSchema()

## StructType() Schema

In [0]:
#import libraries

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Same schema built from StructField(name, dataType, nullable) objects.
# Item_Identifier uses the same capitalisation as the other Item_* columns, and
# Item_Visibility is DoubleType so that sorting/comparing on it is numeric rather
# than text-based.
my_struct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', DoubleType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', DoubleType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', IntegerType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', DoubleType(), True),
])

In [0]:
# Final load used by the rest of the notebook: the CSV with the StructType schema.
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .schema(my_struct_schema)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Show the DataFrame loaded with the StructType schema.
display(df)

In [0]:
# Confirm the column types now come from my_struct_schema.
df.printSchema()

#Transformations

##SELECT

In [0]:
# Starting point for the transformation examples below.
display(df)

In [0]:
# select() accepts plain column-name strings (here passed as one list); name
# matching is case-insensitive under Spark's default settings.
df_sel = df.select(['item_identifier', 'item_weight', 'Item_Fat_Content'])
display(df_sel)

In [0]:
# col() (pyspark.sql.functions, star-imported above) turns a name into a Column object.
df.select(*[col(name) for name in ('item_identifier', 'item_type', 'item_mrp')]).display()

##ALIAS

In [0]:
# alias() renames a column in this result only; df itself keeps its names.
df.select(
    col('item_identifier').alias('item_id'),
    col('item_type'),
    col('item_mrp'),
).display()

## FILTER

#### filter based on item_fat_content=Regular

In [0]:
# Keep only rows whose fat content is exactly 'Regular' (where() is an alias of filter()).
df.where(col('item_fat_content') == 'Regular').display()

#### Item_Type is Soft Drinks and Item_Weight is less than 10

In [0]:
# Combine conditions with & (and). Each comparison needs its own parentheses in a
# single expression because Python's & binds tighter than == and <.
is_soft_drink = col('item_type') == 'Soft Drinks'
is_light = col('item_weight') < 10
df.filter(is_soft_drink & is_light).display()

#### Outlet_Location_Type is Tier 1 or Tier 2 AND Outlet_Size is null

In [0]:
# (Tier 1 OR Tier 2) AND no recorded outlet size; | is "or", & is "and".
in_tier_1_or_2 = (col('Outlet_Location_Type') == 'Tier 1') | (col('Outlet_Location_Type') == 'Tier 2')
df.filter(in_tier_1_or_2 & col('Outlet_Size').isNull()).display()

#### isin() and isNull()

In [0]:
# isin() replaces the chain of == / | comparisons; isNull() matches missing values.
df.where(
    col('Outlet_Location_Type').isin(['Tier 1', 'Tier 2'])
    & col('Outlet_Size').isNull()
).display()

## withColumnRenamed

In [0]:
# Rename item_weight -> item_wt; every later cell must use the new name.
df = df.withColumnRenamed(existing='item_weight', new='item_wt')


In [0]:
# item_weight now appears as item_wt.
display(df)

## New column/modify column (withColumn)


###new column

In [0]:
# lit() wraps a Python constant so the same value is placed on every row.
constant_flag = lit('new')
df = df.withColumn('flag', constant_flag)

In [0]:
# The new constant 'flag' column is last.
display(df)

In [0]:
# Derived numeric column: item weight times MRP.
weight_x_mrp = col('item_wt') * col('item_mrp')
df = df.withColumn('multiply', weight_x_mrp)
display(df)

###Replace column values

In [0]:
# Shorten the fat-content labels, in order: 'Regular' -> 'reg', then 'Low Fat' -> 'lf'.
# NOTE(review): any other spellings in the raw data are left untouched — confirm
# against the data if a full normalisation is intended.
for pattern, replacement in (('Regular', 'reg'), ('Low Fat', 'lf')):
    df = df.withColumn(
        'Item_Fat_Content',
        regexp_replace(col('Item_Fat_Content'), pattern, replacement),
    )

display(df)

##Typecasting

In [0]:
# Cast item_wt to string to show cast() changing a column's type. The result is
# written back under 'Item_wt', which matches item_wt case-insensitively.
wt_as_string = col('item_wt').cast(StringType())
df = df.withColumn('Item_wt', wt_as_string)

In [0]:
# Item_wt should now be listed as string.
df.printSchema()

In [0]:
# Same values, now stored as text.
display(df)

In [0]:
# Cast back to double so the weight is numeric again for the sorts below.
wt_as_double = col('item_wt').cast(DoubleType())
df = df.withColumn('Item_wt', wt_as_double)
display(df)

## Sort/ orderBy()

### sorting by item weight descending

In [0]:
# Heaviest items first. item_weight was renamed to item_wt earlier in the notebook,
# so the old name no longer resolves and would raise an AnalysisException.
df.sort(col('item_wt').desc())\
    .display()

###Sorting by item_visibility ascending

In [0]:
# Lowest visibility first (orderBy() is an alias of sort(); asc() is the function form).
df.orderBy(asc('Item_visibility')).display()

### sorting by multiple columns

#### sorting by item_wt descending and item_visibility ascending

In [0]:
# Heaviest first; ties broken by lowest visibility.
df.sort(col('item_wt').desc(), col('item_visibility').asc()).display()

##LIMIT

In [0]:
# Only the first 10 rows.
display(df.limit(10))

In [0]:
# limit() returns a new DataFrame that can be kept and reused.
df_x = df.limit(20)
df_x.display()

##Drop

In [0]:
# drop() returns a new DataFrame; df itself still has Item_Visibility.
df_drop = df.drop('Item_Visibility')
display(df_drop)

###drop multiple columns

In [0]:
# Several column names can be dropped in one call.
columns_to_drop = ['Item_visibility', 'Item_wt']
df_drop2 = df.drop(*columns_to_drop)
df_drop2.display()

##drop_duplicates

### Drop rows that are duplicated across all columns

In [0]:
# Before / after removing rows duplicated across every column, then df again to show
# that dropDuplicates() returns a new DataFrame and leaves df unchanged.
display(df)
display(df.dropDuplicates())
display(df)

In [0]:
# Keep one row per item_type; which row of each group survives is not specified.
df_ddc = df.drop_duplicates(['item_type'])
display(df_ddc)

##UNION and UNION BY NAME

####preparing dataframes

In [0]:
data1=[(1,'Raj'),
       (2,'Ankit')]

schema1='id Integer, name String'

df1=spark.createDataFrame(data1,schema1)


In [0]:
data2=[(3,'modiji'),
       (4,'rahulji')]

schema2='id Integer, name String'

df2=spark.createDataFrame(data2,schema2)

In [0]:
# Inputs to the union.
display(df1)
display(df2)

#### Apply union

In [0]:
# Append df2's rows to df1 (columns matched by position). df1 is reassigned, so
# re-running this cell appends df2 a second time.
df1 = df1.union(other=df2)

In [0]:
# Rows from both inputs.
display(df1)

In [0]:
# Same columns as df1 but declared in the opposite order (name, id), to show that
# union() pairs columns by position rather than by name.
data3 = [('Ankit', 5), ('Raj', 6)]
schema = 'name string, id int'
df3 = spark.createDataFrame(data3, schema)
display(df3)

In [0]:
# Positional union: df3's name values land in df1's id column and vice versa.
display(df1.union(df3))


#### unionByName

In [0]:
# unionByName() matches columns by name, so df3's swapped column order is handled.
display(df1.unionByName(df3))

In [0]:
# A frame whose columns only partly overlap df1's: (address, name) vs (id, name).
data4 = [
    ('xyz', 'abc'),
    ('xyz1', 'abc1'),
]
schema4 = 'address string, name string'
df4 = spark.createDataFrame(data4, schema4)
display(df4)


In [0]:
# union() only checks column count and position, ignoring the names.
display(df1.union(df4))

In [0]:
# unionByName() raises when the column sets differ (df1 has no 'address', df4 has no
# 'id'). allowMissingColumns=True (Spark 3.1+) adds the columns missing on each side
# as nulls instead.
df1.unionByName(df4, allowMissingColumns=True).display()

#String functions

##initcap()

In [0]:
# Capitalise the first letter of each word.
display(df.select(initcap(col('Item_Type')).alias('capitalize')))

##lower()

In [0]:
# Lower-case version of Item_Type as a single-column result.
display(df.select(lower(col('Item_Type')).alias('lowercase')))

In [0]:
# Replace Item_Type with its lower-cased value in a new DataFrame (df is untouched).
dfxy = df.withColumn('Item_Type', lower(col('Item_Type')))
display(dfxy)

##upper()

In [0]:
# Replace Item_Type with its upper-cased value in a new DataFrame (df is untouched).
df_up = df.withColumn('Item_Type', upper(col('Item_Type')))
display(df_up)

#Date functions

## 1. current date

In [0]:
# Stamp every row with the current date as Curr_date.
today = current_date()
df_date = df.withColumn('Curr_date', today)
display(df_date)

##date_add()

In [0]:
# Curr_date + 7 days.
df_dateAdd = df_date.withColumn('week_after', date_add(col('curr_date'), 7))
display(df_dateAdd)

##date_sub()

In [0]:
# Curr_date - 7 days.
df_dateSub = df_date.withColumn('week_before', date_sub(col('curr_date'), 7))
display(df_dateSub)

In [0]:
# Same result as date_sub(..., 7): date_add with a negative day count.
df_dateSub1 = df_date.withColumn('week_before', date_add(col('curr_date'), -7))
display(df_dateSub1)

## datediff()

In [0]:
# datediff(end, start) = end - start in days, so curr_date - week_before = 7.
days_between = datediff(col('curr_date'), col('week_before'))
df_dd = df_dateSub.withColumn('date_difference', days_between)
display(df_dd)

In [0]:
# Arguments swapped: week_before - curr_date, i.e. a negative day count (-7).
days_between = datediff(col('week_before'), col('curr_date'))
df_dd = df_dateSub.withColumn('date_difference', days_between)
display(df_dd)

##Date Format

In [0]:
# Render week_before as day/month/two-digit-year text. Pattern letters are
# case-sensitive: 'MM' is month-of-year, while 'mm' is minute-of-hour (always 00
# for a date value).
df_daF=df_dateSub.withColumn('week_before',date_format('week_before','dd/MM/yy'))
df_daF.display()

#Handling Nulls

###Dropping Nulls

#### drop row where all columns are null

In [0]:
# Remove a row only when every one of its columns is null.
df.na.drop(how='all').display()

#### Drop rows where any column is null


In [0]:
# Remove a row as soon as any of its columns is null.
df.na.drop(how='any').display()

#### Drop rows where a specific column (Outlet_Size) is null

In [0]:
# Only Outlet_Size is checked for nulls.
df.na.drop(subset=['Outlet_Size']).display()

### Replacing Null Values (filling nulls)

#### Replace null values in all columns (a string fill value only applies to string columns)

In [0]:
# A string fill value is applied to string columns only; numeric nulls stay null.
df.na.fill('NotAvailable').display()

#### replace subset col

In [0]:
# Fill only outlet_size's nulls.
df.na.fill('NA', subset=['outlet_size']).display()

In [0]:
# Numeric fill for the derived multiply column (null when item_wt was null).
df.na.fill(0, subset=['multiply']).display()

##Split And Indexing

####split

In [0]:
# Turn outlet_type into an array of its space-separated words (pattern is a regex).
words = split(col('outlet_type'), ' ')
df_sp = df.withColumn('outlet_type', words)
display(df_sp)

####indexing

In [0]:
# Keep only the second word (0-based index 1) of the split outlet_type.
second_word = split(col('outlet_type'), ' ').getItem(1)
df_sp1 = df.withColumn('outlet_type', second_word)
display(df_sp1)

##Explode

In [0]:
# explode() emits one row per array element, so each row of df_sp (the split
# outlet_type from the split cell) becomes one row per word.
df_exp = df_sp.withColumn('outlet_type', explode(col('outlet_type')))
display(df_exp)

## array_contains()

In [0]:
# True when the split outlet_type array contains the word 'Type1'.
has_type1 = array_contains(col('outlet_type'), 'Type1')
df_sp.withColumn('Type1_flag', has_type1).display()

##Group By

####sum of MRP for each item type

In [0]:
# Total MRP per item type. `sum` here is pyspark.sql.functions.sum (the star import
# shadows Python's built-in sum).
total_mrp = sum(col('item_mrp')).alias('sum of mrps')
df_sum = df.groupBy('item_type').agg(total_mrp)
display(df_sum)

#### Average MRP

In [0]:
# Mean MRP per item type.
display(df.groupBy('item_type').agg(avg(col('item_mrp')).alias('average MRP')))

###Group By on multiple columns

In [0]:
# One output row per (item_type, outlet_size) combination.
group_keys = ['item_type', 'outlet_size']
df.groupBy(*group_keys).agg(sum(col('item_mrp')).alias('total_mrp')).display()

### Multiple Aggregations in one group By

In [0]:
# Several aggregates computed in one pass over each group.
(
    df.groupBy('item_type', 'outlet_size')
    .agg(
        sum('item_mrp').alias('total_mrp'),
        avg('item_mrp').alias('average'),
    )
    .display()
)

##Collect_list

In [0]:
# (user, book) reading log for the collect_list example.
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book3'),
    ('user3', 'book1'),
]
schema = 'user string, book string'
df_book = spark.createDataFrame(data, schema)
display(df_book)

In [0]:
# Gather each user's books into an array (duplicates kept; order not guaranteed).
books_per_user = collect_list(col('book')).alias('book read')
display(df_book.groupBy('user').agg(books_per_user))

##PIVOT

In [0]:
# Rows: Item_Type; columns: each distinct Outlet_size value; cells: average MRP.
avg_mrp_by_size = df.groupBy('Item_Type').pivot('Outlet_size').agg(avg('Item_mrp'))
display(avg_mrp_by_size)

## When Otherwise

####scenario 1: Veg or non-veg?

In [0]:
# "Non-veg" for Meat, "veg" for every other item type (including null).
veg_flag = when(col('Item_Type') == "Meat", "Non-veg").otherwise('veg')
df.withColumn('veg_flag', veg_flag).display()

#### Meat -> Non-veg; otherwise MRP > 100 -> Veg-expensive, else veg-inexpensive

In [0]:
# CASE-style chain: conditions are tested in order and the first match wins.
# Meat -> Non-veg; otherwise MRP > 100 -> Veg-expensive; everything else -> veg-inexpensive.
label = (
    when(col('Item_type') == 'Meat', "Non-veg")
    .when(col('item_mrp') > 100, "Veg-expensive")
    .otherwise('veg-inexpensive')
)
df.withColumn('label', label).display()

###---------------------------------end----------------------------------------

#Joins

## INNER JOIN

In [0]:
# Employee and department tables for the join examples. Note the deliberate edge
# cases: a duplicated employee row ('5', 'Bam'), an employee in d06 (no such
# department), and department d04 with no employees.
schema_d1 = 'emp_id string, emp_name string, dept_id string'
data1 = [
    ('1', 'Gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'Sam', 'd03'),
    ('4', 'Tam', 'd03'),
    ('5', 'Bam', 'd05'),
    ('5', 'Bam', 'd05'),
    ('6', 'Ank', 'd06'),
]

schema_d2 = 'dept_id string, department string'
data2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounting'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]

df_emp = spark.createDataFrame(data1, schema_d1)
df_dept = spark.createDataFrame(data2, schema_d2)

display(df_emp)
display(df_dept)

In [0]:
# Inner join (the default): only employees whose dept_id exists in df_dept.
# Both dept_id columns are kept in the output.
same_dept = df_emp.dept_id == df_dept.dept_id
df_emp.join(df_dept, on=same_dept).display()

##Left Join

In [0]:
# Every employee; department columns are null where there is no match (d06).
same_dept = df_emp.dept_id == df_dept.dept_id
df_emp.join(df_dept, on=same_dept, how='left').display()

##Right Join

In [0]:
# Every department; employee columns are null where there is no match (d04).
same_dept = df_emp.dept_id == df_dept.dept_id
df_emp.join(df_dept, on=same_dept, how='right').display()

##Anti join

In [0]:
# Left anti join: employees with no matching department; only df_emp's columns.
same_dept = df_emp.dept_id == df_dept.dept_id
df_emp.join(df_dept, on=same_dept, how='anti').display()

##--------------------END------------------------------------

#Window functions

In [0]:
from pyspark.sql.window import Window

In [0]:
# Number the rows 1..N in Item_identifier order over the whole DataFrame
# (no partitionBy, so the window spans all rows).
by_item_id = Window.orderBy('Item_identifier')
df.withColumn('row_num', row_number().over(by_item_id)).display()

##Rank 

In [0]:
# rank(): ties share a rank and the following rank is skipped (1, 1, 3, ...).
by_item_id = Window.orderBy('Item_identifier')
df.withColumn('rank', rank().over(by_item_id)).display()

In [0]:
# Same ranking, but over Item_identifier in descending order.
by_item_id_desc = Window.orderBy(desc('Item_identifier'))
df.withColumn('rank', rank().over(by_item_id_desc)).display()

##Dense rank

In [0]:
# dense_rank(): ties share a rank and no ranks are skipped (1, 1, 2, ...).
by_item_id = Window.orderBy('Item_identifier')
df.withColumn('dense_rank', dense_rank().over(by_item_id)).display()

## Cumulative sum

In [0]:
# Running total of item_mrp in item_type order: each frame spans from the first row
# up to the current row. Rows that share an item_type have no defined order among
# themselves, so their individual running totals may differ between runs.
running_frame = (
    Window.orderBy('item_type')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn('cumsum', sum("item_mrp").over(running_frame)).display()

#User defined functions

#### step 1 -> create function

In [0]:
def my_func(x):
    """Return the square of ``x``.

    Spark hands SQL NULLs to Python UDFs as ``None``; return ``None`` for them
    instead of letting ``None * None`` raise ``TypeError`` and fail the task.
    """
    if x is None:
        return None
    return x * x

#### Step 2: wrap the function as a PySpark UDF

In [0]:
# Wrap my_func as a Spark UDF. Without an explicit return type udf() defaults to
# StringType, so the squared MRP would come back as text; declare DoubleType so the
# result stays numeric.
# NOTE(review): assumes the UDF is applied to double columns such as item_mrp —
# a Python int result is not converted to double by Spark.
my_udf = udf(my_func, DoubleType())

####step 3: Use

In [0]:
# Apply the UDF row by row to item_mrp.
display(df.withColumn("myCol", my_udf(col('item_mrp'))))

# --------------------END----------------------------------

#Data writing

###1. CSV

In [0]:
# Default save mode ('errorifexists'): this fails if the path already exists, e.g.
# when the notebook is run a second time.
(
    df.write
    .format('csv')
    .save('/FileStore/tables/csv/data.csv')
)

####append mode

In [0]:
# 'append' adds new part files next to the existing ones.
(
    df.write
    .format('csv')
    .mode('append')
    .save('/FileStore/tables/csv/data.csv')
)

#### Overwrite mode

In [0]:
# 'overwrite' replaces whatever is at the path.
(
    df.write
    .format('csv')
    .mode('overwrite')
    .save('/FileStore/tables/csv/data.csv')
)

####error mode

In [0]:
# mode('error') (same as the default 'errorifexists') refuses to write when the
# target path already exists, which it does after the writes above. Catch the
# AnalysisException so the demonstration does not abort a Run All.
from pyspark.sql.utils import AnalysisException

try:
    (
        df.write
        .format('csv')
        .mode('error')
        .save('/FileStore/tables/csv/data.csv')
    )
except AnalysisException as err:
    print('error mode refused the write:', str(err).split('\n', 1)[0])

#### Ignore

In [0]:
# 'ignore' silently skips the write when the path already exists.
(
    df.write
    .format('csv')
    .mode('ignore')
    .save('/FileStore/tables/csv/data.csv')
)

## 2. Parquet

In [0]:
# Same data as Parquet (columnar, keeps the schema), overwriting any previous output.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .save('/FileStore/tables/csv/data.pq')
)

##Table

In [0]:
# Register the data as table "ta" (CSV-backed); 'append' adds rows on every run.
(
    df.write
    .format('csv')
    .mode('append')
    .saveAsTable("ta")
)

# --------------------------END-------------------------

#Spark SQL

###createTempView

In [0]:
# Register df as a session-scoped temporary view for the SQL cells below.
# createOrReplaceTempView (unlike createTempView) does not fail when 'my_view'
# already exists, so this cell can be re-run.
df.createOrReplaceTempView('my_view')

In [0]:
%sql
-- Low-fat items ('lf' after the label shortening earlier in the notebook).
SELECT *
FROM my_view
WHERE Item_Fat_Content = 'lf'

#### write it into df

In [0]:
# Same query from Python, keeping the result as a DataFrame.
low_fat_query = "SELECT * from my_view WHERE Item_Fat_Content='lf' "
df_sql = spark.sql(low_fat_query)

In [0]:
# Result of the SQL query.
display(df_sql)