In [10]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [3]:
spark = SparkSession(SparkContext(appName="myAppName"))

In [21]:
data = (
  ("iPhone X",       "cell phone", 6000),
  ("iPad",     "tablet",     1500),
  ("iPad Pro 2",       "tablet",     5500),
  ("iPhone 8", "cell phone", 5000),
  ("Galaxy X",  "cell phone", 6000),
  ("Xiaomi Tablet",        "tablet",     2500),
  ("iPhone 7",   "cell phone", 3000),
  ("Galaxy 7",   "cell phone", 3000),
  ("Asus Tablet",        "tablet",     4500),
  ("iPad Pro X",       "tablet",     6500))

In [22]:
df = spark.createDataFrame(data, ["product", "category", "revenue"])
df.cache().show()

+-------------+----------+-------+
|      product|  category|revenue|
+-------------+----------+-------+
|     iPhone X|cell phone|   6000|
|         iPad|    tablet|   1500|
|   iPad Pro 2|    tablet|   5500|
|     iPhone 8|cell phone|   5000|
|     Galaxy X|cell phone|   6000|
|Xiaomi Tablet|    tablet|   2500|
|     iPhone 7|cell phone|   3000|
|     Galaxy 7|cell phone|   3000|
|  Asus Tablet|    tablet|   4500|
|   iPad Pro X|    tablet|   6500|
+-------------+----------+-------+



We want to answer two questions:

1. What are the best-selling and the second best-selling products in every category?
2. What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?

In [23]:
# question 1
w_sort_rev = Window.partitionBy("category").orderBy(F.col("revenue").desc())

In [24]:
(df
 .withColumn("row_number", F.row_number().over(w_sort_rev))
).show(10)

+-------------+----------+-------+----------+
|      product|  category|revenue|row_number|
+-------------+----------+-------+----------+
|   iPad Pro X|    tablet|   6500|         1|
|   iPad Pro 2|    tablet|   5500|         2|
|  Asus Tablet|    tablet|   4500|         3|
|Xiaomi Tablet|    tablet|   2500|         4|
|         iPad|    tablet|   1500|         5|
|     iPhone X|cell phone|   6000|         1|
|     Galaxy X|cell phone|   6000|         2|
|     iPhone 8|cell phone|   5000|         3|
|     iPhone 7|cell phone|   3000|         4|
|     Galaxy 7|cell phone|   3000|         5|
+-------------+----------+-------+----------+



In [25]:
(df
 .withColumn("row_number", F.row_number().over(w_sort_rev))
 .where(F.col("row_number") <= 2)
 .drop("row_number")
).show(10)

+----------+----------+-------+
|   product|  category|revenue|
+----------+----------+-------+
|iPad Pro X|    tablet|   6500|
|iPad Pro 2|    tablet|   5500|
|  iPhone X|cell phone|   6000|
|  Galaxy X|cell phone|   6000|
+----------+----------+-------+



In [30]:
# question 2
(df
.withColumn("next_revenue", F.lag("revenue", -1).over(w_sort_rev))
).show(100)

+-------------+----------+-------+------------+
|      product|  category|revenue|next_revenue|
+-------------+----------+-------+------------+
|   iPad Pro X|    tablet|   6500|        5500|
|   iPad Pro 2|    tablet|   5500|        4500|
|  Asus Tablet|    tablet|   4500|        2500|
|Xiaomi Tablet|    tablet|   2500|        1500|
|         iPad|    tablet|   1500|        null|
|     iPhone X|cell phone|   6000|        6000|
|     Galaxy X|cell phone|   6000|        5000|
|     iPhone 8|cell phone|   5000|        3000|
|     iPhone 7|cell phone|   3000|        3000|
|     Galaxy 7|cell phone|   3000|        null|
+-------------+----------+-------+------------+



In [31]:
(df
 .withColumn("next_revenue", F.lag("revenue", -1).over(w_sort_rev))
 .withColumn("revenue_diff", F.col("revenue") - F.col("next_revenue"))
 .fillna(0)
).show(100)

+-------------+----------+-------+------------+------------+
|      product|  category|revenue|next_revenue|revenue_diff|
+-------------+----------+-------+------------+------------+
|   iPad Pro X|    tablet|   6500|        5500|        1000|
|   iPad Pro 2|    tablet|   5500|        4500|        1000|
|  Asus Tablet|    tablet|   4500|        2500|        2000|
|Xiaomi Tablet|    tablet|   2500|        1500|        1000|
|         iPad|    tablet|   1500|           0|           0|
|     iPhone X|cell phone|   6000|        6000|           0|
|     Galaxy X|cell phone|   6000|        5000|        1000|
|     iPhone 8|cell phone|   5000|        3000|        2000|
|     iPhone 7|cell phone|   3000|        3000|           0|
|     Galaxy 7|cell phone|   3000|           0|           0|
+-------------+----------+-------+------------+------------+



In [32]:
(df
 .withColumn("next_revenue", F.lag("revenue", -1).over(w_sort_rev))
 .withColumn("revenue_diff", F.col("revenue") - F.col("next_revenue"))
 .fillna(0)
 .select("product", "revenue_diff")
).show(100)

+-------------+------------+
|      product|revenue_diff|
+-------------+------------+
|   iPad Pro X|        1000|
|   iPad Pro 2|        1000|
|  Asus Tablet|        2000|
|Xiaomi Tablet|        1000|
|         iPad|           0|
|     iPhone X|           0|
|     Galaxy X|        1000|
|     iPhone 8|        2000|
|     iPhone 7|           0|
|     Galaxy 7|           0|
+-------------+------------+

