In [10]:
import numpy as np
import pandas as pd
from pandas import DataFrame as df
import pyarrow.parquet as pq
import pyarrow as pa
import glob
from dask.distributed import Client
import dask.dataframe as dd
import dask.array as da
from collections import Counter
from tqdm import tqdm, tqdm_notebook
import os
import pickle

In [2]:
pd.options.display.float_format = '{:,.2f}'.format
import matplotlib.pyplot as plt
from pylab import rcParams
rcParams['figure.figsize'] = 10, 5

# <font color='blue'> Загрузка данных

In [15]:
dataset_dir = r'/home/protus/Documents/DATASETS/ML_Boot_camp/Colaborative filtering'
train_col_dirs = glob.glob(dataset_dir+'/train/date*')
SERIALIZED_DIR = './serialized'

In [70]:
def load_run(func_name, *args):
    """Функция повторного запуска
    Позволяет при первом обращении к указанной функции расчитать и сериализовать её ответ,
    и при последующих обращениях загружать уже расчитанные сериализованные значения.
    
    Удобно при работе с функциями требующими длительного времени выполнения"""
    
    file = SERIALIZED_DIR+'/'+func_name.__name__+'.pcl'
    if os.path.isfile(file):
        with open(file, 'rb') as f:
            return pickle.load(f)
    else:
        res = func_name(*args)
        if res:
            with open(file , 'wb') as f:
                pickle.dump(res, f)
        return res

In [None]:
client = Client(n_workers=4, threads_per_worker=2, processes=False, memory_limit='20GB')
client

In [None]:
min_ds = dd.read_parquet(dataset_dir+'/train/date=2018-02-01/*.parquet', engine='pyarrow')
min_ds.head()

In [None]:
min_ds.isna().compute()

In [None]:
dataset_size = len(dd.read_parquet(dataset_dir+'/train/date*/*.parquet', engine='pyarrow', columns=['instanceId_userId']))
print('Размер датасета {0:,}, кол-во столбцов {1}'.format(dataset_size, len(min_ds.columns)))

In [None]:
datset = dd.read_parquet(dataset_dir+'/train/date*/*.parquet', engine='pyarrow')

In [None]:
#среднее колво Nan на колонку
mean_nans = datset.isna().sum(axis=0).mean().compute()/dataset_size*100
mean_nans

In [None]:
#кол-во колонок c пустыми значениями
#среднее кол-во Nan в колонке

### Проверка типов признаков

In [None]:
columns = min_ds.columns
columns

### Проверка вложенных признаков

In [None]:
columns_with_nested_types = [ isinstance(train_dataset[col][10], np.ndarray) for col in train_dataset.select_dtypes('object').columns]
columns_with_nested_types = train_dataset.select_dtypes('object').columns[columns_with_nested_types]
columns_with_nested_types

In [None]:
train_dataset['metadata_options'][0]

# <font color='blue'>Описательные статистики

In [None]:
get_dd_column = lambda column: dd.read_parquet(dataset_dir+'/train/date*/*.parquet', columns=column, engine='pyarrow')
get_dd_column_unique = lambda column: get_dd_column([column])[column].unique().compute()

In [None]:
print("Кол-во уникальных пользователей {0:,}\n\
Кол-во уникальных обьектов {1:,}".format(\
    get_dd_column_unique('instanceId_userId').count(),
    get_dd_column_unique('instanceId_objectId').count()))

### Items statistics

In [None]:
%%time
#CALC ITEMS EVALUATIONS STATISTICS
feedback = get_dd_column(['feedback'])
feedback_unique = set()
temp = feedback['feedback'].map(lambda x: feedback_unique.update(x)).compute()

In [None]:
feedback_unique

In [None]:
client.cancel(feedback)
client.cancel(temp)

#### Распределение типов контента

In [None]:
%%time
#ITEMS TYPES STATISTICS 
#распределение типов контента
instanceId_objectTypes = get_dd_column(['instanceId_objectType']).groupby(by='instanceId_objectType')\
    ['instanceId_objectType'].count().compute()
print("Типы контента и частоты {0}".format(instanceId_objectTypes.map("{:,}".format)))

In [None]:
def plot_counter(counter):
    plt.bar(counter.keys(), counter.values())
    plt.xticks(rotation=45)

#### Распределение количества оценок пользователей по контенту Post

In [None]:
%%time
#выборка по Post типу контента 
data = get_dd_column(['instanceId_objectType', 'feedback'])
post_feedback = data.loc[data.instanceId_objectType=='Post','feedback'].compute()
#распределение кол-ва оценок на пользователя
f = np.vectorize(lambda x: len(x))
post_feedback_distr = Counter(f(post_feedback))
print(post_feedback_distr)
plot_counter(post_feedback_distr)

#### Распределение оценок пользователей c одной оценкой по обьекту по контенту Post

In [None]:
%%time
def selected_values(x):
    if len(x)==1:
        return x[0]
    
f = np.vectorize(selected_values)
one_feedback_distr = Counter(f(post_feedback))
print(one_feedback_distr)
plot_counter(one_feedback_distr)

In [None]:
client.cancel(post_feedback)
post_feedback = None

#### Какое кол-во оценок ставят пользователи?

In [None]:
user_values = get_dd_column(['instanceId_userId', 'feedback']).\
                groupby('instanceId_userId')['instanceId_userId'].count().compute()

user_values = Counter(user_values)
user_values.most_common()

In [None]:
# в % 
user_activity = list(zip(*user_values.most_common()))
perc = np.array(user_activity[1])/np.array(user_activity[1]).sum()*100
plt.plot(perc[:40])

In [None]:
#Кол-во оценок на пользователя
user_act = list(user_activity[0])
user_act.sort()
plt.plot(user_act)

In [None]:
sum(perc[:4])

- Преобладающее большинство пользователей ставят от 1 до 3-х оценок
- Более 50% пользователей ставят от 1 до 5 оценок.
- Большинство пользователей - 17% ставят 2 оценки
- Есть пользователи с аномальным кол-вом оценок больше 1000 -видимо это боты

### Анализ активности пользователей по времени

In [None]:
%%time
from datetime import datetime, date

user_date_time = get_dd_column(['audit_timestamp', 'instanceId_userId'])
user_date_time['audit_timestamp'] = user_date_time['audit_timestamp']\
                .map(lambda x:  datetime.fromtimestamp(x//1000))
user_date_time['month_day'] = user_date_time['audit_timestamp']\
                .map(lambda x: str(x.month)+'_'+str(x.day))
user_date_time['hour'] = user_date_time['audit_timestamp']\
                .map(lambda x: x.hour)
user_date_time['weekday'] = user_date_time['audit_timestamp']\
                .map(lambda x: x.weekday())
user_date_time = user_date_time.compute()

In [None]:
%%time
month_day = user_date_time.groupby('month_day')['month_day'].count()
month_day.plot(kind='bar')

In [None]:
%%time
hours = user_date_time.groupby('hour')['hour'].count()
hours.plot(kind='bar')

In [None]:
%%time
weekdays = user_date_time.groupby('weekday')['weekday'].count()
weekdays.plot(kind='bar')

In [None]:
client.cancel(user_date_time)
user_date_time = None
month_day = None
hours = None
weekdays = None

- каких-то существенных аномальных перекосов в пользовательской активности не выявлено
- видны недельные циклы пользовательской активности
- ожидалось что в даты близкие к 8 марта и 23 февраля могут быть какие нибудь аномальные активности, но эта гипотеза не подтвердилась

# <font color='blue'>Подготовка тренировочного и валидационного датасета

## Подготовка обучающего датасета

### Семплирование обучающего датасета
- Использовать весь датасет (31млн. строк) для обучения не рентабельно, поэтому просемплируем рандомно небольшую выборку размером с 1,5 млн записей

In [None]:
def dataset_random_sampling(dataset_directory, sample_size=1500000):
    """Семплирование датасета из parquet файлов
    Случайная выборка указанного размера из parquet файлов 
    """
    print('Семплирование датасета')
    sampled_dataset = df(columns=min_ds.columns)
    sample_size = sample_size//len(dataset_directory)

    for file in tqdm_notebook(train_col_dirs):
        sampled_dataset = pd.concat([sampled_dataset, pq.read_table(file+'/').to_pandas()\
                           .sample(n=sample_size, random_state=10)], ignore_index=True)
        
    return sampled_dataset

### Разворачиваем вложенные признаки переменной 'metadata_options'

In [None]:
def metadata_options_binaryze(data):
    metadata_dict = [Counter(x) for x in tqdm_notebook(data['metadata_options'])]
    return df(metadata_dict).fillna(0).astype(int)

### Информация о частоте использования контента в процессе ваимодействия пользователя с ресурсом

- В данных содержится информация посещениях пользователя в разное время. 
- В разные моменты времени кол-во информации о посещенном и оцененном контенте разное. 
- Поскольку это может сильно влиять на точность модели - то это обстоятельство надо учесть и ввести переменную user_activity.

In [None]:
#добавляем перемнную учета пользовательской активности(кол-ва просмотренных обьектов) на текущий момент 
def add_user_activity(data):
    data['user_activity'] = 1
    data['user_activity'] = data.groupby('instanceId_userId')['user_activity'].cumsum()
    return data

### Препроцессинг

In [None]:
def preprocessing(data):
    #сортируем по времени
    data.sort_values(by=['audit_timestamp'], inplace=True)
    #добавляем пользовательскую активность
    data = add_user_activity(data)
    #бинаризация 'metadata_options'
    data = pd.concat([data, metadata_options_binaryze(data)], axis=1)
    data.drop(['metadata_options'], axis=1,  inplace=True)
    
    return data

In [None]:
train_dataset = preprocessing(train_dataset)

In [None]:
## Рассчитываем таргет переменную, запоняем пропуски
target = train_dataset['feedback'].map(lambda x: 1 if 'Liked' in x else 0).values

train_dataset.drop(['feedback'], axis=1, inplace=True)
train_dataset.fillna(0, inplace=True, downcast=False)
## Проверка
train_dataset.isna().sum().sum()

### Проверяем категориальные признаки их типы
- те признаки которые являются числами, но являются строками - пытаемся сконвертировать в число

In [None]:
train_dataset.select_dtypes('object').columns

In [None]:
object_type_columns = train_dataset.select_dtypes('object').columns
converted_columns = []

for column in object_type_columns:
    try:
        train_dataset.loc[:, column] = train_dataset.loc[:, column].astype(int)
        converted_columns.append(column)
    except Exception as e:
        print(column, 'not converted', train_dataset.loc[0, column])

In [None]:
train_dataset.select_dtypes('object').columns

In [None]:
converted_columns

### Делим датасет на TRAIN и VALIDATION с балансировкой по классам
- в трейне и тесте должно оказаться приблизительно пропорциональное кол-во класса 1 и 0

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_valid, y_train, y_valid = train_test_split(train_dataset, target, test_size=0.30, random_state=42, stratify=target)

X_train.reset_index(inplace=True)
X_valid.reset_index(inplace=True)

X_train = X_train.drop(['index'], axis=1)
X_valid = X_valid.drop(['index'], axis=1)

In [None]:
X_valid.dtypes

### Сериализуем данные для последующего обучения модели

In [None]:
def pickle_files(data, name):
    with open(name+'.pcl', 'wb') as f:
        pickle.dump(data, f)
        
pickle_files((X_train, y_train), 'train_dataset')
pickle_files((X_valid, y_valid), 'valid_dataset')

#убираем ссылки на обьекты для удаления интерпретатором
# X_train, y_train, y_valid, train_dataset, target = None, None, None, None, None

## Подготовка тестового датасета

In [None]:
test_dataset = pq.read_table(dataset_dir+'/test/').to_pandas(use_threads=8)
test_dataset.shape

In [None]:
test_dataset = preprocessing(test_dataset)

In [None]:
#converted_columns
test_dataset.select_dtypes('object').columns

In [None]:
#непересекающиеся столбцы в 2-х датасетах
set(X_valid.columns)^set(test_dataset.columns)

In [None]:
test_dataset.drop(['date'], axis=1, inplace=True)
test_dataset.fillna(0, inplace=True)
## Проверка
test_dataset.isna().sum().sum()

In [None]:
pickle_files(test_dataset, 'test_dataset')

# <font color='blue'>Проверка пересечений ключевых признаков 

In [None]:
hash(1)

In [None]:
#images
#objects
#texts
#users
#types

In [None]:
train_dataset.columns

In [None]:
train_dataset['instanceId_objectId']

In [None]:
test_dataset['ImageId']

In [None]:
len(train_dataset)

In [None]:
len(set(train_dataset['instanceId_objectId'])&set(test_dataset['instanceId_objectId']))

# ВЫВОДЫ:

- ввв