In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
data_orders = [
                (1, 1, 100, 1, 1232, datetime(2020, 9, 5, 0, 0, 0)),
                (2, 11, 100, 2, 2345, datetime(2020, 10, 4, 0, 0, 0)),
                (3, 22, 300, 8, 7686, datetime(2020, 3, 15, 0, 0, 0)),
                (4, 33, 200, 6, 5544, datetime(2020, 10, 7, 0, 0, 0)),
                (5, 44, 200, 3, 7676, datetime(2020, 12, 1, 0, 0, 0)),
                (6, 55, 300, 5, 3342, datetime(2020, 6, 2, 0, 0, 0)),
                (7, 66, 100, 3, 8687, datetime(2020, 11, 5, 0, 0, 0)),
                (8, 77, 200, 11, 35356, datetime(2020, 8, 7, 0, 0, 0)),
                (9, 88, 300, 9, 42224, datetime(2020, 3, 13, 0, 0, 0)),
                (10, 99, 300, 7, 35454, datetime(2020, 12, 21, 0, 0, 0)),
                (11, 2, 100, 1, 1232, datetime(2020, 9, 5, 0, 0, 0)),
                (12, 11, 100, 2, 2345, datetime(2020, 1, 4, 0, 0, 0)),
                (13, 22, 300, 8, 7686, datetime(2020, 2, 15, 0, 0, 0)),
                (14, 33, 200, 6, 5544, datetime(2020, 7, 7, 0, 0, 0)),
                (15, 44, 200, 3, 7676, datetime(2020, 12, 1, 0, 0, 0)),
                (16, 55, 300, 5, 3342, datetime(2020, 8, 2, 0, 0, 0)),
                (17, 66, 100, 3, 8687, datetime(2020, 12, 5, 0, 0, 0)),
                (18, 77, 200, 11, 35356, datetime(2020, 2, 7, 0, 0, 0)),
                (19, 88, 300, 9, 42224, datetime(2020, 5, 13, 0, 0, 0)),
                (20, 99, 300, 7, 35454, datetime(2020, 11, 21, 0, 0, 0)),
                (21, 3, 100, 1, 1232, datetime(2019, 12, 25, 0, 0, 0)),
                (22, 11, 100, 2, 2345, datetime(2019, 10, 4, 0, 0, 0)),
                (23, 22, 300, 8, 7686, datetime(2019, 12, 15, 0, 0, 0)),
                (24, 33, 200, 6, 5544, datetime(2019, 10, 7, 0, 0, 0)),
                (25, 44, 200, 3, 7676, datetime(2019, 11, 1, 0, 0, 0)),
                (26, 55, 300, 5, 3342, datetime(2019, 9, 2, 0, 0, 0)),
                (27, 66, 100, 3, 8687, datetime(2019, 10, 5, 0, 0, 0)),
                (28, 77, 200, 11, 35356, datetime(2019, 11, 7, 0, 0, 0)),
                (29, 88, 300, 9, 42224, datetime(2019, 12, 13, 0, 0, 0)),
                (30, 99, 300, 7, 35454, datetime(2019, 10, 21, 0, 0, 0))
               ]

data_vendors = [
                (100, 'Marsel', 10),
                (200, 'KFC', 15),
                (300, 'Ararat', 20),
               ]

In [None]:
schema_orders = StructType([ 
                            StructField("order_id", IntegerType(), True), 
                            StructField("user_id", IntegerType(), True), 
                            StructField("vendor_id", IntegerType(), True), 
                            StructField("product_count", IntegerType(), True), 
                            StructField("products_sum", IntegerType(), True),  
                            StructField("order_date", TimestampType(), True) 
                            ])

schema_vendors = StructType([ 
                            StructField("vendor_id", IntegerType(), True), 
                            StructField("vendor_name", StringType(), True), 
                            StructField("take_rate", IntegerType(), True)  
                            ])

In [None]:
orders = spark.createDataFrame(data = data_orders, schema = schema_orders)
orders.createOrReplaceTempView("orders")
orders.show(30)

+--------+-------+---------+-------------+------------+-------------------+
|order_id|user_id|vendor_id|product_count|products_sum|         order_date|
+--------+-------+---------+-------------+------------+-------------------+
|       1|      1|      100|            1|        1232|2020-09-05 00:00:00|
|       2|     11|      100|            2|        2345|2020-10-04 00:00:00|
|       3|     22|      300|            8|        7686|2020-03-15 00:00:00|
|       4|     33|      200|            6|        5544|2020-10-07 00:00:00|
|       5|     44|      200|            3|        7676|2020-12-01 00:00:00|
|       6|     55|      300|            5|        3342|2020-06-02 00:00:00|
|       7|     66|      100|            3|        8687|2020-11-05 00:00:00|
|       8|     77|      200|           11|       35356|2020-08-07 00:00:00|
|       9|     88|      300|            9|       42224|2020-03-13 00:00:00|
|      10|     99|      300|            7|       35454|2020-12-21 00:00:00|
|      11|  

In [None]:
vendors = spark.createDataFrame(data = data_vendors, schema = schema_vendors)
vendors.createOrReplaceTempView("vendors")
vendors.show()

+---------+-----------+---------+
|vendor_id|vendor_name|take_rate|
+---------+-----------+---------+
|      100|     Marsel|       10|
|      200|        KFC|       15|
|      300|     Ararat|       20|
+---------+-----------+---------+



## Задание_1. Выведи топ 10 вендоров с самым большим количеством заказов за последний месяц

In [None]:
request_1 = '''
WITH table_4 as (WITH table_3 as(WITH table_2 as(WITH table_1 as(
SELECT *, YEAR(order_date) as year, MONTH(order_date) as month FROM orders order by year desc, month desc)
SELECT * from table_1 WHERE year = (SELECT MAX(year) FROM table_1) and month = (SELECT MAX(month) FROM table_1))
SELECT vendors.vendor_id, vendors.vendor_name from vendors RIGHT JOIN table_2 on vendors.vendor_id = table_2.vendor_id)
SELECT table_3.vendor_name, COUNT(table_3.vendor_id) as count FROM table_3 GROUP BY table_3.vendor_name)
SELECT table_4.vendor_name FROM table_4 ORDER BY table_4.count desc LIMIT 2
'''
#берем топ 2 вендора, поскольку в нашей таблице их всего 3
spark.sql(request_1).show()

+-----------+
|vendor_name|
+-----------+
|        KFC|
|     Marsel|
+-----------+



## Задание_2. Выведи все рестораны(vendor_name) и количество их заказов

In [None]:
request_2 = '''
SELECT b.vendor_name, c.count FROM vendors as b LEFT JOIN (
SELECT a.vendor_id, COUNT(a.order_date) as count FROM orders as a GROUP BY a.vendor_id) as c
on b.vendor_id = c.vendor_id
'''
spark.sql(request_2).show()

+-----------+-----+
|vendor_name|count|
+-----------+-----+
|     Ararat|   12|
|     Marsel|    9|
|        KFC|    9|
+-----------+-----+



## Задание_3. Выведи всех клиентов, которые не заказывали в КФС за последний месяц

In [None]:
request_3 = '''
WITH table_3_3 as(WITH table_3_2 as(WITH table_3_1 as(
SELECT *, YEAR(order_date) as year, MONTH(order_date) as month FROM orders order by year desc, month desc)
SELECT * from table_3_1 WHERE year = (SELECT MAX(year) FROM table_3_1) and month = (SELECT MAX(month) FROM table_3_1))
SELECT vendors.vendor_id, vendors.vendor_name, table_3_2.user_id from vendors RIGHT JOIN table_3_2 on vendors.vendor_id = table_3_2.vendor_id)
SELECT table_3_3.user_id FROM table_3_3 WHERE table_3_3.vendor_name not like "KFC"
'''
spark.sql(request_3).show()

+-------+
|user_id|
+-------+
|     99|
|     66|
+-------+



## Задание_4. Выведи топ 3 лучших вендера по заказам в каждом месяце за последние 12 месяцев

In [None]:
request_4 = '''
WITH table_4_4 as (WITH table_4_3 as (WITH table_4_2 as (WITH table_4_1 as (
SELECT *, YEAR(order_date) as year, MONTH(order_date) as month, DAYOFYEAR(order_date) as num_day FROM orders order by year desc, month desc)
SELECT vendors.vendor_name, table_4_22.order_date, table_4_22.year, table_4_22.month, table_4_22.num_day, table_4_22.cnt FROM vendors LEFT JOIN (
SELECT table_4_1.year, table_4_1.month, table_4_1.vendor_id, table_4_1.order_date, table_4_1.num_day, COUNT(table_4_1.order_date) as cnt FROM table_4_1 
GROUP BY table_4_1.year, table_4_1.month, table_4_1.vendor_id, table_4_1.order_date, table_4_1.num_day ORDER BY table_4_1.year desc, table_4_1.month desc) as table_4_22
on vendors.vendor_id = table_4_22.vendor_id ORDER BY table_4_22.year desc, table_4_22.month desc, table_4_22.cnt desc)
SELECT * FROM table_4_2 WHERE table_4_2.year = (SELECT MAX(table_4_2.year) FROM table_4_2) OR ((table_4_2.year = (SELECT MAX(table_4_2.year) FROM table_4_2) - 1) AND table_4_2.num_day > (SELECT MAX(table_4_2.num_day) FROM table_4_2 WHERE table_4_2.year = (SELECT MAX(table_4_2.year) FROM table_4_2))))
SELECT *, CONCAT(table_4_3.year, ' ', table_4_3.month) as new_date FROM table_4_3)
SELECT table_4_5.new_date, table_4_5.vendor_name, table_4_5.rating_in_section FROM (
SELECT table_4_4.order_date, table_4_4.vendor_name, table_4_4.year, table_4_4.month, table_4_4.cnt, table_4_4.new_date, ROW_NUMBER() OVER (PARTITION BY table_4_4.new_date ORDER BY table_4_4.year desc, table_4_4.month desc, table_4_4.cnt) AS rating_in_section FROM table_4_4 ORDER BY table_4_4.year desc, table_4_4.month desc, table_4_4.cnt) AS table_4_5 WHERE table_4_5.rating_in_section <= 2
'''
#берем первые 2 значения, поскольку в нашей таблице их всего 3
spark.sql(request_4).show()

+--------+-----------+-----------------+
|new_date|vendor_name|rating_in_section|
+--------+-----------+-----------------+
| 2020 12|     Ararat|                1|
| 2020 12|     Marsel|                2|
| 2020 11|     Ararat|                1|
| 2020 11|     Marsel|                2|
| 2020 10|     Marsel|                1|
| 2020 10|        KFC|                2|
|  2020 9|     Marsel|                1|
|  2020 8|     Ararat|                1|
|  2020 8|        KFC|                2|
|  2020 7|        KFC|                1|
|  2020 6|     Ararat|                1|
|  2020 5|     Ararat|                1|
|  2020 3|     Ararat|                2|
|  2020 3|     Ararat|                1|
|  2020 2|     Ararat|                1|
|  2020 2|        KFC|                2|
|  2020 1|     Marsel|                1|
| 2019 12|     Marsel|                1|
+--------+-----------+-----------------+



## Задание_5. Напиши запрос, который выводит следующую таблицу

In [None]:
request_5 = '''
WITH table_5_3 as (WITH table_5_2 as (WITH table_5_1 as (
SELECT orders.user_id, SUM(table_5_J_1.take_rate) as revenue FROM orders LEFT JOIN (
SELECT vendors.take_rate, vendors.vendor_id FROM vendors) as table_5_J_1 on table_5_J_1.vendor_id = orders.vendor_id GROUP BY orders.user_id ORDER BY orders.user_id)
SELECT *, (table_5_1.user_id * 0 + (SELECT SUM(table_5_1.revenue) FROM table_5_1)) as total_revenue FROM table_5_1 GROUP BY table_5_1.user_id, table_5_1.revenue ORDER BY table_5_1.user_id)
SELECT *, ROUND(table_5_2.revenue / table_5_2.total_revenue * 100, 2) as share FROM table_5_2)
SELECT *, ROUND(COALESCE(SUM(table_5_3.share) over (order by table_5_3.user_id rows between unbounded preceding and current row), 0), 2) as cumulative_share FROM table_5_3
'''
spark.sql(request_5).show()

+-------+-------+-------------+-----+----------------+
|user_id|revenue|total_revenue|share|cumulative_share|
+-------+-------+-------------+-----+----------------+
|      1|     10|          465| 2.15|            2.15|
|      2|     10|          465| 2.15|             4.3|
|      3|     10|          465| 2.15|            6.45|
|     11|     30|          465| 6.45|            12.9|
|     22|     60|          465| 12.9|            25.8|
|     33|     45|          465| 9.68|           35.48|
|     44|     45|          465| 9.68|           45.16|
|     55|     60|          465| 12.9|           58.06|
|     66|     30|          465| 6.45|           64.51|
|     77|     45|          465| 9.68|           74.19|
|     88|     60|          465| 12.9|           87.09|
|     99|     60|          465| 12.9|           99.99|
+-------+-------+-------------+-----+----------------+

