## Тестовое задание на позицию Data Scientist

### 1. Data Science \ Data Analysis

Необходимо построить модель, которая на основании поступающих каждую минуту данных определяет качество продукции, производимое на единице оборудования.

Единица оборудования состоит из 5 одинаковых по размеру камер, в каждой камере установлено по 3 датчика температур. Также имеются данные о высоте слоя сырья и его влажности. Высота слоя и влажность измеряются при входе сырья в машину. Сырье проходит через единицу оборудования за 1 час.

Данные с показателями работы оборудования содержатся в файле x_train.csv:

| Название тега  | Описание тега |
| :- | -: |
| T_data_1_1	| 1-й датчик в 1-й камере |
| T_data_1_2	| 2-й датчик в 1-й камере |
| T_data_1_3	| 3-й датчик в 1-й камере |
| T_data_2_1	| 1-й датчик во 2-й камере |
| T_data_2_2	| 2-й датчик во 2-й камере |
| T_data_2_3	| 3-й датчик во 2-й камере |
| T_data_3_1	| 1-й датчик в 3-й камере |
| T_data_3_2	| 2-й датчик в 3-й камере |
| T_data_3_3	| 3-й датчик в 3-й камере |
| T_data_4_1	| 1-й датчик в 4-й камере |
| T_data_4_2	| 2-й датчик в 4-й камере |
| T_data_4_3	| 3-й датчик в 4-й камере |
| T_data_5_1	| 1-й датчик в 5-й камере |
| T_data_5_2	| 2-й датчик в 5-й камере |
| T_data_5_3	| 3-й датчик в 5-й камере |
| H_data	| Высота слоя |
| AH_data	| Влажность сырья |


Качество продукции измеряется в лаборатории по пробам, которые забираются каждый час, данные по известным анализам содержатся в файле y_train.csv. В файле указано время забора пробы, проба забирается на выходе из единицы оборудования.

Вы договорились с заказчиком, что оценкой модели будет являться показатель MAE. 

Для оценки модели необходимо сгенерировать предсказания за период, указанный в файле x_test.csv (1184 предикта).

Результатом является воспроизводимый Python код в данной тетрадке + приложенный csv файл с предсказаниями на тестовой выборке.

In [1]:
import torch
import pandas as pd
import numpy as np
import seaborn as sns
from tqdm import tqdm
from math import trunc
from datetime import datetime
from matplotlib import pyplot as plt
from torch.utils.data import TensorDataset, DataLoader

### EDA

In [2]:
x_train = pd.read_csv('x_train.csv')
y_train = pd.read_csv('y_train.csv')
x_test = pd.read_csv('x_test.csv')

In [3]:
x_train.head()

Unnamed: 0.1,Unnamed: 0,timestamp,T_data_1_1,T_data_1_2,T_data_1_3,T_data_2_1,T_data_2_2,T_data_2_3,T_data_3_1,T_data_3_2,T_data_3_3,T_data_4_1,T_data_4_2,T_data_4_3,T_data_5_1,T_data_5_2,T_data_5_3,H_data,AH_data
0,0,2015-01-01 00:00:00,212,210,211,347,353,347,474,473,481,346,348,355,241,241,243,167.85,9.22
1,1,2015-01-01 00:01:00,212,211,211,346,352,346,475,473,481,349,348,355,241,241,243,162.51,9.22
2,2,2015-01-01 00:02:00,212,211,211,345,352,346,476,473,481,352,349,355,242,241,242,164.99,9.22
3,3,2015-01-01 00:03:00,213,211,211,344,351,346,477,473,481,355,349,355,242,241,242,167.34,9.22
4,4,2015-01-01 00:04:00,213,211,211,343,350,346,478,473,482,358,349,355,243,241,242,163.04,9.22


In [4]:
y_train.head()

Unnamed: 0.1,Unnamed: 0,timestamp,target
0,0,2015-01-04 00:05:00,392
1,1,2015-01-04 01:05:00,384
2,2,2015-01-04 02:05:00,393
3,3,2015-01-04 03:05:00,399
4,4,2015-01-04 04:05:00,400


По данному описанию качества продукции мы видим, что стандартной отклонение равно 46, запомним это значение в качестве отправной точки для baseline.

In [5]:
y_train['target'].describe()

count    28000.000000
mean       402.388857
std         46.287060
min        221.000000
25%        372.000000
50%        408.000000
75%        438.000000
max        505.000000
Name: target, dtype: float64

### Data preprocessing

Для удобства сразу приведем данные о времени из типа string к типу datetime.

In [6]:
x_train['timestamp'] = pd.to_datetime(x_train['timestamp'])
y_train['timestamp'] = pd.to_datetime(y_train['timestamp'])
x_test['timestamp'] = pd.to_datetime(x_test['timestamp'])

Данные о качестве продукции начинаются с более позднего времени, чем данные с показателями работы оборудования, поэтому для удобства формирования тренировочного датасета обрежем данные с показателями работы обородувания до начала первого эксперимента, указанного в данных о качестве продукции.

In [7]:
def get_time_delta(start_time: datetime, end_time: datetime) -> float:
    """Returns time difference in minutes.
    """
    return (end_time - start_time).total_seconds() / 60 if start_time < end_time else -1

In [8]:
def delete_data_before_start(data: pd.Series, start_time: datetime) -> None:
    """Deletes non-actual by time data inplace.
    """
    start = 0
    while (get_time_delta(data['timestamp'].iloc[start], start_time) > 59):
        start += 1

    data.drop(data.index[0:start], inplace=True)
    data.reset_index(drop=True, inplace=True)

In [9]:
delete_data_before_start(x_train, y_train['timestamp'].iloc[0])

#### Mean Baseline

В качестве baseline рассмотрим следующую конструкцию, которая чуть более сложная, чем выводить просто среднее по таргетам для любого значения. 

А именно:

    - выберем данные с показателями работы для каждого эксперимента (данные за час)
    - посчитаем среднее по каждому столбцу для каждого такого набора
    - по полученной строчке для эксперимента будем предсказывать значение качества продукции

In [10]:
def get_base_preprocessed_data(data: pd.Series, end_time: datetime) -> pd.Series:
    """Returns data aggregating mean value per hour.
    
    Args:
        data: Source data.
        end_time: Time of the last experiment.
        
    Returns:
        A dataframe that contains preprocessed data from the corresponding input dataframe.
    """
    cols = list(data)
    cols.remove('Unnamed: 0')
    
    df = pd.DataFrame(columns=cols)
    cols.remove('timestamp')
    
    for idx in tqdm(range(trunc(len(data) / 60))):
        if (get_time_delta(data['timestamp'].iloc[idx], end_time) != -1):
            row = data.loc[60*idx:60*idx+59, cols].mean()
            row['timestamp'] = data['timestamp'].iloc[60*idx+59]
            df.loc[idx] = row
            
    return df

In [11]:
x_train_base_ = get_base_preprocessed_data(x_train, y_train['timestamp'].iloc[-1])

100%|████████████████████████████████████████████████████████████████████████████████████████████████| 28000/28000 [03:31<00:00, 132.39it/s]


In [12]:
y_train_base = y_train['target']

In [13]:
x_train_base = x_train_base_.iloc[:, 1:]

#### Neural network preprocessing

В качестве более сложного подхода, чем описанный ранее mean baseline, реализуем архитектуру нейронной сети для данной задачи.


Для этого сформируем датасет, где каждый элемент будет представлять собой массив, состоящий из массива, размера $(59, 17)$, содержащий все данные об эксперименте и массива, состоящего из соотвествующего значения качества продукции.

In [14]:
def get_nn_preprocessed_data(data: pd.Series) -> pd.Series:
    """Returns data aggregating by experiment.
    
    Args:
        data: Source data.
        
    Returns:
        A dataframe that contains preprocessed data from the corresponding input dataframe.
    """
    X = []
    data = data.iloc[:, 2:]

    for idx in tqdm(range(trunc(len(data) / 60))):
        X.append(data.iloc[60*idx:60*idx+59].to_numpy())
    
    return X

In [15]:
X = get_nn_preprocessed_data(x_train)

100%|███████████████████████████████████████████████████████████████████████████████████████████████| 28000/28000 [00:03<00:00, 8857.09it/s]


In [16]:
y = list(map(lambda x: [x], y_train['target']))

In [17]:
tensor_x = torch.Tensor(X) 
tensor_y = torch.Tensor(y)

dataset = TensorDataset(tensor_x,tensor_y)

train_size = int(0.9 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_dataloader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=True)

### Algo

In [18]:
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.compose import TransformedTargetRegressor
from sklearn.metrics import mean_absolute_error
import xgboost as xgb

%load_ext tensorboard
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.tensorboard import SummaryWriter
import datetime, os

In [19]:
results = {}

#### Mean

В качестве самого простого решения попробуем выводить среднее по таргетам для любого значения.

In [20]:
mean_score = mean_absolute_error(y_train_base, [y_train_base.mean()]*len(y_train_base))

In [21]:
results['Mean'] = [mean_score]

#### Baseline

Для решения с датасетом из предобработанных средних для каждого эксперимента возьмем простую линейную модель и xgboost.

In [26]:
def get_model_results(model, params={}):
    name = type(model.named_steps['model']).__name__
    model = GridSearchCV(model, param_grid=params, scoring='neg_mean_absolute_error', cv=3, verbose=1, n_jobs=-1)
    model.fit(x_train_base, y_train_base)
    results[name]=[-model.best_score_]
    print(results[name])

In [27]:
lr = Pipeline(steps=[('normalize', MinMaxScaler()), ('model', LinearRegression())])
get_model_results(lr)

Fitting 3 folds for each of 1 candidates, totalling 3 fits
[14.009556091343208]


In [28]:
get_model_results(Pipeline(steps=[('normalize', MinMaxScaler()), ('model', xgb.XGBRegressor())]))

Fitting 3 folds for each of 1 candidates, totalling 3 fits
[9.070946138424569]


#### Neural Network

Реализуем несложную архитектуру нейронной сети. \
В качестве функции потерь для тренировки будем использовать MSE, так как она дифференцируема.

In [22]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        def block(in_feat, out_feat, normalize=True):
            layers = [nn.Linear(in_feat, out_feat)]
            if normalize:
                layers.append(nn.BatchNorm1d(out_feat, 0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.pred = nn.Sequential(
            *block(1003, 512),
            *block(512, 256),
            *block(256, 16),
            torch.nn.Linear(16, 1),
        )
    
    def forward(self, x):
        x = x.view(x.size(0), -1)
        return self.pred(x)

In [23]:
net = Net()
optimizer = torch.optim.Adam(net.parameters(), lr=0.01)
loss_func = torch.nn.MSELoss()
l1_loss = torch.nn.L1Loss()

writer = SummaryWriter()

for epoch in tqdm(range(48)):
    for step, (batch_x, batch_y) in enumerate(train_dataloader): 
        
        b_x = Variable(batch_x)
        b_y = Variable(batch_y)

        prediction = net(b_x)     

        loss = loss_func(prediction, b_y)  
        writer.add_scalar('MSE/train', loss, step)
        writer.add_scalar('MAE/train', l1_loss(prediction, b_y), step)
        
        optimizer.zero_grad()   
        loss.backward()         
        optimizer.step()    
    
    test_loss = 0 
    with torch.no_grad():
        for data in test_dataloader:
            x, y = data
            pred = net(x)
            test_loss += l1_loss(pred, y)
    test_loss /= len(test_dataloader)
    writer.add_scalar('MAE/test', test_loss, epoch)

torch.save(net.state_dict(), 'net.pth')

2021-07-21 15:10:07.435943: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-07-21 15:10:07.435990: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████| 48/48 [01:57<00:00,  2.45s/it]


In [24]:
tensorboard --logdir=runs --host=localhost --port=8080

Reusing TensorBoard on port 8080 (pid 37555), started 5:49:22 ago. (Use '!kill 37555' to kill it.)

In [25]:
results['neural_net'] = [test_loss.numpy()]

### Results

Визуализируем полученные результаты с помощью таблички, в которой указан метод и его результаты.

In [29]:
def show_results():
    sorted_res = dict(sorted(results.items(), key=lambda item: item[1][0]))
    v = list(sorted_res.values())
    fst = list(map(lambda x: x[0], v))
    df = pd.DataFrame({"name": list(sorted_res.keys()), "mae": fst})
    with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.max_colwidth', None):
        display(df)

In [30]:
show_results()

Unnamed: 0,name,mae
0,neural_net,7.9916077
1,XGBRegressor,9.070946
2,LinearRegression,14.009556
3,Mean,37.763177


### Applying best solution

По результатам нейронная оказалась сеть наиболее удачным подходом.

In [31]:
x = get_nn_preprocessed_data(x_test)
tx = torch.Tensor(x) 
pred = net(tx)

x_base = get_base_preprocessed_data(x_test, x_test['timestamp'].iloc[-1])

100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 1184/1184 [00:00<00:00, 9306.68it/s]
100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 1184/1184 [00:08<00:00, 140.55it/s]


In [32]:
y_test = pd.DataFrame(columns=list(y_train))

y_test['timestamp'] = x_base['timestamp']
y_test['target'] = np.concatenate(pred.detach().numpy())
y_test['Unnamed: 0'] = y_test.index

In [33]:
y_test.to_csv('y_test.csv')

### 2. Software Engineering

#### Напишите класс JsonTransformer для обработки входящих данных в предоставленном файле .json, приведения их к требуемому плоскому формату и сохранению в PostgreSQL

Путь к файлу json -> JsonTransformer -> flat table @ PostgreSQL

Вход - input_example.json
Выход - плоская структура для записи в psql:

`` 
[{
 data_id:"266",
 data_name:"v.2",
 data_sellerId:"355",
 statuses_id:"1766",
 statuses_name:"КС: 10%",
 statuses_amount"100",
 statuses_actionBaseId:"266"},
 ...]
``

(1 строка = 1 элемент внутри ^statuses, statuses_id как уникальный ключ)

In [34]:
import numpy as np
import psycopg2
from functools import reduce

  """)


In [35]:
# "скелет" требуемого класса на Python
import abc
import json


class BaseJsonTransformer(abc.ABC):
    def __init__(self, json_file_path):
        self.json_file_path = json_file_path

    def read_json(self):
        with open(self.json_file_path, 'r') as f:
            return json.load(f)

    @abc.abstractmethod
    def transform_json(self, json_to_transform=None):
        return

    @staticmethod
    def write_to_psql(json, psql_address):
        return

In [38]:
class JsonTransformer(BaseJsonTransformer):
    
    @staticmethod
    def traverse(name, d):
        """Traverses dict into flat structure for psql"""
        res = [{}]
        for k, v in d.items():
            if (k[0] == '^'):
                k = k[1:]
            if isinstance(v, list) and len(v) > 0:
                res_ = []
                for vdict in v:
                    res_.append(JsonTransformer.traverse(k, vdict))
                res_ = np.concatenate(res_)

                nres = []
                for i in res:
                    for j in res_:
                        e = i
                        e.update(j)
                        nres.append(e)     
                res = nres
            elif isinstance(v, dict) and len(v) > 0:
                res = JsonTransformer.traverse(k, v)
            else:
                kname = name + '_' + k if name else k
                for rdict in res:
                    rdict[kname] = v if v else 'null'
        return res
    
    def transform_json(self, json_to_transform=None):
        """Transforms json into flat table for psql."""
        res = []
        json = json_to_transform if json_to_transform else self.read_json()

        for j in json:
            res += JsonTransformer.traverse('', j)
        return res
    
    @staticmethod
    def write_to_psql(json, psql_address="host=localhost dbname=postgres user=postgres password=root"):
        """Writes json in table at the specified address."""
        conn = psycopg2.connect(psql_address)
        cur = conn.cursor()
        #cur.execute("drop table if exists data")
        
        j = json
        cur.execute("create table if not exists data({})".format(reduce(lambda x, y: x + ' text, ' + y, j[0].keys()) + ' text'))

        for j_ in j:
            query = "insert into data values ({})".format("\'" + reduce(lambda x, y: x + "\'"  + " , " + "\'"  + y, j[0].values()) +  "\'")
            cur.execute(query)
        
        conn.commit()
    

In [39]:
trans = JsonTransformer('input_example.json')
j = trans.transform_json()
trans.write_to_psql(j)

### 3. Data Engineering \ Архитектура

*Ситуация*:
- На удалённом сервере есть папка с csv файлами. 
- 1 раз в час в папку поступает новый csv файл (с данными за последний час). 
- В названии каждого файла содержится timestamp, указывающий время обновления файла.
- Файл может содержать ряд неточностей, поэтому каждый новый файл требует обработки.

*Задача*:
- при появлении нового csv файла - в течение 60 секунд запускать его обработку
- по итогу обработки - складывать получившийся результат в PostgreSQL
- весь процесс - мониторить своевременность и успешность обработок


##### Опишите архитектуру сбора и обработки для решения поставленной задачи:
- 150-200 слов - достаточно
- описание конкретных инструментов - приветствуется
- при желании - можно описать всё схемой с минимальными комментариями 

#### Решение:

- каждые 60 секунд запускается скрипт с помощью cron, который проверяет, нет ли csv-файла, который был получен в последние 60 секунд;
    - проверка на наличие осуществляется с помощью вычисления разности между временем запуска скрипта и временем из названия файла;
- если такой файл был найден, то:
    - обрабатываем файл:
        - проверяем на валидность;
        - записываем в PostgreSQL;
    - мониторинг осуществляем с помощью логгинга в Prometeus и Grafana;