# ⚡🧊⚡[LB 1.183] Polar Lightning
In this notebook, I cover a number of strategies and optimization techniques:

### Strategies:
1. Dual time-weighted center of charge: A simple way to calculate best fit line using all 5 dimensions (x,y,z,time,charge): see Section 1. **LB 1.214**. Train Batch 1: 1.21728. <br>
 b. Similarly: SolverWorld discusses [here][1] and implements [here][2] a least squares algorithm using 4 dimensions (x,y,z,time): LB 1.214. Train Batch 1: 1.21724. <br>
 c. UPDATE: It turns out that this least squares formula is **exactly** equivalent to the time-weighted formula for #1, unweighted by 'charge'. After applying a weighted version of the least squares algorithm, it gets exactly the same results as my separately derived formula. I keep both variants in my notebook to summarize and compare. And the high-level description of each is very different, even though the formula ends up identical.
2. Find best points: a baseline method for labeling auxiliary column ourselves. I first discussed this concept [here][3], and will explain more about this strategy in Section 2. **LB 1.184**. Train Batch 1: 1.18865.
3. Ensemble of charge weighted and unweighted: **LB 1.183**. Train Batch 1: 1.18753.
4. Small optimization on top of #2: use 1/100th weight for aux=False points not selected with 'best points' algorithm. **LB 1.182??**. Train Batch 1: 1.18623.

### Optimization Techniques
1. Avoid for loops or groupby-apply where possible. Gets the 1.214 baseline from around 1 hour to about 2 minutes, a 30x improvement. Note: All times are in terms of submitting and scoring a notebook against the full test dataset. Highlighted in Section 1.
2. To optimize labeling auxiliary columns, use pure numpy with minimized input/output data size for the unfortunate 'for each event' part. Gets the 1.184 baseline from ~ an hour to about 8 minutes, a 6x improvement. Part of Section 2.
3. Try using Polars instead of Pandas! This is my first time using it, please join me in exploring what it can do! See Section 3. Gets the 1.214 baseline from about 120 seconds to about 100 seconds, even with a 35 second pip install cost. So about 2x faster. No real improvement, though, on overall execution time when finding best points, still about 8 minutes.

# Credits
Thanks to many Kagglers who have shared ideas. I shamelessly stole the Notebook intro formatting from cdeotte - for example [here][4]. As credited above, SolverWorld for summarizing and highlighting the least squares algorithm, which I added for both comparison and for ensembling. And many people contributed to my understanding of the physics of IceCube neutrino detection.

[1]: https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/381747
[2]: https://www.kaggle.com/code/solverworld/icecube-neutrino-path-least-squares-1-214
[3]: https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/379677
[4]: https://www.kaggle.com/code/cdeotte/candidate-rerank-model-lb-0-575

<br>

# Notes
Below are notes about versions:
* **Version 1: LB 1.214** Baseline LB submit for center of charge algorithm. Same code as V3.
* **Version 2: LB 1.184** Baseline LB submit for find best points optimization. Same code as V3.
* **Versions 3-5: LB 1.183** LB submit for ensembling charge-weighted with unweighted. Initial public version with multiple strategies and optimizations.
* **Version 6: est LB 1.182** (based on train batch improvement of ~0.0013) LB submit for adding AUX_FALSE_WEIGHT=0.01, a new hyperparameter. Introduces weighting instead of binary point selection.
* **Version 7** Stay tuned for more versions...

# Configuration Parameters
In this notebook, and in my normal pathfinding process, I try out a variety of methods. In these sorts of rapid iterative prototyping environments, I often use global switches to quickly and easily maintain multiple approaches.

In [1]:
## Configuration parameters
# MODE = 'train'
MODE = 'test'

# USE_POLARS = False
USE_POLARS = True

# TRAIN_MAX_EVENTS = 20000
TRAIN_MAX_EVENTS = None
TRAIN_BATCH_START = 1
TRAIN_N_BATCHES = 1

## I pulled in one piece of older code to demonstrate "before and after".
## Set to True (and USE_POLARS=False) if interested in seeing the difference.
USE_UNOPTIMIZED = False


#### HYPERPARAMETERS ####

## For setting auxiliary = False
## Hand-tuned and hand-validated, I mostly used batches 100-105, and probably early on also touched batch 1.
## TODO: revisit the deep core logic now that I've learned about the deep veto layer: https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/381702
FIND_BEST_POINTS = True
MIN_PRIMARY_DATAPOINTS = 2
MAX_Z = 3
MAX_DEEP_Z = 1
MAX_T = 350
MAX_DEEP_T = 180
if USE_POLARS:
    ## Only implmented in polars version.
    AUX_FALSE_WEIGHT = 0.01

## Ensemble and algorithm selection parameters.
## First weight is center of charge algorithm introduced in this notebook.
## Second weight is the unweighted version. Which turns out to be the least-squares algorithm: https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/381747
USE_ENSEMBLE = True
WEIGHTS = [0.58, 0.42]
if not USE_ENSEMBLE and not USE_POLARS:
    ALGORITHM = 'center of charge'
#     ALGORITHM = 'least squares'
    if ALGORITHM == 'least squares':
        USE_WEIGHTED_LEAST_SQUARES = False


## Constants
INPUT_DIR = '/kaggle/input/icecube-neutrinos-in-deep-ice'

## Basic configuration override logic
if MODE == 'test':
    TRAIN_MAX_EVENTS = None
if USE_ENSEMBLE:
    USE_WEIGHTED_LEAST_SQUARES = False

# Imports and Helper Functions

In [2]:
if USE_POLARS:
    try:
        import polars as pl
    except:
        print('Installing polars, please wait about 35 seconds...')
        !pip install /kaggle/input/polars01516/polars-0.15.16-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
        import polars as pl

In [3]:
import numpy as np
import pandas as pd
import math
import time
import gc
from tqdm import tqdm
tqdm.pandas()

In [4]:
## Condensed for space. See here for expanded original version: https://www.kaggle.com/code/sohier/mean-angular-error
def angular_dist_score(az_true, zen_true, az_pred, zen_pred):
    if not (np.all(np.isfinite(az_true)) and
            np.all(np.isfinite(zen_true)) and
            np.all(np.isfinite(az_pred)) and
            np.all(np.isfinite(zen_pred))):
        raise ValueError("All arguments must be finite")
    sa1 = np.sin(az_true)
    ca1 = np.cos(az_true)
    sz1 = np.sin(zen_true)
    cz1 = np.cos(zen_true)
    sa2 = np.sin(az_pred)
    ca2 = np.cos(az_pred)
    sz2 = np.sin(zen_pred)
    cz2 = np.cos(zen_pred)
    scalar_prod = sz1*sz2*(ca1*ca2 + sa1*sa2) + (cz1*cz2)
    scalar_prod =  np.clip(scalar_prod, -1, 1)
    return np.average(np.abs(np.arccos(scalar_prod)))

In [5]:
## TODO: It would be good to benchmark versus other implementations, like arctan2 used here: https://www.kaggle.com/code/shlomoron/icecube-eda-pca-baseline-cv-1-28-lb-1-274 

## This version has a small optimization trick, calculating azimuth without regard for z or zenith
## This version is suboptimal if the vectors are already unit vectors, or if you need 3d unit vectors again later for some other step.
def angles_from_vectors(vectors):
    v_squared = np.square(vectors)
    
    ## Shortcut optimization for azimuth: calculate 2d unit vectors for x and y independent of z
    xy_sq = np.sum(v_squared[:, 0:2], axis=1)
    xy_d = np.sqrt(xy_sq)[:, None]
    np.seterr(divide='ignore', invalid='ignore') ## Turn off the warning temporarily
    vectors[:, 0:2] = np.where(xy_d == 0, xy_d, vectors[:, 0:2]/xy_d)

    ## For z, use full 3d unit vector
    d = np.sqrt(xy_sq + v_squared[:, 2])
    vectors[:, 2] = np.where(d == 0, d, vectors[:, 2]/d)
    np.seterr(divide='warn', invalid='warn') ## Turn back on

    ## As mentioned by others, clip solely to avoid floating point errors, the unit vectors should already be within this range.
    vectors =  np.clip(vectors, -1, 1)

    azimuth = np.arccos(vectors[:, 0])
    ## if y < 0, convert from quadrants 1 and 2 to quadrants 3 and 4
    azimuth = np.where(vectors[:, 1] >= 0, azimuth, 2*math.pi - azimuth)
    azimuth = np.where(np.isfinite(azimuth), azimuth, 0.0)

    zenith = np.arccos(vectors[:, 2])
    ## IMPORTANT: zenith angles are not evenly distributed, so set the error case to pi/2!
    ## (even though x, y, z might be. It would be a fun exercise to check if random values
    ##  for x, y, z converted to zenith angles would match the observed distribution of zenith angles in the train labels)
    zenith = np.where(np.isfinite(zenith), zenith, math.pi/2)

    return np.stack([azimuth, zenith], axis=1)

In [6]:
## Takes a list of azimuth np arrays, a list of zenith np arrays, and an optional list of numerical weights,
## and ensembles into a final direction.
##
## It's not really optimal in terms of lines of code nor performance,
## since in most or all cases you are converting a unit vector to an angle,
## converting back to a unit vector, averaging, then converting to the final angle.
## However, it is quite convenient, because you can always use this at the end
## to ensemble the results originating from any number of notebooks or sources.
def average_angles(az_list, zen_list, weights=None):
    assert(len(az_list) == len(zen_list))
    total = az_list[0].shape[0]
    x = np.zeros(total)
    y = np.zeros(total)
    z = np.zeros(total)
    for i in range(len(az_list)):
        w = 1
        if weights is not None:
            w = weights[i]
        az = az_list[i]
        zen = zen_list[i]
        assert(az.shape[0] == total)
        assert(zen.shape[0] == total)
        if not (np.all(np.isfinite(az)) and
                np.all(np.isfinite(zen))):
            raise ValueError("All arguments must be finite")
        sz = np.sin(zen)
        x += w*np.cos(az)*sz
        y += w*np.sin(az)*sz
        z += w*np.cos(zen)
    tot_w = len(az_list)
    if weights is not None:
        tot_w = sum(weights)
    x = x / tot_w
    y = y / tot_w
    z = z / tot_w
    d = np.sqrt(np.square(x) + np.square(y) + np.square(z))
    x = x / d
    y = y / d
    z = z / d
    return angles_from_vectors(np.stack([x, y, z], axis=1))

# Section 1 - Center of Charge with Pandas
In this section, we demonstrate the core algorithm. 

If USE_BEST_POINTS is True, we also use our algorithm in Section 2 to get higher quality data points as input.

Later, in Section 3, we will rewrite this section using Polars for lightning fast execution time!

⚡🧊⚡**WARNING**⚡🧊⚡: When substituting Polars in a Pandas Python environment, there is a distinct possibility of emitting Cherenkov radiation due to code execution that's faster than the speed of light in a Pandas medium!

### Dual time-weighted Center of Charge in more detail
The core concept is if the charge - the light - from the neutrino collision travels omni-directionally and also in the direction of the neutrino, then if we center all 'early' time events versus all 'late' time events, the spherical omni-directional attributes will, on average, cancel out, leaving only the directional attributes we are looking for. We could do a naive technique like first half vs last half, but in reality we want to treat all time gaps as equal relative to the length of the time gap. So we can normalize all timestamps as 0 to 1 (0 is earliest to 1 as latest) or the inverse 1 to 0 (1 is earliest). In this way, we calculate two points, draw a line from earliest to latest to know the direction the neutrino is going, then (**IMPORTANT**) flip it to answer the organizer's stated question of where did the neutrino come from.

Using 'charge' instead of mass, we can trivially calculate the 'center of charge' of an arbitrary number of points. The time weights just become another weighting besides charge in this very simple algorithm.

If v_i is each observation's x,y,z coordinates, and t0_i (t1_i) is the calculated time weights, then simply:

``` python3
v0 = sum(v_i * charge_i * t0_i) / sum(charge_i * t0_i)
v1 = sum(v_i * charge_i * t1_i) / sum(charge_i * t1_i)
```
Which can also be expressed in the form as simply sum(w * v) / sum(w), for arbitrary weights, which in our case we are using dual time-weights: w0 = charge * t0; w1 = charge * t1.

Then from that, v1 - v0 is the direction vector, with the reverse (v0 - v1) being the direction the neutrino is from.

In [7]:
def center_of_charge(batch):
    ## Groupby -> transform is the key trick to avoiding the dreaded 'for each event' loop,
    ## and thus getting ~30-60x speed boost improvement!
    ## If you need any min, max, mean or other simple value from the event group,
    ## you can precalculate it for each group and broadcast it
    batch['ev_t_min'] = batch.groupby('event_id')['time'].transform('min')
    batch['ev_t_max'] = batch.groupby('event_id')['time'].transform('max')
    
    ## Now we can just implement our formula! w0 and w1 are the time-weighted charge cases.
    ## Gather the values we need
    batch['w1'] = batch.charge * (batch.time - batch.ev_t_min) / (batch.ev_t_max - batch.ev_t_min)
    batch['w0'] = batch.charge - batch.w1
    batch['wx0'] = batch.x * batch.w0
    batch['wy0'] = batch.y * batch.w0
    batch['wz0'] = batch.z * batch.w0
    batch['wx1'] = batch.x * batch.w1
    batch['wy1'] = batch.y * batch.w1
    batch['wz1'] = batch.z * batch.w1
    df = batch[['w0', 'w1', 'wx0', 'wy0', 'wz0', 'wx1', 'wy1', 'wz1']]

    ## Calculate all the sums!
    df = df.groupby('event_id').sum()
    
    ## Now do the final divide of the weighted center by the sum of the weights.
    df[['wx0', 'wy0', 'wz0']] = df[['wx0', 'wy0', 'wz0']].div(df.w0, axis=0)
    df[['wx1', 'wy1', 'wz1']] = df[['wx1', 'wy1', 'wz1']].div(df.w1, axis=0)
    
    ## The direction the neutrino is traveling FROM is point0 - point1, instead of point1 - point0.
    ## Counter-intuitive to me, but fortunately, easy to notice and correct if your score is > 1.57 instead of less.
    df[['x', 'y', 'z']] = df[['wx0', 'wy0', 'wz0']].values - df[['wx1', 'wy1', 'wz1']].values

    df = df[['x', 'y', 'z']]
    df[['azimuth', 'zenith']] = angles_from_vectors(df.values)

    return(df[['azimuth', 'zenith']])

### Weighted least-squares algorithm
SolverWorld discusses (unweighted) least squares algorithm [here](https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/381747) and implements [here](https://www.kaggle.com/code/solverworld/icecube-neutrino-path-least-squares-1-214)

$$v_{est} = \frac{ \langle ri * ti \rangle -  \langle r \rangle * \langle t \rangle}{\langle t^2 \rangle - \langle t \rangle ^2}$$

UPDATE: This algorithm uses 4 dimensions of data (x,y,z,time). How do we modify the formula to give arbitrary weights to each point?

Instead of <> meaning take the mean, let's generalize that as <> meaning sum(w_i * _ith_value_inside_<>) / sum(w_i). In the normal case, where all w_i = 1, that simplifies to the mean.

Now we have 5 dimensions using this expanded formula.



In [8]:
def least_squares(batch, weighted=False):
    batch['xt'] = batch.x * batch.time
    batch['yt'] = batch.y * batch.time
    batch['zt'] = batch.z * batch.time
    batch['tt'] = batch.time * batch.time
    if weighted:
        df = batch[['x', 'y', 'z', 'time', 'xt', 'yt', 'zt', 'tt']] * batch.charge.values[:, None]
        df['charge'] = batch.charge
        df = df.groupby('event_id').sum()
        df = df.div(df.charge, axis=0)
    else:
        df = batch[['x', 'y', 'z', 'time', 'xt', 'yt', 'zt', 'tt']]
        df = df.groupby('event_id').mean()
    df[['x', 'y', 'z']] = (
                              (df[['xt', 'yt', 'zt']].values - (df[['x', 'y', 'z']].values * df['time'].values[:, None]))
                            / (df['tt'].values - (df.time.values * df.time.values))[:, None]
                          )
    ## Reverse it
    df = -df[['x', 'y', 'z']]
    df[['azimuth', 'zenith']] = angles_from_vectors(df.values)
    return df[['azimuth', 'zenith']]


In [9]:
def process_batch(batch_id, sensor, max_events=None):
    print('load batch...')
    batch = pd.read_parquet(f'{INPUT_DIR}/{MODE}/batch_{batch_id}.parquet')

    ## Limit to max_events
    if max_events is not None:
        batch_i = batch.reset_index()
        event_ids = batch_i.event_id.drop_duplicates()
        end_index = event_ids.index[max_events]
        batch = batch_i[:end_index].set_index('event_id')
    print(batch.shape)

    ## Merge in sensor x,y,z data
    batch = batch.reset_index().merge(sensor, how='left', on='sensor_id', left_index=False).set_index('event_id')

    ## The logic for auxiliary = False is very basic, we can improve on it. Initial discussion here: 
    if FIND_BEST_POINTS:
        batch = find_best_points_pandas(batch)

    ## Limit to primary (aux=False) datapoints if there's enough of them.
    ## For event_ids with too few, set all rows to aux=False. This handles these cases without any for loop logic.
    ## MIN_PRIMARY_DATAPOINTS is a tuned value, based on optimizing the score on batches 101-110.
    ## But the result was 'as low as possible'.
    batch['primary_count'] = batch.groupby('event_id')['auxiliary'].transform('count') - batch.groupby('event_id')['auxiliary'].transform('sum')
    batch.loc[batch.primary_count < MIN_PRIMARY_DATAPOINTS, 'auxiliary'] = False
    batch = batch[batch.auxiliary == False]
    print(batch.shape)

    if USE_ENSEMBLE or ALGORITHM == 'center of charge':
        df = center_of_charge(batch)
        if USE_ENSEMBLE:
            df1 = df
    if USE_ENSEMBLE or ALGORITHM == 'least squares':
        df = least_squares(batch, weighted=USE_WEIGHTED_LEAST_SQUARES)
    if USE_ENSEMBLE:
        df[['azimuth', 'zenith']] = average_angles([df1.azimuth.values, df.azimuth.values],
                                                   [df1.zenith.values, df.zenith.values], 
                                                   weights=WEIGHTS)
    return df

# Section 2 - Find Best Points
In this section, I try to optimize which points are selected to be processed by the center of charge OR any other algorithm. This is just a baseline algorithm.

The algorithm with current parameters: Check the nearest 3 sensors above and 3 below, for any hits within 350ns of the hit. In current algorithm, it has to be a hit on the neighbor, a second hit on the same sensor within 375ns is not sufficient. For Deep Ice strings, check only the nearest sensor above and below within 180ns.

In early drafts, I tried calculating pure distance (which can be precomputed for a 5620x5620 sensor matrix). However, the early results showed an optimal distance well under the minimum xy distance separating strings. Which after reading a bunch of discussions and IceCube overviews, did NOT surprise me. So I simplified to simply using vertical neighbors and seeing how many is optimal. And likewise tuning how far apart to set the optimal max time delta. 1000ns is probably too long, if you are mainly using adjacent sensors that logically SHOULD both trigger for the high energy events you are looking for.

Of course, future work can include a hugely stratified event characterization, so that we use different hyperparameters for 'best points' based on factors like total observed charge and/or number of rows, and other factors. Possibly stratified for 'edge' strings vs 'inner' strings. Another to-do that should see big gains is to use fuzzy logic, not binary logic. Convert this to a (sigmoidal?) likelihood algorithm and use the result as weights instead of just 'yes' vs 'no' binary weighting.

Deep ice strings are NOT symmetric compared with the other strings! My 'center of charge' algorithm relies on symmetry. Current solution to that is to use different parameters that only take the highest quality points. It's more of a compromise than a solution to this particular issue. There might be some corner case math or other trick that could do a better job of preventing anti-symmetry bias from reducing the accuracy of the final prediction.

In [10]:
def proximity(e, col):
    ## Since we explode the data to an NxN array, if N is too high it will take a long time and anyways we'll run out of memory.
    ## TODO: see how high we can go before we run out of memory
    ## TODO: consider subsampling instead of simply returning the baseline auxiliary setting?
    ##       And/or splitting on string_id to get much smaller groups
    if e.shape[0] > 2000:
        return e[:, col['auxiliary']]

    ## The magic None in the line below is a new thing I learned while working on this notebook.
    ## It is shorthand for np.newaxis, and broadcasts the array to another dimension.
    ## This allows us to create an NxN array for each event, so that we can
    ## check if *any* other row in the event meets our proximity in time and height requirements.
    ## For any matching pairs, then we set both rows as auxiliary = False. Rows without matches are auxiliary = True.
    deltas = np.abs(e[:, [col['string_id'], col['depth_id'], col['time']]] - e[:, None, [col['string_id'], col['depth_id'], col['time']]])
    dz = deltas[:, :, 1]

    ## if same depth or different string id, ignore by setting dz > the max threshold used later.
    dz[(dz == 0) | (deltas[:, :, 0] != 0)] = MAX_Z + MAX_DEEP_Z + 1

    ## if sensor is not a deep ice sensor, and time > MAX_T, ignore
    mask = (e[:, col['sensor_id']] < 4680)
    mask = np.broadcast_to(mask, (mask.shape[0], mask.shape[0])).T
    dz[mask & (deltas[:, :, 2] > MAX_T)] = MAX_Z + MAX_DEEP_Z + 1
    ## if sensor IS a deep ice sensor, and time > MAX_DEEP_T, ignore
    mask = (e[:, col['sensor_id']] >= 4680)
    mask = np.broadcast_to(mask, (mask.shape[0], mask.shape[0])).T
    dz[mask & (deltas[:, :, 2] > MAX_DEEP_T)] = MAX_Z + MAX_DEEP_Z + 1

    ## Now take the min (best) result for each row com
    dz = dz.min(axis=1)
    ## If no matches, the default, then everything is aux=True
    e[:, col['auxiliary']] = True
    ## If not deep ice and distance less than threshold, or deep ice and distance less than other threshold, then we have a match!
    e[((e[:, col['sensor_id']] < 4680) & (dz <= MAX_Z)) | ((e[:, col['sensor_id']] >= 4680) & (dz <= MAX_DEEP_Z)), col['auxiliary']] = False
    ## Return only the data needed to speed up the np.concatenate called next.
    return e[:, col['auxiliary']]

In [11]:
## You can ignore this one unless interested in a deep dive on performance optimization
## It is provided as a way of comparing the changes versus the pure numpy version.
## Comments removed from this copy to conserve vertical space.
def proximity_unoptimized(df):
    if df.shape[0] > 2000:
        return df
    deltas = np.abs(df[['string_id', 'depth_id', 'time']].values - df[['string_id', 'depth_id', 'time']].values[:, None, :])
    mask = (df.sensor_id < 4680)
    dz = deltas[:, :, 1]

    dz[(dz == 0) | (deltas[:, :, 0] != 0)] = MAX_Z + MAX_DEEP_Z + 1

    mask = (df.sensor_id < 4680)
    mask = np.broadcast_to(mask, (mask.shape[0], mask.shape[0])).T
    dz[mask & (deltas[:, :, 2] > MAX_T)] = MAX_Z + MAX_DEEP_Z + 1
    mask = (df.sensor_id >= 4680)
    mask = np.broadcast_to(mask, (mask.shape[0], mask.shape[0])).T
    dz[mask & (deltas[:, :, 2] > MAX_DEEP_T)] = MAX_Z + MAX_DEEP_Z + 1

    dz = dz.min(axis=1)
    df.auxiliary = True
    df.loc[((df.sensor_id < 4680) & (dz <= MAX_Z)) | ((df.sensor_id >= 4680) & (dz <= MAX_DEEP_Z)), 'auxiliary'] = False
    return df


In [12]:
def find_best_points_pandas(batch):
    if USE_UNOPTIMIZED:
        cols = ['sensor_id', 'time', 'auxiliary', 'string_id', 'depth_id']
        batch[cols] = batch[cols].groupby('event_id').progress_apply(proximity_unoptimized)
        return batch

    ## np.split used as a pure numpy equivalent of groupby
    ## Note this version didn't minimize the size of the inputs, but does make sure all dtypes are the same for an efficient np array.
    column_to_index = { k:v for v,k in enumerate(batch.columns)}
    events = np.split(batch.values.astype('float32'), np.unique(batch.index.values, return_index=True)[1][1:])

    ## Run each event sequentially in a list comprehension, then join back together with np.concatenate.
    ## So far tried and failed to find a reasonable solution to avoid this groupby > apply > join loop.
    ## Note that this line overrides the dtype of 'auxiliary' column to float32
    batch.auxiliary = np.concatenate([proximity(e, column_to_index) for e in tqdm(events)])
    return batch

# Run Sections 1 and 2 with pandas version

In [13]:
if not USE_POLARS:
    print(MODE)
    ## Note: this is so slow (until Polars). Never did check how much faster removing the unnecessary columns is.
    ## I only actually use event_id, batch_id and azimuth, zenith labels.
    ## It would also be very reasonable to create a dataset with 660 train_meta_{n}.parquet files,
    ## to avoid the completely unnecessary loading costs of the huge 100M columns.
    meta = pd.read_parquet(f'{INPUT_DIR}/{MODE}_meta.parquet')
    print(meta.shape)

    print('load sensor data...')
    sensor = pd.read_csv(f'{INPUT_DIR}/sensor_geometry.csv')
    sensor['string_id'] = sensor.sensor_id // 60
    sensor['depth_id'] = sensor.sensor_id % 60
    print(sensor.shape)

    if MODE == 'train':
        batch_id_start = TRAIN_BATCH_START
        batch_id_end = batch_id_start + TRAIN_N_BATCHES
    else:
        batch_id_start = meta.batch_id.values[0]
        batch_id_end = meta.batch_id.values[-1] + 1

    sub = []
    for batch_id in range(batch_id_start,batch_id_end):
        print(batch_id)
        t = time.time()
        df = process_batch(batch_id, sensor, max_events=TRAIN_MAX_EVENTS)
        print(f'Time: {time.time() - t:0.2f}s')
        if MODE == 'test':
            sub.append(df)
        else:
            if TRAIN_MAX_EVENTS is not None:
                print(angular_dist_score(meta[meta.batch_id == batch_id].azimuth.values[:TRAIN_MAX_EVENTS],
                                         meta[meta.batch_id == batch_id].zenith.values[:TRAIN_MAX_EVENTS],
                                         df.azimuth.values, df.zenith.values))
            else:
                print(angular_dist_score(meta[meta.batch_id == batch_id].azimuth.values, meta[meta.batch_id == batch_id].zenith.values,
                                         df.azimuth.values, df.zenith.values))

    if MODE == 'test':
        sub = pd.concat(sub, axis=0)
        sub.to_csv('submission.csv', index=True)
        print(sub)

    print('done')

# Section 3 - Polars
Let's see what performance we get when using Polars instead of Pandas!

In [14]:
def time_weighted_centering(batch, charge_weighted=True):
    ## Polars equivalent to groupby->transform is called 'over'. We again use this to get min and max without a for loop.
    batch = batch.with_columns([pl.col('time').min().over('event_id').alias('ev_t_min'),
                                pl.col('time').max().over('event_id').alias('ev_t_max')])
    if charge_weighted:
        batch = batch.with_columns((pl.col('charge') * (pl.col('time') - pl.col('ev_t_min'))
                                    / (pl.col('ev_t_max') - pl.col('ev_t_min'))).alias('w1'))
        batch = batch.with_columns((pl.col('charge') - pl.col('w1')).alias('w0'))
    else:
        batch = batch.with_columns(((pl.col('time') - pl.col('ev_t_min'))
                                    / (pl.col('ev_t_max') - pl.col('ev_t_min'))).alias('w1'))
        batch = batch.with_columns((pl.lit(1) - pl.col('w1')).alias('w0'))

    batch = batch.select(
        [
            pl.col('event_id'),
            pl.col('w0'),
            pl.col('w1'),
            (pl.col('x') * pl.col('w0')).alias('wx0'),
            (pl.col('y') * pl.col('w0')).alias('wy0'),
            (pl.col('z') * pl.col('w0')).alias('wz0'),
            (pl.col('x') * pl.col('w1')).alias('wx1'),
            (pl.col('y') * pl.col('w1')).alias('wy1'),
            (pl.col('z') * pl.col('w1')).alias('wz1'),
        ]
    ).collect().groupby('event_id', maintain_order=True).sum()

    ## The direction the neutrino is traveling FROM is point0 - point1, instead of point1 - point0.
    ## Counter-intuitive to me, but fortunately, easy to notice and correct if your score is > 1.57 instead of less.
    batch_values = batch.select(
        [
            ((pl.col('wx0') / pl.col('w0')) - (pl.col('wx1') / pl.col('w1'))).alias('x'),
            ((pl.col('wy0') / pl.col('w0')) - (pl.col('wy1') / pl.col('w1'))).alias('y'),
            ((pl.col('wz0') / pl.col('w0')) - (pl.col('wz1') / pl.col('w1'))).alias('z'),
        ]
    ).to_numpy()
    return angles_from_vectors(batch_values), batch


In [15]:
## We use the same numpy proximity function, the only difference is we convert from and to a Polars df instead of a Pandas df.
def find_best_points_polars(batch):
    ## Minimize the size of the inputs, and make sure all data types are the same for an efficient np array
    df = batch.select([pl.col('sensor_id'), pl.col('time'), pl.col('auxiliary'), pl.col('string_id'), pl.col('depth_id')])
    column_to_index = { k:v for v,k in enumerate(df.columns)}
    batch_values = df.to_numpy().astype('float32')

    ## Pure numpy equivalent of groupby
    events = np.split(batch_values, np.unique(batch.select(pl.col('event_id')), return_index=True)[1][1:])

    ## Run each event sequentially in a list comprehension, then join back together with np.concatenate. So far tried and failed to find a reasonable solution to avoid this groupby > apply > join loop.
    ## Rename 'auxiliary' -> 'best', and flip the boolean value
    batch = batch.with_columns(pl.Series(np.concatenate([proximity(e, column_to_index) for e in tqdm(events)]).astype('bool')).alias('best')).with_columns(pl.col('best').is_not())

    return batch

### Prep for running batches

In [16]:
if USE_POLARS:
    print(MODE)
    ## Scan parquet is part of Polars lazy evaluation, so no cost yet,
    ## and it can figure out optimizations when we finally 'collect' it later.
    meta = pl.scan_parquet(f'{INPUT_DIR}/{MODE}_meta.parquet')

    print('load sensor data...')
    sensor = (pl.scan_csv(f'{INPUT_DIR}/sensor_geometry.csv')
                .with_columns([
                    pl.col('sensor_id').cast(pl.Int16),
                    (pl.col('sensor_id') // 60).alias('string_id'),
                    (pl.col('sensor_id') % 60).alias('depth_id'),                
                ])
             )

    print(sensor)

    if MODE == 'train':
        batch_id_start = TRAIN_BATCH_START
        batch_id_end = batch_id_start + TRAIN_N_BATCHES
    else:
        batch_id_start = meta.select(pl.col('batch_id')).collect()[0, 0]
        batch_id_end = meta.select(pl.col('batch_id')).collect()[-1, 0] + 1

    print(batch_id_start, batch_id_end)

test
load sensor data...
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

 WITH_COLUMNS:
 [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) floor_div (60)].alias("string_id"), [(col("sensor_id")) % (60)].alias("depth_id")]

    Csv SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS
661 662


### Process each batch

In [17]:
if USE_POLARS:
    sub = []
    for batch_id in range(batch_id_start,batch_id_end):
        print(batch_id)
        t = time.time()
        max_events=TRAIN_MAX_EVENTS
        print('load batch...')
        batch = pl.scan_parquet(f'{INPUT_DIR}/{MODE}/batch_{batch_id}.parquet')

        ## Limit to max_events
        if max_events is not None:
            batch = batch.collect()
            last_event_id = batch.select(pl.col('event_id')).unique()[TRAIN_MAX_EVENTS-1, 0]
            batch = batch.lazy().filter(pl.col('event_id') <= last_event_id)

        ## Merge in sensor x,y,z data
        batch = batch.join(sensor, on='sensor_id', how='left').collect()

        ## The logic for auxiliary = False is very basic, we can improve on it. Initial discussion here: 
        if FIND_BEST_POINTS:
            batch = find_best_points_polars(batch)

        ## Use data point weights instead of filtering. Still set most points to 0 weight, unless they are the only data points available.
        ## MIN_PRIMARY_DATAPOINTS and AUX_FALSE_WEIGHT are tuned values, based on optimizing the score on batches 101-110.
        batch = batch.lazy().with_columns((pl.col('best').count().over('event_id') - pl.col('best').sum().over('event_id')).alias('best_count'))
        batch = batch.lazy().with_columns((pl.col('auxiliary').count().over('event_id') - pl.col('auxiliary').sum().over('event_id')).alias('non_aux_count'))
        batch = batch.with_columns(( pl.when( pl.col('best') | ((pl.col('best_count') < MIN_PRIMARY_DATAPOINTS) & (pl.col('non_aux_count') < MIN_PRIMARY_DATAPOINTS)) )
                                                .then(1.0)
                                                .otherwise(pl.when(pl.col('auxiliary').is_not())
                                                             .then(AUX_FALSE_WEIGHT)
                                                             .otherwise(0.0)
                                                        ) 
                                          ).alias('trust'))
        batch = batch.lazy().with_columns((pl.col('charge') * pl.col('trust')).alias('charge'))

        preds, events = time_weighted_centering(batch)
        if USE_ENSEMBLE:
            preds1 = preds
#             preds, events = time_weighted_centering(batch, charge_weighted=False)
            ## Instead of charge_weighted=False, set charge*aux to just equal aux.
            batch = batch.lazy().with_columns(pl.col('trust').alias('charge'))
            preds, events = time_weighted_centering(batch)
            preds = average_angles([preds1[:, 0], preds[:, 0]], [preds1[:, 1], preds[:, 1]], weights=WEIGHTS)

        if MODE == 'test':
            sub.append(events.select([pl.col('event_id'), pl.Series(preds[:, 0]).alias('azimuth'),
                                                         pl.Series(preds[:, 1]).alias('zenith')]))
        else:
            meta = meta.filter((pl.col('batch_id') >= batch_id_start) & (pl.col('batch_id') < batch_id_end))
            if isinstance(meta, pl.LazyFrame):
                meta = meta.collect()
            meta_values = meta.filter(pl.col('batch_id') == batch_id).select([pl.col('azimuth'), pl.col('zenith')]).to_numpy()
            if TRAIN_MAX_EVENTS is not None:
                print(angular_dist_score(meta_values[:TRAIN_MAX_EVENTS, 0], meta_values[:TRAIN_MAX_EVENTS, 1], preds[:, 0], preds[:, 1]))
            else:
                print(angular_dist_score(meta_values[:, 0], meta_values[:, 1], preds[:, 0], preds[:, 1]))
        print(f'Time: {time.time() - t:0.2f}s')

    if MODE == 'test':
        sub = pl.concat(sub)
        sub.write_csv('submission.csv')
        print(sub)

    print('done')


661
load batch...


100%|██████████| 3/3 [00:00<00:00, 736.19it/s]

Time: 0.11s
shape: (3, 3)
┌──────────┬──────────┬──────────┐
│ event_id ┆ azimuth  ┆ zenith   │
│ ---      ┆ ---      ┆ ---      │
│ i64      ┆ f64      ┆ f64      │
╞══════════╪══════════╪══════════╡
│ 2092     ┆ 2.62718  ┆ 2.342972 │
│ 7344     ┆ 4.248741 ┆ 3.141593 │
│ 9482     ┆ 4.345112 ┆ 1.587    │
└──────────┴──────────┴──────────┘
done



