In [1]:
# 1. Импорт и настройки =====================================================
import pandas as pd
import numpy as np
from pathlib import Path
import logging
from typing import Tuple

logging.basicConfig(level=logging.INFO)

# 2. Основной пайплайн ======================================================
class DataPreprocessor:
    def __init__(self, data_dir: Path):
        self.data_dir = data_dir
        self.ga_sessions = None
        self.ga_hits = None
        
    def run_pipeline(self):
        """Запуск полного цикла обработки"""
        self.load_data()
        self.validate_initial_data()
        self.process_sessions()
        self.process_hits()
        self.merge_and_finalize()
        self.save_data()
        logging.info("Обработка завершена успешно")

    # 3. Загрузка данных =====================================================
    def load_data(self):
        """Загрузка сырых данных"""
        logging.info("\n" + "="*50 + "\nШАГ 1: ЗАГРУЗКА ДАННЫХ\n" + "="*50)
        try:
            self.ga_sessions = pd.read_pickle(self.data_dir / 'sample_ga_sessions.pkl')
            self.ga_hits = pd.read_pickle(self.data_dir / 'sample_ga_hits.pkl')
            logging.info(f"Загружено сессий: {len(self.ga_sessions):,}")
            logging.info(f"Загружено событий: {len(self.ga_hits):,}")
        except Exception as e:
            logging.error(f"Ошибка загрузки: {str(e)}")
            raise

    # 4. Валидация сырых данных ==============================================
    def validate_initial_data(self):
        """Проверка целостности сырых данных"""
        logging.info("\n" + "="*50 + "\nШАГ 2: ВАЛИДАЦИЯ ДАННЫХ\n" + "="*50)
        
        # Проверка обязательных колонок
        required_columns = {
            'ga_sessions': ['session_id', 'client_id', 'visit_date'],
            'ga_hits': ['session_id', 'hit_date', 'hit_type']
        }
        
        for df_name, cols in required_columns.items():
            df = getattr(self, df_name)
            missing = set(cols) - set(df.columns)
            if missing:
                raise ValueError(f"{df_name} отсутствуют колонки: {missing}")

        # Проверка уникальности session_id
        if self.ga_sessions['session_id'].duplicated().any():
            raise ValueError("Найдены дубликаты session_id в ga_sessions")

    # 5. Обработка сессий ====================================================
    def process_sessions(self):
        """Обработка данных сессий"""
        logging.info("\n" + "="*50 + "\nШАГ 3: ОБРАБОТКА СЕССИЙ\n" + "="*50)
        
        # Типизация временных меток
        self.ga_sessions['visit_datetime'] = pd.to_datetime(
            self.ga_sessions['visit_date'] + ' ' + self.ga_sessions['visit_time'],
            errors='coerce'
        )
        invalid_dates = self.ga_sessions['visit_datetime'].isna().sum()
        logging.info(f"Некорректных дат сессий: {invalid_dates}")
        
        # Обработка разрешения экрана
        self._process_resolution()
        
        # Обработка категориальных признаков
        self._handle_categoricals()
        
        # Удаление технических полей
        self.ga_sessions = self.ga_sessions.drop(
            ['visit_date', 'visit_time', 'device_screen_resolution'], 
            axis=1
        )

    def _process_resolution(self):
        """Обработка device_screen_resolution"""
        # Разделение на ширину и высоту
        res_split = self.ga_sessions['device_screen_resolution'].str.split('x', expand=True)
        
        self.ga_sessions['screen_width'] = pd.to_numeric(res_split[0], errors='coerce')
        self.ga_sessions['screen_height'] = pd.to_numeric(res_split[1], errors='coerce')
        
        # Корректировка ориентации
        mask = self.ga_sessions['screen_width'] > self.ga_sessions['screen_height']
        self.ga_sessions.loc[mask, ['screen_width', 'screen_height']] = \
            self.ga_sessions.loc[mask, ['screen_height', 'screen_width']].values
        
        # Категоризация
        bins = [0, 400, 768, 1280, 1920, 3840, np.inf]
        labels = ['tiny', 'mobile', 'tablet', 'desktop', '2k', '4k+']
        self.ga_sessions['resolution_type'] = pd.cut(
            self.ga_sessions['screen_width'],
            bins=bins,
            labels=labels,
            right=False
        ).cat.add_categories('unknown').fillna('unknown')

    def _handle_categoricals(self):
        """Обработка категориальных признаков"""
        cat_cols = ['utm_source', 'utm_medium', 'device_os', 'geo_city']
        for col in cat_cols:
            self.ga_sessions[col] = (
                self.ga_sessions[col]
                .astype('category')
                .cat.add_categories('unknown')
                .fillna('unknown')
            )

    # 6. Обработка событий ===================================================
    def process_hits(self):
        """Обработка данных событий"""
        logging.info("\n" + "="*50 + "\nШАГ 4: ОБРАБОТКА СОБЫТИЙ\n" + "="*50)
        
        # Конвертация времени
        self.ga_hits['hit_time'] = pd.to_numeric(
            self.ga_hits['hit_time'], 
            errors='coerce'
        ).fillna(0).astype(int)
        
        self.ga_hits['hit_datetime'] = pd.to_datetime(
            self.ga_hits['hit_date']
        ) + pd.to_timedelta(self.ga_hits['hit_time'] // 1000, unit='s')
        
        # Удаление устаревших полей
        self.ga_hits = self.ga_hits.drop(['hit_date', 'hit_time'], axis=1)
        
        # Обработка пропусков
        self.ga_hits['event_action'] = self.ga_hits['event_action'].fillna('unknown')

    # 7. Объединение данных ==================================================
    def merge_and_finalize(self):
        """Создание целевой переменной и финализация"""
        logging.info("\n" + "="*50 + "\nШАГ 5: ФИНАЛИЗАЦИЯ\n" + "="*50)
        
        # Создание целевой переменной
        target_actions = ['sub_button_click', 'view_new_card', 'submit_form']
        self.ga_hits['is_target'] = self.ga_hits['event_action'].isin(target_actions)
        target_df = self.ga_hits.groupby('session_id')['is_target'].max().astype(int)
        
        # Объединение с сессиями
        self.ga_sessions = self.ga_sessions.merge(
            target_df, 
            on='session_id', 
            how='left'
        ).rename(columns={'is_target': 'target'})
        
        # Финализация
        self.ga_sessions['target'] = self.ga_sessions['target'].fillna(0).astype(int)
        self.ga_sessions = self.ga_sessions.drop('client_id', axis=1)
        
        logging.info("Распределение целевой переменной:")
        logging.info(self.ga_sessions['target'].value_counts(normalize=True))

    # 8. Сохранение результатов ==============================================
    def save_data(self):
        """Сохранение обработанных данных"""
        output_dir = self.data_dir.parent / 'processed'
        output_dir.mkdir(exist_ok=True)
        
        self.ga_sessions.to_parquet(output_dir / 'processed_sessions.parquet')
        self.ga_hits.to_parquet(output_dir / 'processed_hits.parquet')
        logging.info(f"Данные сохранены в: {output_dir}")

# 9. Запуск пайплайна =======================================================
if __name__ == "__main__":
    try:
        data_path = Path.cwd().parent.parent / 'data'
        processor = DataPreprocessor(data_path)
        processor.run_pipeline()
    except Exception as e:
        logging.error(f"Ошибка в основном пайплайне: {str(e)}")
        raise

INFO:root:
ШАГ 1: ЗАГРУЗКА ДАННЫХ
INFO:root:Загружено сессий: 1,860,042
INFO:root:Загружено событий: 15,726,470
INFO:root:
ШАГ 2: ВАЛИДАЦИЯ ДАННЫХ
INFO:root:
ШАГ 3: ОБРАБОТКА СЕССИЙ
ERROR:root:Ошибка в основном пайплайне: unsupported operand type(s) for +: 'datetime.date' and 'str'


TypeError: unsupported operand type(s) for +: 'datetime.date' and 'str'