# Мини-проект по Spark и ClickHouse

## Необходимо обработать данные в Spark и настроить поставку данных в ClickHouse через интеграционные таблицы, а затем выполнить несколько расчетов, чтобы ответить на вопросы бизнеса.

# Исходные данные

## adverts_data.parquet - данные с характеристиками объявлений:
- execution_date - дата, на которую была выгрузка по объявлениям live
- advert_id - id объявления
- mark - марка машины
- model - модель машины
- price - цена машины
- year - год машины
- fuel - тип топлива
- color - цвет
- transmission - коробка передач
- body - тип кузова авто
- country - страна

## live_adverts.parquet – данные с текущими объявлениями:
- execution_date - дата, на которую была выгрузка по объявлениям live
- advert_id - id объявления
- region - регион объявления
- user_id - глобальный ID пользователя (ссылка на user_passports.global_id)
- platform - платформа подачи
 
## user_passports – данные с характеристиками пользователей:
- global_id - глобальный ID пользователя
- user_type_name - тип пользователя

## Схема решения

In [34]:

#Spark connection with S3 options
import os
import socket
from pyspark.sql import SparkSession

# S3 credentials and connection parameters.
# NOTE(review): the access and secret keys are hard-coded in the notebook;
# consider loading them from the environment or a secrets store and rotating
# these values.
aws_access_key = "YCAJEdS-Tw-yo4Hf834J6HzA1"
aws_secret_key = "YCMdHzm_hstp27wftN2W0yMECpl38mV-b7fr5HhG"
s3_bucket = "kc-hardda-projects"
s3_endpoint_url = "https://storage.yandexcloud.net"

# Put your own username into student_directory.
# NOTE(review): the value starts with a slash, while the Spark output path and
# the listing prefix used elsewhere in this notebook are written as
# 'Skuzmenko/...' without one - confirm the leading slash is intended.
student_directory = '/Skuzmenko/'

# Resolve the Spark master's in-cluster DNS name to an IP address and build
# the spark:// URL on port 7077 from it.
APACHE_MASTER_IP = socket.gethostbyname("apache-spark-master-0.apache-spark-headless.apache-spark.svc.cluster.local")
APACHE_MASTER_URL = f"spark://{APACHE_MASTER_IP}:7077"
# Both values below come from environment variables; a missing variable
# raises KeyError here.
POD_IP = os.environ["MY_POD_IP"]
SPARK_APP_NAME = f"spark-{os.environ['HOSTNAME']}"
# Comma-separated list of extra jars passed to Spark through ``spark.jars``:
# the ClickHouse native JDBC driver, hadoop-aws and the AWS SDK bundle.
# The list is joined with bare commas so the value carries no embedded
# newlines or padding (the previous triple-quoted literal relied on Spark
# trimming that whitespace away).
_JARS_DIR = "/nfs/env/lib/python3.8/site-packages/pyspark/jars"
JARS = ",".join(
    f"{_JARS_DIR}/{jar_name}"
    for jar_name in (
        "clickhouse-native-jdbc-shaded-2.6.5.jar",
        "hadoop-aws-3.3.4.jar",
        "aws-java-sdk-bundle-1.12.433.jar",
    )
)

# Executor sizing for this session.
MEM = "2048m"
CORES = 1

# Build (or reuse) the Spark session connected to the standalone master.
# Every S3A option is passed with the ``spark.hadoop.`` prefix, which is the
# documented way to hand a property to the Hadoop configuration; the endpoint
# previously used a bare ``fs.s3a.endpoint`` key, unlike its neighbours.
# The endpoint value now comes from ``s3_endpoint_url`` (same URL as before)
# instead of being repeated as a literal.
spark = (
    SparkSession.builder
    .appName(SPARK_APP_NAME)
    .master(APACHE_MASTER_URL)
    .config("spark.executor.memory", MEM)
    .config("spark.jars", JARS)
    .config("spark.executor.cores", CORES)
    .config("spark.driver.host", POD_IP)
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint_url)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .getOrCreate()
)
		

# Load the three source datasets from the shared S3 location
df_adverts_all = spark.read.parquet(
    's3a://kc-hardda-projects/shared/adverts_data.parquet'
)
df_live_adverts = spark.read.parquet(
    's3a://kc-hardda-projects/shared/live_adverts.parquet'
)
df_user_passports = spark.read.parquet(
    's3a://kc-hardda-projects/shared/user_passports.parquet'
)

# Build the flat table: live adverts are the left side of both joins, so every
# live advert row is kept even when advert attributes or the user passport
# are missing.
df_flat = (
    df_live_adverts
    .join(df_adverts_all, on=['execution_date', 'advert_id'], how='left')
    .join(
        df_user_passports,
        df_live_adverts['user_id'] == df_user_passports['global_id'],
        how='left',
    )
)

# ---------------------------------------------------------------------------
# Create a "folder" in S3 by putting an empty object whose key is the folder
# name. `s3` and `NoCredentialsError` are not defined in this cell; they are
# presumably an S3 client and its credentials exception set up elsewhere.
# NOTE(review): student_directory starts with '/', so the key created here is
# '/Skuzmenko/', not the 'Skuzmenko/' prefix the Spark job writes to - confirm.
try:
    s3.put_object(Bucket=s3_bucket, Key=student_directory)
except NoCredentialsError:
    print("Invalid AWS credentials provided")
else:
    print(f"Folder '{student_directory}' created in bucket '{s3_bucket}'")
# ---------------------------------------------------------------------------


# Persist the flat table to S3 as parquet, replacing any previous output.
# coalesce(1) collapses the data to one partition before writing.
# Change the destination path to your own directory.
flat_table_writer = df_flat.coalesce(1).write.format("parquet")
flat_table_writer.mode('overwrite').save("s3a://kc-hardda-projects/Skuzmenko/flat_table")

# To keep a local copy of the file instead, use the code below
#pandas_df = df_flat.coalesce(1).toPandas()
#pandas_df.to_parquet('flat_table.parquet')



Folder '/Skuzmenko/' created in bucket 'kc-hardda-projects'


                                                                                

In [41]:
# Ask S3 for the objects whose keys start with the student's prefix
response = s3.list_objects_v2(Bucket=s3_bucket, Prefix = 'Skuzmenko')

# Report every key that came back, or say that nothing matched
if 'Contents' not in response:
    print(f"No objects found in bucket '{s3_bucket}'")
else:
    print(f"Objects in bucket '{s3_bucket}':")
    for s3_object in response['Contents']:
        print(f"- {s3_object['Key']}")

Objects in bucket 'kc-hardda-projects':
- Skuzmenko/_SUCCESS
- Skuzmenko/flat_table/_SUCCESS
- Skuzmenko/flat_table/part-00000-0b2423ed-d833-4856-bcf1-299cf517cf3d-c000.snappy.parquet
- Skuzmenko/part-00000-f0ce6a97-d49c-4b56-8be3-c99aa951cc29-c000.snappy.parquet


In [39]:
# Read the written flat table back from S3 and preview its first five rows
path_ = "s3a://kc-hardda-projects/Skuzmenko/flat_table"
check = spark.read.format("parquet").load(path_)
check.show(n=5)

[Stage 85:>                                                         (0 + 1) / 1]

+-------------------+---------+------+---------+--------+----+-----+------+----+----+-----+------------+----+-------+---------+--------------+
|     execution_date|advert_id|region|  user_id|platform|mark|model| price|year|fuel|color|transmission|body|country|global_id|user_type_name|
+-------------------+---------+------+---------+--------+----+-----+------+----+----+-----+------------+----+-------+---------+--------------+
|2020-12-25 00:00:00|236092185|Москва|123456986|     ios|null| null|  null|null|null| null|        null|null|   null|123456986|   simple_user|
|2020-12-31 00:00:00|236092185|Москва|123456986|     ios|null| null|  null|null|null| null|        null|null|   null|123456986|   simple_user|
|2021-01-07 00:00:00|236092185|Москва|123456986|     ios|null| null|  null|null|null| null|        null|null|   null|123456986|   simple_user|
|2021-01-24 00:00:00|236092185|Москва|123456986|     ios|null| null|  null|null|null| null|        null|null|   null|123456986|   simple_user|

                                                                                

In [40]:
# Print the column names and Spark types of the table read back from S3;
# these are the types the ClickHouse table definitions have to match.
check.printSchema()

root
 |-- execution_date: timestamp (nullable = true)
 |-- advert_id: long (nullable = true)
 |-- region: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- platform: string (nullable = true)
 |-- mark: string (nullable = true)
 |-- model: string (nullable = true)
 |-- price: double (nullable = true)
 |-- year: double (nullable = true)
 |-- fuel: string (nullable = true)
 |-- color: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- body: string (nullable = true)
 |-- country: string (nullable = true)
 |-- global_id: long (nullable = true)
 |-- user_type_name: string (nullable = true)



Проанализируем типы данных в файле Parquet и сопоставим их с типами данных в ClickHouse: https://clickhouse.com/docs/en/interfaces/formats#data-format-parquet

Это позволит правильно указать типы данных при создании таблицы

In [13]:
# Создаем соединение с ClickHouse
from clickhouse_driver import Client

# Native-protocol ClickHouse client used by every query in this notebook.
# NOTE(review): the password is hard-coded here; consider reading it from the
# environment or a secrets store.
client_st = Client(
    host='clickhouse.lab.karpov.courses',
    port=9000,
    user='hardda_student_s-kuzmenko',
    password='9fa3838e0331c11878358cea00d5768074017e706ab20d279cf21bf884600c09',
    database='hardda_student_data',
)

In [135]:
# Drop the external S3-backed table, if present, so the CREATE TABLE that
# follows can run again.
result = client_st.execute('''drop table if exists prj_s3_ext_Skuzmenko''')

In [136]:
# Integration table: exposes the parquet output of the Spark job in S3 as a
# ClickHouse table through the S3 table engine.
#
# The path uses a `*.parquet` wildcard instead of one concrete part-file name:
# Spark generates a new part-<uuid> name on every overwrite, so a pinned file
# name stops resolving after the next run. The wildcard also skips the
# `_SUCCESS` marker that Spark writes next to the data.
# Credentials and format are single-quoted, because that is ClickHouse's
# string-literal syntax (double quotes denote identifiers).
#
# NOTE(review): Spark reports advert_id / user_id / global_id as `long`
# (64-bit); they are declared UInt32 here to match the downstream tables -
# confirm that the id range always fits.
# NOTE(review): the S3 access and secret keys are embedded in the DDL.
result = client_st.execute('''
CREATE TABLE prj_s3_ext_Skuzmenko
(
execution_date Nullable(DateTime64),
advert_id Nullable(UInt32),
region Nullable(String),
user_id Nullable(UInt32),
platform Nullable(String),
mark Nullable(String),
model Nullable(String),
price Nullable(Float64),
year Nullable(Float64),
fuel Nullable(String),
color Nullable(String),
transmission Nullable(String),
body Nullable(String),
country Nullable(String),
global_id Nullable(UInt32),
user_type_name Nullable(String)
)
ENGINE = S3('https://storage.yandexcloud.net/kc-hardda-projects/Skuzmenko/flat_table/*.parquet', 'YCAJEdS-Tw-yo4Hf834J6HzA1', 'YCMdHzm_hstp27wftN2W0yMECpl38mV-b7fr5HhG', 'Parquet')
''')

In [137]:
# Fetch a ten-row sample from the external S3 table
ext_preview_sql = "SELECT * FROM prj_s3_ext_Skuzmenko LIMIT 10"
result = client_st.execute(ext_preview_sql)

# Print each returned tuple on its own line
for record in result:
    print(record)

(datetime.datetime(2020, 12, 25, 3, 0), 236092185, 'Москва', 123456986, 'ios', None, None, None, None, None, None, None, None, None, 123456986, 'simple_user')
(datetime.datetime(2020, 12, 31, 3, 0), 236092185, 'Москва', 123456986, 'ios', None, None, None, None, None, None, None, None, None, 123456986, 'simple_user')
(datetime.datetime(2021, 1, 7, 3, 0), 236092185, 'Москва', 123456986, 'ios', None, None, None, None, None, None, None, None, None, 123456986, 'simple_user')
(datetime.datetime(2021, 1, 24, 3, 0), 236092185, 'Москва', 123456986, 'ios', None, None, None, None, None, None, None, None, None, 123456986, 'simple_user')
(datetime.datetime(2021, 1, 25, 3, 0), 256283734, 'Москва', 123456986, 'ios', None, None, 2000.0, None, None, None, None, None, None, 123456986, 'simple_user')
(datetime.datetime(2021, 2, 7, 3, 0), 256283734, 'Москва', 123456986, 'ios', None, None, 2000.0, None, None, None, None, None, None, 123456986, 'simple_user')
(datetime.datetime(2021, 2, 15, 3, 0), 236092185

In [120]:
# Drop the main MergeTree table, if present, before it is recreated.
result = client_st.execute('''drop table if exists prj_main_Skuzmenko''')

In [121]:
# Main storage table: same columns as the external S3 table, but stored in a
# MergeTree ordered by execution_date. execution_date is the only column
# declared non-Nullable here.
result = client_st.execute('''
CREATE TABLE prj_main_Skuzmenko
(
execution_date DateTime64,
advert_id Nullable(UInt32),
region Nullable(String),
user_id Nullable(UInt32),
platform Nullable(String),
mark Nullable(String),
model Nullable(String),
price Nullable(Float64),
year Nullable(Float64),
fuel Nullable(String),
color Nullable(String),
transmission Nullable(String),
body Nullable(String),
country Nullable(String),
global_id Nullable(UInt32),
user_type_name Nullable(String)
) 
ENGINE = MergeTree()
ORDER BY execution_date
''')

In [89]:
# Copy every row from the external S3 table into the main table.
# NOTE(review): the same INSERT is issued again further down, after the
# materialized views are created; running the notebook top to bottom would
# load the rows into prj_main_Skuzmenko twice - confirm which one is intended.
result = client_st.execute('''
insert into prj_main_Skuzmenko select * from prj_s3_ext_Skuzmenko
''')

In [3]:
# Fetch a ten-row sample from the main table
main_preview_sql = "SELECT * FROM prj_main_Skuzmenko LIMIT 10"
result = client_st.execute(main_preview_sql)

# Print each returned tuple on its own line
for record in result:
    print(record)

(datetime.datetime(2020, 11, 12, 3, 0), 245470545, 'Москва', 123459843, 'ios', None, None, 2000.0, None, None, None, None, None, None, 123459843, 'profi')
(datetime.datetime(2020, 11, 12, 3, 0), 251072730, 'Москва', 123459894, 'ios', None, None, None, None, None, None, None, None, None, 123459894, 'profi')
(datetime.datetime(2020, 11, 12, 3, 0), 252228480, 'Москва', 123459894, 'ios', None, None, None, None, None, None, None, None, None, 123459894, 'profi')
(datetime.datetime(2020, 11, 12, 3, 0), 161194262, 'Москва', 123460008, 'ios', None, None, 20.0, None, None, None, None, None, None, 123460008, 'profi')
(datetime.datetime(2020, 11, 12, 3, 0), 233425914, 'Москва', 123460008, 'ios', None, None, 1084.0, None, None, None, None, None, None, 123460008, 'profi')
(datetime.datetime(2020, 11, 12, 3, 0), 236854399, 'Москва', 123460008, 'ios', None, None, 79.0, None, None, None, None, None, None, 123460008, 'profi')
(datetime.datetime(2020, 11, 12, 3, 0), 240685513, 'Москва', 123460008, 'ios',

In [122]:
# Drop the first aggregate target table, if present.
result = client_st.execute('''drop table if exists prj_main_Skuzmenko_agg_view_1''')

In [123]:
# Drop the second aggregate target table, if present.
result = client_st.execute('''drop table if exists prj_main_Skuzmenko_agg_view_2''')

In [124]:
# Drop the third aggregate target table, if present.
result = client_st.execute('''drop table if exists prj_main_Skuzmenko_agg_view_3''')

In [125]:
# Drop the materialized view agg_view_1_mv, if present.
result = client_st.execute('''drop table if exists agg_view_1_mv''')

In [126]:
# Drop the materialized view agg_view_2_mv, if present.
result = client_st.execute('''drop table if exists agg_view_2_mv''')

In [127]:
# Drop the materialized view agg_view_3_mv, if present.
result = client_st.execute('''drop table if exists agg_view_3_mv''')

Сделаем 3 материализованных представления для ответа на все вопросы бизнеса

In [128]:
# Target table for the first materialized view: price statistics per
# (execution_date, platform, user_type_name). min/max are stored as
# SimpleAggregateFunction values; the quantiles, median and average are stored
# as AggregateFunction states and must be read with the matching -Merge
# functions.
result = client_st.execute('''
CREATE TABLE prj_main_Skuzmenko_agg_view_1 (
   execution_date DateTime64,
   platform LowCardinality(Nullable(String)),
   user_type_name LowCardinality(Nullable(String)),
   min_price SimpleAggregateFunction(min, Nullable(Float64)),
   25Q AggregateFunction(quantile(0.25), Nullable(Float64)),
   median_price AggregateFunction(median, Nullable(Float64)),
   avg_price AggregateFunction(avg, Nullable(Float64)),
   75Q AggregateFunction(quantile(0.75), Nullable(Float64)),
   max_price SimpleAggregateFunction(max, Nullable(Float64))
)
ENGINE = AggregatingMergeTree()
ORDER BY (execution_date, platform, user_type_name)
''')

In [129]:
# Materialized view feeding prj_main_Skuzmenko_agg_view_1: it groups rows of
# prj_main_Skuzmenko by (execution_date, platform, user_type_name) and writes
# min/max of price plus the -State forms of the quantiles, median and average.
result = client_st.execute('''
CREATE MATERIALIZED VIEW agg_view_1_mv
TO prj_main_Skuzmenko_agg_view_1 AS
SELECT 
   execution_date,
   platform,
   user_type_name,
   min(price) as min_price,
   quantileState(0.25)(price) as 25Q,
   medianState(price) as median_price,
   avgState(price) as avg_price,
   quantileState(0.75)(price) as 75Q,
   max(price) as max_price
FROM prj_main_Skuzmenko
GROUP BY (execution_date, platform, user_type_name)
''')


In [130]:
# Target table for the second materialized view: per
# (advert_id, user_id, mark, model) it keeps the first and last execution
# date and the min / median-state / max of price.
result = client_st.execute('''
CREATE TABLE prj_main_Skuzmenko_agg_view_2 (
   advert_id UInt32,
   user_id UInt32,
   mark LowCardinality(Nullable(String)),
   model LowCardinality(Nullable(String)),
   min_date SimpleAggregateFunction(min, Nullable(DateTime64)),
   max_date SimpleAggregateFunction(max, Nullable(DateTime64)),
   min_price SimpleAggregateFunction(min, Nullable(Float64)),
   median_price AggregateFunction(median, Nullable(Float64)),
   max_price SimpleAggregateFunction(max, Nullable(Float64))
)
ENGINE = AggregatingMergeTree()
ORDER BY (advert_id, user_id, mark, model)
''')

In [131]:
# Materialized view feeding prj_main_Skuzmenko_agg_view_2: groups rows of
# prj_main_Skuzmenko by (advert_id, user_id, mark, model) and writes the date
# range and price statistics for each group.
result = client_st.execute('''
CREATE MATERIALIZED VIEW agg_view_2_mv
TO prj_main_Skuzmenko_agg_view_2 AS
SELECT
   advert_id,
   user_id,
   mark,
   model,
   min(execution_date) as min_date,
   max(execution_date) as max_date,
   min(price) as min_price,
   medianState(price) as median_price,
   max(price) as max_price
FROM prj_main_Skuzmenko
GROUP BY (advert_id, user_id, mark, model)
''')


In [132]:
# Target table for the third materialized view: per
# (advert_id, user_id, user_type_name, mark, model) it keeps the first and
# last execution date plus a `cohort` date. `cohort` is a plain Date column,
# not an aggregate-function column.
result = client_st.execute('''
CREATE TABLE prj_main_Skuzmenko_agg_view_3 (
   advert_id UInt32,
   user_id UInt32,
   user_type_name LowCardinality(Nullable(String)),
   mark LowCardinality(Nullable(String)),
   model LowCardinality(Nullable(String)),
   min_date SimpleAggregateFunction(min, Nullable(DateTime64)),
   max_date SimpleAggregateFunction(max, Nullable(DateTime64)),
   cohort Date
)
ENGINE = AggregatingMergeTree()
ORDER BY (advert_id, user_id, user_type_name, mark, model)
''')

In [133]:
# Materialized view feeding prj_main_Skuzmenko_agg_view_3: groups rows of
# prj_main_Skuzmenko by (advert_id, user_id, user_type_name, mark, model),
# records the first/last execution date, and derives `cohort` as the start of
# the month of the group's first date.
# NOTE(review): `cohort` is computed from the min_date seen in each inserted
# batch and stored as a plain column; if one advert's rows arrive in several
# insert blocks, the stored cohort may not match the merged min_date - confirm.
result = client_st.execute('''
CREATE MATERIALIZED VIEW agg_view_3_mv
TO prj_main_Skuzmenko_agg_view_3 AS
SELECT
   advert_id,
   user_id,
   user_type_name,
   mark,
   model,
   min(execution_date) as min_date,
   max(execution_date) as max_date,
   toStartOfMonth(min_date) as cohort
FROM prj_main_Skuzmenko
GROUP BY (advert_id, user_id, user_type_name, mark, model)
''')

In [138]:
# Load the main table from the external S3 table. This runs after the three
# materialized views exist, so the insert also populates their target tables.
# NOTE(review): an identical INSERT appears earlier in the notebook, before
# the views are created; running both would duplicate rows in
# prj_main_Skuzmenko - confirm.
result = client_st.execute('''
insert into prj_main_Skuzmenko select * from prj_s3_ext_Skuzmenko
''')

**Правда ли, что цены профессионалов больше, чем цены простых пользователей?**

In [70]:
# Price distribution per user type, combined from the pre-aggregated states
# in the first aggregate table (min, quartiles, median, mean, max).
price_stats_sql = '''
SELECT 
   user_type_name,
   min(min_price) as min_price,
   quantileMerge(0.25)(25Q) as 25Q,
   medianMerge(median_price) as median_price,
   avgMerge(avg_price) as avg_price,
   quantileMerge(0.75)(75Q) as 75Q,
   max(max_price) as max_price
FROM prj_main_Skuzmenko_agg_view_1
GROUP BY (user_type_name)
ORDER BY (user_type_name)
'''
result = client_st.execute(price_stats_sql)

# Print one tuple per user type
for record in result:
    print(record)

('avtosalon', 1576000.0, 2658000.0, 3098000.0, 3153512.2807017546, 3758000.0, 6078000.0)
('profi', 0.0, 2000.0, 6000.0, 109897.13788965716, 18000.0, 119000000.0)
('simple_user', 0.0, 20000.0, 540000.0, 1444751.1376050818, 1780000.0, 46000000.0)
(None, 0.0, 2800.0, 9000.0, 1012379.9886046273, 137000.0, 42000000.0)


**Ответ:** Нет, неверно. Цены выше у простых пользователей: у simple_user медиана 540 000 и среднее ≈1,44 млн против медианы 6 000 и среднего ≈0,11 млн у profi.

**Определить топ-5 марок, продающихся быстрее всего, и топ-5 марок, продающихся медленнее всего:**

In [71]:
# For ten selected marks: first and last date seen, the number of days between
# them, and min / median / max price, sorted by that day span.
# NOTE(review): the span is taken over all adverts of a mark (earliest first
# date to latest last date), not averaged per advert - confirm this is the
# intended "time to sell" measure.
mark_span_sql = '''
SELECT 
   mark,
   min(min_date) as min_date,
   max(max_date) as max_date,
   dateDiff('day', min_date, max_date) as diff,
   min(min_price) as min_price,
   medianMerge(median_price) as median_price,
   max(max_price) as max_price
FROM prj_main_Skuzmenko_agg_view_2
WHERE mark in ('Aro', 'Great Wall', 'Alfa Romeo', 'Datsun', 'SEAT', 'Rover', 'Scion', 'Ретро-автомобили', 'MG', 'DongFeng')
GROUP BY (mark)
ORDER BY diff
'''
result = client_st.execute(mark_span_sql)

# Print one tuple per mark
for record in result:
    print(record)

('Aro', datetime.datetime(2021, 11, 9, 3, 0), datetime.datetime(2021, 11, 10, 3, 0), 1, 160000.0, 160000.0, 160000.0)
('SEAT', datetime.datetime(2021, 10, 11, 3, 0), datetime.datetime(2021, 10, 20, 3, 0), 9, 400000.0, 500000.0, 500000.0)
('DongFeng', datetime.datetime(2021, 10, 26, 3, 0), datetime.datetime(2021, 11, 9, 3, 0), 14, 500000.0, 530000.0, 540000.0)
('Alfa Romeo', datetime.datetime(2021, 10, 23, 3, 0), datetime.datetime(2021, 11, 6, 3, 0), 14, 1260000.0, 1289000.0, 1318000.0)
('Datsun', datetime.datetime(2021, 10, 19, 3, 0), datetime.datetime(2021, 11, 9, 3, 0), 21, 490000.0, 758000.0, 778000.0)
('Ретро-автомобили', datetime.datetime(2020, 12, 3, 3, 0), datetime.datetime(2021, 11, 11, 3, 0), 343, 900000.0, 2000000.0, 3600000.0)
('Scion', datetime.datetime(2020, 11, 22, 3, 0), datetime.datetime(2021, 11, 10, 3, 0), 353, 760000.0, 1660000.0, 1720000.0)
('MG', datetime.datetime(2020, 11, 18, 3, 0), datetime.datetime(2021, 11, 11, 3, 0), 358, 399000.0, 700000.0, 820000.0)
('Rover

**Какую долю объявлений от общей базы составляют эти группы (суммарно ТОП-5 марок, продающихся быстрее всего и ТОП-5 марок, продающихся медленнее всего).**

In [68]:
# Percentage of rows in the second aggregate table that belong to the ten
# selected marks.
# NOTE(review): this counts table rows; it equals the share of adverts only
# if each advert has exactly one row in the aggregate table - confirm.
advert_share_sql = '''
SELECT 
   countIf(advert_id, mark in ('Aro', 'Great Wall', 'Alfa Romeo', 'Datsun', 'SEAT', 'Rover', 'Scion', 'Ретро-автомобили', 'MG', 'DongFeng')) * 100 / count(advert_id) as percent
FROM prj_main_Skuzmenko_agg_view_2
'''
result = client_st.execute(advert_share_sql)

# Print the single result tuple
for record in result:
    print(record)

(0.03503898086621366,)


**Какую долю пользователей от общего количества листеров составляют эти группы (суммарно ТОП-5 марок, продающихся быстрее всего и ТОП-5 марок, продающихся медленнее всего).**

In [69]:
# Percentage of distinct users (approximate, via uniq) who have an advert for
# one of the ten selected marks, relative to all distinct users in the table.
user_share_sql = '''
SELECT 

   uniqIf(user_id, mark in ('Aro', 'Great Wall', 'Alfa Romeo', 'Datsun', 'SEAT', 'Rover', 'Scion', 'Ретро-автомобили', 'MG', 'DongFeng')) * 100 / uniq(user_id) as percent

FROM prj_main_Skuzmenko_agg_view_2
'''
result = client_st.execute(user_share_sql)

# Print the single result tuple
for record in result:
    print(record)

(0.17740562021004824,)


**Как изменилось время ухода объявлений в архив за полгода? На сколько дней изменилось среднее время ухода в архив в июньской когорте относительно декабрьской когорты?**

In [93]:
# Mean and median number of days between first and last appearance of an
# advert, for the December 2020 and June 2021 cohorts.
cohort_sql = '''
SELECT
   cohort,
   avg(dateDiff('day', min_date, max_date)) as avg_diff,
   median(dateDiff('day', min_date, max_date)) as med_diff
FROM prj_main_Skuzmenko_agg_view_3
WHERE cohort in ('2020-12-01', '2021-06-01')
GROUP BY (cohort)
'''
result = client_st.execute(cohort_sql)

# Print one tuple per cohort
for record in result:
    print(record)

(datetime.date(2020, 12, 1), 316.91866913123846, 325.0)
(datetime.date(2021, 6, 1), 139.84504331087584, 142.0)


**По каким маркам время ухода в архив выросло больше всего между декабрьской и июньской когортами?**

In [116]:
# Per mark: mean days-on-listing in the June 2021 cohort minus the same mean
# in the December 2020 cohort; the ten largest differences are returned.
mark_change_sql = '''
SELECT
   mark,
   avgIf(dateDiff('day', min_date, max_date), cohort = '2021-06-01') 
   - avgIf(dateDiff('day', min_date, max_date), cohort = '2020-12-01') as avg_diff
   
FROM prj_main_Skuzmenko_agg_view_3
GROUP BY mark
ORDER BY avg_diff DESC
LIMIT 10
'''
result = client_st.execute(mark_change_sql)

# Print one tuple per mark
for record in result:
    print(record)

('Renault', 107.0)
('Honda', -59.34285714285713)
('Kia', -83.55844155844154)
('Lexus', -100.92857142857143)
('Subaru', -104.0)
('Toyota', -116.4286380597015)
('BMW', -122.04285714285717)
('ВАЗ (Lada)', -124.05555555555554)
('Mazda', -127.0)
('Nissan', -140.90769230769234)


По марке Renault время ухода в архив выросло, а по остальным маркам только уменьшилось

**У каких пользователей изменение времени продажи выше?**

In [118]:
# Per user type: mean days-on-listing in the June 2021 cohort minus the same
# mean in the December 2020 cohort, ordered by the size of the change.
user_type_change_sql = '''
SELECT
   user_type_name,
   avgIf(dateDiff('day', min_date, max_date), cohort = '2021-06-01') 
   - avgIf(dateDiff('day', min_date, max_date), cohort = '2020-12-01') as avg_diff
   
FROM prj_main_Skuzmenko_agg_view_3
GROUP BY user_type_name
ORDER BY abs(avg_diff) DESC
LIMIT 10
'''
result = client_st.execute(user_type_change_sql)

# Print one tuple per user type
for record in result:
    print(record)

(None, -181.84160756501183)
('profi', -177.84564008416353)
('simple_user', -158.9021880886828)
('avtosalon', None)
