In [None]:
import os

import numpy as np
import pandas as pd

from sklearn.preprocessing import normalize, MinMaxScaler, StandardScaler
from sklearn.decomposition import IncrementalPCA
from sklearn.linear_model import LinearRegression
from sklearn.multioutput import MultiOutputRegressor

from catboost import CatBoostRegressor, sum_models

from typing import Text, Dict, Tuple, List, Callable, Type, Optional

import yaml
import joblib

import warnings

warnings.filterwarnings("ignore")

from tqdm.auto import tqdm

In [None]:
config_path = '../config/params.yml'
config = yaml.load(open(config_path), Loader=yaml.FullLoader)

preproc = config['preprocessing']
training = config['train']
evaluate = config['evaluate']

# Import

In [None]:
def dtypes_convert(df: pd.DataFrame) -> pd.DataFrame:
    """
    Преобразование числовых полей датафрейма к меньшей размерности для экономии вычислительных ресурсов
    :param df: датафрейм
    """
    fcols = df.select_dtypes('float').columns
    icols = df.select_dtypes('integer').columns

    df[fcols] = df[fcols].apply(pd.to_numeric, downcast='float')
    df[icols] = df[icols].apply(pd.to_numeric, downcast='unsigned')

    return df

In [None]:
def sg_transform(df: pd.DataFrame) -> pd.DataFrame:
    """
    Получение данных об их принадлежности к нитям
    :param df: датафрейм sensor_geometry
    """
    df_out = df.copy()
    # Номер нити, к которой относится датчик
    df_out[preproc['line_column']] = df_out[preproc['sensor_column']] // 60 + 1
    # Флаг принадлежности датчика к центральным нитям
    #df['core'] = (df['line_id'] > 78).astype('uint8')

    return df_out

Импортируем таблицы с метаданными, необходимыми для инференса по тестовому пакету batch:

In [None]:
# Sensor geometry data
file_path = preproc['file_dirs']['sensor_geometry'][
    'local_dir'] + '/' + preproc['file_dirs']['sensor_geometry']['filename']
sensor_geometry = pd.read_csv(file_path).set_index(preproc['sensor_column'])

# Test meta data
file_path = evaluate['file_dirs']['test_meta']['local_dir'] + '/' + evaluate[
    'file_dirs']['test_meta']['filename']
test_meta = pd.read_parquet(file_path).set_index(preproc['event_column'])
test_meta = dtypes_convert(test_meta)

del file_path

In [None]:
test_meta

Далее потребуется таблица с пакетом тестовых данных batch, но ее будем импортировать с непосредственной трансформацией данных

# Preprocessing

In [None]:
def cartesian_to_spherical(cart: np.ndarray) -> np.ndarray:
    """
    Обратное преобразование вектора декартовых координат в сферическую систему
    :param cart: масив сферических координат в формате [x, y, z]
    """
    x = cart[:, 0]
    y = cart[:, 1]
    z = cart[:, 2]
    rxy_sq = x**2 + y**2
    r = np.sqrt(rxy_sq + z**2)
    zenith = np.arctan2(np.sqrt(rxy_sq), z)
    zenith = np.where(zenith < 0, zenith + 2 * np.pi, zenith)
    azimuth = np.arctan2(y, x)
    azimuth = np.where(azimuth < 0, azimuth + 2 * np.pi, azimuth)

    return np.array([azimuth, zenith], dtype='float32').T

In [None]:
def flatten_multiindex(df: pd.DataFrame, isjoin: bool = True) -> list:
    """
    Приведение иерархического мультииндекса стобцов в плоский вид
    :param df: датафрейм
    :param isjoin: объединение имен через '_', иначе - отбрасываем второй уровень
    """
    if isjoin:
        result = [
            '_'.join(col).strip()
            if len(col[1]) > 0 else ' '.join(col).strip()
            for col in df.columns.values
        ]
    else:
        result = [col[0] for col in df.columns.values]
    return result

In [None]:
def batch_prepare(batch_df: pd.DataFrame,
                  drop_aux: bool = True,
                  doms_agg: bool = True) -> pd.DataFrame:
    """
    Подготовка батча данных: агрегация импульсов, нормализация времен (отсчет от 0)
    :param batch_df: датафрейм с батчем данных импульсов
    :param drop_aux: флаг для отброса ненадежных импульсов с auxiliary==True
    :param doms_agg: флаг для агрегирования импульсов и их времени по каждому датчику,
                     иначе - на выход подаются все необработанные импульсы, в том числе,
                     если в одном событии присутствуют импульсы с одного модуля
    """
    if drop_aux:
        batch_df = batch_df[batch_df[preproc['aux_column']] == False]
    if doms_agg:
        batch_df = batch_df.groupby([
            preproc['event_column'], preproc['sensor_column']
        ]).agg(preproc['prepare_aggregators']).reset_index()
        batch_df.columns = flatten_multiindex(batch_df, isjoin=False)
    # Находим времена импульсов относительно начала событий
    times = batch_df.groupby(preproc['event_column']).agg(
        preproc['time_aggregator']).reset_index()
    times.columns = flatten_multiindex(times)
    min_time_column = list(times.columns)[-1]
    batch_df = batch_df.merge(times, on=preproc['event_column'], how='left')
    batch_df[preproc['time_column']] = (batch_df[preproc['time_column']] -
                                        batch_df[min_time_column])
    batch_df.drop(columns=[min_time_column], inplace=True)

    return batch_df

In [None]:
def cut_pulses(batch_df: pd.DataFrame,
               max_pulses: int = 128,
               drop_aux: bool = True) -> pd.DataFrame:
    """
    Выкидываем последние и ненадёжные импульсы в событии если их больше max_pulses
    :param batch_df: датафрейм с импульсами
    :param max_pulses: количество импульсов на одно событие (отсечка)
    :param drop_aux: флаг для отброса ненадежных импульсов с auxiliary==True
    """
    if drop_aux:
        batch_df = batch_df.sort_values(
            [preproc['event_column'], preproc['time_column']])
    else:
        batch_df = batch_df.sort_values([
            preproc['event_column'], preproc['aux_column'],
            preproc['time_column']
        ])
    batch_df = batch_df.reset_index(drop=True)
    batch_df = batch_df.groupby(preproc['event_column']).head(max_pulses)
    batch_df = batch_df.reset_index(drop=True)
    if not drop_aux:
        batch_df = batch_df.sort_values(
            [preproc['event_column'], preproc['time_column']])
        batch_df = batch_df.reset_index(drop=True)

    return batch_df

In [None]:
def get_event_features(batch_df: pd.DataFrame,
                       apply_aux: bool = True) -> pd.DataFrame:
    """
    Набираем агрегированные фичи, характеризующие событие в целом
    :param batch_df: датафрейм с импульсами и координатами датчиков
    :param apply_aux: флаг обнуления фичей при отсутствии надежных импульсов
    """
    for key, mult in preproc['multiplication'].items():
        batch_df[key] = batch_df[mult[0]] * batch_df[mult[1]]

    if apply_aux:
        for col in batch_df.columns:
            if col not in [
                    preproc['event_column'], preproc['sensor_column'],
                    preproc['line_column']
            ]:
                batch_df[col] = batch_df[col] * (
                    1 - batch_df[preproc['aux_column']])

    batch_df = batch_df.groupby(preproc['event_column']).agg(
        preproc['aggregators'])
    batch_df.columns = flatten_multiindex(batch_df, isjoin=True)
    batch_df = batch_df.reset_index()

    for key, dision in preproc['divisions'].items():
        batch_df[key] = np.log10(batch_df[dision[0]] / batch_df[dision[1]])

    for col in preproc['log_scale_transform']:
        batch_df[col] = np.log(1 + batch_df[col])

    for key, col in preproc['log_features'].items():
        batch_df[key] = np.log10(batch_df[col]) / 10

    batch_df.drop(columns=preproc['drop_columns'], inplace=True)
    # На случай ошибок аггрегирования std значений
    batch_df.fillna(value=0, inplace=True)

    return batch_df

In [None]:
def batch_transform(batch_parquet_path: str,
                    sensor_geometry: pd.DataFrame,
                    meta: pd.DataFrame,
                    max_pulses: int = 128,
                    drop_aux: bool = True,
                    doms_agg: bool = True,
                    is_evaluate: bool = False):
    """
    Трансформация батча импульсов в таблицу событий с фичами, характеризующими каждое из них
    :param batch_parquet_path: путь к датафрейму батча с импульсами
    :param sensor_geometry: датафрейм с геометрией датчиков аппарата IceCube
    :param meta: метаданные событиий с векторами направления нейтрино в сф. координатах
    :param max_pulses: количество импульсов на одно событие (отсечка)
    :param drop_aux: флаг для отброса ненадежных импульсов с auxiliary==True
    :param doms_agg: флаг для агрегирования импульсов и их времени по каждому датчику,
                     иначе - на выход подаются все необработанные импульсы, в том числе,
                     если в одном событии присутствуют импульсы с одного модуля
    :param is_evaluate: флаг того, что батч не предназначен для обучения (не имеет таргета)
    """
    batch_df = pd.read_parquet(batch_parquet_path)

    # Подготовка данных импульсов перед преобразованием пакета
    batch_df = batch_prepare(batch_df=batch_df,
                             drop_aux=drop_aux,
                             doms_agg=doms_agg)

    # Выкидываем последние и ненадёжные импульсы в событии если их больше max_pulses
    batch_df = cut_pulses(batch_df=batch_df,
                          max_pulses=max_pulses,
                          drop_aux=drop_aux)
    # Объединяем с геометрией датчиков для получения координат
    batch_df = batch_df.merge(sensor_geometry,
                              on=preproc['sensor_column'],
                              how='left')

    # Набираем агрегированные фичи, характеризующие событие в целом
    batch_df = get_event_features(batch_df, apply_aux=True)

    # Добавляем целевые переменные в сферических и декартовых координатах
    if not is_evaluate:
        batch_df[preproc['target_columns']] = batch_df.merge(
            meta, on=preproc['event_column'],
            how='left')[preproc['target_columns']]
        # Для снижения нелинейности задачи преобразуем к декартовым координатам
        batch_df[preproc['target_cart_columns']] = spherical_to_cartesian(
            batch_df[preproc['target_columns']].to_numpy())

    # Итоговый датафрейм
    batch_df.set_index(preproc['event_column'], drop=True, inplace=True)
    batch_df = dtypes_convert(batch_df)

    return batch_df

In [None]:
def pipeline_preprocess_evaluate() -> pd.DataFrame:
    """
    Преобразование всего датачета батчей с импульсами в формат датафреймов для инференса 
    и сохраняет обработанные файлы в отдельной папке
    """
    sg_file_path = preproc['file_dirs']['sensor_geometry'][
        'local_dir'] + '/' + preproc['file_dirs']['sensor_geometry']['filename']
    sensor_geometry = dtypes_convert(pd.read_csv(sg_file_path))
    sensor_geometry = sg_transform(sensor_geometry)
    meta_file_path = evaluate['file_dirs']['test_meta'][
        'local_dir'] + '/' + evaluate['file_dirs']['test_meta']['filename']
    test_meta = dtypes_convert(pd.read_parquet(meta_file_path))
    #test_meta.set_index(preproc['event_column'], inplace=True)

    # Проверяем наличие batch-файлов
    src_filepath = evaluate['file_dirs']['test_batch']['local_dir']
    src_filename = evaluate['file_dirs']['test_batch']['filename']
    src_path = src_filepath + src_filename
    src_check_file = os.path.isfile(src_path)
    if src_check_file:
        # Обрабатываем batch-файл и берем из него сэмпл эвентов
        batch_data = batch_transform(batch_parquet_path=src_path,
                                     sensor_geometry=sensor_geometry,
                                     meta=test_meta,
                                     max_pulses=10000,
                                     drop_aux=True,
                                     doms_agg=True,
                                     is_evaluate=True)
    else:
        batch_data = None
        print(f'Batch ID #{batch_id} file {src_path} is missing. Skipping')
        
    return batch_data

In [None]:
batch_df_eval = pipeline_preprocess_evaluate()

In [None]:
batch_df_eval.head()

In [None]:
batch_df_eval.info()

# Evaluate

In [None]:
model = joblib.load(training['model_path'])
y_pred = model.predict(batch_df_eval)

In [None]:
y_pred

In [None]:
prediction = cartesian_to_spherical(y_pred)
prediction

In [None]:
cols = preproc['target_columns']
df = pd.DataFrame(prediction, columns=cols, index=batch_df_eval.index, dtype=float)

In [None]:
df

In [None]:
src_filepath = evaluate['file_dirs']['test_batch']['local_dir']
src_filename = evaluate['file_dirs']['test_batch']['filename']
src_path = src_filepath + src_filename
batch_661 = pd.read_parquet(src_path)#.reset_index()

In [None]:
batch_661 = batch_661.groupby([preproc['event_column'], preproc['sensor_column']
                   ]).agg(preproc['prepare_aggregators']).reset_index()
batch_661.columns = flatten_multiindex(batch_661, isjoin=False)

In [None]:
batch_661