###Data Reading CSV

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Load the BigMart sales CSV: the first row is the header and the column
# types are inferred from the data.
csv_path = '/Volumes/workspace/santhosh/product_list/BigMart Sales.csv'
df = (
    spark.read.format('csv')
    .option('header', 'True')
    .option('inferSchema', 'True')
    .load(csv_path)
)

In [0]:
# Print the column names and data types of the CSV-backed DataFrame.
df.printSchema()

In [0]:
# Show the CSV-backed DataFrame in the notebook output
# (display() is provided by the notebook environment, presumably Databricks).
df.display()

### Data Reading Json

In [0]:
# Load the drivers JSON file.
# The JSON source always infers its schema when none is supplied and has no
# header row, so the CSV-only 'header' / 'inferSchema' options are not set.
# multiLine=False: every line of the file is parsed as one JSON record.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/Volumes/workspace/santhosh/product_list/drivers.json')
)

In [0]:
# Show the JSON-backed DataFrame in the notebook output.
df_json.display()

In [0]:
# Print the schema Spark inferred for the JSON data. This section is about
# df_json; the CSV frame's schema was already printed earlier.
df_json.printSchema()


### DDL schema
### Used to set the column data types explicitly instead of inferring them

In [0]:
# Explicit schema for the BigMart CSV, written as a Spark DDL string
# ("column_name TYPE" pairs separated by commas).
# Note that Item_Weight is declared STRING here.
my_ddl = """
Item_Identifier STRING,
Item_Weight STRING,
Item_Fat_Content STRING,
Item_Visibility DOUBLE,
Item_Type STRING,
Item_MRP DOUBLE,
Outlet_Identifier STRING,
Outlet_Establishment_Year INT,
Outlet_Size STRING,
Outlet_Location_Type STRING,
Outlet_Type STRING,
Item_Outlet_Sales DOUBLE
"""


In [0]:
# Re-read the BigMart CSV, this time applying the explicit DDL schema
# instead of inferring the column types.
sales_csv_path = '/Volumes/workspace/santhosh/product_list/BigMart Sales.csv'
csv_reader = spark.read.format('csv').option('header', 'True')
df = csv_reader.schema(my_ddl).load(sales_csv_path)

In [0]:
# Show the DataFrame that was read with the explicit schema.
df.display()

### SELECT syntax

In [0]:
# Project three columns, referring to them by name.
wanted_columns = ["Item_Identifier", 'Item_Weight', 'Item_Fat_Content']
df.select(*wanted_columns).display()

In [0]:
# Same projection as a by-name select, expressed with Column objects.
identifier_col = col("Item_Identifier")
weight_col = col('Item_Weight')
fat_content_col = col('Item_Fat_Content')
df.select(identifier_col, weight_col, fat_content_col).display()

###ALIAS

In [0]:
# Select two columns and give each a new display name via alias().
renamed_identifier = col("Item_Identifier").alias('1st col')
renamed_weight = col("Item_Weight").alias('2nd col')
df.select(renamed_identifier, renamed_weight).display()

###Filter

###Scenario 1

In [0]:
# Keep only the rows whose fat content is exactly 'Regular'.
is_regular_fat = col("Item_Fat_Content") == 'Regular'
df.filter(is_regular_fat).display()

###Scenario 2

In [0]:
# Keep soft drinks whose weight is at most 10 (both conditions must hold).
weight_at_most_ten = col('Item_Weight') <= 10
is_soft_drink = col('Item_Type') == 'Soft Drinks'
df.filter(weight_at_most_ten & is_soft_drink).display()

###Scenario 3

In [0]:
# Keep rows with a missing outlet size that are located in Tier 1 or Tier 2.
outlet_size_missing = col('Outlet_Size').isNull()
in_tier_one_or_two = col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
df.filter(outlet_size_missing & in_tier_one_or_two).display()

### withColumnRenamed: rename a column (it returns a new DataFrame; assign the result to keep the change)

In [0]:
# Display a copy in which Item_Weight is renamed to Item_Wt.
# The result is not assigned back, so df keeps its original column name.
weight_renamed_df = df.withColumnRenamed('Item_Weight', 'Item_Wt')
weight_renamed_df.display()

### withColumn: add or modify a column
Scenario 1

In [0]:
# Add a constant column 'Flag_Col' holding the literal 'new' in every row.
flag_literal = lit('new')
df = df.withColumn('Flag_Col', flag_literal)

In [0]:
# Show df, which now includes the added Flag_Col column.
df.display()

In [0]:
# Add 'multiply' = Item_Weight * Item_Visibility and keep the result in df.
# display() returns None, so it must not be chained onto the assignment:
# doing so rebinds df to None and every later cell that uses df fails.
df = df.withColumn('multiply', col('Item_Weight') * col('Item_Visibility'))
df.display()

In [0]:
# Display a copy where "Regular" in Item_Fat_Content is shortened to "Reg".
shortened_fat_content = regexp_replace(col('Item_Fat_Content'), "Regular", "Reg")
df.withColumn('Item_Fat_Content', shortened_fat_content).display()

### Type casting

In [0]:
# Replace Item_Weight with the same column cast to a string type.
weight_as_text = col('Item_Weight').cast(StringType())
df = df.withColumn('Item_Weight', weight_as_text)

In [0]:
# Print the schema to check the column types after the cast.
df.printSchema()

### order by/sort

In [0]:
# Order the rows by Item_Weight, largest first.
weight_descending = col('Item_Weight').desc()
df.sort(weight_descending).display()

In [0]:
# Order the rows by Item_Identifier, smallest first.
identifier_ascending = col("Item_Identifier").asc()
df.sort(identifier_ascending).display()

In [0]:
# Order by two keys; one ascending flag per key (0 -> descending).
sort_keys = ['Item_Weight', 'Item_MRP']
ascending_flags = [0, 0]
df.sort(sort_keys, ascending=ascending_flags).display()

### limit

In [0]:
# Display at most ten rows of df.
ten_rows = df.limit(10)
ten_rows.display()

In [0]:
# Sort descending by weight and MRP, then show only the first five rows.
sorted_by_weight_and_mrp = df.sort(['Item_Weight', 'Item_MRP'], ascending=[0, 0])
top_five_rows = sorted_by_weight_and_mrp.limit(5)
top_five_rows.display()

### drop


### single column

In [0]:
# Display df without the Item_Visibility column (df itself is unchanged).
without_visibility = df.drop("Item_Visibility")
without_visibility.display()

### Multiple column

In [0]:
# Display df without two columns at once (df itself is unchanged).
columns_to_remove = ("Item_Fat_Content", 'Item_Type')
df.drop(*columns_to_remove).display()

###Drop duplicates

In [0]:
# Remove rows that are duplicated across all columns, then display.
deduplicated_rows = df.dropDuplicates()
deduplicated_rows.display()

In [0]:
# Keep one row per distinct Item_Type.
# The column name is spelled exactly as in the schema (Item_Type) so the cell
# does not depend on Spark's case-insensitive column resolution.
df.drop_duplicates(subset=['Item_Type']).display()

In [0]:
# Display only the distinct rows of df.
unique_rows = df.distinct()
unique_rows.display()

In [0]:
data1 = [('1','sandy'),('2','sarah')]
schema1 ='id STRING' ,'name STRING'
df1 = spark.createDataFrame(data1,schema1)
df1.display()


In [0]:
data2 = [('vijay','3'),('leo','4')]
schema2 ='name STRING' ,'id STRING'
df2 = spark.createDataFrame(data2,schema2)
df2.display()


###UNION


In [0]:
# union() stacks the two frames by column position, not by column name.
stacked_by_position = df1.union(df2)
stacked_by_position.display()

###Union by name


In [0]:
# unionByName() stacks the two frames by matching column names.
stacked_by_name = df1.unionByName(df2)
stacked_by_name.display()

###Functions

###Initcap()

In [0]:
# Show Item_Type with the first letter of each word capitalised.
capitalised_item_type = initcap('Item_Type')
df.select(capitalised_item_type).display()

###lower

In [0]:
# Show Item_Type converted to lower case.
lowercase_item_type = lower('Item_Type')
df.select(lowercase_item_type).display()

###Upper

In [0]:
# Show Item_Type converted to upper case.
uppercase_item_type = upper('Item_Type')
df.select(uppercase_item_type).display()

### current_date()

In [0]:
# Add a 'curr_date' column holding the current date in every row.
todays_date = current_date()
df = df.withColumn('curr_date', todays_date)

In [0]:
# Show df, which now includes the curr_date column.
df.display()

###Date add()

In [0]:
# Add 'week_after' = curr_date plus 7 days, keep it in df, then display.
seven_days_later = date_add('curr_date', 7)
df = df.withColumn('week_after', seven_days_later)
df.display()

###date sub()

In [0]:
# Display a copy with 'before_days' = curr_date minus 8 days (df unchanged).
eight_days_earlier = date_sub('curr_date', 8)
df.withColumn('before_days', eight_days_earlier).display()

###Date_diff

In [0]:
# Add 'datediff' = number of days from curr_date to week_after, then display.
days_between = datediff('week_after', 'curr_date')
df = df.withColumn('datediff', days_between)
df.display()

###Date format

In [0]:
# Overwrite week_after with its dd-MM-yyyy text representation, then display.
week_after_as_text = date_format('week_after', 'dd-MM-yyyy')
df = df.withColumn('week_after', week_after_as_text)
df.display()

### Dropping nulls
dropna('all'): a row is dropped only if every column in it is null

In [0]:
# Drop only the rows in which every column is null, then display.
fully_null_rows_removed = df.dropna(how='all')
fully_null_rows_removed.display()

### dropna('any')
A row is dropped if any column in it is null

In [0]:
# Drop every row that has at least one null column, then display.
rows_without_nulls = df.dropna(how='any')
rows_without_nulls.display()

In [0]:
# Drop the rows whose Item_Weight is null; other columns are not checked.
required_columns = ['Item_Weight']
df.dropna(subset=required_columns).display()

###filling nulls

In [0]:
# Display a copy where nulls are replaced by a placeholder text.
placeholder_text = 'Not Available'
df.fillna(placeholder_text).display()

In [0]:
# Replace nulls with a placeholder text, but only in Item_Weight.
columns_to_fill = ['Item_Weight']
df.fillna('Not Available', subset=columns_to_fill).display()

###Split and Indexing

In [0]:
# Display a copy where Outlet_Type is split on spaces into an array of words.
outlet_type_words = split('Outlet_Type', ' ')
df.withColumn('Outlet_Type', outlet_type_words).display()

In [0]:
# Split Outlet_Type on spaces and keep only the element at index 1.
outlet_type_second_word = split('Outlet_Type', ' ')[1]
df.withColumn('Outlet_Type', outlet_type_second_word).display()

###Explode

In [0]:
# Build df_explode: same as df, but Outlet_Type holds an array of its words.
outlet_type_as_array = split('Outlet_Type', ' ')
df_explode = df.withColumn('Outlet_Type', outlet_type_as_array)
df_explode.display()

In [0]:
# Turn each element of the Outlet_Type array into its own row.
one_word_per_row = explode('Outlet_Type')
df_explode.withColumn('Outlet_Type', one_word_per_row).display()

###Array func

In [0]:
# Flag the rows whose Outlet_Type array contains the word 'Type1'.
has_type1 = array_contains('Outlet_Type', 'Type1')
df_explode.withColumn('Type1_Flag', has_type1).display()

###groupBy

In [0]:
# Total Item_MRP per Item_Type.
# sum here is the PySpark aggregate brought in by the star import, not the
# Python builtin. The column name is spelled exactly as in the schema
# (Item_MRP) so the cell does not depend on case-insensitive resolution.
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

In [0]:
# Total and average Item_MRP for every (Item_Type, Outlet_Size) pair.
# The output column names are kept exactly as the original aliases.
total_mrp = sum('Item_MRP').alias('Total')
average_mrp = avg("Item_MRP").alias('Avgerage')
df.groupBy('Item_Type', 'Outlet_Size').agg(total_mrp, average_mrp).display()

###Collect list

In [0]:
# Small (user, book) sample used to demonstrate collect_list.
books = [
    ('user1', 'book1'), ('user1', 'book2'),
    ('user2', 'book3'), ('user2', 'book4'),
    ('user3', 'book1'),
    ('user4', 'book2'),
]
# Column names only; the column types are inferred from the data.
schema = ('user', 'books')
df_books = spark.createDataFrame(books, schema)
df_books.display()

In [0]:
# Gather every book of a user into one list per user.
books_per_user = collect_list("books")
df_books.groupBy('user').agg(books_per_user).display()

pivot: turns the distinct values of one column into separate columns and aggregates the data for each of them

In [0]:
# Average Item_MRP per Item_Type, with one output column per Outlet_Size value.
# The pivot column is spelled exactly as in the schema (Outlet_Size) so the
# cell does not depend on Spark's case-insensitive column resolution.
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg("Item_MRP")).display()

###when otherwise

In [0]:
# Add 'veg_flag': 'non_veg' when Item_Type is 'Meat', otherwise 'veg'.
# display() returns None, so it is called separately; chaining it onto the
# assignment would rebind df to None instead of the new DataFrame.
df = df.withColumn(
    'veg_flag',
    when(col("Item_Type") == 'Meat', 'non_veg').otherwise('veg'),
)
df.display()