In [8]:
# Мини-инженерия данных: агрегация конвейера + витрина данных
# Этот файл содержит python-код для создания конвейера обработки данных лидов CRM и формирования витрины данных.

In [9]:
import pandas as pd
from datetime import datetime
import os

In [10]:
# 1. Загрузка сырых данных (симуляция импорта из Google Sheets)
raw_leads_df = pd.read_csv("crm_leads_raw.csv", parse_dates=["submission_date"])

print(f"Всего необработанных лидов: {len(raw_leads_df)}")

Всего необработанных лидов: 1000


In [11]:
# 2. Парсинг, очистка и нормализация данных
# Создаём копию для очистки
cleaned_leads_df = raw_leads_df.copy()

# Стандартизируем названия источников
cleaned_leads_df["source"] = cleaned_leads_df["source"].str.lower().str.replace(" ", "_")

# Проверяем формат email (упрощённая проверка)
cleaned_leads_df["is_valid_email"] = cleaned_leads_df["email"].str.contains(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")

# Извлекаем год и месяц из даты
cleaned_leads_df["submission_year"] = cleaned_leads_df["submission_date"].dt.year
cleaned_leads_df["submission_month"] = cleaned_leads_df["submission_date"].dt.month

In [12]:
# 3. Создание Витрины Данных (crm_leads_cleaned)
# Выбираем и переименовываем столбцы для витрины данных
data_mart_df = cleaned_leads_df[[
    "lead_id",
    "submission_date",
    "submission_year",
    "submission_month",
    "source",
    "status",
    "value",
    "is_valid_email"
]].copy()

# Сохраняем витрину данных в CSV-файл 
data_mart_df.to_csv("crm_leads_cleaned.csv", index=False)

In [13]:
# 4. Логика Cron-задачи
# Этот раздел симулирует логику, которая будет запускаться cron-задачей ежедневно.

def daily_pipeline_job():
    print("\n--- Запуск Ежедневной Задачи Конвейера (Симуляция) ---")
    # 1. Загрузка новых сырых данных (в реальности через Google Sheets API)
    # Для симуляции просто перезагружаем те же данные.
    new_raw_data = pd.read_csv("crm_leads_raw.csv", parse_dates=["submission_date"])
    
    # 2. Очистка и нормализация новых данных
    # (Применяем те же шаги очистки)
    
    # 3. Добавление к существующей витрине данных
    # (В реальности - добавление в таблицу PostgreSQL)
    
    print("Ежедневная задача конвейера завершена. Витрина данных обновлена.")
    print(f"Временная метка: {datetime.now()}")

# Запускаем симулированную задачу
daily_pipeline_job()


--- Запуск Ежедневной Задачи Конвейера (Симуляция) ---
Ежедневная задача конвейера завершена. Витрина данных обновлена.
Временная метка: 2025-07-28 00:38:58.617523
