# Creating records 
## Pipeline 2.0
##### ASTROMER dev team

*July 07 2023*

In [1]:
cd /home

/home


In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

from src.data.record import DataPipeline

%load_ext autoreload
%autoreload 2

2023-07-07 17:53:17.920837: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-07-07 17:53:18.028724: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [3]:
METAPATH = './data/raw_data/alcock/new_metadata.parquet'
OBSPATH  = './data/raw_data/alcock/parquets/'

In [4]:
metadata = pd.read_parquet(METAPATH)

In [5]:
metadata['Class'] = pd.Categorical(metadata['Class'])
metadata['Label'] = metadata['Class'].cat.codes
metadata['Path'] = metadata['Path'].apply(lambda x: os.path.join(OBSPATH, x)) 
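`pd.Categorical` assigns integer codes following the sorted unique categories, so `Label` is a deterministic mapping of `Class`. The minimal sketch below shows this on a toy frame. The class names, file names and the `./parquets/` folder are made up for illustration.

```python
import os
import pandas as pd

# Toy metadata; class names and file names are hypothetical
toy = pd.DataFrame({'ID': ['a', 'b', 'c', 'd'],
                    'Class': ['RRab', 'Cep_0', 'RRab', 'EB'],
                    'Path': ['a.dat', 'b.dat', 'c.dat', 'd.dat']})

toy['Class'] = pd.Categorical(toy['Class'])
toy['Label'] = toy['Class'].cat.codes          # codes follow the sorted categories
toy['Path'] = toy['Path'].apply(lambda x: os.path.join('./parquets/', x))

# Mapping from label code back to class name
label_to_class = dict(enumerate(toy['Class'].cat.categories))
print(label_to_class)          # {0: 'Cep_0', 1: 'EB', 2: 'RRab'}
print(toy['Label'].tolist())   # [2, 0, 2, 1]
```

Keeping `label_to_class` around makes it easy to turn predicted labels back into class names later.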

In [6]:
metadata.sample()

| | ID | Class | Path | Band | newID | Label |
|---|---|---|---|---|---|---|
| 19033 | 81.8878.45 | 0 | ./data/raw_data/alcock/parquets/81.8878.45.dat | 1.0 | 19033 | 0 |


### Using the `DataPipeline` class

In [7]:
# Create an instance of DataPipeline
config_path = './data/raw_data/alcock/config.toml'
pipeline = DataPipeline(metadata=metadata,
                        config_path=config_path)

[INFO] 21444 samples loaded


In [8]:
pipeline.sequential_features_dtype

['float', 'float', 'float']

To create training, validation, and test splits, use the `train_val_test` method:
```
train_val_test(val_frac=0.2,
               test_frac=0.2,
               test_meta=None,
               val_meta=None,
               shuffle=True,
               id_column_name=None,
               k_fold=1)
``` 
where `val_frac` and `test_frac` are the fractions (between 0 and 1) of the metadata used for the validation and test subsets, respectively.

Additionally, you can pass preselected subsets through `val_meta` and `test_meta`. **If you provide your own test/validation subsets, they must share an identifier column with the main DataFrame** (by default, the first column of the metadata is assumed to be the identifier).

Both `test_meta` and `val_meta` must be lists of `DataFrame`s.

For cross-validation, you can also sample several folds from the same dataset by setting the `k_fold` parameter of `train_val_test` (default `k_fold=1`) to the desired number of folds.

If $k>1$ and **you want to use a predefined test/validation selection**, pass `test_meta`/`val_meta` as a list with one `DataFrame` per fold.
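For example, one way to give each fold its own predefined test set is to hold out a pool of objects and cut it into `k` disjoint chunks, one `DataFrame` per fold. This is a sketch on toy data; `toy_meta`, the pool size of 12 and the seed are arbitrary choices for illustration.

```python
import numpy as np
import pandas as pd

k_folds = 3
# Toy stand-in for the metadata; only the identifier column matters here
toy_meta = pd.DataFrame({'ID': [f'obj_{i}' for i in range(30)]})

# Hold out a pool of 12 objects and cut it into k disjoint per-fold test sets
pool = toy_meta.sample(n=12, random_state=0)
fold_test_meta = [pool.iloc[idx] for idx in np.array_split(np.arange(len(pool)), k_folds)]

print([len(df) for df in fold_test_meta])  # [4, 4, 4]
```

With the real `metadata`, such a list would take the place of `[test_metadata]*k_folds`, e.g. `pipeline.train_val_test(val_frac=0.2, test_meta=fold_test_meta, k_fold=k_folds)`.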

You do not need to remove duplicated indices yourself; `train_val_test` does it for you.
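To make the behaviour described above concrete for a single fold, here is a hypothetical re-implementation in plain pandas: drop duplicated identifiers, mark the predefined test IDs, shuffle the remaining rows and carve out `val_frac` of them as validation. `assign_subsets` is an illustrative name and this is not the code inside `DataPipeline`. The notebook's counts are consistent with `val_frac` being applied after removing the predefined test rows: 0.2 × (21444 − 100) = 4268.8, i.e. the 4269 validation samples reported below.

```python
import numpy as np
import pandas as pd

def assign_subsets(meta, test_ids, val_frac=0.2, id_col='ID', seed=None):
    """Hypothetical helper: label each row 'train', 'validation' or 'test' for one fold."""
    # Duplicated identifiers are dropped before splitting
    meta = meta.drop_duplicates(subset=id_col).reset_index(drop=True)
    is_test = meta[id_col].isin(test_ids)

    subset = pd.Series('train', index=meta.index)
    subset.loc[is_test] = 'test'

    # Shuffle the non-test rows and take the first val_frac of them as validation
    rng = np.random.default_rng(seed)
    rest = rng.permutation(meta.index[~is_test.to_numpy()].to_numpy())
    n_val = int(round(val_frac * len(rest)))
    subset.loc[rest[:n_val]] = 'validation'
    return meta.assign(subset=subset)

# 10 unique IDs plus one duplicate of 'obj_0'
toy = pd.DataFrame({'ID': [f'obj_{i}' for i in range(10)] + ['obj_0']})
out = assign_subsets(toy, test_ids=['obj_1', 'obj_2'], val_frac=0.25, seed=0)
print(out['subset'].value_counts().to_dict())  # train: 6, validation: 2, test: 2
```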

In [9]:
test_metadata = metadata.sample(n=100)

In [10]:
k_folds = 3
pipeline.train_val_test(val_frac=0.2, 
                        test_meta=[test_metadata]*k_folds, 
                        k_fold=k_folds)

[INFO] Using ID col as sample identifier
[INFO] Shuffling
[INFO] Shuffling
[INFO] Shuffling


In [11]:
a = pipeline.metadata['subset_0']
for k in range(1, k_folds):
    b = pipeline.metadata[f'subset_{k}']
    # Compare the train/validation assignment of consecutive folds
    # (the test rows are the same in every fold here)
    c = np.array_equal(a[a != 'test'].values, b[b != 'test'].values)
    print(f'Do folds {k-1} and {k} have the same train/validation partition?', c)
    a = b

Do folds 0 and 1 have the same train/validation partition? False
Do folds 1 and 2 have the same train/validation partition? False


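The pipeline metadata now contains one extra column per fold (`subset_0`, `subset_1` and `subset_2` for three folds) indicating whether each sample belongs to the `train`, `validation` or `test` subset.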
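Because each fold is just a column of labels, a small helper can slice all three subsets of a fold in one pass. `get_fold` is a hypothetical name and the toy frame only mimics the `subset_{k}` columns of the pipeline metadata.

```python
import pandas as pd

def get_fold(meta, k):
    """Hypothetical helper: split the metadata into {'train', 'validation', 'test'} for fold k."""
    return {name: group for name, group in meta.groupby(f'subset_{k}')}

toy = pd.DataFrame({'ID': ['a', 'b', 'c', 'd'],
                    'subset_0': ['train', 'train', 'validation', 'test'],
                    'subset_1': ['validation', 'train', 'train', 'test']})
fold_1 = get_fold(toy, 1)
print({name: df['ID'].tolist() for name, df in fold_1.items()})
# {'test': ['d'], 'train': ['b', 'c'], 'validation': ['a']}
```

Applied to `pipeline.metadata`, `get_fold(pipeline.metadata, k)['train']` selects the same rows as the boolean mask on `subset_{k}` used in the cells that follow.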

In [12]:
pipeline.metadata.sample(3)

| | ID | Class | Path | Band | newID | Label | subset_0 | subset_1 | subset_2 |
|---|---|---|---|---|---|---|---|---|---|
| 17937 | 80.6473.1946 | 4 | ./data/raw_data/alcock/parquets/80.6473.1946.dat | 1.0 | 17937 | 4 | train | train | train |
| 7746 | 22.4985.129 | 0 | ./data/raw_data/alcock/parquets/22.4985.129.dat | 1.0 | 7746 | 0 | train | train | train |
| 14255 | 76.10084.1146 | 2 | ./data/raw_data/alcock/parquets/76.10084.1146.dat | 1.0 | 14255 | 2 | train | train | validation |


In [13]:
for k in range(k_folds):
    train_subset = pipeline.metadata[pipeline.metadata[f'subset_{k}'] == 'train']
    val_subset   = pipeline.metadata[pipeline.metadata[f'subset_{k}'] == 'validation']
    test_subset  = pipeline.metadata[pipeline.metadata[f'subset_{k}'] == 'test']

    print(train_subset.shape, val_subset.shape, test_subset.shape)

    # .any() is True if at least one ID is shared between the two subsets
    print('test in train?: ', test_subset['ID'].isin(train_subset['ID']).any(),'\n',
          'val in train?: ', val_subset['ID'].isin(train_subset['ID']).any(),'\n',
          'val in test?: ', val_subset['ID'].isin(test_subset['ID']).any())

(17075, 9) (4269, 9) (100, 9)
test in train?:  False 
 val in train?:  False 
 val in test?:  False
(17075, 9) (4269, 9) (100, 9)
test in train?:  False 
 val in train?:  False 
 val in test?:  False
(17075, 9) (4269, 9) (100, 9)
test in train?:  False 
 val in train?:  False 
 val in test?:  False
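Counting shared identifiers directly gives a more explicit leakage check: any non-zero count means an object appears in two subsets of the same fold. `count_overlaps` is a hypothetical helper, shown here on a toy frame with a deliberate leak.

```python
from itertools import combinations
import pandas as pd

def count_overlaps(meta, k, id_col='ID'):
    """Hypothetical helper: number of shared IDs between each pair of subsets in fold k."""
    ids = {name: set(group[id_col]) for name, group in meta.groupby(f'subset_{k}')}
    return {(a, b): len(ids[a] & ids[b]) for a, b in combinations(sorted(ids), 2)}

# 'a' appears in both train and test on purpose
toy = pd.DataFrame({'ID': ['a', 'b', 'c', 'a'],
                    'subset_0': ['train', 'validation', 'test', 'test']})
print(count_overlaps(toy, 0))
# {('test', 'train'): 1, ('test', 'validation'): 0, ('train', 'validation'): 0}
```

On the real data, `[count_overlaps(pipeline.metadata, k) for k in range(k_folds)]` should contain only zeros.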


Note that to redo the split, you must create a new `DataPipeline` instance.

Now it is **time to run the pipeline**.

In [16]:
pipeline.context_features, pipeline.context_features_dtype

(['ID', 'Label', 'Class', 'Band'], ['string', 'integer', 'string', 'integer'])

In [17]:
%%time
var = pipeline.run(observations_path=OBSPATH, 
                   metadata_path=METAPATH,
                   n_jobs=8,
                   elements_per_shard=5000)

2023-07-07 17:53:58,542 - INFO - Starting DataPipeline operations

Processing fold 0/3:   0%|          | 0/3 [00:02<?, ?it/s]
2023-07-07 17:54:00.960448: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: UNKNOWN ERROR (34)
2023-07-07 17:54:00.960481: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (258e82d82bea): /proc/driver/nvidia/version does not exist

CPU times: user 31.6 s, sys: 7.95 s, total: 39.6 s
Wall time: 29.8 s
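Assuming `elements_per_shard` caps how many serialized samples go into each `.record` file (an assumption about its semantics, not confirmed here), the number of files per subset is a ceiling division. With the split sizes printed earlier (17075 / 4269 / 100) and `elements_per_shard=5000`:

```python
import math

def n_shards(n_samples, elements_per_shard):
    """Record files needed if each holds at most elements_per_shard samples (assumed semantics)."""
    return math.ceil(n_samples / elements_per_shard)

sizes = {'train': 17075, 'validation': 4269, 'test': 100}
print({name: n_shards(n, 5000) for name, n in sizes.items()})
# {'train': 4, 'validation': 1, 'test': 1}
```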


### Customizing the `preprocess` function (not working yet)

To modify the `process_sample` method, create a new class (`MyPipeline`) that inherits from `DataPipeline`. The overriding method must keep the same parameters, i.e., `row, context_features, sequential_features`, and its **output** should be a tuple containing the light curve (`pd.DataFrame`) and the context values (`dict`).
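Overriding works only if the parent class looks the method up through `self` (or the class), so that the subclass version is picked up; whether `DataPipeline` does this internally is an assumption here. A minimal pure-Python sketch of the pattern, where `BasePipeline`, `MyToyPipeline`, `run` and the `preprocess` bodies are hypothetical stand-ins:

```python
class BasePipeline:
    @staticmethod
    def preprocess(values):
        # Default behaviour: return the data unchanged
        return values

    def run(self, values):
        # Looking the method up via self lets subclasses override it
        return self.preprocess(values)


class MyToyPipeline(BasePipeline):
    @staticmethod
    def preprocess(values):
        # Custom behaviour: keep only non-negative values
        return [v for v in values if v >= 0]


print(BasePipeline().run([1, -2, 3]))   # [1, -2, 3]
print(MyToyPipeline().run([1, -2, 3]))  # [1, 3]
```

If the base class instead called a module-level function, the override would be silently ignored.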

In [30]:
import polars as pl

class MyPipeline(DataPipeline):
    @staticmethod
    def preprocess(scan, id_column, sequential_features):
        # Filter applied to the whole (lazy) observations frame
        general_filter = pl.col("err") < 1.

        def lc_fn(lc):
            # Per-light-curve step: receives one light curve as a DataFrame
            # and must return a DataFrame
            lc = lc.sort('mjd')
            return lc.with_columns(
                (pl.col("value") / pl.col("value").first()).alias("mjd"))

        processed_obs = scan.filter(general_filter).drop_nulls()
        # Apply lc_fn to each light curve; `groupby(...).apply` is the polars 0.18 API
        # (later polars releases rename it to `group_by(...).map_groups`)
        processed_obs = processed_obs.groupby(id_column).apply(lc_fn, schema=None)
        return processed_obs
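For comparison, the same kind of preprocessing can be expressed eagerly in pandas instead of lazily in polars: filter rows with `err < 1`, drop nulls, sort each light curve by `mjd`, then apply a per-light-curve transformation. The column names `ID`, `mjd`, `value` and `err` follow the polars code above; writing the ratio to a new hypothetical column `value_norm`, rather than overwriting `mjd`, is a choice made only for this sketch.

```python
import numpy as np
import pandas as pd

# Toy observations table in long format (one row per observation)
obs = pd.DataFrame({
    'ID':    ['a', 'a', 'a', 'b', 'b'],
    'mjd':   [3.0, 1.0, 2.0, 5.0, 4.0],
    'value': [4.0, 2.0, np.nan, 9.0, 3.0],
    'err':   [0.1, 0.2, 0.1, 0.5, 2.0],
})

# Global step: keep precise observations and drop rows with missing values
clean = obs[obs['err'] < 1.].dropna()

# Per-light-curve step: sort by time within each ID, then divide by the first value
clean = clean.sort_values(['ID', 'mjd'])
first_value = clean.groupby('ID')['value'].transform('first')
processed = clean.assign(value_norm=clean['value'] / first_value)

print(processed[['ID', 'mjd', 'value_norm']].to_dict('records'))
```

Using `transform('first')` after a global sort avoids a Python-level function call per light curve.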

The next steps are the same as with the original `DataPipeline` class.

In [31]:
custom_pipeline = MyPipeline(metadata=metadata,
                             config_path=config_path)

[INFO] 21444 samples loaded


In [32]:
test_metadata = metadata.sample(n=100)
k_folds = 3
custom_pipeline.train_val_test(val_frac=0.2, 
                               test_meta=[test_metadata]*k_folds, 
                               k_fold=k_folds)

[INFO] Using ID col as sample identifier
[INFO] Shuffling
[INFO] Shuffling
[INFO] Shuffling


In [33]:
%%time
var = custom_pipeline.run(observations_path=OBSPATH, 
                           metadata_path=METAPATH,
                           n_jobs=8,
                           elements_per_shard=5000)

2023-07-07 18:00:45,968 - INFO - Starting DataPipeline operations

Processing fold 2/3: 100%|██████████████████████████████████| 3/3 [00:29<00:00,  9.79s/it]
2023-07-07 18:01:15,355 - INFO - Finished execution of DataPipeline operations


CPU times: user 31.3 s, sys: 7.91 s, total: 39.2 s
Wall time: 29.4 s


# Reading records

In [34]:
import tensorflow as tf
from src.data.record import deserialize
import glob


In [35]:
root = './data/records_parquet/alcock/fold_0/train/'
record_files = glob.glob(os.path.join(root, '*.record'))
raw_dataset = tf.data.TFRecordDataset(record_files)
raw_dataset = raw_dataset.map(lambda x: deserialize(x, root))
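To read every fold rather than only `fold_0/train`, the same `glob` pattern can be built per fold and subset. The folder names other than `fold_0/train` are assumptions about the layout `run` writes (`fold_{k}/{train,validation,test}`), and `collect_record_files` and the `chunk_*.record` file names are hypothetical. The demo creates a throwaway directory tree so it runs anywhere.

```python
import glob
import os
import tempfile

def collect_record_files(root, k_folds, subsets=('train', 'validation', 'test')):
    """Hypothetical helper: {(fold, subset): sorted list of .record files}."""
    return {(k, s): sorted(glob.glob(os.path.join(root, f'fold_{k}', s, '*.record')))
            for k in range(k_folds) for s in subsets}

with tempfile.TemporaryDirectory() as tmp:
    # Fake layout: two shards in fold_0/train, one shard in two other folders
    for rel in ['fold_0/train/chunk_0.record', 'fold_0/train/chunk_1.record',
                'fold_0/validation/chunk_0.record', 'fold_1/test/chunk_0.record']:
        path = os.path.join(tmp, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, 'w').close()
    files = collect_record_files(tmp, k_folds=2)
    counts = {key: len(v) for key, v in files.items()}

print(counts[(0, 'train')], counts[(0, 'validation')], counts[(1, 'test')], counts[(1, 'train')])
# 2 1 1 0
```

Each list can then be passed to `tf.data.TFRecordDataset`, as in the cell above.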

In [36]:
for x in raw_dataset.take(1):
    print(x['input'][..., 0])

tf.Tensor(
[[48823.78  48824.805 48825.805 48828.777 48829.746 48831.727 48832.723
  48834.76  48835.652 48835.797 48836.75  48841.74  48842.777 48843.72
  48851.746 48854.664 48855.812 48856.695 48884.69  48885.645 48887.77
  48888.766 48894.77  48908.48  48915.633 48916.734 48917.55  48919.59
  48927.516 48928.555 48929.504 48930.617 48931.516 48933.664 48935.66
  48937.723 48938.547 48939.562 48941.617 48947.664 48948.72  48949.555
  48964.7   48965.55  48966.543 48984.547 48985.562 48987.57  48988.543
  48988.74  48989.56  48996.508 48998.51  48998.734 49000.527 49001.51
  49001.707 49002.53  49006.52  49006.71  49007.566 49007.69  49014.71
  49015.52  49015.637 49016.547 49016.67  49018.55  49018.676 49020.613
  49021.53  49021.656 49025.64  49029.496 49032.516 49033.617 49036.72
  49037.484 49040.574 49043.49  49043.61  49044.68  49045.477 49045.613
  49046.664 49048.47  49048.605 49049.527 49049.67  49050.67  49053.594
  49059.523 49060.504 49061.484 49062.48  49064.582 49065.54
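Assuming the first channel of `input` holds observation times in MJD (the values above look like MJDs), the spacing between consecutive observations can be inspected with NumPy. The sketch uses the first five times printed above.

```python
import numpy as np

# First times from the printed tensor (days, assumed MJD)
t = np.array([48823.78, 48824.805, 48825.805, 48828.777, 48829.746])
dt = np.diff(t)                 # gaps between consecutive observations
span = t[-1] - t[0]             # time covered by these points
print(np.round(dt, 3))
print(round(span, 3))
```

If the times are stored as float32, as the printed precision suggests, values near 49000 are only resolved to 2^-8 ≈ 0.0039 days (about 5.6 minutes).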