# Events Stream Analayzer 
#### Analyze a stream of events generated by an imaginary service.

In [32]:
from pyspark.sql import SparkSession

In [33]:
source_path = '../src/csv'

output_path = '../output'

In [34]:
spark = SparkSession.builder.appName("Events").master("local[*]").getOrCreate()

In [35]:
spark

In [36]:
filename = f'{source_path}/events.csv'
print(f"Trying to load filename: {filename} into Spark Datafram...")

df = spark.read.format('csv').load(filename, header=True, inferSchema=True)
print('Filename loaded!')

Trying to load filename: ../src/csv/events.csv into Spark Datafram...
Filename loaded!


In [37]:
df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- url: string (nullable = true)



In [38]:
df.limit(10).show(truncate=0)

+------------------------------------+-------+-------------------+-----------+-----------------------------------------+
|event_id                            |user_id|timestamp          |event_type |url                                      |
+------------------------------------+-------+-------------------+-----------+-----------------------------------------+
|f26f5621-02cb-43a6-b542-6616945d7bf9|48986  |2024-12-13 00:00:30|end_session|https://myfrontsite.com/contact          |
|cccd8c23-c642-40bf-8631-b90ca2db8cf0|37016  |2024-12-13 00:00:37|in-page    |https://myfrontsite.com/contact          |
|15d5cbcf-dd07-4b83-8f5b-c7ad133e4c49|32549  |2024-12-13 00:02:35|conversion |https://myfrontsite.com/home             |
|e8dbb325-f65b-48ac-b063-890f6efe6eb1|62972  |2024-12-13 00:03:59|in-page    |https://myfrontsite.com/checkout         |
|8ca1a4d6-8a79-4996-add5-6b00b4959744|38252  |2024-12-13 00:04:17|conversion |https://myfrontsite.com/home             |
|6ad59f7f-9019-4738-9c95-24cbf91

## Top 10 users that convert the most

In [39]:
df_users_converted = df.where('event_type = "conversion"')

In [40]:
from pyspark.sql.functions import col, count,desc

df_users_converted_agg = df_users_converted \
    .groupBy(col('user_id')) \
    .agg(count('event_id').alias('conversion_count')) \
    .orderBy('conversion_count', ascending=False) \
    .limit(10)

In [41]:
print('Top 10 users that convert the most:')
df_users_converted_agg.show(truncate=0)

Top 10 users that convert the most:
+-------+----------------+
|user_id|conversion_count|
+-------+----------------+
|61284  |16              |
|47732  |13              |
|32549  |13              |
|16232  |12              |
|66826  |12              |
|62972  |12              |
|37016  |12              |
|65429  |12              |
|48986  |11              |
|22850  |11              |
+-------+----------------+



In [42]:
output_filename = f'{output_path}/df_users_converted_agg.csv'

df_users_converted_agg.write.format('csv').save(path=output_filename, mode='overwrite', header=True)

print(f'CSV report file created at: {output_filename}')

CSV report file created at: ../output/df_users_converted_agg.csv


In [43]:
############################################################

## Fast converting users

In [13]:
df.where('user_id = 30040').orderBy('timestamp').show(truncate=0)

+------------------------------------+-------+-------------------+-------------+-----------------------------------------+
|event_id                            |user_id|timestamp          |event_type   |url                                      |
+------------------------------------+-------+-------------------+-------------+-----------------------------------------+
|c7533086-5515-42e8-8250-5132ce0615a1|30040  |2024-12-13 00:37:42|in-page      |https://myfrontsite.com/display-product/2|
|81070106-fcdb-4463-bd0f-808d9ade79cd|30040  |2024-12-13 00:55:37|in-page      |https://myfrontsite.com/home             |
|9dd4de6e-4d1a-40e1-97c7-0e82479fb0c1|30040  |2024-12-13 01:01:20|in-page      |https://myfrontsite.com/display-product/1|
|bb45fa65-5887-466c-98cc-618776efb405|30040  |2024-12-13 01:25:56|conversion   |https://myfrontsite.com/contact          |
|3b8823dc-e2f3-457a-99f3-5708e579967a|30040  |2024-12-13 01:43:26|conversion   |https://myfrontsite.com/search-products  |
|5fed06f2-2039-4

In [14]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, min

df_min_start_session = df.where('event_type = "start_session"').groupBy('user_id').agg(min('timestamp').alias('first_start_session_ts'))

In [15]:
df_join_min_start_session = df.where('event_type != "start_session"').join(df_min_start_session, on='user_id')

In [16]:
df_min_conversion = df_join_min_start_session.where('timestamp > first_start_session_ts and event_type="conversion"').groupBy('user_id').agg(min('timestamp').alias('first_conversion_ts'))
# df_min_conversion.where('user_id = ç').show()

In [17]:
df_filtered_ts = df_join_min_start_session.join(df_min_conversion, on='user_id').where('timestamp >= first_start_session_ts and timestamp <= first_conversion_ts')

In [18]:
df_users_converted_distance = df_filtered_ts.groupBy('user_id').agg(count('timestamp').alias('distance')).orderBy('distance', ascending=True)

In [19]:
df_users_converted_distance.show(10)

+-------+--------+
|user_id|distance|
+-------+--------+
|  48986|       1|
|  47732|       1|
|  22850|       1|
|  61284|       1|
|  96346|       1|
|  68620|       1|
|  32549|       2|
|  86713|       2|
|  37016|       2|
|  30040|       2|
+-------+--------+
only showing top 10 rows



In [29]:
output_filename = f'{output_path}/df_users_converted_distance.csv'

df_users_converted_distance.write.format('csv').save(path=output_filename, mode='overwrite', header=True)

print(f'CSV report file created at: {output_filename}')

CSV report file created at: ../output/df_users_converted_distance.csv


## Average Converting Distance

In [24]:
from pyspark.sql.functions import avg, round

df_users_converted_distance.agg(round(avg('distance'),2).alias('avg_conv_distance')).show()
print('A scalar number that represents how many events does the average user has to go through until conversion')

+-----------------+
|avg_conv_distance|
+-----------------+
|             3.13|
+-----------------+

A scalar number that represents how many events does the average user has to go through until conversion


## Users URls Path Pattern

In [25]:
from pyspark.sql.functions import regexp_replace

df_clean_url = df.select('user_id', regexp_replace(col('url'),'https://myfrontsite.com', '').alias('url'))

In [26]:
from pyspark.sql.functions import collect_list, concat_ws

df_urls_agg = df_clean_url.groupBy('user_id').agg(concat_ws(',', collect_list("url")).alias('urls'))

In [27]:
tested_pattren = ['/search-products', '/display-product/1', '/buy-product']

df_tested_pattren = spark.createDataFrame([('tested_pattren', value,) for value in tested_pattren], ['key', 'value'])

df_tested_urls_agg = df_tested_pattren.groupBy('key').agg(concat_ws(',', collect_list("value")).alias('tested_pattren'))

tested_urls = df_tested_urls_agg.collect()[0][1]

print(f'Lookup pattern: {tested_urls}')

[Stage 57:>                                                         (0 + 8) / 8]

Lookup pattern: /search-products,/display-product/1,/buy-product


                                                                                

In [28]:
df_urls_agg.filter(col("urls").contains(tested_urls)).show(truncate=0)

+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|urls                                                                                                                                                                                                                                                                                                                                                                                                                                              |
+-------+-----------------------------------------------------------------------------------------------------