In [1]:
import json
import pandas as pd
import os
from enum import IntEnum
import numpy as np

In [2]:
# Directories that hold the game-log files for each pilot study.
# predict_codes() lists one of these directories and parses every non-hidden file as JSON.
PATH1 = "../../data/pilot1/game_data/game_logs/"
PATH2 = "../../data/pilot2/game_data/game_logs/"
PATH3 = "../../collected_data/pilot3/game_data/game_logs/"
# Currently selected log directory.
PATH = PATH3
# Presumably an example log file name (TODO confirm):
# P60d520539b147c4acaf44b77_voff_m1_g1_t2021_7_10_1_4
# Output file used by the (currently commented-out) final_signals_data.to_csv call.
SAVE_PATH = 'test.csv'

In [3]:
# key events to log

def game_summary(data):
    ''' Collect the frame numbers of key events from one parsed game log.

    Parameters
    ----------
    data : dict
        Parsed game log. Reads ``data['events']`` (each event needs the keys
        'frame', 'killed', 'killer' and 'type') and ``data['frames']`` (each
        frame needs 'signal_up', 'signal_down', 'ai_position', 'frame_number',
        'enemies_left_positions', 'enemies_right_positions' and
        ``ai_actual_action['shoot']``).

    Returns
    -------
    dict
        Maps each event name ('ai_deaths', 'player_deaths', 'ai_kills',
        'ai_shots', 'right_to_left', 'left_to_right', 'up_signals',
        'down_signals', 'ai_side_over', 'player_side_over', 'all_over')
        to the list of frame numbers at which that event was seen.
        An empty ``frames`` list yields empty per-frame lists.
    '''
    # Side changes within this many frames after an AI death are ignored.
    death_window = 50
    # x threshold used to decide which side the AI is on
    # (presumably the screen midline - TODO confirm).
    midline_x = 400

    ai_deaths = []
    ai_kills = []
    player_deaths = []
    for event in data['events']:
        f = event['frame']
        if event['killed'] == 'AI':
            ai_deaths.append(f)
        elif event['killed'] == 'PLAYER':
            player_deaths.append(f)
        if event['killer'] == 'AI' and event['type'] == 'SHOT':
            ai_kills.append(f)

    right_to_left = []
    left_to_right = []
    up_signals = []
    down_signals = []
    ai_side_over = []
    player_side_over = []
    all_over = []
    ai_shots = []

    # State carried from the previous frame. The initial values are sentinels
    # chosen so that the very first frame is compared against "AI far right,
    # enemies present on both sides".
    last_ai_x = 1000
    last_left_enemies_count = 30
    last_right_enemies_count = 30

    for frame in data['frames']:
        up = frame['signal_up']
        down = frame['signal_down']
        ai_x = frame['ai_position']
        frame_num = frame['frame_number']
        left_count = len(frame['enemies_left_positions'])
        right_count = len(frame['enemies_right_positions'])
        shoot = frame['ai_actual_action']['shoot']

        # A side is "over" on the frame where its enemy count first hits zero.
        if last_left_enemies_count > 0 and left_count == 0:
            player_side_over.append(frame_num)
        if last_right_enemies_count > 0 and right_count == 0:
            ai_side_over.append(frame_num)
        if last_left_enemies_count + last_right_enemies_count > 0 and left_count == 0 and right_count == 0:
            all_over.append(frame_num)

        if shoot:
            ai_shots.append(frame_num)

        if up:
            up_signals.append(frame_num)
        if down:
            down_signals.append(frame_num)

        # Chose to ignore side changes that happened near AI deaths.
        # The flag is recomputed for every frame: previously it was set once
        # and never cleared, which silenced every side change after the first
        # AI death for the remainder of the game.
        recent_death = any(frame_num - death_window < n < frame_num for n in ai_deaths)
        if not recent_death:
            if last_ai_x >= midline_x and ai_x < midline_x:
                right_to_left.append(frame_num)
            if last_ai_x <= midline_x and ai_x > midline_x:
                left_to_right.append(frame_num)

        last_ai_x = ai_x
        last_left_enemies_count = left_count
        last_right_enemies_count = right_count

    return {'ai_deaths': ai_deaths,
            'player_deaths': player_deaths,
            'ai_kills': ai_kills,
            'ai_shots': ai_shots,
            'right_to_left': right_to_left,
            'left_to_right': left_to_right,
            'up_signals': up_signals,
            'down_signals': down_signals,
            'ai_side_over': ai_side_over,
            'player_side_over': player_side_over,
            'all_over': all_over}

In [4]:
def search_recent(l, frame_num, l_search_range=50, right=False, r_search_range=50, left=True):
    ''' Return the first entry of ``l`` that lies close to ``frame_num``.

    With ``left`` set, an entry x matches when
    ``frame_num - l_search_range < x <= frame_num``; with ``right`` set, it
    matches when ``frame_num <= x < frame_num + r_search_range``.
    Entries are tested in list order and the first match is returned.
    Returns False when nothing matches. Note that a match on frame 0 is
    itself falsy, so callers using the result in a boolean test will treat
    it as "not found".
    '''
    def _in_window(x):
        if left and frame_num - l_search_range < x <= frame_num:
            return True
        return bool(right and frame_num <= x < frame_num + r_search_range)

    return next((x for x in l if _in_window(x)), False)

def search_recent_df(df, col, frame_num, search_range=50, right=False, left=True):
    ''' Report whether column ``col`` is True on any frame near ``frame_num``.

    Rows are selected by their 'frame_number' value using open intervals:
      * left only:   (frame_num - search_range, frame_num)
      * right only:  (frame_num, frame_num + search_range)
      * both:        (frame_num - search_range, frame_num + search_range)
    Only the two-sided window contains ``frame_num`` itself.

    Returns True if at least one selected row has ``df[col] == True``,
    otherwise False. When neither ``left`` nor ``right`` is requested there
    is no window to search and the result is False (this combination used
    to raise AttributeError).
    '''
    if not (left or right):
        return False

    frames = df['frame_number']
    lower = frame_num - search_range if left else frame_num
    upper = frame_num + search_range if right else frame_num
    in_window = (frames > lower) & (frames < upper)
    return bool((df.loc[in_window, col] == True).any())
            

In [5]:
class Code(IntEnum):
    """Integer labels used to bucket up/down signals.

    up_cat() and down_cat() append these members to the list of codes they
    return for a signal; predict_codes() stores their integer values in the
    'num_code' column.
    """
    # up_cat: an AI kill was found shortly before the signal
    SHOT = 1
    # up_cat / down_cat: the AI crossed from the right side to the left near the signal
    RIGHT_TO_LEFT = 2
    # up_cat: the AI crossed from the left side to the right near the signal
    LEFT_TO_RIGHT = 3
    # down_cat: the AI died shortly before the signal
    AI_DEATH = 4
    # down_cat: no AI shot in the 75 frames before the signal
    INACTIVE = 5 #not shooting
    # up_cat: appended by the shot-after-pause check on g['ai_shots']
    REACTIVE = 6
    # up_cat: the right-hand enemies were cleared near the signal while the AI was at x > 400
    AI_FINISH = 7
    # up_cat: the left-hand enemies were cleared near the signal while the AI was at x < 400
    P_FINISH = 8
    # up_cat: both sides were cleared near the signal
    FINISH = 9
    # Not produced by up_cat/down_cat here; per_match() folds 10 and 11 into 999.
    TESTING = 10
    MISTAKE = 11
    # down_cat: 'enemies_right_low' was recently True while the AI was at x > 400
    R_ENEMIES_LOW = 12
    # Only referenced from commented-out code in down_cat
    L_ENEMIES_LOW = 13
    IGNORANT_LOW = 14 # on left side but right is low
    # down_cat: the player died shortly before the signal
    P_DEATH = 15
    # The two *_LOW_2 members are not appended anywhere in the visible code
    R_ENEMIES_LOW_2 = 16
    L_ENEMIES_LOW_2 = 17
    IGNORANT_MORE = 18 # on left side but right has more
    
    

In [6]:
def down_cat(frame_num, search_range, df, g):
    """Bucket one down signal.

    frame_num    -- frame number of the signal; also used as the row label
                    for the df.at lookups
    search_range -- look-back window (in frames) for the event searches
    df           -- per-frame DataFrame including the 'enemies_*_low*' columns
    g            -- event lists as returned by game_summary()

    Returns the list of Code members that apply (possibly empty).
    The LEFT_TO_RIGHT and L_ENEMIES_LOW checks are intentionally not applied
    to down signals.
    """
    codes = []

    # (event list, extra search_recent arguments, code to record)
    event_checks = (
        ('right_to_left', (search_range * 1.5, True, 30), Code.RIGHT_TO_LEFT),
        ('ai_deaths', (search_range,), Code.AI_DEATH),
        ('player_deaths', (search_range,), Code.P_DEATH),
    )
    for event_key, window_args, code in event_checks:
        if search_recent(g[event_key], frame_num, *window_args):
            codes.append(code)

    # No AI shot in the 75 frames leading up to the signal.
    shot_seen = search_recent(g['ai_shots'], frame_num, 75)
    if not shot_seen:
        codes.append(Code.INACTIVE)

    # Right-hand enemies recently low while the AI is on the right (x > 400).
    if search_recent_df(df, 'enemies_right_low', frame_num, search_range) and (df.at[frame_num, 'ai_position'] > 400):
        codes.append(Code.R_ENEMIES_LOW)

    # Both AI and player are on the left (x < 400) ...
    if (df.at[frame_num, 'ai_position'] < 400) and (df.at[frame_num, 'player_position'] < 400):
        # ... although the right-hand enemies are low,
        if df.at[frame_num, 'enemies_right_low_2'] == True:
            codes.append(Code.IGNORANT_LOW)
        # ... or although the right side has more enemies left.
        left_enemies = df.at[frame_num, 'enemies_left_positions']
        right_enemies = df.at[frame_num, 'enemies_right_positions']
        if len(left_enemies) < len(right_enemies):
            codes.append(Code.IGNORANT_MORE)

    return codes

In [7]:
def up_cat(frame_num, search_range, df, g):
    """Bucket one up signal.

    frame_num    -- frame number of the signal
    search_range -- window (in frames) for the event searches
    df           -- per-frame DataFrame; rows are looked up with df.at using
                    frame numbers as labels
    g            -- event lists as returned by game_summary()

    Returns the list of Code members that apply (possibly empty).
    """
    codes = []
    if search_recent(g['right_to_left'], frame_num, search_range * 1.5, True, 30):
        codes.append(Code.RIGHT_TO_LEFT)
    if search_recent(g['left_to_right'], frame_num, search_range * 1.5, True, 30):
        codes.append(Code.LEFT_TO_RIGHT)
    if search_recent(g['ai_kills'], frame_num, search_range):
        codes.append(Code.SHOT)

    # REACTIVE: the AI fired shortly before the signal after not having fired
    # for the preceding 75 frames. The pause check has to look strictly
    # before the shot: searching a window that ends on the shot itself always
    # finds that very shot, which made this code unreachable.
    shot = search_recent(g['ai_shots'], frame_num, search_range)
    if shot and not any(shot - 75 < s < shot for s in g['ai_shots']):
        codes.append(Code.REACTIVE)

    # Side-clear events are searched both before and after the signal.
    a_o = search_recent(g['ai_side_over'], frame_num, search_range, True, search_range)
    p_o = search_recent(g['player_side_over'], frame_num, search_range, True, search_range)
    # Only credit the clear when the AI was on the matching side at that frame.
    if a_o and df.at[a_o, 'ai_position'] > 400:
        codes.append(Code.AI_FINISH)
    if p_o and df.at[p_o, 'ai_position'] < 400:
        codes.append(Code.P_FINISH)
    if search_recent(g['all_over'], frame_num, search_range, True, search_range):
        codes.append(Code.FINISH)

    return codes

In [8]:
# print(df.loc[df['enemies_right_low'] == True])
# print(search_recent_df(df, 'enemies_right_low', 2350))

In [9]:
def _lowest_enemy_y(positions):
    """Largest second coordinate among ``positions``, or 0 when it is empty."""
    return max((pos[1] for pos in positions), default=0)


def predict_codes(PATH, SEARCH_RANGE):
    """Predict bucket codes for every up/down signal in a directory of game logs.

    PATH         -- directory of JSON game logs; hidden files (names starting
                    with '.') are skipped and the rest are processed in
                    sorted order
    SEARCH_RANGE -- frame window forwarded to up_cat() / down_cat()

    Returns a DataFrame with one row per signal and the columns
    'player_id', 'mode', 'signal_type' ('up' or 'down'), 'frame_number' and
    'num_code' (list of integer Code values). A frame flagged as both up and
    down keeps the down_cat() codes but is labelled 'up'.
    """
    dfs = []
    for file in sorted(os.listdir(PATH)):
        if file.startswith('.'):
            continue
        with open(os.path.join(PATH, file), 'r') as f:
            data = json.load(f)
        df = pd.json_normalize(data, record_path=['frames'])

        # add columns for enemies_y data; each side is measured from its own
        # position list (the left-hand columns used to be computed from the
        # right-hand enemies)
        right_y = df['enemies_right_positions'].map(_lowest_enemy_y)
        left_y = df['enemies_left_positions'].map(_lowest_enemy_y)
        df['enemies_right_low'] = right_y > 450
        df['enemies_right_low_2'] = right_y > 400
        df['enemies_left_low'] = left_y > 450
        df['enemies_left_low_2'] = left_y > 400

        # get game data
        game_sum = game_summary(data)

        # Keep only the signal frames. Working on an explicit copy avoids
        # assigning into a filtered view of the full frame table.
        signal_rows = df['signal_up'] | df['signal_down']
        df1 = df.loc[signal_rows].copy()
        df1['player_id'] = data['player_id']
        df1['mode'] = data['mode']
        df1['code'] = ""
        up_rows = df1['signal_up'] == True
        down_rows = df1['signal_down'] == True
        # The categorisers receive the full per-frame table `df` because they
        # look at frames around the signal, not just the signal row itself.
        df1.loc[up_rows, 'code'] = df1.loc[up_rows, 'frame_number'].map(
            lambda x: up_cat(x, SEARCH_RANGE, df, game_sum))
        df1.loc[down_rows, 'code'] = df1.loc[down_rows, 'frame_number'].map(
            lambda x: down_cat(x, SEARCH_RANGE, df, game_sum))
        df1['num_code'] = df1['code'].map(lambda x: [int(code) for code in x])
        dfs.append(df1[['player_id', 'mode', 'signal_up', 'signal_down',
                        'tried_signal_up', 'tried_signal_down',
                        'frame_number', 'code', 'num_code']])

    # finalize data
    d = pd.concat(dfs)
    d.loc[d['signal_down'],['signal_type']] = 'down'
    d.loc[d['signal_up'],['signal_type']] = 'up'
    final_signals_data = d[['player_id', 'mode', 'signal_type', 'frame_number', 'num_code']]
    return final_signals_data

In [10]:
# final_signals_data.to_csv(SAVE_PATH, index=False)

In [11]:
def str_to_array(s):
    """Convert a bracketed list string such as '[1,2,3]' or '[1, 2, 3]' to [1, 2, 3].

    The first and last characters are assumed to be the brackets and are
    dropped; the remainder is split on commas and each piece is converted
    with int(). An empty list string ('[]') yields [] instead of raising
    ValueError. Non-numeric items still raise ValueError.
    """
    inner = s[1:-1].strip()
    if not inner:
        return []
    return [int(item) for item in inner.split(',')]

In [12]:
def per_match(a,b):
    """
        Fraction of the entries of list a that also occur in list b.

        Codes 10 and 11 are first collapsed into a single 999 entry: one
        occurrence of each is removed from a and 999 is appended. This
        rewrites the caller's list a in place.
        Raises ZeroDivisionError when a is empty.
    """
    if 10 in a or 11 in a:
        for folded in (10, 11):
            if folded in a:
                a.remove(folded)
        a.append(999)
    hits = sum(1 for item in a if item in b)
    return hits / len(a)
            
    

In [13]:
def per_false_match(a,b):
    """
        Fraction of the entries of b that do not appear in a
        (false positives when b holds predictions and a the reference).
        Raises ZeroDivisionError when b is empty.
    """
    misses = sum(1 for item in b if item not in a)
    return misses / len(b)
            

In [14]:
def code_check(df):
    """Compare predicted codes ('num_code') against reference codes ('code').

    df must carry both columns; a frame without a 'code' column raises
    KeyError. 'code' values are parsed with str_to_array, so they are
    expected to be bracketed list strings. The input frame is not modified.

    Returns a dict of rounded (2 d.p.) summary statistics:
      match_any_pct -- share of rows with at least one reference code predicted
      avg_match_pct -- mean per-row share of reference codes that were predicted
      false_any_pct -- share of rows with at least one false-positive prediction
      avg_false_pct -- mean per-row share of predictions that are false positives
    """
    df1 = df.copy()
    # change all 'unknown' to 999
    df1['num_code'] = df1['num_code'].astype(str)
    df1.loc[df1['num_code'] == '[]', ['num_code']] = '[999]'

    df1['code'] = df1['code'].map(str_to_array)
    df1['num_code'] = df1['num_code'].map(str_to_array)

    # Order matters: per_match rewrites each 'code' list in place (10/11 ->
    # 999), and per_false_match then sees those rewritten lists.
    df1['match_pct'] = df1.apply(lambda x: per_match(x.code, x.num_code), axis=1)
    df1['false_pct'] = df1.apply(lambda x: per_false_match(x.code, x.num_code), axis=1)

    df1['match_any'] = df1['match_pct'].apply(lambda x: 1 if x > 0 else 0)
    df1['false_any'] = df1['false_pct'].apply(lambda x: 1 if x > 0 else 0)

    # The two match statistics used to be reported under each other's key.
    return {'match_any_pct': round(df1.match_any.mean(), 2),
            'avg_match_pct': round(df1.match_pct.mean(), 2),
            'false_any_pct': round(df1.false_any.mean(), 2),
            'avg_false_pct': round(df1.false_pct.mean(), 2)}

In [15]:
# codes_df.to_csv('what.csv', index=False)
# manual_codes.to_csv('what2.csv', index=False)

In [16]:
# get user annotated codes

def run(path, search_range):
    """Predict codes for every log under `path` and return them sorted.

    The result of predict_codes(path, search_range) gets a fresh 0..n-1
    index and is then ordered by player_id, mode and frame_number.
    Attaching the hand-annotated codes is currently disabled; the recipe is
    kept in the comments below.
    """
    # codes_df = pd.concat([predict_codes(PATH1, 150), predict_codes(PATH2, 150)])
    predicted = predict_codes(path, search_range)

    # Disabled: load the manual annotations and align them by sort order.
#     manual_codes = pd.read_csv('my_buckets.csv')
#     manual_codes = manual_codes.reset_index(drop=True)
#     manual_codes = manual_codes.sort_values(['player_id', 'mode', 'frame_number'])
#     codes_df['code'] = manual_codes['code'].values

    sort_keys = ['player_id', 'mode', 'frame_number']
    return predicted.reset_index(drop=True).sort_values(sort_keys)

In [17]:
# %%timeit
# Run the bucketing once per search range and write each result to its own CSV.
# `stats` stays empty while the code_check call at the bottom is commented out.
stats = {}
for r in [160]:
    print(f'running on {r}')
    result = run(PATH3, r)
    # The DataFrame index is written as the first CSV column (no index=False here).
    # NOTE(review): the target directory presumably has to exist already - confirm.
    result.to_csv(f'pilot3_data/bucket_results/buckets_{r}.csv')
    # code_check() reads a 'code' column, which the frame returned by run()
    # does not contain; enabling the next line as-is raises KeyError: 'code'.
#     stats[r] = code_check(result)
    


running on 160




KeyError: 'code'

In [None]:
# Show the collected statistics ({} unless the code_check call in the loop above is enabled).
stats