In [19]:
from sqlalchemy import create_engine, text
import pandas as pd
from config.settings import DATABASES
import datetime

# Логирование времени
start_time = datetime.datetime.now()
print(f"Начало процесса загрузки данных: {start_time}")

# Конфигурация таблиц и параметров
table_name = "data_loader_omsdata"
table_name_temp = "temp_oms_data"

column_check = "patient"
columns_for_update = ['talon', 'source']
sep = ';'
dtype = str
encoding = 'utf-8'

column_mapping = {
    "Талон": "talon",
    "Источник": "source",
    "Статус": "status",
    "Цель": "goal",
    "Пациент": "patient",
    "Дата рождения": "birth_date",
    "Пол": "gender",
    "Код СМО": "smo_code",
    "ЕНП": "enp",
    "Начало лечения": "treatment_start",
    "Окончание лечения": "treatment_end",
    "Врач": "doctor",
    "Посещения": "visits",
    "Посещения в МО": "mo_visits",
    "Посещения на Дому": "home_visits",
    "Диагноз основной (DS1)": "main_diagnosis",
    "Сопутствующий диагноз (DS2)": "additional_diagnosis",
    "Первоначальная дата ввода": "initial_input_date",
    "Дата последнего изменения": "last_change_date",
    "Сумма": "amount",
    "Санкции": "sanctions",
    "КСГ": "ksg",
    "Отчетный период выгрузки": "report_period",
}

# Настройка подключения к базе данных
postgres_settings = DATABASES['default']
engine = create_engine(
    f'postgresql://{postgres_settings["USER"]}:{postgres_settings["PASSWORD"]}@{postgres_settings["HOST"]}:{postgres_settings["PORT"]}/{postgres_settings["NAME"]}'
)

# Счетчики для отслеживания процесса
row_counts = {}

# Подсчёт строк в основной таблице до обработки
with engine.connect() as connection:
    initial_count_query = f"SELECT COUNT(*) FROM {table_name};"
    row_counts["before_processing"] = connection.execute(text(initial_count_query)).scalar()

print(f"Количество строк в {table_name} до обработки: {row_counts['before_processing']}")

# Чтение CSV
try:
    df = pd.read_csv(r'C:\Users\frdro\Downloads\Telegram Desktop\journal_20241004(2).csv', sep=sep, dtype=str, encoding=encoding)
except Exception as e:
    print(f"Ошибка при чтении CSV: {e}")
    raise

# Переименование и фильтрация столбцов
df = df[list(column_mapping.keys())].rename(columns=column_mapping)
df.dropna(subset=[column_check], inplace=True)
df.fillna('-', inplace=True)
df = df.replace('`', '', regex=True)
df.replace('\u00A0', ' ', regex=True, inplace=True)
df = df.astype(str)

# Удаление дубликатов из DataFrame
df = df.drop_duplicates(subset=columns_for_update)

# Подсчёт строк после чтения CSV
row_counts["after_reading_csv"] = len(df)
print(f"Количество строк в CSV после чтения и обработки: {row_counts['after_reading_csv']}")

# Проверка существования временной таблицы и её очистка
with engine.connect() as connection:
    exists = connection.execute(
        text(f"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name_temp}')")
    ).scalar()
    if exists:
        connection.execute(text(f"TRUNCATE TABLE {table_name_temp};"))
        print(f"Таблица {table_name_temp} очищена")
    else:
        print(f"Таблица {table_name_temp} отсутствует, будет создана автоматически при загрузке данных")

# Вставка новых данных во временную таблицу
df.to_sql(table_name_temp, engine, if_exists='replace', index=False)

# Создание индексов
with engine.connect() as connection:
    create_index_query = f"""
    DO $$ BEGIN
        IF NOT EXISTS (
            SELECT 1 
            FROM pg_indexes 
            WHERE schemaname = 'public' 
              AND tablename = '{table_name}' 
              AND indexname = 'idx_{table_name}_update'
        ) THEN
            CREATE INDEX idx_{table_name}_update
            ON {table_name} ({', '.join(columns_for_update)});
        END IF;
    END $$;
    """
    connection.execute(text(create_index_query))
    print("Индекс создан для основной таблицы.")

# Вычисление строк для обновления и вставки
with engine.connect() as connection:
    rows_to_update_query = f"""
    SELECT COUNT(*)
    FROM {table_name_temp} AS temp
    INNER JOIN {table_name} AS target
    ON { ' AND '.join([f'temp.{col} = target.{col}' for col in columns_for_update]) }
    """
    row_counts["to_update"] = connection.execute(text(rows_to_update_query)).scalar()

    rows_to_insert_query = f"""
    SELECT COUNT(*)
    FROM {table_name_temp} AS temp
    LEFT JOIN {table_name} AS target
    ON { ' AND '.join([f'temp.{col} = target.{col}' for col in columns_for_update]) }
    WHERE target.{columns_for_update[0]} IS NULL
    """
    row_counts["to_insert"] = connection.execute(text(rows_to_insert_query)).scalar()

print(f"Количество строк для обновления: {row_counts['to_update']}")
print(f"Количество строк для добавления: {row_counts['to_insert']}")

# Получение недостающих столбцов
with engine.connect() as connection:
    table_columns_query = f"""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = '{table_name}'
    """
    table_columns = [row[0] for row in connection.execute(text(table_columns_query)).fetchall()]

    temp_table_columns_query = f"""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = '{table_name_temp}'
    """
    temp_table_columns = [row[0] for row in connection.execute(text(temp_table_columns_query)).fetchall()]

missing_columns = [col for col in table_columns if col not in temp_table_columns and col != 'id']

# Формирование и выполнение SQL-запроса
merge_query = f"""
INSERT INTO {table_name} ({', '.join(column_mapping.values()) + ', ' + ', '.join(missing_columns)})
SELECT {', '.join(column_mapping.values())}, {', '.join([f"'-' AS {col}" for col in missing_columns])}
FROM {table_name_temp}
ON CONFLICT ({', '.join(columns_for_update)})
DO UPDATE SET
{', '.join([f"{col} = EXCLUDED.{col}" for col in column_mapping.values() if col not in columns_for_update])},
{', '.join([f"{col} = COALESCE(EXCLUDED.{col}, '-')" for col in missing_columns])};
"""

# Удаляем символы новой строки
merge_query = " ".join(merge_query.split())


try:
    with engine.begin() as connection:
        connection.execute(text(merge_query))

    print(f"Данные успешно вставлены/обновлены в таблице {table_name}.")
except Exception as e:
    print(f"Ошибка при выполнении merge-запроса: {e}")
# Подсчёт строк после обработки
with engine.connect() as connection:
    final_count_query = f"SELECT COUNT(*) FROM {table_name};"
    row_counts["after_processing"] = connection.execute(text(final_count_query)).scalar()

print(f"Количество строк в {table_name} после загрузки и обновления: {row_counts['after_processing']}")

# Итоговая статистика
end_time = datetime.datetime.now()
print("Итоговые показатели обработки:")
print(f"1. Количество строк в {table_name} до обработки: {row_counts['before_processing']}")
print(f"2. Количество строк в CSV после чтения и обработки: {row_counts['after_reading_csv']}")
print(f"3. Количество строк обновленных в {table_name}: {row_counts['to_update']}")
print(f"4. Количество строк добавленных в {table_name}: {row_counts['to_insert']}")
print(f"5. Количество строк в {table_name} после загрузки и обновления: {row_counts['after_processing']}")
print(f"Общее время выполнения: {end_time - start_time}")


In [20]:
from sqlalchemy import create_engine, text
import pandas as pd
from config.settings import DATABASES
import datetime

In [21]:
# Настройки
file_path = r'C:\Users\frdro\Downloads\Telegram Desktop\journal_20241004(2).csv'
table_name = "data_loader_omsdata"
table_name_temp = "temp_oms_data"
column_check = "patient"
columns_for_update = ['talon', 'source']
sep = ';'
dtype = str
encoding = 'utf-8'

column_mapping = {
    "Талон": "talon",
    "Источник": "source",
    "Статус": "status",
    "Цель": "goal",
    "Пациент": "patient",
    "Дата рождения": "birth_date",
    "Пол": "gender",
    "Код СМО": "smo_code",
    "ЕНП": "enp",
    "Начало лечения": "treatment_start",
    "Окончание лечения": "treatment_end",
    "Врач": "doctor",
    "Посещения": "visits",
    "Посещения в МО": "mo_visits",
    "Посещения на Дому": "home_visits",
    "Диагноз основной (DS1)": "main_diagnosis",
    "Сопутствующий диагноз (DS2)": "additional_diagnosis",
    "Первоначальная дата ввода": "initial_input_date",
    "Дата последнего изменения": "last_change_date",
    "Сумма": "amount",
    "Санкции": "sanctions",
    "КСГ": "ksg",
    "Отчетный период выгрузки": "report_period",
}

# Настройка подключения к базе данных
postgres_settings = DATABASES['default']
engine = create_engine(
    f'postgresql://{postgres_settings["USER"]}:{postgres_settings["PASSWORD"]}@{postgres_settings["HOST"]}:{postgres_settings["PORT"]}/{postgres_settings["NAME"]}'
)


In [22]:
# Логирование времени
start_time = datetime.datetime.now()
print(f"Начало процесса загрузки данных: {start_time}")

# Счетчики для отслеживания процесса
row_counts = {}

# Подсчёт строк в основной таблице до обработки
with engine.connect() as connection:
    initial_count_query = f"SELECT COUNT(*) FROM {table_name};"
    row_counts["before_processing"] = connection.execute(text(initial_count_query)).scalar()

print(f"Количество строк в {table_name} до обработки: {row_counts['before_processing']}")

Начало процесса загрузки данных: 2025-01-16 00:47:42.194476
Количество строк в data_loader_omsdata до обработки: 658396


In [23]:
# Чтение и обработка CSV файла
try:
    df = pd.read_csv(file_path, sep=sep, dtype=str, encoding=encoding)
except Exception as e:
    print(f"Ошибка при чтении CSV: {e}")
    raise

# Переименование и фильтрация столбцов
df = df[list(column_mapping.keys())].rename(columns=column_mapping)
df.dropna(subset=[column_check], inplace=True)
df.fillna('-', inplace=True)
df = df.replace('`', '', regex=True)
df.replace('\u00A0', ' ', regex=True, inplace=True)
df = df.astype(str)

# Удаление дубликатов из DataFrame
df = df.drop_duplicates(subset=columns_for_update)

# Подсчёт строк после чтения CSV
row_counts["after_reading_csv"] = len(df)
print(f"Количество строк в CSV после чтения и обработки: {row_counts['after_reading_csv']}")

Количество строк в CSV после чтения и обработки: 597711


In [24]:
# Проверка существования временной таблицы и её очистка
with engine.connect() as connection:
    exists = connection.execute(
        text(f"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name_temp}')")
    ).scalar()
    if exists:
        connection.execute(text(f"TRUNCATE TABLE {table_name_temp};"))
        print(f"Таблица {table_name_temp} очищена")
    else:
        print(f"Таблица {table_name_temp} отсутствует, будет создана автоматически при загрузке данных")

# Вставка новых данных во временную таблицу
df.to_sql(table_name_temp, engine, if_exists='replace', index=False)


Таблица temp_oms_data очищена


711

In [25]:
# Создание индексов
with engine.connect() as connection:
    create_index_query = f"""
    DO $$ BEGIN
        IF NOT EXISTS (
            SELECT 1 
            FROM pg_indexes 
            WHERE schemaname = 'public' 
              AND tablename = '{table_name}' 
              AND indexname = 'idx_{table_name}_update'
        ) THEN
            CREATE INDEX idx_{table_name}_update
            ON {table_name} ({', '.join(columns_for_update)});
        END IF;
    END $$;
    """
    create_index_query_temp = f"""
    DO $$ BEGIN
        IF NOT EXISTS (
            SELECT 1 
            FROM pg_indexes 
            WHERE schemaname = 'public' 
              AND tablename = '{table_name_temp}' 
              AND indexname = 'idx_{table_name_temp}_update'
        ) THEN
            CREATE INDEX idx_{table_name_temp}_update
            ON {table_name_temp} ({', '.join(columns_for_update)});
        END IF;
    END $$;
    """
    connection.execute(text(create_index_query))
    print(f"Индекс создан для {table_name}.")

    connection.execute(text(create_index_query_temp))

    print(f"Индекс создан для {table_name_temp}.")


Индекс создан для data_loader_omsdata.
Индекс создан для temp_oms_data.


In [26]:
# Вычисление строк для обновления и вставки
with engine.connect() as connection:
    rows_to_update_query = f"""
    SELECT COUNT(*)
    FROM {table_name_temp} AS temp
    INNER JOIN {table_name} AS target
    ON { ' AND '.join([f'temp.{col} = target.{col}' for col in columns_for_update]) }
    """
    row_counts["to_update"] = connection.execute(text(rows_to_update_query)).scalar()

    rows_to_insert_query = f"""
    SELECT COUNT(*)
    FROM {table_name_temp} AS temp
    LEFT JOIN {table_name} AS target
    ON { ' AND '.join([f'temp.{col} = target.{col}' for col in columns_for_update]) }
    WHERE target.{columns_for_update[0]} IS NULL
    """
    row_counts["to_insert"] = connection.execute(text(rows_to_insert_query)).scalar()

print(f"Количество строк для обновления: {row_counts['to_update']}")
print(f"Количество строк для добавления: {row_counts['to_insert']}")

Количество строк для обновления: 597711
Количество строк для добавления: 0


In [27]:
# Получение недостающих столбцов
with engine.connect() as connection:
    table_columns_query = f"""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = '{table_name}'
    """
    table_columns = [row[0] for row in connection.execute(text(table_columns_query)).fetchall()]

    temp_table_columns_query = f"""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = '{table_name_temp}'
    """
    temp_table_columns = [row[0] for row in connection.execute(text(temp_table_columns_query)).fetchall()]

missing_columns = [col for col in table_columns if col not in temp_table_columns and col != 'id']


In [28]:
# Формирование и выполнение SQL-запроса
merge_query = f"""
INSERT INTO {table_name} ({', '.join(column_mapping.values()) + ', ' + ', '.join(missing_columns)})
SELECT {', '.join(column_mapping.values())}, {', '.join([f"'-' AS {col}" for col in missing_columns])}
FROM {table_name_temp}
ON CONFLICT ({', '.join(columns_for_update)})
DO UPDATE SET
{', '.join([f"{col} = EXCLUDED.{col}" for col in column_mapping.values() if col not in columns_for_update])},
{', '.join([f"{col} = COALESCE(EXCLUDED.{col}, '-')" for col in missing_columns])};
"""

# Удаляем символы новой строки
merge_query = " ".join(merge_query.split())


try:
    with engine.begin() as connection:
        connection.execute(text(merge_query))

    print(f"Данные успешно вставлены/обновлены в таблице {table_name}.")
except Exception as e:
    print(f"Ошибка при выполнении merge-запроса: {e}")

Данные успешно вставлены/обновлены в таблице data_loader_omsdata.


In [29]:
# Подсчёт строк после обработки
with engine.connect() as connection:
    final_count_query = f"SELECT COUNT(*) FROM {table_name};"
    row_counts["after_processing"] = connection.execute(text(final_count_query)).scalar()

print(f"Количество строк в {table_name} после загрузки и обновления: {row_counts['after_processing']}")

# Итоговая статистика
end_time = datetime.datetime.now()
print("Итоговые показатели обработки:")
print(f"1. Количество строк в {table_name} до обработки: {row_counts['before_processing']}")
print(f"2. Количество строк в CSV после чтения и обработки: {row_counts['after_reading_csv']}")
print(f"3. Количество строк обновленных в {table_name}: {row_counts['to_update']}")
print(f"4. Количество строк добавленных в {table_name}: {row_counts['to_insert']}")
print(f"5. Количество строк в {table_name} после загрузки и обновления: {row_counts['after_processing']}")
print(f"Общее время выполнения: {end_time - start_time}")

Количество строк в data_loader_omsdata после загрузки и обновления: 658396
Итоговые показатели обработки:
1. Количество строк в data_loader_omsdata до обработки: 658396
2. Количество строк в CSV после чтения и обработки: 597711
3. Количество строк обновленных в data_loader_omsdata: 597711
4. Количество строк добавленных в data_loader_omsdata: 0
5. Количество строк в data_loader_omsdata после загрузки и обновления: 658396
Общее время выполнения: 0:01:55.211622
