### Loading Dataset ....

In [0]:
# Load the BigMart sales CSV; the first row is treated as the header and
# column types are inferred from the data (inferSchema).
df = (
    spark.read.format('csv')
    .options(inferSchema=True, header=True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import * 

In [0]:
# Show the freshly loaded DataFrame in the notebook output.
df.display()

### Join

In [0]:
# Employee rows as (emp_id, emp_name, dept_id).
# 'd06' has no counterpart in the department rows below.
data1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
schema1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
df1 = spark.createDataFrame(data1, schema=schema1)

# Department rows as (dept_id, department).
# 'd04' is not referenced by any employee row above.
data2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
schema2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(data2, schema=schema2)

In [0]:
# Show the employee DataFrame (emp_id, emp_name, dept_id).
df1.display()

In [0]:
# Show the department DataFrame (dept_id, department).
df2.display()

#### Inner Join

In [0]:
# Inner join on dept_id: only employees with a matching department remain.
df1.join(
    df2,
    on=df1.dept_id == df2.dept_id,
    how='inner',
).display()

#### Left Join

In [0]:
# Left join on dept_id: every employee row is kept, matched or not.
df1.join(
    df2,
    on=df1.dept_id == df2.dept_id,
    how='left',
).display()

#### Right Join

In [0]:
# Right join on dept_id: every department row is kept, matched or not.
df1.join(
    df2,
    on=df1.dept_id == df2.dept_id,
    how='right',
).display()

#### Anti Join

In [0]:
# Anti join on dept_id: employees whose dept_id has no match in df2.
df1.join(
    df2,
    on=df1.dept_id == df2.dept_id,
    how='anti',
).display()

### Window Function

#### Row Number ()

In [0]:
from pyspark.sql.window import Window

In [0]:
# Add 'rowCol': a sequential row number over a single window ordered by
# Item_Identifier (no partitioning is specified).
df.withColumn(
    'rowCol',
    row_number().over(Window.orderBy('Item_Identifier')),
).display()

#### Rank Vs DenseRank

In [0]:
# Compare rank() and dense_rank() side by side, both computed over the same
# ordering: Item_Identifier descending.
(
    df
    .withColumn(
        'rank',
        rank().over(Window.orderBy(col('Item_Identifier').desc())),
    )
    .withColumn(
        'denseRank',
        dense_rank().over(Window.orderBy(col('Item_Identifier').desc())),
    )
    .display()
)

In [0]:
# Add 'dum': sum of Item_MRP from the first row up to the current row, in
# Item_Identifier order. `sum` here must be the Spark column function (it is
# used with .over), not the Python builtin.
df.withColumn(
    'dum',
    sum('Item_MRP').over(
        Window.orderBy('Item_Identifier').rowsBetween(
            Window.unboundedPreceding,
            Window.currentRow,
        )
    ),
).display()

#### Cumulative Sum

In [0]:
# Add 'cumsum': sum of Item_MRP over a window ordered by Item_Type, with no
# explicit frame.
# NOTE(review): with only an ordering, Spark falls back to its default window
# frame, so rows tied on Item_Type presumably share one value — confirm this
# is the intended contrast with the explicit rowsBetween version.
df.withColumn(
    'cumsum',
    sum('Item_MRP').over(Window.orderBy('Item_Type')),
).display()

In [0]:
# Add 'cumsum': running total of Item_MRP in Item_Type order, with the frame
# set explicitly to "first row through current row".
df.withColumn(
    'cumsum',
    sum('Item_MRP').over(
        Window.orderBy('Item_Type').rowsBetween(
            Window.unboundedPreceding,
            Window.currentRow,
        )
    ),
).display()

In [0]:
# Add 'totalsum': the frame spans the first through the last row, so every
# row receives the sum of Item_MRP over the whole window.
df.withColumn(
    'totalsum',
    sum('Item_MRP').over(
        Window.orderBy('Item_Type').rowsBetween(
            Window.unboundedPreceding,
            Window.unboundedFollowing,
        )
    ),
).display()

### User Defined Function

In [0]:
def new_func(x):
    """Return the square of ``x``, passing ``None`` through unchanged.

    This function is wrapped as a Spark UDF in this notebook. A null cell
    reaches Python as ``None``, and ``None * None`` raises ``TypeError``,
    so ``None`` is returned as-is instead of failing.

    Args:
        x: A value supporting ``*`` with itself, or ``None``.

    Returns:
        ``x * x``, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return x * x

In [0]:
# Wrap new_func with udf() so it can be applied to DataFrame columns.
# NOTE(review): no return type is passed here; PySpark's udf() documents a
# string default, so the squared values would presumably come back as
# strings — confirm, and pass an explicit return type if numeric output is
# wanted.
new_udf = udf(new_func)

In [0]:
# Add 'newcol' by applying the UDF to Item_MRP, then show the result.
df.withColumn(
    'newcol',
    new_udf('Item_MRP'),
).display()

### Data Writing

#### CSV

In [0]:
# Write df as CSV. No save mode is set, so the writer's default mode applies.
df.write.save('/FileStore/tables/CSV/data.csv', format='csv')

#### Append

In [0]:
# Write df as CSV in 'append' mode to the same location.
df.write.save(
    '/FileStore/tables/CSV/data.csv',
    format='csv',
    mode='append',
)

In [0]:
# Same append write, with the target supplied through the 'path' option
# instead of as an argument to save().
(
    df.write
    .format('csv')
    .mode('append')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Overwrite

In [0]:
# Write df as CSV in 'overwrite' mode to the same location.
(
    df.write
    .format('csv')
    .mode('overwrite')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Error

In [0]:
# Write df as CSV in 'error' mode to the same location.
# NOTE(review): the earlier cells already wrote to this path, so this cell
# presumably exists to demonstrate the failure — confirm.
(
    df.write
    .format('csv')
    .mode('error')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Ignore

In [0]:
# Write df as CSV in 'ignore' mode to the same location.
(
    df.write
    .format('csv')
    .mode('ignore')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Parquet

In [0]:
# Write df as Parquet in 'overwrite' mode.
# The target is a dedicated Parquet location: writing to the CSV path used
# by the earlier cells would replace that CSV output with Parquet files
# under a directory still named data.csv.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .option('path', '/FileStore/tables/Parquet/data.parquet')
    .save()
)

#### Table

In [0]:
# Save df as the table 'my_table' in Parquet format, replacing existing data.
df.write.saveAsTable(
    'my_table',
    format='parquet',
    mode='overwrite',
)

In [0]:
# Show df again after the write cells.
df.display()

### Spark SQL

#### Create Temp View

In [0]:
# Register df as the temporary view 'my_view' so it can be queried with SQL.
# createOrReplaceTempView is used so that re-running this cell does not fail
# because the view already exists in the session.
df.createOrReplaceTempView('my_view')

In [0]:
%sql

-- Select every row of my_view whose Item_Fat_Content equals 'Lf'.
select * from my_view where Item_Fat_Content = 'Lf'

In [0]:
# Run the same query through spark.sql() and keep the result as a DataFrame.
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Lf'")

In [0]:
# Show the rows returned by the SQL query.
df_sql.display()

####     Completed ......