In [80]:
import pandas as pd
import numpy as np
import re
import sqlite3

from openpyxl import Workbook
from openpyxl.utils import get_column_letter
import os
from dotenv import load_dotenv # для чтения файла .env 

import base64 # для декодирования: сначала кодируем в UTF-8, затем в Base64

from requests import Session
# requests качает → копаетсяBeautifulSoup 
import requests 
#from bs4 import BeautifulSoup # ищет теги <div> <span> и тд для Веб-сайт / грязный HTML
    #BeautifulSoup(response.text, 'html.parser') Для HTML
    #BeautifulSoup(response.text, 'xml') Для XML (нужен lxml)
    # поиск JSON в теге <script> внутри HTML-страницы - BeautifulSoup + json.loads()
            # HTML-код:
        #<script id="app-data">
        #{"user": "Ivan", "orders": [1, 2, 3]}
        #</script>
            #поиск внутри HTML-код
            # 1. BeautifulSoup — чтобы найти блок
        #soup = BeautifulSoup(response.text, 'html.parser')
        #script = soup.find('script', id='app-data')
            # 2. json.loads — чтобы распарсить
        #data = json.loads(script.text)
from xml.etree import ElementTree as ET # обработка xml для SAP/1С/SOAP/чистый XML имеет четкую структуру
import json # обработка json # data = response.json() # Или, если нужно вручную: import json data = json.loads(response.text)
from requests.auth import HTTPBasicAuth # базовый вариант аутентификации логин:пароль
#from requests_ntlm import HttpNtlmAuth # для работы с логин пароль на кирилице
#from requests_negotiate_sspi import HttpNegotiateAuth # негативная аутентификация (без )

from datetime import datetime, timedelta
import time

from itertools import chain

from IPython.display import display  # Для Jupyter
from tabulate import tabulate      # Для консоли

from urllib.parse import quote_plus # Экранируем специальные символы в пароле


#import subprocess # Подключиться к 1С с Windows-аутентификацией. Через powershell -UseDefaultCredentials
# Обход блокировки Python Когда нужна нативная аутентификация Windows

import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

from sqlalchemy import create_engine # для создания движка sql
from sqlalchemy.sql import text

import psycopg2 # обеспечивает взаимодействие между Python-приложениями и базой данных
# позволяет выполнять SQL-запросы и получать данные.
#print(psycopg2.__version__)  # Должно вывести 2.9.10


import hashlib # для хэширования и создания уникальных ключей
import uuid # уникальные ключи по UUID

import logging

#import polars as pl # polars — он потребляет в 3–5 раз меньше памяти чем pandas

# настройка логирования
logging.basicConfig(level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s',
    handlers=[
        logging.FileHandler("sales_1C.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

import urllib3 
# Отключаем предупреждение о SSL (только для теста!)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


#### Параметры подключения

In [82]:
# Загружаем переменные окружения
load_dotenv()

# Получаем логин и пароль
username = os.getenv("USERNAME_1C")
password = os.getenv("PASSWORD_1C")

if not username or not password:
    raise ValueError("Логин или пароль не найдены в .env файле!")


# ПАРАМЕТРЫ
BUFFER_DAYS = 2       # запас по краям что точно захватить требуемый период
# при преобразовании в формат даты с временем для нашей учетной ситсемы, время автоматически будет 00:00:00, ТО все что позже в выборку не попадет
# и мы потеряем эти записи
CHUNK_DAYS = 7        # оптимальный размер чанка

# для загрузки исторических данных
DateFrom = '20250101'
DateTo = '20250201'

# преобразовать в дату, тк timedelta умеет работать только с датой, DateFrom и DateTo в начальном формате это строки
expanded_from = datetime.strptime(DateFrom, '%Y%m%d') - timedelta(days=BUFFER_DAYS)
expanded_to = datetime.strptime(DateTo, '%Y%m%d') + timedelta(days=BUFFER_DAYS)


In [85]:
# функция разделяет наш период на более мелкие - по 7 дней (как правило это оптимальный вариант)
def date_range_chunks(start_date, end_date, days=7):
    start = start_date #datetime.strptime(start_date, '%Y%m%d')
    end = end_date #datetime.strptime(end_date, '%Y%m%d')
    while start < end:
        chunk_end = min(start + timedelta(days=days), end) 
        yield start.strftime('%Y%m%d'), chunk_end.strftime('%Y%m%d')
        start = chunk_end #+timedelta(days=1) 
# перекрываем чанк на -1 день иначе пограничные даты типа 2025-01-01 23:59:59 выпадают из выборки
# в SAP дата отражается дата без времени и все попадает в выборку 
# тк выборка строится по аналогичной дате до 2025-01-01 00:00:00
# далее удалить дубликаты из выборки 

In [86]:
import base64
# Кодируем в UTF-8, затем в Base64
credentials = f"{username}:{password}"
credentials_b64 = base64.b64encode(credentials.encode('utf-8')).decode('ascii')
# отдельно декодировать логин не получится, т.к. HTTPBasicAuth(login, password) Берёт сырой логин и пароль (например) иванов1:secret 
# Объединяет: логин:пароль -> Кодирует весь сигнал в байтах (→ тут-то и проблема) -> Кодирует в Base64 -> Передаёт в заголовке -> 
# Authorization: Basic aXY...base64...
# передаем логин:пароль в headers в уже закодированом виде 
 

headers = {
    'Authorization': f'Basic {credentials_b64}',
    'User-Agent': 'Python ETL Script'
}
start_time_all = datetime.now() # время запуска запроса
#all_data_rows = [] # добавляем все строки
all_data = [] # добавляем все датафреймы по чанкам, это будет список ('list') датафреймов
# далее потребуется его преобразовать в привычный pandas -> df_pandas = df_combined.to_pandas()

# Основной цикл
for date_start, date_end in date_range_chunks(expanded_from, expanded_to, days=CHUNK_DAYS):
    start_time = datetime.now() # время запуска запроса
    logger.info(f"Запрос: {date_start} – {date_end}")
    attempt = 0
    success = False

    while attempt < 3 and not success: # 3 попытки на перезапуск запроса в случае ошибок, обрыва связи и тп
            attempt += 1
            try:
                # url-ссылка указана внутри цикла, т.к. вне цикла будет использована вся ссылка и перебор параметров не пойдет                
                url = f'http://example/sale?BeginDate={date_start}&EndDate={date_end}'
                headers = {
                    'Authorization': f'Basic {credentials_b64}',
                    'User-Agent': 'Python ETL Script'
                }
                
                response = requests.get(
                    url,
                    headers=headers,
                    verify=False  # только для теста!
                )
                if response.status_code == 200:
                    data = response.json()
                    all_data.extend(data)
                    logger.info(f"✅ Получено: {len(data)} строк")
                    success = True  # ✅ Устанавливаем успех
                else:
                    print(f"❌ Ошибка: {response.status_code}, {response.text[:200]}")
            except Exception as e:
                    logger.error(f"❌ Ошибка: {e}")
                    success = False  # ❌ Устанавливаем ошибку
            
        # повтор цикла если произошла ошибка 
            if not success and attempt < 3:
                logger.info("Повтор через 5 сек...")
                time.sleep(5)
        
    # расчет времени окончания запроса (чанка)
    duration = datetime.now() - start_time
    logger.info(f"✅ Готово за {duration.total_seconds():.1f} сек")
                
duration_all = datetime.now() - start_time_all
logger.info(f"✅ Вся обработка:  {duration_all.total_seconds():.1f} сек")

# ✅ Теперь сразу обрабатываем
if all_data:
    df = pd.DataFrame(all_data)
        
    df['date_doc'] = pd.to_datetime(df['date_doc'], errors='coerce')
   
        # заполняем пропуски (при первичном анализе выявлены пропуски в полях ниже)
    future = pd.Timestamp('2099-12-31 23:59:59') # заглушка
    #df['date_transfer_risk'] = df['date_transfer_risk'].fillna(future)
    #df['document_date'] = df['document_date'].fillna(future)
    
    
   
    logger.info(f"✅ DataFrame создан: {len(df)} строк")

        
else:
    logger.info("❌ Нет данных для обработки!")

# Теперь можно использовать df дальше
    

2025-09-30 17:06:54,711 | INFO | Запрос: 20241230 – 20250106
2025-09-30 17:06:54,973 | INFO | ✅ Получено: 24 строк
2025-09-30 17:06:54,973 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:54,973 | INFO | Запрос: 20250106 – 20250113
2025-09-30 17:06:55,205 | INFO | ✅ Получено: 35 строк
2025-09-30 17:06:55,207 | INFO | ✅ Готово за 0.2 сек
2025-09-30 17:06:55,208 | INFO | Запрос: 20250113 – 20250120
2025-09-30 17:06:55,471 | INFO | ✅ Получено: 53 строк
2025-09-30 17:06:55,473 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:55,473 | INFO | Запрос: 20250120 – 20250127
2025-09-30 17:06:55,752 | INFO | ✅ Получено: 51 строк
2025-09-30 17:06:55,754 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:55,756 | INFO | Запрос: 20250127 – 20250203
2025-09-30 17:06:56,221 | INFO | ✅ Получено: 83 строк
2025-09-30 17:06:56,223 | INFO | ✅ Готово за 0.5 сек
2025-09-30 17:06:56,224 | INFO | ✅ Вся обработка:  1.5 сек
2025-09-30 17:06:56,232 | INFO | ✅ DataFrame создан: 246 строк


#### присваиваем hash_id
далее удалим дубликаты по hash_id на случай если получили дублирование при перекрытии чанков

In [88]:
df['hash_id'] = df.astype(str).apply(''.join, axis=1).apply(
        lambda x: hashlib.md5(x.encode()).hexdigest()
    )

In [89]:
# проверяем количество уникальных id через хэш
df['hash_id'].nunique()

246

In [90]:
# проверяем количество дубликатов
display(f"количество дубликатов: {df.duplicated().sum()}")
display(df[df.duplicated()])
# удаляем дубликаты
df = df.drop_duplicates(['hash_id'])
# проверяем размер df
display(len(df))
logger.info(f"✅ Удалены дубликаты по hash_id")

# дубликаты появляются из-за пограничного времени 00:00:00 в date_doc
# в исходном отчете такие дубликаты отсутствуют

'количество дубликатов: 0'

Unnamed: 0,nomenclature,buyer,sales_doc,date_doc,region,distribution_channel,quantity,price,revenue_no_nds,tariff,price_in_currency,revenue_in_currency,rate_currency,country,incoterms,hash_id


246

2025-09-30 17:06:56,275 | INFO | ✅ Удалены дубликаты по hash_id


дубликатов нет, количество уникальных hash_id до удаления и после не изменилось

#### Проверяем пропуски, в т.ч. пустые значения ('')

In [91]:
# Проверяем наличие пустых строк, заполнены ''

display(f"Столбцы с пустыми строками: {df.columns[df.eq('').any()]}")


"Столбцы с пустыми строками: Index(['tariff', 'country', 'incoterms'], dtype='object')"

In [92]:
# заполняем пустые значения на 0 для числовых полей или 'unknown' для текстовых полей
df['tariff'] = df['tariff'].replace('', 0)
df['country'] = df['country'].replace('', 'unknown')
df['incoterms'] = df['incoterms'].replace('', 'unknown')


In [93]:
display(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 246 entries, 0 to 245
Data columns (total 16 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   nomenclature          246 non-null    object        
 1   buyer                 246 non-null    object        
 2   sales_doc             246 non-null    object        
 3   date_doc              246 non-null    datetime64[ns]
 4   region                246 non-null    object        
 5   distribution_channel  246 non-null    object        
 6   quantity              246 non-null    float64       
 7   price                 246 non-null    float64       
 8   revenue_no_nds        246 non-null    float64       
 9   tariff                246 non-null    float64       
 10  price_in_currency     246 non-null    float64       
 11  revenue_in_currency   246 non-null    float64       
 12  rate_currency         246 non-null    float64       
 13  country             

None

In [96]:
# Сплошная проверка полноты данных, фильтр на дате отчета
display(min(df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"')['date_doc']))
display(max(df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"')['date_doc']))
display(f"количество уникальных sale: {df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"')['sale'].nunique()}")
display(f"Размер df: {len(df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"'))}")
display(f"сумма по полю quantity: {df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"')['quantity'].sum()}")
display(f"сумма по полю price: {df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"')['price'].sum()}")
display(f"сумма по полю revenue_no_nds: {df.query('date_doc > "2024-12-31 23:59:59" & date_doc < "2025-09-02 00:00:00"')['revenue_no_nds'].sum()}")

Timestamp('2025-01-03 12:00:00')

Timestamp('2025-02-02 12:00:00')

'количество уникальных sales_doc: 191'

'Размер df: 227'

'сумма по полю quantity: 10000.00'

'сумма по полю price: 20000000.00'

'сумма по полю revenue_no_nds: 30000000.00'

количество уникальных документов 191, размер датафрейма 227, значит в качестве уникального ключа нельзя использовать только номер документа

In [99]:
# ФУНКЦИЙ ПРИСОВЕНИЯ ID

# Фуккция присваивает уникальный id как объединение неизменяемых полей (определяется экспертно)
def bisness_id(df):
    # Базовые бизнес-поля которые идентифицируют запись
    id_columns = ['nomenclature', 'buyer', 'sale']
    
    # Проверка наличия колонок
    missing_cols = [col for col in id_columns if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Отсутствуют колонки: {missing_cols}")
    
    # Создаем стабильный ключ (даже для дубликатов)
    df['bisiness_id'] = df[id_columns].astype(str).apply('_'.join, axis=1)
    
     # Для дубликатов добавляем порядковый номер
    df['duplicate_index'] = df.groupby(id_columns).cumcount()

    # Создаем уникальный id
    df['unique_id'] = df['bisiness_id'] + '_' + df['duplicate_index'].astype(str)
    
    return df


In [100]:
sale = bisness_id(df) 
    # Проверяем стабильность
logger.info(f"Уникальных ID df: {sale['unique_id'].nunique()}")

2025-09-30 17:06:56,384 | INFO | Уникальных ID df: 246


In [101]:
# Добавим время загрузки
sale['load_time'] = pd.Timestamp.now()

In [102]:
display(sale.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 246 entries, 0 to 245
Data columns (total 20 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   nomenclature          246 non-null    object        
 1   buyer                 246 non-null    object        
 2   sales_doc             246 non-null    object        
 3   date_doc              246 non-null    datetime64[ns]
 4   region                246 non-null    object        
 5   distribution_channel  246 non-null    object        
 6   quantity              246 non-null    float64       
 7   price                 246 non-null    float64       
 8   revenue_no_nds        246 non-null    float64       
 9   tariff                246 non-null    float64       
 10  price_in_currency     246 non-null    float64       
 11  revenue_in_currency   246 non-null    float64       
 12  rate_currency         246 non-null    float64       
 13  country             

None

#### ПРОВЕРКИ

####   Подключение к БД PostgreSQL

#### Загрузка исторических данных

In [103]:

# Читаем параметры подключения
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")


print(f"Тестовый датафрейм: {sale.shape[0]} строк, {sale.shape[1]} столбцов")

# Экранируем пароль
encoded_password = quote_plus(db_password)

db_url = f"postgresql+psycopg2://{db_user}:{encoded_password}@{db_host}:{db_port}/{db_name}"
engine = create_engine(db_url)

# Создаём движок для подключения по ссылке engine
engine = create_engine(db_url, pool_pre_ping=True) 
# используем дополнительно pool_pre_ping=True
# если соединение «умрёт» между запросами — SQLAlchemy автоматически его заменяет, и вы никогда не увидите ошибку «битого соединения»



try:
    # Загружаем в PostgreSQL
    sale.to_sql(
        name='test_sale',           # имя таблицы
        schema='dm',                    # схема
        con=engine,
        if_exists='replace',            # перезаписать, если есть        
        index=False,                    # не сохранять индекс
        method='multi',                 # немного быстрее при вставке
        chunksize=10                    # пачками — меньше нагрузки
    )
    print(f"✅ Данные загружены: {len(sale)} строк в таблицу dm.test_sale")

except Exception as e:
    print(f"❌ Ошибка при загрузке в БД: {e}")

finally:
    engine.dispose()  # закрываем соединение

Тестовый датафрейм: 246 строк, 20 столбцов
✅ Данные загружены: 246 строк в таблицу dm.test_sale


# Прочитаем обратно


In [None]:
df_test_sale = pd.read_sql("SELECT * FROM dm.test_sale LIMIT 2", engine)
display(df_test_sale)

#### Добавляем в таблицу в БД ADD CONSTRAINT
    для работы с ON CONFLICT (unique_id) DO UPDATE
    после загрузки историчеких данных, предполагается что таблица уже есть в БД


In [105]:
# 1. Очистка предыдущего состояния ---
try:
    with engine.begin() as conn:
        pass  # триггер на rollback, если была ошибка
except Exception as e:
    logging.warning(f"Транзакция была в состоянии ошибки, выполнен rollback: {e}")

engine.dispose()
logging.info("✅ Соединение с БД очищено")

# Пересоздаём engine
engine = create_engine(db_url)
logging.info("✅ Новое подключение к БД создано")


# 2. Проверка и создание UNIQUE CONSTRAINT ---
constraint_name = 'uk_unique_id' # долюно быть уникальное имя в рамках одной схемы для каждой таблицы внутри схемы
table_schema = 'dm'
table_name = 'test_sale'

check_sql = f"""
SELECT 1 FROM information_schema.table_constraints 
WHERE table_schema = '{table_schema}' 
  AND table_name = '{table_name}'
  AND constraint_name = '{constraint_name}';
"""

try:
    result = pd.read_sql(check_sql, engine)
    if len(result) == 0:
        with engine.begin() as conn:
            conn.execute(text(f"""
                ALTER TABLE {table_schema}.{table_name}
                ADD CONSTRAINT {constraint_name} UNIQUE (unique_id)
            """))
        logging.info(f"✅ Ограничение {constraint_name} добавлено")
    else:
        logging.info(f"🟢 Ограничение {constraint_name} уже существует — пропускаем")
except Exception as e:
    logging.error(f"❌ Ошибка при работе с ограничением: {e}")
    raise  # или continue, в зависимости от стратегии

2025-09-30 17:06:56,542 | INFO | ✅ Соединение с БД очищено
2025-09-30 17:06:56,543 | INFO | ✅ Новое подключение к БД создано
2025-09-30 17:06:56,578 | INFO | ✅ Ограничение uk_unique_id добавлено


#### Подготовка нового датафрейма исторические + новые данные 
тестируем инкрементальную загрузку

In [106]:

# ПАРАМЕТРЫ
BUFFER_DAYS = 2       # запас по краям
CHUNK_DAYS = 7        # оптимальный размер чанка

# для загрузки исторических данных ПЕРИОД 1
DateFrom_new = '20250115'
DateTo_new = '20250301'

# преобразовать в дату, тк timedelta умеет работать только с датой, DateFrom и DateTo в начальном формате это строки
expanded_from_new = datetime.strptime(DateFrom_new, '%Y%m%d') - timedelta(days=BUFFER_DAYS)
expanded_to_new = datetime.strptime(DateTo_new, '%Y%m%d') + timedelta(days=BUFFER_DAYS)


In [107]:
# Подготовка нового датафрейма исторические + новые данные 

import base64
# Кодируем в UTF-8, затем в Base64
credentials = f"{username}:{password}"
credentials_b64 = base64.b64encode(credentials.encode('utf-8')).decode('ascii')


headers = {
    'Authorization': f'Basic {credentials_b64}',
    'User-Agent': 'Python ETL Script'
}
start_time_all = datetime.now() # время запуска запроса

all_data_new = [] 

# Основной цикл
for date_start, date_end in date_range_chunks(expanded_from_new, expanded_to_new, days=CHUNK_DAYS):
    start_time = datetime.now() # время запуска запроса
    logger.info(f"Запрос: {date_start} – {date_end}")
    attempt = 0
    success = False

    while attempt < 3 and not success: # 3 попытки на перезапуск запроса в случае ошибок, обрыва связи и тп
            attempt += 1
            try:
                # url-ссылка указана внутри цикла, т.к. вне цикла будет использована вся ссылка и перебор параметров не пойдет
                url = f'http://example/sale?BeginDate={date_start}&EndDate={date_end}'
                headers = {
                    'Authorization': f'Basic {credentials_b64}',
                    'User-Agent': 'Python ETL Script'
                }
                
                response = requests.get(
                    url,
                    headers=headers,
                    verify=False  # только для теста!
                )
                if response.status_code == 200:
                    data_new = response.json()
                    all_data_new.extend(data_new)
                    logger.info(f"✅ Получено: {len(data_new)} строк")
                    success = True  # ✅ Устанавливаем успех
                else:
                    print(f"❌ Ошибка: {response.status_code}, {response.text[:200]}")
            except Exception as e:
                    logger.error(f"❌ Ошибка: {e}")
                    success = False  # ❌ Устанавливаем ошибку
            
        # повтор цикла если произошла ошибка 
            if not success and attempt < 3:
                logger.info("Повтор через 5 сек...")
                time.sleep(5)
        
    # расчет времени окончания запроса (чанка)
    duration = datetime.now() - start_time
    logger.info(f"✅ Готово за {duration.total_seconds():.1f} сек")
                
duration_all = datetime.now() - start_time_all
logger.info(f"✅ Вся обработка:  {duration_all.total_seconds():.1f} сек")

# ✅ Теперь сразу обрабатываем
if all_data_new:
    df_new = pd.DataFrame(all_data_new)
        
    df_new['date_doc'] = pd.to_datetime(df_new['date_doc'], errors='coerce')
    
        # заполняем пропуски (при первичном анализе выявлены пропуски в полях ниже)
    future = pd.Timestamp('2099-12-31 23:59:59') # заглушка
    df_new['date_transfer_risk'] = df_new['date_transfer_risk'].fillna(future)
    #df_new['document_date'] = df_new['document_date'].fillna(future)

    logger.info(f"✅ DataFrame создан: {len(df_new)} строк")

        
else:
    logger.info("❌ Нет данных для обработки!")

# Теперь можно использовать df дальше
    

2025-09-30 17:06:56,592 | INFO | Запрос: 20250113 – 20250120
2025-09-30 17:06:56,894 | INFO | ✅ Получено: 53 строк
2025-09-30 17:06:56,896 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:56,896 | INFO | Запрос: 20250120 – 20250127
2025-09-30 17:06:57,211 | INFO | ✅ Получено: 51 строк
2025-09-30 17:06:57,211 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:57,211 | INFO | Запрос: 20250127 – 20250203
2025-09-30 17:06:57,644 | INFO | ✅ Получено: 83 строк
2025-09-30 17:06:57,645 | INFO | ✅ Готово за 0.4 сек
2025-09-30 17:06:57,645 | INFO | Запрос: 20250203 – 20250210
2025-09-30 17:06:57,944 | INFO | ✅ Получено: 51 строк
2025-09-30 17:06:57,944 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:57,944 | INFO | Запрос: 20250210 – 20250217
2025-09-30 17:06:58,311 | INFO | ✅ Получено: 78 строк
2025-09-30 17:06:58,312 | INFO | ✅ Готово за 0.4 сек
2025-09-30 17:06:58,312 | INFO | Запрос: 20250217 – 20250224
2025-09-30 17:06:58,635 | INFO | ✅ Получено: 54 строк
2025-09-30 17:06:58,635 | INFO | ✅ Готово за 

#### присваиваем hash_id
далее удалим дубликаты по hash_id на случай если получили дублирование при перекрытии чанков

In [109]:
df_new['hash_id'] = df_new.astype(str).apply(''.join, axis=1).apply(
        lambda x: hashlib.md5(x.encode()).hexdigest()
    )

In [110]:
# проверяем количество уникальных id через хэш
df_new['hash_id'].nunique()

442

In [111]:
# проверяем количество дубликатов
display(f"количество дубликатов: {df_new.duplicated().sum()}")
display(df_new[df_new.duplicated()])
# удаляем дубликаты
df_new = df_new.drop_duplicates(['hash_id'])
# проверяем размер df
display(len(df_new))
logger.info(f"✅ Удалены дубликаты по hash_id")

# дубликаты появляются из-за пограничного времени 00:00:00 в date_doc
# в исходном отчете такие дубликаты отсутствуют

'количество дубликатов: 0'

Unnamed: 0,nomenclature,buyer,sales_doc,date_doc,region,distribution_channel,quantity,price,revenue_no_nds,tariff,price_in_currency,revenue_in_currency,rate_currency,country,incoterms,hash_id


442

2025-09-30 17:06:59,094 | INFO | ✅ Удалены дубликаты по hash_id


#### Проверяем пропуски, в т.ч. пустые значения ('')

In [112]:
# Проверяем наличие пустых строк, заполнены ''

display(f"Столбцы с пустыми строками: {df_new.columns[df_new.eq('').any()]}")


"Столбцы с пустыми строками: Index(['tariff', 'country', 'incoterms'], dtype='object')"

In [113]:
# заполняем пустные значения, аналогично как с обработкой исторических данных
df_new['tariff'] = df_new['tariff'].replace('', 0)
df_new['country'] = df_new['country'].replace('', 'unknown')
df_new['incoterms'] = df_new['incoterms'].replace('', 'unknown')



In [114]:
display(df_new.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 442 entries, 0 to 441
Data columns (total 16 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   nomenclature          442 non-null    object        
 1   buyer                 442 non-null    object        
 2   sales_doc             442 non-null    object        
 3   date_doc              442 non-null    datetime64[ns]
 4   region                442 non-null    object        
 5   distribution_channel  442 non-null    object        
 6   quantity              442 non-null    float64       
 7   price                 442 non-null    float64       
 8   revenue_no_nds        442 non-null    float64       
 9   tariff                442 non-null    float64       
 10  price_in_currency     442 non-null    float64       
 11  revenue_in_currency   442 non-null    float64       
 12  rate_currency         442 non-null    float64       
 13  country             

None

In [115]:
# удаляем дубликаты
df_new = df_new.drop_duplicates()
logger.info(f"✅ Удалены дубликаты: {df_new.duplicated().sum()}")
logger.info(f"В таблице записей: {len(df_new)}")

2025-09-30 17:06:59,134 | INFO | ✅ Удалены дубликаты: 0
2025-09-30 17:06:59,136 | INFO | В таблице записей: 442


In [116]:
df_new = bisness_id(df_new) #, salt='one_s_analyse_revenue') если потребуется дополнительно идентификатор
    # Проверяем стабильность
logger.info(f"Уникальных unique_id: {df_new['unique_id'].nunique()}")

2025-09-30 17:06:59,148 | INFO | Уникальных unique_id: 442


In [117]:
# Добавим время загрузки
df_new['load_time'] = pd.Timestamp.now()

#### Инкрементальная загрузка

In [118]:
# Подключение к БД
# Читаем пароль
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")

# Экранируем пароль
encoded_password = quote_plus(db_password)

# создаем ссылку на подключение
db_url = f"postgresql+psycopg2://{db_user}:{encoded_password}@{db_host}:{db_port}/{db_name}"

# Создаём движок для подключения по ссылке engine
engine = create_engine(db_url, pool_pre_ping=True) 
# используем дополнительно pool_pre_ping=True
# если соединение «умрёт» между запросами — SQLAlchemy автоматически его заменяет, и вы никогда не увидите ошибку «битого соединения»


logger.info("✅ Подключение к БД готово. Проверяем таблицу...")

2025-09-30 17:06:59,160 | INFO | ✅ Подключение к БД готово. Проверяем таблицу...


In [119]:
# Проверка наличия таблицы и количества записей
query_count = """
SELECT COUNT(*) AS total_count 
FROM dm.test_sale;
"""

try:
    result = pd.read_sql(query_count, engine)
    logger.info(f"📊 В таблице dm.test_sale: {result.iloc[0]['total_count']} строк")
except Exception as e:
    logger.info(f"❌ Ошибка при запросе к таблице: {e}")

2025-09-30 17:06:59,193 | INFO | 📊 В таблице dm.test_sale: 246 строк


In [120]:
# Забираем все id из БД
query_ids = """
SELECT DISTINCT unique_id
FROM dm.test_sale
WHERE unique_id IS NOT NULL;
"""

try:
    existing_df = pd.read_sql(query_ids, engine)
    existing_ids = set(existing_df['unique_id'])
    logger.info(f"✅ Успешно загружено {len(existing_ids)} уникальных ID из БД")
    # Покажем несколько первых
    print("Примеры ID из БД:", list(existing_ids)[:5])
except Exception as e:
    logger.info(f"❌ Не удалось загрузить unique_id: {e}")
    existing_ids = set()

2025-09-30 17:06:59,205 | INFO | ✅ Успешно загружено 246 уникальных ID из БД


Примеры ID из БД: ['.....']


In [121]:
# Проверим, есть ли unique_id в df
if 'unique_id' not in df_new.columns:
    logger.info("❌ Ошибка: в df нет столбца unique_id")
else:
    logger.info(f"✅ В df_new найден столбец unique_id. Всего строк: {len(df_new)}")

    # Преобразуем в set для быстрого сравнения
    new_ids = set(df_new['unique_id'])

    # Находим, сколько уже есть в БД
    already_exists = new_ids & existing_ids 
# Оператор & между двумя set в Python означает пересечение: возвращает новый set, содержащий только те элементы, которые присутствуют в обоих множествах.    
     
    brand_new = new_ids - existing_ids
# возвращает новый set состящий из элементо отсутствующих в новом set

    logger.info(f"🔁 Уже есть в БД: {len(already_exists)}")
    logger.info(f"🆕 Новых ID: {len(brand_new)}")

    # Фильтруем df — только новые строки
    df_new_id = df_new[df_new['unique_id'].isin(brand_new)].copy()

    logger.info(f"✅ Отфильтровано для загрузки: {len(df_new_id)} строк")
    
    # Проверим, нет ли дублей
    if df_new_id['unique_id'].duplicated().any():
        logger.info("⚠️ Найдены дубли по unique_id в новых данных!")
        df_new_id.drop_duplicates(subset=['unique_id'], keep='first', inplace=True)
        logger.info(f"✅ После удаления дублей: {len(df_new_id)} строк")

2025-09-30 17:06:59,212 | INFO | ✅ В df_new найден столбец unique_id. Всего строк: 442
2025-09-30 17:06:59,212 | INFO | 🔁 Уже есть в БД: 187
2025-09-30 17:06:59,213 | INFO | 🆕 Новых ID: 255
2025-09-30 17:06:59,215 | INFO | ✅ Отфильтровано для загрузки: 255 строк


In [122]:
# Забираем все hash_id из БД
query_hash_id = """
SELECT DISTINCT hash_id
FROM dm.test_sale
WHERE unique_id IS NOT NULL;
"""

try:
    existing_df_hash_id = pd.read_sql(query_hash_id, engine)
    existing_df_hash_id = set(existing_df_hash_id['hash_id'])
    logger.info(f"✅ Успешно загружено {len(existing_df_hash_id)} уникальных hash_id из БД")
    # Покажем несколько первых
    print("Примеры ID из БД:", list(existing_df_hash_id)[:5])
except Exception as e:
    logger.info(f"❌ Не удалось загрузить unique_id: {e}")
    existing_ids = set()

2025-09-30 17:06:59,223 | INFO | ✅ Успешно загружено 246 уникальных hash_id из БД


Примеры ID из БД: ['.....']


In [124]:
# Проверим, есть ли hash_id в df
if 'hash_id' not in df_new.columns:
    logger.info("❌ Ошибка: в df нет столбца hash_id")
else:
    logger.info(f"✅ В df_new найден столбец unique_id. Всего строк: {len(df_new)}")

    # Преобразуем в set для быстрого сравнения
    new_hash_id = set(df_new['hash_id'])

    # Находим, сколько уже есть в БД
    already_exists_hash_id = new_hash_id & existing_df_hash_id 
# Оператор & между двумя set в Python означает пересечение: возвращает новый set, содержащий только те элементы, которые присутствуют в обоих множествах.    
     
    brand_new_hash_id = new_hash_id - existing_df_hash_id
# возвращает новый set состящий из элементо отсутствующих в новом set

    logger.info(f"🔁 Уже есть в БД: {len(already_exists_hash_id)}")
    logger.info(f"🆕 Новых ID: {len(brand_new_hash_id)}")

    # Фильтруем df — только новые строки
    df_new_hash_id = df_new[df_new['hash_id'].isin(brand_new_hash_id)].copy()

    logger.info(f"✅ Отфильтровано для загрузки: {len(df_new_hash_id)} строк")
    
    # Проверим, нет ли дублей
    if df_new_hash_id['hash_id'].duplicated().any():
        logger.info("⚠️ Найдены дубли по hash_id в новых данных!")
        df_new_hash_id.drop_duplicates(subset=['hash_id'], keep='first', inplace=True)
        logger.info(f"✅ После удаления дублей: {len(df_new_hash_id)} строк")

2025-09-30 17:06:59,234 | INFO | ✅ В df_new найден столбец unique_id. Всего строк: 442
2025-09-30 17:06:59,234 | INFO | 🔁 Уже есть в БД: 187
2025-09-30 17:06:59,235 | INFO | 🆕 Новых ID: 255
2025-09-30 17:06:59,237 | INFO | ✅ Отфильтровано для загрузки: 255 строк


#### UPSERT через временную таблицу

In [127]:
# Шаг 1: Создаём временную таблицу
df_new.to_sql(
    name='temp_sale',
    con=engine,
    schema='dm', # песочница
    if_exists='replace',  # пересоздаём каждый раз
    index=False,
    method='multi',
    chunksize=1000
)

logger.info("✅ Данные во временной таблице")

2025-09-30 17:06:59,378 | INFO | ✅ Данные во временной таблице


In [128]:
# Шаг 2: Выполняем UPSERT
# задаем переменную обновления строк
upsert_sql = """
    INSERT INTO dm.test_sale AS target
    SELECT 
        nomenclature,
        buyer,
        sale,
        date_doc,
        region,
        distribution_channel,
        quantity, 
        price,
        revenue_no_nds,
        tariff,
        price_in_currency,
        revenue_in_currency,
        rate_currency,
        country,
        incoterms,
        hash_id,
        bisiness_id,
        duplicate_index,
        unique_id,
        load_time
    FROM dm.temp_sale
    ON CONFLICT (unique_id)
    DO UPDATE SET
        nomenclature = EXCLUDED.nomenclature,
        buyer = EXCLUDED.buyer,
        sale = EXCLUDED.sale,
        date_doc = EXCLUDED.date_doc,
        region = EXCLUDED.region,
        distribution_channel = EXCLUDED.distribution_channel,
        quantity = EXCLUDED.quantity,
        price = EXCLUDED.price,
        revenue_no_nds = EXCLUDED.revenue_no_nds,
        tariff = EXCLUDED.tariff,
        price_in_currency = EXCLUDED.price_in_currency,
        revenue_in_currency = EXCLUDED.revenue_in_currency,
        rate_currency = EXCLUDED.rate_currency,
        country = EXCLUDED.country,
        incoterms = EXCLUDED.incoterms,
        hash_id = EXCLUDED.hash_id,
        bisiness_id = EXCLUDED.bisiness_id,
        duplicate_index = EXCLUDED.duplicate_index,
        unique_id = EXCLUDED.unique_id,
        load_time = EXCLUDED.load_time                
    WHERE target.hash_id IS DISTINCT FROM EXCLUDED.hash_id;
"""
# Обновляет только если хэш (hash_id)  изменится


# Выполняем в транзакции
with engine.begin() as conn:
    conn.execute(text(upsert_sql))

logger.info(f"✅ UPSERT выполнен: {len(df_new_id)} строк добавлено/обновлено")

# Шаг 3: Удаляем временную таблицу
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS dm.temp_sale"))
    

2025-09-30 17:06:59,394 | INFO | ✅ UPSERT выполнен: 255 строк добавлено/обновлено


In [129]:
# Проверка сколько стало строк в БД после добавление новых строк
query_count_after = """
SELECT COUNT(*) AS total_count 
FROM dm.test_sale;
"""

result_after = pd.read_sql(query_count_after, engine)
logger.info(f"📊 Теперь в таблице: {result_after.iloc[0]['total_count']} строк")

2025-09-30 17:06:59,407 | INFO | 📊 Теперь в таблице: 501 строк


после запись новых строк получили 246 из таблицы с историческими данными + 255 стр из новой таблицы = 501, все правильно

In [130]:
# Проверка суммы по числовым полям  в БД
query_count_after = """
SELECT 
    sum(quantity) as sum_quantity,
    sum(price)  as sum_price,
    sum(revenue_no_nds)  as sum_revenue_no_nds
FROM dm.test_sale;
"""

result_after = pd.read_sql(query_count_after, engine)
print(result_after)
#logger.info(f"📊 Теперь в таблице: {result_after.iloc[0]['total_count']} строк")

   sum_quantity     sum_price  sum_revenue_no_nds
0      25000.00   50000000.00         70000000.00

#### Тест изменения данных в исходных данных
за основу берем 2й период, числовые поля х2


In [131]:
# Подготовка нового датафрейма исторические + новые данные 

import base64
# Кодируем в UTF-8, затем в Base64
credentials = f"{username}:{password}"
credentials_b64 = base64.b64encode(credentials.encode('utf-8')).decode('ascii')

headers = {
    'Authorization': f'Basic {credentials_b64}',
    'User-Agent': 'Python ETL Script'
}
start_time_all = datetime.now() # время запуска запроса

all_data_new_2 = [] 

# Основной цикл
for date_start, date_end in date_range_chunks(expanded_from_new, expanded_to_new, days=CHUNK_DAYS):
    start_time = datetime.now() # время запуска запроса
    logger.info(f"Запрос: {date_start} – {date_end}")
    attempt = 0
    success = False

    while attempt < 3 and not success: # 3 попытки на перезапуск запроса в случае ошибок, обрыва связи и тп
            attempt += 1
            try:
                # url-ссылка указана внутри цикла, т.к. вне цикла будет использована вся ссылка и перебор параметров не пойдет
                url = f'http://example/sale?BeginDate={date_start}&EndDate={date_end}'
                headers = {
                    'Authorization': f'Basic {credentials_b64}',
                    'User-Agent': 'Python ETL Script'
                }
                
                response = requests.get(
                    url,
                    headers=headers,
                    verify=False  # только для теста!
                )
                if response.status_code == 200:
                    df_new_2 = response.json()
                    all_data_new_2.extend(df_new_2)
                    logger.info(f"✅ Получено: {len(df_new_2)} строк")
                    success = True  # ✅ Устанавливаем успех
                else:
                    print(f"❌ Ошибка: {response.status_code}, {response.text[:200]}")
            except Exception as e:
                    logger.error(f"❌ Ошибка: {e}")
                    success = False  # ❌ Устанавливаем ошибку
            
        # повтор цикла если произошла ошибка 
            if not success and attempt < 3:
                logger.info("Повтор через 5 сек...")
                time.sleep(5)
        
    # расчет времени окончания запроса (чанка)
    duration = datetime.now() - start_time
    logger.info(f"✅ Готово за {duration.total_seconds():.1f} сек")
                
duration_all = datetime.now() - start_time_all
logger.info(f"✅ Вся обработка:  {duration_all.total_seconds():.1f} сек")

# ✅ Теперь сразу обрабатываем
if all_data_new_2:
    df_new_2 = pd.DataFrame(all_data_new_2)
        
    df_new_2['date_doc'] = pd.to_datetime(df_new_2['date_doc'], errors='coerce')
    

        # заполняем пропуски (при первичном анализе выявлены пропуски в полях ниже)
    future = pd.Timestamp('2099-12-31 23:59:59') # заглушка
   df_new_2['date_doc'] = pd.to_datetime(df_new_2['date_doc'], errors='coerce')


    logger.info(f"✅ DataFrame создан: {len(df_new_2)} строк")
    
else:
    logger.info("❌ Нет данных для обработки!")
   

2025-09-30 17:06:59,427 | INFO | Запрос: 20250113 – 20250120
2025-09-30 17:06:59,740 | INFO | ✅ Получено: 53 строк
2025-09-30 17:06:59,741 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:06:59,742 | INFO | Запрос: 20250120 – 20250127
2025-09-30 17:07:00,025 | INFO | ✅ Получено: 51 строк
2025-09-30 17:07:00,026 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:07:00,027 | INFO | Запрос: 20250127 – 20250203
2025-09-30 17:07:00,482 | INFO | ✅ Получено: 83 строк
2025-09-30 17:07:00,483 | INFO | ✅ Готово за 0.5 сек
2025-09-30 17:07:00,484 | INFO | Запрос: 20250203 – 20250210
2025-09-30 17:07:00,790 | INFO | ✅ Получено: 51 строк
2025-09-30 17:07:00,791 | INFO | ✅ Готово за 0.3 сек
2025-09-30 17:07:00,792 | INFO | Запрос: 20250210 – 20250217
2025-09-30 17:07:01,165 | INFO | ✅ Получено: 78 строк
2025-09-30 17:07:01,165 | INFO | ✅ Готово за 0.4 сек
2025-09-30 17:07:01,166 | INFO | Запрос: 20250217 – 20250224
2025-09-30 17:07:01,455 | INFO | ✅ Получено: 54 строк
2025-09-30 17:07:01,456 | INFO | ✅ Готово за 

In [133]:
# симитируем изменение числовых параметров через х2, так проще отследить изменения в нашей БД по итогу обновления

df_new_2['quantity'] = df_new_2['quantity']*2
df_new_2['price'] = df_new_2['price']*2
df_new_2['revenue_no_nds'] = df_new_2['revenue_no_nds']*2

display(f"исх.df_new сумма quantity: {df_new['quantity'].sum()}")
display(f"исх.df_new сумма price: {df_new['price'].sum()}")
display(f"исх.df_new сумма revenue_no_nds: {df_new['revenue_no_nds'].sum()}")

display(f"НОВЫЙ.df_new сумма quantity: {df_new_2['quantity'].sum()}")
display(f"НОВЫЙ.df_new сумма price: {df_new_2['price'].sum()}")
display(f"НОВЫЙ.df_new сумма revenue_no_nds: {df_new_2['revenue_no_nds'].sum()}")

'исх.df_new сумма quantity: 15000.00'

'исх.df_new сумма price: 30000000.00'

'исх.df_new сумма revenue_no_nds: 40000000.00'

'НОВЫЙ.df_new сумма quantity: 30000.00'

'НОВЫЙ.df_new сумма price: 60000000.00'

'НОВЫЙ.df_new сумма revenue_no_nds: 80000000.00'

наши числовые поля в сумме в 2 раза больше чем в исходном датафрейме

#### присваиваем hash_id
далее удалим дубликаты по hash_id на случай если получили дублирование при перекрытии чанков

In [135]:
df_new_2['hash_id'] = df_new_2.astype(str).apply(''.join, axis=1).apply(
        lambda x: hashlib.md5(x.encode()).hexdigest()
    )

In [136]:
# проверяем количество уникальных id через хэш
df_new_2['hash_id'].nunique()

442

In [137]:
# проверяем количество дубликатов
display(f"количество дубликатов: {df_new_2.duplicated().sum()}")
display(df_new_2[df_new_2.duplicated()])
# удаляем дубликаты
df_new_2 = df_new_2.drop_duplicates(['hash_id'])
# проверяем размер df
display(len(df_new_2))
logger.info(f"✅ Удалены дубликаты по hash_id")

# дубликаты появляются из-за пограничного времени 00:00:00 в date_doc
# в исходном отчете такие дубликаты отсутствуют

'количество дубликатов: 0'

Unnamed: 0,nomenclature,buyer,sales_doc,date_doc,region,distribution_channel,quantity,price,revenue_no_nds,tariff,price_in_currency,revenue_in_currency,rate_currency,country,incoterms,hash_id


442

2025-09-30 17:07:01,981 | INFO | ✅ Удалены дубликаты по hash_id


#### Проверяем пропуски, в т.ч. пустые значения ('')

In [138]:
# Проверяем наличие пустых строк, заполнены ''

display(f"Столбцы с пустыми строками: {df_new_2.columns[df_new_2.eq('').any()]}")


"Столбцы с пустыми строками: Index(['tariff', 'country', 'incoterms'], dtype='object')"

In [139]:
# заполняем пустные значения, аналогично как с обработкой исторических данных
df_new_2['tariff'] = df_new_2['tariff'].replace('', 0)
df_new_2['country'] = df_new_2['country'].replace('', 'unknown')
df_new_2['incoterms'] = df_new_2['incoterms'].replace('', 'unknown')

# столбцы   ['country'], ['incoterms'] оста.тся без преобразований - обработка на уровне view БД

In [140]:
display(df_new_2.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 442 entries, 0 to 441
Data columns (total 16 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   nomenclature          442 non-null    object        
 1   buyer                 442 non-null    object        
 2   sales_doc             442 non-null    object        
 3   date_doc              442 non-null    datetime64[ns]
 4   region                442 non-null    object        
 5   distribution_channel  442 non-null    object        
 6   quantity              442 non-null    float64       
 7   price                 442 non-null    float64       
 8   revenue_no_nds        442 non-null    float64       
 9   tariff                442 non-null    float64       
 10  price_in_currency     442 non-null    float64       
 11  revenue_in_currency   442 non-null    float64       
 12  rate_currency         442 non-null    float64       
 13  country             

None

In [141]:
# удаляем дубликаты
df_new_2 = df_new_2.drop_duplicates()
logger.info(f"✅ Удалены дубликаты")
logger.info(f"В таблице записей: {len(df_new_2)}")

2025-09-30 17:07:02,008 | INFO | ✅ Удалены дубликаты
2025-09-30 17:07:02,009 | INFO | В таблице записей: 442


In [142]:
df_new_2 = bisness_id(df_new_2)
    # Проверяем стабильность
logger.info(f"Уникальных unique_id: {df_new_2['unique_id'].nunique()}")

2025-09-30 17:07:02,019 | INFO | Уникальных unique_id: 442


In [143]:
# Добавим время загрузки
df_new_2['load_time'] = pd.Timestamp.now()

#### Инкрементальная загрузка

In [145]:
# Подключение в БД
# Читаем пароль
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")

# Экранируем пароль
encoded_password = quote_plus(db_password)

# создаем ссылку на подключение
db_url = f"postgresql+psycopg2://{db_user}:{encoded_password}@{db_host}:{db_port}/{db_name}"

# Создаём движок для подключения по ссылке engine
engine = create_engine(db_url, pool_pre_ping=True) 

logger.info("✅ Подключение к БД готово. Проверяем таблицу...")

2025-09-30 17:07:02,045 | INFO | ✅ Подключение к БД готово. Проверяем таблицу...


In [146]:
# Проверка наличия таблицы и количества записей
query_count_2 = """
SELECT COUNT(*) AS total_count 
FROM dm.test_sale;
"""

try:
    result_2 = pd.read_sql(query_count_2, engine)
    logger.info(f"📊 В таблице dm.test_sale: {result_2.iloc[0]['total_count']} строк")
except Exception as e:
    logger.info(f"❌ Ошибка при запросе к таблице: {e}")

2025-09-30 17:07:02,079 | INFO | 📊 В таблице dm.test_sale: 501 строк


In [147]:
# Забираем все id из БД
query_ids_2 = """
SELECT DISTINCT unique_id
FROM dm.test_sale
WHERE unique_id IS NOT NULL;
"""

try:
    existing_df_2 = pd.read_sql(query_ids_2, engine)
    existing_df_2 = set(existing_df_2['unique_id'])
    logger.info(f"✅ Успешно загружено {len(existing_df_2)} уникальных ID из БД")
    # Покажем несколько первых
    print("Примеры ID из БД:", list(existing_df_2)[:5])
except Exception as e:
    logger.info(f"❌ Не удалось загрузить unique_id: {e}")
    existing_df_2 = set()

2025-09-30 17:07:02,088 | INFO | ✅ Успешно загружено 501 уникальных ID из БД


Примеры ID из БД: ['....']


In [148]:


# Проверим, есть ли unique_id в df
if 'unique_id' not in df_new_2.columns:
    logger.info("❌ Ошибка: в df нет столбца unique_id")
else:
    logger.info(f"✅ В df_new найден столбец unique_id. Всего строк: {len(df_new_2)}")

    # Преобразуем в set для быстрого сравнения
    new_id_2 = set(df_new_2['unique_id'])

    # Находим, сколько уже есть в БД
    already_exists_2 = new_id_2 & existing_df_2 
# Оператор & между двумя set в Python означает пересечение: возвращает новый set, содержащий только те элементы, которые присутствуют в обоих множествах.    
     
    brand_new_2 = new_id_2 - existing_df_2
# возвращает новый set состящий из элементо отсутствующих в новом set

    logger.info(f"🔁 Уже есть в БД: {len(already_exists_2)}")
    logger.info(f"🆕 Новых ID: {len(brand_new_2)}")

    # Фильтруем df — только новые строки
    df_new_2_filtre = df_new_2[df_new_2['unique_id'].isin(brand_new_2)].copy()

    logger.info(f"✅ Отфильтровано для загрузки: {len(df_new_2_filtre)} строк")
    
    # Проверим, нет ли дублей
    if df_new_2_filtre['hash_id'].duplicated().any():
        logger.info("⚠️ Найдены дубли по unique_id в новых данных!")
        df_new_2_filtre.drop_duplicates(subset=['unique_id'], keep='first', inplace=True)
        logger.info(f"✅ После удаления дублей: {len(df_new_2_filtre)} строк")

2025-09-30 17:07:02,094 | INFO | ✅ В df_new найден столбец unique_id. Всего строк: 442
2025-09-30 17:07:02,094 | INFO | 🔁 Уже есть в БД: 442
2025-09-30 17:07:02,095 | INFO | 🆕 Новых ID: 0
2025-09-30 17:07:02,097 | INFO | ✅ Отфильтровано для загрузки: 0 строк


In [149]:
# Забираем все id из БД
query_ids_hash_id_2 = """
SELECT DISTINCT hash_id
FROM dm.test_sale
WHERE unique_id IS NOT NULL;
"""

try:
    existing_df_hash_id_2 = pd.read_sql(query_ids_hash_id_2, engine)
    existing_df_hash_id_2 = set(existing_df_hash_id_2['hash_id'])
    logger.info(f"✅ Успешно загружено {len(existing_df_hash_id_2)} уникальных hash_id из БД")
    # Покажем несколько первых
    print("Примеры hash_id из БД:", list(existing_df_hash_id_2)[:5])
except Exception as e:
    logger.info(f"❌ Не удалось загрузить hash_id: {e}")
    existing_df_hash_id_2 = set()

2025-09-30 17:07:02,106 | INFO | ✅ Успешно загружено 501 уникальных hash_id из БД


Примеры hash_id из БД: ['.....']


In [151]:


# Проверим, есть ли hash_id в df_new_2
if 'hash_id' not in df_new_2.columns:
    logger.info("❌ Ошибка: в df нет столбца hash_id")
else:
    logger.info(f"✅ В df_new_2 найден столбец hash_id. Всего строк: {len(df_new_2)}")

    # Преобразуем в set для быстрого сравнения
    new_hash_id_2 = set(df_new_2['hash_id'])

    # Находим, сколько уже есть в БД
    already_exists_hash_id_2 = new_hash_id_2 & existing_df_hash_id_2 
# Оператор & между двумя set в Python означает пересечение: возвращает новый set, содержащий только те элементы, которые присутствуют в обоих множествах.    
     
    brand_new_hash_id_2 = new_hash_id_2 - existing_df_hash_id_2
# возвращает новый set состящий из элементо отсутствующих в новом set

    logger.info(f"🔁 Уже есть в БД: {len(already_exists_hash_id_2)}")
    logger.info(f"🆕 Новых ID: {len(brand_new_hash_id_2)}")

    # Фильтруем df — только новые строки
    df_new_hash_id_2 = df_new_2[df_new_2['hash_id'].isin(brand_new_hash_id_2)].copy()

    logger.info(f"✅ Отфильтровано для загрузки: {len(df_new_hash_id_2)} строк")
    
    # Проверим, нет ли дублей
    if df_new_hash_id_2['hash_id'].duplicated().any():
        logger.info("⚠️ Найдены дубли по unique_id в новых данных!")
        df_new_hash_id_2.drop_duplicates(subset=['hash_id'], keep='first', inplace=True)
        logger.info(f"✅ После удаления дублей: {len(df_new_hash_id_2)} строк")

2025-09-30 17:07:02,165 | INFO | ✅ В df_new_2 найден столбец hash_id. Всего строк: 442
2025-09-30 17:07:02,166 | INFO | 🔁 Уже есть в БД: 0
2025-09-30 17:07:02,167 | INFO | 🆕 Новых ID: 442
2025-09-30 17:07:02,169 | INFO | ✅ Отфильтровано для загрузки: 442 строк


#### UPSERT через временную таблицу

In [153]:
# Шаг 1: Создаём временную таблицу
#df_new_2.to_sql(
# передаем всю таблицу SQl сам разберется что обновить при измененном хэш и появлению новых unique_id
df_new_2.to_sql(    
    name='temp_sale',
    con=engine,
    schema='dm', # песочница
    if_exists='replace',  # пересоздаём каждый раз
    index=False,
    method='multi',
    chunksize=1000
)

logger.info("✅ Данные во временной таблице")

2025-09-30 17:07:02,579 | INFO | ✅ Данные во временной таблице


In [154]:
# Шаг 2: Выполняем UPSERT
# задаем переменную обновления строк
upsert_sql = """
    INSERT INTO dm.test_sale AS target
    SELECT 
        nomenclature,
        buyer,
        sale,
        date_doc,
        region,
        distribution_channel,
        quantity, 
        price,
        revenue_no_nds,
        tariff,
        price_in_currency,
        revenue_in_currency,
        rate_currency,
        country,
        incoterms,
        hash_id,
        bisiness_id,
        duplicate_index,
        unique_id,
        load_time
    FROM dm.temp_sale
    ON CONFLICT (unique_id)
    DO UPDATE SET
        nomenclature = EXCLUDED.nomenclature,
        buyer = EXCLUDED.buyer,
        sale = EXCLUDED.sale,
        date_doc = EXCLUDED.date_doc,
        region = EXCLUDED.region,
        distribution_channel = EXCLUDED.distribution_channel,
        quantity = EXCLUDED.quantity,
        price = EXCLUDED.price,
        revenue_no_nds = EXCLUDED.revenue_no_nds,
        tariff = EXCLUDED.tariff,
        price_in_currency = EXCLUDED.price_in_currency,
        revenue_in_currency = EXCLUDED.revenue_in_currency,
        rate_currency = EXCLUDED.rate_currency,
        country = EXCLUDED.country,
        incoterms = EXCLUDED.incoterms,
        hash_id = EXCLUDED.hash_id,
        bisiness_id = EXCLUDED.bisiness_id,
        duplicate_index = EXCLUDED.duplicate_index,
        unique_id = EXCLUDED.unique_id,
        load_time = EXCLUDED.load_time                
    WHERE target.hash_id IS DISTINCT FROM EXCLUDED.hash_id;
"""
# Обновляет только если хэш (hash_id)  изменится


# Выполняем в транзакции
with engine.begin() as conn:
    conn.execute(text(upsert_sql))

logger.info(f"✅ UPSERT выполнен: {len(df_new_2)} строк добавлено/обновлено")

# Шаг 3: Удаляем временную таблицу
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS dm.temp_sale"))
    

2025-09-30 17:07:02,600 | INFO | ✅ UPSERT выполнен: 442 строк добавлено/обновлено


In [155]:
# Проверка сколько стало строк в БД
query_count_after = """
SELECT COUNT(*) AS total_count 
FROM dm.test_sale;
"""

result_after = pd.read_sql(query_count_after, engine)
logger.info(f"📊 Теперь в таблице: {result_after.iloc[0]['total_count']} строк")

2025-09-30 17:07:02,609 | INFO | 📊 Теперь в таблице: 501 строк


In [156]:
# Проверка сколько стало строк в БД
query_count_after = """
SELECT 
    sum(quantity) as sum_quantity,
    sum(price)  as sum_price,
    sum(revenue_no_nds)  as sum_revenue_no_nds
FROM dm.test_sale;
"""

result_after_2 = pd.read_sql(query_count_after, engine)
print(result_after_2)
#logger.info(f"📊 Теперь в таблице: {result_after.iloc[0]['total_count']} строк")

   sum_quantity     sum_price  sum_revenue_no_nds
0     40000.00  80000000.00     110000000.00


In [130]:
# Проверка суммы по числовым полям  в БД
query_count_after = """
SELECT 
    sum(quantity) as sum_quantity,
    sum(price)  as sum_price,
    sum(revenue_no_nds)  as sum_revenue_no_nds
FROM dm.test_sale;
"""

result_after = pd.read_sql(query_count_after, engine)
print(result_after)
#logger.info(f"📊 Теперь в таблице: {result_after.iloc[0]['total_count']} строк")

   sum_quantity     sum_price  sum_revenue_no_nds
0      25000.00   50000000.00         70000000.00

Сравним итоговй результаты по числовым полям: как видно, разница на сумму которую добавили в df_new_2
Инкрементальная загрузка данных прошла успешно.
На данном этапе можем расширить диапазон для загрузки истоирческих данных и записать все в БД. На следующем этапе дорабатываем наш скрипт в части установления динамической даты: 

**Сегодняшняя дата (без времени)**

today = date.today()

**Основной период: последние 30 дней**

DateTo = today - timedelta(days=30)

DateFrom = today

в скрипте оставим только ячейки с парсингом, предобработкой данных и инкрементальной записью в БД
Останется настроить запуск нашего скрипта в cron или в Airflow на пример ежедневно в 3:00 MSK