## Создаем Spark-сессию

In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

In [2]:
jar_files = [
    "/usr/local/spark/jars/postgresql-42.6.0.jar",
    "/usr/local/spark/jars/clickhouse-jdbc-0.4.6-all.jar"
]

In [3]:
spark = (
    SparkSession
    .builder
    .appName("SparkDataMart")
    .config("spark.jars", ",".join(jar_files))
    .getOrCreate()
)

25/11/15 15:03:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
",".join(jar_files)

'/usr/local/spark/jars/postgresql-42.6.0.jar,/usr/local/spark/jars/clickhouse-jdbc-0.4.6-all.jar'

In [5]:
spark

## Подключаемся к источникам

**csv**

In [6]:
path = "/home/jovyan/work/data"

In [7]:
campaigns_dict = (
    spark.read
    .option('header', True)
    .csv(f'{path}/campaigns_dict.csv')
)

In [8]:
# ленивые вычисления (transformations, actions)
campaigns_dict.show(5, truncate=False) #truncate=False - чтобы Spark не обрезал текст до определенного количества символов при выводе, а выводил все

+-----------+------------------------------------------+
|campaign_id|campaign_name                             |
+-----------+------------------------------------------+
|1          |year_modern_kitchen_launch_20250115       |
|2          |quarter_custom_kitchens_showcase_20240210 |
|3          |month_smart_kitchen_promotion_20240305    |
|4          |year_luxury_kitchens_exhibit_20240420     |
|5          |quarter_ecofriendly_kitchen_offer_20240512|
+-----------+------------------------------------------+
only showing top 5 rows


In [10]:
campaigns_dict.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)



**parquet**

In [11]:
submits = spark.read.parquet(f'{path}/submits.parquet')

In [12]:
submits.show(5, truncate=False)

+---------+--------+-----------+
|submit_id|name    |phone      |
+---------+--------+-----------+
|2282     |Jennifer|79511904041|
|9898     |Jeffrey |79824419733|
|9005     |Linda   |79074725672|
|1507     |Teresa  |79864203598|
|3803     |Tanya   |79779567654|
+---------+--------+-----------+
only showing top 5 rows


In [13]:
submits.printSchema()

root
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)



In [14]:
deals = spark.read.parquet(f'{path}/deals.parquet')

In [15]:
deals.show(5, truncate=False)

+-------+----------+---------------------+-----------+------------------------+-----------------------------------------------------+
|deal_id|deal_date |fio                  |phone      |email                   |address                                              |
+-------+----------+---------------------+-----------+------------------------+-----------------------------------------------------+
|1      |2024-03-04|Gregory Wu           |79746561889|paul80@example.net      |098 Yates Cliff Apt. 241, East Monica, DE 88076      |
|2      |2024-08-20|William Ross Jr.     |79074725672|xyoung@example.org      |197 Willie Groves Apt. 655, Port Angelaberg, LA 39384|
|3      |2024-10-15|Sonya Kerr           |79201244835|elewis@example.com      |144 Andrew Cape, Lake Nicholas, SC 58918             |
|4      |2024-12-31|Mrs. Angela Tucker MD|79771829751|robertparker@example.net|6056 Collins View, South Harold, OR 15650            |
|5      |2024-03-23|Eric Flores          |79729054809|barbara7

In [16]:
deals.show(2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------
 deal_id   | 1                                                     
 deal_date | 2024-03-04                                            
 fio       | Gregory Wu                                            
 phone     | 79746561889                                           
 email     | paul80@example.net                                    
 address   | 098 Yates Cliff Apt. 241, East Monica, DE 88076       
-RECORD 1----------------------------------------------------------
 deal_id   | 2                                                     
 deal_date | 2024-08-20                                            
 fio       | William Ross Jr.                                      
 phone     | 79074725672                                           
 email     | xyoung@example.org                                    
 address   | 197 Willie Groves Apt. 655, Port Angelaberg, LA 39384 
only showing top 2 rows


In [17]:
deals.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



**Установка pyarrow**

In [19]:
# Удалим все старые закачки
!pip cache purge

Files removed: 6


In [20]:
# Обновим pip до последней версии
!pip install --upgrade pip

Collecting pip
  Downloading pip-25.3-py3-none-any.whl.metadata (4.7 kB)
Downloading pip-25.3-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m138.1 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.3
    Uninstalling pip-23.3:
      Successfully uninstalled pip-23.3
Successfully installed pip-25.3


In [21]:
# Установим pyarrow без кэша и без проверки хэшей
!pip install --no-cache-dir --no-deps pyarrow

Collecting pyarrow
  Downloading pyarrow-22.0.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.2 kB)
Downloading pyarrow-22.0.0-cp311-cp311-manylinux_2_28_x86_64.whl (47.7 MB)
[2K   [91m━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.0/47.7 MB[0m [31m275.9 kB/s[0m  [36m0:01:41[0m[0m
[0mResuming download pyarrow-22.0.0-cp311-cp311-manylinux_2_28_x86_64.whl (20.0 MB/47.7 MB)
[2K   [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━[0m [32m21.8/47.7 MB[0m [31m240.0 kB/s[0m  [36m0:01:48[0m[0m
[0mResuming download pyarrow-22.0.0-cp311-cp311-manylinux_2_28_x86_64.whl (21.8 MB/47.7 MB)
[2K   [91m━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━[0m [32m23.8/47.7 MB[0m [31m73.8 kB/s[0m  [36m0:05:25[0m[0m
[0mResuming download pyarrow-22.0.0-cp311-cp311-manylinux_2_28_x86_64.whl (23.8 MB/47.7 MB)
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━[0m [32m26.7/47.7 MB[0m [31m73.0 kB/s[0m  [36m0:04:49

In [18]:
import pyarrow as pa
import pyarrow.parquet as pq
print(pa.__version__)

22.0.0


In [19]:
pq.read_metadata(f'{path}/submits.parquet')  # Parquet recommends row groups sized between 512MB and 1GB

<pyarrow._parquet.FileMetaData object at 0x7bd53039d670>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 3
  num_rows: 4000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2371

In [20]:
submits.count() # количество строк

4000

**postgres**

In [28]:
# чтение датафрейма, который загружу в таблицу postgre
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f'{path}/costs_postgres.csv')

In [30]:
df_csv.show()

+----------+-----------+------+------+-----+
|      date|campaign_id| costs|clicks|views|
+----------+-----------+------+------+-----+
|2024-01-01|          1|670.52|    40|  110|
|2024-01-01|          2| 602.5|    11|  849|
|2024-01-01|          3|654.74|    51|  566|
|2024-01-01|          4|897.24|    86|  679|
|2024-01-01|          5|758.19|    30|  585|
|2024-01-01|          6|523.91|    77|  883|
|2024-01-01|          7|465.35|    98|  527|
|2024-01-01|          8|771.47|     4|  585|
|2024-01-01|          9|973.09|    51|  255|
|2024-01-01|         10|886.07|    88|  815|
|2024-01-01|         11|489.74|    54|  624|
|2024-01-01|         12|522.04|    25|  898|
|2024-01-01|         13|254.72|    23|  895|
|2024-01-01|         14| 840.0|    89|  302|
|2024-01-01|         15|420.02|    64|  974|
|2024-01-01|         16|783.93|    38|  202|
|2024-01-01|         17| 86.72|    30|  554|
|2024-01-01|         18|480.09|    41|  484|
|2024-01-01|         19|856.34|    95|  268|
|2024-01-0

In [34]:
df_csv.printSchema()

root
 |-- date: date (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: double (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- views: integer (nullable = true)



In [21]:
pg_host = 'postgres'
pg_port = '5432'
pg_db = 'mydb'
pg_table = 'costs'
pg_user = 'admin'
pg_password = 'admin'

In [43]:
# Пишем в PostgreSQL — Spark сам создаст таблицу, если её нет
# overwrite чтобы создать заново
df_csv.write \
    .format("jdbc") \
    .option("url", f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}') \
    .option("dbtable", pg_table) \
    .option("user", pg_user) \
    .option("password", pg_password) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

                                                                                

In [22]:
#Чтение таблицы
costs = (
    spark.read
    .format('jdbc')
    .option('url', f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}')
    .option('dbtable', pg_table)
    .option('user', pg_user)
    .option('password', pg_password)
    .option('driver', 'org.postgresql.Driver')
    .load()
)

In [23]:
costs.show(5, truncate=False)  # заглянуть в pgAdmin

+----------+-----------+------+------+-----+
|date      |campaign_id|costs |clicks|views|
+----------+-----------+------+------+-----+
|2024-01-01|1          |670.52|40    |110  |
|2024-01-01|2          |602.5 |11    |849  |
|2024-01-01|3          |654.74|51    |566  |
|2024-01-01|4          |897.24|86    |679  |
|2024-01-01|5          |758.19|30    |585  |
+----------+-----------+------+------+-----+
only showing top 5 rows


In [24]:
costs.printSchema()

root
 |-- date: date (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: double (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- views: integer (nullable = true)



**ClickHouse**

In [149]:
# чтение датафрейма, который загружу в таблицу кх
df_csv_click = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f'{path}/visits_clickhouse.csv')

IndentationError: unexpected indent (1671867515.py, line 5)

In [49]:
df_csv_click.show(2)

+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
|visitid|      visitDateTime|                 URL|duration|clientID|  source|         UTMCampaign|          params|
+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
| 720128|2024-01-23 18:23:48|https://our-cool-...|      80|     289|      ad|month_openconcept...|              []|
| 695905|2024-07-24 03:32:19|https://our-cool-...|      66|     765|internal|month_contemporar...|['submit', 9469]|
+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
only showing top 2 rows


In [50]:
df_csv_click.printSchema()

root
 |-- visitid: integer (nullable = true)
 |-- visitDateTime: timestamp (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)



In [25]:
# Параметры ClickHouse
ch_host = 'clickhouse'
ch_port = '8123'
ch_db = 'default'
ch_table = 'visits'
ch_user = 'default'
ch_password = 'mypassword'
driver = "com.clickhouse.jdbc.ClickHouseDriver"
url = f"jdbc:clickhouse://{ch_host}:{ch_port}/{ch_db}"

In [70]:
df_csv_click.write \
    .format("jdbc") \
    .option("url", f'jdbc:clickhouse://{ch_host}:{ch_port}/{ch_db}') \
    .option("dbtable", ch_table) \
    .option("user", ch_user) \
    .option("password", ch_password) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("append") \
    .save()


25/11/09 14:49:04 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
25/11/09 14:49:04 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
25/11/09 14:49:04 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [fb534fb0-53d9-43ab-b53c-e788054d1497] (11 queries & 0 savepoints) is committed.
25/11/09 14:49:04 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [665572f4-4368-4301-9c48-2b80b0a49490] (0 queries & 0 savepoints) is committed.


In [26]:
visits = spark.read.format("jdbc") \
        .option('url', f'jdbc:clickhouse://{ch_host}:{ch_port}/{ch_db}') \
        .option("dbtable", ch_table) \
        .option("user", ch_user) \
        .option("password", ch_password) \
        .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
        .load()

visits.show(8)

+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
|visitid|      visitDateTime|                 URL|duration|clientID|  source|         UTMCampaign|          params|
+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
| 100059|2024-08-31 14:08:00|https://our-cool-...|      79|     522|  direct|month_openconcept...|              []|
| 100094|2024-06-09 09:30:12|https://our-cool-...|      20|     847|internal|month_contemporar...|              []|
| 100109|2024-04-03 17:07:17|https://our-cool-...|      66|     121|  direct|year_traditional_...|['submit', 4315]|
| 100150|2024-02-02 18:21:08|https://our-cool-...|      12|     790|  direct|month_contemporar...|              []|
| 100164|2024-07-13 15:56:27|https://our-cool-...|      44|     958|  direct|year_luxury_kitch...|['submit', 7580]|
| 100230|2024-11-05 15:09:03|https://our-cool-...|      69|     861|inte

In [27]:
visits.show(1, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------
 visitid       | 100059                                        
 visitDateTime | 2024-08-31 14:08:00                           
 URL           | https://our-cool-website.com/featured         
 duration      | 79                                            
 clientID      | 522                                           
 source        | direct                                        
 UTMCampaign   | month_openconcept_kitchens_promotion_20240628 
 params        | []                                            
only showing top 1 row


In [28]:
visits.printSchema()

root
 |-- visitid: integer (nullable = true)
 |-- visitDateTime: timestamp (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)



## Готовим источники

### Визиты (clickhouse)

In [88]:
visits.show(1)

+-------+-------------------+--------------------+--------+--------+------+--------------------+------+
|visitid|      visitDateTime|                 URL|duration|clientID|source|         UTMCampaign|params|
+-------+-------------------+--------------------+--------+--------+------+--------------------+------+
| 100059|2024-08-31 14:08:00|https://our-cool-...|      79|     522|direct|month_openconcept...|    []|
+-------+-------------------+--------------------+--------+--------+------+--------------------+------+
only showing top 1 row


In [89]:
# кх
filtered_step1 = (
    visits
    .withColumn('dt', F.date_format(F.col('visitDateTime'), 'yyyy-MM-dd'))
    .where(F.col('dt').between('2024-01-01', '2025-01-27'))
    .where(F.col('source').isin('ad', 'direct'))
    .where(F.col('URL').rlike('.*checkout.*|.*add.*|.*home.*|.*contact.*|.*top50.*|.*customer-service.*|.*wishlist.*|.*sale.*|.*best-sellers.*|.*view.*|.*discount.*|.*featured.*|.*new-arrivals.*|.*settings.*|.*return-policy.*|.*edit.*|.*delete.*|.*reviews.*|.*products.*|.*about.*'))
    .select(
        'dt',
        'visitid',
        'clientID',
        'URL',
        'duration',
        'source',
        'UTMCampaign',
        'params',
        F.regexp_replace(F.col('params'), r'\[|\]', '').alias('params_regex')
    )
    .withColumn('params_split', F.split('params_regex', ', '))
)

In [90]:
filtered_step1.show(1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 dt           | 2024-08-31                                    
 visitid      | 100059                                        
 clientID     | 522                                           
 URL          | https://our-cool-website.com/featured         
 duration     | 79                                            
 source       | direct                                        
 UTMCampaign  | month_openconcept_kitchens_promotion_20240628 
 params       | []                                            
 params_regex |                                               
 params_split | []                                            
-RECORD 1-----------------------------------------------------
 dt           | 2024-04-03                                    
 visitid      | 100109                                        
 clientID     | 121                                           
 URL          | https://our-cool-website.com/view      

In [105]:
filtered_step1.printSchema()  # string vs array

root
 |-- dt: string (nullable = true)
 |-- visitid: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)
 |-- params_regex: string (nullable = true)
 |-- params_split: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [80]:
#filtered_step2 = (
#    filtered_step1
#    .withColumn('event_type', F.regexp_replace(F.col('params_split')[0], "'", ''))
#    .withColumn('event_id', F.col('params_split')[1].cast('int'))
#)

In [None]:
#filtered_step2.show(1, truncate=False, vertical=True)

In [155]:
filtered_step2 = (
    filtered_step1
    .withColumn('event_type', F.when(F.size('params_split') > 0, F.regexp_replace(F.col('params_split').getItem(0), "'", '')).otherwise(None))
    .withColumn('event_id',F.when(F.size('params_split') > 1, F.col('params_split').getItem(1).cast('int')).otherwise(None))
    )

In [156]:
filtered_step2.show(2, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 dt           | 2024-08-31                                    
 visitid      | 100059                                        
 clientID     | 522                                           
 URL          | https://our-cool-website.com/featured         
 duration     | 79                                            
 source       | direct                                        
 UTMCampaign  | month_openconcept_kitchens_promotion_20240628 
 params       | []                                            
 params_regex |                                               
 params_split | []                                            
 event_type   |                                               
 event_id     | NULL                                          
-RECORD 1-----------------------------------------------------
 dt           | 2024-04-03                                    
 visitid      | 100109                                 

In [157]:
filtered_step2.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visitid: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)
 |-- params_regex: string (nullable = true)
 |-- params_split: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)



In [91]:
visits_df = (
    filtered_step2
    .where(F.col('event_type') == 'submit')
    .select(
        'dt',
        F.col('visitid').cast('string').alias('visitid'),
        F.col('clientID').cast('string').alias('clientid'),
        'URL',
        'duration',
        'source',
        'UTMCampaign',
        'event_type',
        'event_id'
    )
    .distinct()
)

In [159]:
visits_df.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dt          | 2024-07-16                                    
 visitid     | 294982                                        
 clientid    | 773                                           
 URL         | https://our-cool-website.com/customer-service 
 duration    | 57                                            
 source      | direct                                        
 UTMCampaign | year_traditional_kitchens_launch_20240707     
 event_type  | submit                                        
 event_id    | 9620                                          
only showing top 1 row


In [93]:
visits_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visitid: string (nullable = true)
 |-- clientid: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)



In [94]:
#Количество строк после преобразования visits
visits_df.count()

2433

In [95]:
#Изначальное количество строк
visits.count()

10000

### Расходы (postgres)

In [107]:
costs_df = (
    costs
    .groupBy(
        F.col('date').cast('string').alias('date'),
        'campaign_id'
    )
    .agg(
        F.sum(F.col('costs')).cast('decimal(19,2)').alias('costs'),
        F.sum(F.col('clicks')).alias('clicks'),
        F.sum(F.col('views')).alias('views')
    )
)

In [108]:
costs_df.show(5)

[Stage 57:>                                                         (0 + 1) / 1]

+----------+-----------+------+------+-----+
|      date|campaign_id| costs|clicks|views|
+----------+-----------+------+------+-----+
|2024-01-05|         75|107.03|    26|  958|
|2024-01-05|         97|408.51|    59|  292|
|2024-01-06|         44|748.68|    33|  380|
|2024-01-06|         58|422.91|    30|  575|
|2024-01-11|         91|496.45|     5|  435|
+----------+-----------+------+------+-----+
only showing top 5 rows


                                                                                

In [109]:
costs_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: decimal(19,2) (nullable = true)
 |-- clicks: long (nullable = true)
 |-- views: long (nullable = true)



### Кампании (csv)

In [110]:
campaigns_dict.show(1, truncate=False)

+-----------+-----------------------------------+
|campaign_id|campaign_name                      |
+-----------+-----------------------------------+
|1          |year_modern_kitchen_launch_20250115|
+-----------+-----------------------------------+
only showing top 1 row


In [111]:
campaigns_dict.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)



In [112]:
campaigns_df = (
    campaigns_dict
    .withColumn('campaign_id', F.col('campaign_id').cast('integer'))
    .withColumn(
        'campaign_duration',
        F.when(F.col('campaign_name').like('year%'), 'Год')
        .when(F.col('campaign_name').like('quarter%'), 'Квартал')
        .when(F.col('campaign_name').like('month%'), 'Месяц')
        .otherwise(None)
    )
)

In [113]:
campaigns_df.show(5, truncate=False)

+-----------+------------------------------------------+-----------------+
|campaign_id|campaign_name                             |campaign_duration|
+-----------+------------------------------------------+-----------------+
|1          |year_modern_kitchen_launch_20250115       |Год              |
|2          |quarter_custom_kitchens_showcase_20240210 |Квартал          |
|3          |month_smart_kitchen_promotion_20240305    |Месяц            |
|4          |year_luxury_kitchens_exhibit_20240420     |Год              |
|5          |quarter_ecofriendly_kitchen_offer_20240512|Квартал          |
+-----------+------------------------------------------+-----------------+
only showing top 5 rows


In [114]:
campaigns_df.printSchema()

root
 |-- campaign_id: integer (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_duration: string (nullable = true)



### Заявки (parquet)

In [115]:
submits.printSchema()

root
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)



In [116]:
submits.show(1)

+---------+--------+-----------+
|submit_id|    name|      phone|
+---------+--------+-----------+
|     2282|Jennifer|79511904041|
+---------+--------+-----------+
only showing top 1 row


In [117]:
submits_df = (
    submits
    .withColumn('phone', F.col('phone').cast('string'))
    .withColumn('phone_plus', F.concat(F.lit('+'), F.col('phone')))
    .withColumn('phone_md5', F.md5('phone'))
    .withColumn('phone_plus_md5', F.md5('phone_plus'))
)

In [118]:
submits_df.show(2, truncate=False)

+---------+--------+-----------+------------+--------------------------------+--------------------------------+
|submit_id|name    |phone      |phone_plus  |phone_md5                       |phone_plus_md5                  |
+---------+--------+-----------+------------+--------------------------------+--------------------------------+
|2282     |Jennifer|79511904041|+79511904041|4c7720fdf6f9eec623dc0f961f31f488|6ce81d5347bcd3eadb2921b7c4828e3b|
|9898     |Jeffrey |79824419733|+79824419733|9d106e45036bd4176774eb94adc9aacc|43630233dcc965f9827e394038b0321a|
+---------+--------+-----------+------------+--------------------------------+--------------------------------+
only showing top 2 rows


### Сделки (parquet)

In [119]:
deals.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



In [120]:
deals_df = (
    deals
    .withColumn('username', F.split(F.col('email'), '@').getItem(0))
    .withColumn('domain', F.split(F.col('email'), '@').getItem(1))
    .where(F.col('domain').isin('example.com', 'example.org', 'example.net'))
    .withColumn('phone', F.col('phone').cast('string'))
)

In [121]:
deals_df.show(1)

+-------+----------+----------+-----------+------------------+--------------------+--------+-----------+
|deal_id| deal_date|       fio|      phone|             email|             address|username|     domain|
+-------+----------+----------+-----------+------------------+--------------------+--------+-----------+
|      1|2024-03-04|Gregory Wu|79746561889|paul80@example.net|098 Yates Cliff A...|  paul80|example.net|
+-------+----------+----------+-----------+------------------+--------------------+--------+-----------+
only showing top 1 row


In [122]:
deals_df.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- username: string (nullable = true)
 |-- domain: string (nullable = true)



## Собираем витрину

In [123]:
# концепция "One Big Table"
customer_detailed = (
    visits_df.alias('v')
    .join(
        submits_df.alias('s'),
        F.col('v.event_id') == F.col('s.submit_id'),
        'left'
    )
    .join(
        deals_df.alias('d'),
        (F.col('s.phone') == F.col('d.phone')) &
        (F.col('v.dt') <= F.col('d.deal_date')),
        'left'
    )
    .join(
        campaigns_df.alias('camp'),
        F.col('v.utmcampaign') == F.col('camp.campaign_name'),
        'left'
    )
    .join(
        costs_df.alias('c'),
        (F.col('camp.campaign_id') == F.col('c.campaign_id')) &
        (F.col('v.dt') == F.col('c.date')),
        'left'
    )
    .select(
        'v.dt',
        F.col('v.visitid').alias('visit_id'),
        F.col('v.clientid').alias('client_id'),
        'v.url',
        'v.duration',
        'v.source',
        'v.utmcampaign',
        'v.event_type',
        'v.event_id',
        's.submit_id',
        's.name',
        's.phone',
        's.phone_plus',
        's.phone_md5',
        's.phone_plus_md5',
        'd.deal_id',
        'd.deal_date',
        'd.fio',
        F.col('d.phone').alias('phone_deal'),
        'd.email',
        'd.address',
        'd.username',
        'd.domain',
        'camp.campaign_name',
        'camp.campaign_duration',
        'c.costs',
        'c.clicks',
        'c.views'
    )
)

In [124]:
customer_detailed.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visit_id: string (nullable = true)
 |-- client_id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- utmcampaign: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- phone_plus: string (nullable = true)
 |-- phone_md5: string (nullable = true)
 |-- phone_plus_md5: string (nullable = true)
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone_deal: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- username: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_duration: string (nullable = tr

In [125]:
len(customer_detailed.columns)

28

In [126]:
customer_detailed.cache()

25/11/15 18:11:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[dt: string, visit_id: string, client_id: string, url: string, duration: int, source: string, utmcampaign: string, event_type: string, event_id: int, submit_id: bigint, name: string, phone: string, phone_plus: string, phone_md5: string, phone_plus_md5: string, deal_id: bigint, deal_date: string, fio: string, phone_deal: string, email: string, address: string, username: string, domain: string, campaign_name: string, campaign_duration: string, costs: decimal(19,2), clicks: bigint, views: bigint]

In [127]:
customer_detailed.count()  # Spark UI

                                                                                

2639

In [128]:
#campaigns_agg = (
#    customer_detailed
#    .groupBy('campaign_name')
#    .agg(
#        F.countDistinct('visit_id').alias('unique_visits'),
#        F.countDistinct('client_id').alias('unique_clients'),
#        F.countDistinct('submit_id').alias('unique_submits'),
#        F.countDistinct('deal_id').alias('unique_deals'),
#        F.sum('costs').alias('total_costs'),
#        F.sum('clicks').alias('total_clicks'),
#        F.sum('views').alias('total_views'),
#        F.sum('duration').alias('total_duration')
#    )
#    .withColumn('avg_deal_cost', (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)'))
#)

In [130]:
campaigns_agg = (
    customer_detailed
    .groupBy('campaign_name')
    .agg(
        F.countDistinct('visit_id').alias('unique_visits'),
        F.countDistinct('client_id').alias('unique_clients'),
        F.countDistinct('submit_id').alias('unique_submits'),
        F.countDistinct('deal_id').alias('unique_deals'),
        F.sum('costs').alias('total_costs'),
        F.sum('clicks').alias('total_clicks'),
        F.sum('views').alias('total_views'),
        F.sum('duration').alias('total_duration')
    )
    .withColumn(
        'avg_deal_cost',
        F.when(F.col('unique_deals') != 0, (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)'))
         .otherwise(None)
    )
)


In [131]:
campaigns_agg.cache().count()

                                                                                

99

In [132]:
campaigns_agg.show(1, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------
 campaign_name  | year_traditional_kitchens_demo_20240725 
 unique_visits  | 22                                      
 unique_clients | 21                                      
 unique_submits | 5                                       
 unique_deals   | 1                                       
 total_costs    | 11446.75                                
 total_clicks   | 1441                                    
 total_views    | 14004                                   
 total_duration | 1100                                    
 avg_deal_cost  | 11446.75                                
only showing top 1 row


In [133]:
dates_agg = (
    customer_detailed
    .groupBy(F.substring('dt', 1, 7).alias('month'))  # 2025-01-01
    .agg(
        F.countDistinct('visit_id').alias('unique_visits'),
        F.countDistinct('client_id').alias('unique_clients'),
        F.countDistinct('submit_id').alias('unique_submits'),
        F.countDistinct('deal_id').alias('unique_deals'),
        F.sum('costs').alias('total_costs'),
        F.sum('clicks').alias('total_clicks'),
        F.sum('views').alias('total_views'),
        F.sum('duration').alias('total_duration')
    )
    .withColumn('avg_deal_cost', (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)'))
)

In [134]:
dates_agg.cache().count()

                                                                                

13

In [135]:
dates_agg.show(1, truncate=False, vertical=True)

-RECORD 0-------------------
 month          | 2024-09   
 unique_visits  | 191       
 unique_clients | 168       
 unique_submits | 67        
 unique_deals   | 15        
 total_costs    | 100807.61 
 total_clicks   | 10091     
 total_views    | 102230    
 total_duration | 10225     
 avg_deal_cost  | 6720.51   
only showing top 1 row


In [136]:
def save_to_postgres(df, table_name):
    (
        df.write
        .format('jdbc')
        .option('url', f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}')
        .option('dbtable', table_name)
        .option('user', pg_user)
        .option('password', pg_password)
        .option('driver', 'org.postgresql.Driver')
        .mode("overwrite")
        .save()
    )

In [137]:
save_to_postgres(customer_detailed, 'customer_detailed')

                                                                                

In [138]:
save_to_postgres(campaigns_agg, 'campaigns_agg')

                                                                                

In [139]:
save_to_postgres(dates_agg, 'dates_agg')

## Считаем метрики и анализируем результаты

In [140]:
# 1. Кампании без выручки
(
    campaigns_agg
    .where('unique_deals = 0')
    .select('campaign_name', 'total_costs', 'unique_deals')
    .sort('total_costs')
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+
|campaign_name                                  |total_costs|unique_deals|
+-----------------------------------------------+-----------+------------+
|month_smart_kitchens_launch_20240330           |7417.29    |0           |
|quarter_custom_kitchens_showcase_20240210      |8397.63    |0           |
|month_contemporary_kitchens_event_20241208     |9077.02    |0           |
|month_contemporary_kitchens_event_20240920     |10256.14   |0           |
|month_ecofriendly_kitchens_experience_20241219 |10682.85   |0           |
|quarter_ecofriendly_kitchen_experience_20241119|10999.04   |0           |
|month_smart_kitchen_promotion_20240305         |11430.88   |0           |
|year_traditional_kitchens_showcase_20241018    |12105.87   |0           |
|year_modern_kitchens_demo_20240419             |12427.38   |0           |
|month_openconcept_kitchens_initiative_20240328 |13187.82   |0           |
|quarter_luxury_kitchens_

In [141]:
(
    campaigns_agg
    .where('unique_deals = 0')
    .select(F.count('campaign_name'), F.sum('total_costs'))
    .show()
)

+--------------------+----------------+
|count(campaign_name)|sum(total_costs)|
+--------------------+----------------+
|                  13|       149956.85|
+--------------------+----------------+



In [142]:
# 2. Средняя цена сделки
(
    campaigns_agg
    .where('unique_deals > 0')
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .sort('avg_deal_cost')
    .show(truncate=False)
)

+------------------------------------------------+-----------+------------+-------------+
|campaign_name                                   |total_costs|unique_deals|avg_deal_cost|
+------------------------------------------------+-----------+------------+-------------+
|year_modern_kitchens_showcase_20241014          |11705.53   |8           |1463.19      |
|quarter_custom_kitchens_innovation_20240817     |15356.76   |10          |1535.68      |
|year_modern_kitchen_experience_20240702         |8528.47    |5           |1705.69      |
|quarter_custom_kitchens_experience_20240222     |16131.74   |9           |1792.42      |
|month_contemporary_kitchens_promotion_20241215  |14908.49   |8           |1863.56      |
|quarter_custom_kitchen_initiative_20241102      |16944.70   |8           |2118.09      |
|month_openconcept_kitchens_promotion_20240628   |13101.37   |6           |2183.56      |
|year_smart_kitchens_event_20250102              |11699.64   |5           |2339.93      |
|year_trad

In [143]:
(
    campaigns_agg
    .where('unique_deals > 0')
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .sort('avg_deal_cost', ascending=False)
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+-------------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|
+-----------------------------------------------+-----------+------------+-------------+
|quarter_spacesaving_kitchen_innovation_20240825|16861.94   |1           |16861.94     |
|month_contemporary_kitchen_showcase_20240630   |15880.89   |1           |15880.89     |
|quarter_custom_kitchen_show_20240213           |15874.75   |1           |15874.75     |
|year_luxury_kitchens_show_20241001             |14752.07   |1           |14752.07     |
|quarter_spacesaving_kitchen_showcase_20240228  |14382.19   |1           |14382.19     |
|month_custom_kitchens_show_20241205            |13947.35   |1           |13947.35     |
|year_traditional_kitchens_launch_20240707      |13857.96   |1           |13857.96     |
|year_traditional_kitchens_experience_20240715  |12925.29   |1           |12925.29     |
|quarter_spacesaving_

In [144]:
# 3. Убыточные кампании. Пусть каждая сделка стоит 5к
(
    campaigns_agg
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .withColumn('revenue', F.col('unique_deals') * F.lit(5000))
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
    .sort('profit')
    .show(truncate=False)
)

+-----------------------------------------------+-----------+------------+-------------+-------+---------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|revenue|profit   |
+-----------------------------------------------+-----------+------------+-------------+-------+---------+
|quarter_custom_kitchens_experience_20240527    |15732.40   |0           |NULL         |0      |-15732.40|
|month_openconcept_kitchens_experience_20240910 |14837.06   |0           |NULL         |0      |-14837.06|
|quarter_luxury_kitchens_innovation_20241116    |13405.47   |0           |NULL         |0      |-13405.47|
|month_openconcept_kitchens_initiative_20240328 |13187.82   |0           |NULL         |0      |-13187.82|
|year_modern_kitchens_demo_20240419             |12427.38   |0           |NULL         |0      |-12427.38|
|year_traditional_kitchens_showcase_20241018    |12105.87   |0           |NULL         |0      |-12105.87|
|quarter_spacesaving_kitchen_innovati

In [145]:
# 4. Самые прибыльные кампании
(
    campaigns_agg
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .withColumn('revenue', F.col('unique_deals') * F.lit(5000))
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
    .sort(F.desc('profit'))
    .show(truncate=False)
)

+------------------------------------------------+-----------+------------+-------------+-------+--------+
|campaign_name                                   |total_costs|unique_deals|avg_deal_cost|revenue|profit  |
+------------------------------------------------+-----------+------------+-------------+-------+--------+
|quarter_custom_kitchens_innovation_20240817     |15356.76   |10          |1535.68      |50000  |34643.24|
|quarter_custom_kitchens_experience_20240222     |16131.74   |9           |1792.42      |45000  |28868.26|
|year_modern_kitchens_showcase_20241014          |11705.53   |8           |1463.19      |40000  |28294.47|
|month_contemporary_kitchens_promotion_20241215  |14908.49   |8           |1863.56      |40000  |25091.51|
|quarter_custom_kitchen_initiative_20241102      |16944.70   |8           |2118.09      |40000  |23055.30|
|year_traditional_kitchens_fair_20240415         |19396.53   |8           |2424.57      |40000  |20603.47|
|month_openconcept_kitchens_promotion

In [146]:
# 5. Метрики в разбивке по месяцам
(
    dates_agg
    .withColumn('revenue', F.col('unique_deals') * F.lit(5000))
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
    .drop('total_duration', 'total_views')
    .sort('month')
    .show()
)

+-------+-------------+--------------+--------------+------------+-----------+------------+-------------+-------+---------+
|  month|unique_visits|unique_clients|unique_submits|unique_deals|total_costs|total_clicks|avg_deal_cost|revenue|   profit|
+-------+-------------+--------------+--------------+------------+-----------+------------+-------------+-------+---------+
|2024-01|          204|           185|            68|          36|  114500.26|       11503|      3180.56| 180000| 65499.74|
|2024-02|          184|           164|            55|          28|   98550.71|        8960|      3519.67| 140000| 41449.29|
|2024-03|          182|           164|            57|          34|   99016.38|       10123|      2912.25| 170000| 70983.62|
|2024-04|          187|           166|            53|          21|   99450.75|        9704|      4735.75| 105000|  5549.25|
|2024-05|          188|           174|            68|          30|   98370.96|       10277|      3279.03| 150000| 51629.04|
|2024-06

In [147]:
# 6. Сколько всего потратили денег на рекламу за год
dates_agg.select(F.sum('total_costs')).show()

+----------------+
|sum(total_costs)|
+----------------+
|      1319918.78|
+----------------+



## Освобождаем ресурсы, останавливаем Spark-сессию

In [None]:
# Spark UI
customer_detailed.unpersist()
campaigns_agg.unpersist()
dates_agg.unpersist()

In [None]:
spark.stop()