# ADVANCED-LEVEL DATA TRANSFORMATIONS

In [0]:
from pyspark.sql.functions import *

## COLLECT LIST

In [0]:
# Sample (name, language) pairs. Some pairs repeat on purpose
# (James/Python, Anna/PHP) so that collect_list and collect_set give
# visibly different results.
data = [
    ("James", "Java"),
    ("James", "Python"),
    ("James", "Python"),
    ("Anna", "PHP"),
    ("Anna", "Javascript"),
    ("Maria", "Java"),
    ("Maria", "C++"),
    ("James", "Scala"),
    ("Anna", "PHP"),
    ("Anna", "HTML"),
]

# Build a two-column DataFrame from the pairs and render it in the notebook
df = spark.createDataFrame(data, schema=["name", "languages"])
df.display()

In [0]:
# One row per name; collect_list gathers every language, duplicates included
(
    df.groupBy("name")
    .agg(collect_list("languages"))
    .display()
)

## COLLECT SET

In [0]:
# One row per name; collect_set keeps only the distinct languages
(
    df.groupBy("name")
    .agg(collect_set("languages"))
    .display()
)

## PIVOT

In [0]:
# Replace `df` with the BigMart sales data: the first CSV row is the header
# and Spark infers each column's type by scanning the file.
df = (
    spark.read.format("csv")
    .options(inferSchema=True, header=True)
    .load("dbfs:/FileStore/tables/BigMart_Sales.csv")
)

In [0]:
# Rows: Item_Type; columns: each distinct Outlet_Size; cells: mean Item_MRP
(
    df.groupBy("Item_Type")
    .pivot("Outlet_Size")
    .agg(avg("Item_MRP"))
    .display()
)

## WHEN-OTHERWISE

### Scenario - 1

In [0]:
# Only an Item_Type of exactly 'Meat' is tagged 'Non-Veg'; every other value,
# including a null Item_Type (the comparison is null, not true), gets 'Veg'.
df.withColumn(
    "veg_flag",
    when(col("Item_Type") == "Meat", "Non-Veg").otherwise("Veg"),
).display()

### Scenario - 2

In [0]:
# Non-Meat rows are split on Item_MRP at 100; Meat rows become 'Non-Veg'.
# NOTE(review): a non-Meat row whose Item_MRP is null matches neither
# comparison and also falls through to 'Non-Veg'.
not_meat = col("Item_Type") != "Meat"
df.withColumn(
    "veg_expensive",
    when(not_meat & (col("Item_MRP") > 100), "Expensive")
    .when(not_meat & (col("Item_MRP") <= 100), "Not expensive")
    .otherwise("Non-Veg"),
).display()

## JOINS

### INNER JOIN

In [0]:
# Employees: (emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, salary).
# emp_dept_id "50" has no matching department below.
# NOTE: emp_dept_id holds strings while dept_id holds ints, so the join
# conditions in the following cells rely on Spark's implicit type cast.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary",
]
empDF = spark.createDataFrame(data=emp, schema=empColumns)

# Departments: (dept_name, dept_id). dept_id 30 (Sales) has no employees above.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)

In [0]:
# Inner join: only employees whose emp_dept_id matches some dept_id
empDF.join(
    deptDF,
    on=empDF["emp_dept_id"] == deptDF["dept_id"],
    how="inner",
).display()

### LEFT JOIN

In [0]:
# Left join: every employee; department columns are null when there is no match
empDF.join(
    deptDF,
    on=empDF["emp_dept_id"] == deptDF["dept_id"],
    how="left",
).display()

### RIGHT JOIN

In [0]:
# Right join: every department; employee columns are null when there is no match
empDF.join(
    deptDF,
    on=empDF["emp_dept_id"] == deptDF["dept_id"],
    how="right",
).display()

### ANTI JOIN

In [0]:
# Anti join: employee columns only, for employees with no matching department
empDF.join(
    deptDF,
    on=empDF["emp_dept_id"] == deptDF["dept_id"],
    how="anti",
).display()

## WINDOW FUNCTIONS

### ROW NUMBER

In [0]:
from pyspark.sql.window import Window

In [0]:
# Sequential 1..N numbering in ascending Item_Identifier order. The window has
# no partitionBy, so Spark evaluates it over the whole dataset in one partition.
df.withColumn(
    "row_column",
    row_number().over(Window.orderBy("Item_Identifier")),
).display()

### RANK

In [0]:
# rank(): equal Item_Identifier values share a rank, leaving gaps afterwards
df.withColumn(
    "row_column",
    rank().over(Window.orderBy("Item_Identifier")),
).display()

### DENSE RANK

In [0]:
# dense_rank(): ties share a rank with no gaps; ordered by descending Item_Identifier
df.withColumn(
    "row_column",
    dense_rank().over(Window.orderBy(col("Item_Identifier").desc())),
).display()

### CUMULATIVE SUM

In [0]:
# Running total of Item_MRP ordered by Item_Type (`sum` is pyspark's, via the
# star import). With orderBy and no explicit frame Spark uses
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so every row of the same
# Item_Type shows the same running total.
df.withColumn(
    "cumm_sum",
    sum("Item_MRP").over(Window.orderBy(col("Item_Type"))),
).display()

In [0]:
# Row-by-row running total: the ROWS frame advances one row at a time even
# within the same Item_Type. Rows sharing an Item_Type have no tie-breaker,
# so their relative order (and intermediate totals) may vary between runs.
df.withColumn(
    "cumm_sum",
    sum("Item_MRP").over(
        Window.orderBy(col("Item_Type"))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
).display()

In [0]:
# The frame spans the whole (unpartitioned) window, so every row shows the
# grand total of Item_MRP.
df.withColumn(
    "total_sum",
    sum("Item_MRP").over(
        Window.orderBy(col("Item_Type"))
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ),
).display()

## USER DEFINED FUNCTIONS

In [0]:
def my_func(x):
    """Return ``x`` multiplied by itself.

    Works for anything that supports ``*``: Python numbers and Spark Columns.
    """
    squared = x * x
    return squared

In [0]:
# Wrap my_func as a Spark UDF. Without an explicit returnType, udf() defaults
# to StringType, so the squared values would come back as strings.
# NOTE(review): 'double' assumes the UDF is applied to a double column such as
# Item_MRP (type inferred from the CSV); an int result may not convert cleanly
# to DoubleType — confirm against the column's schema.
my_udf = udf(my_func, 'double')

In [0]:
# Apply the UDF, not the plain Python function. my_func(col(...)) would only
# build the native Column expression Item_MRP * Item_MRP, so my_udf would
# never run.
df.withColumn('aux_col', my_udf(col('Item_MRP'))).display()

## DATA WRITING

### CSV

In [0]:
# Write df as CSV (Spark writes a directory of part files at this path).
# header=True keeps the column names; the CSV writer omits them by default.
# mode('overwrite') lets the cell be re-run instead of failing because the
# path already exists.
(
    df.write.format('csv')
    .option('header', True)
    .mode('overwrite')
    .save('dbfs:/FileStore/tables/CSV/data.csv')
)

### PARQUET

In [0]:
# Write df as Parquet to a Parquet-named location (not a `.csv` path under
# CSV/). mode('overwrite') lets the cell be re-run instead of failing because
# the path already exists.
(
    df.write.format('parquet')
    .mode('overwrite')
    .save('dbfs:/FileStore/tables/Parquet/data.parquet')
)

### TABLE

In [0]:
# Persist df as a Parquet-backed managed table named my_table.
# mode('overwrite') replaces an existing table so the cell can be re-run
# (the default mode raises if the table already exists).
df.write.format('parquet').mode('overwrite').saveAsTable('my_table')

## SPARK SQL

In [0]:
# Register df as a temporary view named my_view so it can be queried with SQL.
# createOrReplaceTempView does not fail when the view already exists
# (createTempView does), so re-running the notebook works.
df.createOrReplaceTempView('my_view')

In [0]:
%sql

-- Rows of the my_view temporary view whose fat-content label is exactly 'Low Fat'
SELECT *
FROM my_view
WHERE Item_Fat_Content = 'Low Fat';