In [1]:
import pyarrow.parquet as pq
import sqlite3
import pandas as pd
import sqlalchemy
from tqdm import tqdm
import os
from typing import Any, Dict, List, Optional
import numpy as np

from graphnet.data.sqlite.sqlite_utilities import create_table

def load_input(meta_batch: pd.DataFrame, geometry_table : pd.DataFrame, input_data_folder: str) -> pd.DataFrame:
        """
        Will load the corresponding detector readings associated with the meta data batch.
        """
        batch_id = pd.unique(meta_batch['batch_id'])

        assert len(batch_id) == 1, "contains multiple batch_ids. Did you set the batch_size correctly?"

        detector_readings = pd.read_parquet(path = f'{input_data_folder}/batch_{batch_id[0]}.parquet')
        sensor_positions = geometry_table.loc[detector_readings['sensor_id'], ['x', 'y', 'z']]
        sensor_positions.index = detector_readings.index

        for column in sensor_positions.columns:
            if column not in detector_readings.columns:
                detector_readings[column] = sensor_positions[column]

        detector_readings['auxiliary'] = detector_readings['auxiliary'].replace({True: 1, False: 0})
        return detector_readings.reset_index()

def add_to_table(database_path: str,
                      df: pd.DataFrame,
                      table_name:  str,
                      is_primary_key: bool,
                      ) -> None:
    """Writes meta data to sqlite table. 

    Args:
        database_path (str): the path to the database file.
        df (pd.DataFrame): the dataframe that is being written to table.
        table_name (str, optional): The name of the meta table. Defaults to 'meta_table'.
        is_primary_key(bool): Must be True if each row of df corresponds to a unique event_id. Defaults to False.
    """
    try:
        print(database_path)
        create_table(   columns=  df.columns,
                        database_path = database_path, 
                        table_name = table_name,
                        integer_primary_key= is_primary_key,
                        index_column = 'event_id')
    except sqlite3.OperationalError as e:
        if 'already exists' in str(e):
            pass
        else:
            raise e
    engine = sqlalchemy.create_engine("sqlite:///" + database_path)
    df.to_sql(table_name, con=engine, index=False, if_exists="append", chunksize = 200000)
    engine.dispose()
    return

def convert_to_sqlite(meta_data_path: str,
                      database_path: str,
                      input_data_folder: str,
                      geometry_table : pd.DataFrame,
                      batch_size: int = 200000,
                      accepted_batch_ids=None) -> None:
    """Converts a selection of the Competition's parquet files to a single sqlite database.

    Args:
        meta_data_path (str): Path to the meta data file.
        batch_size (int): the number of rows extracted from meta data file at a time. Keep low for memory efficiency.
        database_path (str): path to database. E.g. '/my_folder/data/my_new_database.db'
        input_data_folder (str): folder containing the parquet input files.
        accepted_batch_ids (List[int]): The batch_ids you want converted. Defaults to None (all batches will be converted)
    """
    meta_data_iter = pq.ParquetFile(meta_data_path).iter_batches(batch_size = batch_size)
    
    if not database_path.endswith('.db'):
        database_path = database_path+'.db'
        
    converted_batches = [] 
    progress_bar = tqdm(total = None)
    for meta_data_batch in meta_data_iter:
        unique_batch_ids = pd.unique(meta_data_batch['event_id']).tolist()
        meta_data_batch  = meta_data_batch.to_pandas()
        if accepted_batch_ids is not None:
            meta_data_batch = meta_data_batch.loc[meta_data_batch['batch_id'].isin(accepted_batch_ids)].copy()
        if len(meta_data_batch) == 0:
            continue
        add_to_table(database_path = database_path,
                    df = meta_data_batch,
                    table_name='meta_table',
                    is_primary_key= True)
        pulses = load_input(meta_batch=meta_data_batch, geometry_table=geometry_table, input_data_folder= input_data_folder)
        del meta_data_batch # memory
        add_to_table(database_path = database_path,
                    df = pulses,
                    table_name='pulse_table',
                    is_primary_key= False)
        del pulses # memory
        progress_bar.update(1)
    progress_bar.close()
    del meta_data_iter # memory
    print(f'Conversion Complete!. Database available at\n {database_path}')

[1;34mgraphnet[0m: [32mINFO    [0m 2023-02-23 18:20:10 - get_logger - Writing log to [1mlogs/graphnet_20230223-182010.log[0m


  warn(f"Failed to load image Python extension: {e}")


In [3]:
!rm '../data/test_database.db'
input_data_folder = '../data/test'
geometry_table = pd.read_csv('../data/sensor_geometry.csv')
meta_data_path = '../data/test_meta.parquet'

database_path = '../data/test_database'
convert_to_sqlite(meta_data_path,
                  database_path=database_path,
                  input_data_folder=input_data_folder,
                  geometry_table=geometry_table,
                  accepted_batch_ids=None)

rm: cannot remove '../data/test_database.db': No such file or directory


1it [00:00,  4.24it/s]

../data/test_database.db
../data/test_database.db
Conversion Complete!. Database available at
 ../data/test_database.db





In [8]:
#!rm '../data/train_database.db'
input_data_folder = '../data/train'
geometry_table = pd.read_csv('../data/sensor_geometry.csv')
meta_data_path = '../data/train_meta.parquet'

database_path = '../data/train_database_52_53'
convert_to_sqlite(meta_data_path,
                  database_path=database_path,
                  input_data_folder=input_data_folder,
                  geometry_table=geometry_table,
                  accepted_batch_ids=[52, 53])

0it [00:00, ?it/s]

../data/train_database_52_53.db
../data/train_database_52_53.db


1it [13:27, 807.32s/it]

../data/train_database_52_53.db
../data/train_database_52_53.db


2it [32:51, 985.83s/it]

Conversion Complete!. Database available at
 ../data/train_database_52_53.db





In [4]:
from pytorch_lightning.callbacks import EarlyStopping
from torch.optim.adam import Adam
from graphnet.data.constants import FEATURES, TRUTH
from graphnet.models import StandardModel
from graphnet.models.detector.icecube import IceCubeKaggle
from graphnet.models.gnn import DynEdge
from graphnet.models.graph_builders import KNNGraphBuilder
from graphnet.models.task.reconstruction import DirectionReconstructionWithKappa, ZenithReconstructionWithKappa, AzimuthReconstructionWithKappa
from graphnet.training.callbacks import ProgressBar, PiecewiseLinearLR
from graphnet.training.loss_functions import VonMisesFisher3DLoss, VonMisesFisher2DLoss
from graphnet.training.labels import Direction
from graphnet.training.utils import make_dataloader
from graphnet.utilities.logging import get_logger
from pytorch_lightning import Trainer
import pandas as pd

logger = get_logger()

def build_model(config: Dict[str,Any], train_dataloader: Any) -> StandardModel:
    """Builds GNN from config"""
    # Building model
    detector = IceCubeKaggle(
        graph_builder=KNNGraphBuilder(nb_nearest_neighbours=8),
    )
    gnn = DynEdge(
        nb_inputs=detector.nb_outputs,
        global_pooling_schemes=["min", "max", "mean"],
    )

    if config["target"] == 'direction':
        task = DirectionReconstructionWithKappa(
            hidden_size=gnn.nb_outputs,
            target_labels=config["target"],
            loss_function=VonMisesFisher3DLoss(),
        )
        prediction_columns = [config["target"] + "_x", 
                              config["target"] + "_y", 
                              config["target"] + "_z", 
                              config["target"] + "_kappa" ]
        additional_attributes = ['zenith', 'azimuth', 'event_id']

    model = StandardModel(
        detector=detector,
        gnn=gnn,
        tasks=[task],
        optimizer_class=Adam,
        optimizer_kwargs={"lr": 1e-03, "eps": 1e-03},
        scheduler_class=PiecewiseLinearLR,
        scheduler_kwargs={
            "milestones": [
                0,
                len(train_dataloader) / 2,
                len(train_dataloader) * config["fit"]["max_epochs"],
            ],
            "factors": [1e-02, 1, 1e-02],
        },
        scheduler_config={
            "interval": "step",
        },
    )
    model.prediction_columns = prediction_columns
    model.additional_attributes = additional_attributes
    
    return model

def load_pretrained_model(config: Dict[str,Any], state_dict_path: str) -> StandardModel:
    train_dataloader, _ = make_dataloaders(config = config)
    model = build_model(config = config, 
                        train_dataloader = train_dataloader)
    #model._inference_trainer = Trainer(config['fit'])
    model.load_state_dict(state_dict_path)
    model.prediction_columns = [config["target"] + "_x", 
                              config["target"] + "_y", 
                              config["target"] + "_z", 
                              config["target"] + "_kappa" ]
    model.additional_attributes = ['event_id'] #'zenith', 'azimuth',  not available in test data
    return model

def make_dataloaders(config: Dict[str, Any]) -> List[Any]:
    """Constructs training and validation dataloaders for training with early stopping."""
    
    train_dataloader = make_dataloader(db = config['path'],
                                            selection = pd.read_csv(config['train_selection'])[config['index_column']].ravel().tolist() if config['train_selection'] else None,
                                            pulsemaps = config['pulsemap'],
                                            features = features,
                                            truth = truth,
                                            batch_size = config['batch_size'],
                                            num_workers = config['num_workers'],
                                            shuffle = True,
                                            labels = {'direction': Direction()},
                                            index_column = config['index_column'],
                                            truth_table = config['truth_table'],
                                            )
    
    validate_dataloader = make_dataloader(db = config['path'],
                                            selection = pd.read_csv(config['validate_selection'])[config['index_column']].ravel().tolist() if config['validate_selection'] else None,
                                            pulsemaps = config['pulsemap'],
                                            features = features,
                                            truth = truth,
                                            batch_size = config['batch_size'],
                                            num_workers = config['num_workers'],
                                            shuffle = False,
                                            labels = {'direction': Direction()},
                                            index_column = config['index_column'],
                                            truth_table = config['truth_table'],
                                          
                                            )
    return train_dataloader, validate_dataloader

def inference(model, config: Dict[str, Any]) -> pd.DataFrame:
    """Applies model to the database specified in config['inference_database_path'] and saves results to disk."""
    # Make Dataloader
    test_dataloader = make_dataloader(db = config['inference_database_path'],
                                            selection = None, # Entire database
                                            pulsemaps = config['pulsemap'],
                                            features = features,
                                            truth = truth,
                                            batch_size = config['batch_size'],
                                            num_workers = config['num_workers'],
                                            shuffle = False,
                                            labels = None, # Cannot make labels in test data
                                            index_column = config['index_column'],
                                            truth_table = config['truth_table'],
                                            )
    
    # Get predictions
    results = model.predict_as_dataframe(
        gpus = [0],
        dataloader = test_dataloader,
        prediction_columns=model.prediction_columns,
        additional_attributes=['event_id']
    )
    return results

In [5]:
def prepare_dataframe(df, angle_post_fix = '_reco', vec_post_fix = '') -> pd.DataFrame:
    r = np.sqrt(df['direction_x'+ vec_post_fix]**2 + df['direction_y'+ vec_post_fix]**2 + df['direction_z' + vec_post_fix]**2)
    df['zenith' + angle_post_fix] = np.arccos(df['direction_z'+ vec_post_fix]/r)
    df['azimuth'+ angle_post_fix] = np.arctan2(df['direction_y'+ vec_post_fix],df['direction_x' + vec_post_fix]) #np.sign(results['true_y'])*np.arccos((results['true_x'])/(np.sqrt(results['true_x']**2 + results['true_y']**2)))
    df['azimuth'+ angle_post_fix][df['azimuth'  + angle_post_fix]<0] = df['azimuth'  + angle_post_fix][df['azimuth'  +  angle_post_fix]<0] + 2*np.pi 

    drop_these_columns = []
    for column in results.columns:
        if column not in ['event_id', 'zenith', 'azimuth']:
            drop_these_columns.append(column)
    return df.drop(columns = drop_these_columns).iloc[:,[0,2,1]].set_index('event_id')

In [6]:
# Constants
features = FEATURES.KAGGLE
truth = TRUTH.KAGGLE

# Configuration
config = {
        "path": '../data/test_database.db',
        "inference_database_path": '../data/test_database.db',
        "pulsemap": 'pulse_table',
        "truth_table": 'meta_table',
        "features": features,
        "truth": truth,
        "index_column": 'event_id',
        "run_name_tag": 'graphnet_baseline_submission_test',
        "batch_size": 200,
        "num_workers": 4,
        "target": 'direction',
        "early_stopping_patience": 5,
        "fit": {
                "max_epochs": 50,
                "gpus": [0],
                "distribution_strategy": None,
                },
        'train_selection': None,
        'validate_selection':  None,
        'test_selection': None,
        'base_dir': 'training'
}
model   = load_pretrained_model(config = config, 
                                state_dict_path='../dynedge_pretrained_batch_1_to_50/state_dict.pth')
results = inference(model, config)



Predicting: 0it [00:00, ?it/s]

In [7]:
submission_df = prepare_dataframe(results, angle_post_fix = '')
submission_df.to_csv('../submission.csv')