In [2]:
import pandas as pd 
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
import plotly.express as px

In [9]:
ads = pd.read_csv(
    r'C:\Users\ads.csv', parse_dates=["created_at"])
leads = pd.read_csv(
    r'C:\Users\leads.csv', parse_dates=["lead_created_at"])
purchases = pd.read_csv(
    r'C:\Users\purchases.csv', parse_dates=["purchase_created_at"])

# EDA

## Начнем с анализа данных в ads 

In [162]:
ads.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8602 entries, 0 to 8601
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   created_at       8602 non-null   datetime64[ns]
 1   d_ad_account_id  8602 non-null   object        
 2   d_utm_source     8602 non-null   object        
 3   d_utm_medium     8602 non-null   object        
 4   d_utm_campaign   8602 non-null   object        
 5   d_utm_content    8602 non-null   object        
 6   d_utm_term       0 non-null      float64       
 7   m_clicks         8602 non-null   float64       
 8   m_cost           8602 non-null   float64       
 9   price            4211 non-null   float64       
dtypes: datetime64[ns](1), float64(4), object(5)
memory usage: 672.2+ KB


In [10]:
ads['d_utm_campaign'] = ads['d_utm_campaign'].astype(str)
ads['d_utm_content'] = ads['d_utm_content'].astype(str)

1. Дата

In [26]:
fig = px.histogram(ads, x="created_at")
fig.show()

Разброс с 2 января по 17 сентября, пик пришелся на март 

2. Источники 

In [27]:
ads['d_utm_source'].unique()

array(['yandex'], dtype=object)

Только яндекс.

3. Типы трафика 

In [31]:
ads['d_utm_medium'].unique()

array(['cpc'], dtype=object)

Только CPC – Cost per Click. Рекламодатель оплачивает клик по рекламе независимо от того, привёл он к продаже или другому целевому действию или нет

4. Компании

In [32]:
ads['d_utm_campaign'].unique()

array([48306435, 48306450, 48306473, 48306487, 48306494, 48306518,
       48306461, 48306469, 60279528, 72000794], dtype=int64)

In [103]:
# посмотрим на графики каждой компании 
grouped_df = ads.groupby(['d_utm_campaign', "created_at"])[
    'created_at'].count()
grouped_df = grouped_df.to_frame().rename(
    columns={'created_at': 'count'}).reset_index()
fig = px.line(grouped_df, x="created_at", y="count", color="d_utm_campaign",
              color_discrete_map=company_colors, title="Количество рекламных объявлений по датам для каждой компании")
fig.show()

5. контент-метка

In [36]:
# Все уникального контента меток
ads['d_utm_content'].unique().shape[0]

121

In [94]:
ads.groupby(['d_utm_campaign', 'd_utm_content'])[
    'd_utm_content'].count()


d_utm_campaign  d_utm_content
48306435        8404700756       253
                8404700757       251
                8813476161       243
                8813476162       252
                8813476163       244
                                ... 
72000794        11844860344       20
                11844860345       20
                11844860346       20
                11844860347       20
                11844860348       19
Name: d_utm_content, Length: 121, dtype: int64

In [101]:
ads.groupby('d_utm_campaign')[
    'd_utm_content'].unique().apply(lambda x: len(x))

d_utm_campaign
48306435     6
48306450     5
48306461    36
48306469    18
48306473     6
48306487     6
48306494     6
48306518    24
60279528     8
72000794     6
Name: d_utm_content, dtype: int64

Разные компании содержат разный контент 

6. utm_term

Utm_term - это одно из значений UTM-метки, используемое для идентификации условий поиска рекламной системы.

In [81]:
ads['d_utm_term'].notna().sum()

0

Все значения пусты, tm_term относится к опциональным меткам, т.е. ее использование не обязательно.

In [84]:
ads['d_ad_account_id'].unique()

array(['xo-for-client-ya'], dtype=object)

Похоже что реклама была только в яндексе

In [17]:
ads['d_utm_source'].unique()

array(['yandex'], dtype=object)

## Leads

In [117]:
leads.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 23540 entries, 0 to 23539
Data columns (total 8 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   lead_created_at      23540 non-null  datetime64[ns]
 1   lead_id              23540 non-null  object        
 2   d_lead_utm_source    12331 non-null  object        
 3   d_lead_utm_medium    10100 non-null  object        
 4   d_lead_utm_campaign  9569 non-null   object        
 5   d_lead_utm_content   3338 non-null   object        
 6   d_lead_utm_term      328 non-null    object        
 7   client_id            18601 non-null  object        
dtypes: datetime64[ns](1), object(7)
memory usage: 1.4+ MB


In [18]:
leads['d_lead_utm_source'].unique()

array([nan, 'vkontakte', 'ycard', 'vk', 'vkmegacity', 'sms', 'instagram',
       'zvonobot', 'yandex', 'whatsapp', 'kviz', 'megacity', 'VOICEROBOT',
       'telegram', 'viber', 'eLama-google', 'site', 'df310722',
       'ycard#!/tproduct/323718988-1498486301712', 'google',
       'clients_day', 'inst', 'intagram', 'lift', 'banner', 'razdatka',
       'outdoor', 'promo1', 'promo5', 'promo6', 'promo3', 'promo2',
       'promo8', 'promo7', 'ff110922', 'promo11', 'pt1109', 'vkontakteso',
       'zapisnatrenirovk', 'ycard#!/tproduct/323718988-1646116631071',
       'dz1809', 'ycard#!/tproduct/323718988-1498486363994',
       'telegramsamara', 'vkmegasity', 'vkontakteknopka',
       'vkontaktereklama', 'navigaciya', 'megasity', '%utm_source%',
       'vkontaktevertical'], dtype=object)

In [136]:
leads['d_lead_utm_source'].unique()

array([nan, 'vkontakte', 'ycard', 'vk', 'vkmegacity', 'sms', 'instagram',
       'zvonobot', 'yandex', 'whatsapp', 'kviz', 'megacity', 'VOICEROBOT',
       'telegram', 'viber', 'eLama-google', 'site', 'df310722',
       'ycard#!/tproduct/323718988-1498486301712', 'google',
       'clients_day', 'inst', 'intagram', 'lift', 'banner', 'razdatka',
       'outdoor', 'promo1', 'promo5', 'promo6', 'promo3', 'promo2',
       'promo8', 'promo7', 'ff110922', 'promo11', 'pt1109', 'vkontakteso',
       'zapisnatrenirovk', 'ycard#!/tproduct/323718988-1646116631071',
       'dz1809', 'ycard#!/tproduct/323718988-1498486363994',
       'telegramsamara', 'vkmegasity', 'vkontakteknopka',
       'vkontaktereklama', 'navigaciya', 'megasity', '%utm_source%',
       'vkontaktevertical'], dtype=object)

In [137]:
leads['d_lead_utm_medium'].unique()

array([nan, 'social', 'cpc', 'email', 'organic', 'banner', 'razdatka',
       'zvonobot', 'sociak', 'cpa', 'vk', 'socia', 'banne', 'mobile',
       '%utm_medium%'], dtype=object)

In [132]:
leads.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 23540 entries, 0 to 23539
Data columns (total 8 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   lead_created_at      23540 non-null  datetime64[ns]
 1   lead_id              23540 non-null  object        
 2   d_lead_utm_source    12331 non-null  object        
 3   d_lead_utm_medium    10100 non-null  object        
 4   d_lead_utm_campaign  9569 non-null   object        
 5   d_lead_utm_content   3338 non-null   object        
 6   d_lead_utm_term      328 non-null    object        
 7   client_id            18601 non-null  object        
dtypes: datetime64[ns](1), object(7)
memory usage: 1.4+ MB


In [182]:
ads.duplicated().sum()

0

## Purchases

In [4]:
purchases.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 66180 entries, 0 to 66179
Data columns (total 4 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   purchase_created_at  66180 non-null  datetime64[ns]
 1   purchase_id          66180 non-null  object        
 2   client_id            66179 non-null  object        
 3   m_purchase_amount    66180 non-null  float64       
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 2.0+ MB


In [7]:
purchases['client_id'][0]

'7011bdcd-6fd8-11e7-80fc-c412f533dba1'

В id клиента str тип данных,что и логично, хотя в схеме указан int.

In [8]:
purchases['purchase_id'][0]

'd3198d39-6d16-40c9-bff7-aa28bd6e2991'

аналогично 

In [15]:
pivot_table = purchases.pivot_table(
    values='m_purchase_amount', index='client_id', aggfunc='sum')
pivot_table

In [23]:
purchases.isna().sum()

purchase_created_at    0
purchase_id            0
client_id              1
m_purchase_amount      0
dtype: int64

In [24]:
purchases

Unnamed: 0,purchase_created_at,purchase_id,client_id,m_purchase_amount
0,2022-02-21,d3198d39-6d16-40c9-bff7-aa28bd6e2991,7011bdcd-6fd8-11e7-80fc-c412f533dba1,9950.0
1,2022-02-21,48a0ad24-77aa-4064-a971-dd0d6f1f6c50,a2771bb0-6fd6-11e7-80fc-c412f533dba1,8700.0
2,2022-02-21,267ff20f-a56c-480a-b2f7-1f7b1a746f9f,2687f503-6fd7-11e7-80fc-c412f533dba1,0.0
3,2022-02-21,19fba5f0-785d-45bd-903a-34ba529c6404,01122a97-45bc-11eb-ac23-c412f533dba1,8700.0
4,2022-02-21,00ac921d-32e3-4200-9435-a46d5fef4a29,a55b8c4e-d5a3-11e9-abbe-c412f533dba1,500.0
...,...,...,...,...
66175,2022-02-19,8c1cdf21-2c6a-4ae0-8de4-8c86b6a217a5,d1f7b6d1-644f-11ea-abf3-c412f533dba1,110.0
66176,2022-02-19,161adaa3-419d-4c9e-bbab-ad9d32caaf0c,41ec1281-8232-11eb-ac27-c412f533dba1,119.0
66177,2022-08-31,db2b1f43-9963-44a4-b81f-a35d086250fa,85b0f39c-0c60-11eb-ac1a-c412f533dba1,9450.0
66178,2022-08-31,35cd2dd2-dc7c-4245-a28d-de3c352aef7e,b04dc805-0e32-11ec-9043-848f69e142cf,6495.0


## Joinим таблички 

id Лидов разные 

In [22]:
full_df = pd.merge(ads,
                   leads,
                   left_on=['created_at', 'd_utm_source', 'd_utm_medium', 'd_utm_campaign', 'd_utm_content'
       ],
                   right_on=['lead_created_at', 'd_lead_utm_source', 'd_lead_utm_medium', 'd_lead_utm_campaign', 'd_lead_utm_content'
       ], 
                   how='inner')
full_df.drop(columns=['lead_created_at', 'd_lead_utm_source', 'd_lead_utm_medium', 'd_lead_utm_campaign',
                       'd_lead_utm_content', 'd_utm_term', 'd_lead_utm_term'
                        ], inplace=True)

In [20]:
full_df.isna().sum()

created_at           0
d_ad_account_id      0
d_utm_source         0
d_utm_medium         0
d_utm_campaign       0
d_utm_content        0
m_clicks             0
m_cost               0
lead_id              0
client_id          369
dtype: int64

Left Merge (объединение по левой таблице):
NaN значения сохраняются из левой таблицы. Если в правой таблице есть соответствующее значение для ключа, оно будет объединено с NaN из левой таблицы.Поэтому nan здесь нужно убрать

In [23]:
full_df.dropna(inplace=True)

In [24]:
full_df = pd.merge(full_df,
                   purchases, on='client_id', how='left')
full_df

Unnamed: 0,created_at,d_ad_account_id,d_utm_source,d_utm_medium,d_utm_campaign,d_utm_content,m_clicks,m_cost,lead_id,client_id,purchase_created_at,purchase_id,m_purchase_amount
0,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701659,15.0,496.896,136c5ccc-16e9-11ed-9074-848f69e142cf,6ba575d8-f458-11ec-9070-848f69e142cf,2022-06-25,1c40ccbb-af39-48d6-8931-e217f665b4fc,0.0
1,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,5.0,124.704,8c5a5080-16f8-11ed-9074-848f69e142cf,ddb3e740-a856-11ec-905f-848f69e142cf,2022-03-20,1337d48f-993c-4d58-884d-45126a0f33e4,0.0
2,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,5.0,124.704,8c5a5080-16f8-11ed-9074-848f69e142cf,ddb3e740-a856-11ec-905f-848f69e142cf,2022-03-30,327f9ca8-ab45-46cd-ae39-303fce562ab2,285.0
3,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,5.0,124.704,8c5a5080-16f8-11ed-9074-848f69e142cf,ddb3e740-a856-11ec-905f-848f69e142cf,2022-08-08,8e090ba8-e323-4189-bb3c-d9e1483a7212,6194.0
4,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701665,20.0,385.896,dd50a605-1714-11ed-9074-848f69e142cf,ca65c322-1ac9-11ec-9045-848f69e142c9,2022-10-14,cac4fba6-1eb0-4c65-9d9d-10d31ee1d69f,160.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2157,2022-07-07,xo-for-client-ya,yandex,cpc,48306435,8813476162,3.0,93.108,59d3c222-fe21-11ec-9072-848f69e142cf,a7e5d538-fe8e-11ec-9072-848f69e142cf,2022-07-23,c95f7ac1-db2f-432d-9388-ea7005bc5451,0.0
2158,2022-07-07,xo-for-client-ya,yandex,cpc,48306435,8813476162,3.0,93.108,59d3c222-fe21-11ec-9072-848f69e142cf,a7e5d538-fe8e-11ec-9072-848f69e142cf,2022-07-10,afdda038-c761-4030-ab00-45b563b660a2,8495.0
2159,2022-07-07,xo-for-client-ya,yandex,cpc,48306450,8404701665,6.0,133.800,2d611bc8-fdeb-11ec-9072-848f69e142cf,ab8eb454-fdec-11ec-9072-848f69e142cf,NaT,,
2160,2022-07-07,xo-for-client-ya,yandex,cpc,48306487,8404707410,5.0,146.556,b10ef332-fe13-11ec-9072-848f69e142cf,f8e4ae80-febc-11ec-9072-848f69e142cf,NaT,,


In [25]:
full_df.isna().sum()

created_at               0
d_ad_account_id          0
d_utm_source             0
d_utm_medium             0
d_utm_campaign           0
d_utm_content            0
m_clicks                 0
m_cost                   0
lead_id                  0
client_id                0
purchase_created_at    704
purchase_id            704
m_purchase_amount      704
dtype: int64

In [26]:
full_df.dropna(inplace=True)

In [27]:
full_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1458 entries, 0 to 2161
Data columns (total 13 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   created_at           1458 non-null   datetime64[ns]
 1   d_ad_account_id      1458 non-null   object        
 2   d_utm_source         1458 non-null   object        
 3   d_utm_medium         1458 non-null   object        
 4   d_utm_campaign       1458 non-null   object        
 5   d_utm_content        1458 non-null   object        
 6   m_clicks             1458 non-null   float64       
 7   m_cost               1458 non-null   float64       
 8   lead_id              1458 non-null   object        
 9   client_id            1458 non-null   object        
 10  purchase_created_at  1458 non-null   datetime64[ns]
 11  purchase_id          1458 non-null   object        
 12  m_purchase_amount    1458 non-null   float64       
dtypes: datetime64[ns](2), float64(3), obje

Каждому лиду “в зачет” идут только те покупки, которые клиент сделал в первые 15 дней после создания заявки. Поэтому чтобы убрать разночтения сделаем условие 

In [93]:
condition = (full_df['purchase_created_at'] -
             full_df['created_at']) <= pd.Timedelta(days=15)
clear_df = full_df[condition]
clear_df

Unnamed: 0,created_at,d_ad_account_id,d_utm_source,d_utm_medium,d_utm_campaign,d_utm_content,m_clicks,m_cost,lead_id,client_id,purchase_created_at,purchase_id,m_purchase_amount
0,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701659,15.0,496.896,136c5ccc-16e9-11ed-9074-848f69e142cf,6ba575d8-f458-11ec-9070-848f69e142cf,2022-06-25,1c40ccbb-af39-48d6-8931-e217f665b4fc,0.0
1,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,5.0,124.704,8c5a5080-16f8-11ed-9074-848f69e142cf,ddb3e740-a856-11ec-905f-848f69e142cf,2022-03-20,1337d48f-993c-4d58-884d-45126a0f33e4,0.0
2,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,5.0,124.704,8c5a5080-16f8-11ed-9074-848f69e142cf,ddb3e740-a856-11ec-905f-848f69e142cf,2022-03-30,327f9ca8-ab45-46cd-ae39-303fce562ab2,285.0
3,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,5.0,124.704,8c5a5080-16f8-11ed-9074-848f69e142cf,ddb3e740-a856-11ec-905f-848f69e142cf,2022-08-08,8e090ba8-e323-4189-bb3c-d9e1483a7212,6194.0
5,2022-08-08,xo-for-client-ya,yandex,cpc,48306450,8404701665,20.0,385.896,dd50a605-1714-11ed-9074-848f69e142cf,ca65c322-1ac9-11ec-9045-848f69e142c9,2022-04-20,0d25f368-820a-4f22-97e8-157b704b98e8,390.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2146,2022-09-08,xo-for-client-ya,yandex,cpc,48306450,8404701661,8.0,202.248,cefe5d59-2f53-11ed-907b-848f69e142cf,822ca8fd-d538-11e9-abbe-c412f533dba1,2022-09-12,9863f7c7-52d2-427c-87ef-b7586d853afa,10445.0
2151,2022-07-07,xo-for-client-ya,yandex,cpc,48306435,8813476162,3.0,93.108,59d3c222-fe21-11ec-9072-848f69e142cf,a7e5d538-fe8e-11ec-9072-848f69e142cf,2022-07-10,055dc1b2-f7ce-4e04-b715-7b6711db1e18,700.0
2153,2022-07-07,xo-for-client-ya,yandex,cpc,48306435,8813476162,3.0,93.108,59d3c222-fe21-11ec-9072-848f69e142cf,a7e5d538-fe8e-11ec-9072-848f69e142cf,2022-07-10,8158aae8-14a2-4e35-be4d-31225a44b690,0.0
2158,2022-07-07,xo-for-client-ya,yandex,cpc,48306435,8813476162,3.0,93.108,59d3c222-fe21-11ec-9072-848f69e142cf,a7e5d538-fe8e-11ec-9072-848f69e142cf,2022-07-10,afdda038-c761-4030-ab00-45b563b660a2,8495.0


In [106]:
grouped = clear_df.groupby(
    ['created_at', 'd_utm_source', 'd_utm_medium', 'd_utm_campaign'])

aggregated = grouped.agg({
    'm_clicks': 'sum',
    'm_cost': 'sum',
    'lead_id': 'nunique',
    'purchase_id': 'nunique',
    'm_purchase_amount': 'sum'
})

aggregated['CPL'] = aggregated['m_cost'] / aggregated['lead_id']
aggregated['ROAS'] = aggregated['m_purchase_amount'] / aggregated['m_cost']
aggregated.reset_index(inplace=True)
aggregated

Unnamed: 0,created_at,d_utm_source,d_utm_medium,d_utm_campaign,m_clicks,m_cost,lead_id,purchase_id,m_purchase_amount,CPL,ROAS
0,2022-01-04,yandex,cpc,48306450,30.0,723.024,1,3,9990.0,723.024,13.816969
1,2022-01-05,yandex,cpc,48306450,11.0,217.380,1,1,290.0,217.380,1.334069
2,2022-01-06,yandex,cpc,48306450,24.0,451.896,1,2,380.0,451.896,0.840901
3,2022-01-07,yandex,cpc,48306450,34.0,612.468,3,4,10080.0,204.156,16.458003
4,2022-01-08,yandex,cpc,48306450,8.0,136.776,1,1,8200.0,136.776,59.952038
...,...,...,...,...,...,...,...,...,...,...,...
274,2022-09-12,yandex,cpc,48306450,71.0,1987.800,2,4,17509.0,993.900,8.808230
275,2022-09-13,yandex,cpc,48306450,212.0,6652.236,3,11,39214.0,2217.412,5.894860
276,2022-09-13,yandex,cpc,48306473,2.0,23.328,1,1,12495.0,23.328,535.622428
277,2022-09-14,yandex,cpc,48306435,4.0,163.236,1,1,9999.0,163.236,61.254870


In [94]:

fig = px.line(aggregated, x="created_at", y="ROAS", color="d_utm_campaign",
              title="ROAS")
fig.show()

In [102]:
aggregated.shape

(279, 11)

# data_base

In [120]:
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
import sqlalchemy
import numpy as np 

In [112]:
# # Сохраняем каждый DataFrame в базу данных
# ads.to_sql('ads', engine, if_exists='replace', index=False)
# leads.to_sql('leads', engine, if_exists='replace', index=False)
# purchases.to_sql('purchases', engine, if_exists='replace', index=False)

180

In [91]:
# запрос для нужной таблицы 
query = '''
SELECT 
    ads.created_at,
    ads.d_utm_source,
    ads.d_utm_medium,
    ads.d_utm_campaign,
    ads.m_clicks,
    ads.m_cost,
    leads.lead_id,
    purchases.purchase_id,
    purchases.m_purchase_amount
FROM 
    ads
INNER JOIN 
    leads ON ads.created_at = leads.lead_created_at
    AND ads.d_utm_source = leads.d_lead_utm_source
    AND ads.d_utm_medium = leads.d_lead_utm_medium
    AND ads.d_utm_campaign = leads.d_lead_utm_campaign
    AND ads.d_utm_content = leads.d_lead_utm_content
LEFT JOIN 
    purchases ON leads.client_id = purchases.client_id
WHERE 
    ads.created_at IS NOT NULL
    AND leads.lead_created_at IS NOT NULL
    AND purchases.m_purchase_amount IS NOT NULL
    AND EXTRACT(DAY FROM (purchases.purchase_created_at - ads.created_at)) <= 15;
    '''


# Класс pipeline

In [181]:
class Pipeline:
    def __init__(self, query=None, database=None):
        
        '''
        Класс Pipeline инициализируется с запросом к БД,
        по-умолчанию запрос к нашей БД
        '''
        
        # подключение к БД
        if database is None:
            self.database = {
                'drivername': 'postgresql',
                'username': 'postgres',
                'password': '123456',
                'host': 'localhost',
                'port': 5433,
                'database': 'leads_data',
                'query': {}
            }
        else:
            self.database = database

        # базовый запрос к БД
        if query is None:
            self.query = '''
                SELECT 
                    ads.created_at,
                    ads.d_utm_source,
                    ads.d_utm_medium,
                    ads.d_utm_campaign,
                    ads.m_clicks,
                    ads.m_cost,
                    leads.lead_id,
                    purchases.purchase_id,
                    purchases.m_purchase_amount
                FROM 
                    ads
                INNER JOIN 
                    leads ON ads.created_at = leads.lead_created_at
                    AND ads.d_utm_source = leads.d_lead_utm_source
                    AND ads.d_utm_medium = leads.d_lead_utm_medium
                    AND ads.d_utm_campaign = leads.d_lead_utm_campaign
                    AND ads.d_utm_content = leads.d_lead_utm_content
                LEFT JOIN 
                    purchases ON leads.client_id = purchases.client_id
                WHERE 
                    ads.created_at IS NOT NULL
                    AND leads.lead_created_at IS NOT NULL
                    AND purchases.m_purchase_amount IS NOT NULL
                    AND EXTRACT(DAY FROM (purchases.purchase_created_at - ads.created_at)) <= 15;
                    '''
        else:
            self.query = query
            
        
    
    
    def get_data(self):
        
        '''
        Метод get_data возвращает наш запрос из БД
        '''
        
        engine = create_engine(
            f"{self.database['drivername']}://{self.database['username']}:{self.database['password']}@"
            f"{self.database['host']}:{self.database['port']}/{self.database['database']}",
            connect_args=self.database['query']
        )
        return pd.read_sql_query(self.query, con=engine)
    
    
    def check_data(self, df: pd.DataFrame = None, expected_types: dict = None,
                   utm_source_list: list = None,
                   utm_medium_list: list = None,
                   utm_campaign_list: list = None,
                   outlier_threshold: float = 1):
        '''
        Метод check_data проверяет данные на корректность:
        1. Соответствие типу данных, по-умолчанию схема указана, можно свою
        2. Соответствие источнику, медиумов и компаний. Можно использовать свои списки для проверок
        3. Проверку дубликатов 
        4. Проверка корректности дат: не больше чем сегодня и не раньше 01.01.2022
        5. Проверку на отрицательные значения для числовых столбцов
        6. Проверка на выбросы, параметр outlier_threshold отвечает за процентильный порог 
        '''
        if df is None:
            df = self.get_data()
            
        if expected_types is None:
            expected_types = {
                'created_at': np.dtype('<M8[ns]'),
                'd_utm_source': np.dtype('O'),
                'd_utm_medium': np.dtype('O'),
                'd_utm_campaign': np.dtype('O'),
                'm_clicks': np.dtype('float64'),
                'm_cost': np.dtype('float64'),
                'lead_id': np.dtype('O'),
                'purchase_id': np.dtype('O'),
                'm_purchase_amount': np.dtype('float64')
            }

        if utm_source_list is None:
            utm_source_list = ['yandex']

        if utm_medium_list is None:
            utm_medium_list = ['cpc']

        if utm_campaign_list is None:
            utm_campaign_list = ['48306450', '48306518', '48306494',
                                 '48306435', '48306487', '48306473', '72000794', '48306461']

        # Проверка соответствия типов данных
        for col, dtype in expected_types.items():
            if df[col].dtype != dtype:
                print(
                    f"Столбец '{col}' Не соответствует ожидаемому типу данных {dtype}.")

        # Проверка на наличие NaN значений
        nan_values = df.isnull().sum()
        if nan_values.any():
            print(
                f"Обнаружены NaN значения в следующих столбцах:\n{nan_values[nan_values > 0]}")

        # Проверка на наличие дубликатов
        duplicate_values = df.duplicated().sum()
        if duplicate_values:
            print(
                f"Обнаружено {duplicate_values} дубликатов в данных.")

        # Проверка дат на превышение текущей даты и дат до 01.01.2022
        today = datetime.today()
        date_2022 = datetime(2022, 1, 1)
        date_columns = [col for col, dtype in expected_types.items()
                        if dtype == np.dtype('<M8[ns]')]

        for col in date_columns:
            # Проверка на даты после текущей даты
            dates_after_today = df[df[col] > today]
            if not dates_after_today.empty:
                print(
                    f"Обнаружены даты в столбце '{col}', превышающие текущую дату.")
                print(dates_after_today)

            # Проверка на даты до 01.01.2022
            dates_before_2022 = df[df[col] < date_2022]
            if not dates_before_2022.empty:
                print(
                    f"Обнаружены даты в столбце '{col}', до 01.01.2022:")
                print(dates_before_2022)

        # Проверка источников, кампаний и медиумов
        if 'd_utm_source' in df.columns:
            invalid_utm_source = df[~df['d_utm_source'].isin(utm_source_list)]
            if not invalid_utm_source.empty:
                print(f"Обнаружены неверные значения в 'd_utm_source':")
                print(invalid_utm_source)

        if 'd_utm_medium' in df.columns:
            invalid_utm_medium = df[~df['d_utm_medium'].isin(utm_medium_list)]
            if not invalid_utm_medium.empty:
                print(f"Обнаружены неверные значения в 'd_utm_medium':")
                print(invalid_utm_medium)

        if 'd_utm_campaign' in df.columns:
            invalid_utm_campaign = df[~df['d_utm_campaign'].isin(
                utm_campaign_list)]
            if not invalid_utm_campaign.empty:
                print(f"Обнаружены неверные значения в 'd_utm_campaign':")
                print(invalid_utm_campaign)

        # Проверка на отрицательные значения и выбросы в числовых столбцах
        numeric_columns = df.select_dtypes(
            include=['float64', 'int64']).columns.tolist()

        for col in numeric_columns:
            if df[col].min() < 0:
                print(
                    f"Обнаружены отрицательные значения в столбце '{col}'.")

            percentile_99 = df[col].quantile(outlier_threshold)
            outliers = df[df[col] > percentile_99]
            if not outliers.empty:
                print(f"Обнаружены выбросы в столбце '{col}':")
                display(outliers)
        return df

    def aggregate_data(self, df: pd.DataFrame = None):
        
        '''
        Метод aggregate_data принимает df и агрегирует данные к нужному нам формату,
        так же он проверяет наличие необходимых полей.
        По-умолчанию применяет метод   check_data() для создания df и проверки данных
        на случай если в данных содержатся ошибки 
        '''
        if df is None:
            df = self.check_data()

        required_columns = ['created_at', 'd_utm_source', 'd_utm_medium', 'd_utm_campaign',
                            'm_clicks', 'm_cost', 'lead_id', 'purchase_id', 'm_purchase_amount']
        
        if not all(col in df.columns for col in required_columns):
            raise ValueError("DataFrame doesn't contain all required columns.")

        grouped = df.groupby(
            ['created_at', 'd_utm_source', 'd_utm_medium', 'd_utm_campaign'])
        aggregated = grouped.agg({
            'm_clicks': 'sum',
            'm_cost': 'sum',
            'lead_id': 'nunique',
            'purchase_id': 'nunique',
            'm_purchase_amount': 'sum'
        })
        aggregated['CPL'] = (aggregated['m_cost'] /
                             aggregated['lead_id']).round(2)
        aggregated['ROAS'] = aggregated['m_purchase_amount'] / aggregated['m_cost']
        aggregated['ROAS'] = aggregated['ROAS'].replace(np.inf, 0).round(2)
        aggregated['m_cost'] = aggregated['m_cost'].round(2)
        aggregated.reset_index(inplace=True)
        return aggregated
    

    def to_database(self, df: pd.DataFrame = None, table_name = 'end_to_end'):
        
        '''
        метод to_database() принимает агрегированные данные,
        по-умолчанию использует метод aggregate_data() для получения данных;
        проверяет соответствию необходимому типу; 
        проверяет есть ли подобная таблица в БД, если нет создает;
        при наличии похожей таблицы в добавляет новые  строчки 
        '''

        if df is None:
            df = self.aggregate_data()

        # Проверка типов данных 
        expected_types = {
            'created_at': 'datetime64[ns]',
            'd_utm_source': 'object',
            'd_utm_medium': 'object',
            'd_utm_campaign': 'object',
            'm_clicks': 'float64',
            'm_cost': 'float64',
            'lead_id': 'int64',
            'purchase_id': 'int64',
            'm_purchase_amount': 'float64',
            'CPL': 'float64',
            'ROAS': 'float64'
        }

        for col, dtype in expected_types.items():
            if df[col].dtype != dtype:
                raise ValueError(
                    f"Column '{col}' doesn't match expected data type {dtype}.")

        engine = create_engine(
            f"{self.database['drivername']}://{self.database['username']}:{self.database['password']}@"
            f"{self.database['host']}:{self.database['port']}/{self.database['database']}",
            connect_args=self.database['query']
        )
        
        table_name = table_name
        
        #Проверяет наличие таблицы в БД
        
        if not sqlalchemy.inspect(engine).has_table(table_name):
            df.to_sql(table_name, engine, index=False, if_exists='replace'
                        )
            print(
                f"Таблица {table_name} успешно создана и данные сохранены.")
        else:
            existing_data = pd.read_sql_table(table_name, engine)
            new_rows = df[~df.apply(tuple, 1).isin(
                existing_data.apply(tuple, 1))]
            if not new_rows.empty:
                new_rows.to_sql(table_name, engine, index=False, if_exists='append',
                                )
                print(f"Данные добавлены в таблицу {table_name}.")

            else:
                print(
                    f"Новые данные отсутствуют. Таблица {table_name} остается без изменений.")

In [182]:
pipeline = Pipeline()
data = pipeline.aggregate_data()
data

Unnamed: 0,created_at,d_utm_source,d_utm_medium,d_utm_campaign,m_clicks,m_cost,lead_id,purchase_id,m_purchase_amount,CPL,ROAS
0,2022-01-04,yandex,cpc,48306450,30.0,723.02,1,3,9990.0,723.02,13.82
1,2022-01-05,yandex,cpc,48306450,11.0,217.38,1,1,290.0,217.38,1.33
2,2022-01-06,yandex,cpc,48306450,24.0,451.90,1,2,380.0,451.90,0.84
3,2022-01-07,yandex,cpc,48306450,34.0,612.47,3,4,10080.0,204.16,16.46
4,2022-01-08,yandex,cpc,48306450,8.0,136.78,1,1,8200.0,136.78,59.95
...,...,...,...,...,...,...,...,...,...,...,...
274,2022-09-12,yandex,cpc,48306450,71.0,1987.80,2,4,17509.0,993.90,8.81
275,2022-09-13,yandex,cpc,48306450,212.0,6652.24,3,11,39214.0,2217.41,5.89
276,2022-09-13,yandex,cpc,48306473,2.0,23.33,1,1,12495.0,23.33,535.62
277,2022-09-14,yandex,cpc,48306435,4.0,163.24,1,1,9999.0,163.24,61.25
