**Практика Python**

ratings: https://drive.google.com/file/d/1Rd5_Hsc59vzxgzfTubkPsfMUmYZZZN8z/view?usp=drive_link

movies: https://drive.google.com/file/d/1GJsbt-o6k4cbPqTImQ-z5NLA6RXpiaWg/view?usp=drive_link

In [None]:
# # Для использования from_pandas нужно установить версию pyspark >= 3.2
# pip install pyspark
# import pyspark.pandas as ps

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
# Create environmental variables
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
# Loading required libraries
import findspark
import re
findspark.init()
from pyspark import SparkFiles
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, DateType, FloatType
import datetime as dt
from google.colab import drive
import pandas as pd
from pandas.api.types import is_int64_dtype
import numpy as np
import scipy.stats as st
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
import sklearn.metrics as metrics
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from mlxtend.plotting import plot_decision_regions

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Загрузите в колаб файлы по оценкам (ratings) и фильмам (movies) и
# создайте на их основе pandas-датафреймы
# movies_url = 'https://drive.google.com/uc?id=' + 'https://drive.google.com/file/d/1GJsbt-o6k4cbPqTImQ-z5NLA6RXpiaWg/view?usp=drive_link'.split('/')[-2]
# ratings_url = 'https://drive.google.com/uc?id=' + 'https://drive.google.com/file/d/1Rd5_Hsc59vzxgzfTubkPsfMUmYZZZN8z/view?usp=drive_link'.split('/')[-2]

movies = pd.read_csv('u.item.csv', encoding= 'unicode_escape', sep= '|',
                     names= ['movie_id', 'movie_title', 'release_date', 'video_release_date', 'IMDb_URL',
                             'unknown', 'Action', 'Adventure', 'Animation', 'Children_s', 'Comedy', 'Crime',
                             'Documentary', 'Drama', 'Fantasy', 'Film_Noir', 'Horror', 'Musical', 'Mystery',
                             'Romance', 'Sci_Fi', 'Thriller', 'War', 'Western'])
ratings = pd.read_csv('u.data.csv', sep= '\t', names= ['user_id', 'item_id', 'rating', 'timestamp'])
ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit= 's')

In [None]:
# Средствами Pandas, используя dataframe ratings, найдите id пользователя,
# поставившего больше всего оценок
top1 = ratings.groupby('user_id')[['rating']].count().\
    sort_values(by= 'rating', ascending= False).reset_index()
print(f'пользователь с id {top1.iloc[0, 0]} поставившил больше всего оценок - {top1.iloc[0, 1]}')

In [None]:
# Оставьте в датафрейме ratings только те фильмы, который оценил данный пользователь
ratings.loc[ratings['user_id'] == top1.iloc[0, 0]]

In [None]:
# Добавьте к датафрейму из задания 3 столбцы: По жанрам. Каждый столбец - это жанр. Единицу записываем, если фильм принадлежит данному жанру и 0 - если нет
merged_df = ratings.merge(movies, how= 'inner', left_on= 'item_id', right_on= 'movie_id').drop(columns= ['item_id', 'video_release_date'])
top1_films = merged_df.loc[merged_df['user_id'] == top1.iloc[0, 0]]
print(f'rows: {top1_films.shape[0]}\ncolumns: {top1_films.shape[1]}')
top1_films.head()

In [None]:
# столбцы с общим количеством оценок от всех пользователей на фильм и суммарной оценкой от всех пользователей
main_df = merged_df.groupby('movie_id').agg({'rating': ['count', 'sum']})['rating'].merge(top1_films, how= 'inner', on= ['movie_id'])
main_df['release_year'] = pd.to_datetime(main_df['release_date']).dt.year
main_df.head()

In [None]:
# Сформируйте X_train, X_test, y_train, y_test
x = main_df.drop(columns= ['movie_id', 'user_id', 'rating', 'timestamp', 'movie_title', 'release_date', 'IMDb_URL'])
y = main_df['rating']
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state= 13, stratify= y, test_size= .2)

In [None]:
# Возьмите модель линейной регрессии (или любую другую для задачи регрессии) и обучите ее на фильмах

# Модель линейного дискриминантного анализа (ЛДА)
lda = LinearDiscriminantAnalysis()
lda.fit(x_train, y_train)
print(classification_report(y_test, lda.predict(x_test)))

In [None]:
# Визуализация результатов обученной модели ЛДА, полученных на тестовой выборке
predictions = lda.predict(x_test)
cm = confusion_matrix(y_test, predictions, labels= lda.classes_)
disp = ConfusionMatrixDisplay(confusion_matrix= cm,
                              display_labels= lda.classes_)

disp.plot()
plt.show()

In [None]:
# Модель логистической регресии
pipe_lr = make_pipeline(StandardScaler(), LogisticRegression(multi_class= 'multinomial'))
pipe_lr.fit(x_train, y_train)
print(classification_report(y_test, pipe_lr.predict(x_test)))

In [None]:
# Визуализация результатов обученной модели логистической регрессии, полученных на тестовой выборке
predictions_lr = pipe_lr.predict(x_test)
cm_lr = confusion_matrix(y_test, predictions_lr, labels= pipe_lr.classes_)
disp_lr = ConfusionMatrixDisplay(confusion_matrix= cm_lr,
                                 display_labels= pipe_lr.classes_)

disp_lr.plot()
plt.show()

In [None]:
# Модель линейного дискриминантного анализа (ЛДА), с масштабированием
pipe_lda = make_pipeline(StandardScaler(), LinearDiscriminantAnalysis())
pipe_lda.fit(x_train, y_train)
print(classification_report(y_test, pipe_lda.predict(x_test)))

In [None]:
# Визуализация результатов обученной модели ЛДА, полученных на тестовой выборке, с масштабированием
predictions_lda = pipe_lda.predict(x_test)
cm_lda = confusion_matrix(y_test, predictions_lda, labels= pipe_lda.classes_)
disp_lda = ConfusionMatrixDisplay(confusion_matrix= cm_lda,
                                  display_labels= pipe_lda.classes_)

disp_lda.plot()
plt.show()

In [None]:
# Модель линейной регрессии (для доп сравнения)
linr = LinearRegression()
linr.fit(x_train, y_train)
predictions_linr = linr.predict(x_test)

In [None]:
# Диаграмма рассеяния для результатов прогноза по линейной регрессии
plt.scatter(y_test, predictions_linr)
plt.ylabel('Прогнозы')
plt.xlabel('Факт')
plt.title('Диаграмма рассеяния прогнозных и фактических значений')
plt.show()

In [None]:
# Метрики точности модели линейной регрессии
print(f'R2: {metrics.r2_score(y_test, predictions_linr).round(3)}')
print(f'MAE: {metrics.mean_absolute_error(y_test, predictions_linr).round(3)}')
print(f'MSE: {metrics.mean_squared_error(y_test, predictions_linr).round(3)}')
print(f'RMSE: {np.sqrt(metrics.mean_squared_error(y_test, predictions_linr)).round(3)}')

sns.histplot((y_test - predictions_linr), kde= True)
plt.title('Плотность распределения остатков линейной модели')
plt.show()

In [None]:
#                             ||
#                             \/
# Для задач, в которых количество уникальных значений исследуемой переменной меньше 5-7, необходимо использовать алгоритмы классификации

In [None]:
# Загрузить данные в spark
# movies
movies_spark = spark.read.csv('u.item.csv', inferSchema= True, sep= '|').toDF(
    'movie_id', 'movie_title', 'release_date', 'video_release_date',
    'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation', "Children_s",
    'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film_Noir',
    'Horror', 'Musical', 'Mystery', 'Romance', 'Sci_Fi', 'Thriller', 'War',
    'Western'
).drop(
    'video_release_date'
)

# ratings
ratings_spark = spark.read.csv('u.data.csv', inferSchema= True, sep= '\t').toDF(
    'user_id', 'item_id', 'rating', 'timestamp'
).withColumn(
      'timestamp', from_unixtime(col('timestamp'), 'MM-dd-yyyy HH:mm:ss')
)

In [None]:
# # Для использования from_pandas нужно установить версию pyspark >= 3.2
# pip install pyspark
# import pyspark.pandas as ps

In [None]:
# movies_spark = ps.from_pandas(movies)
# movies_spark

In [None]:
merged_df_spark = ratings_spark.join(movies_spark,
                                     ratings_spark['item_id'] == movies_spark['movie_id'],
                                     'inner').drop('item_id')

print((merged_df_spark.count(), len(merged_df_spark.columns)))
merged_df_spark.show()

In [None]:
# Средствами спарка вывести среднюю оценку для каждого фильма
mean_rating_by_movie = merged_df_spark.groupBy('movie_id').agg(
    round(avg('rating'), 2)
).withColumnRenamed(
    'round(avg(rating), 2)', 'mean_rating'
).sort(
    'mean_rating', ascending= False
)

print((mean_rating_by_movie.count(), len(mean_rating_by_movie.columns)))
mean_rating_by_movie.show()

In [None]:
# Посчитайте средствами спарка среднюю оценку для каждого жанра
df_ratings_spark = merged_df_spark.drop('user_id', 'timestamp', 'movie_id', 'movie_title', 'release_date', 'IMDb_URL')
genres = [
    'unknown', 'Action', 'Adventure', 'Animation', 'Children_s', 'Comedy',
    'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film_Noir', 'Horror',
    'Musical', 'Mystery', 'Romance', 'Sci_Fi', 'Thriller', 'War', 'Western'
]
genre_cols_expr = ', '.join([f"'{i}', {i}" for i in genres])
df_transformed = df_ratings_spark.selectExpr('rating', f'stack(19, {genre_cols_expr}) as (genre, is_genre)')

result_spark = df_transformed.filter(col('is_genre') == 1)

print((result_spark.count(), len(result_spark.columns)))
result_spark.groupBy('genre').agg(
    round(avg('rating'), 2)
).withColumnRenamed(
    'round(avg(rating), 2)', 'mean_rating'
).sort(
    'mean_rating', ascending= False
).show()

In [None]:
# Проверка решения в pandas
merged_df = ratings.merge(movies, how= 'inner', left_on= 'item_id', right_on= 'movie_id').drop(columns= ['item_id', 'video_release_date'])
genres = merged_df.drop(columns= ['user_id', 'timestamp', 'movie_id', 'movie_title', 'release_date', 'IMDb_URL'])

df_melted = genres.melt(id_vars= 'rating', var_name= 'genre', value_name= 'is_genre')
result = df_melted[df_melted['is_genre'] == 1]
result.reset_index(drop= True, inplace= True)

print(result.shape)
result.groupby('genre')[['rating']].mean().sort_values(by= 'rating', ascending= False).round(2).rename(columns= {'rating': 'mean_rating'})

In [None]:
# В спарке получить 2 датафрейма с 5-ю самыми популярными и самыми непопулярными фильмами (по количеству оценок, либо по самой оценке - на Ваш выбор)
top_df_spark = merged_df_spark.groupBy('movie_id', 'movie_title').agg(
    count('rating')
).withColumnRenamed(
    'count(rating)', 'count_rating'
)

In [None]:
# Топ 5 самых популярных фильмов по количеству оценок
print((top_df_spark.count(), len(top_df_spark.columns)))
top_df_spark.sort(
    desc('count_rating')
).show(5)

In [None]:
# Топ 5 самых непопулярных фильмов по количеству оценок
print((top_df_spark.count(), len(top_df_spark.columns)))
top_df_spark.sort(
    asc('count_rating')
).show(5)

**Теоретическая часть**

***Задание 1: Описать основные бизнес-отчеты (2-3 штуки), которые мы хотим видеть по нашему бизнесу***

Основные бизнес-отчеты:
- Отчет по самым популярным фильмам/сериалам на платформе за определенный период времени
- Отчет по среднему времени просмотра контента на платформе пользователем
- Отчет по конверсии пользователей из бесплатного пробного периода в платных подписчиков

***Задание 2: Описать основные имеющиеся данные и источники их поступления***

Основные данные и источники их поступления:
- Данные о просмотрах фильмов/сериалов, пользователях, подписках и оплатах на платформе
- Данные о контенте, включая название, жанр, длительность и рейтинг
- Данные о пользователях, включая их историю просмотров, предпочтения и демографическую информацию

***Задание 3: Описать основные сущности в хранилище данных (схема звезда) и процесс заливки данных***

Основные сущности в хранилище данных:
- Факт: таблица фактов с информацией о просмотрах
- Измерения: таблицы с информацией о фильмах, пользователях, подписках, платежах и других связанных сущностях.

Процесс заливки данных в хранилище данных включает процессы ETL (извлечение, преобразование, загрузка) для каждой из сущностей и
происходит в несколько этапов, которые можно описать следующим образом:

- Извлечение данных: на этом этапе происходит извлечение данных из различных источников, таких как базы данных платформы, логи серверов, сторонние сервисы и т.д. Для этого могут использоваться ETL инструменты или написание скриптов для извлечения данных.

- Преобразование данных: после извлечения данных необходимо их преобразовать, чтобы они соответствовали структуре целевого хранилища данных. На этом этапе проводится очистка данных от аномалий, заполнение пропущенных значений, изменение форматов данных и другие манипуляции.

- Загрузка данных: после преобразования данные загружаются в хранилище данных. Обычно используется модель звезда, где фактовая таблица содержит информацию о событиях (например, просмотры), а измерения содержат информацию о сущностях (например, пользователи, фильмы).

- Проверка качества данных: перед загрузкой данных необходимо провести проверку на качество данных. Некоторые из проверок включают проверку на дубликаты записей, наличие обязательных полей, соответствие форматам данных, целостность связей между сущностями и другие.

- Мониторинг: после загрузки данных в хранилище важно организовать мониторинг процесса. Это позволяет быстро выявлять проблемы с данными и вносить коррективы в ETL процессы.

- Автоматизация: для повышения эффективности и устойчивости процесса заливки данных необходимо автоматизировать его. Это позволит уменьшить вероятность человеческих ошибок и ускорить процесс.

***Задание 4: Описать основные проверки на качество данных (10 штук), которыми будем пользоваться при заливке***

Основные проверки на качество данных:
- Проверка на дубликаты записей
- Проверка на наличие обязательных полей
- Проверка на аномалии в данных (например, отрицательные значения)
- Проверка на соответствие форматам данных (например, дата и время)
- Проверка на целостность связей между сущностями
- Проверка на наличие выбросов
- Проверка на повторяющиеся значения
- Проверка на непрерывность последовательности данных
- Проверка на соответствие бизнес-правилам
- Проверка на уникальность ключевых полей

***Задание 5: Придумать Data-проект, который должен улучшить показатели Вашего бизнеса и расписать его по Crisp-DM***

Data-проект по улучшению показателей бизнеса:
Проект по оптимизации рекомендательной системы для увеличения удержания пользователей.
Шаги по Crisp-DM:
- Понимание бизнес-проблемы и цели проекта
- Изучение и предварительная обработка данных
- Построение модели рекомендаций
- Тестирование и настройка модели
- Разворачивание модели в продакшн
- Мониторинг и поддержка модели

***Задание 6: Описать требуемые роли в команде по работе с данными на этапах 4 и 5***

Требуемые роли в команде:
- Data Analyst: для анализа данных и выдачи рекомендаций по улучшению бизнес-показателей
- Data Engineer: для разработки и поддержки инфраструктуры данных, ETL процессов и хранилища данных
- Data Scientist: для разработки моделей машинного обучения и анализа данных для улучшения рекомендательной системы
- Project Manager: для координации и управления проектом по улучшению показателей бизнеса.