In [1]:
import os
GPU_id = 2
os.environ['CUDA_VISIBLE_DEVICES'] = str(GPU_id)

In [2]:
import warnings
warnings.filterwarnings("ignore")
import math
import cudf as gd
import pandas as pd
import numpy as np
from numba import cuda
import time
import matplotlib.pyplot as plt
%matplotlib inline

print(gd.__version__)

0.8.0+0.g8fa7bd3.dirty


In [3]:
PATH = '../input'

In [4]:
GPU_MEMORY = 32 # GB. 
#GPU_MEMORY = 16 # GB. Both 32 and 16 GB have been tested

TEST_ROWS = 453653104 # number of rows in test data
# no skip if your gpu has 32 GB memory
# otherwise, skip rows porportionally
OVERHEAD = 1.4 # to run end-to-end on a single GPU 
SKIP_ROWS = int((1 - GPU_MEMORY/(32.0*OVERHEAD))*TEST_ROWS) 

## Functions

In [5]:
def delta_in_group(val,val_delta, default):
    for i in range(cuda.threadIdx.x, len(val), cuda.blockDim.x):
        if i>0:
            val_delta[i] = val[i]-val[i-1]
        else:
            val_delta[i] = default

def compute_delta(df):
    for col,d in zip(['flux','mjd'],[0,180]):
        df = rename_col(df,col,'val')
        df = df.groupby('object_id',method="cudf",as_index=False).apply_grouped(delta_in_group,incols=['val'],
                                  outcols={'val_delta': np.float32},
                                  kwargs={'default':d},
                                  tpb=32)
        df = rename_col(df,'val',col)
        df = rename_col(df,'val_delta','%s_delta'%col)
        if col=='flux':
            df.drop_column(col)
    return df

def clip_and_log_transform(df):
    df['flux'] = df['flux'].applymap(lambda x: math.log1p(x+10) if x>-10 else 0)
    df['flux_err'] = df['flux_err'].applymap(lambda x: math.log1p(x))
    return df

In [6]:
def rename_col(df,old_col,new_col):
    cols = [new_col if i==old_col else i for i in df.columns]
    df.columns = cols
    return df

In [7]:
def segment(df):
    df['is_boundary'] = df['mjd_delta']>90
    df['is_boundary'] = df['is_boundary'].astype('int32')
    df = df.sort_values(['object_id','mjd'])
    df.drop_column('mjd')
    df['seq_id'] = df['is_boundary'].cumsum()
    return df

In [8]:
def reset_boundary(df):
    df['flux_delta'] = df['flux_delta']*(1-df['is_boundary'])
    #df['mjd_delta'] = df['mjd_delta']*(1-df['is_boundary'])
    df.drop_column('is_boundary')
    return df

In [9]:
def get_order_in_group(row_id,step):
    for i in range(cuda.threadIdx.x, len(row_id), cuda.blockDim.x):
        step[i] = i

def add_step(df):
    df['row_id'] = np.arange(df.shape[0])
    df = df.groupby('object_id',method="cudf").apply_grouped(get_order_in_group,incols=['row_id'],
                                  outcols={'step': np.int32},
                                  tpb=32)
    df = df.sort_values('row_id')
    return df

### Read data

In [10]:
%%time

ts_cols = ['object_id', 'mjd', 'passband', 'flux', 'flux_err', 'detected']
ts_dtypes = ['int32', 'float32', 'int8', 'float32','float32','int32']

train_gd = gd.read_csv('%s/training_set.csv'%PATH,
            names=ts_cols,dtype=ts_dtypes,skiprows=1)
test_gd = gd.read_csv('%s/test_set.csv'%PATH,
            names=ts_cols,dtype=ts_dtypes,skiprows=1+SKIP_ROWS) # skip the header

CPU times: user 17.9 s, sys: 5.92 s, total: 23.8 s
Wall time: 23.8 s


In [11]:
train_gd.drop_column('detected')
test_gd.drop_column('detected')

In [12]:
train_gd.head().to_pandas()

Unnamed: 0,object_id,mjd,passband,flux,flux_err
0,615,59750.421875,2,-544.810303,3.622952
1,615,59750.429688,1,-816.434387,5.55337
2,615,59750.4375,3,-471.385498,3.801213
3,615,59750.441406,4,-388.984955,11.395031
4,615,59752.40625,2,-681.858826,4.041203


### Description of columns
- object_id: Foreign key with the metadata. Int32
- mjd: the time in Modified Julian Date (MJD) of the observation. 
- passband: The specific LSST passband integer, such that u, g, r, i, z, Y = 0, 1, 2, 3, 4, 5 in which it was viewed. 
- flux: the measured flux (brightness) in the passband of observation as listed in the passband column. 
- flux_err: the uncertainty on the measurement of the flux listed above. Float32
- detected: If 1, the object's brightness is significantly different

### Normalization

In [13]:
%%time
train_gd = clip_and_log_transform(train_gd)
test_gd = clip_and_log_transform(test_gd)

CPU times: user 376 ms, sys: 36 ms, total: 412 ms
Wall time: 419 ms


### Differentiation and segmentation

In [14]:
%%time
train_gd = train_gd.sort_values(['object_id','mjd'])
test_gd = test_gd.sort_values(['object_id','mjd'])

CPU times: user 944 ms, sys: 520 ms, total: 1.46 s
Wall time: 2.46 s


In [15]:
%%time
train_gd = compute_delta(train_gd)
test_gd = compute_delta(test_gd)

CPU times: user 17.2 s, sys: 8.06 s, total: 25.3 s
Wall time: 7.96 s


In [16]:
%%time
train_gd = segment(train_gd)
test_gd = segment(test_gd)

CPU times: user 1.02 s, sys: 716 ms, total: 1.74 s
Wall time: 3.38 s


### Reset the values of `flux_delta` and `mjd_delta` at the boundary of subsequence

In [17]:
%%time
train_gd = reset_boundary(train_gd)
test_gd = reset_boundary(test_gd)

CPU times: user 12 ms, sys: 20 ms, total: 32 ms
Wall time: 32.2 ms


### Lastly, we scale `mjd_delta_clean`

In [18]:
train_gd['mjd_delta'] /= 90 # scale mjd_delta
test_gd['mjd_delta'] /= 90

In [19]:
for col in train_gd.columns:
    print(col,train_gd[col].min(),train_gd[col].max())

object_id 615 130779836
passband 0 5
flux_err 0.3810037076473236 14.619336128234863
flux_delta -14.704561233520508 14.704561233520508
mjd_delta 0.0 3.4457030296325684
seq_id 1 26220


In [20]:
%%time
train_pd = train_gd.to_pandas()
test_pd = test_gd.to_pandas()

CPU times: user 56.8 s, sys: 28.4 s, total: 1min 25s
Wall time: 10.7 s


### Create step number for each light curve

To reduce memory consumption, we delete other columns

In [21]:
cols = [col for col in train_gd.columns]
for col in cols:
    if col!='object_id':
        train_gd.drop_column(col)
        test_gd.drop_column(col)

In [22]:
%%time
train_gd = add_step(train_gd)
test_gd = add_step(test_gd)

CPU times: user 18.3 s, sys: 11.3 s, total: 29.6 s
Wall time: 7.98 s


In [23]:
%%time
train_pd['step'] = train_gd['step'].to_pandas().values
test_pd['step'] = test_gd['step'].to_pandas().values

CPU times: user 10.9 s, sys: 5.12 s, total: 16 s
Wall time: 2.78 s


In [24]:
train_pd.head()

Unnamed: 0,object_id,passband,flux_err,flux_delta,mjd_delta,seq_id,step
0,615,2,1.531034,0.0,2.0,1,0
1,615,1,1.879979,0.0,8.7e-05,1,1
2,615,3,1.568869,0.0,8.7e-05,1,2
3,615,4,2.517296,0.0,4.3e-05,1,3
4,615,2,1.617645,0.0,0.021832,1,4


In [25]:
%%time
train_pd.to_pickle('train_rnn.pkl')
test_pd.to_pickle('test_rnn.pkl')

CPU times: user 10 s, sys: 30.4 s, total: 40.5 s
Wall time: 41 s


### Conclusion

In this notebook we process the light curve data in order to train a RNN. 
- `object_id`, `seq_id` and `step` can be used to sample sequences from the dataframe.
- `mjd_delta` and `flux_delta` enable RNN to learn the temporal pattern of light curves.
- `passband` is an original feature and will be embedded.