/
process_data.py
380 lines (321 loc) · 12.3 KB
/
process_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
from variables import *
from utility import *
import sqlalchemy
from sqlalchemy.pool import NullPool
import datetime
import time as time_
import requests
import json
import pandas as pd
import urllib3
urllib3.disable_warnings()
def connection2(database_entry):
    """Open a connection to the database described by ``database_entry``.

    Parameters
    ----------
    database_entry : SQLAlchemy database URL used to build the engine.

    Returns
    -------
    conn : open connection to the database
    engine : engine that produced ``conn``
    """
    # NullPool disables connection pooling: closing the connection really
    # closes it instead of returning it to a pool.
    new_engine = sqlalchemy.create_engine(database_entry, poolclass=NullPool)
    return new_engine.connect(), new_engine
def update_player(df_player):
    """Upsert ``df_player`` into table ``player``.

    The frame is written to the scratch table ``player_copy`` (replaced on
    every call) and then copied into ``player``. Rows colliding on constraint
    ``player_un`` get ``possible_steam_id``, ``possible_smurf_id`` and
    ``possible_smurf_game_id`` overwritten.

    Parameters
    ----------
    df_player : dataframe to upsert into database -> table player. The copy
        uses ``INSERT ... SELECT *``, so its columns must be in the same order
        as the columns of table ``player``.

    Returns
    -------
    None.
    """
    conn, engine = connection2(database_entry)
    try:
        # One explicit transaction, committed on success and rolled back on
        # error. SQLAlchemy 2.x never autocommits and requires text() for raw
        # SQL strings; this form also works on 1.x.
        with conn.begin():
            df_player.to_sql('player_copy', conn, if_exists='replace', index=False)
            conn.execute(sqlalchemy.text("""
                INSERT INTO player
                SELECT *
                FROM player_copy
                ON CONFLICT ON CONSTRAINT player_un
                DO UPDATE
                SET possible_steam_id=EXCLUDED.possible_steam_id,
                possible_smurf_id=EXCLUDED.possible_smurf_id,
                possible_smurf_game_id=EXCLUDED.possible_smurf_game_id
            """))
    finally:
        # Always release the connection, even if the write failed.
        conn.close()
def update_stream(df):
    """Upsert ``df`` into table ``stream``.

    The frame is written to the scratch table ``stream_copy`` (replaced on
    every call) and then copied into ``stream``. Rows colliding on constraint
    ``stream_un`` get ``processed``, ``game_id`` and ``player_id`` overwritten.

    Parameters
    ----------
    df : dataframe to upsert into database -> table stream. The copy uses
        ``INSERT ... SELECT *``, so its columns must be in the same order as
        the columns of table ``stream``.

    Returns
    -------
    None.
    """
    conn, engine = connection2(database_entry)
    try:
        # One explicit transaction, committed on success and rolled back on
        # error. SQLAlchemy 2.x never autocommits and requires text() for raw
        # SQL strings; this form also works on 1.x.
        with conn.begin():
            df.to_sql('stream_copy', conn, if_exists='replace', index=False)
            conn.execute(sqlalchemy.text("""
                INSERT INTO stream
                SELECT *
                FROM stream_copy
                ON CONFLICT ON CONSTRAINT stream_un
                DO UPDATE
                SET processed=EXCLUDED.processed,game_id=EXCLUDED.game_id,player_id=EXCLUDED.player_id
            """))
    finally:
        # Always release the connection, even if the write failed.
        conn.close()
def processed_cleaner(days=10):
    """Re-queue recent stream rows whose lookup found no game.

    Rows of table ``stream`` with ``play=True``, ``processed=True``, an empty
    ``game_id`` and ``time`` within the last ``days`` days are written back
    with ``game_id='-1'``, ``player_id='-1'`` and ``processed=False``. The
    write goes through the scratch table ``stream_copy2`` and an upsert on
    constraint ``stream_un``, so the rows become eligible for processing again.

    Parameters
    ----------
    days : how many days in the past are cleaned to allow a new reprocessing

    Returns
    -------
    None.
    """
    since = datetime.datetime.now() - datetime.timedelta(days=days)
    conn, engine = connection2(database_entry)
    try:
        # Bound parameter instead of formatting the timestamp into the SQL text.
        df = pd.read_sql_query(
            sqlalchemy.text(
                "select * from stream where play=True and processed=True "
                "and game_id='' and time>:since"
            ),
            conn,
            params={"since": since},
        )
    finally:
        conn.close()
    if df.empty:
        return
    df['game_id'] = '-1'
    df['player_id'] = '-1'
    df['processed'] = False
    # engine.begin() opens a fresh connection and commits on success or rolls
    # back on error, under SQLAlchemy 1.x and 2.x alike (2.x never autocommits).
    with engine.begin() as write_conn:
        df.to_sql('stream_copy2', write_conn, if_exists='replace', index=False)
        write_conn.execute(sqlalchemy.text("""
            INSERT INTO stream
            SELECT *
            FROM stream_copy2
            ON CONFLICT ON CONSTRAINT stream_un
            DO UPDATE
            SET processed=EXCLUDED.processed,game_id=EXCLUDED.game_id,player_id=EXCLUDED.player_id
        """))
def combine_game_hero(game_hero1, pos_hero1, game_hero2, pos_hero2):
    """Merge two hero detections that may belong to the same game.

    Parameters
    ----------
    game_hero1 : list of up to 10 heroes id in slot order from left to right
    pos_hero1 : list of the up to 10 index position of the heroes
    game_hero2 : list of up to 10 heroes id in slot order from left to right
    pos_hero2 : list of the up to 10 index position of the heroes

    Returns
    -------
    game_hero : heroes id ordered by position; ``game_hero1`` if combine is
        False, else the union of both detections
    pos_hero : sorted positions aligned with ``game_hero``; ``pos_hero1`` if
        combine is False
    combine : True when every position present in both detections holds the
        same hero, else False

    The input lists are never modified (the previous version appended to
    ``pos_hero1`` in place, which corrupted it when the merge then failed).
    """
    slots1 = dict(zip(pos_hero1, game_hero1))
    slots2 = dict(zip(pos_hero2, game_hero2))
    # A slot seen in both detections must show the same hero; otherwise the
    # two detections cannot come from the same game.
    for pos in slots1.keys() & slots2.keys():
        if slots1[pos] != slots2[pos]:
            return game_hero1, pos_hero1, False
    merged = {**slots2, **slots1}
    pos_hero = sorted(merged)
    game_hero = [merged[pos] for pos in pos_hero]
    return game_hero, pos_hero, True
def compare_match(game_info, game_hero1, pos_hero1, time):
    """Tell whether an OpenDota match agrees with a run of hero detections.

    Parameters
    ----------
    game_info : one match entry returned by ``get_match``. ``start_time`` is
        read as Unix seconds (it goes through ``datetime.fromtimestamp``), and
        the hero lists ``teama`` and ``teamb`` are concatenated so that a slot
        position indexes into ``teama + teamb``.
    game_hero1 : heroes id seen on stream, aligned with ``pos_hero1``
    pos_hero1 : slot position of each detected hero
    time : detection time compared with the match start

    Returns
    -------
    bool : False if the match started more than 6 hours away from ``time`` or
        any detected hero differs from the match hero in that slot, else True.
    """
    # 6 hours margin to match rare duplicate games; a long pause plus a long
    # game should be way shorter than that.
    max_gap = datetime.timedelta(minutes=360)
    started_at = datetime.datetime.fromtimestamp(game_info['start_time'])
    if abs(time - started_at) > max_gap:
        return False
    slots = game_info['teama'] + game_info['teamb']
    return all(slots[slot] == game_hero1[k] for k, slot in enumerate(pos_hero1))
def find_player_id(game_id, stratz_token, stratz_url, time_out=20):
    """Fetch the Steam account ids of the players of one match from Stratz.

    Parameters
    ----------
    game_id : game_id (match id) to look up
    stratz_token : stratz token, sent as a Bearer token
    stratz_url : url of the Stratz GraphQL endpoint
    time_out : seconds before the request times out. requests has no timeout
        by default, so without it a stalled connection blocks forever.

    Returns
    -------
    set_player : set of player_id (``steamAccountId`` as int) in the game

    Raises
    ------
    requests.HTTPError : if Stratz answers with an HTTP error status.
    """
    headers = {"Authorization": f"Bearer {stratz_token}"}
    query = f"{{match(id:{game_id}){{players{{steamAccountId}}}}}}"
    req = requests.post(stratz_url, json={"query": query}, headers=headers, timeout=time_out)
    # Fail loudly on HTTP errors instead of hitting a KeyError on 'data'.
    req.raise_for_status()
    json_results = json.loads(req.content.decode('utf-8'))['data']['match']['players']
    return {int(x['steamAccountId']) for x in json_results}
def get_match(hero_list, pos_list, time_out=10):
    """Query OpenDota's findMatches endpoint for games with these heroes.

    Parameters
    ----------
    hero_list : list of up to 10 heroes id in slot order from left to right
    pos_list : list of the up to 10 index position of the heroes; positions
        below 5 are sent as ``teamA``, the others as ``teamB``
    time_out : seconds before timeout the connection

    Returns
    -------
    json_results : decoded JSON body, all matches info that match with that
        combination of heroes

    Raises
    ------
    RuntimeError : if OpenDota answers with an HTTP error status. Otherwise an
        error payload would be returned as if it were a list of matches.
    """
    from urllib.parse import urlencode

    pairs = list(zip(hero_list, pos_list))
    # All teamA entries first, then all teamB entries. urlencode also avoids
    # a stray '&' when one of the teams is empty.
    query = [('teamA', hero) for hero, pos in pairs if pos < 5]
    query += [('teamB', hero) for hero, pos in pairs if pos >= 5]
    url = 'https://api.opendota.com/api/findMatches?' + urlencode(query)
    http = urllib3.PoolManager()
    req = http.request('GET', url=url, timeout=time_out)
    if req.status >= 400:
        raise RuntimeError(f"OpenDota findMatches returned HTTP {req.status}")
    return json.loads(req.data.decode('utf-8'))
def process_data(df):
    """Group consecutive hero detections into games and look the games up.

    Rows are walked in order. A run of rows with ``play == True`` is treated
    as one game when the rows:

    * belong to the same ``channel_name``;
    * are each at most 19 minutes after the previous row;
    * have hero slots that ``combine_game_hero`` can merge.

    Runs of at least two rows are searched on OpenDota (``get_match``). The
    ids of the candidate matches accepted by ``compare_match`` are kept, and
    their players are fetched from Stratz (``find_player_id``).

    Parameters
    ----------
    df : dataframe from database -> table stream. Rows are grouped in their
        current order, so it should be sorted by ``channel_name`` then ``time``.

    Returns
    -------
    df : the same dataframe, updated in place.

        * Rows of a looked-up run get ``game_id`` / ``player_id`` set to the
          ':'-joined candidate match ids / player ids, and ``processed=True``.
        * Non-play rows and single-row runs are only marked ``processed=True``.
        * Rows of a run whose lookup raised are not marked processed here.
    """
    import logging

    logger = logging.getLogger(__name__)
    lenn = len(df)
    i = 0
    while True:
        # Mark rows without a game in progress as processed and skip them.
        while i < lenn:
            if df.iloc[i]['play'] == True:
                break
            # Address the row by position (df.index[i]) like the rest of this
            # function, so a non-default index does not break the update.
            df.loc[df.index[i], 'processed'] = True
            i += 1
        if i >= lenn:
            break
        # Start a new run of detections at row i.
        lst_indice = [i]
        channel_name = df.iloc[i]['channel_name']
        time = df.iloc[i]['time']
        game_hero1, pos_hero1 = get_hero_id(df.iloc[i].game_hero, df.iloc[i].pos_hero)
        i += 1
        # Extend the run while rows stay on the same channel, follow closely
        # in time and agree with the heroes already seen.
        while i < lenn:
            if df.iloc[i]['play'] == False:
                df.loc[df.index[i], 'processed'] = True
                break
            if df.iloc[i]['channel_name'] != channel_name:
                break
            time_temp = df.iloc[i]['time']
            if time_temp - time > datetime.timedelta(minutes=19):
                # 19 -> data taken in windows of 10 min (the gap between two
                # detections ranges from 1 to 19 min)
                break
            game_hero2, pos_hero2 = get_hero_id(df.iloc[i].game_hero, df.iloc[i].pos_hero)
            game_hero1, pos_hero1, combine = combine_game_hero(
                game_hero1, pos_hero1, game_hero2, pos_hero2)
            if not combine:
                break
            lst_indice.append(i)
            time = time_temp
            i += 1
        indices_df = df.index[lst_indice]
        if len(lst_indice) <= 1:
            # Need at least 2 detections in a row to consider the game.
            df.loc[indices_df, 'processed'] = True
            continue
        try:
            # Needed without a paid OpenDota key; otherwise requests time out.
            time_.sleep(1)
            game_info = get_match(game_hero1, pos_hero1, 20)
            set_possible_match = set()
            for x in game_info:
                if compare_match(x, game_hero1, pos_hero1, time) == True:
                    set_possible_match.add(x['match_id'])
            set_player_id = set()
            for game_id in set_possible_match:
                players = find_player_id(game_id, stratz_token, stratz_url)
                set_player_id = set_player_id.union(players)
            df.loc[indices_df, 'game_id'] = ':'.join(map(str, set_possible_match))
            df.loc[indices_df, 'player_id'] = ':'.join(map(str, set_player_id))
            df.loc[indices_df, 'processed'] = True
        except Exception:
            # Best effort: this run is not marked processed here, so it can be
            # retried later. Log the error instead of silently discarding it.
            logger.warning(
                "Game lookup failed for channel %s (rows %s)",
                channel_name, list(indices_df), exc_info=True)
    return df
def associate_player_id(database_entry=database_entry, df=False, hours=6):
    """Resolve candidate Steam ids for streamed games and save the results.

    Stream rows are grouped into games and looked up with ``process_data``.
    The resulting player ids are then folded into table ``player``:

    * A channel not yet in ``player`` is inserted with all players of the
      game as ``possible_steam_id``.
    * Otherwise ``possible_steam_id`` is narrowed to its intersection with the
      game's players.
    * If that intersection is empty, the game id and its player ids are
      appended (':'-terminated) to ``possible_smurf_game_id`` /
      ``possible_smurf_id``, unless that game was already recorded.

    Rows with ``player_id == '-1'`` or an empty ``game_id`` are ignored.

    Parameters
    ----------
    database_entry : information to connect to database (used for reading)
    df : dataframe from database -> table stream. If it is not a DataFrame
        (default ``False``), rows with ``processed=False`` older than ``hours``
        hours are loaded automatically. A given frame is not modified.
    hours : when loading automatically, how many hours in the recent past are
        ignored

    Returns
    -------
    None. -> will directly update the tables stream & player in the database

    NOTE(review): the final writes go through ``update_stream`` and
    ``update_player``, which connect with the module-level ``database_entry``
    rather than this parameter.
    """
    if not isinstance(df, pd.DataFrame):
        before = datetime.datetime.now() - datetime.timedelta(hours=hours)
        conn, engine = connection2(database_entry)
        try:
            # Bound parameter instead of formatting the timestamp into the SQL.
            df = pd.read_sql_query(
                sqlalchemy.text(
                    "select * from stream where processed=False and time<:before"),
                conn,
                params={"before": before},
            )
        finally:
            conn.close()
    else:
        # assign() returns a new frame, so the caller's dataframe is left as is.
        df = df.assign(time=pd.to_datetime(df['time']))
    # process_data groups consecutive rows, so each channel's rows must be
    # contiguous and in time order.
    df = df.sort_values(['channel_name', 'time']).reset_index(drop=True)
    df = process_data(df)
    # Load the whole player table (could be restricted to the channels in df).
    conn, engine = connection2(database_entry)
    try:
        df_player = pd.read_sql_query("select * from player", conn)
    finally:
        conn.close()
    rows = df[['channel_name', 'game_id', 'player_id']].itertuples(index=False)
    for channel_name, game_id, player_id in rows:
        if player_id == '-1' or game_id == '':
            # No usable lookup result for this row.
            continue
        if channel_name not in df_player['channel_name'].values:
            # New channel: every player of the game is a candidate steam id.
            # NOTE(review): assumes the player columns are ordered channel_name,
            # possible_steam_id, possible_smurf_id, possible_smurf_game_id.
            df_player.loc[len(df_player.index)] = [channel_name, player_id, '', '']
            continue
        pos = df_player.index[df_player['channel_name'] == channel_name].tolist()
        known_ids = set(df_player.loc[pos, 'possible_steam_id'].iloc[0].split(':'))
        if game_id in df_player.loc[pos, 'possible_smurf_game_id'].iloc[0].split(':'):
            # This game is already recorded as a possible smurf game.
            continue
        common_ids = known_ids.intersection(set(player_id.split(':')))
        if len(common_ids) == 0:
            # None of the known candidates played this game: record it as a
            # possible smurf game.
            df_player.loc[pos, 'possible_smurf_game_id'] += game_id + ':'
            df_player.loc[pos, 'possible_smurf_id'] += player_id + ':'
        else:
            df_player.loc[pos, 'possible_steam_id'] = ':'.join(map(str, common_ids))
    # Update the 2 tables of the database.
    update_stream(df)
    update_player(df_player)
def find_smurf_user(df=False):
    """Pick the most probable smurf account for each channel.

    ``possible_smurf_id`` holds ':'-terminated player ids (e.g. ``'12:34:12:'``).
    The id occurring most often becomes ``most_probable_smurf_id``. A tie, or
    an empty list, gives ``''``.

    Parameters
    ----------
    df : dataframe containing only twitch_channel with smurfing evidence (in
        case already loaded). If it is not a DataFrame (default ``False``), it
        is read from table ``player``. A given frame gets the new column added
        in place.

    Returns
    -------
    df : dataframe containing only twitch_channel with smurfing evidence, with
        the extra column ``most_probable_smurf_id``
    """
    from collections import Counter

    if not isinstance(df, pd.DataFrame):
        conn, engine = connection2(database_entry)
        try:
            df = pd.read_sql_query("select * from player where possible_smurf_id<>''", conn)
        finally:
            conn.close()
    best_ids = []
    for smurf_ids in df['possible_smurf_id']:
        # Drop the piece after the final ':' (empty for ':'-terminated strings).
        counts = Counter(smurf_ids.split(':')[:-1])
        best = ''
        if counts:
            top = max(counts.values())
            leaders = [key for key, count in counts.items() if count == top]
            if len(leaders) == 1:
                best = leaders[0]
        best_ids.append(best)
    # Assign the whole column at once. Writing through df[col].iloc[i] is
    # chained assignment, which does not update df under pandas Copy-on-Write.
    df['most_probable_smurf_id'] = best_ids
    return df