In [3]:
import pandas as pd
import numpy as np
import requests
import re

import json
import time
import datetime
import warnings
from sqlalchemy import create_engine
from sqlalchemy import text

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

### Выгружаю данные из реестра оплат

In [4]:
### Создадим движок для postgresql
engine = create_engine('postgresql://hidden')

### **extract**

    Берем все данные из enriched с не пустыми email и дата
    Удаляем дубли в партиции по id транзакции 
    отдельным выражением выгружаем рассрочку с долей < 1 и > 0 чтобы пронумеровать покупки внутри airtable_id
    Затем отсортируем по клиент(airtable_id), amount, date и сгруппируем транзакции в группы по 4 платежа, присваиваем им 
    номер - так создаем package_id_numeric для 4 транзакции из 1 продажи по рассрочке.
    Выгружаем все остальные транзакции и нумеруем их также для создания package_id_numeric для продаж.



In [13]:
query = '''WITH ab AS (
                        SELECT * FROM 
                        (
                            SELECT дата as date,
                                   email as client_email, 
                                   телефон,
                                   amo_id,
                                   regexp_replace("source_(from_клиент)", '[{}]', '', 'g') as source,
                                   regexp_replace(клиент, '[{}]', '', 'g') as клиент,
                                   regexp_replace(предмет, '[{}]', '', 'g') as предмет,
                                   regexp_replace(класс, '[{}]', '', 'g') as класс,
                                   сумма as amount,
                                   сумма_заказа as amount_full, 
                                   transactionid as idtransaction,
                                   доля as fractional, 
                                   статус as status, 
                                   "пакет_(from_клиент)" as minutes,
                                   regexp_replace("bubble_id_(from_клиент)", '[{}]', '', 'g') as bubble_id,
                                   ROW_NUMBER() OVER (PARTITION BY transactionid, email, дата::date ORDER BY "дата") AS rn 
                            FROM level_1_raw_data.payment_register_enriched
                            WHERE email is not null and дата is not null
                        ) as aaa
                        where rn=1
),           
           
           
fractional as (SELECT date,
                      client_email,
                      телефон,
                      amo_id,
                      source,
                      amount,
                      amount_full,
                      idtransaction,
                      fractional,
                      status,
                      клиент,
                      предмет,
                      класс,
                      minutes,
                      bubble_id,
                      ROW_NUMBER() OVER (PARTITION BY клиент ORDER BY date) AS row_num
                FROM ab
                WHERE fractional > 0 
                AND fractional < 1
),
        
ranked_fract as (SELECT 
                        date,
                        client_email,
                        телефон,
                        amo_id,
                        source,
                        amount,
                        клиент,
                        предмет,
                        класс,
                        amount_full,
                        idtransaction,
                        fractional,
                        status,
                        minutes,
                        bubble_id,
                        (row_num-1)/4+1 AS package_number_of_client
                 FROM fractional
                 ORDER BY
                    клиент, amount, date
),

ranked_non_fract as (SELECT  date,
                             client_email,
                             телефон,
                             amo_id,
                             source,
                             amount,
                             клиент,
                             предмет,
                             класс,
                             amount_full,
                             idtransaction,
                             fractional,
                             status,
                             minutes,
                             bubble_id,
                             100000 + ROW_NUMBER() OVER (PARTITION BY клиент ORDER BY date) AS package_number_of_client
                      FROM ab
                      WHERE fractional = 0 or fractional = 1 or fractional is null
),

date_of_package as (
                    SELECT  
                            клиент,
                            package_number_of_client, 
                            min(date) as date_of_purchase
                    FROM (select * from ranked_fract union select * from ranked_non_fract) as aaa
                    GROUP BY клиент, package_number_of_client
),

date_of_package_phone as (
                    SELECT 
                        телефон, 
                        min(date) as first_date_of_purchase_phone
                    FROM ab
                    GROUP BY телефон
),

date_of_package_email as (
                    SELECT 
                        клиент, 
                        min(date) as first_date_of_purchase_email
                    FROM ab
                    GROUP BY клиент
)

SELECT date, 
       client_email, 
       клиент,
       предмет,
       класс,
       телефон, 
       amo_id,
       source,
       amount, 
       amount_full, 
       idtransaction, 
       fractional, 
       status,
       minutes, 
       date_of_purchase, 
       first_date_of_purchase_phone, 
       first_date_of_purchase_email, 
       package_number_of_client,
       bubble_id,
DENSE_RANK() OVER (partition by клиент ORDER BY date_of_purchase, package_number_of_client) AS package_id_numeric,
DENSE_RANK() OVER (partition by телефон ORDER BY date_of_purchase ) AS package_id_by_phone


FROM 
(
    select fd.*, 
           date_of_purchase, 
           first_date_of_purchase_phone, 
           first_date_of_purchase_email 
    from (select * from ranked_fract union select * from ranked_non_fract) as fd
    join date_of_package as dp
        on fd.клиент = dp.клиент 
         and fd.package_number_of_client = dp.package_number_of_client
    left join date_of_package_phone dpp
        on fd.телефон = dpp.телефон
    left join date_of_package_email dpe
        on fd.клиент = dpe.клиент
) as aaa
order by клиент, date;
'''

In [14]:
raw_data = pd.read_sql_query(query, engine)

### **transform**

    Создание признака первичной и вторичной оплаты на основе номера покупки клиента, где если покупка = 1 то это первичная продажа и все остальные это вторичные - присваиваем 0
    Создание колонки с видом оплаты на основе idtransaction :
        'tutgoodsplit' - это сплит 
         'tutgoodYM' - сбербанк
         'sbp_' - платежи по СБП
         'tutgood_16' - долями
         Циферный id с долей = 1 - это полные платежи 
         Циферный id с долей < 1 & > 0 - это внутренняя рассрочка

    Распаковка от {} в колонке статуса, минут, airtable_id и amo_id
    Создание итогового package_id на основе создаваемого package_id_numeric из шага выгрузки.
    Создание столбца с нумерацией внутри каждого package_id - это будет rn_package_id



In [50]:
def transform_transactions_data(raw_data):
    

    # Создание признака первичной и вторичной оплаты на основе номера покупки клиента
    raw_data['first_sale'] = raw_data['package_id_numeric'].apply(lambda x: 1 if x==1 else 0)
    
    
    
    # Функция для определения очереди продажи
    def first_sale_fully(first_sale):
        if first_sale == 1:
            return 'Первичная продажа'
        else:
            return 'Вторичная продажа'

    # Создаем новую колонку first_sale_full
    raw_data['first_sale_full'] = raw_data['first_sale'].apply(first_sale_fully)
    
    
    
    # Создание колонки с видом оплаты на основе idtransaction
    numeric_ids = raw_data[raw_data['idtransaction'].notna()]

    # Создаем регулярное выражение для поиска id c только цифрами
    regex = re.compile('^[0-9]+$')
    numeric_id = numeric_ids[numeric_ids['idtransaction'].str.match(regex)]


    # Создаем списки idtransaction для каждого вида оплаты
    tutgoodsplit_ids = numeric_ids[numeric_ids['idtransaction'].str.contains('tutgoodsplit')]['idtransaction'].tolist()
    tutgoodsplit_sberbank = numeric_ids[numeric_ids['idtransaction'].str.contains('tutgoodYM')]['idtransaction'].tolist()
    tutgoodsplit_sbp = numeric_ids[numeric_ids['idtransaction'].str.contains('sbp_')]['idtransaction'].tolist()
    tutgood_dol_ids = numeric_ids[numeric_ids['idtransaction'].str.contains('tutgood_16')]['idtransaction'].tolist()
    regular_ids = numeric_id[numeric_id['fractional'] == 1.0]['idtransaction'].tolist()
    installment_ids = numeric_id[(numeric_id['fractional'] > 0) & (numeric_id['fractional'] < 1)]['idtransaction'].tolist()

    
    
    # Функция для определения вида оплаты
    def determine_payment_type(idtransaction):
        if idtransaction in tutgoodsplit_ids:
            return 'Сплит'
        elif idtransaction in tutgood_dol_ids:
            return 'Доля'
        elif idtransaction in regular_ids:
            return 'Полный'
        elif idtransaction in installment_ids:
            return 'Рассрочка'
        elif idtransaction in tutgoodsplit_sberbank:
            return 'Сбербанк'
        elif idtransaction in tutgoodsplit_sbp:
            return 'СБП'
        else:
            return 'Неопределен'

    # Создаем новую колонку payment_type
    raw_data['payment_type'] = raw_data['idtransaction'].apply(determine_payment_type)

    
    

    # Преобразование дф перед загрузкой
    transactions = raw_data[[    'date',
                                 'idtransaction',
                                 'client_email',
                                 'клиент',
                                 'предмет',
                                 'класс',
                                 'amo_id',
                                 'source',
                                 'телефон',
                                 'amount',
                                 'amount_full',
                                 'fractional',
                                 'status',
                                 'package_id_numeric',
                                 'minutes',
                                 'payment_type',
                                 'first_date_of_purchase_phone',
                                 'package_id_by_phone',
                                 'first_sale'
                            ]]

    
    transactions = transactions.rename(columns={'телефон':'phone'})

    # Распаковка от {} в колонке статуса и минут
    for index, row in transactions.iterrows():
        for column in ['status', 'minutes','клиент','amo_id','source','предмет','класс']:
            value = row[column]
            if isinstance(value, str):
                value_parts = value.strip("{}")
                transactions.at[index, column] = value_parts


    # Правка типов данных
    transactions['amount'] = transactions['amount'].astype('float')
    transactions['amount_full'] = transactions['amount_full'].astype('float')
    transactions['fractional'] = transactions['fractional'].astype('float')
    
    # Новая строка создания колонки времени
    
    transactions['time'] = pd.to_datetime(transactions['date'])
    
    transactions['date'] = pd.to_datetime(transactions['date']).dt.date
    transactions['date'] = pd.to_datetime(transactions['date'])
    
    transactions['minutes'] = transactions['minutes'].str.replace('минут','')
    transactions['minutes'] = transactions['minutes'].str.replace('"','')
    transactions['minutes'] = transactions['minutes'].str.replace(' ','')
    
    transactions['phone'] = transactions['phone'].str.replace('+','')
    transactions['phone'] = transactions['phone'].str.replace('(','')
    transactions['phone'] = transactions['phone'].str.replace(')','')


    # Сортировка данных по дате, клиенту и номеру покупки
    transactions = transactions.sort_values(['клиент','date','first_sale'])
    
    # Создание уникального id пакета
    transactions['package_id'] = pd.factorize(list(zip(transactions['клиент'], \
                                                            transactions['package_id_numeric'])))[0] + 1
    
    transactions['minutes'] = pd.to_numeric(transactions['minutes'], downcast="integer")  
                                            
                                            


    def new_minutes(minutes):
        
        if minutes in [200, 500, 1000, 1500, 3000, 6000, 2000, 4000, 7500, 9000]:
            return minutes
        
        if minutes in [950, 550, 5000, 900, 2500, 400, 1850, 1250, 100, 10000, 300,
                       3750, 750, 600, 250]:
            return 'пакеты старого тарифа'
        else:
            return None
        


    transactions['new_minutes'] = transactions['minutes'].apply(new_minutes)
    
    
    
    # Сначала отсортируем DataFrame по package_id и data
    transactions = transactions.sort_values(by=['package_id', 'date'])
    
    # Добавим столбец с нумерацией внутри каждого package_id
    transactions['rn_package_id'] = transactions.groupby('package_id').cumcount() + 1
    
    
    # Функция для создания признака депозита
    def deposit(amount):
        if amount == 1000:
            return True
        else:
            return False

    # Создаем новую колонку deposit
    transactions['deposit'] = transactions['amount'].apply(deposit)
    
    

    
    return transactions



transactions = transform_transactions_data(raw_data=raw_data)


In [51]:
transactions[transactions['client_email'] == 'kaloeva_o@mail.ru']

Unnamed: 0,date,idtransaction,client_email,клиент,предмет,класс,amo_id,source,phone,amount,...,minutes,payment_type,first_date_of_purchase_phone,package_id_by_phone,first_sale,time,package_id,new_minutes,rn_package_id,deposit
19016,2024-02-14,2087025923,kaloeva_o@mail.ru,recMVuu8LU0yU9QMe,Математика,"""9 класс""",30445137,yandex,79320506621,6747.0,...,,Рассрочка,2023-11-02 11:06:42,2,1,2024-02-14 16:47:00,10478,,1,False
19017,2024-02-28,2093599631,kaloeva_o@mail.ru,recMVuu8LU0yU9QMe,Математика,"""9 класс""",30445137,yandex,79320506621,6747.0,...,,Рассрочка,2023-11-02 11:06:42,2,1,2024-02-28 16:48:00,10478,,2,False
24561,2023-11-02,2013861623,kaloeva_o@mail.ru,recRz8ENcD3FlTeGJ,Математика,"""9 класс""",30445137,yandex,79320506621,9122.0,...,2000.0,Рассрочка,2023-11-02 11:06:42,1,1,2023-11-02 11:06:42,13165,2000.0,1,False
24562,2023-11-16,2030540532,kaloeva_o@mail.ru,recRz8ENcD3FlTeGJ,Математика,"""9 класс""",30445137,yandex,89320506621,9122.0,...,2000.0,Рассрочка,2023-11-16 20:01:10,1,1,2023-11-16 20:01:10,13165,2000.0,2,False
24563,2024-01-31,2080277337,kaloeva_o@mail.ru,recRz8ENcD3FlTeGJ,Математика,"""9 класс""",30445137,yandex,79320506621,6747.0,...,2000.0,Рассрочка,2023-11-02 11:06:42,1,1,2024-01-31 13:46:56,13165,2000.0,3,False


In [52]:
transactions[transactions['client_email'] == 'ula89234622454@mail.ru']

Unnamed: 0,date,idtransaction,client_email,клиент,предмет,класс,amo_id,source,phone,amount,...,minutes,payment_type,first_date_of_purchase_phone,package_id_by_phone,first_sale,time,package_id,new_minutes,rn_package_id,deposit
2608,2024-02-23,tutgoodsplitnew_1708699520754,ula89234622454@mail.ru,rec4vJKTFbHeBs6hk,Математика,"""6 класс""",28558359,yandex,79234622454,21990.0,...,1000.0,Сплит,2023-10-23 14:43:35,3,1,2024-02-23 17:49:30,2312,1000.0,1,False
12476,2023-10-23,tutgoodsplit_1698061186436,ula89234622454@mail.ru,recgtUw730u16dVx1,Математика,"""6 класс""",28558359,yandex,79234622454,17990.0,...,1000.0,Сплит,2023-10-23 14:43:35,1,1,2023-10-23 14:43:35,19901,1000.0,1,False


## Загрузка в postgres


In [12]:
# transactions.to_sql(
#     name='transactions',
#     con=engine,
#     if_exists='replace',
#     index=False,
#     schema='level_2_analytical_tables',
#     chunksize=10000
# )

2944

-------------------------



-------------------------
**Валидация данных**


Будем проверять только последений день, где выручка должны быть не меньше 100 000 р чтобы не отображать данные при падении дагов 

In [56]:
from datetime import timedelta
import datetime as dt

In [61]:
def sum_amount_for_last_day(data):
    
    # Получаем вчерашнюю дату
    yesterday = pd.Timestamp.today().date() - timedelta(days=1)
    
    check_amount = transactions[transactions['date'].dt.date == yesterday]['amount'].sum()
    
    check_amount = pd.DataFrame({'amount_sum': [check_amount]})

    return check_amount
    
    

In [62]:
check_amount = sum_amount_for_last_day(transactions)
check_amount

Unnamed: 0,amount_sum
0,1006779.11


In [52]:
import great_expectations as ge

# Загрузка данных в Great Expectations
ge_df = ge.from_pandas(check_amount)

result = ge_df.validate(expectation_suite="parametrs_validate_money.json", result_format='COMPLETE')

if result['success']:
    print("Данные прошли валидацию.")
else:
    print("Данные не соответствуют ожиданиям:")
    for i in range(len(result.results)):
        if result.results[i].success is False:
            print(result.results[i])



Данные прошли валидацию.


In [53]:
# Загрузка данных в Great Expectations
ge_df = ge.from_pandas(check_amount)

In [54]:
result = ge_df.validate(expectation_suite="parametrs_validate_money.json", result_format='COMPLETE')

In [55]:
    if result['success']:
        print("Данные прошли валидацию.")
    else:
        print("Данные не соответствуют ожиданиям:")
        for i in range(len(result.results)):
            if result.results[i].success is False:
                print(result.results[i])



Данные прошли валидацию.
