## Разбиение ETL-процесса на отдельные файлы в Python
Для лучшей организации кода ETL-процесс (Extract-Transform-Load) можно разбить на отдельные модули. Вот как это можно сделать:

Рекомендуемая структура проекта

etl_project/  
├── __init__.py  
├── extract.py  
├── transform.py  
├── load.py  
├── main.py  
└── utils.py (опционально)  

### 1. extract.py - модуль для извлечения данных
```python
import pandas as pd
from typing import Dict, Any

def extract_from_csv(file_path: str) -> pd.DataFrame:
    """Извлечение данных из CSV файла"""
    return pd.read_csv(file_path)

def extract_from_api(api_url: str, params: Dict[str, Any] = None) -> Dict:
    """Извлечение данных из API"""
    import requests
    response = requests.get(api_url, params=params)
    response.raise_for_status()
    return response.json()

def extract_from_database(query: str, connection_string: str) -> pd.DataFrame:
    """Извлечение данных из базы данных"""
    import sqlalchemy
    engine = sqlalchemy.create_engine(connection_string)
    return pd.read_sql(query, engine)
```

### 2. transform.py - модуль для трансформации данных
```python
import pandas as pd
from typing import Dict, Any

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Очистка данных"""
    # Удаление дубликатов
    df = df.drop_duplicates()
    
    # Заполнение пропущенных значений
    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].fillna('unknown')
        else:
            df[col] = df[col].fillna(df[col].median())
    
    return df

def transform_data(df: pd.DataFrame, rules: Dict[str, Any]) -> pd.DataFrame:
    """Основная трансформация данных"""
    df = clean_data(df)
    
    # Применение правил трансформации
    if 'rename_columns' in rules:
        df = df.rename(columns=rules['rename_columns'])
    
    if 'filter_conditions' in rules:
        df = df.query(rules['filter_conditions'])
    
    return df
```

### 3. load.py - модуль для загрузки данных
```python
import pandas as pd
from typing import Dict

def load_to_csv(df: pd.DataFrame, file_path: str) -> None:
    """Загрузка данных в CSV файл"""
    df.to_csv(file_path, index=False)

def load_to_database(df: pd.DataFrame, table_name: str, connection_string: str) -> None:
    """Загрузка данных в базу данных"""
    import sqlalchemy
    engine = sqlalchemy.create_engine(connection_string)
    df.to_sql(table_name, engine, if_exists='replace', index=False)

def load_to_api(data: Dict, api_url: str) -> None:
    """Отправка данных через API"""
    import requests
    response = requests.post(api_url, json=data)
    response.raise_for_status()
```

### 4. main.py - основной скрипт для запуска ETL
```python
from extract import extract_from_csv
from transform import transform_data
from load import load_to_database

def run_etl_pipeline():
    # Конфигурационные параметры
    config = {
        'input_file': 'data/input.csv',
        'output_db': 'sqlite:///data/output.db',
        'output_table': 'processed_data',
        'transform_rules': {
            'rename_columns': {'old_name': 'new_name'},
            'filter_conditions': 'value > 100'
        }
    }
    
    # Extract
    print("Extracting data...")
    raw_data = extract_from_csv(config['input_file'])
    
    # Transform
    print("Transforming data...")
    processed_data = transform_data(raw_data, config['transform_rules'])
    
    # Load
    print("Loading data...")
    load_to_database(processed_data, config['output_table'], config['output_db'])
    
    print("ETL process completed successfully!")

if __name__ == "__main__":
    run_etl_pipeline()
```

### Дополнительные рекомендации
Конфигурация: Вы можете вынести конфигурационные параметры в отдельный файл (например, config.py или settings.yaml).

Логирование: Добавьте логирование в каждый этап для отслеживания процесса.

Обработка ошибок: Реализуйте обработку ошибок для каждого этапа.

Тестирование: Создайте отдельные тесты для каждого модуля.

Документация: Добавьте docstrings и комментарии для каждого модуля и функции.

Такое разделение делает код более:
- Читаемым
- Поддерживаемым
- Тестируемым
- Переиспользуемым
- Масштабируемым

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
import mysql.connector
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy import URL

In [3]:
from config import mysql_qm 
from config import postgresql_home 

In [7]:
# EXTRACT
# Забираем данные из БД (в этом примере -с удаленной MySQL)

sql_query = '''
    SELECT
        *
    FROM
        sales 
        INNER JOIN companies USING(Артель)
        INNER JOIN chaebols USING(Артель)
        INNER JOIN industries USING(Товар) 
        INNER JOIN regions USING(Град)
'''

with mysql.connector.connect(
            host=mysql_qm['host'],
            user=mysql_qm['user'], 
            password=mysql_qm['password'],
            database=mysql_qm['database']
            ) as connection:
    df = pd.read_sql(sql_query, connection)

df = df[['Дата', 'Плата', 'Артель', 'Чеболь', 'Товар', 'Промысел', 'Град', 'Царство', ]]

df.Дата = pd.to_datetime(df.Дата)

  df = pd.read_sql(sql_query, connection)


In [9]:
# TRANSFORM
# В этом игрушечном примере создаем "куб" для сумм продаж

cube = df.groupby(['Чеболь', 'Промысел', 'Царство', ]).Плата.sum().round(2).reset_index()
cube = cube.rename(columns={'Плата': 'Сумма_продаж'})

запишем полученную таблицу в БД Postgres

In [11]:
# LOAD
# [Пере]запишем полученную таблицу в БД Postgres

sql_create_query = '''
    CREATE TABLE IF NOT EXISTS cube (
        Чеболь       VARCHAR(64), 
        Промысел     VARCHAR(64), 
        Царство      VARCHAR(64), 
        Сумма_продаж DECIMAL(10, 2)
    )
'''

with psycopg2.connect(
            host=postgresql_home['host'],
            user=postgresql_home['user'], 
            password=postgresql_home['password'],
            database=postgresql_home['database']
            ) as connection:
    with connection.cursor() as cursor:
        cursor.execute(sql_create_query)

url_object = URL.create(
    "postgresql+psycopg2",
    username=postgresql_home['user'],
    password=postgresql_home['password'], 
    host=postgresql_home['host'],
    port=postgresql_home['port'],
    database='competition_analysis',
)

engine = create_engine(url_object)

cube.to_sql(
    name='cube', # имя таблицы
    con=engine,  # движок
    if_exists="replace", # если в таблице данные уже есть, заменяем их
    index=False # без индекса датафрейма
)

42