<a href="https://colab.research.google.com/github/bryancev/Data_Engineer_traineeship/blob/main/dataframe_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Задание №1: Анализ активности онлайн-пользователей

## 1.1 Создание и проверка структуры DataFrame

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import date

# Инициализация SparkSession
spark = SparkSession.builder \
        .appName("user_activity_project") \
        .getOrCreate()

schema = StructType((
    StructField("user_id", IntegerType(), True),
    StructField("activity_date", DateType(), True),
    StructField("sessions_by_device", MapType(StringType(), IntegerType()), True),
    StructField("visited_pages", ArrayType(StringType()), True),
    StructField("usability_rating", DoubleType(), True)
))

data = [
    (101, date(2025, 1, 1), {"mobile": 3, "desktop": 1}, ["/home", "/products", "/cart"], 4.5),
    (102, date(2025, 1, 1), {"desktop": 2}, ["/home", "/about"], 3.0),
    (101, date(2025, 1, 2), {"mobile": 2}, ["/products", "/checkout"], None),
    (103, date(2025, 1, 2), {"tablet": 1, "mobile": 1}, ["/blog", "/contact"], 5.0),
    (104, date(2025, 1, 3), {"desktop": 4}, ["/dashboard"], 3.5),
    (101, date(2025, 1, 3), {"mobile": 1, "desktop": 1}, ["/home", "/products"], 4.0),
    (105, date(2025, 1, 4), {"mobile": 5}, ["/faq"], None),
    (102, date(2025, 1, 4), {"desktop": 1, "mobile": 1}, ["/settings"], 3.8),
    (103, date(2025, 1, 5), {"tablet": 2}, ["/products"], 4.2),
    (106, date(2025, 1, 5), {"desktop": 3, "mobile": 2}, ["/login", "/profile", "/home"], 4.7),
    (101, date(2025, 1, 6), {"mobile": 1}, ["/cart", "/checkout"], 4.0),
    (104, date(2025, 1, 6), {"desktop": 2, "tablet": 1}, ["/contact"], None),
    (105, date(2025, 1, 7), {"mobile": 3, "desktop": 1}, ["/pricing"], 4.1),
    (106, date(2025, 1, 7), {"desktop": 1}, ["/home", "/about"], 3.9),
    (107, date(2025, 1, 8), {"mobile": 4, "tablet": 2}, ["/products", "/blog"], 4.9)
]

df_user_activity = spark.createDataFrame(data, schema=schema)

# Схема DataFrame
df_user_activity.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- activity_date: date (nullable = true)
 |-- sessions_by_device: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- visited_pages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- usability_rating: double (nullable = true)



## 1.2 Расчет построчных показателей

In [None]:
# Сумма количества сессий по всем устройствам (mobile, desktop, tablet)

df_user_activity = df_user_activity.withColumn(
                                "total_sessions_count",
                                F.coalesce(F.col("sessions_by_device")["mobile"], F.lit(0)) +
                                F.coalesce(F.col("sessions_by_device")["desktop"], F.lit(0)) +
                                F.coalesce(F.col("sessions_by_device")["tablet"], F.lit(0))
                            )
df_user_activity.show(5, truncate=False)

+-------+-------------+---------------------------+-------------------------+----------------+--------------------+
|user_id|activity_date|sessions_by_device         |visited_pages            |usability_rating|total_sessions_count|
+-------+-------------+---------------------------+-------------------------+----------------+--------------------+
|101    |2025-01-01   |{mobile -> 3, desktop -> 1}|[/home, /products, /cart]|4.5             |4                   |
|102    |2025-01-01   |{desktop -> 2}             |[/home, /about]          |3.0             |2                   |
|101    |2025-01-02   |{mobile -> 2}              |[/products, /checkout]   |NULL            |2                   |
|103    |2025-01-02   |{mobile -> 1, tablet -> 1} |[/blog, /contact]        |5.0             |2                   |
|104    |2025-01-03   |{desktop -> 4}             |[/dashboard]             |3.5             |4                   |
+-------+-------------+---------------------------+---------------------

In [None]:
# Количество мобильных сессий

df_user_activity = df_user_activity.withColumn(
                                "mobile_sessions",
                                F.when(F.col("sessions_by_device")["mobile"].isNull(), 0)
                                .otherwise(F.col("sessions_by_device")["mobile"])
                            )
df_user_activity.show(5, truncate=False)

+-------+-------------+---------------------------+-------------------------+----------------+--------------------+---------------+
|user_id|activity_date|sessions_by_device         |visited_pages            |usability_rating|total_sessions_count|mobile_sessions|
+-------+-------------+---------------------------+-------------------------+----------------+--------------------+---------------+
|101    |2025-01-01   |{mobile -> 3, desktop -> 1}|[/home, /products, /cart]|4.5             |4                   |3              |
|102    |2025-01-01   |{desktop -> 2}             |[/home, /about]          |3.0             |2                   |0              |
|101    |2025-01-02   |{mobile -> 2}              |[/products, /checkout]   |NULL            |2                   |2              |
|103    |2025-01-02   |{mobile -> 1, tablet -> 1} |[/blog, /contact]        |5.0             |2                   |1              |
|104    |2025-01-03   |{desktop -> 4}             |[/dashboard]             

## 1.3 Агрегация данных

In [None]:
# Количество сессий по пользователям

user_total_sessions = (df_user_activity
                        .select("user_id", "total_sessions_count")
                        .groupBy("user_id")
                        .agg(F.sum("total_sessions_count").alias("total_sessions_all")) \
                        .orderBy(F.col("total_sessions_all").desc())
)

# Вывод результата
user_total_sessions.show(5, truncate=False)

+-------+------------------+
|user_id|total_sessions_all|
+-------+------------------+
|101    |9                 |
|105    |9                 |
|104    |7                 |
|107    |6                 |
|106    |6                 |
+-------+------------------+
only showing top 5 rows



In [None]:
# Количество уникальных посещенных страниц по пользователям

user_unique_pages = (df_user_activity
          .select("user_id", F.explode_outer("visited_pages").alias("page")) # explode_outer - развертывание массива в отдельные строки
          .groupBy("user_id")
          .agg(F.countDistinct("page").alias("unique_pages_count"))
          .orderBy("user_id")
)

# Вывод результата
user_unique_pages.show(5, truncate=False)

+-------+------------------+
|user_id|unique_pages_count|
+-------+------------------+
|101    |4                 |
|102    |3                 |
|103    |3                 |
|104    |2                 |
|105    |2                 |
+-------+------------------+
only showing top 5 rows



## 1.4 Фильтрация данных

In [None]:
# Фильтрация где usability_rating выше 3.5.

filtered_rating = df_user_activity.filter(
        (F.col("usability_rating") > 3.5) &
        (F.col("usability_rating").isNotNull())
    ).orderBy(
        F.col("usability_rating").desc(),
        F.col("user_id")
    )

# Вывод результата
filtered_rating.select(
        "user_id",
        "activity_date",
        "usability_rating",
        "visited_pages"
).show(5, truncate=False)

+-------+-------------+----------------+-------------------------+
|user_id|activity_date|usability_rating|visited_pages            |
+-------+-------------+----------------+-------------------------+
|103    |2025-01-02   |5.0             |[/blog, /contact]        |
|107    |2025-01-08   |4.9             |[/products, /blog]       |
|106    |2025-01-05   |4.7             |[/login, /profile, /home]|
|101    |2025-01-01   |4.5             |[/home, /products, /cart]|
|103    |2025-01-05   |4.2             |[/products]              |
+-------+-------------+----------------+-------------------------+
only showing top 5 rows



In [None]:
# Остановка SparkSession
spark.stop()

# Задание №2: Анализ финансовых данных

In [None]:
import requests
import re

# Получение "прямой" ссылки
public_url = "https://disk.yandex.ru/d/hLm5fkIc2yeXYw"
api_response = requests.get(f"https://cloud-api.yandex.net/v1/disk/public/resources/download?public_key={public_url}")
api_response.raise_for_status()

file_response = requests.get(api_response.json()['href'])
file_response.raise_for_status()

# Сохранение в файл
with open("autovaz.csv", "wb") as f:
    f.write(file_response.content)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
         .appName("project_avtovaz") \
         .getOrCreate()


df = spark.read.option("header", True).option("sep", ",").csv("autovaz.csv")
df.show(5, truncate=False)
df.printSchema()

+----+----------------------------------+-------------------------------+-----------------------------------+-----------------------------+---------------------------------+---------------------------------------+--------------------+------------------------------------------------------+-------------------------------------------+------------------------------------------------------+-------------------+----+
|Год |Реализация (внутренний рынок: тыс)|Реализация (внешний рынок: тыс)|Себестоимость реализации (млн. руб)|Валовая прибыль от реализации|Общая сумма дивидендов (млн. руб)|Чистая прибыль отчётного года(млн. руб)|Всего текущие активы|Потоки денежных средств от инвестиционной деятельности|Потоки денежных средств от фин деятельности|Чистые денежные средства  от операционной деятельности| Прибыль до налогов| Тип|
+----+----------------------------------+-------------------------------+-----------------------------------+-----------------------------+---------------------------------

## 2.1 Предварительная подготовка данных

In [None]:
# стандартизация названий столбцов

def clean_column_name(name):
    # удаление спецсимволов
    name = re.sub(r'[^\w]', '_', name.strip().lower())
    # замена множественных '_' одним
    name = re.sub(r'_+', '_', name)
    # удаление '_' в конце и начале
    return name.rstrip('_')

new_columns = [clean_column_name(col) for col in df.columns]
df = df.toDF(*new_columns)
df.show(5, truncate=False)

+----+-------------------------------+----------------------------+--------------------------------+-----------------------------+------------------------------+-------------------------------------+--------------------+------------------------------------------------------+-------------------------------------------+-----------------------------------------------------+------------------+----+
|год |реализация_внутренний_рынок_тыс|реализация_внешний_рынок_тыс|себестоимость_реализации_млн_руб|валовая_прибыль_от_реализации|общая_сумма_дивидендов_млн_руб|чистая_прибыль_отчётного_года_млн_руб|всего_текущие_активы|потоки_денежных_средств_от_инвестиционной_деятельности|потоки_денежных_средств_от_фин_деятельности|чистые_денежные_средства_от_операционной_деятельности|прибыль_до_налогов|тип |
+----+-------------------------------+----------------------------+--------------------------------+-----------------------------+------------------------------+-------------------------------------+-----

In [None]:
# Приведение числовых колонок к соответствующим типам

columns_to_cast_types = {
    "год": IntegerType(),
    "реализация_внутренний_рынок_тыс": DoubleType(),
    "реализация_внешний_рынок_тыс": DoubleType(),
    "себестоимость_реализации_млн_руб": DoubleType(),
    "валовая_прибыль_от_реализации": DoubleType(),
    "общая_сумма_дивидендов_млн_руб": DoubleType(),
    "чистая_прибыль_отчётного_года_млн_руб": DoubleType(),
    "всего_текущие_активы": DoubleType(),
    "потоки_денежных_средств_от_инвестиционной_деятельности": DoubleType(),
    "потоки_денежных_средств_от_фин_деятельности": DoubleType(),
    "чистые_денежные_средства_от_операционной_деятельности": DoubleType(),
    "прибыль_до_налогов": DoubleType(),
    "тип": StringType()
}

for col_name, col_type in columns_to_cast_types.items():
    df = df.withColumn(
        col_name,
        F.col(col_name).cast(col_type)
)

df.show(5, truncate=False)

+----+-------------------------------+----------------------------+--------------------------------+-----------------------------+------------------------------+-------------------------------------+--------------------+------------------------------------------------------+-------------------------------------------+-----------------------------------------------------+------------------+----+
|год |реализация_внутренний_рынок_тыс|реализация_внешний_рынок_тыс|себестоимость_реализации_млн_руб|валовая_прибыль_от_реализации|общая_сумма_дивидендов_млн_руб|чистая_прибыль_отчётного_года_млн_руб|всего_текущие_активы|потоки_денежных_средств_от_инвестиционной_деятельности|потоки_денежных_средств_от_фин_деятельности|чистые_денежные_средства_от_операционной_деятельности|прибыль_до_налогов|тип |
+----+-------------------------------+----------------------------+--------------------------------+-----------------------------+------------------------------+-------------------------------------+-----

## 2.2 Анализ и ранжирование прибыли


*   Чистая прибыль за предыдущий год для каждой записи
*   Абсолютное изменение чистой прибыли по сравнению с предыдущим годом
*   Ранжирование годов по величине абсолютного изменения чистой прибыли


In [None]:
# Чистая прибыли за предыдущий год

df = df.withColumn(
    "чистая_прибыль_предыдущего_года",
    F.lag(F.col("чистая_прибыль_отчётного_года_млн_руб"), 1).over(Window.orderBy("год"))
)

In [None]:
# Изменения чистой прибыли

df = df.withColumn(
    "изменение_прибыли",
    F.col("чистая_прибыль_отчётного_года_млн_руб") - F.col("чистая_прибыль_предыдущего_года")
)

In [None]:
# Ранжирование годов по абсолютному росту чистой прибыли

window_rank = Window.orderBy(F.desc("изменение_прибыли"))

df = df.withColumn(
    "ранг_роста_прибыли",
    F.rank().over(window_rank)
)

In [None]:
# просмотр данных

df.select("год",
          "чистая_прибыль_отчётного_года_млн_руб",
          "чистая_прибыль_предыдущего_года",
          "изменение_прибыли",
          "ранг_роста_прибыли") \
    .orderBy("ранг_роста_прибыли") \
    .show(25, truncate=False)

+----+-------------------------------------+-------------------------------+-----------------+------------------+
|год |чистая_прибыль_отчётного_года_млн_руб|чистая_прибыль_предыдущего_года|изменение_прибыли|ранг_роста_прибыли|
+----+-------------------------------------+-------------------------------+-----------------+------------------+
|2010|1500.0                               |-38500.0                       |40000.0          |1                 |
|2017|-12384.0                             |-35467.0                       |23083.0          |2                 |
|2018|5860.0                               |-12384.0                       |18244.0          |3                 |
|2001|19056.0                              |4554.0                         |14502.0          |4                 |
|2016|-35467.0                             |-43233.0                       |7766.0           |5                 |
|2000|4554.0                               |1021.0                         |3533.0      

## 2.3 Категоризация и агрегация


*   "Высокоприбыльный год": если чистая прибыль > 5000 млн руб.
*   "Среднеприбыльный год": если чистая прибыль от 1000 до 5000 млн руб.
*   "Низкоприбыльный год": если чистая прибыль от 0 до 1000 млн руб.
*   "Убыточный год": если чистая прибыль < 0
*   "Нулевая прибыль": если чистая прибыль = 0


In [None]:
# Создание категории

df = df.withColumn(
    "категория_прибыли",
    F.when(F.col("чистая_прибыль_отчётного_года_млн_руб") > 5000, "Высокоприбыльный")
     .when((F.col("чистая_прибыль_отчётного_года_млн_руб") > 1000) &
           (F.col("чистая_прибыль_отчётного_года_млн_руб") <= 5000), "Среднеприбыльный")
     .when((F.col("чистая_прибыль_отчётного_года_млн_руб") > 0) &
           (F.col("чистая_прибыль_отчётного_года_млн_руб") <= 1000), "Низкоприбыльный")
     .when(F.col("чистая_прибыль_отчётного_года_млн_руб") < 0, "Убыточный")
     .otherwise("Нулевая прибыль")
)

df.show(5, truncate=False)

+----+-------------------------------+----------------------------+--------------------------------+-----------------------------+------------------------------+-------------------------------------+--------------------+------------------------------------------------------+-------------------------------------------+-----------------------------------------------------+------------------+----+-------------------------------+-----------------+------------------+-----------------+
|год |реализация_внутренний_рынок_тыс|реализация_внешний_рынок_тыс|себестоимость_реализации_млн_руб|валовая_прибыль_от_реализации|общая_сумма_дивидендов_млн_руб|чистая_прибыль_отчётного_года_млн_руб|всего_текущие_активы|потоки_денежных_средств_от_инвестиционной_деятельности|потоки_денежных_средств_от_фин_деятельности|чистые_денежные_средства_от_операционной_деятельности|прибыль_до_налогов|тип |чистая_прибыль_предыдущего_года|изменение_прибыли|ранг_роста_прибыли|категория_прибыли|
+----+------------------------

In [None]:
# Группировка

profit_category = df.groupBy(F.col("категория_прибыли")).agg(
                  F.count("год").alias("количество_лет"),
                  F.round(F.avg(
                      F.col("реализация_внутренний_рынок_тыс") + F.col("реализация_внешний_рынок_тыс")
                      ),
                      2
                  ).alias("средняя_общая_реализация_тыс_шт"),
                  F.round(F.avg("чистая_прибыль_отчётного_года_млн_руб"), 2).alias("средняя_прибыль_млн_руб"),
                  F.min("год").alias("первый_год"),
                  F.max("год").alias("последний_год")
              ).orderBy(F.col("количество_лет").desc())

profit_category.show(5, truncate=False)

+-----------------+--------------+-------------------------------+-----------------------+----------+-------------+
|категория_прибыли|количество_лет|средняя_общая_реализация_тыс_шт|средняя_прибыль_млн_руб|первый_год|последний_год|
+-----------------+--------------+-------------------------------+-----------------------+----------+-------------+
|Среднеприбыльный |11            |662.48                         |2563.18                |1999      |2021         |
|Убыточный        |8             |412.63                         |-21101.5               |2008      |2019         |
|Низкоприбыльный  |3             |394.17                         |386.33                 |2012      |2022         |
|Высокоприбыльный |2             |582.0                          |12458.0                |2001      |2018         |
+-----------------+--------------+-------------------------------+-----------------------+----------+-------------+



In [None]:
# Остановка SparkSession
spark.stop()

# Задание №3: Анализ данных "Звездные Войны"

In [None]:
import requests, zipfile

# Direct ZIP download and extract
public_yandex_disk_link = "https://disk.yandex.ru/d/F0a1ojppfdskQg"
api_url = f"https://cloud-api.yandex.net/v1/disk/public/resources/download?public_key={public_yandex_disk_link}"
download_url = requests.get(api_url).json()["href"]
content = requests.get(download_url).content

with open("temp.zip", "wb") as f:
    f.write(content)
with zipfile.ZipFile("temp.zip", "r") as z:
    z.extractall(".")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
spark = SparkSession.builder \
        .appName("star_wars_project") \
        .getOrCreate()

characters = spark.read.parquet('parquet_files/characters.parquet')
species = spark.read.parquet('parquet_files/species.parquet')
organizations = spark.read.parquet('parquet_files/organizations.parquet')

species.show(5, truncate=False)
characters.show(5, truncate=False)
organizations.show(5, truncate=False)

+---+--------------+--------------+-----------+--------------+------------------------+-----------+----------+----------------+--------------+---------+
|id |name          |classification|designation|average_height|skin_colors             |hair_colors|eye_colors|average_lifespan|language      |homeworld|
+---+--------------+--------------+-----------+--------------+------------------------+-----------+----------+----------------+--------------+---------+
|1  |Human         |Mammal        |Sentient   |1.8           |Light, Dark             |Various    |Various   |79.0            |Galactic Basic|Various  |
|2  |Yoda's species|Unknown       |Sentient   |0.66          |Green                   |White      |Brown     |900.0           |Galactic Basic|Unknown  |
|3  |Wookiee       |Mammal        |Sentient   |2.28          |Brown                   |Brown      |Blue      |400.0           |Shyriiwook    |Kashyyyk |
|4  |Gungan        |Amphibian     |Sentient   |1.96          |Orange              

## 3.1 Предварительная подготовка данных

In [None]:
# Удаляем дубликаты

characters = characters.dropDuplicates(subset=["name", "species", "homeworld", "year_born"])
species = species.dropDuplicates(subset=["name","classification", "designation", "average_height"])

## 3.2 Распространенность видов и их классификаций

In [None]:
# количество персонажей для каждого вида (characters.species)

(characters.groupBy(F.col("species"))
    .agg(F.count("*").alias("character_count"))
    .orderBy(F.col("character_count").desc())
    .limit(10).show()
)

+------------+---------------+
|     species|character_count|
+------------+---------------+
|       Human|             47|
|       Droid|              4|
|     Twi'lek|              3|
| Dathomirian|              3|
|     Togruta|              2|
|     Unknown|              2|
|Mon Calamari|              2|
|    Clawdite|              1|
|       Chiss|              1|
|    Besalisk|              1|
+------------+---------------+



In [None]:
# Распространенность классификаций

(species.join(characters, species.name == characters.species, "left")
    .groupBy(species.classification)
    .agg(F.count("*").alias("total_character_count"))
    .orderBy(F.col("total_character_count").desc())
    .show()
)

+--------------+---------------------+
|classification|total_character_count|
+--------------+---------------------+
|        Mammal|                   73|
|     Amphibian|                    6|
|     Reptilian|                    5|
|    Artificial|                    4|
|        Hybrid|                    3|
|       Unknown|                    1|
|     Insectoid|                    1|
|     Gastropod|                    1|
+--------------+---------------------+



In [None]:
# Средний рост по классификации видов

(species.groupBy("classification")
    .agg(F.round(F.avg("average_height"), 1).alias("average_height_class"))
    .orderBy(F.col("average_height_class").desc())
    .show()
)

+--------------+--------------------+
|classification|average_height_class|
+--------------+--------------------+
|     Gastropod|                 3.9|
|     Amphibian|                 1.9|
|     Insectoid|                 1.8|
|     Reptilian|                 1.8|
|        Mammal|                 1.7|
|        Hybrid|                 1.7|
|       Unknown|                 0.7|
|    Artificial|                NULL|
+--------------+--------------------+



## 3.3 Члены Ордена Джедаев и Ситхов

In [None]:
# Члены Ордена Джедаев и Ситхов
# Преобразуем тип колонки
organizations = organizations.withColumn(
    "leader_array",
    F.split(F.col("leader"), ", ").cast(ArrayType(StringType()))
)
organizations = organizations.withColumn(
    "members_array",
    F.split(F.col("members"), ", ").cast(ArrayType(StringType()))
)

members = (organizations
           .select("name", F.explode_outer("leader_array").alias("name_member"))
           .filter(F.col("name").isin("Jedi Order", "Sith Order"))
    .union
           (organizations
            .select("name", F.explode_outer("members_array").alias("name_member"))
            .filter(F.col("name").isin("Jedi Order", "Sith Order")))
            )

members.orderBy("name").show()

+----------+----------------+
|      name|     name_member|
+----------+----------------+
|Jedi Order|            Yoda|
|Jedi Order|  Obi-Wan Kenobi|
|Jedi Order|      Mace Windu|
|Jedi Order|Anakin Skywalker|
|Jedi Order|  Luke Skywalker|
|Sith Order|   Darth Sidious|
|Sith Order|     Darth Vader|
|Sith Order|      Darth Maul|
|Sith Order|   Darth Tyranus|
+----------+----------------+



## 3.4 Топ-5

In [None]:
# Топ-5 старейших персонажей

(characters.select("name", "year_born")
    .dropna(subset=["year_born"])
    .orderBy(F.col("year_born"))
    .show(5, truncate=False)
)

+-----------+---------+
|name       |year_born|
+-----------+---------+
|General Hux|0.0      |
|Poe Dameron|2.0      |
|Kylo Ren   |5.0      |
|Rose Tico  |11.0     |
|Finn       |11.0     |
+-----------+---------+
only showing top 5 rows



In [None]:
# Топ-5 самых юных персонажей
(characters.select("name", "year_born")
    .dropna(subset=["year_born"])
    .orderBy(F.col("year_born").desc())
    .show(5, truncate=False)
)

+--------------+---------+
|name          |year_born|
+--------------+---------+
|Maz Kanata    |973.0    |
|Yoda          |896.0    |
|Jabba the Hutt|600.0    |
|Chewbacca     |200.0    |
|C-3PO         |112.0    |
+--------------+---------+
only showing top 5 rows



## 3.5 Персонажи с экстремальным индексом массы тела

In [None]:
#  Раcсчитайте ИМТ: weight / height**2. Исключите NULL из вывода.

bmi_characters = characters.withColumn("bmi", F.round(F.col("weight") / (F.col("height"))**2, 2))

(bmi_characters
    .dropna(subset=["height","weight"])
    .orderBy(F.col("bmi").desc())
    .select("name", "height", "weight", "bmi")
    .show(5, truncate=False)
)

+--------------+------+------+-----+
|name          |height|weight|bmi  |
+--------------+------+------+-----+
|Jabba the Hutt|3.9   |1358.0|89.28|
|Pong Krell    |2.5   |300.0 |48.0 |
|Yoda          |0.66  |17.0  |39.03|
|Darth Vader   |2.02  |136.0 |33.33|
|Sebulba       |1.12  |40.0  |31.89|
+--------------+------+------+-----+
only showing top 5 rows



In [None]:
# Остановка SparkSession
spark.stop()

# Задание №4: Анализ пользовательских сессий

In [None]:
import requests

# Получение "прямой" ссылки
public_url = "https://disk.yandex.ru/d/ICkCXc8Jy0-wag"
api_response = requests.get(f"https://cloud-api.yandex.net/v1/disk/public/resources/download?public_key={public_url}")
api_response.raise_for_status()

file_response = requests.get(api_response.json()['href'])
file_response.raise_for_status()

# Сохранение в файл
with open("activity_log.avro", "wb") as f:
    f.write(file_response.content)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1. Инициализируем SparkSession
# Убедитесь, что у вас установлен пакет spark-avro

spark = SparkSession.builder \
    .appName("netflix_project") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()

In [None]:
# Загрузка данных из файла Avro
df = spark.read.format("avro").load("activity_log.avro")

# Выведите схему и первые 10 строк
df.printSchema()
df.show(10, truncate=False)

root
 |-- event_id: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- page_url: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- amount: double (nullable = true)

+--------+-------+----------+------------+-----------------+--------------------+----------+------+
|event_id|user_id|session_id|event_type  |timestamp        |page_url            |product_id|amount|
+--------+-------+----------+------------+-----------------+--------------------+----------+------+
|E001    |101    |S101-A    |login       |01-Jan-2025 10:00|/login              |NULL      |NULL  |
|E002    |101    |S101-A    |view_page   |01-Jan-2025 10:05|/products/category_A|NULL      |NULL  |
|E003    |101    |S101-A    |view_product|01-Jan-2025 10:08|/product/P005       |P005      |NULL  |
|E004    |101    |S101-A    |add_to_cart |01-Jan-2025 10:10|/cart/add

## 4.1 Предварительная подготовка данных

In [None]:
# Преобразование колонки timestamp в TimestampType

df = df.withColumn("event_time", F.to_timestamp("timestamp", "dd-MMM-yyyy HH:mm"))
df.show(5, truncate=False)

+--------+-------+----------+------------+-----------------+--------------------+----------+------+-------------------+
|event_id|user_id|session_id|event_type  |timestamp        |page_url            |product_id|amount|event_time         |
+--------+-------+----------+------------+-----------------+--------------------+----------+------+-------------------+
|E001    |101    |S101-A    |login       |01-Jan-2025 10:00|/login              |NULL      |NULL  |2025-01-01 10:00:00|
|E002    |101    |S101-A    |view_page   |01-Jan-2025 10:05|/products/category_A|NULL      |NULL  |2025-01-01 10:05:00|
|E003    |101    |S101-A    |view_product|01-Jan-2025 10:08|/product/P005       |P005      |NULL  |2025-01-01 10:08:00|
|E004    |101    |S101-A    |add_to_cart |01-Jan-2025 10:10|/cart/add           |P005      |NULL  |2025-01-01 10:10:00|
|E005    |101    |S101-A    |purchase    |01-Jan-2025 10:15|/checkout           |P005      |120.5 |2025-01-01 10:15:00|
+--------+-------+----------+-----------

## 4.2 Группировка

In [None]:
# Количество событий для каждого дня

(df.groupBy(F.col("event_time").cast("date").alias("date"))
        .agg(F.count("event_id").alias("total_events"))
        .orderBy("date").show()
)

+----------+------------+
|      date|total_events|
+----------+------------+
|2025-01-01|           6|
|2025-01-02|           3|
|2025-01-03|           3|
|2025-01-04|           2|
|2025-01-05|           6|
|2025-01-06|           5|
|2025-01-07|           5|
|2025-01-08|           4|
|2025-01-09|           6|
+----------+------------+



In [None]:
# Уникальные пользователи

df.select(F.col("user_id")).distinct().show()

+-------+
|user_id|
+-------+
|    101|
|    103|
|    107|
|    102|
|    105|
|    106|
|    104|
+-------+



In [None]:
# Количество уникальных сессий

# df.select(F.col("session_id")).distinct().count()

df.select(F.col("session_id")).agg(F.countDistinct("session_id").alias("unique_sessions_count")).show()

+---------------------+
|unique_sessions_count|
+---------------------+
|                    9|
+---------------------+



In [None]:
# Количество уникальных сессий на пользователя

(df.groupBy("user_id").agg(
    F.countDistinct("session_id").alias("unique_sessions_count")
    ).orderBy("user_id").show())

+-------+---------------------+
|user_id|unique_sessions_count|
+-------+---------------------+
|    101|                    2|
|    102|                    2|
|    103|                    1|
|    104|                    1|
|    105|                    1|
|    106|                    1|
|    107|                    1|
+-------+---------------------+



In [None]:
# Количество событий в каждой сессии

(df.groupBy("user_id", "session_id")
    .agg(F.count("*").alias("count_sessions"))
    .orderBy("count_sessions")
    .show()
)

+-------+----------+--------------+
|user_id|session_id|count_sessions|
+-------+----------+--------------+
|    104|    S104-D|             2|
|    102|    S102-B|             3|
|    103|    S103-C|             3|
|    106|    S106-F|             4|
|    102|    S102-C|             5|
|    101|    S101-B|             5|
|    101|    S101-A|             6|
|    105|    S105-E|             6|
|    107|    S107-G|             6|
+-------+----------+--------------+



In [None]:
# Количество покупок

(df.filter(F.col("event_type") == "purchase")
    .agg(F.count(F.col("amount")).alias("count_purchase_amount"))
    .show()
)

+---------------------+
|count_purchase_amount|
+---------------------+
|                    4|
+---------------------+



In [None]:
# Общая сумма всех покупок

(df.filter(F.col("event_type") == "purchase")
    .agg(F.sum(F.col("amount")).alias("sum_purchase_amount"))
    .show()
)

+-------------------+
|sum_purchase_amount|
+-------------------+
|              905.5|
+-------------------+



In [None]:
# Средняя сумма всех покупок

(df.filter(F.col("event_type") == "purchase")
    .agg(F.avg(F.col("amount")).alias("avg_purchase_amount"))
    .show()
)

+-------------------+
|avg_purchase_amount|
+-------------------+
|            226.375|
+-------------------+



In [None]:
# Продолжительность каждой сессии

df_session_durations = df.groupBy(F.col("user_id"), F.col("session_id")).agg(
    F.min("event_time").alias("min_time"),
    F.max("event_time").alias("max_time")
).withColumn(
    "session_duration_seconds",
    (F.unix_timestamp(F.col("max_time")) - F.unix_timestamp(F.col("min_time")))
)

df_session_durations.orderBy("user_id", "session_id").show(truncate=False)

+-------+----------+-------------------+-------------------+------------------------+
|user_id|session_id|min_time           |max_time           |session_duration_seconds|
+-------+----------+-------------------+-------------------+------------------------+
|101    |S101-A    |2025-01-01 10:00:00|2025-01-01 10:20:00|1200                    |
|101    |S101-B    |2025-01-06 09:30:00|2025-01-06 09:40:00|600                     |
|102    |S102-B    |2025-01-02 11:30:00|2025-01-02 11:38:00|480                     |
|102    |S102-C    |2025-01-07 15:00:00|2025-01-07 15:18:00|1080                    |
|103    |S103-C    |2025-01-03 09:00:00|2025-01-03 09:10:00|600                     |
|104    |S104-D    |2025-01-04 14:00:00|2025-01-04 14:03:00|180                     |
|105    |S105-E    |2025-01-05 16:45:00|2025-01-05 17:02:00|1020                    |
|106    |S106-F    |2025-01-08 10:00:00|2025-01-08 10:10:00|600                     |
|107    |S107-G    |2025-01-09 11:00:00|2025-01-09 11:

In [None]:
# Средняя продолжительность сессии

(df_session_durations
    .agg(F.round(F.avg(F.col("session_duration_seconds")), 2).alias("average_session_duration_seconds"))
    .show(truncate=False)
)

+--------------------------------+
|average_session_duration_seconds|
+--------------------------------+
|740.0                           |
+--------------------------------+



In [None]:
# Остановка SparkSession
spark.stop()

# Задание №5: Анализ пользовательских сессий

In [None]:
import requests

# Получение "прямой" ссылки
public_url = "https://disk.yandex.ru/d/w9AhJSpY7xqSUg"
api_response = requests.get(f"https://cloud-api.yandex.net/v1/disk/public/resources/download?public_key={public_url}")
api_response.raise_for_status()

file_response = requests.get(api_response.json()['href'])
file_response.raise_for_status()

# Сохранение в файл
with open("netflix_data.csv", "wb") as f:
    f.write(file_response.content)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Window
spark = SparkSession.builder \
        .appName("netflix_project") \
        .getOrCreate()

schema = StructType((
            StructField("title", StringType(), True),
            StructField("type", StringType(), True),
            StructField("genres", StringType(), True),
            StructField("releaseYear", IntegerType(), True),
            StructField("imdbId", StringType(), True),
            StructField("imdbAverageRating", DoubleType(), True),
            StructField("imdbNumVotes", IntegerType(), True),
            StructField("availableCountries", StringType(), True)
))

# Считываем со схемой
df = spark.read.option("header", True).csv("netflix_data.csv", schema = schema)

sc.

## 5.1 Предварительная подготовка данных

In [None]:
# Преобразуем типы со структурой array
df = df.withColumn("genres", F.split(F.col("genres"), ", "))
df = df.withColumn("availableCountries", F.split(F.col("availableCountries"), ", "))

df.show(5, truncate=True)
df.printSchema()

+-------------------+-----+--------------------+-----------+---------+-----------------+------------+------------------+
|              title| type|              genres|releaseYear|   imdbId|imdbAverageRating|imdbNumVotes|availableCountries|
+-------------------+-----+--------------------+-----------+---------+-----------------+------------+------------------+
|              Ariel|movie|[Comedy, Crime, D...|       1988|tt0094675|              7.4|        9240|              NULL|
|Shadows in Paradise|movie|[Comedy, Drama, M...|       1986|tt0092149|              7.4|        8074|              NULL|
|       Forrest Gump|movie|    [Drama, Romance]|       1994|tt0109830|              8.8|     2392180|              NULL|
|    American Beauty|movie|             [Drama]|       1999|tt0169547|              8.3|     1252895|              NULL|
|  The Fifth Element|movie|[Action, Adventur...|       1997|tt0119116|              7.6|      529085|              NULL|
+-------------------+-----+-----

In [None]:
# Удаляем дубликаты

print(f"Количество строк до удаления дубликатов: {df.count()}")
df = df.dropDuplicates(["title", "releaseYear", "imdbId"])

Количество строк до удаления дубликатов: 21910


In [None]:
# Удаляем пропуски

df = df.dropna(subset=["imdbAverageRating","genres"])
print(f"Количество строк после удаления дубликатов и пропусков: {df.count()}")
df.show(5, truncate=True)

Количество строк после удаления дубликатов и пропусков: 20090
+--------------------+-----+--------------------+-----------+----------+-----------------+------------+------------------+
|               title| type|              genres|releaseYear|    imdbId|imdbAverageRating|imdbNumVotes|availableCountries|
+--------------------+-----+--------------------+-----------+----------+-----------------+------------+------------------+
|"Gabriel ""Fluffy...|movie|            [Comedy]|       2019| tt9426212|              7.3|        2285|              NULL|
|"The Most Notorio...|   tv|[Action, Adventur...|       2024|tt32501772|              7.3|         897|              NULL|
|                #AAY|movie|            [Comedy]|       2024|tt28359277|              6.4|         817|              NULL|
|              #Alive|movie|[Action, Drama, H...|       2020|tt10620868|              6.3|       52102|              NULL|
|#Anne Frank Paral...|movie|[Documentary, Dra...|       2019| tt9850370|     

## Анализ

In [None]:
# Cоотношение фильмов и сериалов по годам

(df.groupBy(F.col("releaseYear"))
    .pivot("type")
    .agg(F.count("*").alias("total_titles"))
    .orderBy(F.col("releaseYear").desc())
    .limit(10)
    .show()
)

+-----------+-----+---+
|releaseYear|movie| tv|
+-----------+-----+---+
|       2025|  147|184|
|       2024|  844|494|
|       2023| 1176|511|
|       2022| 1271|574|
|       2021|  954|465|
|       2020|  874|409|
|       2019| 1010|389|
|       2018| 1022|348|
|       2017|  833|239|
|       2016|  680|178|
+-----------+-----+---+



In [None]:
# Cоотношение фильмов и сериалов по жанрам

(df.select(F.explode_outer(F.col("genres")).alias("genre"), "type")
    .groupBy("genre")
    .pivot("type")
    .agg(F.count("*").alias("count"))
    .orderBy((F.col("movie") + F.col("tv")).desc())
    .show(10, truncate=False)
)

+-----------+-----+----+
|genre      |movie|tv  |
+-----------+-----+----+
|Drama      |7509 |2208|
|Comedy     |6034 |1378|
|Action     |2709 |761 |
|Crime      |2310 |773 |
|Romance    |2370 |662 |
|Adventure  |1772 |696 |
|Thriller   |1798 |298 |
|Documentary|1447 |540 |
|Animation  |982  |998 |
|Mystery    |1054 |384 |
+-----------+-----+----+
only showing top 10 rows



In [None]:
# Популярность жанров на IMDb

(df.select(F.explode_outer(F.col("genres")).alias("genre"), "imdbAverageRating", "imdbNumVotes")
    .groupBy("genre")
    .agg(F.round(F.avg("imdbAverageRating"), 2).alias("average_rating"), F.sum("imdbNumVotes").alias("total_votes"))
    .orderBy(F.col("average_rating").desc())
    .show(10, truncate=False)
)

+------------------+--------------+-----------+
|genre             |average_rating|total_votes|
+------------------+--------------+-----------+
|Action & Adventure|7.0           |222        |
|Documentary       |6.93          |6721103    |
|Biography         |6.93          |43144041   |
|History           |6.92          |17775960   |
|Animation         |6.79          |38227002   |
|Talk-Show         |6.76          |115327     |
|War               |6.72          |11575840   |
|Sport             |6.72          |9147143    |
|Music             |6.65          |11650484   |
|News              |6.64          |14014      |
+------------------+--------------+-----------+
only showing top 10 rows



In [None]:
# Динамика выпуска контента по годам и рейтинг

(df.groupBy("releaseYear")
    .agg(F.count("*").alias("total_titles"), F.round(F.avg("imdbAverageRating"),2).alias("average_rating"))
    .orderBy(F.col("releaseYear").desc())
    .show(10, truncate=False)
)

+-----------+------------+--------------+
|releaseYear|total_titles|average_rating|
+-----------+------------+--------------+
|2025       |331         |6.44          |
|2024       |1338        |6.28          |
|2023       |1687        |6.27          |
|2022       |1845        |6.26          |
|2021       |1419        |6.33          |
|2020       |1283        |6.33          |
|2019       |1399        |6.42          |
|2018       |1370        |6.41          |
|2017       |1072        |6.46          |
|2016       |858         |6.52          |
+-----------+------------+--------------+
only showing top 10 rows



In [None]:
# Анализ динамики прироста фильмов по годам

(df.groupBy("releaseYear")
   .agg(F.count("*").alias("current_year_title_count"))
   .withColumn("prev_year_titles_count",
               F.lag("current_year_title_count").over(Window.orderBy("releaseYear")))
   .withColumn("title_count_growth",
               F.col("current_year_title_count") - F.col("prev_year_titles_count"))
   .orderBy(F.col("title_count_growth").desc())
   .show(10, truncate=False)
)

+-----------+------------------------+----------------------+------------------+
|releaseYear|current_year_title_count|prev_year_titles_count|title_count_growth|
+-----------+------------------------+----------------------+------------------+
|2022       |1845                    |1419                  |426               |
|2018       |1370                    |1072                  |298               |
|2017       |1072                    |858                   |214               |
|2016       |858                     |710                   |148               |
|2021       |1419                    |1283                  |136               |
|2015       |710                     |590                   |120               |
|2013       |513                     |404                   |109               |
|2014       |590                     |513                   |77                |
|2003       |214                     |170                   |44                |
|2009       |324            

In [None]:
spark.stop()