In [46]:
import os
import pandas as pd
from tqdm import tqdm
import warnings
from pathlib import Path
import re
from collections import defaultdict
import sshtunnel
from clickhouse_connect import get_client

#Настройки отображения
pd.options.display.max_columns = 100
pd.options.display.max_rows = 50

## 1. Соединяем файлы единого формата в общий DataFrame

In [None]:
file_paths = [
    'Путь 1',
    'Путь 2',
    'Путь 3',
    'Путь 4',

]

df_read = pd.concat([pd.read_excel(file) for file in file_paths], ignore_index=True)

# Создаем промежуточный DataFrame для обработки данных
df_for_work = df_read[['Неделя', 'Клиент (вх)', 'Код ТТ', 'Бар-код', 'Адр', 'Адрес (вх)', 'Номенклатура (вх)', 'Подгруппа товара (вх)', 'Подгруппа Отчетности',
                         'Поставщик РЦ', 'Тип поставщика РЦ', 'Производитель','Бренд (вх)', 'Продажи, шт.', 
                         'Продажи в руб.', 'Себестоимость в руб.']]

# Проводим предобработку

#Заменяем формат числовых полей с int и float на string для дальнейшего создания уникального ключа
df_for_work['Код ТТ'] = df_for_work['Код ТТ'].astype('string')
df_for_work['Бар-код'] = df_for_work['Бар-код'].astype('string')

#Устраняем возможные ошибки в данных (лишние пробелы)
df_for_work['Код ТТ'] = df_for_work['Код ТТ'].str.strip()
df_for_work['Бар-код'] = df_for_work['Бар-код'].str.strip()
df_for_work['Клиент (вх)'] = df_for_work['Клиент (вх)'].replace('Окей', 'ОК')

# Добавляем новые столбцы
df_for_work['Год'] = 2025
df_for_work['Месяц'] = 6

# Создаем уникальные ключи для точного определения торговых точек и уникальных SKU
df_for_work['Клиент_КодТТ'] = df_for_work['Клиент (вх)'].str.cat(df_for_work['Код ТТ'], sep='_')
df_for_work['Клиент_БарКод'] = df_for_work['Клиент (вх)'].str.cat(df_for_work['Бар-код'], sep='_')


# Выстраиваем данные в удобном порядке и приводим поля к названиям в базе данных
result_small_frs = df_for_work[['Год', 'Месяц', 'Неделя', 'Клиент (вх)', 'Код ТТ', 'Бар-код', 'Клиент_КодТТ', 'Клиент_БарКод', 'Адр', 'Адрес (вх)', 
                         'Подгруппа товара (вх)', 'Подгруппа Отчетности', 'Бренд (вх)', 'Номенклатура (вх)', 'Поставщик РЦ', 
                         'Производитель', 'Продажи, шт.', 'Продажи в руб.', 'Себестоимость в руб.']]

result_small_frs = result_small_frs.rename(columns={
    'Год':'year',
    'Месяц':'month',
    'Неделя':'week',
    'Клиент (вх)':'client_enter',
    'Код ТТ':'code_tt',
    'Бар-код':'bar_code',
    'Клиент_КодТТ':'client_code_tt',
    'Клиент_БарКод':'client_bar_code', 
    'Адр':'adr', 
    'Адрес (вх)':'adress_enter',
    'Подгруппа товара (вх)':'subgroup_product', 
    'Подгруппа Отчетности':'subgroup_report', 
    'Бренд (вх)':'brand_enter', 
    'Номенклатура (вх)':'product_name_enter',
    'Поставщик РЦ':'supplier',
    'Производитель':'producer',
    'Продажи, шт.':'sales_quantity',
    'Продажи в руб.':'sales_price',
    'Себестоимость в руб.':'cost_price'
})

# Выводим результаты
print(f"Размер датасета (строки, столбцы): {result_small_frs.shape}") 
result_small_frs.head(2)

### Проверяем записи на соответствие справочникам SKU и торговых точек

In [None]:
guide_sku = pd.read_excel("Путь к справочнику")

# Приводим поля к единым названиям
guide_sku = guide_sku.rename(columns={'Клиент_БарКод':'client_bar_code', 
                          'Клиент (вх)':'client_enter',
                          'Бар-код':'bar_code',
                          'Номенклатура (вх)':'product_name_enter',
                          'Подгруппа товара (вх)':'subgroup_product', 
                          'Подгруппа Отчетности':'subgroup_report', 
                          'Поставщик РЦ':'supplier',
                          'Производитель':'producer',
                          'Бренд (вх)':'brand_enter', 
                          'Продажи, шт.':'sales_quantity',
                          'Продажи в руб.':'sales_price',
                          'Себестоимость в руб.':'cost_price',
                          # Последующие столбцы созданы с помощью обработки сырых данных (см. файл create_directories_sku_adress.ipynb) для унификации 
                          # одинаковых SKU, имеющих различия в бар-кодах из-за разных клиентов
                          'Производитель1':'our_producer',
                          'Подгруппа товара1':'our_subgroup_product',
                          'Бренд1':'our_brand',
                          'Газация':'our_sparkling',
                          'Упаковка1':'our_package',
                          'Объем_л':'our_volume',
                          'Вкус':'our_flavor',
                          'Номенклатура1':'our_product_name'})

# Проверка на наличие исключений
unprocessed = result_small_frs[
    ~result_small_frs['client_bar_code'].isin(guide_sku['client_bar_code'])
]
# Формируем удобный формат для удобства работы с файлом в Excel
unprocessed_filled = unprocessed.fillna('Пусто')
unprocessed_filled = unprocessed_filled.groupby(['client_bar_code','client_enter', 'bar_code', 'subgroup_product', 'subgroup_report', 
                     'brand_enter', 'product_name_enter', 'supplier','producer'], as_index=False).agg({'sales_quantity':'sum', 
                                                                                                       'sales_price':'sum', 'cost_price':'sum'})
unprocessed_filled.to_excel('exception_sku.xlsx')
unprocessed_filled

Unnamed: 0,client_bar_code,client_enter,bar_code,subgroup_product,subgroup_report,brand_enter,product_name_enter,supplier,producer,sales_quantity,sales_price,cost_price


In [None]:
guide_adress = pd.read_excel("Путь к справочнику")

# Приводим поля к единым названиям
guide_adress = guide_adress.rename(columns={'Клиент_КодТТ':'client_code_tt', 
                          'Клиент (вх)':'client_enter',
                          'Код ТТ':'code_tt',
                          'Адр':'adr',
                          'Адрес (вх)':'adress_enter', 
                          # Последующие столбцы созданы с помощью обработки сырых данных (см. файл create_directories_sku_adress.ipynb) для группировки 
                          # адресов по городам, регионам и федеральным округам
                          'Адрес1':'our_adress',
                          'Федеральный округ':'our_federal_district',
                          'Регион РФ':'our_region',
                          'Код региона ISO':'iso_code',
                          'Город':'our_city'})

# Проверка на наличие исключений
unprocessed_adress = result_small_frs[
    ~result_small_frs['client_code_tt'].isin(guide_adress['client_code_tt'])
    ]
# Формируем удобный формат для удобства работы с файлом в Excel
unprocessed_adress_adress = unprocessed_adress.groupby(['client_code_tt','client_enter','code_tt', 'adress_enter'], as_index=False).agg({'sales_quantity':'sum'})

unprocessed_adress_adress.to_excel('exception_adress.xlsx')
unprocessed_adress_adress

Unnamed: 0,client_code_tt,client_enter,code_tt,adress_enter,sales_quantity


## 2. Обрабатываем сложные файлы Excel (пустые строки, несколько листов и пр.)

In [None]:
file1 = 'Путь к файлу 1'  # Один лист (заголовки во второй строке)
file2 = 'Путь к файлу 2'  # Два листа (без заголовков)

# Читаем первый файл, где заголовки во второй строке
df1 = pd.read_excel(file1, header=1)

# Получаем имена колонок из первого файла
headers = df1.columns.tolist()

# Читаем второй файл (все листы) без заголовков
xls = pd.ExcelFile(file2)
sheets = xls.sheet_names

df_list = []
for sheet in sheets:
    # Читаем данные без заголовков и вручную назначаем колонки
    df_sheet = pd.read_excel(xls, sheet_name=sheet, header=None)
    df_sheet = df_sheet.iloc[2:]  # Пропускаем первые 2 строки с заголовками
    df_sheet.columns = headers    # Назначаем правильные заголовки
    df_list.append(df_sheet)

# Объединяем все DataFrame
all_dataframes = [df1] + df_list
combined_df = pd.concat(all_dataframes, ignore_index=True)

# Проверка результата
print(f"Размер итогового датасета: {combined_df.shape}")
print(f"Колонки: {combined_df.columns.tolist()}")
combined_df.head()

### Формируем структуру DataFrame идентичную с предыдущим, заполняем пропуски в данных

In [None]:
# Колонка 'Адрес (вх)' присутсвовала, но была пустой. Адреса хранятся в поле 'Адрес'. Исправляем
combined_df['Адрес (вх)'] = combined_df['Адрес']

# Заполняем пробелы в данных
combined_df.loc[
    combined_df['Код ТТ'].isin(['X799', '33Z9', '31IB', '33YN', '30WF', '35BS', '34OD', 'X531',  '366C',  '3ABU', '37BJ', '3AIA', 'X582']) & 
    combined_df['Клиент (вх)'].isna(), 
    'Клиент (вх)'
] = 'Пятерочка'

# Создаем промежуточный DataFrame для обработки данных
df_for_work = combined_df[['Неделя', 'Клиент (вх)', 'Код ТТ', 'Бар-код', 'Адр', 'Адрес (вх)', 'Номенклатура (вх)', 'Подгруппа товара (вх)', 'Подгруппа Отчетности',
                         'Поставщик РЦ', 'Тип поставщика РЦ', 'Производитель','Бренд (вх)', 'Продажи, шт.', 
                         'Продажи в руб.', 'Себестоимость в руб.']]

#Заменяем формат числовых полей с int и float на string для дальнейшего создания уникального ключа
df_for_work['Код ТТ'] = df_for_work['Код ТТ'].astype('string')
df_for_work['Бар-код'] = df_for_work['Бар-код'].astype('string')

#Устраняем возможные ошибки в данных (лишние пробелы)
df_for_work['Код ТТ'] = df_for_work['Код ТТ'].str.strip()
df_for_work['Бар-код'] = df_for_work['Бар-код'].str.strip()
# Если код ТТ 5-значный, приводим к установленному 6-значному формату
df_for_work['Код ТТ'] = df_for_work['Код ТТ'].apply(lambda x: x.zfill(6) if len(x) == 5 else x)


# Добавляем новые столбцы
df_for_work['Год'] = 2025
df_for_work['Месяц'] = 6

# Создаем уникальные ключи для точного определения торговых точек и уникальных SKU
df_for_work['Клиент_КодТТ'] = df_for_work['Клиент (вх)'].str.cat(df_for_work['Код ТТ'], sep='_')
df_for_work['Клиент_БарКод'] = df_for_work['Клиент (вх)'].str.cat(df_for_work['Бар-код'], sep='_')

# Выстраиваем данные в удобном порядке и приводим поля к названиям в базе данных
result_all_files = df_for_work[['Год', 'Месяц', 'Неделя', 'Клиент (вх)', 'Код ТТ', 'Бар-код', 'Клиент_КодТТ', 'Клиент_БарКод', 'Адр', 'Адрес (вх)', 
                         'Подгруппа товара (вх)', 'Подгруппа Отчетности', 'Бренд (вх)', 'Номенклатура (вх)', 'Поставщик РЦ', 
                         'Производитель', 'Продажи, шт.', 'Продажи в руб.', 'Себестоимость в руб.']]

result_all_files = result_all_files.rename(columns={
    'Год':'year',
    'Месяц':'month',
    'Неделя':'week',
    'Клиент (вх)':'client_enter',
    'Код ТТ':'code_tt',
    'Бар-код':'bar_code',
    'Клиент_КодТТ':'client_code_tt',
    'Клиент_БарКод':'client_bar_code', 
    'Адр':'adr', 
    'Адрес (вх)':'adress_enter',
    'Подгруппа товара (вх)':'subgroup_product', 
    'Подгруппа Отчетности':'subgroup_report', 
    'Бренд (вх)':'brand_enter', 
    'Номенклатура (вх)':'product_name_enter',
    'Поставщик РЦ':'supplier',
    'Производитель':'producer',
    'Продажи, шт.':'sales_quantity',
    'Продажи в руб.':'sales_price',
    'Себестоимость в руб.':'cost_price'
})
# Выводим результаты
print(f"Размер датасета (строки, столбцы): {result_all_files.shape}") 
result_all_files.head(2)

### Проверяем записи на соответствие справочникам SKU и торговых точек

In [None]:
# Проверка на наличие исключений
unprocessed = result_all_files[
    ~result_all_files['client_bar_code'].isin(guide_sku['client_bar_code'])
]
# Формируем удобный формат для удобства работы с файлом в Excel
unprocessed_filled = unprocessed.fillna('Пусто')
unprocessed_filled = unprocessed_filled.groupby(['client_bar_code','client_enter', 'bar_code', 'subgroup_product', 'subgroup_report', 
                     'brand_enter', 'product_name_enter', 'supplier','producer'], as_index=False).agg({'sales_quantity':'sum', 
                                                                                                       'sales_price':'sum', 'cost_price':'sum'})
unprocessed_filled.to_excel('exception_sku.xlsx')
unprocessed_filled

Unnamed: 0,client_bar_code,client_enter,bar_code,subgroup_product,subgroup_report,brand_enter,product_name_enter,supplier,producer,sales_quantity,sales_price,cost_price


In [None]:
# Проверка на наличие исключений
unprocessed_adress = result_all_files[
    ~result_all_files['client_code_tt'].isin(guide_adress['client_code_tt'])
    ]
# Формируем удобный формат для удобства работы с файлом в Excel
unprocessed_adress_adress = unprocessed_adress.groupby(['client_code_tt','client_enter','code_tt', 'adress_enter'], as_index=False).agg({'sales_quantity':'sum'})

unprocessed_adress_adress.to_excel('exception_adress.xlsx')
unprocessed_adress_adress

## 3. Объединение и структурирование DataFrame

In [None]:
result_all = pd.concat([result_all_files, result_small_frs], ignore_index=True)

result_all = result_all[['year', 'month', 'week', 'client_enter', 'code_tt', 'bar_code', 'client_code_tt', 'client_bar_code', 'adr', 
    'adress_enter', 'subgroup_product', 'subgroup_report', 'brand_enter', 'product_name_enter', 'supplier', 'producer',
    'sales_quantity', 'sales_price', 'cost_price']]

print(f"Размер датасета (строки, столбцы): {result_all.shape}") 
result_all.head(2)

## 4. Добавление данных в ClickHouse

In [None]:
def append_to_clickhouse(
    df: pd.DataFrame,
    database_name: str = 'Имя БД',
    table_name: str = 'Имя таблицы в БД',
    ssh_host: str = "IP адрес хоста",
    ssh_port: int = 22, # стандартный вариант, но Ваш может не совпадать
    ssh_user: str = "Логин пользователя базы данных",
    ssh_private_key_path: str = "Путь к Вашему приватному ключу",
    ssh_private_key_password: str = "Пароль от приватного ключа",
    clickhouse_host: str = 'Хост ClickHouse',
    clickhouse_port: int = 8123, # стандартный вариант, но Ваш может не совпадать
    clickhouse_user: str = 'Логин юзера ClickHouse',
    clickhouse_password: str = 'Пароль юзера ClickHouse',
    local_bind_port: int = 8123 # Можно оставить 8123 или выбрать другой свободный порт, например 5555
):

    # Приведение типов для совместимости с ClickHouse
    type_conversions = {
        'client_enter': 'string',
        'code_tt': 'string',
        'bar_code': 'string',
        'client_code_tt': 'string',
        'client_bar_code': 'string',
        'adr': 'string',
        'adress_enter': 'string',
        'subgroup_product': 'string',
        'subgroup_report': 'string',
        'brand_enter': 'string',
        'product_name_enter': 'string',
        'supplier': 'string',
        'producer': 'string',
        'sales_quantity': 'int64'
    }
    
    for col, dtype in type_conversions.items():
        if col in df.columns:
            df[col] = df[col].astype(dtype)
    
    print("Подготовленный DataFrame для записи:")
    print(df.dtypes)
    print("\n" + "="*50 + "\n")

    with sshtunnel.SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_pkey=ssh_private_key_path,
        ssh_private_key_password=ssh_private_key_password,
        remote_bind_address=(clickhouse_host, clickhouse_port),
        local_bind_address=('127.0.0.1', local_bind_port)
    ) as tunnel:
        print(f"SSH-туннель запущен. Локальный порт: {local_bind_port}")

        try:
            # Подключение к ClickHouse через туннель
            client = get_client(
                host='127.0.0.1',
                port=local_bind_port,
                user=clickhouse_user,
                password=clickhouse_password,
                database=database_name
            )

            # Проверка существования таблицы
            table_exists = client.command(
                f"EXISTS TABLE {database_name}.{table_name}"
            )
            
            if not table_exists:
                raise ValueError(f"Таблица {database_name}.{table_name} не существует!")
            
            # Вставка данных (добавление к существующим)
            client.insert_df(f'{database_name}.{table_name}', df)
            print(f"Данные успешно добавлены в таблицу '{database_name}.{table_name}'.")
            
            # Проверка количества строк после вставки
            row_count = client.command(
                f"SELECT count() FROM {database_name}.{table_name}"
            )
            print(f"Всего строк в таблице после вставки: {row_count}")

            client.close()
            print("Соединение с ClickHouse закрыто.")

        except Exception as e:
            print(f"Ошибка при работе с ClickHouse: {str(e)}")
            raise

    print("SSH-туннель закрыт.")

append_to_clickhouse(result_all)