# ETL: автоматизация подготовки данных семинары

## Урок 3. Получение денормализованных таблиц из нормализованных

1. Денормализуйте таблицу так, чтобы не нужно было для каждого рекламодателя постоянно подсчитывать количество кампаний и продаж.

In [1]:
import init_spark_env

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

In [3]:
# Создадим Spark сессию
spark = SparkSession.builder \
    .appName("Denormalization") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/15 22:06:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [7]:
# Создание нормализованных таблиц
# Таблица "Рекламодатели"
advertisers_data = [
    (1, 'Advertiser A'),
    (2, 'Advertiser B'),
    (3, 'Advertiser C')
]
advertisers = spark.createDataFrame(advertisers_data, ["UniqueID", "Name"])

advertisers.show()

+--------+------------+
|UniqueID|        Name|
+--------+------------+
|       1|Advertiser A|
|       2|Advertiser B|
|       3|Advertiser C|
+--------+------------+



In [8]:
# Таблица "Кампании"
campaigns_data = [
    (1, 'Campaign 1'),
    (2, 'Campaign 2'),
    (3, 'Campaign 3'),
    (4, 'Campaign 4')
]
campaigns = spark.createDataFrame(campaigns_data, ["UniqueID", "campaign_name"])

campaigns.show()

+--------+-------------+
|UniqueID|campaign_name|
+--------+-------------+
|       1|   Campaign 1|
|       2|   Campaign 2|
|       3|   Campaign 3|
|       4|   Campaign 4|
+--------+-------------+



In [9]:
# Таблица "Продажи"
sales_data = [
    (1, 1, 1),
    (2, 1, 2),
    (3, 2, 3),
    (4, 3, 4),
    (5, 3, 4)
]
sales = spark.createDataFrame(sales_data, ["UniqueID", "advertise_id", "campaign_id"])

sales.show()


+--------+------------+-----------+
|UniqueID|advertise_id|campaign_id|
+--------+------------+-----------+
|       1|           1|          1|
|       2|           1|          2|
|       3|           2|          3|
|       4|           3|          4|
|       5|           3|          4|
+--------+------------+-----------+



In [16]:
# Денормализация данных
# Подсчитаем количество уникальных кампаний и количество продаж для каждого рекламодателя
campaign_counts = sales.groupBy("advertise_id").agg(countDistinct("campaign_id").alias("campaign_count"))
sales_counts = sales.groupBy("advertise_id").count().withColumnRenamed("count", "sales_count")

In [20]:
# Объединим данные по рекламодателям с количеством кампаний и продаж
denormalized_data = advertisers.join(campaign_counts, advertisers.UniqueID == campaign_counts.advertise_id, "left") \
                               .join(sales_counts, advertisers.UniqueID == sales_counts.advertise_id, "left") \
                               .drop("advertise_id")

In [21]:
# Заполним отсутствующие значения нулями и выведем результат
denormalized_data = denormalized_data.fillna(0)
denormalized_data.show()

                                                                                

+--------+------------+--------------+-----------+
|UniqueID|        Name|campaign_count|sales_count|
+--------+------------+--------------+-----------+
|       1|Advertiser A|             2|          2|
|       2|Advertiser B|             1|          1|
|       3|Advertiser C|             1|          2|
+--------+------------+--------------+-----------+



2. В базе данных есть две таблицы: страны и клиенты. Одной из потребностей компании является исследование клиентов и стран с точки зрения эффективности продаж, поэтому часто выполняются объединения между таблицами: клиенты и страны. Что нужно сделать, чтобы ограничить частое объединение этих двух таблиц?

In [23]:
# Создание нормализованных таблиц
# Таблица "Страны"
countries_data = [
    (1, 'USA'),
    (2, 'Canada'),
    (3, 'France')
]
countries = spark.createDataFrame(countries_data, ["UniqueID", "country_name"])

countries.show()

+--------+------------+
|UniqueID|country_name|
+--------+------------+
|       1|         USA|
|       2|      Canada|
|       3|      France|
+--------+------------+



In [24]:
# Таблица "Клиенты"
customers_data = [
    (1, 'John Doe', 1),
    (2, 'Jane Smith', 2),
    (3, 'Alice Brown', 3),
    (4, 'Bob Davis', 1)
]
customers = spark.createDataFrame(customers_data, ["UniqueID", "customer_name", "country_id"])

customers.show()

+--------+-------------+----------+
|UniqueID|customer_name|country_id|
+--------+-------------+----------+
|       1|     John Doe|         1|
|       2|   Jane Smith|         2|
|       3|  Alice Brown|         3|
|       4|    Bob Davis|         1|
+--------+-------------+----------+



In [26]:
# Денормализация данных
# Объединим таблицу клиентов с таблицей стран, чтобы добавить информацию о стране
denormalized_data = customers.join(countries, customers.country_id == countries.UniqueID, "left") \
                             .drop("country_id") \
                             .withColumnRenamed("country_name", "customer_country")

# Вывод денормализованной таблицы
denormalized_data.show()

                                                                                

+--------+-------------+--------+----------------+
|UniqueID|customer_name|UniqueID|customer_country|
+--------+-------------+--------+----------------+
|       1|     John Doe|       1|             USA|
|       2|   Jane Smith|       2|          Canada|
|       3|  Alice Brown|       3|          France|
|       4|    Bob Davis|       1|             USA|
+--------+-------------+--------+----------------+



3. Вернемся к первому примеру. Предположим, компания хочет регулярно извлекать данные о продажах, например, о кампаниях или рекламодателях с полными именами. Как мы можем решить проблему постоянной необходимости
объединения таблиц?

In [27]:
# Создание нормализованных таблиц
# Таблица "Рекламодатели"
advertisers_data = [
    (1, 'Advertiser A'),
    (2, 'Advertiser B'),
    (3, 'Advertiser C')
]
advertisers = spark.createDataFrame(advertisers_data, ["UniqueID", "Name"])

advertisers.show()



+--------+------------+
|UniqueID|        Name|
+--------+------------+
|       1|Advertiser A|
|       2|Advertiser B|
|       3|Advertiser C|
+--------+------------+



In [28]:
# Таблица "Кампании"
campaigns_data = [
    (1, 'Campaign 1'),
    (2, 'Campaign 2'),
    (3, 'Campaign 3'),
    (4, 'Campaign 4')
]
campaigns = spark.createDataFrame(campaigns_data, ["UniqueID", "campaign_name"])

campaigns.show()



+--------+-------------+
|UniqueID|campaign_name|
+--------+-------------+
|       1|   Campaign 1|
|       2|   Campaign 2|
|       3|   Campaign 3|
|       4|   Campaign 4|
+--------+-------------+



In [29]:
# Таблица "Продажи"
sales_data = [
    (1, 1, 1),
    (2, 1, 2),
    (3, 2, 3),
    (4, 3, 4),
    (5, 3, 4)
]
sales = spark.createDataFrame(sales_data, ["UniqueID", "advertise_id", "campaign_id"])

sales.show()

+--------+------------+-----------+
|UniqueID|advertise_id|campaign_id|
+--------+------------+-----------+
|       1|           1|          1|
|       2|           1|          2|
|       3|           2|          3|
|       4|           3|          4|
|       5|           3|          4|
+--------+------------+-----------+



In [32]:
# Денормализация данных
# Присоединим информацию о рекламодателях и кампаниях к таблице "Продажи"
denormalized_sales = sales.join(advertisers, sales.advertise_id == advertisers.UniqueID, "left") \
                          .join(campaigns, sales.campaign_id == campaigns.UniqueID, "left") \
                          .drop(advertisers.UniqueID) \
                          .drop(campaigns.UniqueID) \
                          .withColumnRenamed("Name", "advertiser_name") \
                          .withColumnRenamed("campaign_name", "campaign_name")

# Шаг 4: Вывод денормализованной таблицы
denormalized_sales.show()

+--------+------------+-----------+---------------+-------------+
|UniqueID|advertise_id|campaign_id|advertiser_name|campaign_name|
+--------+------------+-----------+---------------+-------------+
|       1|           1|          1|   Advertiser A|   Campaign 1|
|       3|           2|          3|   Advertiser B|   Campaign 3|
|       2|           1|          2|   Advertiser A|   Campaign 2|
|       4|           3|          4|   Advertiser C|   Campaign 4|
|       5|           3|          4|   Advertiser C|   Campaign 4|
+--------+------------+-----------+---------------+-------------+



                                                                                

In [33]:
spark.stop()