In [1]:
import pandas as pd
import swifter # To parallelise pandas operations
import h5py
import sys
sys.path.append('../../pyriodogram/')
import ndft_features as ndft
import tensorflow as tf
import os
import glob
import numpy as np
import collections
from joblib import Parallel, delayed
from tqdm import tqdm
tqdm.pandas()

In [2]:
TFRECORDS_DIR = '../records/'
EXAMPLES_PER_RECORD = 50000
PLASTICC_CATEGORIES = [6, 15 ,16 ,42 ,52 ,53 ,62 ,64 ,65 ,67 ,88 ,90 ,92 ,95 ,99]
CLASSIFIER_CATEGORIES = {cat:idx for idx, cat in enumerate(PLASTICC_CATEGORIES)}
NUM_BANDS = 6

# Read augmented dataset

In [None]:
flux_data = pd.read_hdf('kyle_final_augment.h5', 'df')
meta_data = pd.read_hdf('kyle_final_augment.h5', 'meta')

In [None]:
flux_data = flux_data.astype({'object_id':float, 'mjd':float,
                              'passband':int, 'flux': float,
                             'flux_err':float, 'detected':int})

In [None]:
meta_data = meta_data.astype({'object_id':float, 'ra':float, 'decl':float,
                             'gal_l':float, 'gal_b':float, 'ddf':int,
                             'hostgal_specz':float, 'hostgal_photoz':float,
                             'hostgal_photoz_err':float,'distmod':float,
                             'mwebv':float, 'target':int,'fold':int})

# Group and rename dynamic features

In [None]:
def reduce_arrays(df):
    df = df.sort_values('mjd')
    return df['mjd'].values, df['flux'].values,df['flux_err'].values, df['detected'].values

In [None]:
df_dynfeat = flux_data.groupby(['object_id',
                                'passband']).apply(reduce_arrays)
df_dynfeat = pd.DataFrame(df_dynfeat)

In [None]:
def name_cols(ds):
    mjd, flux, flux_err, detected = ds[0]
    return pd.Series({'object_id': ds['object_id'],'passband':ds['passband'] ,
                      'mjd': mjd, 'flux': flux, 'flux_err': flux_err,
                    'detected': detected})
    

In [None]:
df_dynfeat = df_dynfeat.reset_index().swifter.apply(name_cols, axis=1)

# Obtain Fourier features

In [None]:
def extract_fourier_feats(ds):
    freqs, mag, phase, Pn, proba = ndft.extract(ds['mjd'],
                            ds['flux'], oversampling = 4, tolerance = 1e-5)
    ds['freqs'] = freqs
    ds['mag'] = mag
    ds['phase'] = phase
    ds['period'] = Pn
    ds['proba'] = proba
    return ds

In [None]:
df_dynfeat = df_dynfeat.swifter.apply(extract_fourier_feats, axis=1)

In [None]:
df = pd.merge(df_dynfeat, meta_data, on='object_id', how='left')

***Read pickles***

In [3]:
pickle_files = glob.glob(TFRECORDS_DIR + 'fold_*')
df = []
for f in pickle_files:
    df_fold = pd.read_pickle(f)
    df += [df_fold]
df = pd.concat(df)

In [4]:
df.head(2)

Unnamed: 0,object_id,passband,mjd,flux,flux_err,detected,freqs,mag,phase,period,...,gal_l,gal_b,ddf,hostgal_specz,hostgal_photoz,hostgal_photoz_err,distmod,mwebv,target,fold
1386,1920.0,0,"[59582.3282, 59583.2409, 59584.2432, 59585.236...","[-0.39618459999999955, -3.2643575999999994, -3...","[2.872674, 2.532204, 2.558159, 2.934097, 3.362...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, ...","[0.00029251541965783417, 0.0005850308393156683...","[10.713621685288528, 33.88469008077931, 60.409...","[-1.8355565736121962, -1.327994501707416, -0.6...","[-0.09825567423829489, 1.1302372983702167, 2.5...",...,234.919132,42.24555,1,0.3088,0.3229,0.336,41.1401,0.027,90,0
1387,1920.0,1,"[59588.2266, 59591.2168, 59594.272, 59618.2024...","[-16.78730248888889, -16.631796488888888, -15....","[1.015543, 2.080464, 3.034246, 1.172148, 3.211...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.00029558992832890275, 0.0005911798566578055...","[111.73998679333911, 370.2667976947086, 664.91...","[-2.760655839180063, -1.7692688937666163, -0.9...","[0.5404477046253409, 2.3413844932081647, 10.15...",...,234.919132,42.24555,1,0.3088,0.3229,0.336,41.1401,0.027,90,0


# Write to tfrecords

In [5]:
def _int64_list_feature(values):
    """Returns a TF-Feature of int64_list.

    Args:
      values: A scalar or list of values.

    Returns:
      A TF-Feature.
    """
    # Flat numpy array (we actually need a list)
    if isinstance(values, np.ndarray):
        values = np.reshape(values, [-1])
        
    if not isinstance(values, collections.Iterable):
        values = [values]

    return tf.train.Feature(int64_list=tf.train.Int64List(value=values))

def _float_list_feature(values):
    """Returns a TF-Feature of FloatList.

    Args:
      values: A scalar or list of values.

    Returns:
      A TF-Feature.
    """
    
    # Flat numpy array (we actually need a list)
    if isinstance(values, np.ndarray):
        values = np.reshape(values, [-1])
    
    if not isinstance(values, collections.Iterable):
        values = [values]

    return tf.train.Feature(float_list=tf.train.FloatList(value=values))


def _bytes_list_feature(values):
    """Returns a TF-Feature of bytes.

    Args:
      values: A string.

    Returns:
      A TF-Feature.
    """
    def norm2bytes(value):
        return value.encode() if isinstance(value, str) and six.PY3 else value
    
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[norm2bytes(values)]))

def row_to_tfexample(rows):
    """Converts band rows for one object to tf example.
    Args:
      rows: data frame with object data.
    Returns:
      tf example.
    """
    NUM_BANDS = 6
    
    # Timeless features
    features = {'object/id': _float_list_feature(rows.index[0]),
                'object/target': _int64_list_feature(CLASSIFIER_CATEGORIES[rows.iloc[0]['target']]),
                'ddf': _int64_list_feature(rows.iloc[0]['ddf']),
                'hostgal_specz': _float_list_feature(rows.iloc[0]['hostgal_specz']), 
                'hostgal_photoz': _float_list_feature(rows.iloc[0]['hostgal_photoz']), 
                'hostgal_photoz_err': _float_list_feature(rows.iloc[0]['hostgal_photoz_err']), 
                'distmod': _float_list_feature(rows.iloc[0]['distmod']), 
                'mwebv': _float_list_feature(rows.iloc[0]['mwebv'])}
    
    for band in range(NUM_BANDS):
        row = df[df['passband'] == band].iloc[0]
        # Time dependent features by band
        features.update({'band_%i/num_samples'%band: _int64_list_feature(len(row['detected'])),
                         'band_%i/detected'%band: _int64_list_feature(row['detected']),
                         'band_%i/flux'%band: _float_list_feature(row['flux']),
                         'band_%i/flux_err'%band: _float_list_feature(row['flux_err']),
                         'band_%i/mjd'%band: _float_list_feature(row['mjd']),
                         'band_%i/dft/freqs'%band: _float_list_feature(row['freqs']),
                         'band_%i/dft/mag'%band: _float_list_feature(row['mag']),
                         'band_%i/dft/phase'%band: _float_list_feature(row['phase']),
                         'band_%i/dft/periodogram'%band: _float_list_feature(row['period']),
                         'band_%i/dft/proba'%band: _float_list_feature(row['proba'])})
    return tf.train.Example(features=tf.train.Features(feature=features))

def convert_subset(df, examples_per_record, output_path, fold):
    """Converts fold  to tf records
    Args:
        df: pandas dataframe,
        examples_per_record: number of samples saved in one tf record,
        output_path: path to save tf records,
        fold: cross validation fold.
                
    
    """
    def _get_output_filename(output_path, idx, num_files):
        if idx is None:
            idx = 0; num_files=0;
        return '%s-shard_%02d_of_%02d.tfrecord'%(output_path, idx, num_files)

    if len(df) == 0:
        print('-> %s fold is empty'%fold)
        return
    print('\n-> Processing %s fold...'%fold)
    # Initialize progress bar and counter
    # Initialize tfrecord idx counter
    if examples_per_record is None:
        tfrecord_idx = None
    else:
        tfrecord_idx = 1
    # tf writer
    object_ids = df['object_id'].unique()

    num_records = int(np.ceil(len(object_ids)/examples_per_record)) 
    print(_get_output_filename(output_path, tfrecord_idx, num_records))
    writer = tf.python_io.TFRecordWriter(_get_output_filename(output_path, tfrecord_idx,
                                                              num_records))
    # Save DataFrame, just in case...
    #df.to_pickle('%s-features.pkl'%output_path)  
    #idx = 1
    #progress = tf.keras.utils.Progbar(len(object_ids), interval=0.05)
    _ = df.groupby('object_id').progress_apply(lambda x: writer.write(row_to_tfexample(x).SerializeToString()))
    
    #_ = df.groupby('object_id').apply(lambda x: writer.write(row_to_tfexample(x).SerializeToString()))

    #df = df.set_index('object_id')
    #for object_id in object_ids:
        #rows = df.iat[object_id]
        # Prepare example
        #rows = df.iloc[i:i+6]
        #example = row_to_tfexample(rows)
        #writer.write(example.SerializeToString())
        #progress.update(idx)
        #if examples_per_record is not None and idx%examples_per_record==0:
            # Close current writer and set a new one into a new file
            #tfrecord_idx += 1
            #writer.close()
            #writer = tf.python_io.TFRecordWriter(_get_output_filename(output_path, 
                                                                      #tfrecord_idx, num_records))
        #idx += 1
    writer.close()

In [7]:
if not os.path.exists(os.path.dirname(TFRECORDS_DIR)):
    try:
        os.makedirs(os.path.dirname(TFRECORDS_DIR))
    except OSError as exc: # Guard against race condition
        if exc.errno != errno.EEXIST:
            raise

# Save dataset descriptors
Nfolds = df['fold'].unique()
df_try = df[:100]
for i in Nfolds: 
    convert_subset(df_try[df_try['fold'] == i], EXAMPLES_PER_RECORD, 
                   TFRECORDS_DIR + 'fold_%02d_of_%02d'%(i, len(Nfolds)-1), i)


  0%|          | 0/17 [00:00<?, ?it/s][A


-> Processing 0 fold...
../records/fold_00_of_04-shard_01_of_01.tfrecord



 12%|█▏        | 2/17 [00:10<01:22,  5.49s/it][A
 18%|█▊        | 3/17 [00:11<00:57,  4.12s/it][A
 24%|██▎       | 4/17 [00:12<00:41,  3.17s/it][A
 29%|██▉       | 5/17 [00:13<00:30,  2.50s/it][A
 35%|███▌      | 6/17 [00:14<00:22,  2.03s/it][A
 41%|████      | 7/17 [00:15<00:17,  1.71s/it][A
 47%|████▋     | 8/17 [00:16<00:13,  1.48s/it][A
 53%|█████▎    | 9/17 [00:17<00:10,  1.33s/it][A
 59%|█████▉    | 10/17 [00:18<00:08,  1.22s/it][A

KeyboardInterrupt: 

In [None]:
Parallel(n_jobs=len(Nfolds))(delayed(convert_subset)(df_try[df_try['fold'] == i], EXAMPLES_PER_RECORD, 
                   TFRECORDS_DIR + 'fold_%02d_of_%02d'%(i, len(Nfolds)-1), i) for i in Nfolds)

## Test tfrecords

In [None]:
import glob
records = glob.glob(TFRECORDS_DIR + 'fold_*.tfrecord')
for example in tf.python_io.tf_record_iterator(records[0]):
    result = tf.train.Example.FromString(example)
    break

# Create metadata

In [None]:
 metadatas = []
for i in range(n_folds):
    class_frequency = folds_train[i]['target'].value_counts(normalize=True)
    plasticc_class_weights = (1/(class_frequency)).to_dict()
    classifier_class_weights = {CLASSIFIER_CATEGORIES[k]:v for k, v in plasticc_class_weights.items()}
    classifier_class_weights_sorted_list = [v for k, v in sorted(classifier_class_weights.items())]
    metadatas.append({'train_objects':folds_train[i]['object_id'].tolist(),
            'val_objects':folds_val[i]['object_id'].tolist(),
            'train_class_weights':classifier_class_weights,
            'train_class_weights_sorted_list':classifier_class_weights_sorted_list,
            'train_stats':[]})#folds_train_dft_stats[i]})