In [90]:
top_n = 5
top_year = "1992"
output_path = "/user/n.mokrenko/hometask_2"

In [96]:
from pyspark.sql.types import *
import pyspark.sql.functions as sf
import pyspark.sql.window as w

### Создаем датасеты из таблиц базы данных tpch_flat_orc_2

In [97]:
nationDF = spark.read.table("tpch_flat_orc_2.nation")
customerDF = spark.read.table("tpch_flat_orc_2.customer")
regionDF = spark.read.table("tpch_flat_orc_2.region")
ordersDF = spark.read.table("tpch_flat_orc_2.orders")

### Создаем новый датафрейм с топ-n пользователей с максимальным количеством заказов за год top_year

In [99]:
cnt_cond = lambda cond: sf.sum(sf.when(cond, 1).otherwise(0)) # функция проверки соответсвия значения ячейки значению cond
joined_df = ordersDF.join(
    customerDF, ordersDF.o_custkey == customerDF.c_custkey, 'left').withColumn( 
    'top_year',sf.lit(top_year)).groupBy('c_custkey').agg( # добавляем колонку top_year, группируем по Id-шнику
    cnt_cond((sf.col('o_custkey') == sf.col('c_custkey')) & ordersDF.o_orderdate.like('1992%')).alias('orders_count'), #столбец orders_count, содержащий количество заказов пользователя
    sf.min(sf.col('top_year')).alias('top_year')
).orderBy(
    sf.desc('orders_count')).limit(top_n).withColumn( # сортируем по убыванию количества заказов
    'row_number', sf.row_number().over(w.Window.orderBy('top_year')))

### Джойним таблицу покупателей с таблицей стран, а затем с таблицей регионов

In [100]:
cust_join = customerDF.join(
    nationDF, customerDF.c_nationkey == nationDF.n_nationkey).join(
    regionDF, nationDF.n_regionkey == regionDF.r_regionkey).select( # деламем select нужных нам столбцов    
    sf.col('c_custkey'),
    sf.col('c_name').alias('customer_name'),
    sf.col('n_name').alias('customer_nation_name'),
    sf.col('r_name').alias('customer_region_name')
    )

### Соединяем два предыдущих датафрейма, получаем таблицу с искомыми значениями

In [102]:
result_df = joined_df.join(
    cust_join, cust_join.c_custkey == joined_df.c_custkey).select(
        'top_year','row_number', 'customer_name', 'customer_nation_name', 'customer_region_name','orders_count'
    ).orderBy('row_number')

### Создаем csv с нашим окончательным датафреймом и записываем его в output path

In [104]:
result_df.sortWithinPartitions('row_number').coalesce(1).write.csv(path=output_path, mode="overwrite", sep="\t", header=True)

### Проверка

In [105]:
spark.read.csv(output_path, header=True, sep = '\t').show(10)

+--------+----------+------------------+--------------------+--------------------+------------+
|top_year|row_number|     customer_name|customer_nation_name|customer_region_name|orders_count|
+--------+----------+------------------+--------------------+--------------------+------------+
|    1992|         1|Customer#000093871|           INDONESIA|                ASIA|          13|
|    1992|         2|Customer#000283198|              RUSSIA|              EUROPE|          13|
|    1992|         3|Customer#000214702|               CHINA|                ASIA|          12|
|    1992|         4|Customer#000265180|                IRAN|         MIDDLE EAST|          12|
|    1992|         5|Customer#000293968|       UNITED STATES|             AMERICA|          12|
+--------+----------+------------------+--------------------+--------------------+------------+