# Работа с большими данными и ETL
## Упр. - Загрузка данных о продажах из CSV-файла, очистка данных и загрузка в PostgreSQL
- Загрузите CSV-файл с данными о продажах, используя pandas.
- Очистите данные, удалив дубликаты и обработав пропущенные значения.
- Преобразуйте тип данных столбца "Дата" в формат даты.
- Загрузите очищенные данные в таблицу PostgreSQL, используя команду COPY.

In [3]:
# Драйвер для PostgreSQL
!pip install psycopg2-binary 



In [4]:
import pandas as pd
import psycopg2
# Математика
import numpy as np

### Загрузите CSV-файл с данными о продажах, используя pandas.

In [6]:
# загрузим 
url = r'C:\Users\79181\n.brykovskaya\module_2\5SQL\files\ecom_raw.csv'
try:
    df_raw = pd.read_csv(url)
except:
   print(f"Датасет  не загружен!")  

In [7]:
print(f"Датасет загружен, посмотрим \
несколько случайных строк:")
df_raw.sample(n=2, random_state=0).T

Датасет загружен, посмотрим несколько случайных строк:


Unnamed: 0,4627,2021
date,2019-10-09 14:00:00,2019-03-19 11:00:00
customer_id,ed0ff3ae-e963-4eef-a969-013bfe62d711,4b2f19f1-ffbb-4f7f-abc5-bb33c27c1a9c
order_id,72904,14581
product,"Сушилка для белья потолочная Лиана 2,0 м 1703009",Пеларгония Toscana Angeleyes Amarillo Rosato у...
quantity,1,1
price,599.0,112.0
total,599.0,112.0
value_segment,Medium-value,Lost
product_category,хозяйственная утварь,цветы/сад


### Очистите данные, удалив дубликаты и обработав пропущенные значения.

Выводим информацию о датафрейме
df.info()

In [10]:
df_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11061 entries, 0 to 11060
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   date              11061 non-null  object 
 1   customer_id       11061 non-null  object 
 2   order_id          11061 non-null  int64  
 3   product           11061 non-null  object 
 4   quantity          11061 non-null  int64  
 5   price             11061 non-null  float64
 6   total             11029 non-null  float64
 7   value_segment     11061 non-null  object 
 8   product_category  11061 non-null  object 
dtypes: float64(2), int64(2), object(5)
memory usage: 777.9+ KB


In [11]:
print(f"В датафрейме {df_raw.shape[1]} столбцов. \
Названия столбцов в алфавитном порядке:")
for col in sorted(df_raw.columns):
    print(' -',col)

В датафрейме 9 столбцов. Названия столбцов в алфавитном порядке:
 - customer_id
 - date
 - order_id
 - price
 - product
 - product_category
 - quantity
 - total
 - value_segment


В датафрейме `df_raw` все столбцы оформлены в едином регистре и стиле - snake_case. Переименований на данном этапе не требуется.

Проверим таблицу на полные дубликаты строк.

In [13]:
print(f'Количество явных дубликатов в датафрейме: \
{df_raw.duplicated().sum()} шт. или {round(df_raw.duplicated().sum() / df_raw.shape[0] *100, 1)} %')    

Количество явных дубликатов в датафрейме: 5540 шт. или 50.1 %


В датасете много дубликатов. Такое количество дубликатов - не случайно. 50% дубликатов наводит на мысль, что данные были записаны в таблицу дважды, учитывая, что это полные дубли. 

Посмотрим на датафрейм с фильтром на дубликаты, причем покажем  датафрейм с параметром keep в методе duplicated = False, это помечает все дубликаты как True.  Значение keep=False удобно для изучения всех дублей, часто его применяют вместе с сортировкой по повторам, чтобы визуально было легче их сравнивать.

выведем 10 первых строк датасета.

In [15]:
df_raw[df_raw.duplicated(keep=False)].sort_values(by=['order_id', 'product']).head(10)

Unnamed: 0,date,customer_id,order_id,product,quantity,price,total,value_segment,product_category
3573,2019-06-02 20:00:00,d17861fa-db5f-4de4-93fd-dc5e60d24d3e,12624,"Осина обыкновенная, Высота 25-50 см",1,150.0,150.0,Low-value,цветы/сад
9094,2019-06-02 20:00:00,d17861fa-db5f-4de4-93fd-dc5e60d24d3e,12624,"Осина обыкновенная, Высота 25-50 см",1,150.0,150.0,Low-value,цветы/сад
3574,2019-06-02 20:00:00,d17861fa-db5f-4de4-93fd-dc5e60d24d3e,12624,"Осина обыкновенная, Высота 50-100 см",1,225.0,225.0,Low-value,цветы/сад
9095,2019-06-02 20:00:00,d17861fa-db5f-4de4-93fd-dc5e60d24d3e,12624,"Осина обыкновенная, Высота 50-100 см",1,225.0,225.0,Low-value,цветы/сад
97,2018-10-05 00:00:00,b80e4826-7218-4bf9-ac08-eb2c81ab3f62,13547,Рассада зелени для кухни Лаванды в кассете по ...,1,315.0,315.0,Lost,цветы/сад
5618,2018-10-05 00:00:00,b80e4826-7218-4bf9-ac08-eb2c81ab3f62,13547,Рассада зелени для кухни Лаванды в кассете по ...,1,315.0,315.0,Lost,цветы/сад
98,2018-10-05 00:00:00,b80e4826-7218-4bf9-ac08-eb2c81ab3f62,13547,Рассада зелени для кухни Розмарина в кассете п...,1,207.0,207.0,Lost,цветы/сад
5619,2018-10-05 00:00:00,b80e4826-7218-4bf9-ac08-eb2c81ab3f62,13547,Рассада зелени для кухни Розмарина в кассете п...,1,207.0,207.0,Lost,цветы/сад
99,2018-10-05 00:00:00,b80e4826-7218-4bf9-ac08-eb2c81ab3f62,13547,Рассада зелени для кухни Тимьяна в кассете по ...,1,162.0,162.0,Lost,цветы/сад
5620,2018-10-05 00:00:00,b80e4826-7218-4bf9-ac08-eb2c81ab3f62,13547,Рассада зелени для кухни Тимьяна в кассете по ...,1,162.0,162.0,Lost,цветы/сад


Строки просто дублируются - вероятно при записи в файл произошел сбой и данные полностью задублировались.
Избавимся от дубликатов.

In [17]:
#удаляем явные дубликаты  -перезаписываем в df
df = df_raw.copy (deep=True)
df = df.drop_duplicates()
print(f'Количество явных дубликатов в датафрейме после удаления: \
{df.duplicated().sum()} шт. или {round(df.duplicated().sum() / df.shape[0] *100, 1)} %')    

Количество явных дубликатов в датафрейме после удаления: 0 шт. или 0.0 %


Для датасетa выведем информацию, которая необходима для принятия решений о предобработке. Получим информацию о каждом столбце и его типе данных, сопоставим с хранимой информацией в полях.


In [19]:
print(f'Выведем информацию о столбцах датафрейма df :')
print('----------------------------------------------------------------------')
for column in df.columns:
    print(f"{df[column].dtypes}    - {column}")    
print('----------------------------------------------------------------------')

print(f'Кол-во столбцов с каждым типом данных в \
датафрейме:\n\
----------------------------------------------------------------------\
\n{df.dtypes.value_counts()} шт.')    

Выведем информацию о столбцах датафрейма df :
----------------------------------------------------------------------
object    - date
object    - customer_id
int64    - order_id
object    - product
int64    - quantity
float64    - price
float64    - total
object    - value_segment
object    - product_category
----------------------------------------------------------------------
Кол-во столбцов с каждым типом данных в датафрейме:
----------------------------------------------------------------------
object     5
int64      2
float64    2
Name: count, dtype: int64 шт.


В датафрейме df в столбце date содержится информация о дате, при этом тип данных указан строковый  - object. Для дальнейшего анализа необходимо преобразовать  его в формат даты.

В остальных столбцах содержащаяся информация соответствует типу данных. Преобразований не требуется. 
Укажем в каких столбцах требуется замена:

| поле            |хранимая информация    | текущий тип данных |требуется замена на тип данных|
|:----------------|:----------------------|:-------------------|:-----------------------------|
|date   |дата|            object     |                datetime         |


### Произведем замену.

In [21]:
print(f"Столбец date тип данных до \
замены {df['date'].dtypes}", end=', ')

df['date'] = (pd.to_datetime(
    df['date']))

print(f"после замены \
{df['date'].dtypes}.", end='\n')

Столбец date тип данных до замены object, после замены datetime64[ns].


#### Анализ пропусков 
- оценим полноту данных. Используем пользовательскую функцию, которая выведет информацию о пропусках.

In [23]:
# объявления функции reserch_na
def reserch_na(dfr):
    """
    """
    print(f"Всего датасет содержит {dfr.isnull().values.sum()} \
пропусков.\nЭто \
{round(dfr.isnull().values.sum() * 100 / (len(dfr) * dfr.shape[1]), 1)}\
% от всех данных\n\
=========================================")
    print('Пропусков в столбцах:\n\
=========================================')
    print(dfr.isna().sum(),'\n\
=========================================')
    print('Процент пропусков в каждом из столбцов:\n\
=========================================')
    for col in dfr.columns:
        pct_missing = np.mean(dfr[col].isnull())
        if (pct_missing*100) > 0:
            print('{} - {}%'
                  .format(col,(pct_missing*100).round(2)))

In [24]:
# вызов функции reserch_na для датафрейма
reserch_na(df)

Всего датасет содержит 16 пропусков.
Это 0.0% от всех данных
Пропусков в столбцах:
date                 0
customer_id          0
order_id             0
product              0
quantity             0
price                0
total               16
value_segment        0
product_category     0
dtype: int64 
Процент пропусков в каждом из столбцов:
total - 0.29%


заполним пропуски  - это удастся сделать, так как столбец вычисляемый quantity * price                            

In [26]:
# заполним пропуски
df['total'] = df.apply(
    lambda row: row['quantity'] * row['price'] 
    if np.isnan(row['total']) 
    else row['total'],
    axis=1)
# проверим пропуски после заполнения
if df.isnull().values.sum() == 0:
    print('Пропуски удалены успешно!')
else:
    print('Что-то пошло не так!')    

Пропуски удалены успешно!


Произвели предобработку таблицы, изменили тип данных, заполнили пропуски, удалили дубликаты.
Теперь сформируем таблицу с нужными столбцами (будем выгружать не все).


In [28]:
# делаем копию
df_final = df.copy(deep=True)
# оставляем только нужные столбцы + переименовываем
df_final = (df_final[['date', 
                     'order_id', 
                     'customer_id', 
                     'product', 
                     'total']].rename(columns={'total': 'revenue'}))
df_final.head()

Unnamed: 0,date,order_id,customer_id,product,revenue
0,2018-10-01,68477,ee47d746-6d2f-4d3c-9622-c31412542920,"Комнатное растение в горшке Алое Вера, d12, h30",142.0
1,2018-10-01,68477,ee47d746-6d2f-4d3c-9622-c31412542920,"Комнатное растение в горшке Кофе Арабика, d12,...",194.0
2,2018-10-01,68477,ee47d746-6d2f-4d3c-9622-c31412542920,Радермахера d-12 см h-20 см,112.0
3,2018-10-01,68477,ee47d746-6d2f-4d3c-9622-c31412542920,Хризолидокарпус Лутесценс d-9 см,179.0
4,2018-10-01,68477,ee47d746-6d2f-4d3c-9622-c31412542920,Циперус Зумула d-12 см h-25 см,112.0


### Загрузите очищенные данные в таблицу PostgreSQL, используя команду COPY.

In [30]:
import sqlalchemy as sa

In [31]:
import os
from sqlalchemy import text, create_engine

In [32]:
#поиск файла .env и загрузка из него переменных среды - файл в папке с ноутбуком 
# д.лежать +в гитигноре
from dotenv import load_dotenv
#грузим пароли запуская поиск файла .env локально
load_dotenv()

True

In [33]:
df_final.to_csv('ecom.csv', index=False) 

In [34]:
# Подключение к PostgreSQL
database_url = "postgresql://postgres:os.getenv('PASSWORD')@localhost:5432/postgres"
engine = create_engine(database_url) 

In [35]:
# Загрузка данных в PostgreSQL
# Очищенный CSV-файл
file_path = "cleaned_ecom.csv"
table_name = "ecom"

# SQL запрос для удаления таблицы 
drop_table_sql = "DROP TABLE IF EXISTS ecom;"

# SQL запрос для создания таблицы 
create_table_sql = """
CREATE TABLE IF NOT EXISTS ecom (
    DATE TIMESTAMP,
    ORDER_ID INT,
    CUSTOMER_ID TEXT,
    PRODUCT TEXT,
    REVENUE NUMERIC(10,2)
    );
"""

In [None]:
# Открываем соединение через psycopg2
conn = engine.raw_connection()  # Создаем явное подключение
try:
    cur = conn.cursor()
    
    # Создаем таблицу, если она не существует
    cur.execute(create_table_sql)
    conn.commit()

    # Загружаем данные из CSV файла
    with open(file_path, "r") as f:
        print("Загрузка данных...")
        cur.copy_expert(f"COPY {table_name} FROM STDIN WITH CSV HEADER", f)
        print("Данные успешно загружены!")

    conn.commit()
    cur.close()
    print("Таблица создана (если не существовала) и данные успешно загружены в PostgreSQL!")
    
finally:
    conn.close()  # Закрываем соединение