## Подготовка данных для миграции в базу данных

In [151]:
import pandas as pd
import warnings

warnings.filterwarnings('ignore')

In [152]:
# Путь к файлу с данными
file_path = "retail_data.xlsx"

In [153]:
# Загружаем таблицы
df_transaction = pd.read_excel(file_path, sheet_name="transaction")
df_customer = pd.read_excel(file_path, sheet_name="customer")

In [154]:
# Группируем по product_id и характеристикам товара и считаем количество повторений каждой комбинации
product_counts = df_transaction.groupby(["product_id", 
                                         "brand", 
                                         "product_line", 
                                         "product_class", 
                                         "product_size"]).size().reset_index(name="count")

product_counts.head()

Unnamed: 0,product_id,brand,product_line,product_class,product_size,count
0,0,Giant Bicycles,Standard,medium,large,47
1,0,Giant Bicycles,Standard,medium,medium,58
2,0,Norco Bicycles,Road,medium,medium,114
3,0,Norco Bicycles,Standard,low,medium,61
4,0,Norco Bicycles,Standard,medium,medium,66


Из вывода видно, что у одного product_id могут быть разные характеристики товара. 
Необходимо определить правильные характеристики для каждого product_id.

In [155]:
# Определяем самую часто встречающуюся комбинацию для каждого product_id
correct_products = product_counts.loc[product_counts.groupby("product_id")["count"].idxmax()]
correct_products.head()

Unnamed: 0,product_id,brand,product_line,product_class,product_size,count
12,0,Solex,Standard,medium,medium,206
18,1,Giant Bicycles,Standard,medium,medium,270
21,2,Solex,Standard,medium,medium,194
22,3,Trek Bicycles,Standard,medium,large,354
23,4,Giant Bicycles,Standard,high,medium,180


In [156]:
# Создадим словарь соответствий product_id -> наиболее частая комбинация характеристик
product_replacements = correct_products.set_index("product_id")[["brand", 
                                                                 "product_line", 
                                                                 "product_class", 
                                                                 "product_size"]].to_dict(orient="index")

In [157]:
# Функция для замены неправильных значений на правильные
def replace_product_info(row):
    if row["product_id"] in product_replacements:
        correct_info = product_replacements[row["product_id"]]
        row["brand"] = correct_info["brand"]
        row["product_line"] = correct_info["product_line"]
        row["product_class"] = correct_info["product_class"]
        row["product_size"] = correct_info["product_size"]
    return row

In [158]:
# Применяем замену к каждому ряду в df_transaction
df_transaction = df_transaction.apply(replace_product_info, axis=1)

In [159]:
# Преобразуем тип данных online_order в булевый, а дату рождения в datetime
df_transaction["online_order"] = df_transaction["online_order"].astype(bool)
df_customer["DOB"] = pd.to_datetime(df_customer["DOB"], errors='coerce')

In [160]:
# Удаляем строки с пропущенными значениями из df_transaction и df_customer
df_transaction = df_transaction.dropna()
df_customer = df_customer.dropna()

In [161]:
# Оставляем только те транзакции, которые соответствуют существующим клиентам
valid_customers = set(df_customer['customer_id'])
df_transaction = df_transaction[df_transaction['customer_id'].isin(valid_customers)]

In [162]:
df_customer.columns

Index(['customer_id', 'first_name', 'last_name', 'gender', 'DOB', 'job_title',
       'job_industry_category', 'wealth_segment', 'deceased_indicator',
       'owns_car', 'address', 'postcode', 'state', 'country',
       'property_valuation'],
      dtype='object')

In [163]:
# Получаем уникальные значения пола
df_customer['gender'].unique()

array(['F', 'Male', 'Female', 'U', 'Femal'], dtype=object)

In [164]:
# Получаем уникальные значения штатов
df_customer['state'].unique()

array(['New South Wales', 'QLD', 'VIC', 'NSW', 'Victoria'], dtype=object)

Анализ данных показывает необходимость стандартизации:

- Пол: 5 вариантов записи ('F', 'Male', 'Female', 'U', 'Femal')
- Штаты: 5 вариантов записи ('New South Wales', 'QLD', 'VIC', 'NSW', 'Victoria')


In [165]:
# Создаем словарь для стандартизации значений пола:
# F, Femal -> Female
# Male -> Male  
# U -> Unknown
gender_mapping = {
    'F': 'Female',
    'Female': 'Female', 
    'Femal': 'Female',
    'Male': 'Male',
    'U': 'Unknown'
}

# Создаем словарь для стандартизации названий штатов:
# NSW -> New South Wales
# QLD -> Queensland 
# VIC, Victoria -> Victoria
state_mapping = {
    'NSW': 'New South Wales',
    'New South Wales': 'New South Wales',
    'QLD': 'Queensland',
    'VIC': 'Victoria',
    'Victoria': 'Victoria'
}

# Применяем маппинги для стандартизации значений в колонках gender и state
df_customer['gender'] = df_customer['gender'].map(gender_mapping)
df_customer['state'] = df_customer['state'].map(state_mapping)

In [116]:
# Проверка результата
print(df_customer['gender'].unique())

['Female' 'Male' 'Unknown']


In [117]:
# Проверка результата
print(df_customer['state'].unique())

['New South Wales' 'Queensland' 'Victoria']


In [118]:
df_transaction.info()

<class 'pandas.core.frame.DataFrame'>
Index: 19803 entries, 0 to 19999
Data columns (total 12 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   transaction_id    19803 non-null  int64         
 1   product_id        19803 non-null  int64         
 2   customer_id       19803 non-null  int64         
 3   transaction_date  19803 non-null  datetime64[ns]
 4   online_order      19803 non-null  bool          
 5   order_status      19803 non-null  object        
 6   brand             19803 non-null  object        
 7   product_line      19803 non-null  object        
 8   product_class     19803 non-null  object        
 9   product_size      19803 non-null  object        
 10  list_price        19803 non-null  float64       
 11  standard_cost     19803 non-null  float64       
dtypes: bool(1), datetime64[ns](1), float64(2), int64(3), object(5)
memory usage: 1.8+ MB


In [119]:
df_customer.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2780 entries, 0 to 3996
Data columns (total 15 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   customer_id            2780 non-null   int64         
 1   first_name             2780 non-null   object        
 2   last_name              2780 non-null   object        
 3   gender                 2780 non-null   object        
 4   DOB                    2780 non-null   datetime64[ns]
 5   job_title              2780 non-null   object        
 6   job_industry_category  2780 non-null   object        
 7   wealth_segment         2780 non-null   object        
 8   deceased_indicator     2780 non-null   object        
 9   owns_car               2780 non-null   object        
 10  address                2780 non-null   object        
 11  postcode               2780 non-null   int64         
 12  state                  2780 non-null   object        
 13  country 

In [120]:
# Загружаем уникальные локации из таблицы клиентов (почтовый индекс, штат, страна)
df_locations = df_customer[['postcode', 'state', 'country']].drop_duplicates()

In [122]:
# Загружаем уникальные должности
df_jobs = df_customer[['job_title', 
                       'job_industry_category']].drop_duplicates().reset_index(drop=True)

df_jobs['job_title_id'] = df_jobs.index + 1  # Создаем ID для должностей

In [123]:
# Загружаем уникальные продукты
df_products = df_transaction[['product_id', 
                              'brand', 
                              'product_line', 
                              'product_class', 
                              'product_size']].drop_duplicates()

In [124]:
# Объединяем таблицу клиентов с таблицей должностей для получения job_title_id
df_customer = df_customer.merge(df_jobs, on=["job_title", 
                                             "job_industry_category"], how="left")

In [126]:
# Удаляем столбцы, которые теперь хранятся в отдельных таблицах:
# job_title и job_industry_category перенесены в таблицу jobs
# state и country перенесены в таблицу locations
df_customer = df_customer.drop(columns=['job_title', 
                                        'job_industry_category', 
                                        'state', 
                                        'country'])

In [128]:
# Удаляем столбцы, которые теперь хранятся в таблице products
df_transaction = df_transaction.drop(columns=['brand', 
                                              'product_line', 
                                              'product_class', 
                                              'product_size'])

## Загрузка данных в PostgreSQL

In [129]:
import psycopg2

In [130]:
# Создаем подключение к базе данных PostgreSQL
# Параметры подключения
DB_HOST = "localhost"  # Хост базы данных
DB_PORT = "5432"      # Порт PostgreSQL по умолчанию
DB_NAME = "retail_db" # Название базы данных
DB_USER = "postgres"  # Имя пользователя
DB_PASS = "SUPER_PASS"    # Пароль пользователя

# Устанавливаем соединение с базой данных
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASS
)

In [131]:
# Получаем список всех таблиц в схеме public

query = """
SELECT table_name 
FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name;
"""

pd.read_sql_query(query, conn)

Unnamed: 0,table_name
0,customers
1,job_titles
2,locations
3,products
4,transactions


In [132]:
# Создаем курсор для выполнения SQL-запросов к базе данных
cursor = conn.cursor()

# Очищаем таблицу locations (включая связанные данные через CASCADE)
cursor.execute("TRUNCATE TABLE locations CASCADE;")
conn.commit()

# Построчно загружаем данные о локациях из DataFrame в базу данных
for _, row in df_locations.iterrows():
    try:
        cursor.execute(
            "INSERT INTO locations (postcode, state, country) VALUES (%s, %s, %s)",
            (row['postcode'], row['state'], row['country'])
        )
        conn.commit()  # Сохраняем изменения в базе данных
    except Exception as e:
        conn.rollback()  # Откатываем изменения при ошибке
        print(f"Ошибка при вставке строки {row}: {e}")

In [133]:
# Очищаем таблицу должностей
cursor.execute("TRUNCATE TABLE job_titles CASCADE;")
conn.commit()

# Загружаем данные о должностях из DataFrame в таблицу job_titles
try:
    for _, row in df_jobs.iterrows():
        cursor.execute(
            """
            INSERT INTO job_titles (job_title_id, job_title, job_industry_category) 
            VALUES (%s, %s, %s)""", 
            (
            row['job_title_id'],
            row['job_title'] if not pd.isna(row['job_title']) else None,
            row['job_industry_category'] if not pd.isna(row['job_industry_category']) else None
        ))
    conn.commit()
    
    # Выводим количество загруженных записей для проверки
    cursor.execute("SELECT COUNT(*) FROM job_titles")
    print(f"Загружено записей: {cursor.fetchone()[0]}")
except Exception as e:
    conn.rollback()
    print(f"Ошибка: {e}")

Загружено записей: 869


In [142]:
# Очищаем таблицу customers и связанные данные через CASCADE
cursor.execute("TRUNCATE TABLE customers CASCADE;")
conn.commit()

# Загружаем данные клиентов из DataFrame в таблицу customers
try:
    for _, row in df_customer.iterrows():
        # Обработка пустого значения даты рождения
        dob = None if pd.isna(row['DOB']) else row['DOB']
        
        # Вставка записи о клиенте со всеми атрибутами
        cursor.execute(
            """INSERT INTO customers 
               (customer_id, first_name, last_name, gender, DOB, 
                job_title_id, wealth_segment, deceased_indicator, 
                owns_car, address, postcode, property_valuation) 
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            (row['customer_id'], row['first_name'], row['last_name'], 
             row['gender'], dob, row['job_title_id'], 
             row['wealth_segment'], row['deceased_indicator'], 
             row['owns_car'], row['address'], row['postcode'], 
             row['property_valuation'])
        )
    conn.commit()
    
    # Проверяем успешность загрузки по количеству записей
    cursor.execute("SELECT COUNT(*) FROM customers")
    print(f"Загружено записей: {cursor.fetchone()[0]}")
    
except Exception as e:
    # Откат транзакции при возникновении ошибки
    conn.rollback()
    print(f"Ошибка: {e}")

Загружено записей: 2780


In [144]:
# Очищаем таблицу products и связанные данные через CASCADE
cursor.execute("TRUNCATE TABLE products CASCADE;")
conn.commit()

# Загружаем данные о продуктах из DataFrame в таблицу products
try:
    for _, row in df_products.iterrows():
        # Вставка записи о продукте со всеми атрибутами
        cursor.execute(
        """INSERT INTO products 
        (product_id, brand, product_line, 
        product_class, product_size) VALUES (%s, %s, %s, %s, %s)""",
        (row['product_id'], row['brand'], row['product_line'], 
         row['product_class'],row['product_size'])
    )
    conn.commit()
    
    # Проверяем успешность загрузки по количеству записей
    cursor.execute("SELECT COUNT(*) FROM products")
    print(f"Загружено записей: {cursor.fetchone()[0]}")
    
except Exception as e:
    # Откат транзакции при возникновении ошибки
    conn.rollback()
    print(f"Ошибка: {e}")

Загружено записей: 101


In [148]:
# Очищаем таблицу транзакций и связанные данные через CASCADE
cursor.execute("TRUNCATE TABLE transactions CASCADE;")
conn.commit()

# Загружаем данные о транзакциях из DataFrame в таблицу transactions
try:
    for _, row in df_transaction.iterrows():
        # Вставка записи о транзакции со всеми атрибутами
        cursor.execute(
        """INSERT INTO transactions 
        (transaction_id, product_id, customer_id, 
        transaction_date, online_order, order_status, 
        list_price, standard_cost) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        (row['transaction_id'], row['product_id'], 
         row['customer_id'], row['transaction_date'], 
         row['online_order'], row['order_status'], row['list_price'], 
         row['standard_cost'])
    )
    conn.commit()
    
    # Проверяем успешность загрузки по количеству записей
    cursor.execute("SELECT COUNT(*) FROM transactions")
    print(f"Загружено записей: {cursor.fetchone()[0]}")
    
except Exception as e:
    # Откат транзакции при возникновении ошибки
    conn.rollback()
    print(f"Ошибка: {e}")

Загружено записей: 13890
