## Data Reading


In [0]:

# Load the BigMart sales CSV; the first row is the header and Spark infers column types.
df_bigMart = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .option('multiLine', False)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Render the loaded DataFrame in the notebook.
display(df_bigMart)

## Checking the data path in DBFS with dbutils


In [0]:
# List the entries under the directory the CSV is loaded from.
dbutils.fs.ls('/FileStore/tables/')

## Schema - DDL and StructType()

In [0]:
# Print the schema of the DataFrame that was read with inferSchema enabled.
df_bigMart.printSchema()

In [0]:
# DDL-style schema: a comma-separated list of `column_name TYPE` pairs, passed
# to the reader's .schema() in a later cell instead of inferring the types.
# NOTE(review): Item_Weight is declared STRING here, while other cells compare
# it with 10.0 and multiply it by Item_MRP — confirm STRING is intentional.
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE 

                ''' 

In [0]:
# Read the same CSV again, applying the DDL schema rather than inferring types.
df = (
    spark.read.format('csv')
    .schema(my_ddl_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Print the schema of the DataFrame that was read with the DDL schema.
df.printSchema()

In [0]:
# Render the DataFrame that was read with the DDL schema.
display(df)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# StructType schema that reads every CSV column as a nullable string.
# The field list must name all twelve file columns in file order: a
# user-supplied CSV schema is applied by position, so leaving out Item_Type
# (as this cell previously did) shifts every later column by one.
mystrct_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'Item_Identifier',
        'Item_Weight',
        'Item_Fat_Content',
        'Item_Visibility',
        'Item_Type',
        'Item_MRP',
        'Outlet_Identifier',
        'Outlet_Establishment_Year',
        'Outlet_Size',
        'Outlet_Location_Type',
        'Outlet_Type',
        'Item_Outlet_Sales',
    )
])

In [0]:
# Read the CSV once more, this time with the StructType schema, and show it.
df = (
    spark.read.format('csv')
    .schema(mystrct_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)
display(df)

In [0]:
# Print the schema of the DataFrame that was read with the StructType schema.
df.printSchema()


In [0]:
# For comparison, print the schema of the DataFrame read with inferSchema.
df_bigMart.printSchema()


# Transformations in Spark

### SELECT

In [0]:
# Project two columns by name.
df_sel = df_bigMart.select('Item_Identifier', 'Item_Weight')
display(df_sel)

### COL

In [0]:
# Same projection, written with col() column objects.
display(
    df_bigMart.select(
        col('Item_Identifier'),
        col('Item_Weight'),
    )
)

### alias

In [0]:
# alias() on a column renames that column in the result.
display(
    df_bigMart.select(
        col('Item_Identifier').alias('Item_code')
    )
)

In [0]:
# Here alias() is called on the DataFrame returned by select(), not on a
# column: it names the DataFrame, and the column is still Item_Identifier.
display(
    df_bigMart.select('Item_Identifier').alias('Item_code')
)

# Filter/Where

In [0]:
from pyspark.sql.functions import col

# Keep only the rows whose Item_Fat_Content equals 'Regular'.
display(
    df_bigMart.filter(col('Item_Fat_Content') == 'Regular')
)

In [0]:
# When combining conditions with &, each comparison needs its own parentheses
# because & binds more tightly than == and <.
display(
    df_bigMart.filter(
        (col('Item_Type') == 'Soft Drinks')
        & (col('Item_Weight') < 10.0)
    )
)

In [0]:
# Rows located in Tier 1 or Tier 2 whose Outlet_Size is missing.
lst_values = ['Tier 1', 'Tier 2']
display(
    df_bigMart.filter(
        col('Outlet_Location_Type').isin(lst_values)
        & col('Outlet_Size').isNull()
    )
)

### withColumnRenamed

In [0]:
# Rename Item_Weight to Item_wt in the displayed result.
display(
    df_bigMart.withColumnRenamed('Item_Weight', 'Item_wt')
)

### withColumn

In [0]:
# A new column holding a constant: the value has to be wrapped in lit().
df = df_bigMart.withColumn('Flag', lit('aaaaa'))
display(df)

In [0]:
# A new column computed from existing columns.
df = df_bigMart.withColumn('mul', col('Item_Weight') * col('Item_MRP'))
display(df)

In [0]:
# Reusing an existing column name in withColumn() replaces that column.
display(
    df_bigMart
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg'))
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'Lf'))
)

### Type Casting

In [0]:
# Cast Item_Weight to a string column and confirm the change in the schema.
df = df_bigMart.withColumn(
    'Item_Weight',
    col('Item_Weight').cast('string'),
)
df.printSchema()

### sort/orderBy


In [0]:
# Show the DataFrame that the sort examples below operate on.
display(df)

In [0]:
# Item_Weight was cast to a string in the type-casting cell, so sort on its
# numeric value. A plain string sort is lexicographic and would rank '9.3'
# above '10.5' in descending order.
df1 = df.sort(col('Item_Weight').cast('double').desc())
df1.display()

In [0]:
# Ascending sort on Item_Visibility; orderBy() is an alias of sort().
df2 = df.orderBy(col('Item_Visibility').asc())
display(df2)

In [0]:
# Sort by weight (descending), then visibility (descending).
# Item_Weight was cast to a string in the type-casting cell, so it is cast
# back to double here; otherwise the weights are ordered lexicographically.
df3 = df.sort(
    col('Item_Weight').cast('double').desc(),
    col('Item_Visibility').desc(),
)
df3.display()

In [0]:
# Sort by weight (descending), then visibility (ascending).
# Item_Weight was cast to a string in the type-casting cell, so it is cast
# back to double here; otherwise the weights are ordered lexicographically.
df4 = df.sort(
    col('Item_Weight').cast('double').desc(),
    col('Item_Visibility').asc(),
)
df4.display()

### limit

In [0]:
# Keep only the first three rows of the sorted frame.
df5 = df4.limit(3)
display(df5)

### drop

In [0]:
# Drop a single column.
df6 = df5.drop('Item_Weight')
display(df6)

In [0]:
# Drop several columns in one call.
df7 = df5.drop('Item_Weight', 'Item_Visibility')
display(df7)

### drop_duplicates

In [0]:
# Remove rows that are duplicated across all columns
# (drop_duplicates() is an alias of dropDuplicates()).
df8 = df4.dropDuplicates()
display(df8)

In [0]:
# De-duplicate on one or more chosen columns instead of the whole row.
df9 = df4.dropDuplicates(['Item_Type'])
display(df9)

In [0]:
# distinct() keeps one copy of each fully identical row.
df10 = df4.distinct()
display(df10)

### union and unionByName

In [0]:
# Two small frames that share the same column order (id, name).
schema1 = 'id STRING, name STRING'
data1 = [('1', 'Kad'), ('2', 'jas')]
df1 = spark.createDataFrame(data1, schema1)
display(df1)

schema2 = 'id STRING, name STRING'
data2 = [('3', 'rahul'), ('2', 'jas')]
df2 = spark.createDataFrame(data2, schema2)
display(df2)

In [0]:
# Stack the rows of both frames; union() does not remove duplicates.
df3 = df1.union(df2)
display(df3)

In [0]:
# Rebuild df1 with its columns in the opposite order (name, id).
schema1 = 'name STRING, id STRING'
data1 = [('Kad', '1'), ('jas', '2')]
df1 = spark.createDataFrame(data1, schema1)
display(df1)

In [0]:
# union() matches columns by position, not by name, so with df1 as (name, id)
# and df2 as (id, name) the values of df2 end up under the wrong headers.
df = df1.union(df2)
display(df)

In [0]:
# unionByName() lines the columns up by name, so the differing order is harmless.
df = df1.unionByName(df2)
display(df)

# String Functions
### initcap()
### lower()
### upper()

In [0]:
# Capitalise the first letter of each word in name.
display(df.select(initcap('name')))

In [0]:
# Lower-case the name column.
display(df.select(lower('name')))

In [0]:
# Upper-case the name column.
display(df.select(upper('name')))

# Date Functions
### current_date()
### date_add()
### date_sub()

In [0]:
# Add today's date as a new column.
df1 = df.withColumn(
    'curr_date',
    current_date(),
)
display(df1)

In [0]:
# Seven days after curr_date.
df2 = df1.withColumn(
    "week_After",
    date_add('curr_date', 7),
)
display(df2)

In [0]:
# Three days before curr_date.
df3 = df2.withColumn(
    'date_sub',
    date_sub('curr_date', 3),
)
display(df3)

In [0]:
# date_add() with a negative day count also moves the date backwards.
df4 = df3.withColumn(
    'dd',
    date_add('curr_date', -3),
)
display(df4)

### datediff

In [0]:
# Number of days from today to week_After.
# The column is referenced with the exact casing it was created with
# ("week_After"), so the lookup also works when spark.sql.caseSensitive is on.
df5 = df4.withColumn(
    'diff_dates',
    datediff('week_After', current_date()),
)
df5.display()

### date_format

In [0]:
# Replace date_sub with its dd-MM-yyyy text representation.
df6 = df5.withColumn(
    'date_sub',
    date_format('date_sub', 'dd-MM-yyyy'),
)
display(df6)

# Handling NULLS
### 1. dropping NULLs
### 2. filling NULLs

In [0]:
# how='any': drop a row as soon as one of its columns is NULL.
df = df_bigMart.dropna(how='any')
display(df)

# how='all': drop a row only when every column in it is NULL.
df = df_bigMart.dropna(how='all')
display(df)

In [0]:
# Only look at the listed columns when deciding which rows to drop.
df = df_bigMart.dropna(subset=['Outlet_Size'])
display(df)

In [0]:
# Replace NULLs with a placeholder. A string fill value only applies to
# string columns; NULLs in numeric columns are left untouched.
# (The placeholder was previously misspelled as 'NOT_AVAILAVALE'.)
df = df_bigMart.fillna('NOT_AVAILABLE')
df.display()

In [0]:
# Fill NULLs with 0, restricted to the Item_Weight column.
df = df_bigMart.fillna(0, subset=['Item_Weight'])
display(df)

# Split and Indexing

In [0]:
# Split Outlet_Type on spaces into an array column.
df = df_bigMart.withColumn(
    'NewSplit_col',
    split('Outlet_Type', ' '),
)
df.display()

In [0]:
# Take the first element of the array column.
display(df.select(col('NewSplit_col')[0]))

### explode

In [0]:
# explode() emits one output row per element of the array column.
df1 = df.withColumn(
    'NewSplit_col',
    explode('NewSplit_col'),
)
display(df1)

## array_contains

In [0]:
# Boolean flag: does the array column contain the value 'Type1'?
df2 = df.withColumn(
    'yes/no',
    array_contains('NewSplit_col', 'Type1'),
)
display(df2)

### group_by


In [0]:
# Average MRP per item type.
df = (
    df2
    .groupBy('Item_Type')
    .agg(avg('Item_MRP'))
)
display(df)

In [0]:
# Total MRP per (item type, outlet size) pair.
df = (
    df2
    .groupBy('Item_Type', 'Outlet_Size')
    .agg(sum('Item_MRP'))
)
display(df)

In [0]:
# Several aggregates can be computed in a single agg() call.
df = (
    df2
    .groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum('Item_MRP'),
        avg('Item_MRP'),
    )
)
display(df)

### collect_list

### pivot

In [0]:
# Narrow the frame to the three columns used by the pivot.
df3 = df2.select(
    'Item_Type',
    'Outlet_Size',
    'Item_MRP',
)
display(df3)

In [0]:
# One row per item type, one column per Outlet_Size value, cells = total MRP.
df_pivot = (
    df3
    .groupBy('Item_Type')
    .pivot('Outlet_Size')
    .agg(sum('Item_MRP'))
)
display(df_pivot)

### When_Otherwise

In [0]:
# Flag 'Meat' as 'Non Veg' and every other item type as 'Veg'.
df_cndn = df_pivot.withColumn(
    'cndn_flag',
    when(col('Item_Type') == 'Meat', 'Non Veg').otherwise('Veg'),
)
display(df_cndn)

In [0]:
# Price band for 'Veg' item types, based on the 'High' outlet-size total.
# Rows that match neither band (non-veg items, or a NULL 'High' total) keep
# their existing cndn_flag rather than all being labelled 'Veg', and a total
# of exactly 6000 now falls in the inexpensive band instead of in neither.
df_c = df_cndn.withColumn(
    'veg_prc_flag',
    when((col('cndn_flag') == 'Veg') & (col('High') > 6000), 'Veg Expensive')
    .when((col('cndn_flag') == 'Veg') & (col('High') <= 6000), 'Veg Inexpensive')
    .otherwise(col('cndn_flag')),
)
df_c.display()

### Join
#### Left join
#### right join
#### full join
#### inner join
#### anti join

In [0]:

# Employees (left side of the joins). d06 has no matching department.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
df1 = spark.createDataFrame(dataj1, schemaj1)

# Departments (right side of the joins). d04 has no employees.
schemaj2 = 'dept_id STRING, department STRING'
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
df2 = spark.createDataFrame(dataj2, schemaj2)

display(df1)
display(df2)

In [0]:
# Inner join on the shared column name; the result has a single dept_id column.
df_inner = df1.join(df2, on='dept_id', how='inner')
display(df_inner)

In [0]:
# An inner join returns only the rows that match in both tables.
# Joining on an expression (rather than a column name) keeps the dept_id
# column from both sides in the result.
df_join_inner = df1.join(
    df2,
    df1['dept_id'] == df2['dept_id'],
    'inner',
)
display(df_join_inner)

In [0]:
# A left join keeps every row of the left table and adds the matching rows
# from the right table.
df_left = df1.join(df2, on='dept_id', how='left')
display(df_left)

In [0]:
# A right join keeps every row of the right table and adds the matching rows
# from the left table.
df_right = df1.join(df2, on='dept_id', how='right')
display(df_right)

In [0]:
# An anti join returns the rows of the left table (df1 here) that have no
# match in the right table.
df_anti = df1.join(df2, on='dept_id', how='anti')
display(df_anti)

In [0]:
# Same anti join with the sides swapped: rows of df2 with no match in df1.
df_anti = df2.join(df1, on='dept_id', how='anti')
display(df_anti)

# Window Functions
### Row Number()
### Rank()
### Dense Rank()
### Ntile()
### Lead()
### Lag()

In [0]:
# Row_number() will return a unique rank for each row without any gap in ranks.
from pyspark.sql.window import Window

In [0]:
# row_number(): a distinct, gap-free sequence number over the dept_id ordering.
df_rn = df_inner.withColumn(
    'Row_Number_Rank',
    row_number().over(Window.orderBy('dept_id')),
)
display(df_rn)

In [0]:
# rank(): tied rows share a rank and the following rank values are skipped.
df_rank = df_inner.withColumn(
    'rank',
    rank().over(Window.orderBy('dept_id')),
)
display(df_rank)

In [0]:
# dense_rank(): tied rows share a rank and the next rank follows without a gap.
df_dense_rank = df_inner.withColumn(
    'Dense_rank',
    dense_rank().over(Window.orderBy('dept_id')),
)
display(df_dense_rank)

## Cumulative sum

In [0]:
# Employee salary records; salarydt is text in dd-MM-yy form.
schema1 = ["EmpId", "EmpName", "Mgrid", "deptid", "salarydt", "salary"]
data1 = [
    (100, "Raj", None, 1, "01-04-23", 50000),
    (200, "Joanne", 100, 1, "01-04-23", 4000),
    (200, "Joanne", 100, 1, "13-04-23", 4500),
    (200, "Joanne", 100, 1, "14-04-23", 4020),
]
df_emp = spark.createDataFrame(data1, schema1)
df_emp.display()

# Department lookup table.
schema2 = ["deptid", "deptname"]
data2 = [
    (1, "IT"),
    (2, "HR"),
]
df_dept = spark.createDataFrame(data2, schema2)
df_dept.display()

In [0]:
# Parse the dd-MM-yy text in salarydt into a date column.
df = df_emp.withColumn(
    'Newsaldt',
    to_date('salarydt', 'dd-MM-yy'),
)
df.display()

In [0]:
# Joining on a list of column names keeps a single deptid column, so there is
# no need to join on an expression and then drop the duplicate column.
df1 = df.join(df_dept, on=['deptid'])

df1.display()

In [0]:
# Self join: pair each row (alias a) with the row of its manager (alias b).
# A left join keeps the rows whose Mgrid has no matching EmpId.
df2 = df1.alias('a').join(
    df1.alias('b'),
    col('a.Mgrid') == col('b.EmpId'),
    'left',
)
df2.display()

In [0]:
# Running total of salary.
# EmpId alone is not a unique ordering (EmpId 200 appears three times), so
# the parsed salary date is added as a tiebreaker. Without it the order of
# the tied rows, and therefore each running value, is not deterministic.
df3 = df_emp.withColumn(
    'Cum_sum',
    sum('salary').over(
        Window.orderBy('EmpId', to_date('salarydt', 'dd-MM-yy'))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
)
df3.display()

In [0]:
# Grand total of salary, repeated on every row (the frame spans all rows).
# The total is taken over 'salary'; summing the running 'Cum_sum' column
# instead would count the earlier salaries several times.
df4 = df3.withColumn(
    'total_sum',
    sum('salary').over(
        Window.orderBy('EmpId')
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ),
)
df4.display()

## User-Defined Functions (UDF)

In [0]:
def my_func(x):
    """Return the square of ``x``, or ``None`` when ``x`` is ``None``."""
    # A NULL cell reaches the UDF as None; pass it through instead of raising.
    if x is None:
        return None
    return x * x


# Wrap the function as a Spark UDF with an explicit integer return type.
# Without a return type udf() defaults to StringType, which would turn the
# squared values into strings.
my_func = udf(my_func, LongType())

In [0]:
# Apply the UDF to EmpId and show the result.
display(
    df4.withColumn('udf_func', my_func('EmpId'))
)

## Data Writing

In [0]:
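# Data writing modes
# 1. append    - add the new data to whatever already exists at the path
# 2. overwrite - replace any data that already exists at the path
# 3. error     - raise an error if the path already exists (the default mode)
# 4. ignore    - silently skip the write if the path already exists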

In [0]:
# CSV, append mode: add to whatever is already at the target path.
(
    df.write
    .format('csv')
    .mode('append')
    .save('Path')
)

In [0]:
# CSV, overwrite mode: replace whatever is already at the target path.
(
    df.write
    .format('csv')
    .mode('overwrite')
    .save('Path')
)

In [0]:
# CSV, error mode: the write is refused when the target path already exists.
# The earlier write cells have already created 'Path', so a failure is the
# expected outcome here. It is caught and printed so that a Run All does not
# stop at this cell.
from pyspark.sql.utils import AnalysisException

try:
    df.write.format('csv').mode('error').save('Path')
except AnalysisException as exc:
    print(f"mode('error') write was rejected: {exc}")

In [0]:
# CSV, ignore mode: do nothing if the target path already exists.
(
    df.write
    .format('csv')
    .mode('ignore')
    .save('Path')
)