In [1]:
# Echo the SparkContext to confirm it is live. `sc` is not created in this
# notebook -- presumably it is injected by the PySpark kernel/shell (TODO confirm).
sc

<pyspark.context.SparkContext at 0x7fd4fd3c4790>

In [6]:
# Delete any *.lck files under metastore_db/ before building the SQL context
# (presumably stale Derby metastore locks left by an earlier session -- TODO confirm).
!rm -rf metastore_db/*.lck

from pyspark.sql import SQLContext
# SQL entry point built on top of the existing SparkContext `sc`.
sqlc = SQLContext(sc)

In [7]:
from collections import namedtuple
# Record type for one sale; its field names end up as the DataFrame columns.
Sales = namedtuple("Sales", ["id", "account", "year", "commission", "sales_reps"])

# Sample data: `sales_reps` holds every rep credited on the sale (one or more).
sales_rows = [
    Sales(1, "Acme", "2013", 1000, ["Jim", "Tom"]),
    Sales(2, "Lumos", "2013", 1100, ["Fred", "Ann"]),
    Sales(3, "Acme", "2014", 2800, ["Jim"]),
    Sales(4, "Lumos", "2014", 1200, ["Ann"]),
    Sales(5, "Acme", "2014", 4200, ["Fred", "Sally"]),
]

# Distribute the rows as an RDD and convert it to a DataFrame.
sales = sc.parallelize(sales_rows).toDF()

sales.show()

+---+-------+----+----------+-------------+
| id|account|year|commission|   sales_reps|
+---+-------+----+----------+-------------+
|  1|   Acme|2013|      1000|   [Jim, Tom]|
|  2|  Lumos|2013|      1100|  [Fred, Ann]|
|  3|   Acme|2014|      2800|        [Jim]|
|  4|  Lumos|2014|      1200|        [Ann]|
|  5|   Acme|2014|      4200|[Fred, Sally]|
+---+-------+----+----------+-------------+



In [8]:
from pyspark.sql.functions import explode

# Fan each sale out to one row per rep: explode() emits a row for every
# element of the `sales_reps` array, repeating the other selected columns.
rep_per_row = explode("sales_reps").alias("sales_rep")
sales.select("id", "account", "year", "commission", rep_per_row).show()

+---+-------+----+----------+---------+
| id|account|year|commission|sales_rep|
+---+-------+----+----------+---------+
|  1|   Acme|2013|      1000|      Jim|
|  1|   Acme|2013|      1000|      Tom|
|  2|  Lumos|2013|      1100|     Fred|
|  2|  Lumos|2013|      1100|      Ann|
|  3|   Acme|2014|      2800|      Jim|
|  4|  Lumos|2014|      1200|      Ann|
|  5|   Acme|2014|      4200|     Fred|
|  5|   Acme|2014|      4200|    Sally|
+---+-------+----+----------+---------+



In [9]:
# UserDefinedFunction / IntegerType are no longer needed by this cell; the
# imports are kept so any other cell relying on them keeps working.
from pyspark.sql.functions import UserDefinedFunction, size
from pyspark.sql.types import IntegerType

# Array length is available as the built-in `size` column function, so there is
# no need for a Python lambda UDF: the built-in is evaluated inside Spark (no
# per-row round trip through Python) and does not raise on a null array the
# way `len(None)` would.
# `column_len` is kept as an alias so existing callers of that name still work.
column_len = size

# One row per (sale, rep), carrying the number of reps on the sale so the
# commission can be split between them afterwards.
exploded = sales.select("id", "account", "year", "commission",
                        size("sales_reps").alias("num_reps"),
                        explode("sales_reps").alias("sales_rep"))

exploded.show()

+---+-------+----+----------+--------+---------+
| id|account|year|commission|num_reps|sales_rep|
+---+-------+----+----------+--------+---------+
|  1|   Acme|2013|      1000|       2|      Jim|
|  1|   Acme|2013|      1000|       2|      Tom|
|  2|  Lumos|2013|      1100|       2|     Fred|
|  2|  Lumos|2013|      1100|       2|      Ann|
|  3|   Acme|2014|      2800|       1|      Jim|
|  4|  Lumos|2014|      1200|       1|      Ann|
|  5|   Acme|2014|      4200|       2|     Fred|
|  5|   Acme|2014|      4200|       2|    Sally|
+---+-------+----+----------+--------+---------+



In [10]:
# Split each sale's commission evenly between its reps, then drop the helper
# count column.
# This cell rebinds `exploded`, so after the first run "num_reps" no longer
# exists and referencing it again would fail. The guard makes a re-run a no-op
# instead of an error, while keeping the `exploded` name that later cells use.
if "num_reps" in exploded.columns:
    exploded = (exploded
                .withColumn("share", exploded["commission"] / exploded["num_reps"])
                .drop("num_reps"))
exploded.show()

+---+-------+----+----------+---------+------+
| id|account|year|commission|sales_rep| share|
+---+-------+----+----------+---------+------+
|  1|   Acme|2013|      1000|      Jim| 500.0|
|  1|   Acme|2013|      1000|      Tom| 500.0|
|  2|  Lumos|2013|      1100|     Fred| 550.0|
|  2|  Lumos|2013|      1100|      Ann| 550.0|
|  3|   Acme|2014|      2800|      Jim|2800.0|
|  4|  Lumos|2014|      1200|      Ann|1200.0|
|  5|   Acme|2014|      4200|     Fred|2100.0|
|  5|   Acme|2014|      4200|    Sally|2100.0|
+---+-------+----+----------+---------+------+



In [11]:
# Total commission share per rep with one column per year; a null cell means
# the rep has no rows for that year.
share_by_rep = exploded.groupBy("sales_rep").pivot("year").sum("share")
share_by_rep.orderBy("sales_rep").show()

+---------+-----+------+
|sales_rep| 2013|  2014|
+---------+-----+------+
|      Ann|550.0|1200.0|
|     Fred|550.0|2100.0|
|      Jim|500.0|2800.0|
|    Sally| null|2100.0|
|      Tom|500.0|  null|
+---------+-----+------+



In [12]:
# Same yearly pivot of commission shares, broken down by account as well as
# rep, so a rep who sold for several accounts gets one row per account.
share_by_account_rep = (
    exploded
    .groupBy("account", "sales_rep")
    .pivot("year")
    .sum("share")
)
share_by_account_rep.orderBy("account", "sales_rep").show()

+-------+---------+-----+------+
|account|sales_rep| 2013|  2014|
+-------+---------+-----+------+
|   Acme|     Fred| null|2100.0|
|   Acme|      Jim|500.0|2800.0|
|   Acme|    Sally| null|2100.0|
|   Acme|      Tom|500.0|  null|
|  Lumos|      Ann|550.0|1200.0|
|  Lumos|     Fred|550.0|  null|
+-------+---------+-----+------+

