In [11]:
!pip install faker tqdm



In [71]:
import math

import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import col, lag, unix_timestamp, log, exp, count
from pyspark.sql.window import Window

import random
import pandas as pd
from faker import Faker
from datetime import datetime, timedelta
from tqdm import tqdm

In [None]:
fake = Faker()

device_types = ["desktop", "mobile", "tablet"]

action_types = [
    "click",
    "scroll",
    "view_page",
    "add_to_cart",
    "remove_from_cart",
    "share",
    "like",
    "comment",
]

def generate_session(user_id, session_id):
    session_start = fake.date_time_this_year()
    num_actions = random.randint(2, 10)

    current_time = session_start
    actions = []

    for _ in range(num_actions):
        action_time = current_time
        duration_seconds = random.randint(5, 600)
        action = random.choice(action_types)
        page_views = random.randint(1, 30)
        device = random.choice(device_types)
        location = fake.city()

        actions.append({
            "User_ID": user_id,
            "Session_ID": session_id,
            "Start_Time": action_time.strftime("%Y-%m-%d %H:%M:%S"),
            "End_Time": (action_time + timedelta(seconds=duration_seconds)).strftime("%Y-%m-%d %H:%M:%S"),
            "Duration_seconds": duration_seconds,
            "Action": action,
            "Page_Views": page_views,
            "Device_Type": device,
            "Location": location
        })

        pause = random.randint(1, 40000)
        current_time = action_time + timedelta(seconds=pause)

    return actions

sessions = []
for user_id in tqdm(range(1, 100_000)):
    num_sessions = random.randint(1, 5)
    for session_id in range(1, num_sessions + 1):
        session_data = generate_session(user_id, session_id)
        sessions.extend(session_data)

df_sessions = pd.DataFrame(sessions)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 99999/99999 [03:38<00:00, 456.80it/s]


In [43]:
df_sessions.to_csv('/data/user_sessions.csv', index=False)

In [44]:
spark = SparkSession.builder \
    .master('spark://master:7077') \
    .appName('PySpark') \
    .getOrCreate()

In [45]:
df_spark = spark.read.option("header", "true").csv("file:///data/user_sessions.csv")

In [46]:
df_spark.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Session_ID: string (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Duration_seconds: string (nullable = true)
 |-- Action: string (nullable = true)
 |-- Page_Views: string (nullable = true)
 |-- Device_Type: string (nullable = true)
 |-- Location: string (nullable = true)



In [47]:
df_spark.limit(5).toPandas()

Unnamed: 0,User_ID,Session_ID,Start_Time,End_Time,Duration_seconds,Action,Page_Views,Device_Type,Location
0,1,1,2025-02-21 03:58:25,2025-02-21 04:05:48,443,comment,30,tablet,New Jessica
1,1,1,2025-02-21 05:33:13,2025-02-21 05:38:36,323,comment,6,tablet,North Stacey
2,1,1,2025-02-21 06:06:25,2025-02-21 06:06:38,13,view_page,28,desktop,East Tabithatown
3,1,1,2025-02-21 13:43:34,2025-02-21 13:51:42,488,add_to_cart,4,desktop,Sandybury
4,1,1,2025-02-22 00:03:36,2025-02-22 00:04:27,51,like,17,desktop,East Brittany


In [50]:
df = df_spark.withColumn("Start_Time", col("Start_Time").cast("timestamp").cast("long")) \
             .withColumn("End_Time", col("End_Time").cast("timestamp").cast("long"))

In [51]:
df.limit(5).toPandas()

Unnamed: 0,User_ID,Session_ID,Start_Time,End_Time,Duration_seconds,Action,Page_Views,Device_Type,Location
0,1,1,1740110305,1740110748,443,comment,30,tablet,New Jessica
1,1,1,1740115993,1740116316,323,comment,6,tablet,North Stacey
2,1,1,1740117985,1740117998,13,view_page,28,desktop,East Tabithatown
3,1,1,1740145414,1740145902,488,add_to_cart,4,desktop,Sandybury
4,1,1,1740182616,1740182667,51,like,17,desktop,East Brittany


In [57]:
rdd = df.orderBy("User_ID", "Start_Time").rdd.map(
    lambda row: (row["User_ID"], (row["Start_Time"], row["End_Time"]))
)

rdd_grouped = rdd.groupByKey().mapValues(list)

                                                                                

In [None]:
def mapper(user_sessions):
    sessions = sorted(user_sessions, key=lambda x: x[0])

    if not sessions:
        return []

    ses_bounds = []
    cur_ses_start = sessions[0][0]
    cur_ses_end = sessions[0][1]

    for i in range(1, len(sessions)):
        start_time, end_time = sessions[i]
        prev_end_time = sessions[i-1][1]

        if start_time - prev_end_time > 1800:
            ses_bounds.append((cur_ses_start, cur_ses_end))
            cur_ses_start = start_time
            cur_ses_end = end_time
        else:
            cur_ses_end = end_time

    ses_bounds.append((cur_ses_start, cur_ses_end))

    gaps = []
    for i in range(len(ses_bounds) - 1):
        gap = ses_bounds[i+1][0] - ses_bounds[i][1]
        if gap > 0:
            gaps.append(gap)

    return gaps

gaps = rdd_grouped.flatMap(lambda x: mapper(x[1]))

In [70]:
log_sum = gaps.map(lambda x: math.log(x)).reduce(lambda x, y: x + y)
count = gaps.count()
geometric_mean = math.exp(log_sum / count)
print(f"{geometric_mean} сек")



27777.203943467666 сек



                                                                                