In [None]:
from dask.distributed import Client, progress

client = Client('address_goes_here:8786')
# Local Address: ''
# Scheduler Address: ''
client

In [35]:
# Utilities
import sys
import psutil
import os
import pandas as pd
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 3000)
import numpy as np
import gc
import time
import warnings
import itertools
import gcsfs
from google.cloud import storage
from google.oauth2 import service_account

# Dask
import dask
import dask.dataframe as dd
import dask.array as da
import dask.delayed as delayed

from numba import jit

from timeit import default_timer as timer

In [36]:
# Create dictionary calling out specific data types to lower the memory footprint 
test_df_column_types = {
    'detected': 'uint8',
    'flux': 'float32',
    'flux_err': 'float32',
    'mjd': 'float32',
    'object_id': 'uint64',
    'passband': 'uint8'}

train_df_column_types = {
    'detected': 'uint8',
    'flux': 'float32',
    'flux_err': 'float32',
    'mjd': 'float32',
    'object_id': 'uint32',
    'passband': 'uint8'}

train_meta_column_types = {   
    'ddf': 'uint8',
    'decl': 'float32',
    'distmod': 'float32',
    'gal_b': 'float32',
    'gal_l': 'float32',
    'hostgal_photoz': 'float32',
    'hostgal_photoz_err': 'float32',
    'hostgal_specz': 'float32',
    'object_id': 'uint32',
    'ra': 'float32',
    'target': 'uint16'}

test_meta_df_column_types = {   
    'ddf': 'uint16',
    'decl': 'float32',
    'distmod': 'float32',
    'gal_b': 'float32',
    'gal_l': 'float32',
    'hostgal_photoz': 'float32',
    'hostgal_photoz_err': 'float32',
    'hostgal_specz': 'float32',
    'object_id': 'uint64',
    'ra': 'float32'}

In [41]:
# Import the datasets from Google Cloud Storage Bucket
%%time
train_df = dd.read_csv('gs://plastiic/training_set.csv')
train_meta_df = dd.read_csv('gs://plastiic/training_set_metadata.csv')
test_meta_df = dd.read_csv('gs://plastiic/test_set_metadata.csv')

CPU times: user 79.6 ms, sys: 7.3 ms, total: 86.9 ms
Wall time: 725 ms


In [44]:
# Blocksize is how much of a chunk from the dataset you can place on the GPU
# Original file too large to place in memory 
%%time
test_df = dd.read_csv('gs://plastiic/test_set.csv', blocksize=10000000) # e9 = 1GB 

CPU times: user 930 ms, sys: 0 ns, total: 930 ms
Wall time: 1.54 s


In [46]:
# Compute flux ratio squared and flux by flux ratio squared from the dataset
def process_flux(df):
    flux_ratio_sq = df.flux / df.flux_err
    flux_by_flux_ratio_sq = (df.flux * flux_ratio_sq)
    
    df_flux = dd.concat([df, flux_ratio_sq, flux_by_flux_ratio_sq], axis=1)
    df_flux.columns = ['object_id', 'mjd', 'passband', 'flux', 'flux_err', 'detected', 'flux_ratio_sq', 'flux_by_flux_ratio_sq']
    return df_flux

In [47]:
def process_flux_agg(df):
    flux_w_mean = df.flux_by_flux_ratio_sq_sum / df.flux_ratio_sq_sum
    flux_diff = df.flux_max - df.flux_max
    flux_diff2 = flux_diff / df.flux_mean
    flux_diff3 = flux_diff / flux_w_mean
    
    df_flux_agg = dd.concat([df, flux_w_mean, flux_diff, flux_diff2, flux_diff3], axis=1)
    
    return df_flux_agg

In [None]:
def _get_length_sequences_where(x):
    """
    This method calculates the length of all sub-sequences where the array x is either True or 1.
    """
    if len(x) == 0:
        return [0]
    else:
        res = [len(list(group)) for value, group in itertools.groupby(x) if value == 1]
        return res if len(res) > 0 else [0]

In [49]:
# Below are two features created to be applied to the passband and flux columns 

def count_a_mean(df_series):
    m = df_series.mean()
    x = []
    for i in df_series.values:
        if i > m:
            y = x.append(i)
    z = np.count_nonzero(x)
    return z

def count_b_mean(df_series):
    m = df_series.mean()
    x = []
    for i in df_series.values:
        if i < m:
            y = x.append(i)
    z = np.count_nonzero(x)
    return z

# After creating a function, use the code below to create a Dask aggregation
count_above_mean = dd.Aggregation('count_above_mean', lambda x: x.agg(count_a_mean), 
                            lambda x: x.sum())
count_below_mean = dd.Aggregation('count_below_mean', lambda x: x.agg(count_b_mean), 
                            lambda x: x.sum())

In [51]:
# Create dictionary for what aggregations are desired using both built in methods and custom
# functions
aggs = {
'flux': ['min', 'max', 'mean', 'std', count_above_mean, count_below_mean],
'detected': ['mean'],
'flux_ratio_sq': ['sum'],
'flux_by_flux_ratio_sq': ['sum'],
'passband' : [count_above_mean, count_below_mean],
'mjd' : ['max', 'min'],
}


In [52]:
def featurize(df):
    start_df = process_flux(df)  
    agg_df = start_df.groupby(['object_id']).agg(aggs)

    return agg_df

In [53]:
warnings.filterwarnings('ignore', category = UserWarning)

# Implement a timer to determine how long it takes to create the features 
overall_start = timer()
start_df = featurize(train_df).compute()
overall_end = timer()
print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")
start_df.head(10)

Total Time Elapsed: 12.13 seconds.


Unnamed: 0_level_0,flux,flux,flux,flux,flux,flux,detected,flux_ratio_sq,flux_by_flux_ratio_sq,passband,passband,mjd,mjd
Unnamed: 0_level_1,min,max,mean,std,count_above_mean,count_below_mean,mean,sum,sum,count_above_mean,count_below_mean,max,min
object_id,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2
615,-1100.440063,660.626343,-123.096998,394.109851,198.0,154.0,0.946023,-9060.302923,12706160.0,173,116,60624.2132,59750.4229
713,-14.735178,14.770886,-1.423351,6.471144,181.0,169.0,0.171429,-294.371461,8328.613,168,112,60674.0798,59825.26
730,-19.159811,47.310059,2.267434,8.022239,89.0,241.0,0.069697,335.303259,8367.311,154,104,60652.166,59798.3205
745,-15.494463,220.795212,8.909206,27.558208,72.0,279.0,0.173789,1645.743729,153089.3,167,112,60624.0722,59770.3662
1124,-16.543753,143.600189,7.145702,20.051722,77.0,275.0,0.173295,1189.568855,66679.88,173,116,60624.2132,59750.4229
1227,-12.695161,71.678154,0.367212,4.962633,150.0,180.0,0.009091,65.416809,3065.456,154,104,60652.166,59798.3205
1598,-13.854152,1448.715698,30.333023,167.317942,19.0,333.0,0.056818,2040.427595,1630938.0,173,116,60624.2132,59750.4229
1632,-14.525748,34.559826,1.760026,5.273147,123.0,229.0,0.008523,174.882875,2300.122,173,116,60624.2132,59750.4229
1920,-13.367912,231.828339,19.228827,48.004394,48.0,207.0,0.215686,2340.856321,321360.6,135,90,60436.984,59582.3282
1926,-52.764721,30.658575,-0.08521,6.288463,137.0,118.0,0.011765,52.433122,2256.502,135,90,60436.984,59582.3282


In [None]:
warnings.filterwarnings('ignore', category = UserWarning)

# Implement a timer to determine how long it takes to create the features 
overall_start = timer()
start_df = featurize(test_df).compute()
overall_end = timer()
print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")
start_df.head(10)