Тут мы реализуем сервис записи фичей в новую таблицу для того чтобы не загружать основной алгоритм обработкой данных

# Загрузка фичей из базы данных

In [1]:
import pandas as pd
from sqlalchemy import create_engine


def load_and_merge_data(engine, chunksize=200000):
    # Чтение данных таблицы user_data
    query = "SELECT * FROM user_data"
    user_data = pd.read_sql(query, engine)
    print(f"User data shape: {user_data.shape}")

    # Чтение данных таблицы post_text_df
    query = "SELECT * FROM post_text_df"
    post_text_df = pd.read_sql(query, engine)
    print(f"Post text data shape: {post_text_df.shape}")

    # Чтение ограниченного количества данных таблицы feed_data
    query = f"SELECT * FROM feed_data"
    feed_data = batch_load_sql_timed(engine, query, chunksize)
    print(f"Feed data shape: {feed_data.shape}")

    # Переименование столбцов идентификаторов
    user_data = user_data.rename(columns={'id': 'user_id'})
    post_text_df = post_text_df.rename(columns={'id': 'post_id'})

    # Объединение таблиц
    data = feed_data.merge(user_data, on='user_id', how='left')
    data = data.merge(post_text_df, on='post_id', how='left')

    print(f"Data shape after load_and_merge_data: {data.shape}")

    return data


def batch_load_sql(engine, query: str, chunksize: int) -> pd.DataFrame:
    conn = engine.connect().execution_options(stream_results=True)
    chunks = []
    for chunk_dataframe in pd.read_sql(query, conn, chunksize=chunksize):
        chunks.append(chunk_dataframe)
    conn.close()
    return pd.concat(chunks, ignore_index=True)

import time

def batch_load_sql_timed(engine, query: str, chunksize: int) -> pd.DataFrame:
    conn = engine.connect().execution_options(stream_results=True)
    chunks = []
    row_count = 0
    start_time = time.time()

    for chunk_dataframe in pd.read_sql(query, conn, chunksize=chunksize):
        chunks.append(chunk_dataframe)
        row_count += len(chunk_dataframe)
        print(f"Loaded {row_count} rows, elapsed time: {time.time() - start_time:.2f} seconds")

    conn.close()
    return pd.concat(chunks, ignore_index=True)

In [1]:
import pandas as pd
from sqlalchemy import create_engine
import time

In [2]:
engine = create_engine(
        "postgresql://robot-startml-ro:pheiph0hahj1Vaif@"
        "postgres.lab.karpov.courses:6432/startml"
    )

chunksize = 1000000

In [3]:
# Чтение данных таблицы user_data
query = "SELECT * FROM user_data"
user_data = pd.read_sql(query, engine)
print(f"User data shape: {user_data.shape}")

User data shape: (163205, 8)


In [4]:
# Чтение данных таблицы post_text_df
query = "SELECT * FROM post_text_df"
post_text_df = pd.read_sql(query, engine)
print(f"Post text data shape: {post_text_df.shape}")

Post text data shape: (7023, 3)


In [5]:
import time

def batch_load_sql_timed(engine, query: str, chunksize: int) -> pd.DataFrame:
    conn = engine.connect().execution_options(stream_results=True)
    chunks = []
    row_count = 0
    start_time = time.time()

    for chunk_dataframe in pd.read_sql(query, conn, chunksize=chunksize):
        chunks.append(chunk_dataframe)
        row_count += len(chunk_dataframe)
        print(f"Loaded {row_count} rows, elapsed time: {time.time() - start_time:.2f} seconds")

    conn.close()
    return pd.concat(chunks, ignore_index=True)

In [6]:
# Чтение ограниченного количества данных таблицы feed_data
query = "SELECT * FROM feed_data"
feed_data = batch_load_sql_timed(engine, query, chunksize)
print(f"Feed data shape: {feed_data.shape}")

Loaded 1000000 rows, elapsed time: 23.64 seconds
Loaded 2000000 rows, elapsed time: 39.26 seconds
Loaded 3000000 rows, elapsed time: 56.54 seconds
Loaded 4000000 rows, elapsed time: 70.47 seconds
Loaded 5000000 rows, elapsed time: 91.23 seconds
Loaded 6000000 rows, elapsed time: 118.89 seconds
Loaded 7000000 rows, elapsed time: 140.45 seconds
Loaded 8000000 rows, elapsed time: 166.71 seconds
Loaded 9000000 rows, elapsed time: 203.59 seconds
Loaded 10000000 rows, elapsed time: 228.68 seconds
Loaded 11000000 rows, elapsed time: 253.18 seconds
Loaded 12000000 rows, elapsed time: 276.47 seconds
Loaded 13000000 rows, elapsed time: 303.22 seconds
Loaded 14000000 rows, elapsed time: 323.59 seconds
Loaded 15000000 rows, elapsed time: 348.35 seconds
Loaded 16000000 rows, elapsed time: 380.08 seconds
Loaded 17000000 rows, elapsed time: 416.70 seconds
Loaded 18000000 rows, elapsed time: 436.11 seconds
Loaded 19000000 rows, elapsed time: 453.57 seconds
Loaded 20000000 rows, elapsed time: 478.46 se

In [7]:
# Переименование столбцов идентификаторов
user_data = user_data.rename(columns={'id': 'user_id'})
post_text_df = post_text_df.rename(columns={'id': 'post_id'})

# Объединение таблиц
data = feed_data.merge(user_data, on='user_id', how='left')
data = data.merge(post_text_df, on='post_id', how='left')

print(f"Data shape after load_and_merge_data: {data.shape}")

Data shape after load_and_merge_data: (76892800, 14)


# Обработка временных меток

adding year and

In [8]:
# Преобразование формата временных меток в объект datetime
data['timestamp'] = pd.to_datetime(data['timestamp'])

# Извлечение признаков из временных меток
data['day_of_week'] = data['timestamp'].dt.dayofweek
data['hour_of_day'] = data['timestamp'].dt.hour

# Расчет времени с момента последнего действия для каждого пользователя
data = data.sort_values(['user_id', 'timestamp'])
data['time_since_last_action'] = data.groupby('user_id')['timestamp'].diff().dt.total_seconds()
data['time_since_last_action'].fillna(0, inplace=True)

# Extracting day of the month and year from the timestamp
data['day_of_month'] = data['timestamp'].dt.day
data['year'] = data['timestamp'].dt.year

# Удаление столбца временных меток
data = data.drop('timestamp', axis=1)

print('Timestamps processed')
print(f"Data shape after timestamps processing: {data.shape}")

Timestamps processed
Data shape after timestamps processing: (76892800, 18)


# Создание дополнительных признаков

In [9]:
# Feature 1: Количество просмотров и лайков для каждого пользователя
user_views_likes = data.groupby('user_id')['action'].value_counts().unstack().fillna(0)
user_views_likes.columns = ['user_views', 'user_likes']
data = data.merge(user_views_likes, on='user_id', how='left')

# Feature 2: Количество просмотров и лайков для каждого поста
post_views_likes = data.groupby('post_id')['action'].value_counts().unstack().fillna(0)
post_views_likes.columns = ['post_views', 'post_likes']
data = data.merge(post_views_likes, on='post_id', how='left')

# Feature 3: Количество просмотров и лайков для каждой группы тематик
temp_df = data[['exp_group', 'topic', 'action']]

# Создание колонок с количеством просмотров и лайков для каждой темы внутри группы
topic_action_count = temp_df.pivot_table(index='exp_group', columns=['topic', 'action'], aggfunc=len, fill_value=0)
topic_action_count.columns = [f'{col[0]}_exp_group_{col[1]}s' for col in topic_action_count.columns]
grouped_data = topic_action_count.reset_index()

data = data.merge(grouped_data, on='exp_group', how='left')

# Преобразование категориальных признаков в строковый формат
categorical_columns = ['country', 'city', 'topic', 'gender', 'os', 'source']
data[categorical_columns] = data[categorical_columns].astype(str)

print('Additional features created')
print(f"Data shape after additional features creation: {data.shape}")

Additional features created
Data shape after additional features creation: (76892800, 36)


In [13]:
data.head()

Unnamed: 0,user_id,post_id,action,target,gender,age,country,city,exp_group,os,...,entertainment_exp_group_likes,entertainment_exp_group_views,movie_exp_group_likes,movie_exp_group_views,politics_exp_group_likes,politics_exp_group_views,sport_exp_group_likes,sport_exp_group_views,tech_exp_group_likes,tech_exp_group_views
0,200,5057,view,0,1,34,Russia,Degtyarsk,3,Android,...,67970,697043,486469,5091113,123438,1386641,180690,1828124,41879,559936
1,200,4872,view,0,1,34,Russia,Degtyarsk,3,Android,...,67970,697043,486469,5091113,123438,1386641,180690,1828124,41879,559936
2,200,5431,view,0,1,34,Russia,Degtyarsk,3,Android,...,67970,697043,486469,5091113,123438,1386641,180690,1828124,41879,559936
3,200,6829,view,0,1,34,Russia,Degtyarsk,3,Android,...,67970,697043,486469,5091113,123438,1386641,180690,1828124,41879,559936
4,200,3146,view,0,1,34,Russia,Degtyarsk,3,Android,...,67970,697043,486469,5091113,123438,1386641,180690,1828124,41879,559936


In [19]:
import math
from tqdm import tqdm

def upload_dataframe_in_chunks(data, table_name, engine, chunksize=10000):
    total_chunks = math.ceil(len(data) / chunksize)
    for i in tqdm(range(total_chunks), desc=f"Uploading to {table_name}"):
        chunk = data[i * chunksize : (i + 1) * chunksize]
        if_exists = "replace" if i == 0 else "append"
        chunk.to_sql(table_name, con=engine, if_exists=if_exists, index=False, method="multi")

chunksize = 100000
upload_dataframe_in_chunks(data, "a-efimik_features_lesson_22", engine, chunksize=chunksize)


Uploading to a-efimik_features_lesson_22:   0%|          | 3/769 [23:49<100:32:03, 472.48s/it]

In [13]:
df = pd.read_sql('SELECT * FROM a-efimik_features_lesson_22 LIMIT 1000', con=engine) # считываем таблицу

ProgrammingError: (psycopg2.errors.SyntaxError) syntax error at or near "-"
LINE 1: SELECT * FROM a-efimik_features_lesson_22 LIMIT 1000
                       ^

[SQL: SELECT * FROM a-efimik_features_lesson_22 LIMIT 1000]
(Background on this error at: https://sqlalche.me/e/20/f405)

In [19]:
df.head()

Unnamed: 0,index,user_id,post_id,action,target,gender,age,country,city,exp_group,...,entertainment_exp_group_likes,entertainment_exp_group_views,movie_exp_group_likes,movie_exp_group_views,politics_exp_group_likes,politics_exp_group_views,sport_exp_group_likes,sport_exp_group_views,tech_exp_group_likes,tech_exp_group_views


In [20]:
df.shape()

TypeError: 'tuple' object is not callable

# Подготовка данных для инференса

In [None]:
# Убираем ненужные столбцы
X = data.drop(['target', 'action', 'text'], axis=1)

categorical_columns = ['country', 'topic', 'city', 'gender', 'os', 'source']

# Создание ID группы на основе столбца 'user_id'
unique_user_ids = X['user_id'].unique()
group_id_dict = {user_id: idx for idx, user_id in enumerate(unique_user_ids)}
X['group_id'] = X['user_id'].map(group_id_dict)

# Сортировка набора данных для предсказаний по 'group_id'
X = X.sort_values(by='group_id')

# Убедитесь, что категориальные переменные представлены в виде строк
X[categorical_columns] = X[categorical_columns].astype(str)



# Запись фичей в базу данных

# То что будет в сервисе

In [None]:
from catboost import Pool

## TODO: надо просто передать лист с индексами категориальных признаков
# Получение индексов категориальных столбцов
cat_features = [X.drop(columns=['user_id']).columns.get_loc(col) for col in categorical_columns]

# Создание объекта Pool для набора данных предсказаний с колонкой 'group_id' и категориальными признаками
prediction_pool = Pool(X.drop(columns=['user_id']), cat_features=cat_features, group_id=X['group_id'])