# Выполнение индвидуальных заданий по практической работе №8. "Анализ метода загрузки данных". Зацепин Никита Алексеевич АБП-231

In [None]:
# Определение вспомогательных функций
def execute_sql(sql_query, fetch=False):
    """Выполняет SQL-запрос и опционально возвращает результаты."""
    if not connection or not cursor:
        print("Нет подключения к БД.")
        return None
    try:
        cursor.execute(sql_query)
        if fetch:
            results = cursor.fetchall()
            return results
        else:
            return True 
    except (Exception, Error) as error:
        print(f"Ошибка выполнения SQL: {error}")
        return None
    
def create_table(tbl_name):
    """Создает стандартную таблицу для данных продаж, удаляя ее, если она существует."""
    print(f"\nПопытка создать таблицу: {tbl_name}")
    drop_success = execute_sql(f"DROP TABLE IF EXISTS {tbl_name};")
    if drop_success is None: 
        print(f"Не удалось выполнить DROP TABLE для {tbl_name}. Создание таблицы отменено.")
        return 
    create_query = f"""
    CREATE TABLE {tbl_name} (
        id INTEGER PRIMARY KEY,         -- Уникальный идентификатор, первичный ключ
        quantity INTEGER,               -- Количество
        cost NUMERIC(10, 2),            -- Стоимость (NUMERIC для точности)
        total_revenue NUMERIC(12, 2)    -- Общая выручка (NUMERIC для точности)
    );
    """
    print(f"Запрос на создание таблицы {tbl_name}:\n{create_query}")

    create_success = execute_sql(create_query)
    if create_success:
        print(f"Таблица '{tbl_name}' успешно создана.")
    else:
        print(f"Не удалось создать таблицу '{tbl_name}'.")


In [None]:

def load_via_copy_stringio(df, tbl_name):
    """Загружает данные из DataFrame через StringIO, используя copy_expert."""
    if not connection or not cursor or df is None:
        print("Нет подключения к БД или DataFrame пуст.")
        return False 
    print(f"Загрузка данных в '{tbl_name}' с использованием copy_expert (StringIO)...")
    start_time = time.time()
    buffer = io.StringIO() 
    try:
        df.to_csv(buffer, index=False, header=True, sep=',')
    except Exception as e:
         print(f"Ошибка конвертации DataFrame в CSV: {e}")
         buffer.close() 
    sql_copy = f"COPY {tbl_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE, DELIMITER ',')"
    try:
        cursor.copy_expert(sql=sql_copy, file=buffer)
        duration = time.time() - start_time
        print(f"Успешно: Загрузка (StringIO) в '{tbl_name}' завершена за {duration:.2f} сек.")
        return True 
    except (Exception, Error) as error:
        print(f"ОШИБКА при выполнении copy_expert (StringIO) для '{tbl_name}': {error}")
        return False
    finally:
        buffer.close() 

def load_via_copy_file(file_path, tbl_name):
    """Загружает данные напрямую из CSV файла, используя copy_expert."""
    if not connection or not cursor:
       print("Нет подключения к БД.")
       return False 
    if not os.path.exists(file_path):
        print(f"ОШИБКА: Файл '{file_path}' не найден.")
        return False 
    print(f"Загрузка данных в '{tbl_name}' с использованием copy_expert (file: {os.path.basename(file_path)})...")
    start_time = time.time()
    sql_copy = f"COPY {tbl_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE, DELIMITER ',')"
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            cursor.copy_expert(sql=sql_copy, file=f)
        duration = time.time() - start_time
        print(f"Успешно: Загрузка (file) в '{tbl_name}' завершена за {duration:.2f} сек.")
        return True
    except (Exception, Error) as error:
        print(f"ОШИБКА при выполнении copy_expert (file) для '{tbl_name}': {error}")
        return False
print("Вспомогательные функции определены.")

Вспомогательные функции определены.


In [None]:

def load_df_from_sql(sql_query):
    """Выполняет SQL-запрос и загружает результаты в Pandas DataFrame."""
    if not connection:
        print("Нет подключения к БД для загрузки DataFrame.")
        return None
    print(f"Загрузка данных из SQL в DataFrame: {sql_query[:100]}...")
    try:
    
        df = pd.read_sql_query(sql_query, connection)
        print(f"Успешно: Загружено {len(df)} строк в DataFrame.")
        return df 
    except (Exception, Error) as error:
        print(f"ОШИБКА при загрузке DataFrame из SQL: {error}")
        return None 
print("Вспомогательные функции определены.")

Вспомогательные функции определены.


In [6]:
!pip install psycopg2-binary pandas sqlalchemy matplotlib numpy




[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: C:\Users\User\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [None]:

"""
Мини-проект: Решение для Варианта 21
"""
import psycopg2
from psycopg2 import Error
from psycopg2 import extras 
import pandas as pd
from sqlalchemy import create_engine
import io 
import time
import matplotlib.pyplot as plt
import numpy as np
import os 

DB_USER = "postgres"
DB_PASSWORD = "nekit"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "pracei"

small_table_name = 'sales_small'
big_table_name = 'sales_big'
small_csv_path = 'upload_test_data.csv'
big_csv_path = 'upload_test_data_big.csv'

def connect_db():
    try:
        connection = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        cursor = connection.cursor()
        print("Успешное подключение к PostgreSQL")
        return connection, cursor
    except Exception as e:
        print(f"Ошибка подключения к PostgreSQL: {e}")
        return None, None

def create_table(table_name):
    try:
        cursor.execute(f"""
        CREATE TABLE {table_name} (
            id INTEGER PRIMARY KEY,
            quantity INTEGER,
            cost NUMERIC(10, 2),
            total_revenue NUMERIC(12, 2)
        );
        """)
        connection.commit()
        print(f"Таблица {table_name} успешно создана или уже существует")
    except Exception as e:
        print(f"Ошибка при создании таблицы {table_name}: {e}")

def load_via_copy_file(file_path, table_name):
    try:
        with open(file_path, 'r') as f:
            cursor.copy_expert(f"""
            COPY {table_name}(id, quantity, cost, total_revenue) 
            FROM STDIN WITH CSV HEADER DELIMITER ','
            """, f)
        connection.commit()
        print(f"Данные из {file_path} успешно загружены в {table_name}")
    except Exception as e:
        print(f"Ошибка при загрузке данных в {table_name}: {e}")

def execute_sql(query, fetch=False):
    try:
        cursor.execute(query)
        connection.commit()
        if fetch:
            return cursor.fetchall()
        return True
    except Exception as e:
        print(f"Ошибка выполнения SQL запроса: {e}")
        return None

def load_df_from_sql(query):
    try:
        return pd.read_sql(query, connection)
    except Exception as e:
        print(f"Ошибка загрузки данных в DataFrame: {e}")
        return None


connection, cursor = connect_db()
if not connection or not cursor:
    print("Подключение к базе данных неактивно. Пожалуйста, выполните настройку подключения.")
else:
    print("--- Запуск Варианта 21 (Упрощенная загрузка) ---")

    print("\n--- Задача 1: Создание таблиц ---")
    create_table(small_table_name)
    create_table(big_table_name)
   
    print(f"\n--- Задача 2: Загрузка данных из '{small_csv_path}' в '{small_table_name}' (метод StringIO) ---")
    if os.path.exists(small_csv_path):
        try:
            print(f"Чтение {small_csv_path} в DataFrame...")
            df_small_for_load = pd.read_csv(small_csv_path)
            print(f"Прочитано {len(df_small_for_load)} строк.")

            load_via_copy_stringio(df_small_for_load, small_table_name)

        except pd.errors.EmptyDataError:
             print(f"ОШИБКА: Файл '{small_csv_path}' пуст или имеет неверный формат.")
        except MemoryError:
             print(f"ОШИБКА: Недостаточно памяти для загрузки '{small_csv_path}' в DataFrame.")
        except Exception as e:
            print(f"ОШИБКА при чтении файла '{small_csv_path}': {e}")
    else:
        print(f"ОШИБКА: Файл '{small_csv_path}' не найден. Загрузка не выполнена.")

    print(f"\n--- Задача 3: Загрузка данных из '{big_csv_path}' в '{big_table_name}' (метод file) ---")
    if os.path.exists(big_csv_path):
        load_via_copy_file(big_csv_path, big_table_name)
    else:
        print(f"ОШИБКА: Файл '{big_csv_path}' не найден. Загрузка не выполнена.")

    print("\n--- Задача 4: SQL Анализ таблицы sales_small ---")
    sql_query_task4 = f"""
    SELECT (COUNT(CASE WHEN total_revenue > 100 THEN 1 END) * 100.0 / 
    COUNT(*)) AS percentage_above_100
    FROM {small_table_name};
    """
    print("Выполнение SQL запроса:")
    print(sql_query_task4)
    results_task4 = execute_sql(sql_query_task4, fetch=True)

    if results_task4 is not None:
        print("\nРезультаты запроса (id, total_revenue):")
        if results_task4:
            for row in results_task4:
                print(row)
        else:
            print("Запрос успешно выполнен, но не вернул строк.")
    else:
        print("Ошибка выполнения SQL запроса.")

print("\n--- Задача 5: Визуализация данных из sales_big с помощью Python ---")
sql_query_task5 = f"SELECT quantity, cost FROM {small_table_name} LIMIT 5000;"
print(f"Получение данных для графика: {sql_query_task5}")

df_plot_data = load_df_from_sql(sql_query_task5)

if df_plot_data is not None and not df_plot_data.empty:
        print(f"Загружено {len(df_plot_data)} строк для построения графика.")
        df_plot_data['color'] = np.where(df_plot_data['cost'] > 5, 'red', 'blue')
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.scatter(df_plot_data['quantity'], df_plot_data['cost'], c=df_plot_data['color'], alpha=0.6, s=15)
        ax.set_xlabel('Количество (Quantity)')
        ax.set_ylabel('Стоимость (Cost)')
        ax.set_title('Рассчет медианного значения cost из DataFrame sales_small.')
        ax.grid(True, linestyle='--', alpha=0.6)
        plt.show()
elif df_plot_data is not None and df_plot_data.empty:
        print("Запрос выполнен, но данные из sales_big для графика не получены.")
else:
        print("Не удалось загрузить данные из sales_big для построения графика.")


Успешное подключение к PostgreSQL
--- Запуск Варианта 30 (Упрощенная загрузка) ---

--- Задача 1: Создание таблиц ---
Ошибка при создании таблицы sales_small: ОШИБКА:  отношение "sales_small" уже существует

Ошибка при создании таблицы sales_big: ОШИБКА:  текущая транзакция прервана, команды до конца блока транзакции игнорируются


--- Задача 2: Загрузка данных из 'upload_test_data.csv' в 'sales_small' (метод StringIO) ---
Чтение upload_test_data.csv в DataFrame...
Прочитано 1000 строк.
Загрузка данных в 'sales_small' с использованием copy_expert (StringIO)...
ОШИБКА при выполнении copy_expert (StringIO) для 'sales_small': ОШИБКА:  текущая транзакция прервана, команды до конца блока транзакции игнорируются


--- Задача 3: Загрузка данных из 'upload_test_data_big.csv' в 'sales_big' (метод file) ---
Ошибка при загрузке данных в sales_big: ОШИБКА:  текущая транзакция прервана, команды до конца блока транзакции игнорируются


--- Задача 4: SQL Анализ таблицы sales_small ---
Выполнение SQL 

  return pd.read_sql(query, connection)
