In [None]:
import time

start_time = time.time()

# Train a model using XGBoost

## Imports

In [None]:
# Standard library imports
import os
import random
import math
import logging

# Third-party library imports
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Typing imports
from typing import List, Tuple


In [None]:
# data directory
DATA_DIR = "data"

# parameters
BATCH = 1
EVENT = 24
PULSE_AMOUNT = 200
EXCLUDE_AUXILIARY = True
IS_TRAINING = True
SET = 'train' if IS_TRAINING else 'test'

# logging
LOG_LEVEL = logging.INFO

# Setup logging

In [None]:
# set up logging
logging.basicConfig(filename='info.log', level=LOG_LEVEL)

## Variables

## Functions

In [None]:
def seed_it_all(seed=7):
    """ Attempt to be Reproducible """
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    
seed_it_all(10)

### For optimization

In [None]:
import importlib

def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        print(f'Optimizing col {col}')
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))

    return df

def import_data(file: str):
    """create a dataframe and optimize its memory usage"""
    function_name = f"read_{file.split('.')[-1]}"
    function = getattr(pd, function_name)
    df = function(file)
    df = reduce_mem_usage(df)
    return df

def get_event_df(batch_df: dd.DataFrame, sensor_geometry: pd.DataFrame, event_id: int) -> pd.DataFrame:
    """
    Get a DataFrame for a specific event.

    Parameters:
    train_batch_df (pandas.DataFrame): The batch DataFrame.
    sensor_geometry (pandas.DataFrame): The sensor geometry DataFrame.
    event_id (str): The event identifier.

    Returns:
    pandas.DataFrame: A DataFrame containing data for the specified event.
    """
    if EXCLUDE_AUXILIARY:
        batch_df = batch_df[~batch_df['auxiliary']].drop(columns=['auxiliary'])
    
    event_df = batch_df[batch_df['event_id'] == event_id].compute()
    # print('event_df out', event_df.head())
    event_df = pd.merge(
        left=event_df,
        right=sensor_geometry,
        how='inner',
        on='sensor_id'
    )
    
    ## Drop columns that are not needed for prediction
    return event_df.drop(columns=['event_id', 'sensor_id'])

### For Geometry

In [None]:
def cartesian_to_sphere(x: float, y: float, z: float) -> Tuple[float, float]:
    """Maps vector cartesian coordinates (x, y, z) from the origin to spherical angles azimuth and zenith.
    
    See: https://en.wikipedia.org/wiki/Spherical_coordinate_system

    Args:
        x (float): The x-coordinate of the point.
        y (float): The y-coordinate of the point.
        z (float): The z-coordinate of the point.

    Returns:
        tuple[float, float]: The azimuth and zenith angles in radians.
    """
    x2y2 = x**2 + y**2
#     print('cartesian_to_sphere x**2 + y**2', x2y2)
    r = math.sqrt(x2y2 + z**2)
#     print('cartesian_to_sphere math.sqrt(x2y2 + z**2)', r)
    x_dv_py = 0 if x == 0 else x / math.sqrt(x2y2)
    azimuth = math.acos(x_dv_py) * np.sign(y)
#     print('math.acos(x / math.sqrt(x2y2)) * np.sign(y)', azimuth)
    zenith = math.acos(z / r)
#     print('zenith', zenith)
    
#     print('cartesian_to_sphere takes',x,y,z)
#     print('cartesian_to_sphere returns',azimuth, zenith)
    return azimuth, zenith


def sphere_to_cartesian(azimuth: float, zenith: float) -> Tuple[float, float, float]:
    """Map spherical coordinates to cartesian coordinates.
    see: https://stackoverflow.com/a/10868220/4521646
    
    Args:
        azimuth (float): The azimuth angle in radians.
        zenith (float): The zenith angle in radians.

    Returns:
        tuple: The x, y, z vector cartesian coordinates of the point from the origin.
    """
    x = math.sin(zenith) * math.cos(azimuth)
    y = math.sin(zenith) * math.sin(azimuth)
    z = math.cos(zenith)
    return x, y, z


def adjust_sphere(azimuth:float, zenith:float) -> Tuple[float, float]:
    """Adjust azimuth and zenith to be within [-pi, pi]

    Args:
        azimuth (float): The azimuth to adjust
        zenith (float): The zenith to adjust

    Returns:
        float: The adjusted azimuth and zenith
    """
    print('adjust_sphere takes',azimuth, zenith)
    
    if zenith < 0:
        zenith += math.pi
        azimuth += math.pi
    if azimuth < 0:
        azimuth += math.pi * 2
    azimuth = azimuth % (2 * math.pi)
#     print('adjust_sphere returns',azimuth, zenith)
    return azimuth, zenith

### For scoring

In [None]:
def angular_dist_score(az_true:float, zen_true:float, az_pred:float, zen_pred:float):
    '''
    calculate the MAE of the angular distance between two directions.
    The two vectors are first converted to cartesian unit vectors,
    and then their scalar product is computed, which is equal to
    the cosine of the angle between the two vectors. The inverse 
    cosine (arccos) thereof is then the angle between the two input vectors
    
    The lower the angle, the more similar the two vectors are meaning the score is better.
    
    Parameters:
    -----------
    
    az_true : float (or array thereof)
        true azimuth value(s) in radian
    zen_true : float (or array thereof)
        true zenith value(s) in radian
    az_pred : float (or array thereof)
        predicted azimuth value(s) in radian
    zen_pred : float (or array thereof)
        predicted zenith value(s) in radian
    
    Returns:
    --------
    
    dist : float
        mean over the angular distance(s) in radian
    '''
    
    if not (np.all(np.isfinite(az_true)) and
            np.all(np.isfinite(zen_true)) and
            np.all(np.isfinite(az_pred)) and
            np.all(np.isfinite(zen_pred))):
        raise ValueError("All arguments must be finite")
    
    # pre-compute all sine and cosine values
    sa1 = np.sin(az_true)
    ca1 = np.cos(az_true)
    sz1 = np.sin(zen_true)
    cz1 = np.cos(zen_true)
    
    sa2 = np.sin(az_pred)
    ca2 = np.cos(az_pred)
    sz2 = np.sin(zen_pred)
    cz2 = np.cos(zen_pred)
    
    # scalar product of the two Cartesian vectors (x = sz*ca, y = sz*sa, z = cz)
    scalar_prod = sz1*sz2*(ca1*ca2 + sa1*sa2) + (cz1*cz2)
    
    # scalar product of two unit vectors is always between -1 and 1, this is against numerical instability
    # that might otherwise occur from the finite precision of the sine and cosine functions
    scalar_prod =  np.clip(scalar_prod, -1, 1)
    
    # convert back to an angle (in radian)
    return np.average(np.abs(np.arccos(scalar_prod)))

## Load the dataframes

In [None]:
sensor_geometry = import_data(f'{DATA_DIR}/sensor_geometry.csv')

In [None]:

meta_dfd = dd.read_parquet(f'{DATA_DIR}/{SET}_meta.parquet', 
    blocksize=64000000 # = 64 Mb chunks
)

## Test input preparation

In [None]:
test_batch_dfd = dd.read_parquet(f'{DATA_DIR}/{SET}/batch_1.parquet', 
        blocksize=64000000 # = 64 Mb chunks,
    ).reset_index()


test_batch_dfd.head(1)


In [None]:
def make_input_vector_shape(df: pd.DataFrame) -> pd.DataFrame:
    """Corrects the shape of the input vector.

    Args:
        df (pd.DataFrame): The input dataframe not sized.

    Returns:
        pd.DataFrame: The newly sized dataframe that has the correct shape and filled with zeros.
    """
    if len(df) < PULSE_AMOUNT:
        
        blank_df = pd.DataFrame(
                index=range(len(df), PULSE_AMOUNT), columns=df.columns
            ).fillna(0)
        return pd.concat([df, blank_df], ignore_index=True)
        
    elif len(df) > PULSE_AMOUNT:
        return df.head(PULSE_AMOUNT)
        
    else:
        return df

In [None]:
def get_input_vector(df: pd.DataFrame, event_id: int) -> pd.DataFrame:
    """Changes the rows of a dataframe to columns

    Args:
        df (pd.DataFrame): The dataframe to be converted that currently has observations in rows

    Returns:
        pd.DataFrame: A single observation in columns
    """
    df = make_input_vector_shape(df)
    df = df.stack().reset_index()
    df['id'] = df['level_0'].astype(str) + '_' + df['level_1']
    df =  df.drop(columns=['level_0','level_1']).set_index('id')

    return df.T.set_index(pd.Index([event_id]))

In [None]:
event_df = get_event_df(test_batch_dfd, sensor_geometry, 24)
event_df

In [None]:
input_vec = get_input_vector(event_df, 24)
input_vec

## Train the model

In [204]:
import xgboost as xgb
import datetime

av_batch_time_secs = None
av_event_time_secs = None
train_start_time = time.time()

batches = meta_dfd['batch_id'].unique().compute().values

for i, batch_id in enumerate(batches):
    
    print('Processing batch ', batch_id, ' of ', len(batches))
    
    # The batch dataframe to be populated with events
    batch_df = None
    
    batch_dfd = dd.read_parquet(f'{DATA_DIR}/{SET}/batch_{batch_id}.parquet', 
        blocksize=64000000 # = 64 Mb chunks,
    ).reset_index()
    
        
    # Loop through unique event IDs
    events = batch_dfd['event_id'].unique().compute().values
    
    for j, event_id in enumerate(events):
        
        print('Processing event', event_id, ' of ', len(events), ' in batch ', batch_id)
        event_df = get_event_df(batch_dfd, sensor_geometry, event_id)
        
        input_vec =  get_input_vector(event_df, event_id)
        
                # check if a DataFrame exists
        if batch_df is not None:
            # do something with the DataFrame
            batch_df = pd.concat([ batch_df, input_vec])
        else:
            # handle the case where the DataFrame does not exist
            batch_df = input_vec
         
        
        # Time tracking
        current_time = time.time() - train_start_time
        mins = current_time / 60
        print("Total time taken so far : ", round(mins, 2), " Minutes")
        av_event_time_secs = current_time if av_event_time_secs is None else (av_event_time_secs + current_time) / j + 1
        print('Average event time: ', round(av_event_time_secs, 2), " Seconds")
        remaining_events = len(events) - j - 1
        remaining_event_minutes = (av_event_time_secs * remaining_events)
        print(
            'Remaining Events to process for batch: ',
              remaining_events, '. Est time remaining to process: ', 
              round(remaining_event_minutes / 60, 2), " Minutes"
            )
    
    if batch_df is not None:
        # get the current date and time
        now = datetime.datetime.now()
        # create a date string with the format day-month-year-hour:minute
        date_string = now.strftime('%d-%m-%Y-%H:%M')
        # define the file path
        file_path = f'{DATA_DIR}/artifacts/{SET}/{date_string}/{batch_id}.npy'
        # create the parent directories if they don't exist
        parent_dir = os.path.dirname(file_path)
        
        os.makedirs(parent_dir, exist_ok=True)

        batch_df.to_numpy(file_path)
        
        current_time = time.time() - train_start_time
        av_batch_time_secs = current_time if av_batch_time_secs is None else (av_batch_time_secs + current_time) / i + 1
        print('Average batch time: ', round(av_batch_time_secs / 60, 2), " Minutes")
        remaining_batches = len(events) - i - 1
        remaining_batch_hours = (av_batch_time_secs * remaining_batches) / 60 / 60
        print(
            'Remaining Events to process for batch: ',
              remaining_batches, '. Est time remaining to process: ', 
              round(remaining_batch_hours, 2), " Hours"
            )

        
        


Processing batch  1  of  660
Processing event 24  of  200000  in batch  1
Total time taken so far :  0.08  Minutes
Average event time:  4.53  Seconds
Processing event 41  of  200000  in batch  1
Total time taken so far :  0.11  Minutes
Average event time:  12.01  Seconds
Processing event 59  of  200000  in batch  1
Total time taken so far :  0.14  Minutes
Average event time:  11.22  Seconds
Processing event 67  of  200000  in batch  1
Total time taken so far :  0.17  Minutes
Average event time:  8.2  Seconds
Processing event 72  of  200000  in batch  1
Total time taken so far :  0.21  Minutes
Average event time:  6.13  Seconds
Processing event 77  of  200000  in batch  1
Total time taken so far :  0.24  Minutes
Average event time:  5.08  Seconds
Processing event 79  of  200000  in batch  1
Total time taken so far :  0.27  Minutes
Average event time:  4.55  Seconds
Processing event 82  of  200000  in batch  1
Total time taken so far :  0.3  Minutes
Average event time:  4.24  Seconds
Pro

In [None]:
submission = pd.DataFrame(columns=['event_id', 'azimuth', 'zenith'])

for batch_id in meta_dfd['batch_id'].unique().compute().values:
    
    print('Processing batch ', batch_id)
    
    batch_dfd = dd.read_parquet(f'{DATA_DIR}/{SET}/batch_{batch_id}.parquet', 
        blocksize=64000000 # = 64 Mb chunks,
    ).reset_index()
    
        
    # Loop through unique event IDs
    for event_id in batch_dfd['event_id'].unique().compute().values:
        
        print('Processing event', event_id, ' in batch ', batch_id)
        event_df = get_event_df(batch_id,sensor_geometry, event_id)
        
        input_vec =  get_input_vector(event_df, event_id)
        
        submission = pd.concat([ new_row, submission.loc[:]]) 
        current_time = time.time() 
        print("Total time taken so far : ", current_time - start_time, "seconds")

## Make the prediction

In [None]:
batch_ids = meta_dfd['batch_id'].unique().compute().values # type: ignore

In [None]:
end_time = time.time()
total_time = end_time - start_time
total_hours = total_time / 60 / 60
print("Total time taken: ", round(total_hours,2), "Hours")