# Download replays from Ballchaser website

In [1]:
from typing import Dict, List, Tuple, Union

In [2]:
import datetime
from ballchaser.client import BallChaser

import os

# Prefer a token supplied through the BALLCHASER_API_TOKEN environment variable
# so the credential does not have to live in the notebook. The literal is kept
# only as a fallback so existing runs keep working.
# NOTE(review): this token is committed in source - rotate it and drop the fallback.
ball_chaser = BallChaser(
    os.environ.get("BALLCHASER_API_TOKEN", "AWSZ0JiezKKjPVHQFXNQiPImI6YIKobD6eogELef"),
    backoff=True,
    max_tries=5,
)

# search and retrieve replay metadata
replays = list(
    ball_chaser.list_replays(
        player_name="GarrettG",
        replay_count=20,
        replay_date_after=datetime.datetime(2019, 6, 1, tzinfo=datetime.timezone.utc),
    )
)

# download every listed replay by id; the second argument is presumably the
# target directory (the notebook later globs "replays/*") - confirm against
# the ballchaser client documentation
replay_stats = [
    ball_chaser.download_replay(replay["id"], 'replays')
    for replay in replays
]

# Create game frames using Carball

Note: you need boxcars-py 0.1.15, which is not available on PyPI, so you have to compile it yourself.

Also, Rocket League changes its replay format from time to time. If parsing suddenly stops working, update the `boxcars` cargo crate, then rebuild boxcars-py (and reinstall it from the wheel).

In [75]:
import boxcars_py
from typing import Dict, List, Tuple, Union

class BoosterUpdate:
    """One state change of a boost pad observed in a replay network frame.

    Instances are plain mutable records. They can be created empty and filled
    in attribute by attribute, or initialised through keyword arguments.
    """
    # index of the network frame in which the change was observed
    frame : int
    # entry of the replay's name table identifying the boost pad
    boost_pad_name : str
    # name of the player that triggered the change, None when there is none
    player_name : Union[str, None]
    # raw 'picked_up' value carried by the replay's PickupNew attribute
    pickup_value : Union[int, None]

    def __init__(self, frame: int = 0, boost_pad_name: str = '',
                 player_name: Union[str, None] = None,
                 pickup_value: Union[int, None] = None) -> None:
        # Defaults guarantee every attribute exists, so repr() of a freshly
        # created (not yet populated) instance no longer raises AttributeError.
        self.frame = frame
        self.boost_pad_name = boost_pad_name
        self.player_name = player_name
        self.pickup_value = pickup_value

    def __repr__(self) -> str:
        return f'Frame {self.frame}: Pad {self.boost_pad_name} changed by {self.player_name} to {self.pickup_value}'

def get_booster_updates(replay_path: str) -> Dict[int, List[BoosterUpdate]]:
    """Extract boost pad state changes from a replay file.

    The replay is parsed with boxcars_py and its network frames are scanned
    twice. The first pass collects every ``PickupNew`` attribute update that
    targets a boost pad actor. The second pass resolves the actor id that
    triggered each update (its "instigator") to a player name, using the
    mapping that is valid at the frame of the update.

    Args:
        replay_path: Path of the replay file to read.

    Returns:
        Mapping from network frame index to the updates observed in that
        frame. ``player_name`` is ``None`` for updates whose instigator is
        ``None`` or ``-1``.

    Raises:
        KeyError: If an update names an instigator that has not been mapped
            to a player by the time its frame is reached.
    """
    with open(replay_path, 'rb') as f:
        buf = f.read()
    replay = boxcars_py.parse_replay(buf)

    names : List[str] = replay['names']
    frames = replay['network_frames']['frames']
    # Both collections are only used for membership tests inside the frame
    # loops, so sets keep those tests O(1).
    boost_name_ids = {idx for (idx, name) in enumerate(names) if name.startswith('VehiclePickup_Boost_TA_')}
    player_names = {p['Name'] for p in replay['properties']['PlayerStats']}

    # ---- pass 1: collect boost pad updates and their raw instigator ids ----
    current_boost_actor_ids = set()
    actor_id_to_name_id = {}
    instigator_ids = set()
    frame_idx_to_booster_update : Dict[int, List[BoosterUpdate]] = dict()
    # Raw instigator of every update, in the same per-frame order as
    # frame_idx_to_booster_update, so it can be resolved in pass 2.
    frame_idx_to_instigators : Dict[int, List[Union[int, None]]] = dict()

    for frame_idx, frame in enumerate(frames):
        for new_actor in frame['new_actors']:
            actor_id = new_actor['actor_id']
            name_id = new_actor['name_id']
            if name_id in boost_name_ids:
                current_boost_actor_ids.add(actor_id)
                actor_id_to_name_id[actor_id] = name_id
            else:
                # the actor id now belongs to a non-boost actor
                current_boost_actor_ids.discard(actor_id)

        for updated_actor in frame['updated_actors']:
            actor_id = updated_actor['actor_id']
            if actor_id not in current_boost_actor_ids or 'PickupNew' not in updated_actor['attribute']:
                continue
            pickup = updated_actor['attribute']['PickupNew']
            instigator = pickup['instigator']
            instigator_ids.add(instigator)

            update = BoosterUpdate()
            update.frame = frame_idx
            update.boost_pad_name = names[actor_id_to_name_id[actor_id]]
            update.player_name = None  # resolved in pass 2
            update.pickup_value = pickup['picked_up']

            frame_idx_to_booster_update.setdefault(frame_idx, []).append(update)
            frame_idx_to_instigators.setdefault(frame_idx, []).append(instigator)

    # ---- pass 2: resolve instigator ids to player names, frame by frame ----
    actor_id_to_player_name = dict()
    instigator_to_player_name = dict()

    for frame_idx, frame in enumerate(frames):
        updated_actors = frame['updated_actors']

        # Actors whose string attribute equals a known player name.
        for updated_actor in updated_actors:
            if 'attribute' in updated_actor and 'String' in updated_actor['attribute'] and updated_actor['attribute']['String'] in player_names:
                actor_id_to_player_name[updated_actor['actor_id']] = updated_actor['attribute']['String']

        # Instigators that point (via ActiveActor) at one of those actors.
        # Runs after the loop above so names set in this frame are visible.
        for updated_actor in updated_actors:
            actor_id = updated_actor['actor_id']
            if actor_id not in instigator_ids or 'attribute' not in updated_actor or 'ActiveActor' not in updated_actor['attribute']:
                continue
            target_actor_id = updated_actor['attribute']['ActiveActor']['actor']
            if target_actor_id not in actor_id_to_player_name:
                continue
            player_name = actor_id_to_player_name[target_actor_id]
            if actor_id in instigator_to_player_name and player_name != instigator_to_player_name[actor_id]:
                print(f'Overriting instigator id {actor_id} at frame {frame_idx} (prev: {instigator_to_player_name[actor_id]})')
            instigator_to_player_name[actor_id] = player_name

        # Resolve this frame's updates with the mapping as of this frame,
        # because instigator ids can be re-assigned later in the replay.
        frame_updates = frame_idx_to_booster_update.get(frame_idx, ())
        frame_instigators = frame_idx_to_instigators.get(frame_idx, ())
        for update, instigator in zip(frame_updates, frame_instigators):
            if instigator is None or instigator == -1:
                update.player_name = None
            else:
                update.player_name = instigator_to_player_name[instigator]

    return frame_idx_to_booster_update

In [80]:
from rlgym.utils.gamestates import GameState, PhysicsObject, PlayerData
from rlgym.utils.math import euler_to_rotation, rotation_to_quaternion
from pandas import DataFrame, Series
from bisect import bisect_left
from carball.analysis.analysis_manager import AnalysisManager
import numpy as np
import carball
from scipy.spatial import KDTree
from rlgym.utils.common_values import BOOST_LOCATIONS
import pandas as pd

# KD-tree built over BOOST_LOCATIONS (imported from rlgym's common values).
# get_game_states queries it with a car position and uses the index of the
# nearest entry as the boost pad index.
boost_pad_location_tree = KDTree(BOOST_LOCATIONS)

class KeyWrapper:
    """Sequence view that exposes ``key(item)`` in place of each item.

    Only ``len()`` and indexing are provided, which is what ``bisect`` needs
    to search an existing sequence by a derived key without copying it.
    """

    def __init__(self, iterable, key):
        self.key = key
        self.it = iterable

    def __len__(self):
        # same length as the wrapped sequence
        return len(self.it)

    def __getitem__(self, i):
        # fetch the underlying item, then project it through the key function
        item = self.it[i]
        return self.key(item)

def get_orange_goal_frames(analysis_manager: AnalysisManager) -> List[int]:
    """Return the frame numbers of all goals whose ``player_team`` is truthy."""
    frames = []
    for goal in analysis_manager.game.goals:
        if goal.player_team:
            frames.append(goal.frame_number)
    return frames
def get_blue_goal_frames(analysis_manager: AnalysisManager) -> List[int]:
    """Return the frame numbers of all goals whose ``player_team`` is falsy."""
    frames = []
    for goal in analysis_manager.game.goals:
        if goal.player_team:
            continue
        frames.append(goal.frame_number)
    return frames
def read_physics_object(row: Series, name: str) -> PhysicsObject:
    """Build a PhysicsObject for one entity from an unstacked data-frame row.

    Args:
        row: Row indexed first by quantity (``pos_x``, ``vel_y``, ``rot_z``,
            ...) and then by entity name.
        name: Entity to read, e.g. a player name or ``'ball'``.

    Returns:
        A PhysicsObject with position, linear velocity, angular velocity and
        quaternion filled in from the row.
    """
    def read_vector(prefix: str) -> np.ndarray:
        # gathers the x/y/z components stored under '<prefix>_x', '_y', '_z'
        return np.array([row[prefix + '_x'][name], row[prefix + '_y'][name], row[prefix + '_z'][name]])

    o = PhysicsObject()
    o.position = read_vector('pos')
    o.linear_velocity = read_vector('vel')
    o.angular_velocity = read_vector('ang_vel')
    rotation_euler = read_vector('rot')
    # The result of euler_to_rotation still has to go through
    # rotation_to_quaternion before it can be stored as a quaternion; this is
    # the same conversion chain read_physics_object_inverted uses.
    o.quaternion = rotation_to_quaternion(euler_to_rotation(rotation_euler))
    return o

def quaternion_multiply(quaternion0, quaternion1):
    """Return the Hamilton product ``quaternion1 * quaternion0``.

    Adapted from https://stackoverflow.com/q/40915069, but with the
    components of both arguments and of the result laid out as
    ``(x, y, z, w)`` (scalar part last).

    NOTE(review): callers must pass quaternions in that layout - confirm it
    matches the layout produced by ``rotation_to_quaternion``.

    Args:
        quaternion0: Right-hand operand, any 4-element sequence.
        quaternion1: Left-hand operand, any 4-element sequence.

    Returns:
        A float64 array of four components in ``(x, y, z, w)`` order.

    >>> q = quaternion_multiply([1, -2, 3, 4], [-5, 6, 7, 8])
    >>> np.allclose(q, [20, 30, 56, 28])
    True
    >>> np.allclose(quaternion_multiply([0, 0, 0, 1], [1, 2, 3, 4]), [1, 2, 3, 4])
    True

    """
    x0, y0, z0, w0 = quaternion0
    x1, y1, z1, w1 = quaternion1
    return np.array((
         x1*w0 + y1*z0 - z1*y0 + w1*x0,
        -x1*z0 + y1*w0 + z1*x0 + w1*y0,
         x1*y0 - y1*x0 + z1*w0 + w1*z0,
        -x1*x0 - y1*y0 - z1*z0 + w1*w0), dtype=np.float64)

def read_physics_object_inverted(row: Series, name: str) -> PhysicsObject:
    """Build the mirrored PhysicsObject for one entity from an unstacked row.

    Position, linear velocity and angular velocity have their x and y
    components negated; the quaternion is multiplied with ``[0, 0, 0, -1]``.

    Args:
        row: Row indexed first by quantity (``pos_x``, ``vel_y``, ``rot_z``,
            ...) and then by entity name.
        name: Entity to read, e.g. a player name or ``'ball'``.

    Returns:
        The populated PhysicsObject.
    """
    flip_xy = np.array([-1, -1, 1])

    o = PhysicsObject()
    # 'position' is the attribute the rest of this file reads and writes on
    # PhysicsObject; the previous 'pos' assignment created an unrelated
    # attribute and left the real position untouched.
    o.position = np.array([row['pos_x'][name], row['pos_y'][name], row['pos_z'][name]]) * flip_xy
    o.linear_velocity = np.array([row['vel_x'][name], row['vel_y'][name], row['vel_z'][name]]) * flip_xy
    o.angular_velocity = np.array([row['ang_vel_x'][name], row['ang_vel_y'][name], row['ang_vel_z'][name]]) * flip_xy
    rotation_euler = np.array([row['rot_x'][name], row['rot_y'][name], row['rot_z'][name]])
    # NOTE(review): quaternion_multiply unpacks its operands as (x, y, z, w),
    # which makes [0, 0, 0, -1] a plain negation of the quaternion - confirm
    # this is the intended rotation for the mirrored view.
    o.quaternion = quaternion_multiply(rotation_to_quaternion(euler_to_rotation(rotation_euler)), np.array([0, 0, 0, -1]))
    # The object was previously built but never returned, so callers got None.
    return o


# Values written into the boost pad array built by get_game_states:
# BOOST_FULL is the initial value and is restored when a pad is re-activated,
# BOOST_EMPTY is written when a pad has been picked up.
BOOST_FULL : float = 1
BOOST_EMPTY : float = 0

def get_game_states(replay_path : str):
    """Yield one rlgym ``GameState`` per row of a replay's carball data frame.

    Boost pad events come from ``get_booster_updates``; player and ball
    physics, goals and hits come from the carball analysis of the same file.

    Args:
        replay_path: Path of the replay file to convert.

    Yields:
        A ``GameState`` with ``game_type`` 0, the running blue/orange scores,
        the ball, the index of the player of the most recent hit before the
        frame (``None`` before the first hit), the per-player data, a copy of
        the current boost pad array and its reversed view.
    """
    boost_updates = get_booster_updates(replay_path)
    analysis_manager = carball.analyze_replay_file(replay_path)
    player_id_to_index = {p.id.id : idx for (idx, p) in enumerate(analysis_manager.protobuf_game.players)}
    players : List[Tuple[str, bool]] = [(p.name, p.is_orange) for p in analysis_manager.game.players]
    # Position of every player inside the per-frame player list built below.
    player_index : Dict[str, int] = {name: idx for idx, (name, _is_orange) in enumerate(players)}
    # NOTE(review): last_touch indexes protobuf_game.players while
    # state.players follows game.players - confirm both orders agree.
    hits : List[Tuple[int, int]] = [(hit.frame_number, player_id_to_index[hit.player_id.id]) for hit in  analysis_manager.protobuf_game.game_stats.hits]
    # Frame numbers only, so the per-frame bisect needs no wrapper object.
    hit_frames = [hit_frame for hit_frame, _ in hits]
    orange_goal_frames = get_orange_goal_frames(analysis_manager)
    blue_goal_frames = get_blue_goal_frames(analysis_manager)
    df = analysis_manager.get_data_frame()

    # Float array, as the float-annotated BOOST_FULL / BOOST_EMPTY intend
    # (float32 presumably matches rlgym's own boost pad array - confirm).
    boost_pads : np.ndarray = np.full(GameState.BOOST_PADS_LENGTH, BOOST_FULL, dtype=np.float32)

    boost_pad_name_to_idx : Dict[str, int] = dict()

    def get_booster_idx(booster_name : str, player_data : PlayerData):
        """Return the pad index for a pad name, learning it on first use."""
        # if idx is known for that boost pad, return it
        idx = boost_pad_name_to_idx.get(booster_name, None)
        if idx is not None:
            return idx

        # else the pad closest to the collecting car is taken as this pad
        idx = boost_pad_location_tree.query(player_data.car_data.position)[1]
        boost_pad_name_to_idx[booster_name] = idx

        return idx

    for frame_idx, row in df.iterrows():
        frame_idx = int(frame_idx)
        row = row.unstack()

        orange_goals = sum(1 for g in orange_goal_frames if g <= frame_idx)
        blue_goals = sum(1 for g in blue_goal_frames if g <= frame_idx)

        player_objects = []
        for name, _is_orange in players:
            player = PlayerData()
            player.car_data = read_physics_object(row, name)
            player.inverted_car_data = read_physics_object_inverted(row, name)
            #TODO: player.on_ground
            player.boost_amount = row['boost'][name]
            player_objects.append(player)

        for pad_update in boost_updates.get(frame_idx, []):
            if pad_update.pickup_value < 255:
                # clear boost pad
                player_name = pad_update.player_name
                assert(player_name is not None)
                player_data = player_objects[player_index[player_name]]
                booster_idx = get_booster_idx(pad_update.boost_pad_name, player_data)
                boost_pads[booster_idx] = BOOST_EMPTY
            else:
                # activate boost pad; the check belongs to the pad update
                # itself, not to whichever player was handled last
                assert(pad_update.player_name is None)
                booster_idx = boost_pad_name_to_idx.get(pad_update.boost_pad_name)
                # A pad that was never seen being picked up has no known
                # index and is still marked full, so there is nothing to do.
                if booster_idx is not None:
                    boost_pads[booster_idx] = BOOST_FULL

        ball = read_physics_object(row, 'ball')
        # number of hits strictly before this frame
        touch_index = bisect_left(hit_frames, frame_idx)
        last_touch = hits[touch_index - 1][1] if touch_index > 0 else None

        state = GameState()
        state.game_type = 0
        state.blue_score = blue_goals
        state.orange_score = orange_goals
        state.ball = ball
        state.last_touch = last_touch
        state.players = player_objects

        # copy so later pad changes do not alter states already yielded
        state.boost_pads = boost_pads.copy()
        state.inverted_boost_pads = state.boost_pads[::-1]

        yield state

In [3]:
import glob

# Pick the first entry glob returns for the local "replays" directory.
# Raises IndexError when nothing matches.
replay_path = glob.glob("replays/*")[0]

In [49]:
# Parse the selected replay directly with boxcars_py; `replay` is not used by
# the visible cells below, presumably kept for interactive inspection.
with open(replay_path, 'rb') as f:
    buf = f.read()
replay = boxcars_py.parse_replay(buf)
# Boost pad updates of the same replay, keyed by network frame index.
booster_updates = get_booster_updates(replay_path)

In [81]:
# Drain the generator to check that every frame converts; the yielded states
# are discarded.
for idx, gs in enumerate(get_game_states(replay_path)):
    pass

Overriting instigator id 34 at frame 4947 (prev: El Ferxxo Mor)
Overriting instigator id 201 at frame 8540 (prev: CAMA62)


Found bot not in bot list
Dropping these columns[('game', 'is_overtime')]
Goal is not shot: frame 3038 by G2 GarrettG
The player never hit the ball during the "carry"
The player never hit the ball during the "carry"
The player never hit the ball during the "carry"


(183.27567214221165, 11)


ValueError: tuple.index(x): x not in tuple