In [0]:
df = spark.read.csv("/Volumes/workspace/default/oid_lab_4/mobile_app_interactions_expanded.csv", header=True)

In [0]:
display(df)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# --- ВСТАНОВЛЕНО КОРЕКТНИЙ ФОРМАТ ---
TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss" 
# ------------------------------------

# Припустімо, df - це ваш DataFrame

# 1. Вибірка та перетворення типів
df_cleaned = df.select(
    F.col("user_id").alias("user_id"),
    F.col("session_id").alias("session_id"),
    # ВИПРАВЛЕНО: Використовуємо коректний TIME_FORMAT
    F.try_to_timestamp(
        F.col("timestamp"), 
        F.lit(TIME_FORMAT)
    ).alias("timestamp") # Перейменуємо на "timestamp"
)

# 2. Очищення: відкидаємо рядки, де конвертація була невдалою
df_cleaned = df_cleaned.filter(F.col("timestamp").isNotNull())

print("Схема робочого DataFrame після коректної конвертації:")
df_cleaned.printSchema()

In [0]:
display(df_cleaned)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import min as F_min, max as F_max, avg as F_avg, count
import time

# --- Вхідний DataFrame: df_cleaned ---
# Припустімо, що df_cleaned містить: "user_id", "session_id", "timestamp"
df_input = df_cleaned 


# --- 1. Алгоритм 1: Налаштування за замовчуванням (Високе Перетасування) ---

print("\n--- Алгоритм 1: Налаштування за замовчуванням (T_A) ---")
# Spark викликає SHUFFLE за замовчуванням (зазвичай 200 розділів)

start_time_A = time.time()

# 1. Розрахунок тривалості сеансу: Групування за СПИСКОМ колонок
df_sessions_A = df_input.groupBy("user_id", "session_id").agg(
    # Тривалість = MAX("timestamp") - MIN("timestamp")
    (F_max(F.col("timestamp")).cast("long") - F_min(F.col("timestamp")).cast("long")).alias("session_duration_seconds"),
    count("*").alias("event_count")
)

# 2. Кінцева агрегація: Обчислення загального середнього значення (Action)
avg_duration_A = df_sessions_A.agg(F_avg(F.col("session_duration_seconds")).alias("average_session_duration")).collect()

T_A = time.time() - start_time_A

print(f"Час виконання Алгоритму 1 (T_A): {T_A:.2f} сек")
print("Середня тривалість сеансу (алгоритм 1):", avg_duration_A[0]["average_session_duration"])

# --- 2. Алгоритм 2: Оптимізоване Перетасування (Стратегія B) ---

OPTIMAL_PARTITIONS = 24 

print(f"\n--- Алгоритм 2: Оптимізоване Перетасування (T_B, {OPTIMAL_PARTITIONS} розділів) ---")

# ПЕРЕТАСУВАННЯ: Передаємо ОБИДВІ колонки до repartition
df_low_shuffle = df_input.repartition(
    OPTIMAL_PARTITIONS, 
    F.col("user_id"), 
    F.col("session_id")
)

start_time_B = time.time()

# Групування та агрегація (ВИКОРИСТОВУЄМО "timestamp")
# Групування також за двома колонками
df_sessions_B = df_low_shuffle.groupBy("user_id", "session_id").agg(
    (F_max(F.col("timestamp")).cast("long") - F.min(F.col("timestamp")).cast("long")).alias("session_duration_seconds"),
    count("*").alias("event_count")
)

# Кінцева агрегація
avg_duration_B = df_sessions_B.agg(F_avg(F.col("session_duration_seconds")).alias("average_session_duration")).collect()

T_B = time.time() - start_time_B

print(f"Час виконання Алгоритму 2 (T_B): {T_B:.2f} сек")
print("Середня тривалість сеансу (алгоритм 2):", avg_duration_B[0]["average_session_duration"])

In [0]:
import math

N = 8                # Кількість ядер
# ----------------------------------------

# 2. Оцінка Послідовної Частки (1-P)
# Припускаємо, що T_seq (послідовний час) становить 10% від найкращого виміру (T_B)
T_seq = 0.1 * T_B

# T_par_on_N_cores = T_B - T_seq
T_par_on_N_cores = T_B - T_seq

# Оцінка загального часу паралельної роботи на одному ядрі (T_par)
T_par_estimated = T_par_on_N_cores * N

# 3. Обчислення частки паралелізму (P)
# P = T_par / T_total_on_one_core
T_1_estimated = T_seq + T_par_estimated # Час на одному ядрі (T_1)
P = T_par_estimated / T_1_estimated 
P_seq = 1 - P

print("--- Оцінка компонентів часу ---")
print(f"Оцінений послідовний час (T_seq): {T_seq:.2f} сек")
print(f"Оцінений загальний час на 1 ядрі (T_1): {T_1_estimated:.2f} сек")
print(f"Оцінена частка паралельної роботи (P): {P:.4f} ({P*100:.2f}%)")

# --- 4. ЗАСТОСУВАННЯ ЗАКОНУ АМДАЛА ---

# Розрахунок максимального теоретичного прискорення
S_max_theoretical = 1 / (P_seq + (P / N))

print("\n--- Моделювання продуктивності ---")
print(f"Максимальне теоретичне прискорення (S_max) на {N} ядрах: {S_max_theoretical:.2f} рази")
