In this assignment, solve the tasks using PySpark. Embedding SQL code is not allowed.

## **Task 1: Bank customer transaction analysis (3 pts)**

Imagine we have a million bank customer transactions. Each transaction has:
- `transaction_id` – unique identifier.
- `customer_id` – customer ID.
- `amount` – transaction amount.
- `transaction_date` – transaction date.
- `category` – transaction category (Grocery, Electronics, Entertainment, etc.).

### **Assignment**
1. Find the top 5 customers with the highest spending over the last month (0.5 pts).
2. Determine the purchase trend by category: group the data by week and compute the total purchase amount (1 pt).
3. Use a window function to compute a moving average of each customer's spending over their last 3 transactions (1.5 pts).


In [3]:
%pip install faker

Note: you may need to restart the kernel to use updated packages.


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, expr
from faker import Faker
import random
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("BankTransactions").getOrCreate()

fake = Faker()

num_rows = 1000000
customers = [fake.uuid4() for _ in range(5000)]
categories = ["Grocery", "Electronics", "Entertainment", "Clothing", "Travel"]

data = [
    (
        i,
        random.choice(customers),
        round(random.uniform(5, 1000), 2),
        (datetime.today() - timedelta(days=random.randint(0, 365))).strftime("%Y-%m-%d"),
        random.choice(categories)
    )
    for i in range(num_rows)
]

columns = ["transaction_id", "customer_id", "amount", "transaction_date", "category"]
transactions_df = spark.createDataFrame(data, columns)

transactions_df.write.mode("overwrite").parquet("transactions.parquet")

In [5]:
transactions_df.show()

+--------------+--------------------+------+----------------+-------------+
|transaction_id|         customer_id|amount|transaction_date|     category|
+--------------+--------------------+------+----------------+-------------+
|             0|446d110b-0dd1-450...|589.11|      2024-12-19|  Electronics|
|             1|cc4710c2-b6d3-4c6...|357.82|      2024-11-22|Entertainment|
|             2|bff2537d-e073-428...|383.43|      2024-10-07|Entertainment|
|             3|37ee4fa5-9a7d-4f4...|166.13|      2024-12-21|       Travel|
|             4|d45226cf-dafe-48a...|367.14|      2024-08-02|       Travel|
|             5|83e83834-7b11-491...|656.95|      2024-05-16|     Clothing|
|             6|7986eafa-3212-4b3...|421.21|      2024-12-07|      Grocery|
|             7|49980263-534c-4ea...|  58.2|      2024-12-14|Entertainment|
|             8|3a4d9c45-df25-481...|355.69|      2024-09-05|Entertainment|
|             9|0652a85e-059b-41a...| 103.2|      2024-12-28|       Travel|
|           

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

transactions_df = (spark.read.parquet("transactions.parquet")
                   .withColumn("transaction_date", F.to_date("transaction_date")))
# "Last month" is taken to mean the last 30 days up to today
last_month = (datetime.today() - timedelta(days=30)).strftime("%Y-%m-%d")
last_month_transactions = transactions_df.filter(F.col("transaction_date") >= last_month)
# Total spend per customer, largest first
result = (last_month_transactions.groupBy("customer_id")
          .agg(F.sum("amount").alias("total_spent"))
          .orderBy(F.desc("total_spent"))
          .limit(5))
result.show()

+--------------------+------------------+
|         customer_id|       total_spent|
+--------------------+------------------+
|4f18cfd8-9b35-47c...|          19462.24|
|6285c9b7-a027-499...|          17477.82|
|7f1137bd-ab0d-4b5...|           16684.5|
|676331ee-cbc9-45f...|          16336.27|
|8d8eb288-8ab3-43f...|16265.130000000001|
+--------------------+------------------+
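
The filter, aggregate and sort steps above can be checked on a tiny hand-made sample with a plain-Python sketch (no Spark needed). The helper `top_spenders` and the sample data are hypothetical. It also rounds totals to cents, which would hide float artifacts like `16265.130000000001` in the output above; in PySpark the analogous step would be wrapping the sum in `F.round(F.sum("amount"), 2)`.

```python
from collections import defaultdict
from datetime import date, timedelta


def top_spenders(transactions, today, days=30, n=5):
    """Return the n customers with the largest total spend in the last `days` days.

    `transactions` is an iterable of (customer_id, amount, transaction_date) tuples.
    Totals are rounded to cents to hide float artifacts.
    """
    cutoff = today - timedelta(days=days)
    totals = defaultdict(float)
    for customer_id, amount, tx_date in transactions:
        if tx_date >= cutoff:  # same inclusive cutoff as the Spark filter
            totals[customer_id] += amount
    # Sort by total descending, then by customer_id so ties are deterministic
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return [(cid, round(total, 2)) for cid, total in ranked[:n]]


today = date(2025, 3, 5)
sample = [
    ("a", 120.10, date(2025, 3, 1)),
    ("a", 200.20, date(2025, 2, 10)),
    ("b", 500.00, date(2025, 1, 1)),   # older than 30 days, ignored
    ("b", 50.00, date(2025, 3, 4)),
    ("c", 300.30, date(2025, 3, 5)),
]
print(top_spenders(sample, today, n=2))
```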



                                                                                

In [7]:
transactions_with_week = transactions_df.withColumn("week", F.weekofyear("transaction_date"))
result_2 = transactions_with_week.groupBy("week", "category").agg(F.sum("amount").alias("total_spent"))
result_2 = result_2.orderBy("week", "category")
result_2.show()



+----+-------------+------------------+
|week|     category|       total_spent|
+----+-------------+------------------+
|   1|     Clothing|1926350.9100000006|
|   1|  Electronics|1904905.3199999998|
|   1|Entertainment|1988606.5299999998|
|   1|      Grocery|1950249.6099999999|
|   1|       Travel|1945807.6000000015|
|   2|     Clothing|1935688.1600000001|
|   2|  Electronics|1946308.7299999995|
|   2|Entertainment|1904798.5200000003|
|   2|      Grocery|1887951.3399999996|
|   2|       Travel|1949722.3000000003|
|   3|     Clothing|        1906964.76|
|   3|  Electronics|1973861.2800000007|
|   3|Entertainment|        1949380.57|
|   3|      Grocery|1872115.6800000002|
|   3|       Travel|        1886989.94|
|   4|     Clothing|1977759.6900000002|
|   4|  Electronics|        1912453.57|
|   4|Entertainment|1933568.5800000003|
|   4|      Grocery|         1919330.9|
|   4|       Travel|1843711.4000000004|
+----+-------------+------------------+
only showing top 20 rows
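
Because the data covers a full year back from today, `F.weekofyear` alone can put days from two different years into the same bucket: for example, 2024-03-05 and 2025-03-05 are both in ISO week 10. A plain-Python sketch of keying on (ISO year, ISO week) instead; the function name and sample are hypothetical. In PySpark, one option would be grouping by `F.date_trunc("week", "transaction_date")`, which returns the Monday that starts each week.

```python
from collections import defaultdict
from datetime import date


def weekly_totals(transactions):
    """Sum amounts per (ISO year, ISO week, category).

    Keying on the week number alone (as F.weekofyear does) would merge
    the same week number from two different years.
    """
    totals = defaultdict(float)
    for tx_date, category, amount in transactions:
        iso_year, iso_week, _ = tx_date.isocalendar()
        totals[(iso_year, iso_week, category)] += amount
    return dict(sorted(totals.items()))


sample = [
    (date(2024, 3, 5), "Grocery", 10.0),   # ISO week 10 of 2024
    (date(2025, 3, 5), "Grocery", 20.0),   # ISO week 10 of 2025
    (date(2025, 3, 4), "Travel", 5.0),
]
print(weekly_totals(sample))
```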



                                                                                

In [8]:
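# Current transaction plus the 2 previous ones; transaction_id breaks same-day ties
window_spec = Window.partitionBy("customer_id").orderBy("transaction_date", "transaction_id").rowsBetween(-2, 0)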
transactions_with_moving_avg = transactions_df.withColumn("moving_avg", F.avg("amount").over(window_spec))
transactions_with_moving_avg.show()

+--------------+--------------------+------+----------------+-------------+------------------+
|transaction_id|         customer_id|amount|transaction_date|     category|        moving_avg|
+--------------+--------------------+------+----------------+-------------+------------------+
|        176175|00169abe-c977-467...|417.47|      2024-03-07|  Electronics|            417.47|
|        896155|00169abe-c977-467...|173.72|      2024-03-10|Entertainment|           295.595|
|        987507|00169abe-c977-467...|883.47|      2024-03-13|      Grocery|491.55333333333334|
|        742248|00169abe-c977-467...|300.86|      2024-03-15|Entertainment| 452.6833333333334|
|        428788|00169abe-c977-467...| 354.0|      2024-03-16|Entertainment| 512.7766666666666|
|        800845|00169abe-c977-467...|983.46|      2024-03-16|       Travel| 546.1066666666667|
|        631871|00169abe-c977-467...|487.27|      2024-03-18|  Electronics| 608.2433333333333|
|        316706|00169abe-c977-467...|463.14|      
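
A plain-Python sketch of what `rowsBetween(-2, 0)` computes, checked against the first rows of the output above (for example, (417.47 + 173.72) / 2 = 295.595). The helper and the customer id `"c1"` are hypothetical. Sorting by `transaction_id` as a tiebreaker makes the result deterministic when a customer has several transactions on the same day; ordering by date alone leaves their relative order up to Spark.

```python
from collections import defaultdict
from datetime import date


def moving_avg_last_n(transactions, n=3):
    """Per-customer mean of the current and the previous n - 1 transactions.

    Mirrors Window.partitionBy("customer_id")
    .orderBy("transaction_date", "transaction_id").rowsBetween(-(n - 1), 0).
    `transactions` holds (transaction_id, customer_id, amount, transaction_date) tuples.
    """
    by_customer = defaultdict(list)
    for tx in transactions:
        by_customer[tx[1]].append(tx)
    result = {}
    for rows in by_customer.values():
        # Order by date; transaction_id breaks ties between same-day transactions
        rows.sort(key=lambda tx: (tx[3], tx[0]))
        for i, tx in enumerate(rows):
            frame = rows[max(0, i - n + 1): i + 1]
            result[tx[0]] = sum(t[2] for t in frame) / len(frame)
    return result


# First rows of the output above, with a hypothetical customer id "c1"
sample = [
    (176175, "c1", 417.47, date(2024, 3, 7)),
    (896155, "c1", 173.72, date(2024, 3, 10)),
    (987507, "c1", 883.47, date(2024, 3, 13)),
    (742248, "c1", 300.86, date(2024, 3, 15)),
]
for tx_id, avg in moving_avg_last_n(sample).items():
    print(tx_id, round(avg, 4))
```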

                                                                                

## **Task 2: Amazon order analysis (1.5 pts)**

There are two datasets:
1. `orders` – information about Amazon orders.
2. `products` – information about products.

### **Assignment**
1. Find the 5 most popular products (by number of orders) (0.5 pts).
2. For each user, compute the average and the median order value (0.5 pts).
3. Find products whose average rating is above 4.5 but that were ordered fewer than 100 times (0.5 pts).

In [10]:
num_orders = 500000
num_products = 10000

products = [(i, fake.word(), round(random.uniform(5, 500), 2), round(random.uniform(1, 5), 2))
            for i in range(num_products)]
orders = [
    (i, random.randint(1, 50000), random.randint(0, num_products - 1), random.randint(1, 5),
     (datetime.today() - timedelta(days=random.randint(0, 365))).strftime("%Y-%m-%d"))
    for i in range(num_orders)
]

products_df = spark.createDataFrame(products, ["product_id", "product_name", "price", "rating"])
orders_df = spark.createDataFrame(orders, ["order_id", "customer_id", "product_id", "quantity", "order_date"])

products_df.write.mode("overwrite").parquet("products.parquet")
orders_df.write.mode("overwrite").parquet("orders.parquet")

In [11]:
from pyspark.sql import functions as F

products_df = spark.read.parquet("products.parquet")
orders_df = spark.read.parquet("orders.parquet")
product_popularity = orders_df.groupBy("product_id").agg(F.count("order_id").alias("order_count"))
top_5 = product_popularity.orderBy(F.desc("order_count")).limit(5)
# A join does not preserve row order, so sort again after adding product details
result = top_5.join(products_df, "product_id").orderBy(F.desc("order_count"), "product_id")
result.show()

+----------+-----------+------------+------+------+
|product_id|order_count|product_name| price|rating|
+----------+-----------+------------+------+------+
|      9658|         82|   professor| 14.07|  1.73|
|      5307|         80|    indicate|155.98|   2.7|
|      7152|         77|        fish|  97.3|  4.51|
|      6384|         76|    position| 227.3|  1.37|
|      7044|         76|    industry| 91.57|  4.58|
+----------+-----------+------------+------+------+
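
A join gives no guarantee about row order, so a top-N result should be sorted after the join, not only before `limit`. The same ranking logic in plain Python, as a sketch with hypothetical data; note that with ties at the cut-off (such as the products with 76 orders), which products make the top 5 depends on the tiebreaker.

```python
from collections import Counter


def top_products(order_product_ids, n=5):
    """Return (product_id, order_count) pairs for the n most-ordered products.

    Ties are broken by product_id so the ranking is deterministic.
    """
    counts = Counter(order_product_ids)
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


orders = [7, 3, 7, 9, 3, 7, 1, 9, 5]
print(top_products(orders, n=3))
```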



                                                                                

In [12]:
data = orders_df.join(products_df.select("product_id", "price"), "product_id")
orders_with_total = data.withColumn("total", F.col("price") * F.col("quantity"))
result = orders_with_total.groupBy("customer_id").agg(
    # F.percentile_approx (Spark 3.1+) avoids an SQL string inside F.expr
    F.avg("total").alias("avg_check"), F.percentile_approx("total", 0.5).alias("median_check")
)
result.show()

+-----------+------------------+------------------+
|customer_id|         avg_check|      median_check|
+-----------+------------------+------------------+
|          7|           719.962|            705.28|
|         19| 769.5733333333333|            531.02|
|         22| 789.8585714285715|414.06000000000006|
|         26| 960.5871428571428|            517.53|
|         29| 693.1858333333333|            348.22|
|         34| 897.7600000000001|            989.44|
|         50|1061.1163636363635|           1146.08|
|         54| 982.5925000000001|            980.84|
|         65|          1165.625| 781.0500000000001|
|         77| 410.3999999999999|            330.82|
|         94|          732.6025| 655.6500000000001|
|        110|            959.46|            877.48|
|        112| 659.3166666666666|            493.68|
|        113|1097.0354545454547|           1202.15|
|        126| 824.5574999999999|            716.54|
|        130|         719.97625|            557.52|
|        149
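
A plain-Python sketch of the per-customer average and exact median (helper name and sample data are hypothetical). One difference worth knowing: per its documentation, `percentile_approx` returns an actual value from the column (the smallest value such that no more than the given fraction of values is less than or equal to it), so for a customer with an even number of orders it picks a middle element instead of averaging the two middle ones. If the exact median matters, `F.median` (Spark 3.4+) is an alternative.

```python
from collections import defaultdict
from statistics import mean, median


def check_stats(orders, prices):
    """Per-customer average and exact median order value.

    `orders` holds (customer_id, product_id, quantity); `prices` maps product_id to price.
    Order value = price * quantity, as in the Spark cell.
    """
    totals = defaultdict(list)
    for customer_id, product_id, quantity in orders:
        totals[customer_id].append(prices[product_id] * quantity)
    return {c: (mean(v), median(v)) for c, v in totals.items()}


prices = {1: 10.0, 2: 25.0, 3: 100.0}
orders = [(7, 1, 2), (7, 2, 1), (7, 3, 1), (19, 1, 1), (19, 3, 3)]
print(check_stats(orders, prices))
```

For customer 19 (order values 10 and 300) the exact median is 155, while a "pick an element" percentile would return 10.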

                                                                                

In [13]:
orders_df.show()

+--------+-----------+----------+--------+----------+
|order_id|customer_id|product_id|quantity|order_date|
+--------+-----------+----------+--------+----------+
|   49152|      37128|      7044|       5|2025-03-02|
|   49153|       7175|      5824|       2|2024-09-13|
|   49154|      44933|      3459|       2|2024-12-20|
|   49155|      47281|      8256|       2|2024-08-30|
|   49156|       3301|         3|       2|2024-05-25|
|   49157|      17704|      2178|       1|2024-08-09|
|   49158|      27697|      8627|       4|2024-09-21|
|   49159|       5504|      5753|       1|2025-02-15|
|   49160|      44773|       523|       3|2024-03-14|
|   49161|      12451|      4743|       3|2024-04-16|
|   49162|      13194|      3680|       5|2024-11-24|
|   49163|      39606|      4538|       4|2024-03-17|
|   49164|      10982|      8606|       2|2024-03-12|
|   49165|      18132|      7574|       5|2024-10-09|
|   49166|       1844|      3033|       1|2024-11-03|
|   49167|      30963|      

In [14]:
data = orders_df.join(products_df, "product_id").groupBy("product_id").agg(
    F.avg("rating").alias("avg_rating"),
    F.count("order_id").alias("order_count")
)

data = data.join(products_df.select("product_id", "product_name"), "product_id")
result = data.filter((F.col("avg_rating") > 4.5) & (F.col("order_count") < 100))
result.show()

+----------+-----------------+-----------+------------+
|product_id|       avg_rating|order_count|product_name|
+----------+-----------------+-----------+------------+
|      1677|             4.95|         62|       coach|
|      4894|             4.53|         48|       cause|
|      5409|4.660000000000001|         57|         add|
|      8440|             4.76|         50|       claim|
|      2040|             4.71|         51|       smile|
|      3091|             4.69|         41|       truth|
|      1806|             4.72|         56|       green|
|      7436|             4.94|         43|         key|
|      8209|             4.72|         54|        long|
|      9179|             4.91|         55|     success|
|      5241|4.960000000000001|         57|       state|
|       558|             4.78|         55|        stop|
|      6424|             4.52|         52|     subject|
|      8092|4.610000000000001|         58|        food|
|      6675|4.909999999999999|         37|      
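
The cell above starts from an inner join of orders with products, so a highly rated product that was never ordered (0 orders, which is fewer than 100) would be dropped. With about 50 orders per product on average in this synthetic data (500,000 orders over 10,000 products) that is unlikely to change the result, but it matters on real data. Also, each product has a single fixed `rating`, so averaging it over order rows just reproduces that rating, up to float noise such as `4.660000000000001`. A plain-Python sketch of the left-join semantics; names and sample data are hypothetical. In PySpark this would correspond to joining `products_df` to the per-product order counts with `how="left"` and filling missing counts with 0, e.g. `fillna(0, subset=["order_count"])`.

```python
from collections import Counter


def rare_high_rated(products, order_product_ids, min_rating=4.5, max_orders=100):
    """Products with rating > min_rating that were ordered fewer than max_orders times.

    Products with no orders count as 0 orders and are kept, which is what a
    left join from products to order counts gives (an inner join would drop them).
    """
    counts = Counter(order_product_ids)
    return [
        (pid, name, rating, counts.get(pid, 0))
        for pid, name, rating in products
        if rating > min_rating and counts.get(pid, 0) < max_orders
    ]


products = [(1, "alpha", 4.95), (2, "beta", 4.53), (3, "gamma", 4.51), (4, "delta", 3.2)]
orders = [1] * 120 + [2] * 48 + [4] * 10  # product 3 was never ordered
print(rare_high_rated(products, orders))
```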

                                                                                

## **Task 3: Flight delay analysis (3 pts)**

We have data on 1 million flights.

### **Assignment**
1. Find the 10 airlines with the largest delays (0.5 pts).
2. Determine how the time of day (morning, afternoon, evening) affects delays (1 pt).
3. Compute a moving average of delays over the last 5 days for each route (1.5 pts).

In [16]:
num_flights = 1000000
airlines = ["Delta", "United", "American Airlines", "Southwest", "Alaska Airlines"]
airports = ["JFK", "LAX", "ORD", "DFW", "SFO", "ATL", "MIA"]

flights = [
    (i, random.choice(airlines), random.choice(airports), random.choice(airports),
     random.randint(-10, 180), random.randint(-15, 200),
     (datetime.today() - timedelta(days=random.randint(0, 365), hours=random.randint(0, 23), minutes=random.randint(0, 59))).strftime("%Y-%m-%d %H:%M:%S"))
    for i in range(num_flights)
]

flights_df = spark.createDataFrame(flights, ["flight_id", "airline", "origin", "destination", "departure_delay", "arrival_delay", "flight_date"])
flights_df.write.mode("overwrite").parquet("flights.parquet")

In [17]:
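from pyspark.sql import functions as F

flights_df = spark.read.parquet("flights.parquet")
# Rank airlines by average arrival delay; the synthetic data has only 5 airlines,
# so limit(10) returns all of them
result = (flights_df.groupBy("airline")
          .agg(F.avg("arrival_delay").alias("avg_delay"))
          .orderBy(F.desc("avg_delay"))
          .limit(10))
result.show()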

+-----------------+-----------------+
|          airline|        avg_delay|
+-----------------+-----------------+
|  Alaska Airlines|92.89529542452264|
|American Airlines| 92.5935237981213|
|           United|92.57446446956747|
|            Delta|92.39867658394482|
|        Southwest|92.39472143404045|
+-----------------+-----------------+



In [18]:
flights_df.show()

+---------+-----------------+------+-----------+---------------+-------------+-------------------+
|flight_id|          airline|origin|destination|departure_delay|arrival_delay|        flight_date|
+---------+-----------------+------+-----------+---------------+-------------+-------------------+
|   699392|  Alaska Airlines|   MIA|        LAX|            118|          -14|2025-02-08 10:04:45|
|   699393|American Airlines|   ORD|        ORD|             83|           99|2024-11-29 11:47:45|
|   699394|American Airlines|   SFO|        DFW|             78|          138|2024-07-15 08:46:45|
|   699395|           United|   ATL|        ORD|             54|           99|2024-03-24 10:58:45|
|   699396|           United|   JFK|        ATL|             14|          182|2024-08-24 04:56:45|
|   699397|           United|   JFK|        LAX|             61|           75|2024-10-22 16:26:45|
|   699398|  Alaska Airlines|   ORD|        DFW|            105|           44|2024-10-30 09:00:45|
|   699399

In [19]:
from pyspark.sql.functions import when

hour = F.hour("flight_date")
# Three buckets as the task asks; night hours (0-5) are counted as "evening"
data = flights_df.withColumn(
    "time_of_day",
    when((hour >= 6) & (hour < 12), "morning")
    .when((hour >= 12) & (hour < 18), "afternoon")
    .otherwise("evening")
)
result = data.groupBy("time_of_day").agg(F.avg("arrival_delay").alias("avg_delay"))
result.show()

+-----------+-----------------+
|time_of_day|        avg_delay|
+-----------+-----------------+
|  afternoon|92.54476631471007|
|    morning|92.54736959696145|
|    evening| 92.5966896270177|
+-----------+-----------------+



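Time of day makes no real difference: all three averages are within about 0.05 of each other, which is just noise. In this synthetic data that is expected, because `arrival_delay` is drawn with `random.randint(-15, 200)` independently of the departure hour.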
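
A quick back-of-the-envelope check that the gap is noise, in plain Python: the spread of `random.randint(-15, 200)` gives the standard error of each bucket mean (all three buckets sit near 92.5, the mean of that distribution). The bucket sizes are an assumption: departure hours are treated as uniform, so about 250,000 flights fall in a 6-hour bucket and 500,000 in the 12-hour "evening" bucket.

```python
import math


def discrete_uniform_std(low, high):
    """Standard deviation of random.randint(low, high), a uniform draw from low..high."""
    n = high - low + 1
    return math.sqrt((n * n - 1) / 12)


# arrival_delay is generated with random.randint(-15, 200), independently of the hour
std = discrete_uniform_std(-15, 200)

# Assumption: roughly uniform departure hours, so about 250,000 of the 1,000,000
# flights land in the 6-hour "afternoon" bucket and 500,000 in the 12-hour "evening" one
se_afternoon = std / math.sqrt(250_000)
se_evening = std / math.sqrt(500_000)
se_diff = math.sqrt(se_afternoon ** 2 + se_evening ** 2)  # SE of a difference of two means

observed_diff = 92.5966896270177 - 92.54476631471007  # evening minus afternoon, from the output
print(f"std = {std:.2f}, SE of the gap = {se_diff:.3f}, observed gap = {observed_diff:.3f}")
```

The observed gap is well below one standard error of the difference, so it is consistent with chance.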

In [21]:
from pyspark.sql.window import Window

flights_df = flights_df.withColumn("route", F.concat(F.col("origin"), F.lit("-"), F.col("destination")))
flights_df = flights_df.withColumn("departure_date", F.to_date("flight_date"))
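# rowsBetween(-4, 0) covers the current flight and the 4 previous flights on the route,
# not 5 calendar days; the order of flights within the same day is arbitrary
window_spec = Window.partitionBy("route").orderBy("departure_date").rowsBetween(-4, 0)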
result = flights_df.withColumn("moving_avg_delay", F.avg("arrival_delay").over(window_spec))
result.show()



+---------+-----------------+------+-----------+---------------+-------------+-------------------+-------+--------------+----------------+
|flight_id|          airline|origin|destination|departure_delay|arrival_delay|        flight_date|  route|departure_date|moving_avg_delay|
+---------+-----------------+------+-----------+---------------+-------------+-------------------+-------+--------------+----------------+
|   776958|        Southwest|   MIA|        DFW|              0|            0|2024-03-04 23:50:46|MIA-DFW|    2024-03-04|             0.0|
|   893797|        Southwest|   MIA|        DFW|             75|           32|2024-03-04 23:41:46|MIA-DFW|    2024-03-04|            16.0|
|   275235|            Delta|   MIA|        DFW|             37|           31|2024-03-04 23:26:43|MIA-DFW|    2024-03-04|            21.0|
|   913959|  Alaska Airlines|   MIA|        DFW|             97|          199|2024-03-04 23:53:46|MIA-DFW|    2024-03-04|            65.5|
|    73764|American Airline

                                                                                

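The delay columns contain negative values (flights that departed or arrived early). I'm not sure whether they should be removed, but I kept them: dropping early flights would bias the average delay upward.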
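
As the task is worded ("the last 5 days"), the window arguably should be a range over calendar days rather than a fixed number of rows: `rowsBetween(-4, 0)` averages the current flight and the 4 flights before it on the route, which on a busy route may all fall on the same day (as in the output above, where the first rows are all from 2024-03-04). A plain-Python sketch of a range-based 5-day window; the helper and sample are hypothetical, and the quadratic scan is for clarity only. In PySpark, one way would be ordering by a day number such as `F.datediff("departure_date", F.lit("1970-01-01"))` and using `rangeBetween(-4, 0)`. Another reading of the task is to first average per route and day, then apply `rowsBetween(-4, 0)` over those daily rows.

```python
from collections import defaultdict
from datetime import date


def moving_avg_last_days(flights, days=5):
    """Average arrival delay per route over the `days` calendar days ending on each flight's date.

    `flights` holds (flight_id, route, flight_date, arrival_delay). The window is
    range-based: every flight of the route dated within
    [flight_date - (days - 1), flight_date] is included, however many there are.
    """
    by_route = defaultdict(list)
    for flight in flights:
        by_route[flight[1]].append(flight)
    result = {}
    for rows in by_route.values():
        for flight_id, _, day, _ in rows:
            window = [delay for _, _, other_day, delay in rows
                      if 0 <= (day - other_day).days < days]
            result[flight_id] = sum(window) / len(window)
    return result


flights = [
    (1, "MIA-DFW", date(2024, 3, 1), 10),
    (2, "MIA-DFW", date(2024, 3, 1), 20),
    (3, "MIA-DFW", date(2024, 3, 4), 30),
    (4, "MIA-DFW", date(2024, 3, 6), 40),   # 2024-03-01 is now 5 days back, outside the window
    (5, "JFK-LAX", date(2024, 3, 6), 100),
]
print(moving_avg_last_days(flights))
```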

## **Task 4: User behavior analysis in a mobile app (1 pt)**

We have a log file of user actions in a mobile app. Each row represents one user event.
- `event_id` – event ID.
- `user_id` – user ID.
- `event_type` – event type (`login`, `view_product`, `add_to_cart`, `purchase`).
- `event_timestamp` – event time.

### **Assignment**
1. Determine the time of each user's first and last activity (0.5 pts).
2. Compute the average session duration for each user (0.5 pts).

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, expr
from faker import Faker
import random
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("UserBehaviorAnalysis").getOrCreate()

fake = Faker()

num_rows = 1000000
users = [fake.uuid4() for _ in range(50000)]
event_types = ["login", "view_product", "add_to_cart", "purchase"]

data = [
    (
        i,
        random.choice(users),
        random.choice(event_types),
        (datetime.today() - timedelta(days=random.randint(0, 30), hours=random.randint(0, 23), minutes=random.randint(0, 59))).strftime("%Y-%m-%d %H:%M:%S")
    )
    for i in range(num_rows)
]

columns = ["event_id", "user_id", "event_type", "event_timestamp"]
events_df = spark.createDataFrame(data, columns)

events_df.write.mode("overwrite").parquet("user_events.parquet")

In [25]:
from pyspark.sql import functions as F

events_df = spark.read.parquet("user_events.parquet").withColumn("event_timestamp", F.to_timestamp("event_timestamp"))
result = events_df.groupBy("user_id").agg(F.min("event_timestamp").alias("first_activity"),
                                                 F.max("event_timestamp").alias("last_activity"))
result.show()

+--------------------+-------------------+-------------------+
|             user_id|     first_activity|      last_activity|
+--------------------+-------------------+-------------------+
|1c0b29f9-d2fc-454...|2025-02-03 01:46:16|2025-03-04 20:53:12|
|66a7d6e7-121e-48c...|2025-02-03 12:10:16|2025-03-03 07:34:13|
|0eddc998-ce13-473...|2025-02-03 10:22:13|2025-03-05 19:11:13|
|ca17a52f-ad52-4af...|2025-02-03 01:40:14|2025-03-05 14:17:15|
|d6a31251-5a48-4bf...|2025-02-03 18:37:17|2025-03-05 01:09:12|
|366ec8f2-5d6f-44b...|2025-02-03 18:32:16|2025-03-04 19:41:15|
|246940a1-9dbe-49c...|2025-02-04 11:16:13|2025-03-04 02:24:13|
|0795b3f7-49d7-483...|2025-02-03 07:16:17|2025-03-02 03:11:15|
|499bdfd3-827e-4df...|2025-02-04 11:07:15|2025-03-05 11:23:13|
|bb9e73db-edae-46e...|2025-02-06 09:43:12|2025-03-05 08:33:16|
|0eebfa40-fed5-46a...|2025-02-05 15:29:17|2025-03-05 01:22:14|
|94656142-1c62-447...|2025-02-03 07:28:12|2025-03-03 18:49:17|
|f50c3740-3d2c-46a...|2025-02-04 16:01:15|2025-03-04 09

                                                                                

In [26]:
events_df.show()

+--------+--------------------+------------+-------------------+
|event_id|             user_id|  event_type|    event_timestamp|
+--------+--------------------+------------+-------------------+
|  699392|3e488210-b3ff-4db...|       login|2025-03-01 18:13:15|
|  699393|89ae8a49-c018-416...|       login|2025-03-03 05:59:15|
|  699394|0ca93e6d-e4e0-4ef...|    purchase|2025-02-19 14:23:15|
|  699395|1f61028a-1efb-4f1...|       login|2025-02-28 07:33:15|
|  699396|f3692a13-0579-48c...|    purchase|2025-02-17 08:25:15|
|  699397|72862a2f-56eb-4cd...| add_to_cart|2025-02-03 19:54:15|
|  699398|7f83ed0b-86ba-4da...|       login|2025-02-21 18:24:15|
|  699399|772b363d-b24f-4e8...|    purchase|2025-03-02 14:22:15|
|  699400|60a5127b-6f44-4d2...|    purchase|2025-02-19 14:08:15|
|  699401|e3fa8593-0234-403...| add_to_cart|2025-02-05 03:46:15|
|  699402|ae54a7a0-72d3-471...| add_to_cart|2025-02-25 00:46:15|
|  699403|c298b3af-1dcd-49c...|       login|2025-02-13 07:21:15|
|  699404|ecdb66d2-d801-4

In [27]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("user_id").orderBy("event_timestamp")
login_events = events_df.filter(F.col("event_type") == "login") \
    .withColumn("login_row", F.row_number().over(window_spec)) \
    .select("user_id", "event_timestamp", "login_row") \
    .withColumnRenamed("event_timestamp", "session_start")
purchase_events = events_df.filter(F.col("event_type") == "purchase") \
    .withColumn("purchase_row", F.row_number().over(window_spec)) \
    .select("user_id", "event_timestamp", "purchase_row") \
    .withColumnRenamed("event_timestamp", "session_end")
sessions = login_events.join(purchase_events, (login_events.user_id == purchase_events.user_id) & (login_events.login_row == purchase_events.purchase_row)) \
    .select(login_events.user_id, "session_start", "session_end")
sessions = sessions.withColumn("session_duration", F.unix_timestamp("session_end") - F.unix_timestamp("session_start"))
avg_session_duration = sessions.groupBy("user_id").agg(F.avg("session_duration").alias("avg_session_duration"))
avg_session_duration.show()

+--------------------+--------------------+
|             user_id|avg_session_duration|
+--------------------+--------------------+
|0323786f-3d2c-4bc...|           229696.25|
|0795b3f7-49d7-483...|            670667.6|
|07e16e4d-0899-4ad...|           -252661.0|
|0af6011a-be4f-478...|           -722938.0|
|0c71c1d6-8c1b-4da...|            865559.0|
|0eddc998-ce13-473...|           -905188.5|
|0eebfa40-fed5-46a...|  -431930.6666666667|
|10a7996c-bb4d-415...|           1057056.4|
|13bfdf0b-c331-483...|   280499.3333333333|
|1c0b29f9-d2fc-454...|            302960.5|
|2094d18e-c043-447...|         -1741231.25|
|235e813d-e624-467...|            810394.4|
|279f2542-dc74-4ba...|           -310511.6|
|2c09135d-4194-4a2...|            592378.0|
|2c2647f5-bfa0-4d6...|          -1448489.0|
|31672510-270c-459...|            146743.0|
|32bb55f1-7840-4d7...|           -294600.2|
|366ec8f2-5d6f-44b...|  164369.33333333334|
|3936f642-4e91-4e1...|           -173460.5|
|3aed6674-a9ad-4e0...| -20443.71

                                                                                

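We numbered each user's login events and purchase events in time order and took the difference between the i-th purchase and the i-th login as the duration of the i-th session. This pairing is only a rough heuristic: the negative averages in the output above show that the i-th purchase can happen before the i-th login.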
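
The task does not define a session, so here is a plain-Python sketch under a different, commonly used assumption: a user's session ends after 30 minutes without events (the 30-minute gap is an assumption, not part of the task), and its duration is the time between its first and last event, so it can never be negative. The helper and sample are hypothetical. In PySpark the same idea is usually built from `F.lag` over a per-user window ordered by time, a running `F.sum` of "new session" flags to number the sessions, and a `groupBy` on user and session with `F.min`/`F.max`. With about 20 events per user spread at random over 30 days in this synthetic data, most sessions would likely contain a single event.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def avg_session_duration(events, gap=timedelta(minutes=30)):
    """Average session length in seconds per user.

    Assumption: a new session starts when a user's next event comes more than
    `gap` after the previous one. Duration = last event - first event of a session.
    `events` holds (user_id, event_timestamp) pairs.
    """
    by_user = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)
    result = {}
    for user_id, stamps in by_user.items():
        stamps.sort()
        durations = []
        start = prev = stamps[0]
        for ts in stamps[1:]:
            if ts - prev > gap:  # inactivity gap closes the current session
                durations.append((prev - start).total_seconds())
                start = ts
            prev = ts
        durations.append((prev - start).total_seconds())  # close the last session
        result[user_id] = sum(durations) / len(durations)
    return result


t0 = datetime(2025, 3, 1, 10, 0)
events = [
    ("u1", t0),
    ("u1", t0 + timedelta(minutes=10)),
    ("u1", t0 + timedelta(minutes=25)),  # session 1 lasts 25 min
    ("u1", t0 + timedelta(hours=3)),     # session 2 is a single event
    ("u2", t0 + timedelta(minutes=5)),
    ("u2", t0 + timedelta(minutes=45)),  # 40-min gap: two single-event sessions
]
print(avg_session_duration(events))
```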

## **Task 5: Website traffic analysis (1.5 pts)**

We have a website visit log that records:
- `session_id` – session ID.
- `user_id` – user ID.
- `ip_address` – visitor's IP address.
- `page_url` – visited page.
- `timestamp` – visit time.

### **Assignment**
1. Find the peak activity hours (group by hour) (0.5 pts).
2. Identify bots (users who made more than 100 requests in a day) (0.5 pts).
3. Determine the top 5 countries by number of visits (using IP addresses) (0.5 pts).


In [30]:
%pip install geoip2

Note: you may need to restart the kernel to use updated packages.


In [31]:
! curl -L -o "GeoLite2-City.mmdb" "https://git.io/GeoLite2-City.mmdb"



In [32]:
from faker import Faker
import random
import geoip2.database
import geoip2.errors

geo_reader = geoip2.database.Reader("GeoLite2-City.mmdb")

num_rows = 200000
sessions = [fake.uuid4() for _ in range(20000)]
users = [fake.uuid4() for _ in range(100000)]
ip_addresses = [fake.ipv4() for _ in range(5000)]
pages = ["/home", "/products", "/checkout", "/contact", "/about"]


def lookup_country(ip):
    # Country name for the IP, or "Unknown" if GeoLite2 has no record for it
    try:
        return geo_reader.city(ip).country.name
    except geoip2.errors.AddressNotFoundError:
        return "Unknown"


# Look up each of the 5000 distinct IPs once instead of once per generated row
country_by_ip = {ip: lookup_country(ip) for ip in ip_addresses}

data = []
for i in range(num_rows):
    ip = random.choice(ip_addresses)
    data.append((
        random.choice(sessions),
        random.choice(users),
        ip,
        random.choice(pages),
        (datetime.today() - timedelta(hours=random.randint(0, 23), minutes=random.randint(0, 59))).strftime("%Y-%m-%d %H:%M:%S"),
        country_by_ip[ip]
    ))

columns = ["session_id", "user_id", "ip_address", "page_url", "timestamp", "country"]
traffic_df = spark.createDataFrame(data, columns)
traffic_df.write.mode("overwrite").parquet("web_traffic.parquet")

In [33]:
from pyspark.sql import functions as F

traffic_df = spark.read.parquet("web_traffic.parquet")
traffic_df = traffic_df.withColumn("timestamp", F.to_timestamp("timestamp"))
traffic_df = traffic_df.withColumn("hour", F.hour("timestamp"))
data = traffic_df.groupBy("hour").agg(F.count("session_id").alias("visit_count"))
result = data.orderBy(F.desc("visit_count"))
result.show()

+----+-----------+
|hour|visit_count|
+----+-----------+
|  10|       8562|
|  13|       8479|
|  16|       8460|
|   6|       8456|
|   9|       8417|
|  19|       8396|
|   7|       8382|
|  20|       8375|
|  18|       8372|
|  12|       8361|
|  17|       8353|
|   0|       8342|
|  14|       8339|
|   3|       8326|
|  23|       8315|
|   2|       8290|
|   5|       8285|
|  21|       8281|
|   4|       8272|
|  11|       8254|
+----+-----------+
only showing top 20 rows
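
A plain-Python sketch of the same hourly grouping (helper and sample are hypothetical). Since the generator spreads 200,000 visits uniformly over the last 24 hours, each hour should get about 200,000 / 24 ≈ 8,333 visits, with a standard deviation of roughly 90; the spread of the counts above around that value is plausibly just chance rather than a real peak.

```python
from collections import Counter
from datetime import datetime, timedelta


def visits_per_hour(timestamps):
    """Count visits per hour of day, busiest hour first (ties broken by hour)."""
    counts = Counter(ts.hour for ts in timestamps)
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))


base = datetime(2025, 3, 5)
stamps = ([base + timedelta(hours=10, minutes=m) for m in (1, 5, 30)]
          + [base + timedelta(hours=13, minutes=2)] * 2
          + [base + timedelta(hours=6)])
print(visits_per_hour(stamps))

# Expected visits per hour if 200,000 visits are spread uniformly over 24 hours
expected_per_hour = 200_000 / 24
print(round(expected_per_hour))
```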



In [34]:
traffic_df = traffic_df.withColumn("date", F.to_date("timestamp"))
data = traffic_df.groupBy("user_id", "date").agg(F.count("session_id").alias("request_count"))
result = data.filter(F.col("request_count") > 100)
result.show()  # no bots found :(



+-------+----+-------------+
|user_id|date|request_count|
+-------+----+-------------+
+-------+----+-------------+
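
The empty result is expected with this generator: 200,000 rows spread over 100,000 randomly chosen users is about 2 requests per user in total, far below 100. A plain-Python sketch of the same per-(user, day) count, with an artificial bot injected to show that the filter works; the `"bot"` and `"human"` ids are hypothetical. Note that the generated timestamps cover the last 24 hours, so one user's requests can fall on two calendar dates.

```python
from collections import Counter
from datetime import datetime


def find_bots(requests, threshold=100):
    """Return {(user_id, date): count} for users with more than `threshold` requests on a day.

    `requests` holds (user_id, timestamp) pairs.
    """
    per_day = Counter((user_id, ts.date()) for user_id, ts in requests)
    return {key: n for key, n in per_day.items() if n > threshold}


base = datetime(2025, 3, 5, 12, 0)
requests = [("bot", base)] * 150 + [("human", base)] * 3
print(find_bots(requests))
```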



                                                                                

In [40]:
data = traffic_df.groupBy("country").agg(F.count("session_id").alias("visit_count"))
result = data.orderBy(F.desc("visit_count")).limit(5)
result.show()

+-------------+-----------+
|      country|visit_count|
+-------------+-----------+
|United States|      83161|
|        China|      18017|
|        Japan|       9516|
|  South Korea|       7368|
|      Germany|       7305|
+-------------+-----------+

