In [None]:
from traceback import print_exc #для отладки
import os  # Модуль для работы с операционной системой: чтение переменных окружения, управление файлами и путями и т.д.
import pickle  # Модуль для сериализации/десериализации Python-объектов в бинарный формат — часто используется для сохранения моделей.

import pandas as pd #это pandas, нужен нам для выгрузки таблиц из БД
import numpy as np #это numpy, нужен нам для обработки признаков в таблицах
from typing import List # Модуль для аннотации типов — в данном случае List используется, чтобы явно указать, что функция возвращает список объектов.
from fastapi import FastAPI, HTTPException #это сам FastAPI, с помощью которого мы создаем приложения, а второй модуль - это для отладки
from pydantic import BaseModel #это класс, на основании которого мы создаем класс PostGet
from datetime import datetime #модуль дает возможность использовать специальный формат данных datetime
from sqlalchemy import create_engine #создание движка для обращения к sql
from sklearn.preprocessing import LabelEncoder #модуль используется для обработки категориальных фичей посредством замены каждого уникального текстового значения на порядковый номер

# ==== Модель ответа ====
class PostGet(BaseModel): #создаем класс, который задаёт формат вывода ответа на запрос
    id: int #id будет числом
    text: str #текст будет строкой
    topic: str #топик будет строкой

    class Config:
        orm_mode = True #этот параметр позволяет FastAPI получать на вход не только словари, но и объекты SQLAlchemy

app = FastAPI() #создаем приложение

# ==== Подключение к БД ====
def batch_load_sql(query: str) -> pd.DataFrame: 
    #создаем функцию для загрузки данных из SQL. 
    #query: str - это входной аргумент (SQL-запрос) в формате string
    #-> pd.DataFrame - аннотация, что выводом функции будет pandas dataframe
    CHUNKSIZE = 200_000 #это размер куска в 200000 строк, который мы вынимаем за раз из БД
    engine = create_engine( #создаем движок, говорим ему имя пользователя, пароль, хост, порт, базу
        "postgresql://robot-startml-ro:pheiph0hahj1Vaif@"
        "postgres.lab.karpov.courses:6432/startml"
    )
    conn = engine.connect().execution_options(stream_results=True) 
    #здесь мы открываем соединение
    #stream_results=True - говорит не загружать сразу всю таблицу в память, отдавать рез-ты по мере чтения
    chunks = [] #создаем пустой список chunks
    for chunk_dataframe in pd.read_sql(query, conn, chunksize=CHUNKSIZE): #начинаем цикл, где для каждого куска dataframe размером CHUNKSIZE...
        chunks.append(chunk_dataframe) #мы добавляем этот кусок в список chunks
    conn.close() #закрываем соединение
    return pd.concat(chunks, ignore_index=True) #в качестве вывода мы возвращаем chunks, превращенный из списка в pandas dataframe

# ==== Загрузка модели ====
def get_model_path(path: str) -> str: 
    #создаем функцию для получения пути к модели
    #входной параметр path ожидается в формате string, вывод - тоже string
    if os.environ.get("IS_LMS") == "1": #проверяем значение переменной окружения, чтобы понять, откуда запускается код
        return "/workdir/user_input/model" #если мы в LMS, функция вернет этот код
    return path #во всех остальных случаях вернем то же, что и приняли на входе

model_path = get_model_path("model.pkl") #запускаем функцию get_model_path со значением path = "model.pkl", результат кладем в model_path
with open(model_path, "rb") as f: # открываем файл в бинарном режиме
    model = pickle.load(f) # загружаем модель из файла

# ==== Загрузка признаков пользователей ====
user_features = batch_load_sql("SELECT * FROM maxezdu_features_lesson_22")
#запускаю функцию batch_load_sql с запросом всей таблицы maxezdu_features_lesson_22, сохраняю её в user_features

# ==== Загрузка постов ====
posts_df = batch_load_sql("SELECT post_id, text, topic FROM post_text_df")
#запускаю функцию batch_load_sql с запросом 3 столбцов таблицы post_text_df, сохраняю её в posts_df

# Добавим длину текста как признак поста
posts_df['text_len'] = posts_df['text'].str.len()
# к датафрейму с постами мы добавляем колонку text_len, 
# которая получается как длина string из колонки text (.str.len() — это строковой метод pandas)

# Загружаем небольшой сэмпл feed_data
feed_sample = batch_load_sql("SELECT * FROM feed_data LIMIT 200000")

# Оставляем только просмотры
feed_sample = feed_sample[feed_sample["action"] == "view"]

# Считаем статистики постов
post_stats = feed_sample.groupby("post_id").agg(
    views_y=("action", "count"),
    likes_y=("target", "sum")
).reset_index()

post_stats["ctr_y"] = post_stats["likes_y"] / post_stats["views_y"]
post_stats = post_stats.fillna(0)

posts_df = posts_df.merge(post_stats, on="post_id", how="left")
posts_df = posts_df.fillna(0)



# ==== Категориальные признаки ====
cat_cols = ['country', 'city', 'os', 'source', 'topic'] #создаем список, в котором перечисляем категориальные признаки
label_encoders = {} #создаем пустой словарь, в который будем класть по ключу 'имя из cat_cols' экземпляр эгкодера с соответствием
#уникальных значений в колонке их порядковому номеру

# Обучаем LabelEncoder отдельно для пользователей и постов
for col in cat_cols: #начинаем цикл по списку cat_cols
    le = LabelEncoder() #создаем экземпляр энкодера
    if col in user_features.columns: #условие: если col есть среди колонок таблицы user_features, то:
        user_features[col] = user_features[col].astype(str) #сделать тип всех значений в этой колонке в user_features стрингом
        le.fit(user_features[col]) #применяем fit для le на колонке из user_features. 
        #Этот метод в итоге сохраняет в переменную le список из преобразовынных в порядковые номера значений колонки user_features[col]?
    else: #когда col нет в user_features, берем колонку с именем col в таблице posts_df и делаем такое же преобразование
        posts_df[col] = posts_df[col].astype(str)
        le.fit(posts_df[col])
    label_encoders[col] = le #добавляем в словарь по ключу 'имя из cat_cols' энкодер

# ==== Эндпоинт ====
@app.get("/post/recommendations/", response_model=List[PostGet]) #создаем метод эндпоинта .get с параметрами: путь и форма ответа
def recommended_posts(id: int, time: datetime, limit: int = 5) -> List[PostGet]: 
    #функция, на вход получает id, время и лимит. Вывод ожидается по форме List
    try: #проверка на ошибочку
        if id not in user_features["user_id"].values: #если такого user id нет в таблице, пишет User not found
            raise HTTPException(status_code=404, detail="User not found")

        user_row = user_features[user_features["user_id"] == id].drop(columns=["user_id"]) 
        #ищем строку в user_features с нужным id, удаляем из этой строки сам user_id, и сохраняем в user_row
        #то есть получаем список значений user_features для запрашиваемого юзера

        user_row = user_row.rename(columns={'views': 'views_x','likes': 'likes_x','ctr': 'ctr_x'})

        # Дублируем user_row на количество постов
        user_matrix = pd.concat([user_row] * len(posts_df), ignore_index=True)
        #создаем user_matrix, которая есть просто таблица с повторяющейся len(posts_df) раз строкой user_row


        # Добавляем постовые признаки
        post_feats = posts_df[['topic', 'text_len', 'views_y', 'likes_y', 'ctr_y']].copy()
        #создаем копию таблицы постов, в которой только два столбца: топик и длина текста

        # Объединяем пользовательские и постовые признаки
        data_for_pred = pd.concat([user_matrix.reset_index(drop=True), post_feats.reset_index(drop=True)], axis=1)
        # соединяем таблицу из одинаковых строк с признаками юзера с таблицей с признаками topic и text_len из поста

        # Кодируем категориальные признаки
        for col in cat_cols:
            data_for_pred[col] = label_encoders[col].transform(data_for_pred[col].astype(str))
            #проходимся по категориальным колонкам в новой таблице data_for_pred и вставляем с 
            #помощью метода transform значения из сформированных ранее энкодеров


        # Предсказания вероятностей
        preds = model.predict_proba(data_for_pred)[:, 1]
        #выводим в переменную preds вероятности принадлежности каждой пары [запрашиваемый юзер, пост] к положительному классу

        # Сортируем посты по предсказанному рейтингу
        posts_df_copy = posts_df.copy() #создаем копию таблицы постов
        posts_df_copy["score"] = preds #добавляем в эту копию колонку score, заполненную значениями preds
        top_posts = posts_df_copy.sort_values("score", ascending=False).head(limit) 
        #в top_posts сохраняем limit постов из отсортированной по убыванию значения в колонке score таблицы posts_df_copy

        return [
            PostGet(id=row.post_id, text=row.text, topic=row.topic) #возвращаем ответ по форме PostGet
            for _, row in top_posts.iterrows() #значения из строк таблицы top_posts, начиная с верхней
        ]

    except Exception:
        print("Ошибка в recommended_posts():")
        print_exc()
        raise HTTPException(status_code=500, detail="Internal server error")
