## Sudip P. Notebook


import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T

### Reading the product dataset in Parquet format (there are 4 files and I am reading all of them in one shot)

In [6]:
df = spark.read.format("orc").option("header","true").load("file:///D:/data/product_parquet/")

In [7]:
df.show(5)

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|      1009|                 45|Diamond Fear No E...|                   |       599.99|http://images.acm...|
|      1010|                 46|DBX Vector Series...|                   |        19.98|http://images.acm...|
|      1011|                 46|Old Town Canoe Sa...|                   |       499.99|http://images.acm...|
|      1012|                 46|Pelican Trailblaz...|                   |       299.99|http://images.acm...|
|      1013|                 46|Perception Sport ...|                   |       349.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 

In [32]:
agg_df = df.groupBy("product_category_id").agg(avg("product_price")).orderBy(desc("product_category_id")).show()

+-------------------+------------------+
|product_category_id|avg(product_price)|
+-------------------+------------------+
|                 59|  74.4154167175293|
|                 58| 81.16666666666667|
|                 57| 83.11750133832295|
|                 56| 68.78916716575623|
|                 55| 31.49833329518636|
|                 54|  98.5733338991801|
|                 53|  135.198335647583|
|                 52| 56.49958340326945|
|                 51| 108.7441684405009|
|                 50|              85.5|
|                 49|156.44624829292297|
|                 48| 285.4045789241791|
|                 47|208.53041450182596|
|                 46|264.65457820892334|
|                 45|201.15624849001566|
|                 44|111.57250086466472|
|                 43| 283.9045820236206|
|                 42|127.99200286865235|
|                 41| 74.05520850419998|
|                 40|24.989999771118164|
+-------------------+------------------+
only showing top

In [33]:
agg_rnd_df = df.groupBy("product_category_id").agg(round(avg("product_price"))).orderBy(desc("product_category_id")).show()

+-------------------+----------------------------+
|product_category_id|round(avg(product_price), 0)|
+-------------------+----------------------------+
|                 59|                        74.0|
|                 58|                        81.0|
|                 57|                        83.0|
|                 56|                        69.0|
|                 55|                        31.0|
|                 54|                        99.0|
|                 53|                       135.0|
|                 52|                        56.0|
|                 51|                       109.0|
|                 50|                        86.0|
|                 49|                       156.0|
|                 48|                       285.0|
|                 47|                       209.0|
|                 46|                       265.0|
|                 45|                       201.0|
|                 44|                       112.0|
|                 43|          

### Max product price

In [16]:
max_product_df = df.groupBy("product_category_id").agg(max("product_price")).orderBy(desc("product_category_id")).show()

+-------------------+------------------+
|product_category_id|max(product_price)|
+-------------------+------------------+
|                 59|             100.0|
|                 58|             241.0|
|                 57|            189.99|
|                 56|            159.99|
|                 55|              85.0|
|                 54|            299.99|
|                 53|            199.99|
|                 52|             170.0|
|                 51|            219.99|
|                 50|             130.0|
|                 49|            399.99|
|                 48|            799.99|
|                 47|           1099.99|
|                 46|            549.99|
|                 45|            599.99|
|                 44|            399.99|
|                 43|            449.99|
|                 42|            179.99|
|                 41|            399.99|
|                 40|             24.99|
+-------------------+------------------+
only showing top

### Min value

In [18]:
min_product_df = df.groupBy("product_category_id").min("product_price").orderBy(desc("product_category_id")).show()

+-------------------+------------------+
|product_category_id|min(product_price)|
+-------------------+------------------+
|                 59|              28.0|
|                 58|              22.0|
|                 57|               0.0|
|                 56|              9.99|
|                 55|              9.99|
|                 54|             34.99|
|                 53|             69.99|
|                 52|              10.0|
|                 51|              28.0|
|                 50|              34.0|
|                 49|             19.98|
|                 48|             19.98|
|                 47|             21.99|
|                 46|             19.98|
|                 45|             27.99|
|                 44|             21.99|
|                 43|              99.0|
|                 42|               0.0|
|                 41|              9.59|
|                 40|             24.99|
+-------------------+------------------+
only showing top

### Doing aggregate and alias

In [25]:
count_df = df.groupBy("product_category_id").agg(count("product_price")).show()

+-------------------+--------------------+
|product_category_id|count(product_price)|
+-------------------+--------------------+
|                 31|                  24|
|                 53|                  24|
|                 34|                  24|
|                 26|                  24|
|                 27|                  24|
|                 44|                  24|
|                 12|                  24|
|                 22|                  24|
|                 47|                  24|
|                 52|                  24|
|                 13|                  24|
|                  6|                  24|
|                 16|                  24|
|                  3|                  24|
|                 40|                  24|
|                 20|                  24|
|                 57|                  24|
|                 54|                  24|
|                 48|                  24|
|                  5|                  24|
+----------

In [26]:
count_df_alias = df.groupBy("product_category_id").agg(count("product_price")).alias("count").orderBy("product_category_id").show()

+-------------------+--------------------+
|product_category_id|count(product_price)|
+-------------------+--------------------+
|                  2|                  24|
|                  3|                  24|
|                  4|                  24|
|                  5|                  24|
|                  6|                  24|
|                  7|                  24|
|                  8|                  24|
|                  9|                  24|
|                 10|                  24|
|                 11|                  24|
|                 12|                  24|
|                 13|                  24|
|                 15|                  24|
|                 16|                  24|
|                 17|                  20|
|                 18|                  24|
|                 19|                  24|
|                 20|                  24|
|                 21|                  24|
|                 22|                  24|
+----------

### Grouping on Multiple Columns

In [29]:
multiple_col_df = df.groupBy("product_category_id", "product_name").agg(count("product_price")).show()

+-------------------+--------------------+--------------------+
|product_category_id|        product_name|count(product_price)|
+-------------------+--------------------+--------------------+
|                 46|Quest Pioneer Adj...|                   1|
|                 33|LIJA Women's Open...|                   1|
|                 43|Diamondback Boys'...|                   1|
|                  5|Nike Men's KD VI ...|                   1|
|                 12|Fitness Gear Pro ...|                   1|
|                 52|Reebok Men's Wash...|                   1|
|                 53|Nike Kids' Grade ...|                   3|
|                 36|FootJoy Men's Sta...|                   1|
|                 41|Glove It Women's ...|                   1|
|                 43|Thule Trailway 4-...|                   1|
|                  3|Nike Men's USA Aw...|                   1|
|                 27|Nike Men's Deutsc...|                   1|
|                 44|SKLZ Sport-Brella X

### Can run multiple aggregate funcitons 

In [30]:
mul_agg_df = df.groupBy("product_category_id").agg(
    max("product_price").alias("maximum_price"),
    min("product_price").alias("minimum_price"),
    avg("product_price").alias("average_price"),
    count("product_category_id").alias("count_product_category_id")
).orderBy(desc("product_category_id")).show()

+-------------------+-------------+-------------+------------------+-------------------------+
|product_category_id|maximum_price|minimum_price|     average_price|count_product_category_id|
+-------------------+-------------+-------------+------------------+-------------------------+
|                 59|        100.0|         28.0|  74.4154167175293|                       24|
|                 58|        241.0|         22.0| 81.16666666666667|                       24|
|                 57|       189.99|          0.0| 83.11750133832295|                       24|
|                 56|       159.99|         9.99| 68.78916716575623|                       24|
|                 55|         85.0|         9.99| 31.49833329518636|                       24|
|                 54|       299.99|        34.99|  98.5733338991801|                       24|
|                 53|       199.99|        69.99|  135.198335647583|                       24|
|                 52|        170.0|         10.0| 

### Trying Where on Aggregate DataFrame

In [43]:
mul_agg_where_df = df.groupBy("product_category_id").agg(
    max("product_price").alias("maximum_price"),
    min("product_price").alias("minimum_price"),
    avg("product_price").alias("average_price"),
    count("product_category_id").alias("count_product_category_id"))\
    .where(col("average_price") > 22).show(truncate=False)

+-------------------+-------------+-------------+------------------+-------------------------+
|product_category_id|maximum_price|minimum_price|average_price     |count_product_category_id|
+-------------------+-------------+-------------+------------------+-------------------------+
|31                 |899.99       |79.99        |294.7837489446004 |24                       |
|53                 |199.99       |69.99        |135.198335647583  |24                       |
|34                 |169.99       |34.99        |122.6983364423116 |24                       |
|26                 |90.0         |18.0         |41.664583683013916|24                       |
|27                 |90.0         |18.0         |44.164583365122475|24                       |
|44                 |399.99       |21.99        |111.57250086466472|24                       |
|12                 |179.99       |16.99        |69.69791618982951 |24                       |
|22                 |1799.99      |21.99        |1