In [23]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col, lit, when, \
desc, asc, cast, like, count, min, max, median, percentile
from pyspark.sql.types import *

# Build (or reuse) the Spark session for this notebook from a fresh Builder.
# If a session already exists, getOrCreate() returns it and only runtime SQL
# configs from this builder take effect (see the WARN line in the cell output).
spark = (
    SparkSession.Builder()
    .appName('pivot() & unpivot() functions in Pyspark')
    # NOTE(review): nothing in this notebook joins DataFrames, so this setting
    # looks unnecessary here — confirm before removing.
    .config("spark.sql.crossJoin.enabled", True)
    .getOrCreate()
)

24/06/17 13:57:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [31]:
# Sample employee records as (empID, Name, Gender, Salary, dept) tuples.
# Keep this row order: the show() output reflects the order of this list.
empData = [
    (1, "Rohit", 'M', 3000, "Data"),
    (2, "Ajay", 'M', 2000, "Data"),
    (6, "Deepshika", 'F', 2000, "Data"),
    (3, "Hemma", 'F', 2000, "HR"),
    (4, "Arti", 'F', 2000, "Marketing"),
    (5, "Kanchan", 'F', 2000, "Marketing"),
]

# Column names only; Spark infers each column's type from the Python values.
empDataSchema = ['empID', 'Name', 'Gender', 'Salary', 'dept']

df = spark.createDataFrame(data=empData, schema=empDataSchema)
df.show()

+-----+---------+------+------+---------+
|empID|     Name|Gender|Salary|     dept|
+-----+---------+------+------+---------+
|    1|    Rohit|     M|  3000|     Data|
|    2|     Ajay|     M|  2000|     Data|
|    6|Deepshika|     F|  2000|     Data|
|    3|    Hemma|     F|  2000|       HR|
|    4|     Arti|     F|  2000|Marketing|
|    5|  Kanchan|     F|  2000|Marketing|
+-----+---------+------+------+---------+



In [32]:
# Head-count per (dept, Gender) pair: the "long" layout that the pivot below
# reshapes into one column per gender value.
# Reference the column exactly as declared in the schema ('Gender') instead of
# relying on Spark's case-insensitive name resolution, which fails when
# spark.sql.caseSensitive is enabled.
df.groupBy('dept', 'Gender').count().show()

+---------+------+-----+
|     dept|gender|count|
+---------+------+-----+
|     Data|     M|    2|
|     Data|     F|    1|
|       HR|     F|    1|
|Marketing|     F|    2|
+---------+------+-----+



#### pivot() function

In [33]:
%%time
df.groupBy('dept').pivot('gender',['M', 'F']).count().show()
print('when we specify what we want as pivot columns --> It takes less time')

+---------+----+---+
|     dept|   M|  F|
+---------+----+---+
|     Data|   2|  1|
|       HR|NULL|  1|
|Marketing|NULL|  2|
+---------+----+---+

when we specify what we want as pivot columns --> It takes less time
CPU times: user 9.13 ms, sys: 0 ns, total: 9.13 ms
Wall time: 336 ms


In [34]:
%%time
df.groupBy('dept').pivot('gender').count().show()

+---------+---+----+
|     dept|  F|   M|
+---------+---+----+
|     Data|  1|   2|
|       HR|  1|NULL|
|Marketing|  2|NULL|
+---------+---+----+

CPU times: user 4.25 ms, sys: 2.64 ms, total: 6.88 ms
Wall time: 536 ms


#### Unpivot a DataFrame using the stack() SQL expression in PySpark

In [35]:
# Wide table used as input for the unpivot demo below: one row per dept and
# one count column per Gender value (M, F), NULL where a dept has nobody of
# that gender.
# Note: DataFrame.alias('countOfEmp') would NOT rename these count columns.
# It only sets a DataFrame-level alias used for qualified column references,
# and a pivot names its value columns after the pivot values. So no alias is
# applied here.
pivotedDf = df.groupBy('dept').pivot('Gender', ['M', 'F']).count()
pivotedDf.show()

+---------+----+---+
|     dept|   M|  F|
+---------+----+---+
|     Data|   2|  1|
|       HR|NULL|  1|
|Marketing|NULL|  2|
+---------+----+---+



In [36]:
from pyspark.sql.functions import expr

In [42]:
# stack(n, label1, col1, label2, col2, ...) emits n rows per input row, each
# holding one (label, value) pair. Here it turns the M/F columns back into
# (gender, count) rows, undoing the pivot above.
# The SQL text sits on one line rather than being split with a backslash-newline
# inside the string literal. That form becomes an unterminated-string error if
# any whitespace follows the backslash.
unpivotedDF = pivotedDf.select(
    'dept',
    expr("stack(2,'Male', M, 'Female', F) as (gender, count)"),
)
# unpivotedDF itself still contains the rows whose count is NULL (e.g. HR/Male);
# the filter only hides them from this display.
unpivotedDF.filter(col('count').isNotNull()).show()

+---------+------+-----+
|     dept|gender|count|
+---------+------+-----+
|     Data|  Male|    2|
|     Data|Female|    1|
|       HR|Female|    1|
|Marketing|Female|    2|
+---------+------+-----+



In [44]:
# Stop the Spark session at the end of the notebook (per the PySpark docs this
# also stops the underlying SparkContext). Cells that use `spark` or `df`
# presumably need the session-creation cell re-run before they work again.
spark.stop()