In [1]:
import datetime
import hashlib
import os
import warnings
from ftplib import FTP_TLS

import pandas as pd
import pangres
from dateutil import parser
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.sql import text

### Credentials

In [2]:
# Read connection settings. load_dotenv() loads a local .env file (if any) into
# os.environ; a missing key raises KeyError here, before any connection is made.
load_dotenv()

_env = os.environ
SOURCE_DB_URI = _env['SOURCE_DB_URI']
SOURCE_FTP_HOST = _env['SOURCE_FTP_HOST']
SOURCE_FTP_USER = _env['SOURCE_FTP_USER']
SOURCE_FTP_PWD = _env['SOURCE_FTP_PWD']
DWH_DB_URI = _env['DWH_DB_URI']

### Extract
На этот раз скачиваем только записи с последнего успешного запуска

In [3]:
# SQLAlchemy Engine objects (despite the *_conn names these are engines, not open
# connections): the operational source database and the DWH target.
source_db_conn, dwh_db_conn = create_engine(SOURCE_DB_URI), create_engine(DWH_DB_URI)

In [4]:
# Watermark of the last successful ETL run, hard-coded for testing.
last_etl_dt = datetime.datetime(2022, 11, 4, 14, 0, 0)

# Look-back windows: also extract records slightly older than the watermark so
# nothing straddling it is missed (waybills: 12 h, rides/movement: 2 h).
waybills_extract_dt, rides_extract_dt = (
    last_etl_dt - datetime.timedelta(hours=lookback_hours) for lookback_hours in (12, 2)
)

In [5]:
def get_delta_items(dir: str, dt: datetime.datetime) -> None:
    """Download files from the FTP directory `dir` whose listed time is after `dt`.

    Connects over FTPS (TLS on both the control and the data channel), changes to
    `../<dir>` relative to the login directory, parses the Unix-style `LIST`
    output and saves every matching file into the local directory `dir`
    (created if it does not exist yet).

    Args:
        dir: remote directory name (relative to the parent of the login
            directory); also used as the local target directory.
        dt: naive datetime; only entries with a listed timestamp > dt are fetched.

    Note:
        `LIST` timestamps look like "Nov 04 14:00" (recent files) or
        "Nov 04 2021"; dateutil fills fields missing from the string (e.g. the
        year) from the current date.
    """
    # The context manager guarantees the session is closed (QUIT + close) even if
    # listing or downloading raises.
    with FTP_TLS(host=SOURCE_FTP_HOST, user=SOURCE_FTP_USER, passwd=SOURCE_FTP_PWD) as ftps:
        ftps.prot_p()  # protect the data channel as well, not only the control channel
        ftps.cwd('../' + dir)

        listing = []
        ftps.retrlines('LIST', listing.append)

        items_to_extract = []
        for line in listing:
            # perms, links, owner, group, size, month, day, time/year, name:
            # split at most 8 times so a file name containing spaces stays whole.
            tokens = line.split(maxsplit=8)
            if len(tokens) < 9 or tokens[0].startswith('d'):
                continue  # skip "total N" header lines and sub-directories
            timestamp = parser.parse(' '.join(tokens[5:8]))
            if timestamp > dt:
                items_to_extract.append(tokens[8])

        os.makedirs(dir, exist_ok=True)
        for name in items_to_extract:
            with open(os.path.join(dir, name), 'wb') as file:
                ftps.retrbinary('RETR ' + name, file.write)

In [6]:
# Waybills are fetched with the 12-hour look-back, payments from the watermark itself.
for remote_dir, modified_after in (('waybills', waybills_extract_dt), ('payments', last_etl_dt)):
    get_delta_items(remote_dir, modified_after)

In [7]:
# Parse every downloaded waybill XML (flattened to one row per waybill by
# waybill.xsl) into a single frame. Frames are collected in a list and
# concatenated once (concat inside a loop is quadratic), and the directory that
# is listed is the same one that is read from.
WAYBILL_COLUMNS = ['issuedt', 'number', 'model', 'car', 'name', 'license', 'validto', 'start', 'stop']

waybills_dir = os.path.join(os.getcwd(), 'waybills')
waybill_frames = [
    pd.read_xml(os.path.join(waybills_dir, file), stylesheet='waybill.xsl')
    for file in sorted(os.listdir(waybills_dir))
    if file.endswith('.xml')
]
# With no XML files, use an empty frame with the expected columns so the
# conversions below do not fail with KeyError.
waybills = (
    pd.concat(waybill_frames, ignore_index=True)
    if waybill_frames
    else pd.DataFrame(columns=WAYBILL_COLUMNS)
)

for dt_column in ('issuedt', 'start', 'stop'):
    waybills[dt_column] = pd.to_datetime(waybills[dt_column])
waybills

Unnamed: 0,issuedt,number,model,car,name,license,validto,start,stop
0,2022-11-04 02:01:03,АГ-921,Kia Rio,А274ЕТ197,Творовской Рамиль Сергеевич,68 65 808288,12.04.2023,2022-11-04 02:00:00,2022-11-04 10:00:00
1,2022-11-04 02:02:03,АГ-922,Kia Rio,К173СН190,Лисогоров Камиль Алексович,49 49 900114,27.04.2023,2022-11-04 02:00:00,2022-11-04 07:00:00
2,2022-11-04 02:04:03,АГ-923,Kia Rio,М919НР77,Удоненко Самир Мирославович,12 10 707581,28.04.2023,2022-11-04 02:00:00,2022-11-04 08:00:00
3,2022-11-04 02:12:04,АГ-924,Hyundai Solaris,Р030РС99,Мотаненко Борис Гордеевич,27 49 262016,17.04.2023,2022-11-04 02:00:00,2022-11-04 09:30:00
4,2022-11-04 02:13:04,АГ-925,Hyundai Solaris,Н990ВН750,Нумеранов Матвей Оскарович,67 10 141255,28.04.2023,2022-11-04 02:00:00,2022-11-04 04:30:00
...,...,...,...,...,...,...,...,...,...
104,2022-11-04 17:58:04,АД-026,Volkswagen Polo,Р177НТ77,Бревко Виктор Владимирович,89 36 606954,19.04.2023,2022-11-04 17:00:00,2022-11-04 20:30:00
105,2022-11-04 17:59:04,АД-027,Kia Rio,С214ТР777,Черноносов Аркадий Георгиевич,40 69 253321,12.04.2023,2022-11-04 17:00:00,2022-11-05 01:00:00
106,2022-11-04 17:59:04,АД-028,Kia Rio,О612ВХ190,Бубарев Абдулла Серафимович,46 17 449900,27.04.2023,2022-11-04 17:00:00,2022-11-04 22:00:00
107,2022-11-04 17:59:04,АД-029,Hyundai Solaris,К602СМ177,Мельницкой Марат Степанович,71 34 596283,30.04.2023,2022-11-04 17:00:00,2022-11-05 01:00:00


In [8]:
# Parse every downloaded payments file (tab-separated, no header) into a single
# frame. Frames are collected in a list and concatenated once (concat inside a
# loop is quadratic), and the directory that is listed is the one read from.
PAYMENT_COLUMNS = ['transaction_dt', 'card_num', 'transaction_amt']

payments_dir = os.path.join(os.getcwd(), 'payments')
payment_frames = [
    pd.read_csv(os.path.join(payments_dir, file), sep='\t', names=PAYMENT_COLUMNS)
    for file in sorted(os.listdir(payments_dir))
    if file.endswith('.csv')
]
# With no CSV files, use an empty frame with the expected columns so the steps
# below do not fail with KeyError.
payments = (
    pd.concat(payment_frames, ignore_index=True)
    if payment_frames
    else pd.DataFrame(columns=PAYMENT_COLUMNS)
)

# Surrogate key: md5 of the raw (still unparsed) timestamp text + card number.
# Keep this formula and its position before the datetime parse unchanged: it
# must reproduce the ids of rows already loaded (the upsert ignores existing ids).
# NOTE(review): two payments by the same card in the same second would collide.
payments['transaction_id'] = (
    payments['transaction_dt'].astype(str) + payments['card_num'].astype(str)
).map(lambda key: hashlib.md5(key.encode()).hexdigest())
payments['transaction_dt'] = pd.to_datetime(payments['transaction_dt'], dayfirst=True)
payments = payments.set_index('transaction_id')
payments

Unnamed: 0_level_0,transaction_dt,card_num,transaction_amt
transaction_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
ba2bd4b65b64a2a764069a8551ffcd04,2022-11-04 14:04:03,5136914328783038,115.65
3f8be0f9373fd038ee3784228c7155a6,2022-11-04 14:05:04,4406668832871863,88.05
d665910a8cf68287e77f4c6a2d470e83,2022-11-04 14:05:04,5440583448953119,19.05
4ea005b124cd2d8200e9234677ae0c3f,2022-11-04 14:06:04,4841571322563631,431.10
1447f6fbe7de3fc21a43a6099fe1cbab,2022-11-04 14:07:04,4454353164481216,432.15
...,...,...,...
3170a31f30cadc7f0d8ce8f1d1591894,2022-11-04 17:52:04,4469557364625645,137.55
1afb155ee4d723c0f2564d22416c44da,2022-11-04 17:52:04,4627306132838612,152.10
4b2cf49778e3000f21183902140d73a2,2022-11-04 17:56:05,5254776868317840,144.60
cdb291f237c84be94d3a8e23fd6fbfb9,2022-11-04 17:57:04,4191520056603150,77.70


In [9]:
# Rides with dt after rides_extract_dt, i.e. 2 hours BEFORE the last successful
# run. The longest ride observed takes ~38 min, so this look-back also captures
# rides that began before the watermark and finished after it.
rides = pd.read_sql(
    sql='''
    SELECT * 
    FROM main.rides 
    WHERE dt > %(dt)s
    ''',
    con=source_db_conn,
    params=dict(dt=rides_extract_dt),
)
rides

Unnamed: 0,ride_id,dt,client_phone,card_num,point_from,point_to,distance,price
0,49636,2022-11-04 12:00:02,+7 (940) 018-91-98,4627 3083 4178 0544,"Артековская улица, 7 к2","Ленинградский проспект, 5 с1",12.59,188.85
1,49637,2022-11-04 12:01:02,+7 (966) 578-89-81,5290 4778 4268 5446,"улица Микояна, 10 к7","Осенняя улица, 3",15.04,225.60
2,49638,2022-11-04 12:01:02,+7 (969) 351-52-44,4047 5751 1170 9265,"Старокаширское шоссе, 2 к5","5-й Вербный проезд, 1",29.84,447.60
3,49639,2022-11-04 12:01:02,+7 (983) 933-70-11,4627 3019 1191 1789,"Новогорская улица, 71 к1","Большая Почтовая улица, 18/20 к4",6.72,100.80
4,49640,2022-11-04 12:02:02,+7 (954) 260-54-53,5489 9917 2143 1913,"Ботаническая улица, 7","Весенняя улица, 28",11.51,172.65
...,...,...,...,...,...,...,...,...
583,50219,2022-11-04 18:16:02,+7 (982) 991-87-13,5331 5719 3748 6057,"Отрадная улица, 3","Центральная улица, 63",10.91,163.65
584,50220,2022-11-04 18:17:01,+7 (995) 553-15-09,4191 5262 3626 7469,"Промышленная улица, 11А с41","улица Есенина, 4",9.16,137.40
585,50221,2022-11-04 18:18:01,+7 (990) 693-65-72,4390 5629 1435 8683,"проспект Андропова, 22 с2","Пятницкое шоссе, 45",25.83,387.45
586,50222,2022-11-04 18:18:01,+7 (928) 997-78-21,5545 6265 7587 9153,"Чермянский проезд, 5 с5","проезд Донелайтиса, 30",27.44,411.60


In [10]:
# Movement events (READY / BEGIN / END / CANCEL) newer than the rides look-back
# point, keyed by movement_id. The timestamp is passed as a bound parameter, as
# in the rides query, instead of being %-formatted into the SQL text.
movement = pd.read_sql(
    '''
    SELECT * 
    FROM main.movement
    WHERE dt > %(dt)s
    ''',
    source_db_conn,
    params={'dt': rides_extract_dt},
    index_col='movement_id',
)
# Strip surrounding whitespace from plate numbers (presumably fixed-width CHAR
# padding in the source; confirm) so joins on car_plate_num match the waybills.
movement['car_plate_num'] = movement['car_plate_num'].str.strip()
movement

Unnamed: 0_level_0,car_plate_num,ride,event,dt
movement_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
138377,Х253КЕ90,49634,READY,2022-11-04 12:00:04
138378,М423КТ197,49629,BEGIN,2022-11-04 12:00:04
138379,Т459РТ199,49599,END,2022-11-04 12:00:05
138380,Т459РТ199,49631,READY,2022-11-04 12:01:04
138381,Н798ОВ190,49630,BEGIN,2022-11-04 12:01:04
...,...,...,...,...
140009,Р071ХС77,50210,READY,2022-11-04 18:18:04
140010,Н798ОВ190,50215,READY,2022-11-04 18:18:04
140011,К405АР177,50212,BEGIN,2022-11-04 18:18:04
140012,Х253КЕ90,50214,BEGIN,2022-11-04 18:18:04


In [11]:
# Full snapshot of the source car pool, indexed by plate number.
car_pool = pd.read_sql(
    sql='SELECT * FROM main.car_pool',
    con=source_db_conn,
    index_col='plate_num',
)
car_pool.head()

Unnamed: 0_level_0,model,revision_dt,register_dt,finished_flg,update_dt
plate_num,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
М027МН199,Volkswagen Polo,2022-11-05,2022-10-12,N,2022-11-02 13:22:04
М685РА977,Kia Rio,2022-11-05,2022-10-12,N,2022-11-02 13:31:04
К931РК99,Kia Rio,2022-11-05,2022-10-12,N,2022-11-02 13:42:03
Р023ЕН77,Kia Rio,2022-11-05,2022-10-12,N,2022-11-02 13:43:04
О578РН77,Volkswagen Polo,2022-11-05,2022-10-12,N,2022-11-02 13:52:03


In [12]:
# All drivers, with a derived personnel number: md5 of last name + first name +
# middle name + birth date.
# NOTE(review): in PostgreSQL `||` returns NULL if any operand is NULL, so a
# driver with a NULL middle_name would get a NULL personnel_num. Confirm that the
# column is NOT NULL (the same expression builds dim_drivers.personnel_num).
drivers = pd.read_sql(
    sql='SELECT md5(last_name || first_name || middle_name || birth_dt) AS personnel_num, * FROM main.drivers',
    con=source_db_conn,
)
drivers

Unnamed: 0,personnel_num,driver_license,first_name,last_name,middle_name,driver_valid_to,card_num,update_dt,birth_dt
0,950b6ebd1b180e08f466599926a700b3,13 51 130647,Даниэль,Чемиренко,Артемьевич,2023-04-12,4205 4648 9442 7155,2022-10-12 12:47:03,1968-01-30
1,e45e6cce9975d936c7cc2b59381a6e7e,38 10 977977,Рослав,Головцов,Рославович,2023-04-12,7719 1583 7663 4759,2022-10-12 12:50:03,1976-11-04
2,ebb949357ead8c6986f11201bfbb73dd,34 48 464847,Алан,Сакобов,Тихонович,2023-04-12,1875 8923 3744 8928,2022-10-12 12:52:03,1978-08-18
3,f262885529fb4f01b1033799e64b8053,42 30 001788,Игорь,Тарлов,Ефимович,2023-04-12,2186 7201 8599 1460,2022-10-12 12:54:03,1993-12-28
4,2ffd09fe40ff00d40f051726d17c3cef,31 39 276519,Богдан,Свенюков,Юсуфович,2023-04-12,9590 3819 7544 6557,2022-10-12 12:54:03,1981-06-05
...,...,...,...,...,...,...,...,...,...
4021,1b0be372bb0b205c0794fd41d9325051,99 87 985081,Лев,Бузманов,Викторович,2023-05-04,4469 5832 1992 8554,2022-11-04 17:58:04,1986-10-30
4022,205ead1a034796091bff97fba3d77f48,50 05 347257,Богдан,Азавренюк,Прохорович,2023-05-04,4191 5274 0678 2535,2022-11-04 17:59:04,1979-05-12
4023,dd9aba0e2735ed87b5aece2d0050f0bb,21 33 074920,Антон,Сукаев,Игоревич,2023-05-04,4565 1562 7106 0245,2022-11-04 17:59:04,1986-03-20
4024,cf70e6231f4a0314422c07c4ad0bbeb3,91 16 717679,Ратмир,Биргин,Максимилианович,2023-05-04,4390 5611 3696 4740,2022-11-04 17:59:04,1973-10-06


## Transform Fact Tables

In [13]:
# One row per ride holding the time of each movement event:
# READY -> ride_arrival_dt, BEGIN -> ride_start_dt, END -> ride_end_dt.
rides_time = movement.pivot(index='ride', columns='event', values='dt')
# A cancelled ride has no END event, so its CANCEL time is used as its end time.
# The column checks keep the cell working for batches with no CANCEL (or no END)
# events at all, where direct column access raised KeyError.
if 'CANCEL' in rides_time.columns:
    end_times = rides_time['END'] if 'END' in rides_time.columns else rides_time['CANCEL']
    rides_time['END'] = end_times.fillna(rides_time['CANCEL'])
rides_time = rides_time.reindex(columns=['READY', 'BEGIN', 'END'])
rides_time = rides_time.rename(columns={'READY': 'ride_arrival_dt', 'BEGIN': 'ride_start_dt', 'END': 'ride_end_dt'})
rides_time

event,ride_arrival_dt,ride_start_dt,ride_end_dt
ride,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
49599,NaT,NaT,2022-11-04 12:00:05
49605,NaT,NaT,2022-11-04 12:03:04
49607,NaT,NaT,2022-11-04 12:02:04
49612,NaT,NaT,2022-11-04 12:09:04
49613,NaT,NaT,2022-11-04 12:12:04
...,...,...,...
50213,2022-11-04 18:15:03,2022-11-04 18:17:04,NaT
50214,2022-11-04 18:17:04,2022-11-04 18:18:04,NaT
50215,2022-11-04 18:18:04,NaT,NaT
50216,2022-11-04 18:17:04,2022-11-04 18:17:04,NaT


In [14]:
# Terminal events only: a ride is finished once it has ENDed or been CANCELled.
is_terminal = movement['event'].isin(['END', 'CANCEL'])
rides_finish = movement[is_terminal]
rides_finish

Unnamed: 0_level_0,car_plate_num,ride,event,dt
movement_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
138379,Т459РТ199,49599,END,2022-11-04 12:00:05
138384,К602СМ177,49616,END,2022-11-04 12:01:04
138385,М027МН199,49618,END,2022-11-04 12:01:04
138387,Т561СО77,49607,END,2022-11-04 12:02:04
138389,У049НЕ799,49605,END,2022-11-04 12:03:04
...,...,...,...,...
139994,А274ЕТ197,50193,END,2022-11-04 18:15:03
139996,К405АР177,50184,END,2022-11-04 18:16:04
140005,Н798ОВ190,50209,CANCEL,2022-11-04 18:17:04
140008,Р071ХС77,50197,END,2022-11-04 18:17:04


In [15]:
# Enrich every finished ride with its event timeline (rides_time) and its row
# from main.rides. Movement's `dt` becomes dt_event and the rides-table `dt`
# becomes dt_begin. The driver's personnel number is attached later.
fact_rides = (
    rides_finish
    .merge(rides_time, how='left', on='ride')
    .merge(rides, how='left', left_on='ride', right_on='ride_id', suffixes=['_event', '_begin'])
)
# Drop rides for which only partial information was extracted (no main.rides row).
has_ride_row = fact_rides['client_phone'].notna()
fact_rides = fact_rides.loc[has_ride_row]
fact_rides

Unnamed: 0,car_plate_num,ride,event,dt_event,ride_arrival_dt,ride_start_dt,ride_end_dt,ride_id,dt_begin,client_phone,card_num,point_from,point_to,distance,price
11,У384НР90,49641,CANCEL,2022-11-04 12:11:04,2022-11-04 12:10:03,NaT,2022-11-04 12:11:04,49641.0,2022-11-04 12:03:02,+7 (962) 044-85-63,4276 4454 1683 8007,"Бескудниковский бульвар, 30 к3","улица Чёрное Озеро, 1",17.02,255.30
12,Р177НТ77,49643,CANCEL,2022-11-04 12:11:04,2022-11-04 12:10:03,NaT,2022-11-04 12:11:04,49643.0,2022-11-04 12:03:02,+7 (924) 228-80-23,4349 1443 4426 2413,"Чоботовская улица, 17 к1","Линейный проезд, 12",11.16,167.40
17,М506СН77,49656,CANCEL,2022-11-04 12:13:03,2022-11-04 12:13:03,NaT,2022-11-04 12:13:03,49656.0,2022-11-04 12:11:02,+7 (925) 130-30-12,5213 2435 2284 6201,"улица Свободы, 67 к2","Бирюлёвская улица, 12 к3",2.46,36.90
22,Т561СО77,49636,END,2022-11-04 12:14:04,2022-11-04 12:03:04,2022-11-04 12:04:04,2022-11-04 12:14:04,49636.0,2022-11-04 12:00:02,+7 (940) 018-91-98,4627 3083 4178 0544,"Артековская улица, 7 к2","Ленинградский проспект, 5 с1",12.59,188.85
23,К173СН190,49639,END,2022-11-04 12:14:04,2022-11-04 12:08:04,2022-11-04 12:09:04,2022-11-04 12:14:04,49639.0,2022-11-04 12:01:02,+7 (983) 933-70-11,4627 3019 1191 1789,"Новогорская улица, 71 к1","Большая Почтовая улица, 18/20 к4",6.72,100.80
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
577,А274ЕТ197,50193,END,2022-11-04 18:15:03,2022-11-04 18:04:04,2022-11-04 18:05:04,2022-11-04 18:15:03,50193.0,2022-11-04 17:59:02,+7 (974) 043-86-19,4276 8371 1417 4059,"Лихоборская набережная, 5А с2","4-я Линия, 133Б",9.54,143.10
578,К405АР177,50184,END,2022-11-04 18:16:04,2022-11-04 17:59:05,2022-11-04 18:08:04,2022-11-04 18:16:04,50184.0,2022-11-04 17:56:02,+7 (922) 354-33-20,5136 9161 7642 3468,"улица Мусы Джалиля, 5 к2","Садовая-Сухаревская улица, 14",11.42,171.30
579,Н798ОВ190,50209,CANCEL,2022-11-04 18:17:04,2022-11-04 18:17:04,NaT,2022-11-04 18:17:04,50209.0,2022-11-04 18:11:02,+7 (902) 682-16-15,4469 5831 9472 8227,"1-я Дубровская улица, 15 с28","улица Боженко, 11 к2",10.08,151.20
580,Р071ХС77,50197,END,2022-11-04 18:17:04,2022-11-04 18:08:04,2022-11-04 18:11:04,2022-11-04 18:17:04,50197.0,2022-11-04 18:02:02,+7 (943) 679-60-55,5316 8742 2880 4825,"Щёлковское шоссе, 77 с1","Высоковольтный проезд, 5 с5",6.96,104.40


In [16]:
# Resolve each waybill's driver to a personnel number via the license number.
# Left join: waybills with an unknown license keep personnel_num = NaN.
fact_waybills = pd.merge(
    waybills,
    drivers.loc[:, ['personnel_num', 'driver_license']],
    how='left',
    left_on='license',
    right_on='driver_license',
)
fact_waybills

Unnamed: 0,issuedt,number,model,car,name,license,validto,start,stop,personnel_num,driver_license
0,2022-11-04 02:01:03,АГ-921,Kia Rio,А274ЕТ197,Творовской Рамиль Сергеевич,68 65 808288,12.04.2023,2022-11-04 02:00:00,2022-11-04 10:00:00,f35a03f104fec702cdd332d845bad954,68 65 808288
1,2022-11-04 02:02:03,АГ-922,Kia Rio,К173СН190,Лисогоров Камиль Алексович,49 49 900114,27.04.2023,2022-11-04 02:00:00,2022-11-04 07:00:00,5be16f7c03d6ce5560f85b2f9f024b5e,49 49 900114
2,2022-11-04 02:04:03,АГ-923,Kia Rio,М919НР77,Удоненко Самир Мирославович,12 10 707581,28.04.2023,2022-11-04 02:00:00,2022-11-04 08:00:00,30ea5d7402fa9d78639ab568fc6c5a2f,12 10 707581
3,2022-11-04 02:12:04,АГ-924,Hyundai Solaris,Р030РС99,Мотаненко Борис Гордеевич,27 49 262016,17.04.2023,2022-11-04 02:00:00,2022-11-04 09:30:00,74b8c9f0085cf1fcc6e01ee49d88b843,27 49 262016
4,2022-11-04 02:13:04,АГ-925,Hyundai Solaris,Н990ВН750,Нумеранов Матвей Оскарович,67 10 141255,28.04.2023,2022-11-04 02:00:00,2022-11-04 04:30:00,e98c59cf2ee43e298df8266cffc52565,67 10 141255
...,...,...,...,...,...,...,...,...,...,...,...
104,2022-11-04 17:58:04,АД-026,Volkswagen Polo,Р177НТ77,Бревко Виктор Владимирович,89 36 606954,19.04.2023,2022-11-04 17:00:00,2022-11-04 20:30:00,3b11570af5358abbc3bee6d495e14a5d,89 36 606954
105,2022-11-04 17:59:04,АД-027,Kia Rio,С214ТР777,Черноносов Аркадий Георгиевич,40 69 253321,12.04.2023,2022-11-04 17:00:00,2022-11-05 01:00:00,aba13b9effef05e596db39acec8ad9a8,40 69 253321
106,2022-11-04 17:59:04,АД-028,Kia Rio,О612ВХ190,Бубарев Абдулла Серафимович,46 17 449900,27.04.2023,2022-11-04 17:00:00,2022-11-04 22:00:00,135a2c3e86b53d7d55ba66bbb6aaa2f5,46 17 449900
107,2022-11-04 17:59:04,АД-029,Hyundai Solaris,К602СМ177,Мельницкой Марат Степанович,71 34 596283,30.04.2023,2022-11-04 17:00:00,2022-11-05 01:00:00,3e87c19bba0b13f1913ceb051dc58973,71 34 596283


In [17]:
# Drop waybills with any missing value (notably an unresolved driver), project
# them onto the DWH columns, rename to the DWH convention and key by waybill number.
WAYBILL_RENAMES = {
    'number': 'waybill_num',
    'personnel_num': 'driver_pers_num',
    'car': 'car_plate_num',
    'start': 'work_start_dt',
    'stop': 'work_end_dt',
    'issuedt': 'issue_dt',
}
fact_waybills = (
    fact_waybills
    .dropna()
    .loc[:, ['number', 'personnel_num', 'car', 'start', 'stop', 'issuedt']]
    .rename(columns=WAYBILL_RENAMES)
    .set_index('waybill_num')
)
fact_waybills

Unnamed: 0_level_0,driver_pers_num,car_plate_num,work_start_dt,work_end_dt,issue_dt
waybill_num,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
АГ-921,f35a03f104fec702cdd332d845bad954,А274ЕТ197,2022-11-04 02:00:00,2022-11-04 10:00:00,2022-11-04 02:01:03
АГ-922,5be16f7c03d6ce5560f85b2f9f024b5e,К173СН190,2022-11-04 02:00:00,2022-11-04 07:00:00,2022-11-04 02:02:03
АГ-923,30ea5d7402fa9d78639ab568fc6c5a2f,М919НР77,2022-11-04 02:00:00,2022-11-04 08:00:00,2022-11-04 02:04:03
АГ-924,74b8c9f0085cf1fcc6e01ee49d88b843,Р030РС99,2022-11-04 02:00:00,2022-11-04 09:30:00,2022-11-04 02:12:04
АГ-925,e98c59cf2ee43e298df8266cffc52565,Н990ВН750,2022-11-04 02:00:00,2022-11-04 04:30:00,2022-11-04 02:13:04
...,...,...,...,...,...
АД-026,3b11570af5358abbc3bee6d495e14a5d,Р177НТ77,2022-11-04 17:00:00,2022-11-04 20:30:00,2022-11-04 17:58:04
АД-027,aba13b9effef05e596db39acec8ad9a8,С214ТР777,2022-11-04 17:00:00,2022-11-05 01:00:00,2022-11-04 17:59:04
АД-028,135a2c3e86b53d7d55ba66bbb6aaa2f5,О612ВХ190,2022-11-04 17:00:00,2022-11-04 22:00:00,2022-11-04 17:59:04
АД-029,3e87c19bba0b13f1913ceb051dc58973,К602СМ177,2022-11-04 17:00:00,2022-11-05 01:00:00,2022-11-04 17:59:04


In [18]:
# Attach the driver's personnel number to every finished ride. A ride belongs to
# the waybill for the same car whose working window strictly contains the ride's
# dt_begin. If several waybills match, the first one in fact_waybills order wins.
# A single merge replaces the previous per-row scan of fact_waybills
# (O(rides x waybills)). Rides without a matching waybill now get a missing value
# and a warning, instead of aborting the cell with IndexError.
fact_rides = fact_rides.copy()  # fact_rides is a filtered frame; copy before adding a column

ride_waybill_pairs = (
    fact_rides[['car_plate_num', 'dt_begin']]
    .rename_axis('_ride_row')
    .reset_index()
    .merge(
        fact_waybills[['car_plate_num', 'work_start_dt', 'work_end_dt', 'driver_pers_num']],
        how='inner',
        on='car_plate_num',
    )
)
within_shift = (
    (ride_waybill_pairs['dt_begin'] > ride_waybill_pairs['work_start_dt'])
    & (ride_waybill_pairs['dt_begin'] < ride_waybill_pairs['work_end_dt'])
)
driver_by_ride = (
    ride_waybill_pairs[within_shift]
    .drop_duplicates(subset='_ride_row', keep='first')
    .set_index('_ride_row')['driver_pers_num']
)
fact_rides['driver_pers_num'] = driver_by_ride.reindex(fact_rides.index)

unmatched_rides = int(fact_rides['driver_pers_num'].isna().sum())
if unmatched_rides:
    warnings.warn(f'{unmatched_rides} ride(s) have no matching waybill; driver_pers_num left empty')

In [19]:
# Project onto the dwh_kazan.fact_rides columns, rename to the DWH naming
# convention and use ride_id as the index (pangres upserts by the index).
RIDE_COLUMNS = [
    'ride', 'point_from', 'point_to', 'distance', 'price', 'client_phone',
    'driver_pers_num', 'car_plate_num', 'ride_arrival_dt', 'ride_start_dt', 'ride_end_dt',
]
RIDE_RENAMES = {
    'ride': 'ride_id',
    'point_from': 'point_from_txt',
    'point_to': 'point_to_txt',
    'distance': 'distance_val',
    'price': 'price_amt',
    'client_phone': 'client_phone_num',
}
fact_rides = (
    fact_rides
    .loc[:, RIDE_COLUMNS]
    .rename(columns=RIDE_RENAMES)
    .set_index('ride_id')
)
fact_rides

Unnamed: 0_level_0,point_from_txt,point_to_txt,distance_val,price_amt,client_phone_num,driver_pers_num,car_plate_num,ride_arrival_dt,ride_start_dt,ride_end_dt
ride_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
49641,"Бескудниковский бульвар, 30 к3","улица Чёрное Озеро, 1",17.02,255.30,+7 (962) 044-85-63,c5ce8e6828667ae8cfd3e943832e50ec,У384НР90,2022-11-04 12:10:03,NaT,2022-11-04 12:11:04
49643,"Чоботовская улица, 17 к1","Линейный проезд, 12",11.16,167.40,+7 (924) 228-80-23,256ea2ac67e4d97447b0acfd756f2c75,Р177НТ77,2022-11-04 12:10:03,NaT,2022-11-04 12:11:04
49656,"улица Свободы, 67 к2","Бирюлёвская улица, 12 к3",2.46,36.90,+7 (925) 130-30-12,eea9f179c9d467084a645a4cc1d22924,М506СН77,2022-11-04 12:13:03,NaT,2022-11-04 12:13:03
49636,"Артековская улица, 7 к2","Ленинградский проспект, 5 с1",12.59,188.85,+7 (940) 018-91-98,4c8953236b6159d9f26303d0e12bf882,Т561СО77,2022-11-04 12:03:04,2022-11-04 12:04:04,2022-11-04 12:14:04
49639,"Новогорская улица, 71 к1","Большая Почтовая улица, 18/20 к4",6.72,100.80,+7 (983) 933-70-11,55c115e5e7d08da9c4da9900479f8432,К173СН190,2022-11-04 12:08:04,2022-11-04 12:09:04,2022-11-04 12:14:04
...,...,...,...,...,...,...,...,...,...,...
50193,"Лихоборская набережная, 5А с2","4-я Линия, 133Б",9.54,143.10,+7 (974) 043-86-19,82444016268d7734fee3075d42672ff2,А274ЕТ197,2022-11-04 18:04:04,2022-11-04 18:05:04,2022-11-04 18:15:03
50184,"улица Мусы Джалиля, 5 к2","Садовая-Сухаревская улица, 14",11.42,171.30,+7 (922) 354-33-20,0bb3aa774626934735c397b595972cf5,К405АР177,2022-11-04 17:59:05,2022-11-04 18:08:04,2022-11-04 18:16:04
50209,"1-я Дубровская улица, 15 с28","улица Боженко, 11 к2",10.08,151.20,+7 (902) 682-16-15,71dd48ad6c6995a43d5f6a181bd4dce6,Н798ОВ190,2022-11-04 18:17:04,NaT,2022-11-04 18:17:04
50197,"Щёлковское шоссе, 77 с1","Высоковольтный проезд, 5 с5",6.96,104.40,+7 (943) 679-60-55,377fa2ab215f616339c353b79e688434,Р071ХС77,2022-11-04 18:08:04,2022-11-04 18:11:04,2022-11-04 18:17:04


## Load Fact Tables

In [20]:
# Insert new rides into dwh_kazan.fact_rides; rows whose index key (ride_id)
# already exists in the table are left untouched.
pangres.upsert(
    df=fact_rides,
    con=dwh_db_conn,
    schema='dwh_kazan',
    table_name='fact_rides',
    if_row_exists='ignore',
)

In [21]:
# Insert new waybills into dwh_kazan.fact_waybills; existing waybill_num keys are skipped.
pangres.upsert(
    df=fact_waybills,
    con=dwh_db_conn,
    schema='dwh_kazan',
    table_name='fact_waybills',
    if_row_exists='ignore',
)

In [22]:
# Insert new payments into dwh_kazan.fact_payments; existing transaction_id keys are skipped.
pangres.upsert(
    df=payments,
    con=dwh_db_conn,
    schema='dwh_kazan',
    table_name='fact_payments',
    if_row_exists='ignore',
)

## Update Dim Tables

In [20]:
###############
# dim_cars table
#
# Fetch car_pool rows changed since the last successful run, already shaped as
# SCD2 versions for dim_cars: start_dt = update_dt, open-ended end_dt.
updated_cars = pd.read_sql(
    sql="""
    SELECT plate_num,
        update_dt AS start_dt,
        model AS model_name,
        revision_dt,
        '9999-01-01 00:00:00' AS end_dt
    FROM main.car_pool
    WHERE update_dt > %(dt)s
    """,
    con=source_db_conn,
    params=dict(dt=last_etl_dt),
)
# Strip surrounding whitespace from plate numbers, as is done for movement.
updated_cars['plate_num'] = updated_cars['plate_num'].str.strip()
updated_cars

Unnamed: 0,plate_num,start_dt,model_name,revision_dt,end_dt


In [6]:
# Stage the changed cars in dwh_kazan.work_temp_table (replacing any previous
# staging table) so the SCD2 close-out UPDATE can join against them.
updated_cars.to_sql(
    name='work_temp_table',
    con=dwh_db_conn,
    schema='dwh_kazan',
    if_exists='replace',
    index=False,
)

0

In [10]:
# SCD2 step 1 for dim_cars: close the currently open version of every car that
# has a staged newer row, ending it one second before the new version starts.
close_open_cars_sql = """
    UPDATE dwh_kazan.dim_cars AS f
    SET end_dt = t.start_dt::TIMESTAMP - INTERVAL '1 second'
    FROM dwh_kazan.work_temp_table AS t
    WHERE f.plate_num = t.plate_num
    AND f.end_dt = '9999-01-01 00:00:00'
    """
dwh_db_conn.execute(close_open_cars_sql)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f14ffc00bb0>

In [None]:
# SCD2 step 2 for dim_cars: append the new (open-ended) versions.
updated_cars.to_sql(
    name='dim_cars',
    con=dwh_db_conn,
    schema='dwh_kazan',
    if_exists='append',
    index=False,
)

In [21]:
#################
# dim_drivers
#
# Fetch drivers changed since the last successful run, shaped as SCD2 versions
# for dim_drivers (start_dt = update_dt, open-ended end_dt). personnel_num uses
# the same md5 expression as the `drivers` extract, so waybills resolve to it.
# NOTE(review): `||` yields NULL if middle_name is NULL; confirm it is NOT NULL.
updated_drivers = pd.read_sql(
    sql='''
    SELECT 
        md5(last_name || first_name || middle_name || birth_dt) AS personnel_num,
        update_dt AS start_dt,
        last_name,
        first_name,
        middle_name,
        birth_dt,
        card_num,
        driver_license AS driver_license_num,
        driver_valid_to AS driver_license_dt,
        '9999-01-01 00:00:00' AS end_dt
    FROM main.drivers
    WHERE update_dt > %(dt)s
    ''',
    con=source_db_conn,
    params=dict(dt=last_etl_dt),
)
updated_drivers

Unnamed: 0,personnel_num,start_dt,last_name,first_name,middle_name,birth_dt,card_num,driver_license_num,driver_license_dt,end_dt
0,4531e0e8cc3bd2c1a6fb2c080c0d460a,2022-11-04 14:10:04,Денишевский,Мирослав,Кириллович,1994-06-21,4406 6676 8238 4795,63 00 566786,2023-05-04,9999-01-01 00:00:00
1,39f65264bf982bc3adea7cf63b90e53c,2022-11-04 14:22:04,Щепановский,Умар,Кузьмич,1999-09-13,4276 8380 2023 8038,04 24 631118,2023-05-04,9999-01-01 00:00:00
2,949a467c50f373ea428039e55c2b5ec0,2022-11-04 14:23:04,Апокашев,Эмир,Даниилович,1995-03-04,4191 5266 8878 6917,44 59 773194,2023-05-04,9999-01-01 00:00:00
3,05f6e94c3c0073b1e9dc87753c83bb4a,2022-11-04 14:23:04,Заскалицкий,Савва,Аркадиевич,2000-05-12,4469 5871 0513 5745,73 69 388992,2023-05-04,9999-01-01 00:00:00
4,f2c961effae11fdb1db7b069116eeac9,2022-11-04 14:24:03,Аубокиров,Иса,Павлович,1972-12-02,4478 1801 5869 5663,39 76 437765,2023-05-04,9999-01-01 00:00:00
5,71019510b56112f48d0b64987e661efc,2022-11-04 14:24:03,Памфилович,Семен,Микаилович,1984-08-02,5264 8343 7614 1687,94 43 806012,2023-05-04,9999-01-01 00:00:00
6,fb8a7de4bf088f04f5a505cd6c22743b,2022-11-04 14:24:03,Салпиков,Ян,Львович,1998-02-26,5290 4766 4288 0165,18 08 144677,2023-05-04,9999-01-01 00:00:00
7,c5e787eb2d61d93d63f87decff1779a2,2022-11-04 15:01:04,Авдышоев,Вадим,Гордеевич,1989-08-05,5406 1667 9175 8434,65 38 267448,2023-05-04,9999-01-01 00:00:00
8,6145a2fe18069af0c654006de3ba31f7,2022-11-04 15:02:04,Брудзинский,Андрей,Геннадиевич,1972-12-05,4627 3015 2644 2121,65 49 762812,2023-05-04,9999-01-01 00:00:00
9,076138facc26fd99ea06f20569531922,2022-11-04 15:02:04,Шантаренко,Валерий,Артурович,1984-04-27,5235 2651 4384 4731,74 67 267769,2023-05-04,9999-01-01 00:00:00


In [15]:
# Stage the changed drivers in dwh_kazan.work_temp_table (replacing the previous
# staging table) for the SCD2 close-out UPDATE.
updated_drivers.to_sql(
    name='work_temp_table',
    con=dwh_db_conn,
    schema='dwh_kazan',
    if_exists='replace',
    index=False,
)

111

In [16]:
# SCD2 step 1 for dim_drivers: close the currently open version of every driver
# with a staged newer row, ending it one second before the new version starts.
# The interval is written as INTERVAL '1 second', the same form the dim_cars and
# dim_clients updates use.
dwh_db_conn.execute(
    """
    UPDATE dwh_kazan.dim_drivers AS f
    SET end_dt = t.start_dt::TIMESTAMP - INTERVAL '1 second'
    FROM dwh_kazan.work_temp_table AS t
    WHERE f.personnel_num = t.personnel_num
        AND f.end_dt = '9999-01-01 00:00:00'
    """
)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f4a41f063a0>

In [17]:
# SCD2 step 2 for dim_drivers: append the new (open-ended) versions.
updated_drivers.to_sql(
    name='dim_drivers',
    con=dwh_db_conn,
    schema='dwh_kazan',
    if_exists='append',
    index=False,
)

111

In [33]:
#################
# dim_clients
#
# Derive client versions from ride history: each (phone, card) pair starts at
# the first ride paid with that card, and ends one second before the same
# phone's next card was first used (open-ended at 9999-01-01 00:00:00 otherwise).
# Only versions that started after the last successful run are returned.
updated_clients = pd.read_sql(
    sql='''
    SELECT
        client_phone AS phone_num,
        dt AS start_dt,
        card_num,
        LEAD (dt, 1, '9999-01-01 00:00:01') OVER (PARTITION BY client_phone ORDER BY dt) - INTERVAL '1 second' AS end_dt
    FROM
    (
        SELECT 
            client_phone,
            card_num,
            MIN(dt) AS dt
        FROM main.rides
        GROUP BY client_phone, card_num
    ) AS cards
    WHERE dt > %(dt)s
    ''',
    con=source_db_conn,
    params=dict(dt=last_etl_dt),
)
updated_clients

Unnamed: 0,phone_num,start_dt,card_num,end_dt
0,+7 (900) 053-83-92,2022-11-04 15:03:02,6766 3688 7668 7379,9999-01-01 00:00:00
1,+7 (900) 409-69-40,2022-11-04 14:56:01,4390 5628 3150 2215,9999-01-01 00:00:00
2,+7 (900) 498-53-29,2022-11-04 15:41:02,4909 8623 7550 1670,9999-01-01 00:00:00
3,+7 (900) 608-56-58,2022-11-04 14:39:02,5290 4762 0325 5294,9999-01-01 00:00:00
4,+7 (900) 921-36-72,2022-11-04 15:17:01,4908 5518 6948 5539,9999-01-01 00:00:00
...,...,...,...,...
301,+7 (998) 771-79-62,2022-11-04 16:46:01,5213 2436 1407 4837,9999-01-01 00:00:00
302,+7 (998) 915-26-35,2022-11-04 15:46:01,4909 8618 1535 4377,9999-01-01 00:00:00
303,+7 (998) 918-22-87,2022-11-04 14:09:01,4183 8485 7453 5863,9999-01-01 00:00:00
304,+7 (999) 248-87-97,2022-11-04 14:24:01,5331 5736 2745 9873,9999-01-01 00:00:00


In [35]:
# Stage the new client versions in dwh_kazan.work_temp_table (replacing the
# previous staging table) for the SCD2 close-out UPDATE.
updated_clients.to_sql(
    name='work_temp_table',
    con=dwh_db_conn,
    schema='dwh_kazan',
    if_exists='replace',
    index=False,
)

306

In [36]:
# SCD2 step 1 for dim_clients: close the open version of every client whose card
# changed (a staged row with a different card_num), one second before the new
# card's first use. The tables are schema-qualified like the other dimension
# statements. Previously they resolved through the connection's search_path, and
# work_temp_table is created in dwh_kazan by to_sql(schema='dwh_kazan').
dwh_db_conn.execute(
    '''
    UPDATE dwh_kazan.dim_clients AS f
    SET end_dt = t.start_dt - INTERVAL '1 second'
    FROM dwh_kazan.work_temp_table AS t
    WHERE f.phone_num = t.phone_num
        AND f.end_dt = '9999-01-01 00:00:00'
        AND t.card_num != f.card_num
    '''
)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f7766cc0160>

In [37]:
# SCD2 step 2 for dim_clients: append the new versions.
updated_clients.to_sql(
    name='dim_clients',
    con=dwh_db_conn,
    schema='dwh_kazan',
    if_exists='append',
    index=False,
)

306

In [None]:
# Remove duplicate dim_clients versions (same phone_num and start_dt), e.g. when
# the same rows are appended again by a re-run. For each duplicate group only the
# row with the highest ctid survives. The table is schema-qualified like the other
# dimension statements, instead of relying on the connection's search_path.
dwh_db_conn.execute(
    '''
    DELETE
    FROM dwh_kazan.dim_clients dc1
    USING dwh_kazan.dim_clients dc2
    WHERE dc1.ctid < dc2.ctid
    AND dc1.phone_num = dc2.phone_num
    AND dc1.start_dt = dc2.start_dt;
    '''
)

## Set Last ETL datetime

In [38]:
# Record a successful run in dwh_kazan.work_batchdate.
# NOTE(review): loaded_until is set to last_etl_dt, which is also the lower bound
# this run extracted from. If last_etl_dt is read back from this table (as the
# next cell's query suggests), the watermark never advances between runs.
# Presumably it should be this run's upper bound (e.g. the run start time); confirm.
insert_batch_status = text("INSERT INTO dwh_kazan.work_batchdate (loaded_until, status) VALUES(:dt, :st)")
dwh_db_conn.execute(insert_batch_status, dict(dt=last_etl_dt, st='Success'))

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f49fb497a90>

In [28]:
# Latest successful watermark, or 1900-01-01 00:00:00 when no run has succeeded yet.
last_success_query = '''
    SELECT COALESCE(MAX(bd.loaded_until), '1900-01-01 00:00:00')
    FROM dwh_kazan.work_batchdate AS bd
    WHERE bd.status = 'Success'
    '''
dwh_db_conn.execute(last_success_query).scalar()

datetime.datetime(2022, 10, 28, 19, 49)

In [38]:
# Drop the staging tables. work_temp_table is created by to_sql(..., schema='dwh_kazan'),
# so it is schema-qualified here. Unqualified, DROP ... IF EXISTS silently does
# nothing when dwh_kazan is not on the connection's search_path.
# NOTE(review): work_dim_clients_copy is not created anywhere in this notebook and
# its schema is unknown, so it is left unqualified; confirm.
dwh_db_conn.execute('DROP TABLE IF EXISTS dwh_kazan.work_temp_table, work_dim_clients_copy')

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f7766cbde20>