In [1]:
import pandas as pd
from pyspark.sql import SparkSession
import os
from pathlib import Path
from pyspark.sql.types import * # загрузили все типы данных
#from pyspark.sql.functions import col,lower, upper, substring, lit, round
import pyspark.sql.functions as F


In [2]:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" # без этой строчки у нас будет возникать постоянное предупреждение с просьбой установить эту переменную в значение 1, что мы заранее и делаем

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark

25/01/10 01:10:33 WARN Utils: Your hostname, igel-pn64 resolves to a loopback address: 127.0.1.1; using 192.168.88.122 instead (on interface wlp3s0)
25/01/10 01:10:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/10 01:10:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
table_infos = {'customers':  [('customer_id', IntegerType()),
                              ('first_name', StringType()),
                              ('last_name',  StringType()),
                              ('phone',  StringType()),
                              ('email',  StringType()),
                              ('street', StringType()),
                              ('city',  StringType()),
                              ('state',  StringType()),
                              ('zip_code',  IntegerType())],
               'brands': [('brand_id', IntegerType()),
                          ('brand_name', StringType())],
               'products': [('product_id', IntegerType()),
                            ('product_name', StringType()),
                            ('brand_id', IntegerType()),
                            ('category_id', IntegerType()),
                            ('model_year', IntegerType()),
                            ('list_price', FloatType())],
               'categories': [('category_id', IntegerType()),
                              ('category_name', StringType())],
               'stores': [('store_id', IntegerType()),
                          ('store_name', StringType()),
                          ('phone',  StringType()),
                          ('email',  StringType()),
                          ('street', StringType()),
                          ('city',  StringType()),
                          ('state',  StringType()),
                          ('zip_code',  IntegerType())],
               'staffs': [('staff_id', IntegerType()),
                          ('first_name', StringType()),
                          ('last_name',  StringType()),
                          ('email',  StringType()),
                          ('phone',  StringType()),
                          ('active',  IntegerType()),
                          ('store_id',  IntegerType()),
                          ('manager_id',  IntegerType())],
               'orders': [('order_id', IntegerType()),
                          ('customer_id', IntegerType()),
                          ('order_status', IntegerType()),
                          ('order_date', DateType()),
                          ('required_date', DateType()),
                          ('shipped_date', DateType()),
                          ('store_id', IntegerType()),
                          ('staff_id', IntegerType())],
               'stocks': [('store_id', IntegerType()),
                          ('product_id', IntegerType()),
                          ('quantity', IntegerType())],
               'order_items': [('order_id', IntegerType()),
                               ('item_id', IntegerType()),
                               ('product_id', IntegerType()),
                               ('quantity', IntegerType()),
                              ('list_price', FloatType()),
                              ('discount', FloatType())]
                 } 


In [4]:
schema = lambda t_s: StructType([StructField (x[0], x[1], True) for x in t_s])

# загрузка данных
data_directory = 'data'
#table_names = ['brands', 'categories', 'stores', 'products', 'customers', 'staffs', 'orders', 'order_items', 'stocks']
tables_df = {}
for table_name, table_struct in table_infos.items():
    print(f'Загрузка данных {table_name}')
    filename = Path(data_directory, f'{table_name}.csv').as_posix()
    #schema = StructType([StructField (x[0], x[1], True) for x in table_struct])
    df = spark.read.csv(filename, header=True, sep=",", schema=schema(table_struct)) # добавить информацию о типах столбцов
    tables_df[table_name] = df
    #if table_name == 'brands':
    #    print(schema)
    #    print(tables_df[table_name].count())
    #    tables_df[table_name].printSchema() # для вывода структуры (схемы) DataFrame
    #    df.show()
    print(f'  Загружено: {tables_df[table_name].count()}')


Загрузка данных customers
  Загружено: 1445
Загрузка данных brands
  Загружено: 9
Загрузка данных products
  Загружено: 321
Загрузка данных categories
  Загружено: 7
Загрузка данных stores
  Загружено: 3
Загрузка данных staffs
  Загружено: 10
Загрузка данных orders
  Загружено: 1615
Загрузка данных stocks
  Загружено: 939
Загрузка данных order_items
  Загружено: 4722


In [5]:
customers_df = tables_df['customers']
brands_df = tables_df['brands']
products_df = tables_df['products']
categories_df = tables_df['categories']
stores_df = tables_df['stores']
staffs_df = tables_df['staffs']
orders_df = tables_df['orders']
stocks_df = tables_df['stocks']
order_items_df = tables_df['order_items']

In [6]:
is_test_doing = True # признак тестового прогона
if is_test_doing:
    # Добавить категорию без продуктов
    new_category = spark.createDataFrame([(10000, "Категория с продуктами")], categories_df.columns)
    categories_df = categories_df.union(new_category)

    #  Добавить заказчика без заказов  
    new_customer = spark.createDataFrame([(10000, "Заказчик", "Пустой", "3333", "1@1.1", "", "", "", "")], customers_df.columns)
    customers_df = customers_df.union(new_customer)

    # Добавить сотрудника без магазина
    new_staff = spark.createDataFrame([(10000, "Сотрудник", "Без магазина", "1@1.1", "912", 1, None, None)],
                                      schema=schema(table_infos["staffs"]))
    staffs_df = staffs_df.union(new_staff)

In [7]:
# 1) Напишите запрос, чтобы получить все названия продуктов и соответствующие им торговые марки (brand).
products_df. \
    join(brands_df, products_df.brand_id == brands_df.brand_id). \
    select(products_df.product_name, brands_df.brand_name). \
    show()

+--------------------+-----------+
|        product_name| brand_name|
+--------------------+-----------+
|     Trek 820 - 2016|       Trek|
|Ritchey Timberwol...|    Ritchey|
|Surly Wednesday F...|      Surly|
|Trek Fuel EX 8 29...|       Trek|
|Heller Shagamaw F...|     Heller|
|Surly Ice Cream T...|      Surly|
|Trek Slash 8 27.5...|       Trek|
|Trek Remedy 29 Ca...|       Trek|
|Trek Conduit+ - 2016|       Trek|
|Surly Straggler -...|      Surly|
|Surly Straggler 6...|      Surly|
|Electra Townie Or...|    Electra|
|Electra Cruiser 1...|    Electra|
|Electra Girl's Ha...|    Electra|
|Electra Moto 1 - ...|    Electra|
|Electra Townie Or...|    Electra|
|Pure Cycles Vine ...|Pure Cycles|
|Pure Cycles Weste...|Pure Cycles|
|Pure Cycles Willi...|Pure Cycles|
|Electra Townie Or...|    Electra|
+--------------------+-----------+
only showing top 20 rows



In [8]:
# 2) Напишите запрос, чтобы найти всех активных сотрудников и наименования магазинов, в которых они работают.
staffs_df. \
    join(stores_df, staffs_df.store_id == stores_df.store_id, 'outer'). \
    select(staffs_df.first_name, staffs_df.last_name, stores_df.store_name). \
    filter(staffs_df.active == 1). \
    orderBy(stores_df.store_name, staffs_df.first_name, staffs_df.last_name). \
    show()

+----------+------------+----------------+
|first_name|   last_name|      store_name|
+----------+------------+----------------+
| Сотрудник|Без магазина|            NULL|
|  Jannette|       David|   Baldwin Bikes|
| Marcelene|       Boyer|   Baldwin Bikes|
|    Venita|      Daniel|   Baldwin Bikes|
|Bernardine|     Houston|   Rowlett Bikes|
|      Kali|      Vargas|   Rowlett Bikes|
|     Layla|     Terrell|   Rowlett Bikes|
|   Fabiola|     Jackson|Santa Cruz Bikes|
|     Genna|     Serrano|Santa Cruz Bikes|
|    Mireya|    Copeland|Santa Cruz Bikes|
|    Virgie|     Wiggins|Santa Cruz Bikes|
+----------+------------+----------------+



                                                                                

In [9]:
# 3) Напишите запрос, чтобы перечислить всех покупателей выбранного магазина с указанием их полных имен, 
#    электронной почты и номера телефон
customers_df. \
    join(orders_df, customers_df.customer_id == orders_df.customer_id). \
    select(customers_df.first_name, customers_df.last_name, customers_df.email, customers_df.phone). \
    filter(orders_df.store_id == 2). \
    orderBy(customers_df.first_name, customers_df.last_name). \
    show()

+----------+---------+--------------------+--------------+
|first_name|last_name|               email|         phone|
+----------+---------+--------------------+--------------+
|     Aaron|    Knapp|aaron.knapp@yahoo...|(914) 402-4335|
|     Abbey|     Pugh|abbey.pugh@gmail.com|          NULL|
|      Abby|   Gamble| abby.gamble@aol.com|          NULL|
|      Abby|   Gamble| abby.gamble@aol.com|          NULL|
|      Adam| Thornton|adam.thornton@hot...|          NULL|
|     Addie|     Hahn|addie.hahn@hotmai...|          NULL|
|    Adelle|   Larsen|adelle.larsen@gma...|          NULL|
|    Adelle|   Larsen|adelle.larsen@gma...|          NULL|
|     Adena|    Blake|adena.blake@hotma...|          NULL|
|    Adrien|   Hunter|adrien.hunter@yah...|          NULL|
|   Adriene|  Rollins|adriene.rollins@m...|          NULL|
|     Afton|   Juarez|afton.juarez@gmai...|          NULL|
|    Agatha|   Melton|agatha.melton@yah...|          NULL|
|     Agnes|     Sims|  agnes.sims@aol.com|(716) 780-990

In [10]:
# 4) Напишите запрос для подсчета количества продуктов в каждой категории.
categories_df. \
  join(products_df, products_df.category_id == categories_df.category_id). \
  groupBy(categories_df.category_name). \
  count(). \
  union (categories_df.\
          join(products_df, products_df.category_id == categories_df.category_id, how='left_anti').\
          select(categories_df.category_name). \
          withColumn('count', F.lit(0))).\
  show()

+--------------------+-----+
|       category_name|count|
+--------------------+-----+
|      Electric Bikes|   24|
|      Mountain Bikes|   60|
|    Comfort Bicycles|   30|
|          Road Bikes|   60|
| Cyclocross Bicycles|   10|
|   Children Bicycles|   59|
|   Cruisers Bicycles|   78|
|Категория с проду...|    0|
+--------------------+-----+



In [11]:
# 5) Напишите запрос, чтобы указать общее количество заказов для каждого клиента.
customers_df. \
  join(orders_df, customers_df.customer_id == orders_df.customer_id). \
  groupBy(customers_df.customer_id). \
  count(). \
  union(customers_df. \
    join(orders_df, customers_df.customer_id == orders_df.customer_id, how='left_anti'). \
    select(customers_df.customer_id). \
    withColumn('count', F.lit(0))).\
  orderBy("count"). \
  show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|      10000|    0|
|       1371|    1|
|       1077|    1|
|       1409|    1|
|        243|    1|
|        278|    1|
|        964|    1|
|        367|    1|
|        541|    1|
|        442|    1|
|       1010|    1|
|        705|    1|
|       1258|    1|
|        720|    1|
|       1360|    1|
|       1175|    1|
|        270|    1|
|       1202|    1|
|        730|    1|
|        296|    1|
+-----------+-----+
only showing top 20 rows



In [12]:
# 6) Напишите запрос, в котором будет указана информация о полном имени и общем количестве заказов клиентов, которые хотя бы 1 раз сделали заказ.
customers_df. \
  join(orders_df, customers_df.customer_id == orders_df.customer_id). \
  groupBy(customers_df.first_name, customers_df.last_name). \
  count(). \
  orderBy("count"). \
  show()

+-----------+---------+-----+
| first_name|last_name|count|
+-----------+---------+-----+
|    Yevette|     Todd|    1|
|    Caridad|  Compton|    1|
|       Inge|    Olsen|    1|
|     Loreta| Johnston|    1|
|       Dori|  Alvarez|    1|
|      Shara|     Pope|    1|
|    Chasidy|  Webster|    1|
|    Novella|    Patel|    1|
|        Ann|    Heath|    1|
|      Zella|Fernandez|    1|
|      Cinda|    Rocha|    1|
|       Ping|    Quinn|    1|
|      Katia|    Henry|    1|
|        Roy|     Chan|    1|
|      Doris|  Kaufman|    1|
|   Angelika|    Perry|    1|
|Charlesetta|     Soto|    1|
|      Angie|   Powers|    1|
|   Herminia|    Reyes|    1|
|    Mellisa|  Griffin|    1|
+-----------+---------+-----+
only showing top 20 rows



In [13]:
# Напишите запрос для расчета общего объема продаж по каждому продукту (с учетом количества продукта, его цены по прейскуранту и скидки).
order_items_df = order_items_df.withColumn('sales_volume', F.round(order_items_df.list_price * order_items_df.quantity * (1 -order_items_df.discount), 2))
order_items_df.\
    groupBy(order_items_df.product_id).\
    sum('sales_volume').\
    show()

+----------+------------------+
|product_id| sum(sales_volume)|
+----------+------------------+
|       148|   14739.970703125|
|       243|  3875.93994140625|
|        31|26438.090209960938|
|        85|  8464.19970703125|
|       251| 16649.93994140625|
|       137|11305.650146484375|
|        65|18681.880493164062|
|        53| 14909.77978515625|
|       255| 2456.969970703125|
|       296|  295.989990234375|
|       133| 4139.969970703125|
|        78|22899.930541992188|
|       108|     12622.1796875|
|       155|   54359.951171875|
|        34| 6781.939849853516|
|       193|12403.949951171875|
|       211|  2943.10009765625|
|       101|12079.799743652344|
|       115|14431.949951171875|
|       126|2928.0299377441406|
+----------+------------------+
only showing top 20 rows



In [14]:
# Напишите запрос с расчетом количества заказов по каждому статусу заказа.
orders_df.\
  groupBy(orders_df.order_status).\
  count().\
  show()

+------------+-----+
|order_status|count|
+------------+-----+
|           1|   62|
|           3|   45|
|           4| 1445|
|           2|   63|
+------------+-----+



In [15]:
# Напишите запрос для расчета общей суммы продаж за каждый месяц.
orders_df = orders_df.\
              withColumn('order_date_year', F.year(orders_df.order_date)).\
              withColumn('order_date_month', F.month(orders_df.order_date))
orders_df.\
  join(order_items_df, orders_df.order_id == order_items_df.order_id).\
  groupBy(orders_df.order_date_year, orders_df.order_date_month).\
  sum('sales_volume').\
  orderBy(orders_df.order_date_year, orders_df.order_date_month).\
  show()

+---------------+----------------+------------------+
|order_date_year|order_date_month| sum(sales_volume)|
+---------------+----------------+------------------+
|           2016|               1|215146.22846984863|
|           2016|               2|156112.10897827148|
|           2016|               3|180600.12924194336|
|           2016|               4| 167143.8988647461|
|           2016|               5|205269.81867980957|
|           2016|               6| 210561.9485168457|
|           2016|               7|199556.60835266113|
|           2016|               8| 225657.1583404541|
|           2016|               9|273091.36808776855|
|           2016|              10|212077.85832214355|
|           2016|              11|182329.23866271973|
|           2016|              12|199829.78840637207|
|           2017|               1| 285616.2632522583|
|           2017|               2| 312923.4736251831|
|           2017|               3| 308911.6022415161|
|           2017|           

In [16]:
# Напишите запрос, чтобы найти топ 5 клиентов, которые потратили больше всего денег.
orders_df.\
  join(order_items_df, orders_df.order_id == order_items_df.order_id).\
  join(customers_df, orders_df.customer_id == customers_df.customer_id).\
  groupBy(customers_df.first_name, customers_df.last_name). \
  sum('sales_volume').\
  orderBy('sum(sales_volume)', ascending=False). \
  show(5)

+----------+---------+------------------+
|first_name|last_name| sum(sales_volume)|
+----------+---------+------------------+
|    Sharyn|  Hopkins|34807.931091308594|
|   Pamelia|   Newman| 33634.24038696289|
|      Abby|   Gamble| 32802.99040222168|
|   Lyndsey|     Bean|32675.060485839844|
|    Emmitt|  Sanchez|31925.870529174805|
+----------+---------+------------------+
only showing top 5 rows



25/01/10 01:10:50 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
