The code in this notebook is based on the code originally written in [LSTM Preprocessing Point Picker](https://www.kaggle.com/code/seungmoklee/lstm-preprocessing-point-picker). The author of that notebook did a great job of setting a clear baseline.

I modified the following parts of the code:
* Maximum pulse count is set to 96.
* Remove the features r_err and z_err.
* Remove all non-essential code and graphics. 

With these few changes the output files only contain the features for the events as I use them in my [Tensorflow LSTM Model Training TPU](https://www.kaggle.com/code/rsmits/tensorflow-lstm-model-training-tpu) notebook and [Tensorflow LSTM Model Inference](https://www.kaggle.com/code/rsmits/tensorflow-lstm-model-inference) notebook.

In [1]:
# Data I/O and preprocessing
import numpy as np
import pandas as pd
import pyarrow.parquet as pq

# System
import time
import os
import gc
from tqdm.notebook import tqdm
import tensorflow as tf
from contextlib import ExitStack
# multiprocessing
import multiprocessing

In [2]:
# Data setting
# Inclusive range of training batch ids handled by this notebook.
train_batch_id_first = 118
train_batch_id_last = 156
train_batch_ids = range(train_batch_id_first, train_batch_id_last + 1)

# Feature Settings
# Upper bound on the number of pulses stored per event.
max_pulse_count = 96
n_features = 6  # time, charge, aux, x, y, z (rank is only used to select pulses, it is not stored)

# Directories
home_dir = "/kaggle/input/icecube-neutrinos-in-deep-ice/"
train_format = home_dir + 'train/batch_{batch_id:d}.parquet'
# NOTE(review): the "n7" in this file name does not match n_features = 6 — confirm the intended name.
point_picker_format = 'pp_mpc96_n7_batch_{batch_id:d}.npz'

In [3]:
# counts
doms_per_string = 60
string_num = 86

# index
# String ids grouped by the resolution values applied to them below.
outer_long_strings = np.concatenate([np.arange(0, 25), np.arange(27, 34), np.arange(37, 44), np.arange(46, 78)])
inner_long_strings = np.array([25, 26, 34, 35, 36, 44, 45])
inner_short_strings = np.array([78, 79, 80, 81, 82, 83, 84, 85])
# known specs
# Each resolution is half of a spacing value (presumably metres — TODO confirm units).
outer_xy_resolution = 125. / 2
inner_xy_resolution = 70. / 2
long_z_resolution = 17. / 2
short_z_resolution = 7. / 2


# Sensor Geometry Data
sensor_geometry_df = pd.read_csv(home_dir + "sensor_geometry.csv")

# X, Y, Z coordinates
sensor_x = sensor_geometry_df.x
sensor_y = sensor_geometry_df.y
sensor_z = sensor_geometry_df.z
# Per-sensor error arrays start at 1 and are scaled in place, one string at a time.
# The slices assume sensors are ordered as string_id * doms_per_string + dom_id.
sensor_r_err = np.ones(doms_per_string * string_num)
sensor_z_err = np.ones(doms_per_string * string_num)
for string_id in outer_long_strings:
    sensor_r_err[string_id * doms_per_string:(string_id + 1) * doms_per_string] *= outer_xy_resolution
for string_id in np.concatenate([inner_long_strings, inner_short_strings]):
    sensor_r_err[string_id * doms_per_string:(string_id + 1) * doms_per_string] *= inner_xy_resolution

for string_id in outer_long_strings:
    sensor_z_err[string_id * doms_per_string:(string_id + 1) * doms_per_string] *= long_z_resolution
for string_id in np.concatenate([inner_long_strings, inner_short_strings]):
    for dom_id in range(doms_per_string):
        z = sensor_z[string_id * doms_per_string + dom_id]
        # Only sensors inside these two depth bands get the short z resolution.
        # NOTE(review): inner-string sensors outside the bands keep z_err == 1.0
        # (long_z_resolution is never applied to them) — confirm this is intended.
        if (z < -156.) or (z > 95.5 and z < 191.5):
            sensor_z_err[string_id * doms_per_string + dom_id] *= short_z_resolution
# register
sensor_geometry_df["r_err"] = sensor_r_err
sensor_geometry_df["z_err"] = sensor_z_err


# Detector constants
c_const = 0.299792458  # speed of light [m/ns]

# Min / Max information
x_min = sensor_x.min()
x_max = sensor_x.max()
y_min = sensor_y.min()
y_max = sensor_y.max()
z_min = sensor_z.min()
z_max = sensor_z.max()

# Detector Valid Length
# Diagonal of the bounding box of all sensors, and the time light needs to cross it.
detector_length = np.sqrt((x_max - x_min)**2 + (y_max - y_min)**2 + (z_max - z_min)**2)
t_valid_length = detector_length / c_const

print(f"time valid length: {t_valid_length} ns")

time valid length: 6199.700247193777 ns


In [4]:
"""

## Single event reader function

- Pick-up important data points first
    - Rank 3 (First)
        - not aux, in valid time window
    - Rank 2
        - not aux, out of valid time window
    - Rank 1
        - aux, in valid time window
    - Rank 0 (Last)
        - aux, out of valid time window
    - Within each rank, take pulses starting from the highest charge

"""

# read single event from batch_meta_df
def read_event(event_idx, batch_meta_df, max_pulse_count, batch_df, train=True):
    """Read one event and keep at most ``max_pulse_count`` of its pulses.

    Events with more pulses than ``max_pulse_count`` are reduced by ranking
    every pulse and keeping the highest ranked ones:

    - rank 3: not auxiliary, inside the valid time window
    - rank 2: not auxiliary, outside the valid time window
    - rank 1: auxiliary, inside the valid time window
    - rank 0: auxiliary, outside the valid time window

    Within a rank, pulses with the higher charge win. The valid time window
    extends ``t_valid_length`` on either side of the time of the pulse with
    the highest charge.

    Args:
        event_idx: Positional row of the event in ``batch_meta_df``.
        batch_meta_df: Meta data of the batch. The columns
            ``first_pulse_index``, ``last_pulse_index``, ``azimuth`` and
            ``zenith`` are read.
        max_pulse_count: Maximum number of pulses to return.
        batch_df: Pulse table of the batch, sliced with the (inclusive)
            pulse indices from the meta data.
        train: Accepted for interface compatibility; it is not consulted,
            the angles are always read from ``batch_meta_df``.

    Returns:
        Tuple ``(event_idx, pulse_count, event_x, event_y)`` where
        ``event_x`` is a structured array sorted by time and ``event_y`` is
        a float32 array ``[azimuth, zenith]``.
    """
    # Fetch the metadata row once; it is needed for the pulse range and the angles.
    meta_row = batch_meta_df.iloc[event_idx]
    first_pulse_index, last_pulse_index = meta_row[["first_pulse_index", "last_pulse_index"]].astype("int")

    # read event
    event_feature = batch_df[first_pulse_index:last_pulse_index + 1]
    sensor_id = event_feature.sensor_id

    # merge features into single structured array
    dtype = [("time", "float16"),
             ("charge", "float16"),
             ("auxiliary", "float16"),
             ("x", "float16"),
             ("y", "float16"),
             ("z", "float16"),
             ("rank", "short"),
             ("r_err", "float16"),
             ("z_err", "float16")]
    event_x = np.zeros(last_pulse_index - first_pulse_index + 1, dtype)

    # Times are stored relative to the first pulse of the event.
    # NOTE(review): values are cast to float16, whose finite range is limited —
    # confirm that event durations always fit.
    event_x["time"] = event_feature.time.values - event_feature.time.min()
    event_x["charge"] = event_feature.charge.values
    event_x["auxiliary"] = event_feature.auxiliary.values

    event_x["x"] = sensor_geometry_df.x[sensor_id].values
    event_x["y"] = sensor_geometry_df.y[sensor_id].values
    event_x["z"] = sensor_geometry_df.z[sensor_id].values

    event_x["r_err"] = sensor_geometry_df.r_err[sensor_id].values
    event_x["z_err"] = sensor_geometry_df.z_err[sensor_id].values

    # For long event, pick-up
    if len(event_x) > max_pulse_count:
        # Find valid time window
        t_peak = event_x["time"][event_x["charge"].argmax()]
        t_valid_min = t_peak - t_valid_length
        t_valid_max = t_peak + t_valid_length

        t_valid = (event_x["time"] > t_valid_min) & (event_x["time"] < t_valid_max)

        # rank
        event_x["rank"] = 2 * (1 - event_x["auxiliary"]) + (t_valid)

        # sort by rank and charge (important goes to backward)
        event_x = np.sort(event_x, order=["rank", "charge"])

        # pick-up from backward
        event_x = event_x[-max_pulse_count:]

    # Sort by time exactly once; this covers both the truncated and the
    # untouched case, so no extra sort is needed inside the branch above.
    event_x = np.sort(event_x, order="time")

    # for train data, give angles together
    azimuth, zenith = meta_row[["azimuth", "zenith"]].astype("float32")
    event_y = np.array([azimuth, zenith], dtype="float32")

    return event_idx, len(event_x), event_x, event_y

In [5]:
#230316 junseonglee11 implementation of normalization and inverse function
#                     for data preprocessing
def normalize_data(x_data, y_data):
    """Scale the six raw pulse features and append three "pseudo momentum" columns.

    ``x_data`` is indexed as (event, pulse, feature) and is modified in place
    for the first six feature columns; the returned array is a new array with
    three extra columns holding the position change to the next pulse.
    ``y_data`` is returned untouched.
    """
    # (offset, scale) for each of the six feature columns, in column order.
    # Original author's note (translated): minimum and scale; time is probably
    # best left alone.
    offset_and_scale = np.array(
        [
            (0, 1000),
            (0, 300),
            (0, 1),
            (0, 600),
            (0, 600),
            (0, 600),
        ],
        dtype=np.float32,
    )
    for column, (offset, scale) in enumerate(offset_and_scale):
        x_data[:, :, column] = (x_data[:, :, column] - offset) / scale

    # Replace absolute time by the time difference to the next pulse.
    x_data[:, :-1, 0] = np.diff(x_data[:, :, 0], axis=1)
    x_data[:, -1, 0] = 0

    # Pseudo momentum: position of the next pulse minus the current position.
    positions = x_data[:, :, 3:]
    pseudo_momentum = np.zeros_like(positions)
    pseudo_momentum[:, :-1, :] = np.diff(positions, axis=1)

    # A negative time step means the next slot is not a later pulse;
    # zero both the momentum and the time difference there.
    steps_back = x_data[:, :-1, 0] < 0
    pseudo_momentum[:, :-1, :][steps_back] = 0
    x_data[:, :-1, 0][steps_back] = 0

    x_data = np.concatenate((x_data, pseudo_momentum), axis=2)

    return x_data, y_data

In [6]:
def create_example_protobuff(event_pulses, line_fit_angles, origin_azimuth, origin_zenith):
    """Pack one event into a ``tf.train.Example``.

    The pulses are cast to float16 and stored as a serialized tensor; the two
    line-fit angles and the two original angles are stored as float features.
    """
    def _float_feature(value):
        # Single-value float feature.
        return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

    # convert to binary string format for Example protobuf
    serialized_pulses = tf.io.serialize_tensor(tf.cast(event_pulses, tf.float16))
    pulses_feature = tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[serialized_pulses.numpy()])
    )

    feature_map = {
        'event_pulses': pulses_feature,
        'fitted_azimuth': _float_feature(line_fit_angles[0]),
        'fitted_zenith': _float_feature(line_fit_angles[1]),
        'origin_azimuth': _float_feature(origin_azimuth),
        'origin_zenith': _float_feature(origin_zenith),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature_map))

def write_tfrecords(filename, batch_x, line_fit_angles, batch_origin_y):
    """Write every event of a batch to one GZIP-compressed TFRecord file.

    ``batch_x``, ``line_fit_angles`` and ``batch_origin_y`` are indexed in
    parallel; entry ``i`` of each describes the same event.
    """
    gzip_options = tf.io.TFRecordOptions(compression_type='GZIP', compression_level=9)
    with tf.io.TFRecordWriter(filename, options=gzip_options) as writer:
        # create example protobuffs from instances
        for i in tqdm(range(len(batch_x))):
            event = tf.Variable(batch_x[i])
            origin_azimuth, origin_zenith = batch_origin_y[i]
            example = create_example_protobuff(event, line_fit_angles[i], origin_azimuth, origin_zenith)
            writer.write(example.SerializeToString())
            
def save_to_tfrecord(batch_x, line_fit_angles, batch_origin_y, name):   
    """Write a batch to the TFRecord file ``name`` via ``write_tfrecords``.

    ``name`` is converted to a string with an f-string before it is used as
    the file name.
    """
    # write Dataset to files
    write_tfrecords(f"{name}", batch_x, line_fit_angles, batch_origin_y)

# ⚡🧊⚡[LB 1.183] Polar Lightning
I took this from the invaluable work of roberthatch  
https://www.kaggle.com/code/roberthatch/lb-1-183-lightning-fast-baseline-with-polars/comments

In [7]:
## Configuration parameters
## MODE selects the sub-directory and the meta file that are read ('{MODE}/batch_*.parquet', '{MODE}_meta.parquet').
MODE = 'train'
#MODE = 'test'

# USE_POLARS = False
USE_POLARS = True

# TRAIN_MAX_EVENTS = 20000
TRAIN_MAX_EVENTS = None
TRAIN_BATCH_START = 1
TRAIN_N_BATCHES = 1

## I pulled in one piece of older code to demonstrate "before and after".
## Set to True (and USE_POLARS=False) if interested in seeing the difference.
USE_UNOPTIMIZED = False


#### HYPERPARAMETERS ####

## For setting auxiliary = False
## Hand-tuned and hand-validated, I mostly used batches 100-105, and probably early on also touched batch 1.
## TODO: revisit the deep core logic now that I've learned about the deep veto layer: https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/381702
FIND_BEST_POINTS = True
MIN_PRIMARY_DATAPOINTS = 2
## Thresholds used by the proximity functions: maximum depth_id distance (MAX_Z / MAX_DEEP_Z)
## and maximum time distance (MAX_T / MAX_DEEP_T) for sensor ids below / from 4680.
MAX_Z = 3
MAX_DEEP_Z = 1
MAX_T = 350
MAX_DEEP_T = 180
if USE_POLARS:
    ## Only implemented in polars version.
    ## NOTE: AUX_FALSE_WEIGHT is left undefined when USE_POLARS is False.
    AUX_FALSE_WEIGHT = 0.01

## Ensemble and algorithm selection parameters.
## First weight is center of charge algorithm introduced in this notebook.
## Second weight is the unweighted version. Which turns out to be the least-squares algorithm: https://www.kaggle.com/competitions/icecube-neutrinos-in-deep-ice/discussion/381747
USE_ENSEMBLE = True
WEIGHTS = [0.58, 0.42]
## NOTE: ALGORITHM only exists when neither the ensemble nor polars is used.
if not USE_ENSEMBLE and not USE_POLARS:
    ALGORITHM = 'center of charge'
#     ALGORITHM = 'least squares'
    if ALGORITHM == 'least squares':
        USE_WEIGHTED_LEAST_SQUARES = False


## Constants
INPUT_DIR = '/kaggle/input/icecube-neutrinos-in-deep-ice'

## Basic configuration override logic
if MODE == 'test':
    TRAIN_MAX_EVENTS = None
if USE_ENSEMBLE:
    USE_WEIGHTED_LEAST_SQUARES = False
    
if USE_POLARS:
    try:
        import polars as pl
    except:
        print('Installing polars, please wait about 35 seconds...')
        !pip install /kaggle/input/polars01516/polars-0.15.16-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
        import polars as pl
        
import numpy as np
import pandas as pd
import math
import time
import gc
from tqdm.notebook import tqdm
tqdm.pandas()

## Condensed for space. See here for expanded original version: https://www.kaggle.com/code/sohier/mean-angular-error
def angular_dist_score(az_true, zen_true, az_pred, zen_pred):
    """Return the mean angle between true and predicted directions.

    All four arguments are angles (azimuth / zenith) of the true and the
    predicted direction. Raises ``ValueError`` when any value is not finite.
    """
    for angles in (az_true, zen_true, az_pred, zen_pred):
        if not np.all(np.isfinite(angles)):
            raise ValueError("All arguments must be finite")

    sin_az_true, cos_az_true = np.sin(az_true), np.cos(az_true)
    sin_zen_true, cos_zen_true = np.sin(zen_true), np.cos(zen_true)
    sin_az_pred, cos_az_pred = np.sin(az_pred), np.cos(az_pred)
    sin_zen_pred, cos_zen_pred = np.sin(zen_pred), np.cos(zen_pred)

    # Scalar product of the two unit vectors, i.e. the cosine of the angle between them.
    cos_separation = (
        sin_zen_true*sin_zen_pred*(cos_az_true*cos_az_pred + sin_az_true*sin_az_pred)
        + (cos_zen_true*cos_zen_pred)
    )
    # Clip so that rounding errors cannot push the value outside arccos' domain.
    cos_separation = np.clip(cos_separation, -1, 1)
    return np.mean(np.abs(np.arccos(cos_separation)))

## TODO: It would be good to benchmark versus other implementations, like arctan2 used here: https://www.kaggle.com/code/shlomoron/icecube-eda-pca-baseline-cv-1-28-lb-1-274 

## This version has a small optimization trick, calculating azimuth without regard for z or zenith
## This version is suboptimal if the vectors are already unit vectors, or if you need 3d unit vectors again later for some other step.
def angles_from_vectors(vectors):
    """Convert direction vectors to (azimuth, zenith) angles.

    Args:
        vectors: Array-like with one x, y, z vector per row. The vectors do
            not have to be normalized. The input is never modified.

    Returns:
        Array with one row per input vector, column 0 the azimuth and
        column 1 the zenith. Rows that do not yield a finite angle get
        azimuth 0 and zenith pi/2.
    """
    ## Work on a private floating point copy: the caller's array stays untouched
    ## and integer input is not truncated when the unit vectors are written back.
    vectors = np.asarray(vectors)
    if np.issubdtype(vectors.dtype, np.floating):
        vectors = vectors.copy()
    else:
        vectors = vectors.astype(np.float64)

    v_squared = np.square(vectors)

    ## Shortcut optimization for azimuth: calculate 2d unit vectors for x and y independent of z
    xy_sq = np.sum(v_squared[:, 0:2], axis=1)
    xy_d = np.sqrt(xy_sq)[:, None]
    ## For z, use full 3d unit vector
    d = np.sqrt(xy_sq + v_squared[:, 2])

    ## Silence the divide-by-zero warnings only inside this block; errstate
    ## restores whatever error settings the caller had before.
    with np.errstate(divide='ignore', invalid='ignore'):
        vectors[:, 0:2] = np.where(xy_d == 0, xy_d, vectors[:, 0:2]/xy_d)
        vectors[:, 2] = np.where(d == 0, d, vectors[:, 2]/d)

    ## As mentioned by others, clip solely to avoid floating point errors, the unit vectors should already be within this range.
    vectors = np.clip(vectors, -1, 1)

    azimuth = np.arccos(vectors[:, 0])
    ## if y < 0, convert from quadrants 1 and 2 to quadrants 3 and 4
    azimuth = np.where(vectors[:, 1] >= 0, azimuth, 2*math.pi - azimuth)
    azimuth = np.where(np.isfinite(azimuth), azimuth, 0.0)

    zenith = np.arccos(vectors[:, 2])
    ## IMPORTANT: zenith angles are not evenly distributed, so set the error case to pi/2!
    ## (even though x, y, z might be. It would be a fun exercise to check if random values
    ##  for x, y, z converted to zenith angles would match the observed distribution of zenith angles in the train labels)
    zenith = np.where(np.isfinite(zenith), zenith, math.pi/2)

    return np.stack([azimuth, zenith], axis=1)

## Takes a list of azimuth np arrays, a list of zenith np arrays, and an optional list of numerical weights,
## and ensembles into a final direction.
##
## Each (azimuth, zenith) pair is turned into a unit vector, the vectors are
## averaged with the given weights, and the mean vector is converted back to angles.
## Not the cheapest route, but it works for predictions coming from any number of sources.
def average_angles(az_list, zen_list, weights=None):
    """Return the (weighted) mean direction of several angle predictions.

    ``az_list[i]`` and ``zen_list[i]`` are the azimuth and zenith arrays of
    prediction ``i``; ``weights[i]`` is its weight (1 when ``weights`` is
    ``None``). Raises ``ValueError`` when an angle is not finite.
    """
    assert(len(az_list) == len(zen_list))
    total = az_list[0].shape[0]
    x, y, z = np.zeros(total), np.zeros(total), np.zeros(total)

    for i, (az, zen) in enumerate(zip(az_list, zen_list)):
        w = 1 if weights is None else weights[i]
        assert(az.shape[0] == total)
        assert(zen.shape[0] == total)
        if not (np.all(np.isfinite(az)) and
                np.all(np.isfinite(zen))):
            raise ValueError("All arguments must be finite")
        sz = np.sin(zen)
        x += w*np.cos(az)*sz
        y += w*np.sin(az)*sz
        z += w*np.cos(zen)

    tot_w = len(az_list) if weights is None else sum(weights)
    x, y, z = x / tot_w, y / tot_w, z / tot_w

    # Normalize the mean vector before handing it to the angle conversion.
    d = np.sqrt(np.square(x) + np.square(y) + np.square(z))
    unit_components = [x / d, y / d, z / d]
    return angles_from_vectors(np.stack(unit_components, axis=1))

def center_of_charge(batch):
    """Estimate one direction per event from two time-weighted charge centers.

    ``batch`` is a pulse table indexed by ``event_id``. Helper columns
    (``ev_t_min``, ``ev_t_max``, ``w0``, ``w1``, ``wx0`` ... ``wz1``) are added
    to ``batch`` itself. Returns a frame indexed by ``event_id`` with the
    columns ``azimuth`` and ``zenith``.
    """
    ## groupby -> transform broadcasts the per-event value back to every pulse,
    ## which avoids an explicit loop over events.
    for stat in ('min', 'max'):
        batch[f'ev_t_{stat}'] = batch.groupby('event_id')['time'].transform(stat)

    ## w1 grows with time inside the event, w0 is the remaining share of the charge.
    batch['w1'] = batch.charge * (batch.time - batch.ev_t_min) / (batch.ev_t_max - batch.ev_t_min)
    batch['w0'] = batch.charge - batch.w1
    for end in ('0', '1'):
        for axis in ('x', 'y', 'z'):
            batch[f'w{axis}{end}'] = batch[axis] * batch[f'w{end}']

    ## Calculate all the sums!
    summed = batch[['w0', 'w1', 'wx0', 'wy0', 'wz0', 'wx1', 'wy1', 'wz1']].groupby('event_id').sum()

    ## Weighted centers: weighted coordinate sums divided by the sum of the weights.
    early_center = summed[['wx0', 'wy0', 'wz0']].div(summed.w0, axis=0)
    late_center = summed[['wx1', 'wy1', 'wz1']].div(summed.w1, axis=0)

    ## The direction the neutrino is traveling FROM is point0 - point1, instead of point1 - point0.
    direction = early_center.values - late_center.values

    angles = angles_from_vectors(direction)
    return pd.DataFrame(angles, index=summed.index, columns=['azimuth', 'zenith'])

def least_squares(batch, weighted=False):
    """Fit position against time per event and return the reversed slope as angles.

    ``batch`` is a pulse table indexed by ``event_id``; the helper columns
    ``xt``, ``yt``, ``zt`` and ``tt`` are added to it. With ``weighted=True``
    every pulse is weighted by its charge. Returns a frame indexed by
    ``event_id`` with the columns ``azimuth`` and ``zenith``.
    """
    for axis in ('x', 'y', 'z'):
        batch[f'{axis}t'] = batch[axis] * batch.time
    batch['tt'] = batch.time * batch.time

    moment_columns = ['x', 'y', 'z', 'time', 'xt', 'yt', 'zt', 'tt']
    if weighted:
        ## Charge-weighted means: weighted sums divided by the summed charge.
        df = batch[moment_columns] * batch.charge.values[:, None]
        df['charge'] = batch.charge
        df = df.groupby('event_id').sum()
        df = df.div(df.charge, axis=0)
    else:
        df = batch[moment_columns].groupby('event_id').mean()

    ## slope = (mean(pos*t) - mean(pos)*mean(t)) / (mean(t*t) - mean(t)^2)
    numerator = df[['xt', 'yt', 'zt']].values - (df[['x', 'y', 'z']].values * df['time'].values[:, None])
    denominator = df['tt'].values - (df.time.values * df.time.values)
    slope = numerator / denominator[:, None]

    ## Reverse it
    angles = angles_from_vectors(-slope)
    return pd.DataFrame(angles, index=df.index, columns=['azimuth', 'zenith'])


def process_batch(batch_id, sensor, max_events=None):
    """Load one batch with pandas and return line-fit angles per event.

    Args:
        batch_id: Id of the batch file ``{INPUT_DIR}/{MODE}/batch_{batch_id}.parquet``.
        sensor: Sensor geometry frame that is merged in on ``sensor_id``.
        max_events: Optional cap on the number of events that are processed.
            When the batch holds no more events than this, all are kept.

    Returns:
        Frame indexed by ``event_id`` with the columns ``azimuth`` and ``zenith``.
    """
    print('load batch...')
    batch = pd.read_parquet(f'{INPUT_DIR}/{MODE}/batch_{batch_id}.parquet')

    ## Limit to max_events
    if max_events is not None:
        batch_i = batch.reset_index()
        event_ids = batch_i.event_id.drop_duplicates()
        ## Only cut when there really is an event beyond the limit; indexing
        ## event_ids.index[max_events] would otherwise raise an IndexError.
        if max_events < len(event_ids):
            end_index = event_ids.index[max_events]
            batch_i = batch_i[:end_index]
        batch = batch_i.set_index('event_id')
    print(batch.shape)

    ## Merge in sensor x,y,z data
    batch = batch.reset_index().merge(sensor, how='left', on='sensor_id', left_index=False).set_index('event_id')

    ## The plain auxiliary flag is a very basic selection; optionally replace it
    ## by the proximity based selection.
    if FIND_BEST_POINTS:
        batch = find_best_points_pandas(batch)

    ## Limit to primary (aux=False) datapoints if there's enough of them.
    ## For event_ids with too few, set all rows to aux=False. This handles these cases without any for loop logic.
    ## MIN_PRIMARY_DATAPOINTS is a tuned value, based on optimizing the score on batches 101-110.
    ## But the result was 'as low as possible'.
    batch['primary_count'] = batch.groupby('event_id')['auxiliary'].transform('count') - batch.groupby('event_id')['auxiliary'].transform('sum')
    batch.loc[batch.primary_count < MIN_PRIMARY_DATAPOINTS, 'auxiliary'] = False
    ## Keep the explicit comparison: 'auxiliary' may be a float column at this
    ## point, for which the ~ operator would not work.
    batch = batch[batch.auxiliary == False]
    print(batch.shape)

    if USE_ENSEMBLE or ALGORITHM == 'center of charge':
        df = center_of_charge(batch)
        if USE_ENSEMBLE:
            df1 = df
    if USE_ENSEMBLE or ALGORITHM == 'least squares':
        df = least_squares(batch, weighted=USE_WEIGHTED_LEAST_SQUARES)
    if USE_ENSEMBLE:
        ## Blend the center-of-charge result (df1) with the least-squares result (df).
        df[['azimuth', 'zenith']] = average_angles([df1.azimuth.values, df.azimuth.values],
                                                   [df1.zenith.values, df.zenith.values], 
                                                   weights=WEIGHTS)
    return df

def proximity(e, col):
    """Recompute the auxiliary flag of one event from pulse proximity.

    ``e`` is a 2-d array with one pulse per row and ``col`` maps column names
    to column positions in ``e``. A pulse is set to auxiliary = False when
    another pulse on the same string, at a different depth_id, is close
    enough in depth_id and time; every other pulse is set to True. The
    auxiliary column of ``e`` is overwritten and returned.
    """
    aux_col = col['auxiliary']

    ## The comparison below builds an NxN array per event; for very long events
    ## that is too slow and too large, so the existing auxiliary flags are kept.
    ## TODO: see how high we can go before we run out of memory
    ## TODO: consider subsampling instead of simply returning the baseline auxiliary setting?
    ##       And/or splitting on string_id to get much smaller groups
    if e.shape[0] > 2000:
        return e[:, aux_col]

    ## Pairwise absolute differences: deltas[i, j] compares pulse j with pulse i.
    ## Indexing with None adds an axis so that the subtraction broadcasts to NxN.
    keyed = e[:, [col['string_id'], col['depth_id'], col['time']]]
    deltas = np.abs(keyed - keyed[:, None, :])
    d_string = deltas[:, :, 0]
    dz = deltas[:, :, 1]
    dt = deltas[:, :, 2]

    ## Any value above both thresholds marks a pair as "no match".
    no_match = MAX_Z + MAX_DEEP_Z + 1

    ## if same depth or different string id, ignore
    dz[(dz == 0) | (d_string != 0)] = no_match

    ## Row-wise time cut: MAX_T for sensors below id 4680, MAX_DEEP_T for the deep ice sensors.
    is_regular = e[:, col['sensor_id']] < 4680
    is_deep = e[:, col['sensor_id']] >= 4680
    dz[is_regular[:, None] & (dt > MAX_T)] = no_match
    dz[is_deep[:, None] & (dt > MAX_DEEP_T)] = no_match

    ## Best (smallest) remaining depth distance for each pulse.
    nearest = dz.min(axis=1)

    ## Default is aux=True; pulses with a close enough neighbour become aux=False.
    matched = (is_regular & (nearest <= MAX_Z)) | (is_deep & (nearest <= MAX_DEEP_Z))
    e[:, aux_col] = True
    e[matched, aux_col] = False

    ## Return only the data needed to speed up the np.concatenate called next.
    return e[:, aux_col]

## You can ignore this one unless interested in a deep dive on performance optimization
## It is provided as a way of comparing the changes versus the pure numpy version.
def proximity_unoptimized(df):
    """Pandas variant of ``proximity``: update ``df.auxiliary`` for one event and return ``df``."""
    ## Very long events are returned unchanged.
    if df.shape[0] > 2000:
        return df

    ## Pairwise absolute differences in string_id, depth_id and time.
    keyed = df[['string_id', 'depth_id', 'time']].values
    deltas = np.abs(keyed - keyed[:, None, :])
    dz = deltas[:, :, 1]
    dt = deltas[:, :, 2]
    no_match = MAX_Z + MAX_DEEP_Z + 1

    ## Same depth or different string: never a match.
    dz[(dz == 0) | (deltas[:, :, 0] != 0)] = no_match

    ## Row-wise time cut, with separate limits below and from sensor id 4680.
    is_regular = df.sensor_id < 4680
    is_deep = df.sensor_id >= 4680
    dz[is_regular.values[:, None] & (dt > MAX_T)] = no_match
    dz[is_deep.values[:, None] & (dt > MAX_DEEP_T)] = no_match

    nearest = dz.min(axis=1)
    df.auxiliary = True
    df.loc[(is_regular & (nearest <= MAX_Z)) | (is_deep & (nearest <= MAX_DEEP_Z)), 'auxiliary'] = False
    return df


def find_best_points_pandas(batch):
    """Replace the ``auxiliary`` column of ``batch`` by the proximity based flag.

    ``batch`` is a pulse table indexed by ``event_id``; it is modified and returned.
    """
    if USE_UNOPTIMIZED:
        cols = ['sensor_id', 'time', 'auxiliary', 'string_id', 'depth_id']
        batch[cols] = batch[cols].groupby('event_id').progress_apply(proximity_unoptimized)
        return batch

    ## All columns are converted to float32 so that each event is one plain numpy array.
    column_to_index = {name: position for position, name in enumerate(batch.columns)}
    values = batch.values.astype('float32')

    ## np.split at the first row of every event is the pure numpy stand-in for groupby.
    _, first_rows = np.unique(batch.index.values, return_index=True)
    events = np.split(values, first_rows[1:])

    ## Events are processed one after another and the flags are joined back together.
    ## Note that this overrides the dtype of the 'auxiliary' column to float32.
    aux_per_event = [proximity(event, column_to_index) for event in tqdm(events)]
    batch.auxiliary = np.concatenate(aux_per_event)
    return batch

def time_weighted_centering(batch, charge_weighted=True):
    """Polars version of the two-center direction estimate.

    Returns ``(angles, events)``: the array produced by ``angles_from_vectors``
    and the per-event frame of summed weights it was computed from.
    ``batch`` has to support ``.collect()`` after the column selection.
    """
    ## 'over' is the Polars counterpart of groupby->transform: per-event min and max without a loop.
    time_col = pl.col('time')
    batch = batch.with_columns([
        time_col.min().over('event_id').alias('ev_t_min'),
        time_col.max().over('event_id').alias('ev_t_max'),
    ])

    since_start = pl.col('time') - pl.col('ev_t_min')
    duration = pl.col('ev_t_max') - pl.col('ev_t_min')
    if charge_weighted:
        late_weight = pl.col('charge') * since_start / duration
        full_weight = pl.col('charge')
    else:
        late_weight = since_start / duration
        full_weight = pl.lit(1)
    batch = batch.with_columns(late_weight.alias('w1'))
    batch = batch.with_columns((full_weight - pl.col('w1')).alias('w0'))

    ## Weighted coordinates for the early (w0) and the late (w1) center, summed per event.
    weighted_coordinates = [
        (pl.col(axis) * pl.col(f'w{end}')).alias(f'w{axis}{end}')
        for end in ('0', '1')
        for axis in ('x', 'y', 'z')
    ]
    selection = [pl.col('event_id'), pl.col('w0'), pl.col('w1')] + weighted_coordinates
    batch = batch.select(selection).collect().groupby('event_id', maintain_order=True).sum()

    ## The direction the neutrino is traveling FROM is point0 - point1, instead of point1 - point0.
    direction = [
        ((pl.col(f'w{axis}0') / pl.col('w0')) - (pl.col(f'w{axis}1') / pl.col('w1'))).alias(axis)
        for axis in ('x', 'y', 'z')
    ]
    batch_values = batch.select(direction).to_numpy()
    return angles_from_vectors(batch_values), batch


## Same numpy proximity function as the pandas path; only the conversion from and to the frame differs.
def find_best_points_polars(batch):
    """Add a boolean ``best`` column: the negated proximity based auxiliary flag."""
    ## Keep only the columns proximity needs, all as float32, for an efficient np array.
    needed = ['sensor_id', 'time', 'auxiliary', 'string_id', 'depth_id']
    df = batch.select([pl.col(name) for name in needed])
    column_to_index = {name: position for position, name in enumerate(df.columns)}
    batch_values = df.to_numpy().astype('float32')

    ## Pure numpy equivalent of groupby: split at the first row of every event.
    _, first_rows = np.unique(batch.select(pl.col('event_id')), return_index=True)
    events = np.split(batch_values, first_rows[1:])

    ## Events are processed one after another and the flags are joined back together.
    aux_flags = np.concatenate([proximity(e, column_to_index) for e in tqdm(events)]).astype('bool')

    ## Store the flags as 'best', then flip them: best == not auxiliary.
    batch = batch.with_columns(pl.Series(aux_flags).alias('best'))
    batch = batch.with_columns(pl.col('best').is_not())

    return batch



if USE_POLARS:
    # List meant to collect per-batch prediction frames; it is only referenced
    # by the disabled test-mode code inside get_line_fit_angles.
    sub = []
    #for batch_id in range(batch_id_start,batch_id_end):
def get_line_fit_angles(batch_id):
    """Compute line-fit (azimuth, zenith) angles for every event of one batch.

    Reads ``{INPUT_DIR}/{MODE}/batch_{batch_id}.parquet`` and the sensor
    geometry with polars, weights the pulses by a ``trust`` value and returns
    the array produced by ``time_weighted_centering`` (blended with a second,
    trust-only weighted estimate when ``USE_ENSEMBLE`` is set).
    """
    print(MODE)
    ## Scan parquet is part of Polars lazy evaluation, so no cost yet,
    ## and it can figure out optimizations when we finally 'collect' it later.
    meta = pl.scan_parquet(f'{INPUT_DIR}/{MODE}_meta.parquet')

    print('load sensor data...')
    ## string_id / depth_id are derived from sensor_id with a fixed 60 sensors per string.
    sensor = (pl.scan_csv(f'{INPUT_DIR}/sensor_geometry.csv')
                .with_columns([
                    pl.col('sensor_id').cast(pl.Int16),
                    (pl.col('sensor_id') // 60).alias('string_id'),
                    (pl.col('sensor_id') % 60).alias('depth_id'),                
                ])
             )

    print(sensor)

    ## NOTE(review): batch_id_start / batch_id_end are only printed below; in
    ## 'test' mode computing them collects the meta file twice.
    if MODE == 'train':
        batch_id_start = TRAIN_BATCH_START
        batch_id_end = batch_id_start + TRAIN_N_BATCHES
    else:
        batch_id_start = meta.select(pl.col('batch_id')).collect()[0, 0]
        batch_id_end = meta.select(pl.col('batch_id')).collect()[-1, 0] + 1

    print(batch_id_start, batch_id_end)
    
    print(batch_id)
    t = time.time()
    max_events=TRAIN_MAX_EVENTS
    print('load batch...')
    batch = pl.scan_parquet(f'{INPUT_DIR}/{MODE}/batch_{batch_id}.parquet')

    ## Limit to max_events
    if max_events is not None:
        batch = batch.collect()
        last_event_id = batch.select(pl.col('event_id')).unique()[TRAIN_MAX_EVENTS-1, 0]
        batch = batch.lazy().filter(pl.col('event_id') <= last_event_id)

    ## Merge in sensor x,y,z data
    batch = batch.join(sensor, on='sensor_id', how='left').collect()

    ## The plain auxiliary flag is a very basic selection; the proximity based 'best' flag refines it.
    ## NOTE(review): the 'best' column used below only exists when FIND_BEST_POINTS is True.
    if FIND_BEST_POINTS:
        batch = find_best_points_polars(batch)

    ## Use data point weights instead of filtering. Still set most points to 0 weight, unless they are the only data points available.
    ## MIN_PRIMARY_DATAPOINTS and AUX_FALSE_WEIGHT are tuned values, based on optimizing the score on batches 101-110.
    ## count - sum over an event is the number of rows where the flag is False.
    batch = batch.lazy().with_columns((pl.col('best').count().over('event_id') - pl.col('best').sum().over('event_id')).alias('best_count'))
    batch = batch.lazy().with_columns((pl.col('auxiliary').count().over('event_id') - pl.col('auxiliary').sum().over('event_id')).alias('non_aux_count'))
    ## trust = 1.0 for 'best' pulses (or for every pulse of an event where both counts
    ## are below MIN_PRIMARY_DATAPOINTS), AUX_FALSE_WEIGHT for the remaining
    ## non-auxiliary pulses, 0.0 otherwise.
    batch = batch.with_columns(( pl.when( pl.col('best') | ((pl.col('best_count') < MIN_PRIMARY_DATAPOINTS) & (pl.col('non_aux_count') < MIN_PRIMARY_DATAPOINTS)) )
                                            .then(1.0)
                                            .otherwise(pl.when(pl.col('auxiliary').is_not())
                                                         .then(AUX_FALSE_WEIGHT)
                                                         .otherwise(0.0)
                                                    ) 
                                      ).alias('trust'))
    batch = batch.lazy().with_columns((pl.col('charge') * pl.col('trust')).alias('charge'))

    preds, events = time_weighted_centering(batch)
    if USE_ENSEMBLE:
        preds1 = preds
#             preds, events = time_weighted_centering(batch, charge_weighted=False)
        ## Instead of charge_weighted=False, set charge*aux to just equal aux.
        batch = batch.lazy().with_columns(pl.col('trust').alias('charge'))
        preds, events = time_weighted_centering(batch)
        preds = average_angles([preds1[:, 0], preds[:, 0]], [preds1[:, 1], preds[:, 1]], weights=WEIGHTS)

    ## The string literal below is disabled scoring / submission code kept for reference; it is never executed.
    '''
    if MODE == 'test':
        sub.append(events.select([pl.col('event_id'), pl.Series(preds[:, 0]).alias('azimuth'),
                                                     pl.Series(preds[:, 1]).alias('zenith')]))
    else:
        meta = meta.filter((pl.col('batch_id') >= batch_id_start) & (pl.col('batch_id') < batch_id_end))
        if isinstance(meta, pl.LazyFrame):
            meta = meta.collect()
        meta_values = meta.filter(pl.col('batch_id') == batch_id).select([pl.col('azimuth'), pl.col('zenith')]).to_numpy()
        if TRAIN_MAX_EVENTS is not None:
            print(angular_dist_score(meta_values[:TRAIN_MAX_EVENTS, 0], meta_values[:TRAIN_MAX_EVENTS, 1], preds[:, 0], preds[:, 1]))
        else:
            print(angular_dist_score(meta_values[:, 0], meta_values[:, 1], preds[:, 0], preds[:, 1]))
    '''
    print(f'Time: {time.time() - t:0.2f}s')
    return preds

#fitted_angle = get_line_fit_angles(1)


In [8]:
%%time
# Read Train Meta Data
train_meta_df = pd.read_parquet(f"{home_dir}train_meta.parquet")

# Number of meta rows (events) per batch, ordered by batch id
batch_counts = train_meta_df["batch_id"].value_counts().sort_index()

# Running total of rows up to and including each batch. A zero entry is stored
# under the id just below the smallest batch id, so that every batch has a
# preceding entry that can serve as its start offset.
first_batch_id = train_meta_df["batch_id"].min()
batch_max_index = batch_counts.cumsum()
batch_max_index[first_batch_id - 1] = 0
batch_max_index = batch_max_index.sort_index()

def train_meta_df_spliter(batch_id):
    """Return the rows of train_meta_df that belong to one batch.

    batch_max_index holds cumulative row counts keyed by batch id, so the
    entry of the previous batch id is the first row position of this batch
    and the entry of this batch id is one past its last row position.

    Args:
        batch_id: Id of the batch whose meta rows are wanted.

    Returns:
        The slice of train_meta_df covering that batch.

    Raises:
        KeyError: If batch_id or batch_id - 1 is not a key of batch_max_index.
    """
    first_row = int(batch_max_index.loc[batch_id - 1])
    end_row = int(batch_max_index.loc[batch_id])
    # The offsets are row positions, not index labels, so slice positionally;
    # label-based slicing only agrees with this for a default 0..n-1 index.
    return train_meta_df.iloc[first_row:end_row]

# Convert every training batch into one TFRecord file: collect the per-event
# pulse features, normalize them, flatten each event to a single row and save
# it together with the line-fit angles and the targets.
for batch_id in train_batch_ids:
    # Line-fit angles for this batch
    line_fit_angles = get_line_fit_angles(batch_id)

    print("Reading batch ", batch_id, end="")
    # Meta rows and pulse table of this batch
    batch_meta_df = train_meta_df_spliter(batch_id)
    batch_df = pd.read_parquet(train_format.format(batch_id=batch_id))

    # Output buffers: one block of pulse slots and one target pair per event
    n_events = len(batch_meta_df)
    batch_x = np.zeros((n_events, max_pulse_count, n_features), dtype="float16")
    batch_y = np.zeros((n_events, 2), dtype="float32")

    # Pre-fill the auxiliary column (index 2) with -1; only the first
    # pulse_count slots of each event are overwritten below.
    batch_x[:, :, 2] = -1

    def read_event_local(event_idx):
        """Run read_event for one event of the batch currently loaded."""
        return read_event(event_idx, batch_meta_df, max_pulse_count, batch_df, train=True)

    # Process events in a worker pool and copy every result into the buffers
    iterator = range(n_events)
    with multiprocessing.Pool() as pool:
        for event_idx, pulse_count, event_x, event_y in pool.map(read_event_local, iterator):
            # Column order: time, charge, auxiliary, x, y, z
            for column, field in enumerate(("time", "charge", "auxiliary", "x", "y", "z")):
                batch_x[event_idx, :pulse_count, column] = event_x[field]
            batch_y[event_idx] = event_y

    # The raw tables are not used again in this iteration
    del batch_meta_df, batch_df

    batch_x, batch_y = normalize_data(batch_x, batch_y)

    # Flatten each event to one row. The width assumes normalize_data returns
    # n_features + 3 values per pulse -- TODO confirm against normalize_data.
    batch_x = batch_x.reshape(batch_x.shape[0], max_pulse_count * (n_features + 3))

    # Save
    print(" DONE! Saving...")
    save_to_tfrecord(batch_x, line_fit_angles, batch_y, f'{batch_id}.tfrecord')
    del batch_x, line_fit_angles, batch_y
    gc.collect()

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
118
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 124.76s
Reading batch  118 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
119
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 103.73s
Reading batch  119 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
120
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 97.28s
Reading batch  120 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
121
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 94.25s
Reading batch  121 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
122
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 94.78s
Reading batch  122 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
123
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 95.00s
Reading batch  123 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
124
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 103.40s
Reading batch  124 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
125
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 108.33s
Reading batch  125 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
126
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 99.34s
Reading batch  126 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
127
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 99.45s
Reading batch  127 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
128
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 97.10s
Reading batch  128 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
129
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 97.90s
Reading batch  129 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
130
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 94.83s
Reading batch  130 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
131
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 106.60s
Reading batch  131 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
132
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 108.06s
Reading batch  132 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
133
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 98.34s
Reading batch  133 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
134
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 100.39s
Reading batch  134 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
135
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 95.06s
Reading batch  135 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
136
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 94.67s
Reading batch  136 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
137
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 95.70s
Reading batch  137 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
138
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 116.54s
Reading batch  138 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
139
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 100.30s
Reading batch  139 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
140
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 98.63s
Reading batch  140 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
141
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 95.15s
Reading batch  141 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
142
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 96.31s
Reading batch  142 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
143
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 97.67s
Reading batch  143 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
144
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 107.17s
Reading batch  144 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
145
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 99.75s
Reading batch  145 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
146
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 99.91s
Reading batch  146 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
147
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 95.97s
Reading batch  147 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
148
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 96.57s
Reading batch  148 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
149
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 111.45s
Reading batch  149 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
150
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 102.52s
Reading batch  150 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
151
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 99.98s
Reading batch  151 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
152
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 97.35s
Reading batch  152 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
153
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 96.59s
Reading batch  153 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
154
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 97.87s
Reading batch  154 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
155
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 98.59s
Reading batch  155 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

train
load sensor data...
naive plan: (run LazyFrame.describe_optimized_plan() to see the optimized plan)

   WITH_COLUMNS:
   [col("sensor_id").strict_cast(Int16), [(col("sensor_id")) // (60i32)].alias("string_id"), [(col("sensor_id")) % (60i32)].alias("depth_id")]
    CSV SCAN /kaggle/input/icecube-neutrinos-in-deep-ice/sensor_geometry.csv
    PROJECT */4 COLUMNS

1 2
156
load batch...


  0%|          | 0/200000 [00:00<?, ?it/s]

Time: 105.58s
Reading batch  156 DONE! Saving...


  0%|          | 0/200000 [00:00<?, ?it/s]

CPU times: user 3h 37min 30s, sys: 21min 56s, total: 3h 59min 27s
Wall time: 7h 55min 13s


# NOTE(review): this snippet sits outside any numbered cell and does not agree
# with the batch loop earlier in the file:
#   * it unpacks three values from normalize_data (the loop unpacks two),
#   * it reshapes to n_features + 6 values per pulse (the loop uses
#     n_features + 3, and decode_tfrecord reshapes to 9 per pulse),
#   * it passes three arguments to save_to_tfrecord (the loop passes four).
# It looks like a leftover written against another version of those helpers;
# confirm which signatures are current before running it.
# It builds all-zero inputs for 200000 events and would write them to a file
# named after whatever value batch_id currently holds.
batch_x = np.zeros((200000, max_pulse_count, n_features), dtype="float16")
batch_y = np.zeros((200000, 2), dtype="float16")
batch_x, batch_y, batch_origin_y = normalize_data(batch_x, batch_y)
batch_x = batch_x.reshape(np.shape(batch_x)[0], max_pulse_count*(n_features+6))

# Save
print(" DONE! Saving...")
save_to_tfrecord(batch_x, batch_origin_y, f'{batch_id}.tfrecord')

In [9]:
def decode_tfrecord(record_bytes):
    """Decode one serialized example into its pulses and two angle pairs.

    Args:
        record_bytes: Serialized example holding the keys 'event_pulses',
            'fitted_azimuth', 'fitted_zenith', 'origin_azimuth' and
            'origin_zenith'.

    Returns:
        A tuple (event_pulses, fitted_targets, origin_targets): the pulses as
        a float16 tensor reshaped to [max_pulse_count, 9], and two float32
        tensors stacking (azimuth, zenith) for the fitted and the origin
        angles respectively.
    """
    float_scalar = tf.io.FixedLenFeature([], tf.float32)
    feature_spec = {
        'event_pulses': tf.io.FixedLenFeature([], tf.string),
        'fitted_azimuth': float_scalar,
        'fitted_zenith': float_scalar,
        'origin_azimuth': float_scalar,
        'origin_zenith': float_scalar,
    }
    example = tf.io.parse_single_example(record_bytes, feature_spec)

    # The pulses are stored as a serialized tensor inside a string feature,
    # so they need a second parsing step before they can be reshaped.
    pulses = tf.io.parse_tensor(example['event_pulses'], out_type=tf.float16)
    pulses = tf.reshape(tf.cast(pulses, tf.float16), [max_pulse_count, 9])

    fitted = tf.stack([example['fitted_azimuth'], example['fitted_zenith']])
    origin = tf.stack([example['origin_azimuth'], example['origin_zenith']])

    return pulses, fitted, origin

In [10]:
# Sample TFRecord Dataset
def get_train_dataset(file_pattern='/kaggle/working/1.tfrecord', batch_size=1000):
    """Build a batched dataset over GZIP-compressed TFRecord files.

    Args:
        file_pattern: Glob pattern of the TFRecord files to read. The default
            is the single file this function previously had hard-coded.
        batch_size: Number of decoded events per batch.

    Returns:
        A tf.data.Dataset whose elements are batches of the
        (event_pulses, fitted_targets, origin_targets) tuples returned by
        decode_tfrecord.

    Raises:
        FileNotFoundError: If no file matches file_pattern.
    """
    # Read all TFRecord file paths
    tfrecord_paths = tf.io.gfile.glob(file_pattern)
    if not tfrecord_paths:
        # Fail here with a clear message rather than later, when the first
        # batch is requested from a dataset that has no files behind it.
        raise FileNotFoundError(f'No TFRecord files match {file_pattern!r}')

    # Initialize the TFRecord dataset
    train_dataset = tf.data.TFRecordDataset(tfrecord_paths, num_parallel_reads=1, compression_type='GZIP')
    # Decode samples by mapping with the decode function
    train_dataset = train_dataset.map(decode_tfrecord)
    # Batch samples
    return train_dataset.batch(batch_size)

# Show Example Batch
# Read up to n_sample_batches batches back from the TFRecord file and stack
# them into three arrays, then print their shapes.
n_sample_batches = 200
train_dataset = get_train_dataset()

pulses = []
fitted_targetss = []
origin_targetss = []
# take() simply stops when the file holds fewer batches than requested,
# whereas calling next() a fixed number of times raises StopIteration.
sample_batches = train_dataset.take(n_sample_batches)
for event_pulses, fitted_targets, origin_targets in tqdm(sample_batches, total=n_sample_batches):
    pulses.append(event_pulses.numpy())
    fitted_targetss.append(fitted_targets.numpy())
    origin_targetss.append(origin_targets.numpy())

pulses = np.vstack(pulses)
fitted_targetss = np.vstack(fitted_targetss)
origin_targetss = np.vstack(origin_targetss)

print(np.shape(pulses))
print(np.shape(fitted_targetss))
print(np.shape(origin_targetss))

# Disabled consistency check against in-memory arrays, kept for reference.
# It was previously a trailing string literal, which a notebook echoes as
# cell output. `targetss` is not defined in this cell.
#
# sample_batch_x = batch_x[:1000]
# sample_batch_y = batch_y[:1000]
# sample_batch_origin_y = batch_origin_y[:1000]
#
# sample_batch_x = sample_batch_x.reshape(1000, max_pulse_count, 9)
#
# print(np.max(sample_batch_x - pulses))
# print(np.max(sample_batch_y - targetss))
# print(np.max(sample_batch_origin_y - origin_targetss))