***Шаг 1. Настройка окружения и Импорты***

In [1]:
import os
import traceback
import pandas as pd
from utils.design import *
from utils.music import BackgroundMusic

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from config import settings

from core.models import Base, BomReport
from core.repositories.raw_repository import RawDataRepository
from core.services.material_service import MaterialETLService

music = BackgroundMusic(settings.MUSIC_PATH)
music.play()

Message.print_message(f"Импорты успешно выполнены!", Color.GREEN, Color.LIGHT_WHITE)

  from pkg_resources import resource_stream, resource_exists


pygame 2.6.1 (SDL 2.28.4, Python 3.13.5)
Hello from the pygame community. https://www.pygame.org/contribute.html

[1m[32m+----------------------------+[0m
[1m[32m| [97m[1mИмпорты успешно выполнены![0m[32m |[0m
[1m[32m+----------------------------+[0m


***Шаг 2. Инициализация Базы Данных***

*Подключение к БД и создание таблиц.
Создание engine (движок подключения) и сессию.
Команда Base.metadata.create_all(engine) проверит наличие таблиц в базе данных Postgres. Если их нет - создаст. Если есть - оставит как есть.*

In [2]:
Message.print_message(
    f"Подключение к БД: {settings.DB_URL}", Color.PURPLE, Color.LIGHT_WHITE
)

engine = create_engine(settings.DB_URL, echo=False)

Base.metadata.create_all(engine)

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

Message.print_message(
    "База данных инициализирована. Сессия открыта.", Color.BLUE, Color.LIGHT_WHITE
)


[1m[35m+---------------------------------------------------------------------+[0m
[1m[35m| [97m[1mПодключение к БД: postgresql+psycopg://root:root@localhost:5435/app[0m[35m |[0m
[1m[35m+---------------------------------------------------------------------+[0m

[1m[34m+-----------------------------------------------+[0m
[1m[34m| [97m[1mБаза данных инициализирована. Сессия открыта.[0m[34m |[0m
[1m[34m+-----------------------------------------------+[0m


***Шаг 3. Внедрение зависимостей***

*Сборка сервисов (Dependency Injection)
Использование принципов SOLID:
Создание Repository (RawDataRepository), который умеет только общаться с базой (INSERT, TRUNCATE, SELECT).
Передаем репозиторий внутрь Service (MaterialETLService). Сервис содержит бизнес-логику: как читать CSV, как чистить данные, как запускать расчеты.*

In [3]:
repository = RawDataRepository(session)

etl_service = MaterialETLService(repository)

Message.print_message("Сервисы готовы к работе.", Color.BLUE, Color.LIGHT_WHITE)


[1m[34m+--------------------------+[0m
[1m[34m| [97m[1mСервисы готовы к работе.[0m[34m |[0m
[1m[34m+--------------------------+[0m


***Шаг 4. ETL Процесс***


На этом этапе происходят преобразования ETL (Extract, Transform, Load)
Extract: Чтение factory_data.csv.

Transform: Переименование колонок (приведение к snake_case с _id).
Очистка типов данных (удаление запятых в числах, обработка NaN).
Дедупликация (агрегация данных по годам).

Load: Массовая вставка очищенных данных в таблицу raw_factory_data.


In [4]:
try:
    if os.path.exists(settings.INPUT_CSV_PATH):
        Message.print_message(
            "Начинаем процесс импорта...", Color.BLUE, Color.LIGHT_WHITE
        )

        rows_inserted = etl_service.run_import_pipeline()

        """
            Extract
            df = self._read_csv(file_path)

            Transform
            df = self._transform_columns(df)
            df = self._clean_data_types(df)
            df = self._deduplicate_by_year(df)

          
            Load
            self.repository.truncate_table()
            self.repository.bulk_insert(records)
        """

        Message.print_message(
            f"Успешно загружено строк в сырую таблицу: {rows_inserted}",
            Color.GREEN,
            Color.LIGHT_WHITE,
        )
    else:
        Message.print_message(
            f"Ошибка: Файл {settings.INPUT_CSV_PATH} не найден.",
            Color.RED,
            Color.LIGHT_WHITE,
        )
except Exception as e:
    Message.print_message(f"Ошибка ETL: {e}", Color.RED, Color.LIGHT_WHITE)
    traceback.print_exc()

[32m2026-01-19 00:58:44.084[0m | [1mINFO    [0m | [36mcore.services.material_service[0m:[36mrun_import_pipeline[0m:[36m113[0m - [1mStarting ETL pipeline...[0m
[32m2026-01-19 00:58:44.093[0m | [1mINFO    [0m | [36mcore.services.material_service[0m:[36mrun_import_pipeline[0m:[36m116[0m - [1mStep 1: Extract[0m
[32m2026-01-19 00:58:44.096[0m | [34m[1mDEBUG   [0m | [36mcore.services.material_service[0m:[36m_read_csv[0m:[36m34[0m - [34m[1mReading CSV file: C:\Users\user\DataGripProjects\pandas_factory_task\resources\csv\raw\factory_data.csv[0m
[32m2026-01-19 00:58:44.177[0m | [1mINFO    [0m | [36mcore.services.material_service[0m:[36mrun_import_pipeline[0m:[36m119[0m - [1mStep 2: Transform[0m
[32m2026-01-19 00:58:44.188[0m | [34m[1mDEBUG   [0m | [36mcore.services.material_service[0m:[36m_clean_data_types[0m:[36m58[0m - [34m[1mCleaning data types...[0m
[32m2026-01-19 00:58:44.241[0m | [1mINFO    [0m | [36mcore.services.mater


[1m[34m+-----------------------------+[0m
[1m[34m| [97m[1mНачинаем процесс импорта...[0m[34m |[0m
[1m[34m+-----------------------------+[0m


[32m2026-01-19 00:58:44.354[0m | [34m[1mDEBUG   [0m | [36mcore.repositories.raw_repository[0m:[36mbulk_insert[0m:[36m49[0m - [34m[1mInserting batch of 110 records...[0m
[32m2026-01-19 00:58:44.562[0m | [32m[1mSUCCESS [0m | [36mcore.services.material_service[0m:[36mrun_import_pipeline[0m:[36m136[0m - [32m[1mETL finished successfully. Rows loaded: 110[0m



[1m[32m+----------------------------------------------+[0m
[1m[32m| [97m[1mУспешно загружено строк в сырую таблицу: 110[0m[32m |[0m
[1m[32m+----------------------------------------------+[0m


***Шаг 5. Формирование итоговой таблицы***

*Теперь, когда "сырые" данные в базе, запускается сложный SQL-скрипт (Recursive Common Table Expression).
Скрипт строит дерево производства: от готового изделия (FIN) до самого нижнего компонента (RM или ADD), проходя через все уровни вложенности.*

In [5]:
try:

    Message.print_message("Запуск SQL процедуры...", Color.PURPLE, Color.LIGHT_WHITE)

    raw_report_data = etl_service.generate_bom_report()

    Message.print_message(
        f"Расчет завершен. Получено строк отчета: {len(raw_report_data)}",
        Color.BLUE,
        Color.LIGHT_WHITE,
    )

except Exception as e:
    Message.print_message(f"Ошибка при расчете: {e}", Color.RED, Color.LIGHT_WHITE)
    traceback.print_exc()

[32m2026-01-19 00:59:45.105[0m | [1mINFO    [0m | [36mcore.services.material_service[0m:[36mgenerate_bom_report[0m:[36m152[0m - [1mExecuting SQL script: bom_explosion.sql[0m
[32m2026-01-19 00:59:45.107[0m | [34m[1mDEBUG   [0m | [36mcore.repositories.raw_repository[0m:[36mexecute_raw_sql[0m:[36m58[0m - [34m[1mExecuting raw SQL query...[0m



[1m[35m+-------------------------+[0m
[1m[35m| [97m[1mЗапуск SQL процедуры...[0m[35m |[0m
[1m[35m+-------------------------+[0m

[1m[34m+---------------------------------------------+[0m
[1m[34m| [97m[1mРасчет завершен. Получено строк отчета: 110[0m[34m |[0m
[1m[34m+---------------------------------------------+[0m


***Шаг 6. Проверка результатов***


*Выводится итоговая таблица. В итоге преобразует результат в Pandas DataFrame для удобного просмотра прямо в ноутбуке.*

In [7]:
if raw_report_data:

    data_dicts = []
    for row in raw_report_data:

        report_item = BomReport(
            plant=row.plant,
            year=row.year,
            fin_material_id=row.fin_material_id,
            prod_material_id=row.prod_material_id,
            component_id=row.component_id,
            component_material_release_type=row.component_material_release_type,
        )

        data_dicts.append(
            {
                "Plant": report_item.plant,
                "Year": report_item.year,
                "FIN ID": report_item.fin_material_id,
                "Prod ID": report_item.prod_material_id,
                "Comp ID": report_item.component_id,
                "Comp Type": report_item.component_material_release_type or "-",
            }
        )

    df_result = pd.DataFrame(data_dicts)

    display(df_result.head(15))

    Message.print_message(
        "Всё успешно прошло... Можно идти спать...", Color.GREEN, Color.LIGHT_WHITE
    )
else:
    Message.print_message(
        "Отчет пуст. Проверьте входные данные.", Color.YELLOW, Color.LIGHT_WHITE
    )

session.close()

Unnamed: 0,Plant,Year,FIN ID,Prod ID,Comp ID,Comp Type
0,RLT_10,2024,10000,10000,50000,PROD
1,RLT_10,2024,10000,50000,90001,ADD
2,RLT_10,2024,10000,50000,90000,ADD
3,RLT_10,2024,10000,50000,80070,PROD
4,RLT_10,2024,10000,80070,90003,ADD
5,RLT_10,2024,10000,80070,80010,PROD
6,RLT_10,2024,10000,80070,90002,ADD
7,RLT_10,2024,10000,80010,90004,ADD
8,RLT_10,2024,10000,80010,80000,PROD
9,RLT_10,2024,10000,80000,70000,RM



[1m[32m+-------------------------------------------+[0m
[1m[32m| [97m[1mВсё успешно прошло... Можно идти спать...[0m[32m |[0m
[1m[32m+-------------------------------------------+[0m


In [8]:
Developer.print_info_of_developer()



[1m[97m+--------------------------------------------------------------+[0m
[1m[97m| [36m[1mЗадание было выполнено стажером Big Data в компании Innowise[0m[97m |[0m
[1m[97m+--------------------------------------------------------------+[0m

[1m[97m+--------------------------------------------------+[0m
[1m[97m| [32m[1mBig Data Engineer: Панфиленко Станислав Игоревич[0m[97m |[0m
[1m[97m+--------------------------------------------------+[0m

[1m[97m+----------------------------------------------------------+[0m
[1m[97m| [34m[1mЛичная электронная почта: stanislav.panfilenko@gmail.com[0m[97m |[0m
[1m[97m+----------------------------------------------------------+[0m

