### Импорт библиотек 

In [1]:
import io
import sys
import uuid

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

### Создаем spark сессию

In [2]:
def _spark_session():
    return (SparkSession.builder
            .appName("SparkJob5-" + uuid.uuid4().hex)
            .getOrCreate())

In [3]:
spark = _spark_session()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/13 17:30:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## spark датафреймы из parquet файлов

In [4]:
lineitem_df = spark.read.parquet('/Users/denis/Desktop/de/lineitem')
orders_df = spark.read.parquet('/Users/denis/Desktop/de/orders')
nation_df = spark.read.parquet('/Users/denis/Desktop/de/nation')
customer_df = spark.read.parquet('/Users/denis/Desktop/de/customer')
region_df = spark.read.parquet('/Users/denis/Desktop/de/region')
supplier_df = spark.read.parquet('/Users/denis/Desktop/de/supplier')
part_df = spark.read.parquet('/Users/denis/Desktop/de/part')
partsupp_df = spark.read.parquet('/Users/denis/Desktop/de/partsupp')

### Агрегация данных Spark

### Обработка данных для отчета по LineItems

Необходимо построить отчет по данным о позициях в заказе (lineitems) содержащий сводную информацию по позициям каждого заказа, когда либо совершенного в системе, группировать данные необходимо по идентификатору заказа.
Используйте сортировку по ключу заказа на возрастание.

In [71]:
lineitem_df \
.select(['L_ORDERKEY', 'L_EXTENDEDPRICE', 'L_DISCOUNT', 
         'L_TAX', 'L_SHIPDATE', 'L_RECEIPTDATE', 'L_RETURNFLAG'])\
.groupby(['L_ORDERKEY'])\
.agg(F.count('L_ORDERKEY').alias("count"),
     F.sum('L_EXTENDEDPRICE').alias("sum_extendprice"),
     F.mean('L_DISCOUNT').alias("mean_discount"),
     F.mean('L_TAX').alias("mean_tax"),
     F.avg(F.datediff(F.col('L_RECEIPTDATE'), F.col('L_SHIPDATE'))).alias("delivery_days"),
     F.sum(F.when(F.col('L_RETURNFLAG')=='A', 1).otherwise(0)).alias("A_return_flags"),
     F.sum(F.when(F.col('L_RETURNFLAG')=='R', 1).otherwise(0)).alias("R_return_flags"),
     F.sum(F.when(F.col('L_RETURNFLAG')=='N', 1).otherwise(0)).alias("N_return_flags"))\
     .orderBy(['L_ORDERKEY']).show()

                                                                                

+----------+-----+------------------+--------------------+--------------------+------------------+--------------+--------------+--------------+
|L_ORDERKEY|count|   sum_extendprice|       mean_discount|            mean_tax|     delivery_days|A_return_flags|R_return_flags|N_return_flags|
+----------+-----+------------------+--------------------+--------------------+------------------+--------------+--------------+--------------+
|         1|    6|         195298.34| 0.08166666666666668| 0.03666666666666667| 8.333333333333334|             0|             0|             6|
|         2|    1|          63066.32|                 0.0|                0.05|               5.0|             0|             0|             1|
|         3|    6|         287551.64| 0.06166666666666667|0.024999999999999998|15.833333333333334|             3|             3|             0|
|         4|    1|           39819.0|                0.03|                0.08|               8.0|             0|             0|        

### Обработка данных для отчета по Orders

Необходимо построить отчет по данным о заказах (orders) содержащий сводную информацию по заказам в разрезе месяца, страны откуда был отправлен заказ, а так же приоритета выполняемого заказа. 
Используйте сортировку по названию страны и приоритета заказа на возрастание.

### JOIN SPARK

In [6]:
rdf = orders_df.join(customer_df, orders_df.O_CUSTKEY == customer_df.C_CUSTKEY, how = 'left')
rdf = rdf.join(nation_df, rdf.C_NATIONKEY == nation_df.N_NATIONKEY, how = 'left')

In [25]:
rdf \
.select(['O_ORDERDATE', 'N_NAME', 'O_ORDERPRIORITY', 
         'O_ORDERKEY', 'O_TOTALPRICE', 'O_ORDERSTATUS']) \
.withColumn("O_MONTH",F.date_format(F.to_date("O_ORDERDATE", "yyyy-MM-dd"), "yyyy-MM")) \
.groupby(['O_MONTH', 'N_NAME', 'O_ORDERPRIORITY']) \
.agg(F.count('O_ORDERKEY').alias("orders_count"),
     F.avg('O_TOTALPRICE').alias("avg_order_price"),
     F.sum('O_TOTALPRICE').alias("sum_order_price"),
     F.min('O_TOTALPRICE').alias("min_order_price"), 
     F.max('O_TOTALPRICE').alias("max_order_price"), 
     F.sum(F.when(F.col('O_ORDERSTATUS')=='F', 1).otherwise(0)).alias("f_return_flags"),
     F.sum(F.when(F.col('O_ORDERSTATUS') == 'O', 1).otherwise(0)).alias("o_order_status"),
     F.sum(F.when(F.col('O_ORDERSTATUS') == 'P', 1).otherwise(0)).alias("p_order_status")
    ).orderBy(['N_NAME', 'O_ORDERPRIORITY']).show()

24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/12 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on

+-------+-------+---------------+------------+------------------+--------------------+---------------+---------------+--------------+--------------+--------------+
|O_MONTH| N_NAME|O_ORDERPRIORITY|orders_count|   avg_order_price|     sum_order_price|min_order_price|max_order_price|f_return_flags|o_order_status|p_order_status|
+-------+-------+---------------+------------+------------------+--------------------+---------------+---------------+--------------+--------------+--------------+
|1996-04|ALGERIA|       1-URGENT|        1469|152267.07248468342|2.2368032947999996E8|        1043.52|      469164.85|             0|          1469|             0|
|1992-04|ALGERIA|       1-URGENT|        1512|149576.27086640213|2.2615932155000004E8|         979.39|      443304.14|          1512|             0|             0|
|1997-08|ALGERIA|       1-URGENT|        1524|149620.19489501312|      2.2802117702E8|        1200.14|      413478.84|             0|          1524|             0|
|1994-10|ALGERIA

### Обработка данных для отчета по Customers

Необходимо построить отчет по данным о клиентах (customers) содержащий сводную информацию по заказам в разрезе месяца, страны откуда был отправлен заказ, а так же приоритета выполняемого заказа. 
Используйте сортировку по названию страны (N_NAME) и приоритета заказа (C_MKTSEGMENT) на возрастание.

### JOIN SPARK

In [33]:
rdf = region_df.join(nation_df, region_df.R_REGIONKEY == nation_df.N_REGIONKEY, how = 'left')
rdf = rdf.join(customer_df, rdf.N_NATIONKEY == customer_df.C_NATIONKEY, how = 'left')

In [47]:
rdf.select(['R_NAME', 'N_NAME', 'C_MKTSEGMENT', 'C_CUSTKEY', 'C_ACCTBAL']) \
.groupby(['R_NAME', 'N_NAME', 'C_MKTSEGMENT']) \
.agg(F.countDistinct('C_CUSTKEY').alias("unique_customers_count"),
     F.avg('C_ACCTBAL').alias("avg_acctbal"),
     F.mean('C_ACCTBAL').alias("mean_acctbal"),
     F.min('C_ACCTBAL').alias("min_acctbal"),
     F.max('C_ACCTBAL').alias("max_acctbal")
    ).orderBy(['N_NAME', 'C_MKTSEGMENT']).show()

24/02/12 23:49:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[Stage 81:>                                                         (0 + 8) / 8]

+-------+---------+------------+----------------------+------------------+------------------+-----------+-----------+
| R_NAME|   N_NAME|C_MKTSEGMENT|unique_customers_count|       avg_acctbal|      mean_acctbal|min_acctbal|max_acctbal|
+-------+---------+------------+----------------------+------------------+------------------+-----------+-----------+
| AFRICA|  ALGERIA|  AUTOMOBILE|                 11890| 4489.457164003364| 4489.457164003364|    -999.35|    9998.88|
| AFRICA|  ALGERIA|    BUILDING|                 12068| 4494.172725389461| 4494.172725389461|    -999.29|    9998.43|
| AFRICA|  ALGERIA|   FURNITURE|                 11826| 4480.853280060884| 4480.853280060884|    -999.89|    9999.01|
| AFRICA|  ALGERIA|   HOUSEHOLD|                 12142| 4489.274984351838| 4489.274984351838|    -998.98|    9998.86|
| AFRICA|  ALGERIA|   MACHINERY|                 11990| 4471.501088407005| 4471.501088407005|    -999.46|    9998.69|
|AMERICA|ARGENTINA|  AUTOMOBILE|                 11992| 

                                                                                

### Обработка данных для отчета по Suppliers

Необходимо построить отчет по данным о поставщиках(suppliers) содержащий сводную информацию в разрезе страны и региона поставщика. 
Используйте сортировку по названию страны (N_NAME) и региона (R_NAME) на возрастание.

In [5]:
rdf = region_df.join(nation_df, region_df.R_REGIONKEY == nation_df.N_REGIONKEY, how = 'left')
rdf = rdf.join(supplier_df, rdf.N_NATIONKEY == supplier_df.S_NATIONKEY, how = 'left')

In [11]:
rdf \
.select(['R_NAME', 'N_NAME', 'S_SUPPKEY', 'S_ACCTBAL']) \
.groupby(['R_NAME', 'N_NAME']) \
.agg(F.countDistinct('S_SUPPKEY').alias("unique_supplers_count"),
     F.avg('S_ACCTBAL').alias("avg_acctbal"),
     F.mean('S_ACCTBAL').alias("mean_acctbal"),
     F.min('S_ACCTBAL').alias("min_acctbal"),
     F.max('S_ACCTBAL').alias("max_acctbal")
    ).orderBy(['N_NAME', 'R_NAME']).show()

+-----------+----------+---------------------+------------------+------------------+-----------+-----------+
|     R_NAME|    N_NAME|unique_supplers_count|       avg_acctbal|      mean_acctbal|min_acctbal|max_acctbal|
+-----------+----------+---------------------+------------------+------------------+-----------+-----------+
|     AFRICA|   ALGERIA|                 3934| 4486.125091509919| 4486.125091509919|    -998.44|    9999.01|
|    AMERICA| ARGENTINA|                 4007| 4487.078779635633| 4487.078779635633|    -995.61|    9998.69|
|    AMERICA|    BRAZIL|                 3995| 4537.748092615765| 4537.748092615765|    -996.42|    9994.95|
|    AMERICA|    CANADA|                 4054|  4574.45507153429|  4574.45507153429|    -997.13|    9999.93|
|       ASIA|     CHINA|                 3988| 4508.844879638919| 4508.844879638919|    -997.43|    9999.57|
|MIDDLE EAST|     EGYPT|                 3981| 4593.200592815871| 4593.200592815871|    -994.14|     9998.2|
|     AFRICA|  ETHI

### Обработка данных для отчета по Part

Необходимо построить отчет по данным о грузоперевозках (part) содержащий сводную информацию в разрезе страны поставки (N_NAME), типа поставки (P_TYPE) и типа контейнера (P_CONTAINER). 
Используйте сортировку по названию страны (N_NAME) , типа поставки (P_TYPE) и типа контейнера (P_CONTAINER) на возрастание.

In [18]:
rdf = supplier_df.join(nation_df, supplier_df.S_NATIONKEY == nation_df.N_NATIONKEY, how = 'left')
rdf = rdf.join(partsupp_df, rdf.S_SUPPKEY == partsupp_df.PS_SUPPKEY, how = 'left')
rdf = rdf.join(part_df, rdf.PS_PARTKEY == part_df.P_PARTKEY, how = 'left')

In [19]:
rdf = rdf \
.select(['N_NAME', 'P_TYPE', 'P_CONTAINER', 'PS_PARTKEY',
         'P_RETAILPRICE', 'P_SIZE', 'P_RETAILPRICE', 'PS_SUPPLYCOST']) \
.groupby(['N_NAME', 'P_TYPE', 'P_CONTAINER']) \
.agg(F.countDistinct('PS_PARTKEY').alias("parts_count"),
     F.avg('P_RETAILPRICE').alias("avg_retailprice"),
     F.sum('P_SIZE').alias("size"),
     F.mean('P_RETAILPRICE').alias("mean_retailprice"),
     F.min('P_RETAILPRICE').alias("min_retailprice"),
     F.max('P_RETAILPRICE').alias("max_retailprice"),
     F.avg('PS_SUPPLYCOST').alias("avg_supplycost"),
     F.mean('PS_SUPPLYCOST').alias("mean_supplycost"),
     F.min('PS_SUPPLYCOST').alias("min_supplycost"),
     F.max('PS_SUPPLYCOST').alias("max_supplycost")
).orderBy(['N_NAME', 'P_TYPE', 'P_CONTAINER'])
rdf.show()

24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/13 17:36:11 WARN RowBasedKeyValueBatch: Calling spill() on

+-------+--------------------+-----------+-----------+------------------+----+------------------+---------------+---------------+------------------+------------------+--------------+--------------+
| N_NAME|              P_TYPE|P_CONTAINER|parts_count|   avg_retailprice|size|  mean_retailprice|min_retailprice|max_retailprice|    avg_supplycost|   mean_supplycost|min_supplycost|max_supplycost|
+-------+--------------------+-----------+-----------+------------------+----+------------------+---------------+---------------+------------------+------------------+--------------+--------------+
|ALGERIA|ECONOMY ANODIZED ...|  JUMBO BAG|         41|1449.0938636363635|1119|1449.0938636363635|         978.02|        1958.94|           485.485|           485.485|          4.43|         936.8|
|ALGERIA|ECONOMY ANODIZED ...|  JUMBO BOX|         53| 1507.499649122807|1358| 1507.499649122807|         944.95|        2014.88| 508.1329824561403| 508.1329824561403|         25.15|        988.16|
|ALGERIA|E

                                                                                