## Создаем Spark-сессию

In [1]:
import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

In [2]:
# Local JDBC driver jars (PostgreSQL and ClickHouse) that are handed to Spark
# through the `spark.jars` setting when the session is built.
postgres_jdbc_jar = "E:\\EDesktop\\Webinar\\postgresql-42.7.5.jar"
clickhouse_jdbc_jar = "E:\\EDesktop\\Webinar\\clickhouse-jdbc-0.4.6.jar"
jar_files = [postgres_jdbc_jar, clickhouse_jdbc_jar]

In [3]:
# Build (or reuse) the Spark session. The JDBC driver jars are passed as a
# single comma-separated `spark.jars` value.
session_builder = SparkSession.builder.appName("SparkWebinar")
session_builder = session_builder.config("spark.jars", ",".join(jar_files))
spark = session_builder.getOrCreate()

In [4]:
spark

## Подключаемся к источникам

### csv

In [6]:
# Base directory of the CSV and Parquet input files read in the cells below.
path = 'E:\\EDesktop\\Webinar\\data'

In [None]:
# Campaign dictionary from CSV. The first line of the file supplies the column
# names; no schema is given or inferred, so every column is loaded as a string.
campaigns_csv_reader = spark.read.option('header', True)
campaigns_dict = campaigns_csv_reader.csv(f'{path}\\campaigns_dict.csv')

In [9]:
# lazy evaluation: `show` is an action, so this is where the read actually executes
campaigns_dict.show(5, truncate=False)

+-----------+------------------------------------------+
|campaign_id|campaign_name                             |
+-----------+------------------------------------------+
|1          |year_modern_kitchen_launch_20250115       |
|2          |quarter_custom_kitchens_showcase_20240210 |
|3          |month_smart_kitchen_promotion_20240305    |
|4          |year_luxury_kitchens_exhibit_20240420     |
|5          |quarter_ecofriendly_kitchen_offer_20240512|
+-----------+------------------------------------------+
only showing top 5 rows



In [10]:
campaigns_dict.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)



### parquet

In [11]:
# Submitted contact forms, stored as a Parquet file in the data directory.
submits_parquet_path = f'{path}\\submits.parquet'
submits = spark.read.parquet(submits_parquet_path)

In [12]:
submits.show(5, truncate=False)

+---------+--------+-----------+
|submit_id|name    |phone      |
+---------+--------+-----------+
|2282     |Jennifer|79511904041|
|9898     |Jeffrey |79824419733|
|9005     |Linda   |79074725672|
|1507     |Teresa  |79864203598|
|3803     |Tanya   |79779567654|
+---------+--------+-----------+
only showing top 5 rows



In [13]:
submits.printSchema()

root
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)



In [14]:
# Closed deals, stored as a Parquet file in the data directory.
deals_parquet_path = f'{path}\\deals.parquet'
deals = spark.read.parquet(deals_parquet_path)

In [15]:
deals.show(5, truncate=False)

+-------+----------+---------------------+-----------+------------------------+-----------------------------------------------------+
|deal_id|deal_date |fio                  |phone      |email                   |address                                              |
+-------+----------+---------------------+-----------+------------------------+-----------------------------------------------------+
|1      |2024-03-04|Gregory Wu           |79746561889|paul80@example.net      |098 Yates Cliff Apt. 241, East Monica, DE 88076      |
|2      |2024-08-20|William Ross Jr.     |79074725672|xyoung@example.org      |197 Willie Groves Apt. 655, Port Angelaberg, LA 39384|
|3      |2024-10-15|Sonya Kerr           |79201244835|elewis@example.com      |144 Andrew Cape, Lake Nicholas, SC 58918             |
|4      |2024-12-31|Mrs. Angela Tucker MD|79771829751|robertparker@example.net|6056 Collins View, South Harold, OR 15650            |
|5      |2024-03-23|Eric Flores          |79729054809|barbara7

In [16]:
deals.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



In [17]:
import pyarrow as pa
import pyarrow.parquet as pq

In [18]:
# Inspect the Parquet file metadata (column, row and row-group counts, format
# version) with pyarrow, without going through Spark.
pq.read_metadata(f'{path}\\submits.parquet')

<pyarrow._parquet.FileMetaData object at 0x00000210EEF953F0>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 3
  num_rows: 4000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2371

In [19]:
# Same metadata inspection for the deals file.
pq.read_metadata(f'{path}\\deals.parquet')

<pyarrow._parquet.FileMetaData object at 0x00000210EEF889A0>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 6
  num_rows: 2000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 3733

### postgres

In [20]:
# PostgreSQL connection settings.
# Each value can be overridden through a WEBINAR_PG_* environment variable so
# that real credentials never have to be written into the notebook. The
# fallbacks are the local webinar defaults, so the notebook still runs
# unchanged when no variable is set. All values are strings (the port too),
# because they are only interpolated into the JDBC URL and options.
pg_host = os.environ.get('WEBINAR_PG_HOST', 'localhost')
pg_port = os.environ.get('WEBINAR_PG_PORT', '5434')
pg_db = os.environ.get('WEBINAR_PG_DB', 'webinar')
pg_table = os.environ.get('WEBINAR_PG_TABLE', 'costs')
pg_user = os.environ.get('WEBINAR_PG_USER', 'postgres')
pg_password = os.environ.get('WEBINAR_PG_PASSWORD', 'postgres')

In [116]:
# Read the PostgreSQL table over JDBC using the connection settings defined above.
postgres_jdbc_options = {
    'url': f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}',
    'dbtable': pg_table,
    'user': pg_user,
    'password': pg_password,
    'driver': 'org.postgresql.Driver',
}
costs = spark.read.format('jdbc').options(**postgres_jdbc_options).load()

In [117]:
costs.show(5, truncate=False)

+----------+-----------+------+------+-----+
|date      |campaign_id|costs |clicks|views|
+----------+-----------+------+------+-----+
|2024-01-01|1          |670.52|40    |110  |
|2024-01-01|2          |602.5 |11    |849  |
|2024-01-01|3          |654.74|51    |566  |
|2024-01-01|4          |897.24|86    |679  |
|2024-01-01|5          |758.19|30    |585  |
+----------+-----------+------+------+-----+
only showing top 5 rows



In [118]:
costs.printSchema()

root
 |-- date: date (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: double (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- views: integer (nullable = true)



In [119]:
costs.count()

39300

### clickhouse

In [34]:
from clickhouse_creds import ch_list

In [35]:
# Unpack the ClickHouse connection settings; the unpacking requires ch_list to
# hold exactly six items in this order.
ch_host, ch_port, ch_db, ch_table, ch_user, ch_password = ch_list

In [38]:
# Read the ClickHouse table over JDBC; the URL asks for an SSL connection.
clickhouse_jdbc_options = {
    'url': f'jdbc:clickhouse://{ch_host}:{ch_port}/{ch_db}?ssl=true&sslmode=STRICT',
    'dbtable': ch_table,
    'user': ch_user,
    'password': ch_password,
    'driver': 'com.clickhouse.jdbc.ClickHouseDriver',
}
visits = spark.read.format('jdbc').options(**clickhouse_jdbc_options).load()

In [33]:
visits.show(5)

+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
|visitid|      visitDateTime|                 URL|duration|clientID|  source|         UTMCampaign|          params|
+-------+-------------------+--------------------+--------+--------+--------+--------------------+----------------+
| 189665|2024-01-01 03:19:38|https://our-cool-...|      78|     848|  direct|quarter_ecofriend...|['submit', 2136]|
| 504698|2024-01-01 03:59:20|https://our-cool-...|       8|     527|  direct|year_modern_kitch...|['submit', 4630]|
| 632370|2024-01-01 04:49:09|https://our-cool-...|      72|     520| organic|quarter_ecofriend...|['submit', 2734]|
| 943112|2024-01-01 05:20:28|https://our-cool-...|      14|     117| organic|month_openconcept...|['submit', 5299]|
| 139778|2024-01-01 10:31:30|https://our-cool-...|      73|     655|internal|month_contemporar...| ['submit', 419]|
+-------+-------------------+--------------------+--------+--------+----

In [39]:
# vertical=True prints one field per line and truncate=False keeps long values
# (URL, campaign name) intact.
visits.show(1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 visitid       | 189665                                       
 visitDateTime | 2024-01-01 03:19:38                          
 URL           | https://our-cool-website.com/checkout        
 duration      | 78                                           
 clientID      | 848                                          
 source        | direct                                       
 UTMCampaign   | quarter_ecofriendly_kitchens_launch_20240205 
 params        | ['submit', 2136]                             
only showing top 1 row



## Готовим источники

### Визиты (clickhouse)

In [40]:
visits.printSchema()

root
 |-- visitid: integer (nullable = true)
 |-- visitDateTime: timestamp (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)



In [41]:
visits.show(1)

+-------+-------------------+--------------------+--------+--------+------+--------------------+----------------+
|visitid|      visitDateTime|                 URL|duration|clientID|source|         UTMCampaign|          params|
+-------+-------------------+--------------------+--------+--------+------+--------------------+----------------+
| 189665|2024-01-01 03:19:38|https://our-cool-...|      78|     848|direct|quarter_ecofriend...|['submit', 2136]|
+-------+-------------------+--------------------+--------+--------+------+--------------------+----------------+
only showing top 1 row



In [40]:
# ClickHouse visits
# Keep visits whose day is from 2024-01-01 up to (not including) 2025-01-28,
# whose source is 'ad' or 'direct' and whose URL matches one of the listed page
# keywords. The raw `params` string is exposed both with the square brackets
# stripped (`params_regex`) and split on ', ' into an array (`params_split`).
visit_day = F.date_format(F.col('visitDateTime'), 'yyyy-MM-dd')
params_without_brackets = F.regexp_replace(F.col('params'), r'\[|\]', '')
tracked_url_pattern = '.*checkout.*|.*add.*|.*home.*|.*contact.*|.*top50.*|.*customer-service.*|.*wishlist.*|.*sale.*|.*best-sellers.*|.*view.*|.*discount.*|.*featured.*|.*new-arrivals.*|.*settings.*|.*return-policy.*|.*edit.*|.*delete.*|.*reviews.*|.*products.*|.*about.*'

filtered_step1 = (
    visits
    .withColumn('dt', visit_day)
    .where(
        (F.col('dt') >= '2024-01-01')
        & (F.col('dt') < '2025-01-28')
        & F.col('source').isin('ad', 'direct')
        & F.col('URL').rlike(tracked_url_pattern)
    )
    .select(
        'dt',
        'visitid',
        'clientID',
        'URL',
        'duration',
        'source',
        'UTMCampaign',
        'params',
        params_without_brackets.alias('params_regex'),
        F.split(params_without_brackets, ', ').alias('params_split'),
    )
)

In [41]:
filtered_step1.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dt           | 2024-01-01                                   
 visitid      | 189665                                       
 clientID     | 848                                          
 URL          | https://our-cool-website.com/checkout        
 duration     | 78                                           
 source       | direct                                       
 UTMCampaign  | quarter_ecofriendly_kitchens_launch_20240205 
 params       | ['submit', 2136]                             
 params_regex | 'submit', 2136                               
 params_split | ['submit', 2136]                             
only showing top 1 row



In [53]:
filtered_step1.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visitid: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- params: string (nullable = true)
 |-- params_regex: string (nullable = true)
 |-- params_split: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [42]:
# Turn the split params into two typed columns: the event type (first element,
# single quotes removed) and the event id (second element, cast to int).
params_items = F.col('params_split')
event_type_expr = F.regexp_replace(params_items.getItem(0), "'", '')
event_id_expr = params_items.getItem(1).cast('int')

filtered_step2 = (
    filtered_step1
    .withColumn('event_type', event_type_expr)
    .withColumn('event_id', event_id_expr)
)

In [57]:
filtered_step2.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dt           | 2024-01-01                                   
 visitid      | 189665                                       
 clientID     | 848                                          
 URL          | https://our-cool-website.com/checkout        
 duration     | 78                                           
 source       | direct                                       
 UTMCampaign  | quarter_ecofriendly_kitchens_launch_20240205 
 params       | ['submit', 2136]                             
 params_regex | 'submit', 2136                               
 params_split | ['submit', 2136]                             
 event_type   | submit                                       
 event_id     | 2136                                         
only showing top 1 row



In [43]:
# Final visits frame: only 'submit' events, without the intermediate params columns.
visits_output_columns = [
    'dt',
    'visitid',
    'clientID',
    'URL',
    'duration',
    'source',
    'UTMCampaign',
    'event_type',
    'event_id',
]
is_submit_event = F.col('event_type') == 'submit'
visits_df = filtered_step2.filter(is_submit_event).select(*visits_output_columns)

In [59]:
visits_df.show(1, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------
 dt          | 2024-01-01                                   
 visitid     | 189665                                       
 clientID    | 848                                          
 URL         | https://our-cool-website.com/checkout        
 duration    | 78                                           
 source      | direct                                       
 UTMCampaign | quarter_ecofriendly_kitchens_launch_20240205 
 event_type  | submit                                       
 event_id    | 2136                                         
only showing top 1 row



In [48]:
visits_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- visitid: integer (nullable = true)
 |-- clientID: integer (nullable = true)
 |-- URL: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- UTMCampaign: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)



In [44]:
visits_df.count()

2433

### Расходы (postgres)

In [120]:
# Costs per day and campaign. The date is cast to a string so it can later be
# compared with the string `dt` column of the visits.
cost_day = F.col('date').cast('string').alias('date')
daily_cost_totals = [
    F.sum('costs').cast('decimal(19,2)').alias('costs'),
    F.sum('clicks').alias('clicks'),
    F.sum('views').alias('views'),
]
costs_df = costs.groupBy(cost_day, 'campaign_id').agg(*daily_cost_totals)

In [121]:
costs_df.show(5)

+----------+-----------+------+------+-----+
|      date|campaign_id| costs|clicks|views|
+----------+-----------+------+------+-----+
|2024-01-05|         75|107.03|    26|  958|
|2024-01-05|         97|408.51|    59|  292|
|2024-01-06|         44|748.68|    33|  380|
|2024-01-06|         58|422.91|    30|  575|
|2024-01-11|         91|496.45|     5|  435|
+----------+-----------+------+------+-----+
only showing top 5 rows



In [122]:
# Sanity check: the recorded result equals the recorded row count of the raw
# `costs` table, i.e. the grouping by (date, campaign_id) did not collapse rows.
costs_df.count()

39300

In [47]:
costs_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- costs: decimal(19,2) (nullable = true)
 |-- clicks: long (nullable = true)
 |-- views: long (nullable = true)



### Кампании (csv)

In [51]:
campaigns_dict.show(1, truncate=False)

+-----------+-----------------------------------+
|campaign_id|campaign_name                      |
+-----------+-----------------------------------+
|1          |year_modern_kitchen_launch_20250115|
+-----------+-----------------------------------+
only showing top 1 row



In [52]:
campaigns_dict.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)



In [53]:
# The CSV reader delivered campaign_id as a string; cast it to integer so it
# matches the integer campaign_id of the costs table.
campaign_id_as_integer = F.col('campaign_id').cast('integer')
campaigns_df = campaigns_dict.withColumn('campaign_id', campaign_id_as_integer)

In [54]:
campaigns_df.printSchema()

root
 |-- campaign_id: integer (nullable = true)
 |-- campaign_name: string (nullable = true)



### Заявки (parquet)

In [56]:
submits.printSchema()

root
 |-- submit_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)



In [57]:
submits.show(1)

+---------+--------+-----------+
|submit_id|    name|      phone|
+---------+--------+-----------+
|     2282|Jennifer|79511904041|
+---------+--------+-----------+
only showing top 1 row



In [65]:
# Phone as text, plus three derived variants: with a leading '+', and the MD5
# hash of each form.
phone_as_text = F.col('phone').cast('string')
phone_with_plus = F.concat(F.lit('+'), F.col('phone'))

submits_df = (
    submits
    # the cast must come first so the columns below are built from the string phone
    .withColumn('phone', phone_as_text)
    .withColumn('phone_plus', phone_with_plus)
    .withColumn('phone_md5', F.md5(F.col('phone')))
    .withColumn('phone_plus_md5', F.md5(F.col('phone_plus')))
)

In [67]:
submits_df.show(2, truncate=False)

+---------+--------+-----------+------------+--------------------------------+--------------------------------+
|submit_id|name    |phone      |phone_plus  |phone_md5                       |phone_plus_md5                  |
+---------+--------+-----------+------------+--------------------------------+--------------------------------+
|2282     |Jennifer|79511904041|+79511904041|4c7720fdf6f9eec623dc0f961f31f488|6ce81d5347bcd3eadb2921b7c4828e3b|
|9898     |Jeffrey |79824419733|+79824419733|9d106e45036bd4176774eb94adc9aacc|43630233dcc965f9827e394038b0321a|
+---------+--------+-----------+------------+--------------------------------+--------------------------------+
only showing top 2 rows



### Сделки (parquet)

In [68]:
deals.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



In [69]:
# Phone is stored as a long in the Parquet file; cast it to string so it can be
# joined against the string phone of the submits.
deal_phone_as_text = F.col('phone').cast('string')
deals_df = deals.withColumn('phone', deal_phone_as_text)

In [70]:
deals_df.show(1)

+-------+----------+----------+-----------+------------------+--------------------+
|deal_id| deal_date|       fio|      phone|             email|             address|
+-------+----------+----------+-----------+------------------+--------------------+
|      1|2024-03-04|Gregory Wu|79746561889|paul80@example.net|098 Yates Cliff A...|
+-------+----------+----------+-----------+------------------+--------------------+
only showing top 1 row



In [72]:
deals_df.printSchema()

root
 |-- deal_id: long (nullable = true)
 |-- deal_date: string (nullable = true)
 |-- fio: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)



## Собираем витрину

In [131]:
# Assemble the data mart. Starting from the submit visits, left joins attach
# the submitted form, the matching deals, the campaign dictionary entry and the
# campaign's costs for the visit day. Left joins keep every visit row even when
# nothing matches on the right side.
#
# NOTE(review): the deals join is not one-to-one. Every deal with the same
# phone and a deal_date on or after the visit day matches, so one visit can
# appear in several rows (the recorded counts are 2433 visits vs 2639 joined
# rows). The cost columns are repeated on each of those rows, so a plain
# sum of `costs` downstream counts them once per joined row.
#
# NOTE(review): the result contains two `phone` columns (s, d) and two
# `campaign_id` columns (camp, c); refer to them through their alias.
final_df = (
    visits_df.alias('v')
    # the visit's event id is the id of the submitted form
    .join(
        submits_df.alias('s'),
        F.col('v.event_id') == F.col('s.submit_id'),
        'left'
    )
    # deals of the same phone made on or after the visit day; both sides are
    # strings, so `<=` is a lexicographic comparison
    # (assumes deal_date is always formatted yyyy-MM-dd like dt — TODO confirm)
    .join(
        deals_df.alias('d'),
        (F.col('s.phone') == F.col('d.phone')) &
        (F.col('v.dt') <= F.col('d.deal_date')),
        'left'
    )
    # campaign looked up by name; `utmcampaign` is written in lower case here
    # while the column is `UTMCampaign` (presumably resolved case-insensitively
    # by Spark — confirm spark.sql.caseSensitive is off)
    .join(
        campaigns_df.alias('camp'),
        F.col('v.utmcampaign') == F.col('camp.campaign_name'),
        'left'
    )
    # costs of that campaign on the visit day
    .join(
        costs_df.alias('c'),
        (F.col('camp.campaign_id') == F.col('c.campaign_id')) &
        (F.col('v.dt') == F.col('c.date')),
        'left'
    )
)

In [162]:
# Mark the joined frame for caching so the aggregations reuse it instead of
# re-running the joins; cache() is lazy and takes effect on the first action.
final_df.cache()

DataFrame[dt: string, visitid: int, clientID: int, URL: string, duration: int, source: string, UTMCampaign: string, event_type: string, event_id: int, submit_id: bigint, name: string, phone: string, phone_plus: string, phone_md5: string, phone_plus_md5: string, deal_id: bigint, deal_date: string, fio: string, phone: string, email: string, address: string, campaign_id: int, campaign_name: string, date: string, campaign_id: int, costs: decimal(19,2), clicks: bigint, views: bigint]

In [163]:
# First action on the cached frame. The recorded result is larger than the
# recorded visits_df count, which shows that the left joins add rows.
final_df.count()

2639

## Считаем метрики и анализируем результаты

In [145]:
# Per-campaign metrics: distinct visits, clients, submits and deals, plus the
# summed costs, clicks and views and the average cost of one deal.
#
# avg_deal_cost is only computed for campaigns that have at least one deal.
# Campaigns without deals get NULL, which is what the unguarded division
# returned with ANSI mode off; with ANSI mode on (the default since Spark 4.0)
# the unguarded division by zero would raise instead.
#
# NOTE(review): costs/clicks/views are summed over the joined rows, so a
# campaign-day's cost is counted once per visit/deal row of that day —
# confirm this is the intended definition of total_costs.
final_df_agg3 = (
    final_df
    .groupBy('camp.campaign_name')
    .agg(
        F.countDistinct('visitid').alias('unique_visits'),
        F.countDistinct('clientid').alias('unique_clients'),
        F.countDistinct('submit_id').alias('unique_submits'),
        F.countDistinct('deal_id').alias('unique_deals'),
        F.sum('costs').alias('total_costs'),
        F.sum('clicks').alias('total_clicks'),
        F.sum('views').alias('total_views')
    )
    .withColumn(
        'avg_deal_cost',
        # no `otherwise`: rows failing the condition become NULL
        F.when(
            F.col('unique_deals') > 0,
            (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)')
        )
    )
)

In [148]:
# Campaigns that produced no deals, cheapest first.
campaigns_without_deals = (
    final_df_agg3
    .filter(F.col('unique_deals') == 0)
    .select('campaign_name', 'total_costs', 'unique_deals')
    .orderBy('total_costs')
)
campaigns_without_deals.show(truncate=False)

+-----------------------------------------------+-----------+------------+
|campaign_name                                  |total_costs|unique_deals|
+-----------------------------------------------+-----------+------------+
|month_contemporary_kitchens_event_20241208     |8277.73    |0           |
|quarter_custom_kitchens_showcase_20240210      |8397.63    |0           |
|month_smart_kitchens_launch_20240330           |8703.22    |0           |
|month_smart_kitchen_promotion_20240305         |10276.42   |0           |
|month_ecofriendly_kitchens_experience_20241219 |10559.40   |0           |
|month_contemporary_kitchens_event_20240920     |10640.55   |0           |
|quarter_ecofriendly_kitchen_experience_20241119|10804.92   |0           |
|year_traditional_kitchens_showcase_20241018    |11655.58   |0           |
|year_modern_kitchens_demo_20240419             |12028.69   |0           |
|quarter_luxury_kitchens_innovation_20241116    |12894.26   |0           |
|month_openconcept_kitche

In [147]:
# Campaigns with at least one deal, lowest average deal cost first.
campaigns_by_cheapest_deal = (
    final_df_agg3
    .filter(F.col('unique_deals') > 0)
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .orderBy('avg_deal_cost')
)
campaigns_by_cheapest_deal.show(truncate=False)

+------------------------------------------------+-----------+------------+-------------+
|campaign_name                                   |total_costs|unique_deals|avg_deal_cost|
+------------------------------------------------+-----------+------------+-------------+
|year_modern_kitchens_showcase_20241014          |10936.74   |8           |1367.09      |
|quarter_custom_kitchens_innovation_20240817     |15422.19   |10          |1542.22      |
|month_contemporary_kitchens_promotion_20241215  |12423.61   |8           |1552.95      |
|quarter_custom_kitchens_experience_20240222     |15455.11   |9           |1717.23      |
|year_modern_kitchen_experience_20240702         |9068.35    |5           |1813.67      |
|year_smart_kitchens_initiative_20241004         |12958.53   |6           |2159.76      |
|quarter_custom_kitchen_initiative_20241102      |18846.27   |8           |2355.78      |
|month_openconcept_kitchens_promotion_20240628   |15318.45   |6           |2553.08      |
|year_trad

In [149]:
# Campaigns with at least one deal, highest average deal cost first.
campaigns_by_costliest_deal = (
    final_df_agg3
    .filter(F.col('unique_deals') > 0)
    .select('campaign_name', 'total_costs', 'unique_deals', 'avg_deal_cost')
    .orderBy(F.col('avg_deal_cost').desc())
)
campaigns_by_costliest_deal.show(truncate=False)

+-----------------------------------------------+-----------+------------+-------------+
|campaign_name                                  |total_costs|unique_deals|avg_deal_cost|
+-----------------------------------------------+-----------+------------+-------------+
|quarter_custom_kitchen_show_20240213           |16643.02   |1           |16643.02     |
|month_contemporary_kitchen_showcase_20240630   |15908.47   |1           |15908.47     |
|quarter_spacesaving_kitchen_innovation_20240825|15314.02   |1           |15314.02     |
|year_luxury_kitchens_show_20241001             |15132.48   |1           |15132.48     |
|year_traditional_kitchens_launch_20240707      |13893.29   |1           |13893.29     |
|quarter_spacesaving_kitchen_showcase_20240228  |13126.07   |1           |13126.07     |
|quarter_spacesaving_kitchen_innovation_20241103|13063.42   |1           |13063.42     |
|month_custom_kitchens_show_20241205            |13045.27   |1           |13045.27     |
|year_traditional_kit

In [154]:
# Monthly metrics. The month is the first 7 characters ('yyyy-MM') of the cost
# row's date, which equals the visit day by the join condition; joined rows
# without a cost match therefore fall into a NULL month.
#
# avg_deal_cost is only computed for months that have at least one deal.
# Months without deals get NULL, which is what the unguarded division returned
# with ANSI mode off; with ANSI mode on (the default since Spark 4.0) the
# unguarded division by zero would raise instead.
final_df_agg4 = (
    final_df
    .groupBy(F.substring('c.date', 1, 7).alias('month'))
    .agg(
        F.countDistinct('visitid').alias('unique_visits'),
        F.countDistinct('clientid').alias('unique_clients'),
        F.countDistinct('submit_id').alias('unique_submits'),
        F.countDistinct('deal_id').alias('unique_deals'),
        F.sum('costs').alias('total_costs'),
        F.sum('clicks').alias('total_clicks'),
        F.sum('views').alias('total_views')
    )
    .withColumn(
        'avg_deal_cost',
        # no `otherwise`: rows failing the condition become NULL
        F.when(
            F.col('unique_deals') > 0,
            (F.col('total_costs') / F.col('unique_deals')).cast('decimal(19,2)')
        )
    )
    .sort('month')
)

In [155]:
final_df_agg4.show()

+-------+-------------+--------------+--------------+------------+-----------+------------+-----------+-------------+
|  month|unique_visits|unique_clients|unique_submits|unique_deals|total_costs|total_clicks|total_views|avg_deal_cost|
+-------+-------------+--------------+--------------+------------+-----------+------------+-----------+-------------+
|2024-01|          202|           183|            66|          36|  108348.27|       10843|     122867|      3009.67|
|2024-02|          185|           165|            56|          27|  101600.49|        8733|     102168|      3762.98|
|2024-03|          183|           164|            58|          35|  100875.50|        9967|     101423|      2882.16|
|2024-04|          184|           163|            53|          21|   97707.65|        9748|     104686|      4652.75|
|2024-05|          191|           177|            68|          30|  101051.11|       10386|      98112|      3368.37|
|2024-06|          212|           184|            72|   

In [159]:
# revenue: every deal is valued at a flat 5000; profit is revenue minus total costs
revenue_expr = F.col('unique_deals') * F.lit(5000)
monthly_profit = (
    final_df_agg4
    .withColumn('revenue', revenue_expr)
    .withColumn('profit', F.col('revenue') - F.col('total_costs'))
)
monthly_profit.show()

+-------+-------------+--------------+--------------+------------+-----------+------------+-----------+-------------+-------+---------+
|  month|unique_visits|unique_clients|unique_submits|unique_deals|total_costs|total_clicks|total_views|avg_deal_cost|revenue|   profit|
+-------+-------------+--------------+--------------+------------+-----------+------------+-----------+-------------+-------+---------+
|2024-01|          202|           183|            66|          36|  108348.27|       10843|     122867|      3009.67| 180000| 71651.73|
|2024-02|          185|           165|            56|          27|  101600.49|        8733|     102168|      3762.98| 135000| 33399.51|
|2024-03|          183|           164|            58|          35|  100875.50|        9967|     101423|      2882.16| 175000| 74124.50|
|2024-04|          184|           163|            53|          21|   97707.65|        9748|     104686|      4652.75| 105000|  7292.35|
|2024-05|          191|           177|          

In [160]:
# Grand total of the monthly cost sums.
final_df_agg4.agg(F.sum('total_costs')).show()

+----------------+
|sum(total_costs)|
+----------------+
|      1313753.39|
+----------------+



In [102]:
# Daily metrics, one row per visit day, ordered by day.
daily_metrics = [
    F.countDistinct('visitid').alias('unique_visits'),
    F.countDistinct('clientid').alias('unique_clients'),
    F.countDistinct('submit_id').alias('unique_submits'),
    F.countDistinct('deal_id').alias('unique_deals'),
    F.sum('costs').alias('total_costs'),
    F.sum('clicks').alias('total_clicks'),
    F.sum('views').alias('total_views'),
]
final_df_agg = final_df.groupBy('dt').agg(*daily_metrics).orderBy('dt')

In [103]:
final_df_agg.show()

+----------+-------------+--------------+--------------+------------+-----------+------------+-----------+
|        dt|unique_visits|unique_clients|unique_submits|unique_deals|total_costs|total_clicks|total_views|
+----------+-------------+--------------+--------------+------------+-----------+------------+-----------+
|2024-01-08|            1|             1|             1|           1|   57122.22|        1702|      13579|
|2024-01-16|            1|             1|             1|           1|   73352.69|        9908|      15293|
|2024-01-25|            1|             1|             1|           2|   25914.04|        7300|      84808|
|2024-01-26|            1|             1|             1|           3|  193752.48|        3141|      55902|
|2024-01-29|            1|             1|             1|           1|   92952.22|        2992|      29558|
|2024-02-03|            1|             1|             1|           2|   25867.90|       12302|      66908|
|2024-02-12|            1|           

In [106]:
# Daily metrics again: the definition is the same as final_df_agg, evaluated on
# whatever final_df is at the time this cell runs.
per_day_metrics = [
    F.countDistinct('visitid').alias('unique_visits'),
    F.countDistinct('clientid').alias('unique_clients'),
    F.countDistinct('submit_id').alias('unique_submits'),
    F.countDistinct('deal_id').alias('unique_deals'),
    F.sum('costs').alias('total_costs'),
    F.sum('clicks').alias('total_clicks'),
    F.sum('views').alias('total_views'),
]
final_df_agg2 = final_df.groupBy('dt').agg(*per_day_metrics).orderBy('dt')

In [107]:
final_df_agg2.show()

+----------+-------------+--------------+--------------+------------+-----------+------------+-----------+
|        dt|unique_visits|unique_clients|unique_submits|unique_deals|total_costs|total_clicks|total_views|
+----------+-------------+--------------+--------------+------------+-----------+------------+-----------+
|2024-01-01|            5|             5|             1|           1|  143063.95|       13660|     125853|
|2024-01-02|            4|             4|             2|           0|  120594.77|       21639|      75900|
|2024-01-03|            5|             5|             3|           3|   28065.32|        3545|       5777|
|2024-01-04|           10|            10|             2|           1|   31124.92|        4816|      35505|
|2024-01-05|           10|            10|             4|           2|  240127.71|       27834|     149946|
|2024-01-06|            9|             9|             2|           1|  240600.89|       15072|      68084|
|2024-01-07|            7|           

In [97]:
# Show one joined row in full, one field per line.
final_df.show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dt             | 2024-05-14                                 
 visitid        | 926827                                     
 clientID       | 176                                        
 URL            | https://our-cool-website.com/edit          
 duration       | 99                                         
 source         | ad                                         
 UTMCampaign    | quarter_ecofriendly_kitchen_show_20240530  
 event_type     | submit                                     
 event_id       | 9985                                       
 submit_id      | 9985                                       
 name           | John                                       
 phone          | 79438904462                                
 phone_plus     | +79438904462                               
 phone_md5      | cd1e72755dc543307906bf638ba3dbdf           
 phone_plus_md5 | 6351238b50bede3e93763bd73db191aa           
 deal_id

## Освобождаем ресурсы, останавливаем Spark-сессию

In [164]:
# Release the cached copy of the joined frame.
final_df.unpersist()

DataFrame[dt: string, visitid: int, clientID: int, URL: string, duration: int, source: string, UTMCampaign: string, event_type: string, event_id: int, submit_id: bigint, name: string, phone: string, phone_plus: string, phone_md5: string, phone_plus_md5: string, deal_id: bigint, deal_date: string, fio: string, phone: string, email: string, address: string, campaign_id: int, campaign_name: string, date: string, campaign_id: int, costs: decimal(19,2), clicks: bigint, views: bigint]

In [None]:
# Stop the Spark session; `spark` cannot be used after this call.
spark.stop()