In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, sum, col, substring, asc

# Spark session with the PostgreSQL JDBC driver jar on its classpath.
spark = (
    SparkSession.builder
    .appName("PySpark homework")
    .config("spark.jars", "postgresql-42.7.4.jar")
    .getOrCreate()
)

# Connection settings are read from the environment so real credentials never
# have to be committed with the notebook. The fallbacks are the previous
# hard-coded local-development values, so the notebook still runs unchanged
# when no variables are set.
DB_URL = os.environ.get('DB_URL', 'jdbc:postgresql://localhost:5433/postgres')
DB_USER = os.environ.get('DB_USER', 'postgres')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'postgres')

def load_df(db_url, user, password, db_table):
    """Read one database table into a Spark DataFrame over JDBC.

    Uses the module-level ``spark`` session and the PostgreSQL JDBC driver.

    Args:
        db_url: JDBC connection URL (passed as the ``url`` option).
        user: Database user name.
        password: Password for ``user``.
        db_table: Value for the ``dbtable`` option (the table to read).

    Returns:
        The DataFrame produced by the JDBC reader's ``load()``.
    """
    jdbc_options = {
        "url": db_url,
        "dbtable": db_table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    return spark.read.format("jdbc").options(**jdbc_options).load()


In [None]:
# Query: every product name together with its brand name.
df_products = load_df(DB_URL, DB_USER, DB_PASSWORD, 'products')
df_brands = load_df(DB_URL, DB_USER, DB_PASSWORD, 'brands')

# Inner join: each product row is paired with the brand row sharing its brand_id.
df = df_products.join(
    df_brands,
    on=df_products.brand_id == df_brands.brand_id,
    how='inner',
)

df.select('product_name', 'brand_name').show()


+--------------------+----------+
|        product_name|brand_name|
+--------------------+----------+
|Electra Townie Or...|   Electra|
|Electra Cruiser 1...|   Electra|
|Electra Girl's Ha...|   Electra|
|Electra Moto 1 - ...|   Electra|
|Electra Townie Or...|   Electra|
|Electra Townie Or...|   Electra|
|Electra Cruiser 1...|   Electra|
|Electra Girl's Ha...|   Electra|
|Electra Girl's Ha...|   Electra|
|Electra Townie Or...|   Electra|
|Electra Townie Or...|   Electra|
|Electra Townie Or...|   Electra|
|Electra Townie Or...|   Electra|
|Electra Amsterdam...|   Electra|
|Electra Cruiser L...|   Electra|
|Electra Cruiser L...|   Electra|
|Electra Girl's Ha...|   Electra|
|Electra Glam Punk...|   Electra|
|Electra Amsterdam...|   Electra|
|Electra Amsterdam...|   Electra|
+--------------------+----------+
only showing top 20 rows



In [7]:
# Query: all active employees and the name of the store each of them works in.
df_staffs = load_df(DB_URL, DB_USER, DB_PASSWORD, 'staffs')
df_stores = load_df(DB_URL, DB_USER, DB_PASSWORD, 'stores')

(
    df_staffs
    .join(df_stores, on=df_staffs.store_id == df_stores.store_id, how='inner')
    .filter(df_staffs.active == 1)  # keep only rows whose `active` column equals 1
    .select(df_staffs.first_name, df_staffs.last_name, df_stores.store_name)
    .show()
)

+----------+---------+----------------+
|first_name|last_name|      store_name|
+----------+---------+----------------+
|   Fabiola|  Jackson|Santa Cruz Bikes|
|    Mireya| Copeland|Santa Cruz Bikes|
|     Genna|  Serrano|Santa Cruz Bikes|
|    Virgie|  Wiggins|Santa Cruz Bikes|
|  Jannette|    David|   Baldwin Bikes|
| Marcelene|    Boyer|   Baldwin Bikes|
|    Venita|   Daniel|   Baldwin Bikes|
|      Kali|   Vargas|   Rowlett Bikes|
|     Layla|  Terrell|   Rowlett Bikes|
|Bernardine|  Houston|   Rowlett Bikes|
+----------+---------+----------------+



In [14]:
# Query: list all customers of the selected store with their full name,
# e-mail address and phone number.

SELECTED_STORE_ID = 2  # store whose customers are listed; change to inspect another store

df_customers = load_df(DB_URL, DB_USER, DB_PASSWORD, 'customers')
df_orders = load_df(DB_URL, DB_USER, DB_PASSWORD, 'orders')

# Orders placed in the selected store only.
df_store_orders = df_orders.where(df_orders.store_id == SELECTED_STORE_ID)

# A left-semi join keeps a customer row when at least one matching order exists
# and never duplicates it, so a customer with several orders in the store is
# listed once (an inner join would emit one row per order).
df_customers.join(df_store_orders, on='customer_id', how='left_semi')\
            .select('first_name', 'last_name', 'email', 'phone').show()

+----------+---------+--------------------+--------------+
|first_name|last_name|               email|         phone|
+----------+---------+--------------------+--------------+
|     Julee|  Woodard|julee.woodard@yah...|          NULL|
|   Jerlene|     Rios|jerlene.rios@yaho...|          NULL|
|     Burma|  Summers|burma.summers@yah...|          NULL|
|       Shu|     Mays|  shu.mays@gmail.com|          NULL|
|     Leone|  Emerson|leone.emerson@msn...|          NULL|
|   Lekisha|     Pope|lekisha.pope@msn.com|          NULL|
|   Natosha|  Rowland|natosha.rowland@a...|          NULL|
|     Cyndi|     Bush|cyndi.bush@gmail.com|          NULL|
|     Cayla|  Johnson|cayla.johnson@msn...|          NULL|
|    Melita|Dominguez|melita.dominguez@...|          NULL|
|     Klara|      Kim| klara.kim@gmail.com|          NULL|
|  Bettyann|   Acosta|bettyann.acosta@g...|(717) 746-6658|
|   Daphine|   Willis|daphine.willis@ms...|          NULL|
|    Regine|     Odom| regine.odom@msn.com|          NUL

In [15]:
# Query: number of products in each category.
df_products = load_df(DB_URL, DB_USER, DB_PASSWORD, 'products')

(
    df_products
    .groupBy('category_id')
    .agg(count('*'))
    .show()
)

+-----------+--------+
|category_id|count(1)|
+-----------+--------+
|          1|      59|
|          6|      60|
|          3|      78|
|          5|      24|
|          4|      10|
|          7|      60|
|          2|      30|
+-----------+--------+



In [17]:
# Query: total number of orders for each customer, largest counts first.
df_orders = load_df(DB_URL, DB_USER, DB_PASSWORD, 'orders')

(
    df_orders
    .groupBy('customer_id')
    .agg(count('*').alias('customer_orders'))
    .orderBy(col('customer_orders').desc())
    .show()
)

+-----------+---------------+
|customer_id|customer_orders|
+-----------+---------------+
|         13|              3|
|         24|              3|
|         16|              3|
|          1|              3|
|          6|              3|
|         12|              3|
|          3|              3|
|         15|              3|
|         61|              3|
|         31|              3|
|          9|              3|
|         47|              3|
|         17|              3|
|         40|              3|
|          4|              3|
|         19|              3|
|          8|              3|
|         43|              3|
|          7|              3|
|         53|              3|
+-----------+---------------+
only showing top 20 rows



In [73]:
# Query: full name and total number of orders for every customer who has
# placed at least one order.

df_orders = load_df(DB_URL, DB_USER, DB_PASSWORD, 'orders')
df_customers = load_df(DB_URL, DB_USER, DB_PASSWORD, 'customers')

# The per-customer aggregate gets its own name so that `df_orders` keeps
# referring to the raw orders table instead of being silently replaced.
df_order_counts = df_orders.groupBy('customer_id').agg(count('*').alias('customer_orders'))

# Inner join: customers without any order have no row in df_order_counts and drop out.
df_agg = df_customers.join(df_order_counts, df_customers.customer_id == df_order_counts.customer_id)
df_agg.select('first_name', 'last_name', 'customer_orders').sort(desc('customer_orders')).show()


+----------+---------+---------------+
|first_name|last_name|customer_orders|
+----------+---------+---------------+
|   Lashawn|    Ortiz|              3|
|    Corene|     Wall|              3|
|    Emmitt|  Sanchez|              3|
|     Debra|    Burks|              3|
|   Lyndsey|     Bean|              3|
|     Robby|    Sykes|              3|
|    Tameka|   Fisher|              3|
|    Linnie|   Branch|              3|
|   Elinore|  Aguilar|              3|
| Williemae| Holloway|              3|
|  Genoveva|  Baldwin|              3|
| Bridgette|   Guerra|              3|
|     Caren| Stephens|              3|
|     Ronna|   Butler|              3|
|     Daryl|   Spence|              3|
|  Lizzette|    Stein|              3|
| Jacquline|   Duncan|              3|
|   Mozelle|   Carter|              3|
|   Latasha|     Hays|              3|
| Saturnina|   Garner|              3|
+----------+---------+---------------+
only showing top 20 rows



In [13]:
# Load the orders and order_items tables from the Postgres database.
df_orders = load_df(DB_URL, DB_USER, DB_PASSWORD, 'orders')
df_order_items = load_df(DB_URL, DB_USER, DB_PASSWORD, 'order_items')

# Pair every order with its line items (inner join on order_id).
df = df_orders.join(
    df_order_items,
    on=df_orders.order_id == df_order_items.order_id,
    how="inner",
)

# Derived columns:
#   total_cost - quantity * list_price, reduced by the discount fraction
#   month      - first 7 characters of order_date (values such as '2016-01')
df = (
    df
    .withColumn('total_cost', (df.quantity * df.list_price) * (1 - df.discount))
    .withColumn('month', substring('order_date', 1, 7))
)

In [14]:
# Query: total sales per product. Quantity, list price and discount are already
# combined in the `total_cost` column of `df`.
# Note: `sum` here is pyspark.sql.functions.sum (imported at the top), not the builtin.
(
    df
    .groupBy('product_id')
    .agg(sum('total_cost'))
    .show()
)

+----------+------------------+
|product_id|   sum(total_cost)|
+----------+------------------+
|       148|  14739.9736328125|
|       243|  3875.94287109375|
|        31|26438.107299804688|
|        85| 8464.243408203125|
|       251|  16649.9443359375|
|       137|11305.649658203125|
|        65|18681.940887451172|
|        53|14909.801025390625|
|       255|     2456.97265625|
|       133| 4139.972412109375|
|       296|295.99200439453125|
|        78|22899.965759277344|
|       108|12622.219146728516|
|       155|  54359.9560546875|
|        34|6781.9554443359375|
|       193| 12403.95556640625|
|       211| 2943.099853515625|
|       101|12079.844604492188|
|       126|  2928.03759765625|
|       115|14431.954833984375|
+----------+------------------+
only showing top 20 rows



In [15]:
# Query: number of orders for each order status.
(
    df_orders
    .groupBy('order_status')
    .agg(count('*'))
    .show()
)

+------------+--------+
|order_status|count(1)|
+------------+--------+
|           1|      62|
|           3|      45|
|           4|    1445|
|           2|      63|
+------------+--------+



In [None]:
# Query: total sales for each month, in chronological (ascending `month`) order.
(
    df
    .groupBy('month')
    .agg(sum('total_cost'))
    .orderBy(col('month').asc())
    .show()
)

+-------+------------------+
|  month|   sum(total_cost)|
+-------+------------------+
|2016-01|215146.42176818848|
|2016-02|156112.32083129883|
|2016-03|180600.32550048828|
|2016-04| 167144.0491027832|
|2016-05| 205270.0065460205|
|2016-06|210562.12280273438|
|2016-07| 199556.8063812256|
|2016-08|225657.37339782715|
|2016-09|273091.60652160645|
|2016-10|212078.07704162598|
|2016-11| 182329.4101715088|
|2016-12|199829.97749328613|
|2017-01|285616.48503112793|
|2017-02| 312923.7484512329|
|2017-03|308911.90142822266|
|2017-04| 227290.9143371582|
|2017-05|268233.23861694336|
|2017-06| 378865.6547393799|
|2017-07| 229995.3980255127|
|2017-08| 290553.4551849365|
+-------+------------------+
only showing top 20 rows



In [17]:
# Query: top 5 customers who spent the most money.
# The joined frame gets its own name instead of rebinding `df`: the monthly
# sales cell relies on `df` having a `month` column, and overwriting `df` here
# with a frame that lacks it would break that cell when it is re-run.
df_order_lines = (
    df_orders
    .join(df_order_items, df_orders.order_id == df_order_items.order_id, "inner")
    .withColumn('total_cost', (col('quantity') * col('list_price')) * (1 - col('discount')))
)

# Only orders with order_status == 4 are counted
# (presumably the "completed" status - TODO confirm against the schema).
(
    df_order_lines
    .where(col("order_status") == 4)
    .groupBy('customer_id')
    .agg(sum('total_cost').alias('sum'))
    .sort(desc("sum"))
    .limit(5)
    .show()
)

+-----------+------------------+
|customer_id|               sum|
+-----------+------------------+
|         73| 27050.71827697754|
|        122|24890.624755859375|
|       1224|24607.026489257812|
|       1214| 20648.95458984375|
|        425|20509.425842285156|
+-----------+------------------+

