In [8]:
# staging.ipynb
# ----------------------
# 1. Подключение к PostgreSQL
# ----------------------
import pandas as pd
from torgstat.db import get_engine, DB_SCHEMA
import sys
from sqlalchemy import text

try:
    # Создаём движок SQLAlchemy
    engine = get_engine()
    
    # Проверяем реальное подключение с использованием text()
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1 as test_connection"))
        test_result = result.scalar()
        
    if test_result == 1:
        print("✓ Подключение к PostgreSQL успешно установлено и проверено")
    else:
        print("✗ Ошибка: Не удалось проверить подключение")
        sys.exit(1)
        
except Exception as e:
    print(f"✗ Ошибка подключения к PostgreSQL: {e}")
    sys.exit(1)

# Функция для чтения таблицы из схемы analytics с детальным логированием
def read_table(table_name: str):
    try:
        query = f'SELECT * FROM {DB_SCHEMA}.{table_name};'
        df = pd.read_sql(query, engine)
        
        # Детальная информация о сырых данных
        print(f"\n📊 {table_name.upper()} - СЫРЫЕ ДАННЫХ:")
        print(f"   📏 Строк: {len(df):,}")
        print(f"   🧩 Столбцов: {len(df.columns)}")
        
        # Анализ пропусков
        null_counts = df.isnull().sum()
        if null_counts.sum() > 0:
            print(f"   ⚠️  Пропуски:")
            for col, count in null_counts.items():
                if count > 0:
                    print(f"      - {col}: {count} пропусков ({count/len(df)*100:.1f}%)")
        
        # Анализ типов данных
        print(f"   🔧 Типы данных:")
        for col, dtype in df.dtypes.items():
            print(f"      - {col}: {dtype}")
            
        # Особые проверки для ключевых полей
        if 'amount' in df.columns:
            amount_stats = df['amount'].apply(lambda x: type(x).__name__).value_counts()
            print(f"   💰 Типы значений в amount: {amount_stats.to_dict()}")
            
        if 'status' in df.columns:
            print(f"   🏷️  Уникальные статусы: {df['status'].unique()}")
            
        print(f"✓ Таблица {table_name} успешно загружена")
        return df
        
    except Exception as e:
        print(f"✗ Ошибка загрузки таблицы {table_name}: {e}")
        raise

print("✓ Все проверки пройдены успешно")

✓ Подключение к PostgreSQL успешно установлено и проверено
✓ Все проверки пройдены успешно


In [9]:
# ----------------------
# 2. Читаем сырые таблицы
# ----------------------

df_users = read_table("users")
df_sessions = read_table("sessions")
df_subs = read_table("subscriptions")
df_invoices = read_table("invoices")
df_plans = read_table("plans")
df_events = read_table("events")

# Просмотр первых и случайных строк

display(df_users.head())
display(df_sessions.head())
display(df_subs.head())
display(df_invoices.head())
display(df_plans.head())
display(df_events.head())


print("\n🔍 ДЕТАЛЬНЫЙ АНАЛИЗ ПРОБЛЕМ:")
print("═" * 50)

# Анализ amount в invoices
if 'amount' in df_invoices.columns:
    print("💰 INVOICES.amount:")
    print(f"   Тип данных: {df_invoices['amount'].dtype}")
    print(f"   Уникальные типы значений: {df_invoices['amount'].apply(type).value_counts().to_dict()}")
    print(f"   Отрицательные значения: {(pd.to_numeric(df_invoices['amount'], errors='coerce') < 0).sum()}")
    
# Анализ дат
date_columns = ['signup_date', 'session_date', 'start_date', 'period_start', 'period_end', 'invoice_date', 'event_date']
for col in date_columns:
    for df in [df_users, df_sessions, df_subs, df_invoices, df_events]:
        if col in df.columns:
            print(f"📅 {df.name if hasattr(df, 'name') else 'Unknown'}.{col}:")
            print(f"   Тип: {df[col].dtype}")
            print(f"   Примеры: {df[col].head(3).tolist()}")


📊 USERS - СЫРЫЕ ДАННЫХ:
   📏 Строк: 2,000
   🧩 Столбцов: 3
   ⚠️  Пропуски:
      - region: 109 пропусков (5.5%)
   🔧 Типы данных:
      - user_id: int64
      - signup_date: object
      - region: object
✓ Таблица users успешно загружена

📊 SESSIONS - СЫРЫЕ ДАННЫХ:
   📏 Строк: 5,914
   🧩 Столбцов: 7
   ⚠️  Пропуски:
      - utm_source: 311 пропусков (5.3%)
      - utm_medium: 220 пропусков (3.7%)
   🔧 Типы данных:
      - session_id: object
      - user_id: int64
      - session_date: object
      - utm_source: object
      - utm_medium: object
      - utm_campaign: object
      - is_first_session: bool
✓ Таблица sessions успешно загружена

📊 SUBSCRIPTIONS - СЫРЫЕ ДАННЫХ:
   📏 Строк: 681
   🧩 Столбцов: 5
   🔧 Типы данных:
      - subscription_id: int64
      - user_id: int64
      - plan_id: int64
      - start_date: object
      - status: object
   🏷️  Уникальные статусы: ['churned' 'active']
✓ Таблица subscriptions успешно загружена

📊 INVOICES - СЫРЫЕ ДАННЫХ:
   📏 Строк: 2,205
   

Unnamed: 0,user_id,signup_date,region
0,1,2024-04-12,Казань
1,2,2024-03-15,Казань
2,3,2024-02-22,Санкт-Петербург
3,4,2024-02-27,Москва
4,5,2024-01-15,Новосибирск


Unnamed: 0,session_id,user_id,session_date,utm_source,utm_medium,utm_campaign,is_first_session
0,sess_1_1,1,2024-04-12,google,cpc,generic,True
1,sess_1_2,1,2024-05-03,google,,generic,False
2,sess_1_3,1,2024-07-04,google,cpc,generic,False
3,sess_2_1,2,2024-03-15,referral,referral,friend_ref,True
4,sess_2_2,2,03-18-2024,referral,referral,friend_ref,False


Unnamed: 0,subscription_id,user_id,plan_id,start_date,status
0,1001,3,1,2024-02-26,churned
1,1002,4,1,2024-03-02,churned
2,1003,5,2,2024-01-26,churned
3,1004,7,1,2024-01-20,churned
4,1005,8,1,2024-05-15,churned


Unnamed: 0,invoice_id,subscription_id,user_id,period_start,period_end,invoice_date,amount,paid,is_initial
0,50001,1001,3,2024-02-26,2024-03-27,2024-02-26,-499.0,True,True
1,50002,1001,3,2024-03-27 00:00:00,2024-04-26,2024-03-27,499.0,True,False
2,50003,1001,3,2024-04-26,2024-05-26,2024-04-26,499.0,True,False
3,50004,1001,3,2024-05-26,2024-06-25,2024-05-26,499.0,True,False
4,50005,1002,4,2024-03-02,2024-04-01,2024-03-02,499.0,True,True


Unnamed: 0,plan_id,plan_name,period,price
0,1,Basic,monthly,499.0
1,2,Pro,monthly,999.0
2,3,Business,monthly,1999.0


Unnamed: 0,user_id,event_date,event_name
0,2.0,2024-03-15,app_open
1,2.0,2024-03-19,app_open
2,2.0,2024-03-24,app_open
3,2.0,2024-03-30,app_open
4,2.0,2024-04-03,app_open



🔍 ДЕТАЛЬНЫЙ АНАЛИЗ ПРОБЛЕМ:
══════════════════════════════════════════════════
💰 INVOICES.amount:
   Тип данных: object
   Уникальные типы значений: {<class 'str'>: 2205}
   Отрицательные значения: 20
📅 Unknown.signup_date:
   Тип: object
   Примеры: ['2024-04-12', '2024-03-15', '2024-02-22']
📅 Unknown.session_date:
   Тип: object
   Примеры: ['2024-04-12', '2024-05-03', '2024-07-04']
📅 Unknown.start_date:
   Тип: object
   Примеры: ['2024-02-26', '2024-03-02', '2024-01-26']
📅 Unknown.period_start:
   Тип: object
   Примеры: ['2024-02-26', '2024-03-27 00:00:00', '2024-04-26']
📅 Unknown.period_end:
   Тип: object
   Примеры: ['2024-03-27', '2024-04-26', '2024-05-26']
📅 Unknown.invoice_date:
   Тип: object
   Примеры: ['2024-02-26', '2024-03-27', '2024-04-26']
📅 Unknown.event_date:
   Тип: object
   Примеры: ['2024-03-15', '2024-03-19', '2024-03-24']


In [11]:
# ----------------------
# 3. Создаём staging-таблицы
# ----------------------
from sqlalchemy import types
import numpy as np

# STG_USERS: очистка и нормализация
df_stg_users = df_users.copy()
df_stg_users['signup_date'] = pd.to_datetime(df_stg_users['signup_date'], errors='coerce')
df_stg_users = df_stg_users.drop_duplicates(subset='user_id')
print(f"STG_USERS: {len(df_stg_users)} строк после очистки")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ
print(f"   📅 signup_date преобразован в: {df_stg_users['signup_date'].dtype}")
print(f"   🗑️  Удалено дубликатов: {len(df_users) - len(df_stg_users)}")
print(f"   ⚠️  Пропуски region: {df_stg_users['region'].isnull().sum()}")

# STG_SESSIONS: проверяем дубликаты, типы данных
df_stg_sessions = df_sessions.copy()
df_stg_sessions['session_date'] = pd.to_datetime(df_stg_sessions['session_date'], errors='coerce')
df_stg_sessions = df_stg_sessions.drop_duplicates(subset='session_id')
print(f"STG_SESSIONS: {len(df_stg_sessions)} строк после очистки")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ
print(f"   📅 session_date преобразован в: {df_stg_sessions['session_date'].dtype}")
print(f"   🗑️  Удалено дубликатов: {len(df_sessions) - len(df_stg_sessions)}")
print(f"   ⚠️  Пропуски UTM: {df_stg_sessions['utm_source'].isnull().sum()} source, {df_stg_sessions['utm_medium'].isnull().sum()} medium")

# STG_SUBSCRIPTIONS: статус и даты
df_stg_subs = df_subs.copy()
df_stg_subs['start_date'] = pd.to_datetime(df_stg_subs['start_date'], errors='coerce')

# Безопасная обработка end_date (если колонка существует)
if 'end_date' in df_stg_subs.columns:
    df_stg_subs['end_date'] = pd.to_datetime(df_stg_subs['end_date'], errors='coerce')

df_stg_subs['status'] = df_stg_subs['status'].fillna('unknown').str.upper().str.strip()
print(f"STG_SUBSCRIPTIONS: {len(df_stg_subs)} строк после очистки")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ
print(f"   📅 start_date преобразован в: {df_stg_subs['start_date'].dtype}")
if 'end_date' in df_stg_subs.columns:
    print(f"   📅 end_date преобразован в: {df_stg_subs['end_date'].dtype}")
print(f"   🏷️  Статусы нормализованы: {df_stg_subs['status'].unique()}")
print(f"   ⚠️  Пропуски статусов: {df_stg_subs['status'].isnull().sum()}")

# STG_INVOICES: даты и суммы
df_stg_invoices = df_invoices.copy()
df_stg_invoices['period_start'] = pd.to_datetime(df_stg_invoices['period_start'], errors='coerce')
df_stg_invoices['period_end'] = pd.to_datetime(df_stg_invoices['period_end'], errors='coerce')
df_stg_invoices['invoice_date'] = pd.to_datetime(df_stg_invoices['invoice_date'], errors='coerce')
df_stg_invoices['amount'] = pd.to_numeric(df_stg_invoices['amount'], errors='coerce')
print(f"STG_INVOICES: {len(df_stg_invoices)} строк после очистки")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ
print(f"   💰 amount преобразован в: {df_stg_invoices['amount'].dtype}")
print(f"   📅 Все даты преобразованы в datetime")
print(f"   ⚠️  Отрицательные amount: {(df_stg_invoices['amount'] < 0).sum()}")
print(f"   ⚠️  Пропуски amount: {df_stg_invoices['amount'].isnull().sum()}")

# STG_EVENTS: даты и события (исправлено - используем event_name вместо event_type)
df_stg_events = df_events.copy()
df_stg_events['event_date'] = pd.to_datetime(df_stg_events['event_date'], errors='coerce')

# Обрабатываем event_name (а не event_type)
if 'event_name' in df_stg_events.columns:
    df_stg_events['event_name'] = df_stg_events['event_name'].fillna('unknown').str.upper().str.strip()
else:
    print("⚠️  Колонка 'event_name' отсутствует в events")

print(f"STG_EVENTS: {len(df_stg_events)} строк после очистки")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ
print(f"   📅 event_date преобразован в: {df_stg_events['event_date'].dtype}")
print(f"   👤 user_id преобразован в: {df_stg_events['user_id'].dtype}")
print(f"   ⚠️  Пропуски user_id: {df_stg_events['user_id'].isnull().sum()}")

# ----------------------
# 4. Записываем staging-таблицы обратно в БД
# ----------------------
staging_tables = {
    "stg_users": df_stg_users,
    "stg_sessions": df_stg_sessions,
    "stg_subscriptions": df_stg_subs,
    "stg_invoices": df_stg_invoices,
    "stg_events": df_stg_events
}

# Выводим информацию о колонках для отладки
print("\nСтруктура таблиц:")
for table_name, df in staging_tables.items():
    print(f"{table_name}: {list(df.columns)}")

for table_name, df in staging_tables.items():
    try:
        # Сохраняем с обработкой ошибок
        df.to_sql(
            table_name, 
            engine, 
            schema=DB_SCHEMA, 
            if_exists='replace', 
            index=False
        )
        print(f"✓ Таблица {DB_SCHEMA}.{table_name} сохранена ({len(df)} строк)")
        
    except Exception as e:
        print(f"✗ Ошибка сохранения {table_name}: {e}")

print("✓ Все staging-таблицы обработаны и сохранены")

STG_USERS: 2000 строк после очистки
   📅 signup_date преобразован в: datetime64[ns]
   🗑️  Удалено дубликатов: 0
   ⚠️  Пропуски region: 109
STG_SESSIONS: 5914 строк после очистки
   📅 session_date преобразован в: datetime64[ns]
   🗑️  Удалено дубликатов: 0
   ⚠️  Пропуски UTM: 311 source, 220 medium
STG_SUBSCRIPTIONS: 681 строк после очистки
   📅 start_date преобразован в: datetime64[ns]
   🏷️  Статусы нормализованы: ['CHURNED' 'ACTIVE']
   ⚠️  Пропуски статусов: 0
STG_INVOICES: 2205 строк после очистки
   💰 amount преобразован в: float64
   📅 Все даты преобразованы в datetime
   ⚠️  Отрицательные amount: 20
   ⚠️  Пропуски amount: 24
STG_EVENTS: 16119 строк после очистки
   📅 event_date преобразован в: datetime64[ns]
   👤 user_id преобразован в: float64
   ⚠️  Пропуски user_id: 331

Структура таблиц:
stg_users: ['user_id', 'signup_date', 'region']
stg_sessions: ['session_id', 'user_id', 'session_date', 'utm_source', 'utm_medium', 'utm_campaign', 'is_first_session']
stg_subscriptions: