# Ноутбук очистки данных

Интерактивный анализ и очистка данных с подробной отчетностью по каждому шагу.

## Содержание:
1. Подключение к базе данных
2. Анализ качества данных
3. Очистка дубликатов
4. Удаление некорректных записей
5. Архивирование неактивных данных
6. Итоговый отчет


In [None]:
# Импорт необходимых библиотек
import pandas as pd
import psycopg2
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Настройка отображения
plt.style.use('default')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

print("📚 Библиотеки загружены успешно")

In [None]:
# Подключение к базе данных
import os
from dotenv import load_dotenv

load_dotenv()

# Параметры подключения
DB_CONFIG = {
    'host': os.getenv('DB_HOST', '103.246.146.132'),
    'port': os.getenv('DB_PORT', 5432),
    'database': os.getenv('DB_NAME', 'hackathon'),
    'user': os.getenv('DB_USER', 'user_db'),
    'password': os.getenv('DB_PASSWORD', 'psql14182025')
}

def get_connection():
    return psycopg2.connect(**DB_CONFIG)

def execute_query(query, return_df=True):
    """Выполнение SQL запроса с возвратом DataFrame"""
    conn = get_connection()
    try:
        if return_df:
            return pd.read_sql(query, conn)
        else:
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            result = cur.fetchall()
            cur.close()
            return result
    finally:
        conn.close()

# Тест подключения
try:
    test_query = "SELECT 'Подключение успешно!' as status, CURRENT_TIMESTAMP as timestamp"
    result = execute_query(test_query)
    print("✅ База данных подключена")
    print(f"🕐 Время сервера: {result.iloc[0]['timestamp']}")
except Exception as e:
    print(f"❌ Ошибка подключения: {e}")

## 1. Анализ текущего состояния данных

Получим базовую статистику по таблицам до начала очистки.

In [None]:
# Получение статистики таблиц ДО очистки
tables_stats_before = {}
tables = ['customers', 'orders', 'order_items', 'products']

print("📊 СТАТИСТИКА ТАБЛИЦ ДО ОЧИСТКИ")
print("=" * 50)

for table in tables:
    try:
        query = f"SELECT COUNT(*) as count FROM {table}"
        result = execute_query(query)
        count = result.iloc[0]['count']
        tables_stats_before[table] = count
        print(f"{table:15} | {count:>10,} строк")
    except Exception as e:
        print(f"{table:15} | Ошибка: {e}")
        tables_stats_before[table] = 0

# Визуализация
if tables_stats_before:
    plt.figure(figsize=(10, 6))
    plt.bar(tables_stats_before.keys(), tables_stats_before.values())
    plt.title('Количество записей в таблицах (до очистки)')
    plt.ylabel('Количество записей')
    plt.xticks(rotation=45)
    for i, v in enumerate(tables_stats_before.values()):
        plt.text(i, v + max(tables_stats_before.values()) * 0.01, f'{v:,}', 
                ha='center', va='bottom')
    plt.tight_layout()
    plt.show()

## 2. Анализ качества данных

Проверим наличие дубликатов, пропущенных значений и некорректных данных.

In [None]:
# Анализ дубликатов клиентов
duplicates_query = """
SELECT 
    email,
    COUNT(*) as count,
    string_agg(id::text, ', ') as duplicate_ids
FROM customers 
WHERE email IS NOT NULL AND email != ''
GROUP BY email 
HAVING COUNT(*) > 1
ORDER BY count DESC
LIMIT 10
"""

duplicates = execute_query(duplicates_query)
print(f"🔍 ДУБЛИКАТЫ КЛИЕНТОВ: {len(duplicates)} найдено")
if len(duplicates) > 0:
    print("\nТоп-10 дубликатов:")
    display(duplicates)
else:
    print("✅ Дубликатов клиентов не найдено")

In [None]:
# Анализ некорректных заказов
invalid_orders_queries = {
    'Заказы без товаров': """
        SELECT COUNT(*) as count 
        FROM orders o
        WHERE o.id NOT IN (
            SELECT DISTINCT order_id 
            FROM order_items 
            WHERE order_id IS NOT NULL
        )
    """,
    'Заказы с будущими датами': """
        SELECT COUNT(*) as count 
        FROM orders 
        WHERE order_date > CURRENT_DATE
    """,
    'Заказы с несуществующими клиентами': """
        SELECT COUNT(*) as count 
        FROM orders 
        WHERE customer_id NOT IN (
            SELECT id FROM customers WHERE id IS NOT NULL
        )
    """
}

print("🔍 АНАЛИЗ НЕКОРРЕКТНЫХ ЗАКАЗОВ")
print("=" * 50)

invalid_orders_stats = {}
for description, query in invalid_orders_queries.items():
    try:
        result = execute_query(query)
        count = result.iloc[0]['count']
        invalid_orders_stats[description] = count
        print(f"{description:35} | {count:>6} записей")
    except Exception as e:
        print(f"{description:35} | Ошибка: {e}")
        invalid_orders_stats[description] = 0

In [None]:
# Анализ некорректных позиций заказов
invalid_items_query = """
SELECT 
    'Нулевое или отрицательное количество' as issue_type,
    COUNT(*) as count
FROM order_items 
WHERE quantity <= 0 OR quantity IS NULL

UNION ALL

SELECT 
    'Отрицательная цена' as issue_type,
    COUNT(*) as count
FROM order_items 
WHERE unit_price < 0 OR unit_price IS NULL

UNION ALL

SELECT 
    'Несуществующие товары' as issue_type,
    COUNT(*) as count
FROM order_items 
WHERE product_id NOT IN (SELECT id FROM products WHERE id IS NOT NULL)
"""

invalid_items = execute_query(invalid_items_query)
print("\n🔍 АНАЛИЗ НЕКОРРЕКТНЫХ ПОЗИЦИЙ ЗАКАЗОВ")
print("=" * 50)
display(invalid_items)

In [None]:
# Анализ неактивных товаров
inactive_products_query = """
SELECT 
    COUNT(*) as total_products,
    COUNT(CASE WHEN is_active = true THEN 1 END) as active_products,
    COUNT(CASE WHEN is_active = false THEN 1 END) as inactive_products,
    COUNT(CASE WHEN is_active = true AND id NOT IN (
        SELECT DISTINCT oi.product_id 
        FROM order_items oi 
        JOIN orders o ON oi.order_id = o.id 
        WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
          AND oi.product_id IS NOT NULL
    ) AND created_at < CURRENT_DATE - INTERVAL '12 months' THEN 1 END) as candidates_for_archiving
FROM products
"""

products_analysis = execute_query(inactive_products_query)
print("\n🔍 АНАЛИЗ ТОВАРОВ")
print("=" * 50)
display(products_analysis)

# Визуализация состояния товаров
if len(products_analysis) > 0:
    data = products_analysis.iloc[0]
    categories = ['Активные', 'Неактивные', 'Кандидаты на архивирование']
    values = [data['active_products'], data['inactive_products'], data['candidates_for_archiving']]
    
    plt.figure(figsize=(8, 6))
    plt.pie(values, labels=categories, autopct='%1.1f%%', startangle=90)
    plt.title('Распределение товаров по статусу')
    plt.axis('equal')
    plt.show()

## 3. Выполнение очистки данных

Теперь приступим к поэтапной очистке данных с подсчетом изменений.

In [None]:
# Инициализация отчета об очистке
cleaning_report = []

def log_cleaning_step(step_name, table_name, rows_before, rows_after, justification):
    """Логирование шага очистки"""
    affected = rows_before - rows_after
    cleaning_report.append({
        'step': step_name,
        'table': table_name,
        'rows_before': rows_before,
        'rows_after': rows_after,
        'rows_affected': affected,
        'justification': justification
    })
    
    print(f"\n📋 {step_name}")
    print(f"   Таблица: {table_name}")
    print(f"   До: {rows_before:,} строк")
    print(f"   После: {rows_after:,} строк")
    print(f"   Обработано: {affected:,} строк")
    print(f"   Обоснование: {justification}")
    print("-" * 50)

In [None]:
# Шаг 1: Удаление тестовых клиентов
print("🧹 ШАГ 1: УДАЛЕНИЕ ТЕСТОВЫХ КЛИЕНТОВ")

# Подсчет до удаления
test_customers_before_query = """
SELECT COUNT(*) as count FROM customers 
WHERE email LIKE '%test%' 
   OR email LIKE '%example%' 
   OR name LIKE '%Test%'
   OR name LIKE '%test%'
"""
rows_before = execute_query(test_customers_before_query).iloc[0]['count']

# Удаление тестовых клиентов
delete_test_customers = """
DELETE FROM customers 
WHERE email LIKE '%test%' 
   OR email LIKE '%example%' 
   OR name LIKE '%Test%'
   OR name LIKE '%test%'
"""
execute_query(delete_test_customers, return_df=False)

# Подсчет после удаления
rows_after = execute_query(test_customers_before_query).iloc[0]['count']

log_cleaning_step(
    "Удаление тестовых клиентов",
    "customers",
    rows_before,
    rows_after,
    "Тестовые данные искажают реальную аналитику и метрики бизнеса"
)

In [None]:
# Шаг 2: Удаление дубликатов клиентов
print("\n🧹 ШАГ 2: УДАЛЕНИЕ ДУБЛИКАТОВ КЛИЕНТОВ")

# Подсчет до удаления
rows_before = execute_query("SELECT COUNT(*) as count FROM customers").iloc[0]['count']

# Удаление дубликатов (оставляем запись с минимальным ID)
delete_duplicates = """
DELETE FROM customers 
WHERE id NOT IN (
    SELECT MIN(id) 
    FROM customers 
    WHERE email IS NOT NULL AND email != ''
    GROUP BY email
) AND email IS NOT NULL AND email != ''
"""
execute_query(delete_duplicates, return_df=False)

# Подсчет после удаления
rows_after = execute_query("SELECT COUNT(*) as count FROM customers").iloc[0]['count']

log_cleaning_step(
    "Удаление дубликатов клиентов",
    "customers",
    rows_before,
    rows_after,
    "Дубликаты клиентов искажают аналитику. Оставлены записи с наименьшим ID как наиболее ранние"
)

In [None]:
# Шаг 3: Очистка некорректных заказов
print("\n🧹 ШАГ 3: ОЧИСТКА НЕКОРРЕКТНЫХ ЗАКАЗОВ")

# Подсчет до очистки
rows_before = execute_query("SELECT COUNT(*) as count FROM orders").iloc[0]['count']

# Удаление заказов без товаров
delete_empty_orders = """
DELETE FROM orders 
WHERE id NOT IN (
    SELECT DISTINCT order_id 
    FROM order_items 
    WHERE order_id IS NOT NULL
)
"""
execute_query(delete_empty_orders, return_df=False)

# Удаление заказов с будущими датами
delete_future_orders = """
DELETE FROM orders 
WHERE order_date > CURRENT_DATE
"""
execute_query(delete_future_orders, return_df=False)

# Удаление заказов с несуществующими клиентами
delete_orphan_orders = """
DELETE FROM orders 
WHERE customer_id NOT IN (
    SELECT id FROM customers WHERE id IS NOT NULL
)
"""
execute_query(delete_orphan_orders, return_df=False)

# Подсчет после очистки
rows_after = execute_query("SELECT COUNT(*) as count FROM orders").iloc[0]['count']

log_cleaning_step(
    "Очистка некорректных заказов",
    "orders",
    rows_before,
    rows_after,
    "Заказы без товаров не имеют коммерческой ценности. Будущие даты и несуществующие клиенты - ошибки данных"
)

In [None]:
# Шаг 4: Очистка позиций заказов
print("\n🧹 ШАГ 4: ОЧИСТКА ПОЗИЦИЙ ЗАКАЗОВ")

# Подсчет до очистки
rows_before = execute_query("SELECT COUNT(*) as count FROM order_items").iloc[0]['count']

# Удаление некорректных позиций
delete_invalid_items = """
DELETE FROM order_items 
WHERE quantity <= 0 
   OR unit_price < 0
   OR quantity IS NULL 
   OR unit_price IS NULL
   OR product_id NOT IN (SELECT id FROM products WHERE id IS NOT NULL)
"""
execute_query(delete_invalid_items, return_df=False)

# Подсчет после очистки
rows_after = execute_query("SELECT COUNT(*) as count FROM order_items").iloc[0]['count']

log_cleaning_step(
    "Очистка позиций заказов",
    "order_items",
    rows_before,
    rows_after,
    "Позиции с нулевым количеством или отрицательными ценами искажают расчет выручки и не могут быть реальными операциями"
)

In [None]:
# Шаг 5: Архивирование неактивных товаров
print("\n🧹 ШАГ 5: АРХИВИРОВАНИЕ НЕАКТИВНЫХ ТОВАРОВ")

# Подсчет активных товаров до архивирования
rows_before = execute_query("SELECT COUNT(*) as count FROM products WHERE is_active = true").iloc[0]['count']

# Деактивация товаров без продаж за последние 12 месяцев
archive_products = """
UPDATE products 
SET is_active = false, 
    updated_at = CURRENT_TIMESTAMP
WHERE is_active = true 
  AND id NOT IN (
      SELECT DISTINCT oi.product_id 
      FROM order_items oi 
      JOIN orders o ON oi.order_id = o.id 
      WHERE o.order_date >= CURRENT_DATE - INTERVAL '12 months'
        AND oi.product_id IS NOT NULL
  )
  AND created_at < CURRENT_DATE - INTERVAL '12 months'
"""
execute_query(archive_products, return_df=False)

# Подсчет активных товаров после архивирования
rows_after = execute_query("SELECT COUNT(*) as count FROM products WHERE is_active = true").iloc[0]['count']

log_cleaning_step(
    "Архивирование товаров",
    "products",
    rows_before,
    rows_after,
    "Неактивные товары загромождают каталог и усложняют аналитику. Архивирование помогает сосредоточиться на актуальном ассортименте"
)

## 4. Итоговый отчет

Сравним состояние данных до и после очистки.

In [None]:
# Получение статистики таблиц ПОСЛЕ очистки
tables_stats_after = {}

print("📊 СТАТИСТИКА ТАБЛИЦ ПОСЛЕ ОЧИСТКИ")
print("=" * 50)

for table in tables:
    try:
        query = f"SELECT COUNT(*) as count FROM {table}"
        result = execute_query(query)
        count = result.iloc[0]['count']
        tables_stats_after[table] = count
        
        # Сравнение с начальным состоянием
        before = tables_stats_before.get(table, 0)
        diff = before - count
        change_pct = (diff / before * 100) if before > 0 else 0
        
        print(f"{table:15} | {count:>10,} строк | Изменение: {diff:>6,} ({change_pct:>5.1f}%)")
    except Exception as e:
        print(f"{table:15} | Ошибка: {e}")
        tables_stats_after[table] = 0

In [None]:
# Создание детального отчета
if cleaning_report:
    df_report = pd.DataFrame(cleaning_report)
    
    print("\n📋 ДЕТАЛЬНЫЙ ОТЧЕТ ПО ОЧИСТКЕ")
    print("=" * 80)
    
    # Отображение отчета
    pd.set_option('display.max_colwidth', 50)
    display(df_report[['step', 'table', 'rows_before', 'rows_after', 'rows_affected']])
    
    # Общая статистика
    total_affected = df_report['rows_affected'].sum()
    print(f"\n📈 ОБЩАЯ СТАТИСТИКА:")
    print(f"   Всего операций: {len(cleaning_report)}")
    print(f"   Всего обработано записей: {total_affected:,}")
    print(f"   Время выполнения: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [None]:
# Визуализация результатов очистки
if cleaning_report:
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # График 1: Сравнение до/после по таблицам
    x = range(len(tables))
    width = 0.35
    
    before_counts = [tables_stats_before.get(table, 0) for table in tables]
    after_counts = [tables_stats_after.get(table, 0) for table in tables]
    
    ax1.bar([i - width/2 for i in x], before_counts, width, label='До очистки', alpha=0.8)
    ax1.bar([i + width/2 for i in x], after_counts, width, label='После очистки', alpha=0.8)
    
    ax1.set_xlabel('Таблицы')
    ax1.set_ylabel('Количество записей')
    ax1.set_title('Сравнени�� количества записей до и после очистки')
    ax1.set_xticks(x)
    ax1.set_xticklabels(tables, rotation=45)
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # График 2: Количество обработанных записей по операциям
    df_report = pd.DataFrame(cleaning_report)
    ax2.barh(df_report['step'], df_report['rows_affected'])
    ax2.set_xlabel('Количество обработанных записей')
    ax2.set_title('Количество записей по операциям очистки')
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

In [None]:
# Сохранение отчета в CSV
if cleaning_report:
    # Создание отчета с временной меткой
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    # Сохранение детального отчета
    df_report = pd.DataFrame(cleaning_report)
    report_filename = f'data_cleaning_report_{timestamp}.csv'
    df_report.to_csv(report_filename, index=False, encoding='utf-8')
    
    # Сохранение сводки по таблицам
    summary_data = []
    for table in tables:
        before = tables_stats_before.get(table, 0)
        after = tables_stats_after.get(table, 0)
        diff = before - after
        change_pct = (diff / before * 100) if before > 0 else 0
        
        summary_data.append({
            'table': table,
            'rows_before': before,
            'rows_after': after,
            'rows_changed': diff,
            'change_percent': change_pct
        })
    
    df_summary = pd.DataFrame(summary_data)
    summary_filename = f'tables_summary_{timestamp}.csv'
    df_summary.to_csv(summary_filename, index=False, encoding='utf-8')
    
    print(f"✅ Отчеты сохранены:")
    print(f"   📄 Детальный отчет: {report_filename}")
    print(f"   📊 Сводка по таблицам: {summary_filename}")
    
    print("\n🎉 ОЧИСТКА ДАННЫХ ЗАВЕРШЕНА УСПЕШНО!")