# Imports and connections

In [2]:
from sqlalchemy import create_engine, text

import pandas as pd
from getpass import getpass

In [13]:
host = 'test-task-rto.c5qems882moh.ap-southeast-1.rds.amazonaws.com:5432'
user = 'user_test'
database_name = 'postgres'

In [10]:
password = getpass()

 ········


# Data load

In [14]:
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database_name}')

Для данного продукта, произведем сразу предобработку в sql с фильтрацией маркированных записей, а также преобразуем чтобы каждая строка содержала информацию о каждой сделки, а не о конкретных событиях в сделке

Логика запроса:

1. Будем считать что сделка является закрытой полностью после последнего закрывающего события. Для того чтобы узнать точную дату открытия сделки и ее закрытия, будем выбирать минимальную дату и максимальную дату соответственно у событий для сделки.

При этом для сохранения остальных полей в случае отсутствия данных в одной из получившихся таблиц с событиями открытия и закрытия, при джойне используем оператор COALESCE. (Можно было сделать после присоединение этих полей через left join и row number + partition by positionid , однако мы все равно таким бы образом обратились опять ко всей таблице mt5_deals при этом еще и нагрузив сортировкой внутри partition by)

2. Выбираем маркированные записи как и для продукта mt4

3. Фильтруем сделки по признаку маркировки


Таким образом после предобработки данных **для расчета метрик можно применить такой же алгоритм как и для mt4** с некоторыми модификациями по колонкам

Комментарии по коду расчета метрик в notebook к mt-4

In [32]:
query = f'''with 
            positions_and_dates as 
            (
            	select COALESCE(t_open_date.positionid, t_close_date.positionid) as positionid,
            		   t_open_date.open_time,
            		   t_close_date.close_time,
            		   COALESCE(t_open_date.login, t_close_date.login) as login,
            		   COALESCE(t_open_date.symbol, t_close_date.symbol) as symbol,
            		   COALESCE(t_open_date."action", t_close_date."action") as "action"
            	from 
            	(	select positionid,
            			   login,
            			   symbol,
            			   "action",
            			   MIN("time") as open_time
            		from hr_vacancies.mt5_deals
            		where entry = '0'
            		group by positionid, login, symbol, "action" ) as t_open_date
            	FULL JOIN
            		(select positionid,
            				login,
            				symbol,
            				"action",
            			    MAX("time") as close_time
            		from hr_vacancies.mt5_deals
            		where entry = '1'
            		group by  positionid, login, symbol, "action") as t_close_date
            	on t_open_date.positionid = t_close_date.positionid
            ),
            marked_trades as 
            (
            	SELECT positionid, 
            		   1 as marked
            	FROM hr_vacancies.mt5_marked_trades mt
            	WHERE (cast(mt."type" as INTEGER)  & 2) = 2
            ),
            clear_trades as (
            	select positions_and_dates.* from positions_and_dates
            	left join marked_trades
            	on positions_and_dates.positionid = marked_trades.positionid
            	where marked is null
            )
            select * from clear_trades
         '''
df_trades_mt5 = pd.read_sql(query, con=engine) 

In [33]:
df_trades_mt5.head(5)

Unnamed: 0,positionid,open_time,close_time,login,symbol,action
0,2336291336,,2022-02-03 19:30:43.000000,16341886,USDCHF,1
1,2353786387,,2022-02-03 19:30:44.000000,16341886,USDCHF,1
2,2444372010,,2022-02-03 19:30:46.000000,16341886,USDZAR,1
3,2444374089,,2022-02-03 19:31:25.000000,16341886,EURGBP,1
4,2444674808,,2022-02-03 19:30:46.000000,16341886,USDZAR,1


In [34]:
len(df_trades_mt5['positionid'].unique())

229179

# Task №1

In [40]:
df_mt5_metric_1 = df_trades_mt5.copy()
df_mt5_metric_1['open_time'] = pd.to_datetime(df_mt5_metric_1['open_time'])
df_mt5_metric_1['close_time'] = pd.to_datetime(df_mt5_metric_1['close_time'])
df_mt5_metric_1['time'] = df_mt5_metric_1['close_time'] - df_mt5_metric_1['open_time']
df_mt5_metric_1 = df_mt5_metric_1.dropna()
df_mt5_metric_1

Unnamed: 0,positionid,open_time,close_time,login,symbol,action,time
458,2497850641,2022-02-03 18:21:47,2022-02-03 19:01:09,18672718,EURUSD,1,0 days 00:39:22
1727,2501420343,2022-02-02 12:00:07,2022-02-02 12:06:27,20458869,EURUSD,1,0 days 00:06:20
1835,2501604145,2022-02-02 12:00:15,2022-02-02 12:00:24,20458869,EURUSD,0,0 days 00:00:09
2245,2501974079,2022-02-01 09:39:58,2022-02-01 09:50:14,20458869,EURUSD,0,0 days 00:10:16
2248,2501974223,2022-02-01 09:40:00,2022-02-01 09:50:30,20458869,EURUSD,0,0 days 00:10:30
...,...,...,...,...,...,...,...
228962,2505927932,2022-02-07 23:12:51,2022-02-07 23:13:33,23154646,US30,0,0 days 00:00:42
228963,2505928025,2022-02-07 23:14:31,2022-02-07 23:48:22,20781124,US30,1,0 days 00:33:51
228964,2505928032,2022-02-07 23:14:39,2022-02-07 23:48:25,20781124,US30,1,0 days 00:33:46
229004,2505928927,2022-02-07 23:30:38,2022-02-07 23:34:29,22837778,US30,0,0 days 00:03:51


In [57]:
df_mt5_metric_1 = df_mt5_metric_1[(df_mt5_metric_1['time'] < pd.Timedelta(minutes=1))]
df_mt5_metric_1 = df_mt5_metric_1.groupby(by='login',as_index = False)['positionid'].count()
df_mt5_metric_1.head(5)

Unnamed: 0,login,positionid
0,11278370,36
1,11421210,65
2,12142323,6
3,13174526,28
4,13174584,28


# Task № 2

In [62]:
df_mt_5_metric_2 = df_trades_mt5.copy()
df_mt_5_metric_2['open_time'] = pd.to_datetime(df_mt_5_metric_2['open_time'])
df_mt_5_metric_2 = df_mt_5_metric_2.dropna(subset=['open_time'])
df_mt_5_metric_2.head(5)

Unnamed: 0,positionid,open_time,close_time,login,symbol,action
458,2497850641,2022-02-03 18:21:47,2022-02-03 19:01:09.000000,18672718,EURUSD,1
548,2498135077,2022-02-03 17:44:57,,18672718,EURUSD,1
636,2498388285,2022-02-03 16:41:37,,18672718,EURUSD,1
1062,2499997501,2022-02-03 15:46:42,,18672718,EURUSD,1
1095,2500074432,2022-02-03 15:46:34,,18672718,EURUSD,1


In [63]:
def check_opposite_trades(x):
    res = x.value_counts()
    if res.shape[0] > 1: # если есть 1 и 0 то все пары это их перемноженное кол-во
        return res.iloc[0] * res.iloc[1]
    return 0 
    
def check_metric_2(df_user: pd.DataFrame):
    df_user = df_user.sort_index() # для применения оконной функции индекс должен возрастать
    return df_user.rolling('30s', min_periods=2)['action'].apply(check_opposite_trades).sum() # оконная функция в 30 секунд включительно
    
df_mt_5_metric_2 = df_mt_5_metric_2.set_index('open_time')
df_mt_5_metric_2 = df_mt_5_metric_2[['login','action']].groupby(by='login', as_index = False).apply(check_metric_2)
df_mt_5_metric_2





Unnamed: 0,login,None
0,11278370,0.0
1,11421210,453.0
2,12067608,1784.0
3,12142323,1753.0
4,12246132,160.0
...,...,...
195,23307997,973.0
196,23315319,35075.0
197,23317398,81.0
198,23336487,643.0


In [64]:
df_mt_5_metric_2 = df_mt_5_metric_2.rename(columns = {df_mt_5_metric_2.columns[1]:'metric_2'})
df_mt_5_metric_2

Unnamed: 0,login,metric_2
0,11278370,0.0
1,11421210,453.0
2,12067608,1784.0
3,12142323,1753.0
4,12246132,160.0
...,...,...
195,23307997,973.0
196,23315319,35075.0
197,23317398,81.0
198,23336487,643.0


# Сохранение результатов mt-5 metric1, metric_2

In [68]:
df_res = pd.merge(df_trades_mt5[['login']].drop_duplicates(), df_mt5_metric_1, how='left', on='login')
df_res = pd.merge(df_res, df_mt_5_metric_2, how='left', on='login')
df_res = df_res.rename(columns = {'positionid':'metric_1'})
df_res

Unnamed: 0,login,metric_1,metric_2
0,16341886,32.0,113.0
1,14048482,5.0,0.0
2,20940019,,319.0
3,15580519,8.0,1.0
4,18007988,9.0,25.0
...,...,...,...
195,23336487,1.0,643.0
196,23342763,31.0,1843.0
197,19323257,34.0,0.0
198,23283270,1.0,0.0


In [88]:
df_res.to_csv('mt_5_metrics_1_2.csv')

# Task №3

In [71]:
df_mt_5_metric_3 = df_trades_mt5.copy()
df_mt_5_metric_3['open_time'] = pd.to_datetime(df_mt_5_metric_3['open_time'])
df_mt_5_metric_3 = df_mt_5_metric_3.dropna(subset=['open_time'])
df_mt_5_metric_3['action'] = df_mt_5_metric_3['action'].astype(int)
df_mt_5_metric_3.head(5)

Unnamed: 0,positionid,open_time,close_time,login,symbol,action
458,2497850641,2022-02-03 18:21:47,2022-02-03 19:01:09.000000,18672718,EURUSD,1
548,2498135077,2022-02-03 17:44:57,,18672718,EURUSD,1
636,2498388285,2022-02-03 16:41:37,,18672718,EURUSD,1
1062,2499997501,2022-02-03 15:46:42,,18672718,EURUSD,1
1095,2500074432,2022-02-03 15:46:34,,18672718,EURUSD,1


In [72]:
windows_boundaries = pd.date_range(pd.to_datetime('2022-02-01 00:00:00'),
                                   pd.to_datetime('2022-02-08 00:00:00'),
                                   freq=pd.Timedelta(seconds=30))

# будем использовать правую границу окна
df_mt_5_metric_3['right_boundary'] = pd.cut(df_mt_5_metric_3['open_time'], windows_boundaries).apply(lambda x: x.right) 
df_mt_5_metric_3

Unnamed: 0,positionid,open_time,close_time,login,symbol,action,right_boundary
458,2497850641,2022-02-03 18:21:47,2022-02-03 19:01:09.000000,18672718,EURUSD,1,2022-02-03 18:22:00
548,2498135077,2022-02-03 17:44:57,,18672718,EURUSD,1,2022-02-03 17:45:00
636,2498388285,2022-02-03 16:41:37,,18672718,EURUSD,1,2022-02-03 16:42:00
1062,2499997501,2022-02-03 15:46:42,,18672718,EURUSD,1,2022-02-03 15:47:00
1095,2500074432,2022-02-03 15:46:34,,18672718,EURUSD,1,2022-02-03 15:47:00
...,...,...,...,...,...,...,...
229174,2505930176,2022-02-07 23:52:28,,20852969,EURUSD,1,2022-02-07 23:52:30
229175,2505930177,2022-02-07 23:52:30,,20852969,EURUSD,1,2022-02-07 23:52:30
229176,2505930376,2022-02-07 23:57:01,,16668054,EURCAD,0,2022-02-07 23:57:30
229177,2505930417,2022-02-07 23:57:49,,20971742,EURUSD,0,2022-02-07 23:58:00


In [73]:
def count_trades_in_window(x):
    res = x.value_counts()
    res.index = res.index.astype(int)
    return res.to_dict()

df_mt_5_metric_3 = df_mt_5_metric_3[['login','symbol','action', 'right_boundary']]
df_mt_5_metric_3 = df_mt_5_metric_3.groupby(by=['login','symbol','right_boundary'],
                                            observed = True,
                                            as_index = False)['action'].apply(count_trades_in_window).reset_index()
df_mt_5_metric_3

Unnamed: 0,login,symbol,right_boundary,level_3,action
0,11278370,GBPUSD,2022-02-01 02:00:30,0,1.0
1,11278370,GBPUSD,2022-02-01 02:00:30,1,
2,11278370,GBPUSD,2022-02-01 02:30:00,0,4.0
3,11278370,GBPUSD,2022-02-01 02:30:00,1,
4,11278370,GBPUSD,2022-02-01 02:55:30,0,3.0
...,...,...,...,...,...
248509,23342763,XTIUSD,2022-02-07 18:09:00,1,1.0
248510,23342763,XTIUSD,2022-02-07 18:29:00,0,
248511,23342763,XTIUSD,2022-02-07 18:29:00,1,1.0
248512,23342763,XTIUSD,2022-02-07 18:55:30,0,1.0


In [74]:
df_mt_5_metric_3 = df_mt_5_metric_3.rename(columns = {'action':'trades_count',
                                                      df_mt_5_metric_3.columns[3]:'action'})
df_mt_5_metric_3

Unnamed: 0,login,symbol,right_boundary,action,trades_count
0,11278370,GBPUSD,2022-02-01 02:00:30,0,1.0
1,11278370,GBPUSD,2022-02-01 02:00:30,1,
2,11278370,GBPUSD,2022-02-01 02:30:00,0,4.0
3,11278370,GBPUSD,2022-02-01 02:30:00,1,
4,11278370,GBPUSD,2022-02-01 02:55:30,0,3.0
...,...,...,...,...,...
248509,23342763,XTIUSD,2022-02-07 18:09:00,1,1.0
248510,23342763,XTIUSD,2022-02-07 18:29:00,0,
248511,23342763,XTIUSD,2022-02-07 18:29:00,1,1.0
248512,23342763,XTIUSD,2022-02-07 18:55:30,0,1.0


In [75]:
df_mt_5_metric_3 = df_mt_5_metric_3.dropna() # уменьшим количество операций для последующей предобработки
df_mt_5_metric_3

Unnamed: 0,login,symbol,right_boundary,action,trades_count
0,11278370,GBPUSD,2022-02-01 02:00:30,0,1.0
2,11278370,GBPUSD,2022-02-01 02:30:00,0,4.0
4,11278370,GBPUSD,2022-02-01 02:55:30,0,3.0
6,11278370,GBPUSD,2022-02-01 12:33:00,0,4.0
8,11278370,GBPUSD,2022-02-01 13:31:00,0,5.0
...,...,...,...,...,...
248504,23342763,XTIUSD,2022-02-07 16:10:00,0,1.0
248506,23342763,XTIUSD,2022-02-07 16:28:00,0,1.0
248509,23342763,XTIUSD,2022-02-07 18:09:00,1,1.0
248511,23342763,XTIUSD,2022-02-07 18:29:00,1,1.0


In [76]:
from tqdm.notebook import tqdm
cmd = 0
dfs_cmd_0 = []
for product in tqdm(df_mt_5_metric_3['symbol'].unique()): # для каждого продукта
    df_cmd_0 = pd.DataFrame(index = df_trades_mt5['login'].unique(), columns = windows_boundaries) # создаем матрицу где индекс пользователь а колонки время
    for user in df_trades_mt5['login'].unique(): # заполняем каждую строку такой матрицы
        user_product = df_mt_5_metric_3[(df_mt_5_metric_3['login'] == user) & 
                                        (df_mt_5_metric_3['symbol'] == product) &
                                        (df_mt_5_metric_3['action'] == cmd)]
        df_cmd_0.loc[user, user_product['right_boundary'].values] = user_product['trades_count'].values
        
    df_cmd_0 = df_cmd_0.fillna(0) 
    dfs_cmd_0.append(df_cmd_0.values)

len(dfs_cmd_0)

  0%|          | 0/55 [00:00<?, ?it/s]


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True

55

In [79]:
from tqdm.notebook import tqdm
cmd = 1
dfs_cmd_1 = []
for product in tqdm(df_mt_5_metric_3['symbol'].unique()): # для каждого продукта
    df_cmd_1 = pd.DataFrame(index = df_trades_mt5['login'].unique(), columns = windows_boundaries) # создаем матрицу где индекс пользователь а колонки время
    for user in df_trades_mt5['login'].unique(): # заполняем каждую строку такой матрицы
        user_product = df_mt_5_metric_3[(df_mt_5_metric_3['login'] == user) & 
                                        (df_mt_5_metric_3['symbol'] == product) &
                                        (df_mt_5_metric_3['action'] == cmd)]
        df_cmd_1.loc[user, user_product['right_boundary'].values] = user_product['trades_count'].values
        
    df_cmd_1 = df_cmd_1.fillna(0) 
    dfs_cmd_1.append(df_cmd_1.values)

len(dfs_cmd_1)

  0%|          | 0/55 [00:00<?, ?it/s]


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True

55

In [80]:
import numpy as np
tensor_cmd_1 = np.dstack(dfs_cmd_1)
tensor_cmd_0 = np.dstack(dfs_cmd_0)
tensor_cmd_1.shape, tensor_cmd_0.shape

((200, 20161, 55), (200, 20161, 55))

In [81]:
tensor_cmd_0 = np.transpose(tensor_cmd_0, (2, 0, 1))  
tensor_cmd_1 = np.transpose(tensor_cmd_1, (2, 1, 0))  

result = np.matmul(tensor_cmd_0, tensor_cmd_1)

result

array([[[120.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,  28., ...,   0.,   0.,   0.],
        ...,
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.]],

       [[  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,  27., ...,   0.,   0.,   0.],
        ...,
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.]],

       [[ 77.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,  13., ...,   0.,   0.,   0.],
        ...,
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.],
        [  0.,   0.,   0., ...,   0.,   0.,   0.]],

       ...,

       [[  0.,   0.,   0

In [82]:
result.shape

(55, 200, 200)

In [83]:
result = np.sum(result, axis=0)
result.shape

(200, 200)

In [84]:
result[np.diag_indices(result.shape[0])] = 0
result


array([[ 0.,  0.,  0., ..., 13.,  0.,  0.],
       [ 0.,  0.,  0., ...,  0.,  0.,  0.],
       [ 0.,  0.,  0., ...,  4.,  0.,  0.],
       ...,
       [13.,  0.,  4., ...,  0., 19., 46.],
       [ 0.,  0.,  0., ..., 19.,  0.,  0.],
       [ 0.,  0.,  0., ..., 46.,  0.,  0.]])

In [85]:
users = df_trades_mt5['login'].unique()

indexes_users = np.where(result > 10)
df_task_3 = pd.DataFrame(columns = ['user_1','user_2'])
df_task_3['user_1'] = [users[i] for i  in indexes_users[0]]
df_task_3['user_2'] = [users[i] for i  in indexes_users[1]]
df_task_3

Unnamed: 0,user_1,user_2
0,16341886,15580519
1,16341886,15107092
2,16341886,14958660
3,16341886,15473639
4,16341886,20458869
...,...,...
13939,22549938,23303573
13940,22549938,23307997
13941,22549938,23317398
13942,22549938,23315319


In [86]:
df_task_3.to_csv('mt_5_metrics_3.csv')

# Extra

In [89]:
query = f'''
    select distinct cnt_login 
    from (
    	select COUNT(distinct login) as cnt_login
    	from hr_vacancies.mt5_deals
    	group by positionid) a
 ''' # проверил что для сделки только один логин


query = f'''
        select * from hr_vacancies.mt5_deals md 
        where entry not in ('0','1','3') 
        ''' # наличие в entry значения "0.1" - подозреваю ошибка в данных

# также посмотрел кол-во маркированных записей и не маркированных. Их кол-во не имеет дизбаланса также для продукта mt-5