In [1]:
from pyspark.sql import SparkSession, functions as sf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pandas as pd
import pyarrow

import tabulate
from enum import Enum

Подключимся к спарку

In [2]:
spark = (
    SparkSession.builder
    .master('spark://localhost:7077')
    .config('spark.sql.repl.eagerEval.enabled', True)
    .getOrCreate()
)

23/10/29 19:13:32 WARN Utils: Your hostname, Victus-by-HP-Laptop resolves to a loopback address: 127.0.1.1; using 192.168.31.236 instead (on interface wlp4s0)
23/10/29 19:13:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/29 19:13:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Загрузим сами данные

In [3]:
table_schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('genre', StringType(), False),
    StructField('name', StringType(), False),
    StructField('status', IntegerType(), False),
])
df = (spark.read
      .format('csv')
      .option('header', False)
      .schema(table_schema)
      .load('anime.csv')
      )

Определим какой статус (число), что означает

In [4]:
class WatchingStatus:
    Watching = 1
    Completed = 2
    OnHold = 3
    Dropped = 4
    PlanToWatch = 6
    
    @staticmethod
    def interpret(number):
        number = int(number)
        if number == WatchingStatus.Watching: return 'Watching'
        if number == WatchingStatus.Completed: return 'Completed'
        if number == WatchingStatus.OnHold: return 'OnHold'
        if number == WatchingStatus.Dropped: return 'Dropped'
        if number == WatchingStatus.PlanToWatch: return 'PlanToWatch'
        raise ValueError(f'Непонятное значение для {number}')

## 1. Получим статистику брошенных аниме по жанрам

Для каждого жанра - кол-во аниме, просмотр которых бросили.

P.S. это переписанный исходный запрос

In [5]:
dropped = df.filter(df.status == WatchingStatus.Dropped).groupBy(df.genre).count().orderBy('count', ascending=False)
dropped = dropped.withColumnRenamed('count', 'dropped')
dropped

                                                                                

genre,dropped
comedy,4205
action,3676
romance,2394
fantasy,2391
school,2189
shounen,2177
drama,2070
supernatural,1959
adventure,1787
sci-fi,1522


# 2. Посмотрим какой статус у большинства аниме

In [6]:
most_common = df.groupBy(df.status).count().toPandas()
# most_common_mapped = most_common.rdd.map(lambda x: (WatchingStatus.interpret(x.status), x.count)).collect()
# tabulate.tabulate(most_common_mapped, )
most_common['status'] = most_common['status'].apply(lambda x: WatchingStatus.interpret(x))
most_common.sort_values(by='count', ascending=False)

                                                                                

Unnamed: 0,status,count
4,Completed,594452
1,PlanToWatch,271568
0,Watching,54563
2,OnHold,41871
3,Dropped,37546


Больше всего аниме Завершенных, но меньше всего Брошенных.

Это значит, что случайное аниме с большей долей вероятности посмотрят до конца, чем бросят

# 3. Сколько аниме смотрят или посмотрели пользователи в среднем

In [69]:
avg_anime_watched = (
    df
    .filter((df.status == WatchingStatus.Completed) | (df.status == WatchingStatus.Watching)) # Получаем только просмотренные аниме
    .select(['id', 'name']) # Выбираем только нужные поля
    .groupby('id') # Группируем по пользователям
    .count() # Считаем для каждого пользователя
    .agg({'count': 'avg'}) # Получаем среднее кол-во
)
avg_anime_watched

                                                                                

avg(count)
79.65328915071183


В среднем, каждый пользователь *знает* (посмотрел или смотрит) о **80** аниме!!!! 

# 4. Среднее количество жанров для каждого аниме

In [31]:
avg_genres_count = (
    df
    .select(['name', 'genre'])
    .distinct()
    .groupBy('name')
    .count()
    .agg({'count': 'avg'})
)

avg_genres_count

                                                                                

avg(count)
3.3536240662224914


В среднем у каждого аниме 3-4 жанра

# 5. Сколько экшена посмотрел каждый пользователь в среднем (медиана)

In [62]:
# Первая таблица только с ID пользователей
user_ids = (
    df
    .select('id')
    .distinct()
)

# Список из пар ID-жанр по одной записи для каждого просмотренного аниме
user_genre = (
    df
    .filter(((df.status == WatchingStatus.Completed) | (df.status == WatchingStatus.Watching)) & (df.genre == 'action'))
    .select(['id', 'genre'])
)

# Сливаем все вместе
avg_action = (
    user_ids
    .join(user_genre, on='id', how='left') # Связываем 2 таблицы вместе
    .groupby('id') # Группируем по пользователям (в группах останутся только 'action'
    # если группа пуста, то count() будет превращать NULL как 1 - нужно использовать другой механизм
    .agg(sf.when(sf.count('genre') > 0, sf.count('genre')).otherwise(0).alias('count')) # считаем кол-во элементов в каждой группе
    .agg({'count': 'avg'}) # получаем среднее число
)

avg_action

                                                                                

avg(count)
6.140982944797659


В среднем, каждый пользователь посмотрел 6 аниме с жанром 'action'