
## JOINS

- Inner Join
- Left Join
- Right Join
- Full Join
- Anti Join



### INNER JOINS

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load the two CSV inputs used by the join examples. Both reads treat the
# first line as a header and ask Spark to infer the column types.
df_emp = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv('/Volumes/workspace/default/pyspark/employee_join.csv')
)

df_emp.display()

df_dept = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv('/Volumes/workspace/default/pyspark/department_join.csv')
)

df_dept.display()

In [0]:
# Inner join: only rows whose dept_id is present in both DataFrames survive.
emp_dept_match = df_emp['dept_id'] == df_dept['dept_id']
df_emp.join(df_dept, on=emp_dept_match, how='inner').display()


#### LEFT JOINS

In [0]:
# Left join: every employee row is kept; the department columns are null
# for employees whose dept_id has no match in df_dept.
# The 'right' and 'outer' variants are shown in their own cells below;
# df_emp.join(df_dept) with no condition is the remaining variant to try.
same_dept = df_emp['dept_id'] == df_dept['dept_id']
df_emp.join(df_dept, on=same_dept, how='left').display()


#### RIGHT JOIN

In [0]:
# Right join: every department row is kept; the employee columns are null
# for departments that no employee's dept_id refers to.
same_dept = df_emp['dept_id'] == df_dept['dept_id']
df_emp.join(df_dept, on=same_dept, how='right').display()


#### OUTER JOIN

In [0]:
# Full outer join: rows from both sides are kept, with nulls filling the
# columns of whichever side has no matching dept_id.
same_dept = df_emp['dept_id'] == df_dept['dept_id']
df_emp.join(df_dept, on=same_dept, how='outer').display()


#### ANTI JOIN

In [0]:
# Anti join: keep the employee rows whose dept_id has NO match in df_dept.
same_dept = df_emp['dept_id'] == df_dept['dept_id']
df_emp.join(df_dept, on=same_dept, how='anti').display()


## WINDOW FUNCTIONS


#### ROW NUMBER

In [0]:
from pyspark.sql.window import *

# Load the employee CSV used by the window-function, UDF and write examples.
# The first line is read as a header and column types are inferred.
df = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv('/Volumes/workspace/default/pyspark/employee.csv')
)

df.display()

In [0]:
# Number the rows 1, 2, 3, ... in employee_id order over the whole DataFrame
# (the window has an ordering but no partitioning).
by_employee_id = Window.orderBy('employee_id')
df.withColumn('RowCol', row_number().over(by_employee_id)).display()


#### Rank Function

In [0]:
# Rank rows by experience_years: tied rows share a rank, and the ranks that
# follow a tie skip ahead by the size of the tie.
by_experience = Window.orderBy('experience_years')
df.withColumn('RankCol', rank().over(by_experience)).display()


#### Dense_Rank Function

In [0]:
# Dense-rank rows by experience_years: tied rows share a rank and the next
# distinct value gets the very next rank (no gaps).
by_experience = Window.orderBy('experience_years')
df.withColumn('DRankCol', dense_rank().over(by_experience)).display()

#### Difference Between Row_Number, Rank and Dense_Rank Functions

In [0]:
# Put the three ranking functions side by side over the same ordering so
# their handling of tied experience_years values can be compared row by row.
by_experience = Window.orderBy('experience_years')
(
    df
    .withColumn('RowCol', row_number().over(by_experience))
    .withColumn('RankCol', rank().over(by_experience))
    .withColumn('DRankCol', dense_rank().over(by_experience))
    .display()
)

##### Cumulative Sum Function

In [0]:
# Running total of salary: for each row, sum everything from the first row
# of the window up to and including the current row.
# `sum` here is the Spark column function brought in by the star import of
# pyspark.sql.functions, not Python's builtin.
# NOTE(review): the window is ordered by department only, so the order of
# rows within one department (and hence their running totals) is not pinned
# down - confirm whether a tie-breaking column is wanted.
running_frame = Window.orderBy('department').rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
df.withColumn('cumsum', sum('salary').over(running_frame)).display()

#### Difference between Window.unboundedPreceding, Window.currentRow and Window.unboundedFollowing

In [0]:
# Contrast two frames over the same department ordering:
#   - up_to_current: first row .. current row  -> running total
#   - whole_window:  first row .. last row     -> same grand total on every row
# `sum` is the Spark column function from the pyspark.sql.functions star
# import, not Python's builtin.
department_order = Window.orderBy('department')
up_to_current = department_order.rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
whole_window = department_order.rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
(
    df
    .withColumn('cumsum', sum('salary').over(up_to_current))
    .withColumn('totalsum', sum('salary').over(whole_window))
    .display()
)

### User Defined Function

In [0]:
def my_func(x):
    """Return the square of ``x``, passing ``None`` through unchanged.

    This function is wrapped as a Spark UDF further down. A Python UDF is
    handed ``None`` for a null cell, and ``None * None`` raises ``TypeError``,
    which would fail the whole job; returning ``None`` keeps the null instead.

    Args:
        x: A number, or ``None`` for a missing value.

    Returns:
        ``x * x``, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return x * x

In [0]:
# Wrap my_func as a Spark UDF. The return type is spelled out here; it is the
# same StringType that udf() falls back to when no returnType is passed.
# NOTE(review): the squares therefore come back in a string-typed column -
# confirm whether a numeric return type was intended.
my_udf = udf(my_func, returnType=StringType())

In [0]:
# Run the UDF over the experience_years column and show the result as a new
# column named newudfval.
squared_experience = my_udf('experience_years')
df.withColumn('newudfval', squared_experience).display()

## Data Writing


##### CSV

In [0]:
# Write df out as CSV, replacing whatever already exists at the target path.
# No 'header' option is set for this write.
df.write.mode('overwrite').csv('/Volumes/workspace/default/pyspark/datafile.csv')

In [0]:
# Collapse df to a single partition before writing, and include a header
# row; any existing output at the target path is replaced.
(
    df.coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode('overwrite')
    .save("/Volumes/workspace/default/pyspark/datafile_tmp.csv")
)



#### TABLE

In [0]:
# Save df as the table my_table in Delta format, replacing the table if it
# already exists.
df.write.saveAsTable('my_table', format='delta', mode='overwrite')

In [0]:
# Save df as a Parquet-format table, replacing the table if it already exists.
# The target schema is chosen through the qualified table name. The writer's
# 'path' option denotes a storage location for the table's files, so passing
# the identifier 'workspace.default' there did not select that schema.
# NOTE(review): this replaces the my_table written in Delta format by the
# previous cell - confirm the target catalog accepts Parquet tables.
df.write.format('parquet')\
    .mode('overwrite')\
    .saveAsTable('workspace.default.my_table')

In [0]:
%sql
-- Read back every row of my_table to check what the write cells produced.
SELECT * FROM my_table;