In [5]:
# -*- coding: utf-8 -*- Line 2
#----------------------------------------------------------------------------
# Created By  : Eungi Cho
# Created Date: 26/05/22
# Updated Date: 30/05/22
# version ='1.0'
# ---------------------------------------------------------------------------

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

plt.style.use('default')

In [16]:
# Price Alarm System Enhancement
# https://www.notion.so/tridge/Price-Alarm-System-Enhancement-61295d8973d6483aaa4fc238eb160e39

import pathlib
print(pathlib.Path().absolute())

df_raw = pd.read_csv('/Users/cho-eungi/Practice/CSV/market_entry_price.csv')
print(df_raw.shape)
print(df_raw.isnull().sum()) # final_unit 제외 모든 컬럼에서 null값이 모두 imputation 되어 있다.
df_raw.head()

/Users/cho-eungi/Practice/Tridge
(10619563, 11)
source_id          0
country            0
market_id          0
product_id         0
entry_id           0
currency           0
final_unit    930027
date               0
price_min          0
price_max          0
price_avg          0
dtype: int64


Unnamed: 0,source_id,country,market_id,product_id,entry_id,currency,final_unit,date,price_min,price_max,price_avg
0,201,South Africa,1487,131,92926374,ZAR,kg,2020-07-20,19.64,21.2,19.956
1,39,India,810,490,41039702,INR,kg,2020-07-06,11.8,12.5,12.2
2,41,India,2188,133,50157058,INR,kg,2020-07-06,50.0,52.7,51.4
3,556,Bangladesh,6581,545,84458922,BDT,kg,2020-07-13,4400.0,4800.0,4600.0
4,150,Turkey,2482,126,58387432,TRY,,2020-07-13,10.0,15.0,11.288


In [None]:
entry_lst = np.sort(df_raw['entry_id'].unique())
np.random.seed(1)
sample_entry = np.random.choice(entry_lst, 1000)

In [14]:
# 테이블 순서 정렬 
df_raw = df_raw.sort_values(by = ['source_id', 'market_id', 'entry_id', 'date'])
# datetime으로 전환
df_raw['date'] = pd.to_datetime(df_raw['date'])
df_raw

Unnamed: 0,source_id,country,market_id,product_id,entry_id,currency,final_unit,date,price_min,price_max,price_avg
706962,1,Canada,44,123,41488155,USD,,2020-07-27,16.47,16.47,16.47
663161,1,Canada,44,123,41488155,USD,,2020-08-03,18.66,18.66,18.66
802368,1,Canada,44,123,41488155,USD,,2020-08-10,18.72,18.72,18.72
887966,1,Canada,44,123,41488155,USD,,2020-08-17,17.40,17.40,17.40
9127387,1,Canada,44,123,41488200,USD,,2020-05-18,22.96,22.96,22.96
...,...,...,...,...,...,...,...,...,...,...,...
2520406,773,Argentina,8878,314,121256237,ARS,kg,2022-03-14,296.00,296.00,296.00
2520408,773,Argentina,8878,314,121256237,ARS,kg,2022-03-21,296.00,296.00,296.00
2520407,773,Argentina,8878,314,121256237,ARS,kg,2022-03-28,296.00,296.00,296.00
2520404,773,Argentina,8878,314,121256237,ARS,kg,2022-04-04,298.00,298.00,298.00


In [15]:
# 매주 월요일 date range 출력: 2020 ~ 2022
date_range = pd.date_range('2020-01-01', '2022-06-30', freq = 'W-MON')
time_df = pd.DataFrame({'date': date_range})

# time_df innerjoin df_raw
df = pd.merge(time_df, df_raw, left_on = 'date', right_on = 'date', how = 'inner')

In [448]:
# 2022-02-01 이후의 데이터를 기준으로 test sample 생성
df = df.loc[(df['date'] > '2022-02-01') & (df['date'] < '2022-05-27')].sort_values(by = ['entry_id', 'date'])
df.set_index(np.arange(len(df)), inplace=True)

# Alarm 1: Abnormal Price Change (rate > 2)

In [449]:
# alarm 1. 1) 특정 entry에서 report된 price_avg 값이 2) 그 전 시점의 price_avg_chg와의 비가 2배 이상

df['price_avg_shift'] = df['price_avg'].shift(1)
# price_avg_shift 와 price_avg 컬럼 모두 null 이 아닌 경우에만 비율 계산. 아니면 0
df['price_avg_chg'] = np.where((df['price_avg'].notnull()) & (df['price_avg_shift'].notnull())
                               , df['price_avg'] / df['price_avg_shift']
                               , 0)
df

Unnamed: 0,date,source_id,country,market_id,product_id,entry_id,currency,final_unit,price_min,price_max,price_avg,price_avg_shift,price_avg_chg
0,2022-02-07,44,Belgium,2100,164,40857050,EUR,,700.000000,700.000000,700.000000,,0.000000
1,2022-02-14,44,Belgium,2100,164,40857050,EUR,,700.000000,700.000000,700.000000,700.0,1.000000
2,2022-02-21,44,Belgium,2100,164,40857050,EUR,,750.000000,750.000000,750.000000,700.0,1.071429
3,2022-02-28,44,Belgium,2100,164,40857050,EUR,,750.000000,750.000000,750.000000,750.0,1.000000
4,2022-03-07,44,Belgium,2100,164,40857050,EUR,,800.000000,800.000000,800.000000,750.0,1.066667
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1791500,2022-05-23,262,Brazil,865,248,133982014,BRL,kg,83.000000,85.000000,90.000000,35.0,2.571429
1791501,2022-05-23,501,Brazil,896,126,133986457,BRL,,35.000000,35.000000,35.000000,90.0,0.388889
1791502,2022-05-23,501,Brazil,683,132,133986696,BRL,,72.500000,72.500000,72.500000,35.0,2.071429
1791503,2022-05-23,495,Taiwan,3588,131,134005487,TWD,kg,7.900000,14.300000,9.800000,72.5,0.135172


In [450]:
df['rank'] = df.groupby(by = 'entry_id')['date'].rank("dense", ascending = True)
df['price_avg_chg_'] = np.where(df['rank'] == 1, np.nan, df['price_avg_chg'])
df.drop(['price_avg_shift', 'price_avg_chg', 'rank'], axis = 1, inplace = True)
df

Unnamed: 0,date,source_id,country,market_id,product_id,entry_id,currency,final_unit,price_min,price_max,price_avg,price_avg_chg_
0,2022-02-07,44,Belgium,2100,164,40857050,EUR,,700.000000,700.000000,700.000000,
1,2022-02-14,44,Belgium,2100,164,40857050,EUR,,700.000000,700.000000,700.000000,1.000000
2,2022-02-21,44,Belgium,2100,164,40857050,EUR,,750.000000,750.000000,750.000000,1.071429
3,2022-02-28,44,Belgium,2100,164,40857050,EUR,,750.000000,750.000000,750.000000,1.000000
4,2022-03-07,44,Belgium,2100,164,40857050,EUR,,800.000000,800.000000,800.000000,1.066667
...,...,...,...,...,...,...,...,...,...,...,...,...
1791500,2022-05-23,262,Brazil,865,248,133982014,BRL,kg,83.000000,85.000000,90.000000,
1791501,2022-05-23,501,Brazil,896,126,133986457,BRL,,35.000000,35.000000,35.000000,
1791502,2022-05-23,501,Brazil,683,132,133986696,BRL,,72.500000,72.500000,72.500000,
1791503,2022-05-23,495,Taiwan,3588,131,134005487,TWD,kg,7.900000,14.300000,9.800000,


In [451]:
df['alarm1'] = np.where(df['price_avg_chg_'] > 2, 1, 0)
df.loc[df['alarm1'] == 1].shape
# df.loc[df['alarm1'] == 1]

(10582, 13)

# Alarm 2: Discontinued price data (consecutive null cnt)

In [452]:
# alarm 2. 1) 특정 entry에서 report된 price_ave 값이 2) 연속적으로 null을 3번 기록한 경우에 해당

# count consecitive nan function
def count_consec_nan(array):
    consec_cnt = array.isnull().astype(int).groupby(array.notnull().astype(int).cumsum()).cumsum()
    return consec_cnt

# entry 별로 price_avg 컬럼에서 consecutive nan을 count하는 새로운 컬럼을 추가
df['consec_null3'] = df.groupby(by = 'entry_id')['price_avg'].transform(count_consec_nan)

# 3이상을 기록할 경우 alarm2 col = 1
df['alarm2'] = np.where(df['consec_null3'] >=3, 1, 0)

In [453]:
df.loc[df['alarm2'] == 1].shape

(0, 15)

# Alarm 3: Abnormal entry

In [454]:
# alarm 3. 1) 특정 entry에서 report된 price_ave 값이 2) 임계치 이상을 벗어난 경우
# 임계치를 whisker = .75 percentile +- 1.5*IQR
def exceed_3sigma(array):
    threshold_min = np.min(array) - 3 * np.std(array)
    threshold_max = np.min(array) + 3 * np.std(array)
    if threshold_min < 0:
        threshold_min = 0
    
    check_col = []
    for i in array:
        if i > threshold_max or i < threshold_min:
            check_col.append(1)
        else:
            check_col.append(0)
    return np.asarray(check_col)

df['alarm3'] = df.groupby(by = 'entry_id')['price_avg'].transform(exceed_3sigma)
df.loc[df['alarm3'] == 1].shape

(199007, 16)

# Alarm 4: Constant price

In [455]:
# alarm 4. 1) 특정 entry에서 report된 price_ave 값이 2) 일정 임계기간동안 같은 값을 반복하는 경우

# shift(1)을 통해 비교 column 생성. But entry_id로 groupby되어 나타나는 첫 번째 shift값은 np.nan으로 변경
df['price_avg_shift'] = df['price_avg'].shift(1)
df['Rank'] = df.groupby('entry_id')['date'].rank("dense", ascending = True)
df['price_avg_shift_'] = np.where(df['Rank'] == 1, np.nan, df['price_avg_shift'])
df.drop(['price_avg_shift', 'Rank'], axis = 1, inplace = True)

# price_avg 와 price_avg_shift_ 간 비교한 후, groupby하여 cumsum을 적용함으로써 
# 같은(same) 값의 consecutive count 실행
df['consec_count_same'] = (df['price_avg'] == df['price_avg_shift_']).groupby(
    (df['price_avg'] != df['price_avg_shift_']).cumsum()
).cumsum()

# comsec count threshoold 8로 설정 (2개월)): entry별, 혹은 market별로 다르게 설정해야할지?
# 1) np.where multiple conditions: https://stackoverflow.com/questions/39109045/numpy-where-with-multiple-conditions
# OR
# 2) for group, subset in df.groupby['market_id'] -> if group == 'certain_market_id' & subset.consec_count_same == 'certain value':
# df.loc[(df['market_id'] == group) & (df['consec_count_same'] == 'certain_value')]['alarm4'] == 1
threshold = 8
df['alarm4'] = np.where(df['consec_count_same'] >= threshold, 1, 0)
df.loc[df['alarm4'] == 1].shape

(83061, 19)

# Alarm 5: Abnormal entry count change

In [456]:
# alarm 5. 1) 특정 소스에서 감지된 엔트리의 총 개수합이 2) 임계값 이상 변화한 경우
df_source = df.sort_values(by = ['date', 'source_id', 'entry_id'])
df_source

Unnamed: 0,date,source_id,country,market_id,product_id,entry_id,currency,final_unit,price_min,price_max,price_avg,price_avg_chg_,alarm1,consec_null3,alarm2,alarm3,price_avg_shift_,consec_count_same,alarm4
252562,2022-02-07,1,Netherlands,2801,97,40890057,USD,kg,18.470000,20.750000,19.610000,,0,0,0,0,,0,0
252563,2022-02-07,1,Netherlands,2801,97,40890057,USD,kg,18.470000,20.750000,19.610000,,0,0,0,0,,0,0
252594,2022-02-07,1,Netherlands,2801,97,40890061,USD,kg,19.450000,21.400000,20.430000,,0,0,0,0,,0,0
252595,2022-02-07,1,Netherlands,2801,97,40890061,USD,kg,19.450000,21.400000,20.430000,,0,0,0,0,,0,0
252606,2022-02-07,1,Netherlands,3894,118,40890241,USD,kg,3.200000,6.090000,5.330000,,0,0,0,0,,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1748522,2022-05-23,772,Germany,4433,870,121239100,EUR,,3.933333,5.166667,4.550000,0.989130,0,0,0,0,4.600,0,0
1748523,2022-05-23,772,Germany,4433,870,121239100,EUR,,3.933333,5.166667,4.550000,1.000000,0,0,0,0,4.550,1,0
1748524,2022-05-23,772,Germany,4433,870,121239100,EUR,,3.933333,5.166667,4.550000,1.000000,0,0,0,0,4.550,2,0
1748556,2022-05-23,772,Germany,4209,174,121239105,EUR,,6.543333,6.683333,6.613333,1.057287,0,0,0,1,6.255,0,0


In [457]:
# 소스에 해당되는 entry를 count한 후, 크롤링되는 시점마다 그 전 시점과의 개수 차이를 계산
# 일정 threshold를 정한 후, 개수의 차이의 절대값이 해당 threshold보다 크면 alarm 5 출력
entry_countBysource = pd.DataFrame(df_source.groupby(by = ['date', 'source_id'])['entry_id'].count()).reset_index(level = (0,1))
entry_countBysource = entry_countBysource.sort_values(by = ['source_id', 'date'])
entry_countBysource['diff'] = entry_countBysource['entry_id'] - entry_countBysource['entry_id'].shift(1)
entry_countBysource['Rank'] = entry_countBysource.groupby(by = 'source_id')['date'].rank("dense", ascending = True)
entry_countBysource['diff_'] = np.where(entry_countBysource['Rank'] == 1, np.nan, entry_countBysource['diff'])
entry_countBysource.drop(['Rank', 'diff'], axis = 1, inplace = True)

In [458]:
threshold_alarm5 = 20
result = entry_countBysource.loc[entry_countBysource['diff_'].abs() > threshold_alarm5]
result_dict = result.to_dict('records')

In [459]:
for record in result_dict:
    df['alarm5'] = np.where((df['date'] == record['date']) & (df['source_id'] == record['source_id']),
                           1, 0)
df.loc[df['alarm5'] == 1].shape

(108, 20)

In [None]:
# class Price_Alarm(object):
#     def __init__(self):
#         pass
    
#     def groupby_array(self, column1, column2):
#         return self.groupby(by = column1)[column2]
    
#     def exceed_3sigma(self, column1, column2):
#         array = self.groupby(by = column1)[column2]

#         threshold_min = np.min(array) - 3 * np.std(array)
#         threshold_max = np.min(array) + 3 * np.std(array)
#         if threshold_min < 0:
#             threshold_min = 0

#         check_col = []
#         for i in array:
#             if i > threshold_max or i < threshold_min:
#                 check_col.append(1)
#             else:
#                 check_col.append(0)
#         return np.asarray(check_col)
    
#     def alarm_3(self):
#         array = self.groupby_array("entry_id", "price_avg")
#         self.