Skip to content

Commit

Permalink
Merge d847e75 into 07297fe
Browse files Browse the repository at this point in the history
  • Loading branch information
DivvyCr committed Aug 4, 2020
2 parents 07297fe + d847e75 commit a30a5e6
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 11 deletions.
267 changes: 259 additions & 8 deletions carball/analysis/events/bump_detection/bump_analysis.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,288 @@
import itertools
import logging
from typing import Dict

import numpy as np
import pandas as pd
from carball.generated.api.player_pb2 import Player

from carball.generated.api.stats.events_pb2 import Bump

from carball.generated.api.player_id_pb2 import PlayerId
from carball.json_parser.game import Game

from carball.generated.api import game_pb2

logger = logging.getLogger(__name__)

# Decreasing this, risks not counting bumps where one car is directly behind another (driving in the same direction).
# Increasing this, risks counting non-contact close proximity (e.g. one car cleanly jumped over another =/= bump).
PLAYER_CONTACT_DISTANCE = 200

# Needs to be relatively high to account for two cars colliding 'diagonally': /\
MAX_BUMP_ALIGN_ANGLE = 60

# Currently arbitrary:
MIN_BUMP_VELOCITY = 5000

# Approx. half of goal height.
# (could be used to discard all aerial contact as bumps, although rarely an aerial bump WAS, indeed, intended)
AERIAL_BUMP_HEIGHT = 300


# TODO Post-bump analysis // Bump impact analysis.
class BumpAnalysis:
def __init__(self, game: Game, proto_game: game_pb2):
self.proto_game = proto_game

def get_bumps_from_game(self, data_frame: pd.DataFrame):
def get_bumps_from_game(self, data_frame: pd.DataFrame, player_map):
self.create_bumps_from_demos(self.proto_game)

self.analyze_bumps(data_frame)
self.create_bumps_from_player_proximity(data_frame, player_map)

def create_bumps_from_demos(self, proto_game):
for demo in proto_game.game_metadata.demos:
self.add_bump(demo.frame_number, demo.victim_id, demo.attacker_id, True)

def create_bumps_from_player_proximity(self, data_frame: pd.DataFrame, player_map: Dict[str, Player]):
"""
Attempt to find all instances between each possible player combination
where they got within PLAYER_CONTACT_DISTANCE.
Then, add each instance to the API.
NOTES:
Currently, this yields more 'bumps' than there are actually.
This is mostly due to aerial proximity, where a bump was NOT intended or no contact was made.
This also occurs near the ground, where cars flip awkwardly past each other.
"""

# An array of player names to get player combinations; and a dict of player names to their IDs to create bumps.
player_names = []
player_name_to_id = {}
for player in player_map.values():
player_names.append(player.name)
player_name_to_id[player.name] = player.id

# For each player pair combination (nCr), get all frames where they got close and then filter those as bumps.
for player_pair in itertools.combinations(player_names, 2):
players_close_frame_idxs = BumpAnalysis.get_players_close_frame_idxs(data_frame,
str(player_pair[0]),
str(player_pair[1]))

if len(players_close_frame_idxs) > 0:
likely_bumps = BumpAnalysis.filter_bumps(data_frame, player_pair, players_close_frame_idxs)
self.add_non_demo_bumps(likely_bumps, player_name_to_id)
else:
logger.info("Players (" + player_pair[0] + " and " + player_pair[1] + ") did not get close "
"during the match.")

def add_bump(self, frame: int, victim_id: PlayerId, attacker_id: PlayerId, is_demo: bool) -> Bump:
"""
Add a new bump to the proto_game object.
"""
bump = self.proto_game.game_stats.bumps.add()
bump.frame_number = frame
bump.attacker_id.id = attacker_id.id
bump.victim_id.id = victim_id.id
if is_demo:
bump.is_demo = True

def analyze_bumps(self, data_frame:pd.DataFrame):
for bump in self.proto_game.game_stats.bumps:
self.analyze_bump(bump, data_frame)
def add_non_demo_bumps(self, likely_bumps, player_name_to_id):
"""
Add a new bump to the proto_game object.
This method takes an array of likely (filtered) bumps, in the following form:
(frame_idx, attacker_name, victim_name)
and carefully adds them to the proto_game object (i.e. check for demo duplicates).
"""

# Get an array of demo frame idxs to compare later.
demo_frame_idxs = []
for demo in self.proto_game.game_metadata.demos:
demo_frame_idxs.append(demo.frame_number)

# For each bump tuple, if its frame index is not similar to a demo frame index, add it via add_bump().
for likely_bump in likely_bumps:
likely_bump_frame_idx = likely_bump[0]
if not any(np.isclose(demo_frame_idxs, likely_bump_frame_idx, atol=10)):
self.add_bump(likely_bump[0], player_name_to_id[likely_bump[2]], player_name_to_id[likely_bump[1]],
is_demo=False)

@staticmethod
def filter_bumps(data_frame, player_pair, players_close_frame_idxs):
"""
Filter the frames where two players got close - the filtered frames are likely bumps.
The main principle used is the angle between two vectors (aka 'alignment'):
the velocity vector of player A;
the positional vector of the difference between the positions of player B and player A.
Both of these vectors point away from player A, and if the angle between them is small - it is likely that
Player A bumped Player B. (Velocity going 'through' Player B's Position)
Some further checks are done to categorise the bump (i.e. is_aerial_bump(), is_bump_velocity() )
"""
likely_bumps = []

# Split a list of frame indexes into intervals where indexes are within 3 of each other (i.e. consecutive).
players_close_frame_idxs_intervals = BumpAnalysis.get_players_close_intervals(players_close_frame_idxs)

# For each such interval, take (currently only) the first frame index and analyse car behaviour.
for interval in players_close_frame_idxs_intervals:
frame_before_bump = interval[0]

# Calculate both player bump alignments (see comment at method top).
p1_alignment_before = BumpAnalysis.get_player_bump_alignment(data_frame, frame_before_bump,
player_pair[0], player_pair[1])
p2_alignment_before = BumpAnalysis.get_player_bump_alignment(data_frame, frame_before_bump,
player_pair[1], player_pair[0])

# Determine the attacker and the victim (see method for more info).
attacker, victim = BumpAnalysis.determine_attacker_victim(player_pair[0], player_pair[1],
p1_alignment_before, p2_alignment_before)

# Determine if the bump was above AERIAL_BUMP_HEIGHT.
is_aerial_bump = BumpAnalysis.is_aerial_bump(data_frame, player_pair[0], player_pair[1], frame_before_bump)

# Append the current bump data to likely bumps, if there is an attacker and a victim
# and if it wasn't an aerial bump (most often it isn't intended, and there is often awkward behaviour).
if attacker is not None and victim is not None and not is_aerial_bump:
likely_bump = (frame_before_bump, attacker, victim)
likely_bumps.append(likely_bump)

# NOT YET IMPLEMENTED: Check if interval is quite long - players may be in rule 1 :) or might be a scramble.
# BumpAnalysis.analyse_prolonged_proximity(data_frame, interval, player_pair[0], player_pair[1])

return likely_bumps

@staticmethod
def get_player_bump_alignment(data_frame, frame_idx, p1_name, p2_name):
"""
Calculate and return the angle between:
the velocity vector of player A;
the positional vector of the difference between the positions of player B and player A.
"""

# Get the necessary data from the DataFrame at the given frame index.
p1_vel_df = data_frame[p1_name][['vel_x', 'vel_y', 'vel_z']].loc[frame_idx]
p1_pos_df = data_frame[p1_name][['pos_x', 'pos_y', 'pos_z']].loc[frame_idx]
p2_pos_df = data_frame[p2_name][['pos_x', 'pos_y', 'pos_z']].loc[frame_idx]

# Get the distance vector, directed from p1 to p2.
# Then, convert it to a unit vector.
pos1_df = p2_pos_df - p1_pos_df
pos1 = [pos1_df.pos_x, pos1_df.pos_y, pos1_df.pos_y]
unit_pos1 = pos1 / np.linalg.norm(pos1)

# Get the velocity vector of p1.
# Then, convert it to a unit vector.
vel1 = [p1_vel_df.vel_x, p1_vel_df.vel_y, p1_vel_df.vel_z]
unit_vel1 = vel1 / np.linalg.norm(vel1)

# Find the angle between the positional vector and the velocity vector.
# NOTE: This is currently converted to DEGREES, not sure if this is bad..? ( - DivvyC)
ang = (np.arccos(np.clip(np.dot(unit_vel1, unit_pos1), -1.0, 1.0))) * 180 / np.pi
return ang

@staticmethod
def get_players_close_frame_idxs(data_frame, p1_name, p2_name):
"""
For a pair of players, find all frame indexes where they got within PLAYER_CONTACT_DISTANCE of each other.
Note that they did NOT necessarily make contact.
"""

# Separate the positional data of each given player from the full DataFrame and lose the NaN value rows.
p1_pos_df = data_frame[p1_name][['pos_x', 'pos_y', 'pos_z']].dropna(axis=0)
p2_pos_df = data_frame[p2_name][['pos_x', 'pos_y', 'pos_z']].dropna(axis=0)

# Calculate the vector distances between the players, and store them as a pd.Series (1D DataFrame).
distances = (p1_pos_df.pos_x - p2_pos_df.pos_x) ** 2 + \
(p1_pos_df.pos_y - p2_pos_df.pos_y) ** 2 + \
(p1_pos_df.pos_z - p2_pos_df.pos_z) ** 2
distances = np.sqrt(distances)

# Only keep values < PLAYER_CONTACT_DISTANCE (see top of class).
players_close_series = distances[distances < PLAYER_CONTACT_DISTANCE]
# Get the frame indexes of the values (as an ndarray).
players_close_frame_idxs = players_close_series.index.to_numpy()
return players_close_frame_idxs

@staticmethod
def get_players_close_intervals(players_close_frame_idxs):
"""
Separate a list of frame indexes into intervals with consecutive frame indexes.
E.g. [3, 4, 5, 7, 19, 21, 23, 24, 57] is turned into [[3, 4, 5, 7], [21, 23, 24], [57]]
"""

all_intervals = []
interval = []
for index, frame_idx in enumerate(players_close_frame_idxs):
diffs = np.diff(players_close_frame_idxs)
interval.append(frame_idx)
if index >= len(diffs) or diffs[index] >= 3:
all_intervals.append(interval)
interval = []
return all_intervals

@staticmethod
def determine_attacker_victim(p1_name, p2_name, p1_alignment, p2_alignment):
"""
Try to 'guesstimate' the attacker and the victim by comparing bump alignment angles.
If both bump alignments are above MAX_BUMP_ALIGN_ANGLE, both values are None (no solid attacker/victim)
If both bump alignments are within 45deg of each other, both values are None (both attackers)
:return: A tuple in the form (Attacker, Victim) or (None, None).
"""

if p1_alignment < MAX_BUMP_ALIGN_ANGLE or p2_alignment < MAX_BUMP_ALIGN_ANGLE:
if abs(p1_alignment - p2_alignment) < 45:
return None, None
elif p1_alignment < p2_alignment:
return p1_name, p2_name
elif p2_alignment < p1_alignment:
return p2_name, p1_name

return None, None

@staticmethod
def analyse_prolonged_proximity(data_frame, interval, p1_name, p2_name):
# TODO Redo this to do some proper analysis.
if len(interval) > 10:
print(" > Scramble between " + p1_name + " and " + p2_name)
# NOTE: Could try analysing immediate post-bump effects.
# elif len(interval) >= 5:
# frame_after_bump = interval[len(interval) - 1]
# p1_alignment_after = BumpAnalysis.get_player_bump_alignment(data_frame, frame_after_bump,
# p1_name, p2_name)
# p2_alignment_after = BumpAnalysis.get_player_bump_alignment(data_frame, frame_after_bump,
# p2_name, p1_name)

@staticmethod
def is_aerial_bump(data_frame: pd.DataFrame, p1_name: str, p2_name: str, at_frame: int):
p1_pos_z = data_frame[p1_name].pos_z.loc[at_frame]
p2_pos_z = data_frame[p2_name].pos_z.loc[at_frame]
if all(x > AERIAL_BUMP_HEIGHT for x in [p1_pos_z, p2_pos_z]):
# if all(abs(y) > 5080 for y in [p1_pos_y, p2_pos_y]):
# print("Backboard bump?")
return True
else:
return False

@staticmethod
def is_bump_alignment(bump_angles):
# Check if all bump alignment angles in the first half of the interval are above MAX_BUMP_ALIGN_ANGLE.
if all(x > MAX_BUMP_ALIGN_ANGLE for x in bump_angles):
return False
else:
return True

def analyze_bump(self, bump: Bump, data_frame:pd.DataFrame):
frame_number = bump.frame_number
@staticmethod
def is_bump_velocity(data_frame: pd.DataFrame, p1_name: str, p2_name: str, at_frame: int):
p1_vel_mag = np.sqrt(data_frame[p1_name].vel_x.loc[at_frame] ** 2 +
data_frame[p1_name].vel_y.loc[at_frame] ** 2 +
data_frame[p1_name].vel_z.loc[at_frame] ** 2)
p2_vel_mag = np.sqrt(data_frame[p2_name].vel_x.loc[at_frame] ** 2 +
data_frame[p2_name].vel_y.loc[at_frame] ** 2 +
data_frame[p2_name].vel_z.loc[at_frame] ** 2)
# Check if initial player velocities are below MIN_BUMP_VELOCITY.
if all(x < MIN_BUMP_VELOCITY for x in [p1_vel_mag, p2_vel_mag]):
return False
else:
return True
2 changes: 1 addition & 1 deletion carball/analysis/events/event_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def create_bumps(self, game: Game, proto_game: game_pb2.Game, player_map: Dict[s
data_frame: pd.DataFrame):
logger.info("Looking for bumps.")
bumpAnalysis = BumpAnalysis(game=game, proto_game=proto_game)
bumpAnalysis.get_bumps_from_game(data_frame)
bumpAnalysis.get_bumps_from_game(data_frame, player_map)
logger.info("Found %s bumps.", len(proto_game.game_stats.bumps))

def create_dropshot_events(self, game: Game, proto_game: game_pb2.Game, player_map: Dict[str, Player]):
Expand Down
16 changes: 16 additions & 0 deletions carball/tests/stats/bump_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from carball.tests.utils import get_raw_replays, run_analysis_test_on_replay

from carball.analysis.analysis_manager import AnalysisManager


class Test_Bumps:
def test_calculate_bumps_correctly(self, replay_cache):
def test(analysis: AnalysisManager):
proto_game = analysis.get_protobuf_data()
count_bumps = 0
for i in proto_game.game_stats.bumps:
if not i.is_demo:
count_bumps += 1
assert count_bumps == 3

run_analysis_test_on_replay(test, get_raw_replays()["3_BUMPS"], cache=replay_cache)
7 changes: 5 additions & 2 deletions carball/tests/stats/demo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ class Test_Demos:
def test_calculate_demos_correctly(self, replay_cache):
def test(analysis: AnalysisManager):
proto_game = analysis.get_protobuf_data()
bumps = proto_game.game_stats.bumps
assert len(bumps) == 1
count_demo_bumps = 0
for i in proto_game.game_stats.bumps:
if i.is_demo:
count_demo_bumps += 1
assert count_demo_bumps == 1

run_analysis_test_on_replay(test, get_raw_replays()["1_DEMO"], cache=replay_cache)

0 comments on commit a30a5e6

Please sign in to comment.