Data Reading

In [0]:
# List the files uploaded under DBFS /FileStore/tables/; the notebook renders
# the returned listing as this cell's output.
dbutils.fs.ls('/FileStore/tables/')

In [0]:
# Load the BigMart sales CSV: the first row supplies column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    '/FileStore/tables/BigMart_Sales.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Render the freshly loaded DataFrame as an interactive table.
df.display()

Schema Definition

In [0]:
# Print the column names and types that inferSchema produced.
df.printSchema()

DDL SCHEMA

In [0]:
# Explicit DDL schema for BigMart_Sales.csv, so column types are declared up
# front instead of being inferred. Type names are written in one consistent
# (upper-case) style; Spark's DDL parser is case-insensitive, so the resulting
# schema is the same as before.
my_ddl_schema = '''
    Item_Identifier STRING,
    Item_Weight DOUBLE,
    Item_Fat_Content STRING,
    Item_Visibility DOUBLE,
    Item_Type STRING,
    Item_MRP DOUBLE,
    Outlet_Identifier STRING,
    Outlet_Establishment_Year INT,
    Outlet_Size STRING,
    Outlet_Location_Type STRING,
    Outlet_Type STRING,
    Item_Outlet_Sales DOUBLE
'''

In [0]:
# Re-read the same CSV, this time applying the explicit DDL schema instead of
# inferring column types.
df = spark.read.csv(
    '/FileStore/tables/BigMart_Sales.csv',
    schema=my_ddl_schema,
    header=True,
)

In [0]:
# Confirm the schema now matches my_ddl_schema.
df.printSchema()

TRANSFORMATIONS

SELECT

In [0]:

from pyspark.sql.types import * 
from pyspark.sql.functions import *  

In [0]:
# Project three columns, referenced by name.
df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content').display()

ALIAS

In [0]:
# Return Item_Identifier under the new name Item_ID (Column.name is an alias of Column.alias).
df.select(col('Item_Identifier').name('Item_ID')).display()

FILTER

In [0]:
# Keep only 'Regular' fat-content rows (where() is an alias of filter()).
df.where(col('Item_Fat_Content') == 'Regular').display()

In [0]:
# Soft drinks weighing more than 10. Each comparison needs its own
# parentheses because & binds tighter than == and >.
df.where(
    (col('Item_Weight') > 10)
    & (col('Item_Type') == 'Soft Drinks')
).display()

In [0]:
# Outlets with no recorded size that are located in Tier 1 or Tier 2.
df.where(
    col('Outlet_Size').isNull()
    & col('Outlet_Location_Type').isin(['Tier 1', 'Tier 2'])
).display()

withColumnRenamed

In [0]:
# Preview the data with Item_Weight renamed to Item_Wt; df itself is not reassigned.
df.withColumnRenamed(existing='Item_Weight', new='Item_Wt').display()

withColumn

In [0]:
# Add a constant column 'flag' holding the literal "new" on every row.
df = df.withColumn(colName='flag', col=lit("new"))

In [0]:
# Show df including the new 'flag' column.
df.display()

In [0]:
# Abbreviate fat-content labels in place: "Regular" -> "Reg", then
# "Low Fat" -> "Lf" (applied in that order, as nested replacements).
df = df.withColumn(
    'Item_Fat_Content',
    regexp_replace(
        regexp_replace(col('Item_Fat_Content'), "Regular", "Reg"),
        "Low Fat",
        "Lf",
    ),
)

df.display()

Type Casting

In [0]:
# Cast Item_Weight from DOUBLE to STRING and keep the result in df.
# NOTE: every later cell now sees Item_Weight as a string column.
df = df.withColumn('Item_Weight', col('Item_Weight').astype(StringType()))

In [0]:
# Item_Weight should now show as string.
df.printSchema()

Sort

In [0]:
# Sort by weight, heaviest first. Item_Weight is a STRING after the Type
# Casting cell, so the sort key casts it back to DOUBLE. Without the cast the
# order is lexicographic ('9.5' would rank above '21.35'). Only the sort key is
# cast; the displayed column keeps its current type.
df.sort(col('Item_Weight').cast(DoubleType()).desc()).display()

In [0]:
# Sort by visibility, smallest first (orderBy() is an alias of sort()).
df.orderBy(asc('Item_Visibility')).display()

In [0]:
# Sort by weight, then visibility, both descending. Item_Weight is a STRING
# after the Type Casting cell, so the sort key casts it to DOUBLE to get a
# numeric rather than lexicographic ordering.
df.sort(
    col('Item_Weight').cast(DoubleType()).desc(),
    col('Item_Visibility').desc(),
).display()

In [0]:
# Sort by weight descending, breaking ties by visibility ascending. Uses the
# real column casing 'Item_Weight' (the original 'Item_weight' only worked
# because Spark resolves names case-insensitively by default). The weight key
# is cast to DOUBLE because the column is a STRING after the Type Casting cell.
df.sort(
    col('Item_Weight').cast(DoubleType()).desc(),
    col('Item_Visibility').asc(),
).display()

Limit

In [0]:
# Show only the first 10 rows.
df.limit(num=10).display()

DROP

In [0]:
# Preview df without the Item_Visibility column (df is not modified).
df.drop(col('Item_Visibility')).display()

In [0]:

# Preview df without Item_Visibility and Item_Type.
df.drop(*('Item_Visibility', 'Item_Type')).display()

DROP DUPLICATES

In [0]:
# Remove rows that are identical across every column.
df.drop_duplicates().display()

In [0]:
# Keep one (arbitrary) row per distinct Item_Type.
df.drop_duplicates(['Item_Type']).display()

UNION and UNION BY NAME

In [0]:
# Two tiny DataFrames sharing the same (id, name) column order, used to
# demonstrate positional union().
data1 = [('1', 'kad'), ('2', 'sid')]
schema1 = 'id STRING, name STRING'
df1 = spark.createDataFrame(data1, schema1)

data2 = [('3', 'rahul'), ('4', 'jas')]
schema2 = 'id STRING, name STRING'
df2 = spark.createDataFrame(data2, schema2)


In [0]:
# Stack df2 under df1, matching columns by position (unionAll is an alias of union).
df1.unionAll(df2).display()

In [0]:
# Rebuild df1 with its columns in (name, id) order. A positional union with
# df2 would now mix names into the id column, which unionByName avoids.
schema1 = 'name STRING, id STRING'
data1 = [('kad', '1'), ('sid', '2')]

df1 = spark.createDataFrame(data1, schema1)

df1.display()

In [0]:
# Stack df2 under df1, matching columns by name rather than position.
df1.unionByName(other=df2).display()

Date Functions

In [0]:
# Add date columns derived from today's date. Later expressions refer to the
# 'current_date' and 'week_before' columns by name, so the order matters.
df = (
    df.withColumn('current_date', current_date())
    .withColumn('week_after', date_add(col('current_date'), 7))
    .withColumn('week_before', date_sub(col('current_date'), 7))
    # date_add with a negative offset is equivalent to date_sub.
    .withColumn('week_before_variation', date_add(col('current_date'), -7))
    .withColumn('date_diff', datediff(col('week_after'), col('current_date')))
    .withColumn('date_format', date_format(col('week_before'), 'dd-MM-yyyy'))
)

In [0]:
# Peek at the first 10 rows, including the new date columns.
df.limit(num=10).display()

Handling Nulls

In [0]:
# Drop only rows in which every column is null.
df.na.drop(how='all').display()

In [0]:
# Drop rows that contain a null in any column.
df.na.drop(how='any').display()

In [0]:
# Drop rows whose Outlet_Size is null; nulls in other columns are ignored.
df.na.drop(how='any', subset=['Outlet_Size']).display()

Filling Nulls

In [0]:

# Replace nulls with 'NotAvailable'. A string fill value only applies to string columns.
df.na.fill('NotAvailable').display()

In [0]:
# Replace nulls with 'NotAvailable' in Outlet_Size only.
df.na.fill('NotAvailable', subset=['Outlet_Size']).display()

SPLIT and Indexing

SPLIT

In [0]:
# Show Outlet_Type as an array of its space-separated words.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')).display()

Indexing

In [0]:
# Keep only the second word (index 1) of Outlet_Type.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ').getItem(1)).display()

Explode

In [0]:

# Keep a copy of df whose Outlet_Type is an array of words; later cells reuse it.
df_exp = df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' '))

# explode() produces one output row per array element.
df_exp.withColumn('Outlet_Type', explode(col('Outlet_Type'))).display()
     

In [0]:

# df_exp itself still holds the array column (the explode above was display-only).
df_exp.display()

In [0]:
# Flag rows whose Outlet_Type word array contains 'Type1'.
df_exp.withColumn('Type1_flag', array_contains(col('Outlet_Type'), 'Type1')).display()

GroupBy

In [0]:

# Total Item_MRP per Item_Type (result column: sum(Item_MRP)).
df.groupby('Item_Type').sum('Item_MRP').display()

In [0]:
# Mean Item_MRP per Item_Type (result column: avg(Item_MRP)).
df.groupby('Item_Type').avg('Item_MRP').display()

In [0]:

# Total Item_MRP per (Item_Type, Outlet_Size), named Total_MRP.
df.groupby('Item_Type', 'Outlet_Size').agg(
    sum(col('Item_MRP')).alias('Total_MRP')
).display()

In [0]:
# Sum and mean of Item_MRP per (Item_Type, Outlet_Size), computed in one aggregation.
df.groupby('Item_Type', 'Outlet_Size').agg(
    sum(col('Item_MRP')),
    avg(col('Item_MRP')),
).display()

collect_list

In [0]:
# Small (user, book) dataset used to demonstrate collect_list.
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]
schema = 'user string, book string'

df_book = spark.createDataFrame(data, schema)
df_book.display()

In [0]:
# Gather each user's books into a single array (duplicates are kept).
df_book.groupby('user').agg(collect_list(col('book'))).display()

PIVOT

In [0]:
# One row per Item_Type and one column per Outlet_Size value; cells hold the mean Item_MRP.
df.groupby('Item_Type').pivot('Outlet_Size').agg(avg(col('Item_MRP'))).display()

When-Otherwise

In [0]:
# Add veg_flag: 'Non-Veg' for the 'Meat' item type, 'Veg' for everything else
# (including rows where Item_Type is null).
df = df.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)

In [0]:

# Show df including the new veg_flag column.
df.display()

In [0]:

# Combine veg_flag with a price band. The upper band uses >= so that a Veg item
# priced exactly 100 is labelled 'Veg_Expensive'. With the previous `> 100` it
# matched neither branch and fell through to 'Non_Veg'.
# NOTE(review): a Veg row with a null Item_MRP still falls through to 'Non_Veg'.
df.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non_Veg'),
).display()
     

JOINS

In [0]:
# Employee and department fixtures for the join demos. Employee 'nad' has
# dept_id d06, which has no department row. Department d04 (IT) has no
# employees. This makes the join types below produce different results.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
df1 = spark.createDataFrame(dataj1, schemaj1)

schemaj2 = 'dept_id STRING, department STRING'
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
df2 = spark.createDataFrame(dataj2, schemaj2)

Inner Join

In [0]:
# Employees that have a matching department. Both dept_id columns are kept in the output.
df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='inner').display()

Left Join

In [0]:
# Every employee; department columns are null where no department matches.
df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='left').display()

RIGHT JOIN

In [0]:

# Every department; employee columns are null where a department has no employees.
df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='right').display()

ANTI JOIN

In [0]:
# Employees with no matching department; only df1's columns are returned.
df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='anti').display()

WINDOW FUNCTIONS

ROW_NUMBER()

In [0]:
from pyspark.sql.window import Window

In [0]:
# Number all rows 1..n in Item_Identifier order. The window has no partition,
# so Spark evaluates it over a single partition.
df.withColumn(
    'rowCol',
    row_number().over(Window.orderBy(col('Item_Identifier'))),
).display()

RANK VS DENSE RANK

In [0]:


# Compare rank (leaves gaps after ties) with dense_rank (no gaps). Both are
# ordered by Item_Identifier, descending.
(
    df.withColumn('rank', rank().over(Window.orderBy(desc('Item_Identifier'))))
    .withColumn('denseRank', dense_rank().over(Window.orderBy(desc('Item_Identifier'))))
    .display()
)

In [0]:
# Running total of Item_MRP from the first row up to the current row, in
# Item_Identifier order.
# NOTE(review): rows that tie on Item_Identifier have no defined relative order
# under a ROWS frame, so their running totals may vary between runs.
df.withColumn(
    'dum',
    sum(col('Item_MRP')).over(
        Window.orderBy('Item_Identifier').rowsBetween(
            Window.unboundedPreceding, Window.currentRow
        )
    ),
).display()
     

USER DEFINED FUNCTIONS (UDF)

In [0]:
def my_func(x):
    """Return the square of *x*, or None when *x* is None.

    This is the Python body of a Spark UDF. Spark passes SQL NULL values in
    as None, and ``None * None`` would raise TypeError inside the Python
    worker and fail the task. Returning None yields a NULL result instead.
    """
    if x is None:
        return None
    return x * x
# Wrap my_func as a Spark UDF with an explicit DOUBLE return type. Without a
# returnType, udf() defaults to StringType and the squared values would come
# back as strings.
# NOTE(review): DoubleType suits DOUBLE inputs such as Item_MRP; a Python int
# returned for an integer column would not match and would become NULL.
my_udf = udf(my_func, DoubleType())
     

In [0]:
# Apply the UDF to Item_MRP and show the result as mynewcol.
df.withColumn('mynewcol', my_udf(col('Item_MRP'))).display()

DATA WRITING

CSV

In [0]:

# Write df as CSV into the directory '/FileStore/tables/CSV/data.csv'. No mode
# is set, so the default (error if exists) applies and a re-run fails once the
# directory exists. The header option is not set, so no header row is written.
df.write.csv('/FileStore/tables/CSV/data.csv')

APPEND

In [0]:

# Append mode: add new CSV part files next to any already in the directory.
df.write.csv('/FileStore/tables/CSV/data.csv', mode='append')
     

Overwrite

In [0]:
# Overwrite mode: replace whatever is already at the target path.
df.write.save('/FileStore/tables/CSV/data.csv', format='csv', mode='overwrite')
     

Error (errorifexists)

In [0]:
# 'error' mode: raises because the target path was already written by the
# cells above. This cell is expected to fail.
df.write.save('/FileStore/tables/CSV/data.csv', format='csv', mode='error')

Ignore

In [0]:
# 'ignore' mode: silently skip the write because the target path already exists.
df.write.mode('ignore').csv('/FileStore/tables/CSV/data.csv')

PARQUET

In [0]:
# Write the same data as Parquet to its own directory. Pointing this
# overwrite-mode write at '/FileStore/tables/CSV/data.csv' would replace the
# CSV output with Parquet files stored under a '.csv' name.
df.write.format('parquet')\
.mode('overwrite')\
.option('path','/FileStore/tables/Parquet/data.parquet')\
.save()

TABLE

In [0]:

# Register df as the managed table 'my_table' in Parquet format, replacing any
# existing table of that name.
df.write.saveAsTable('my_table', format='parquet', mode='overwrite')