In [42]:
import numpy as np
import pandas as pd
import featuretools as ft
import featuretools.variable_types as vtypes

import os
PATH = '/home/ubuntu/data/astro/partitions/'

train = pd.read_csv(f'{PATH}ptrain/test_set.csv')
train_meta = pd.read_csv(f'{PATH}ptrain/test_set_metadata.csv')

In [16]:
train.head()

Unnamed: 0,object_id,mjd,passband,flux,flux_err,detected
0,615,59750.4229,2,-544.810303,3.622952,1
1,615,59750.4306,1,-816.434326,5.55337,1
2,615,59750.4383,3,-471.385529,3.801213,1
3,615,59750.445,4,-388.984985,11.395031,1
4,615,59752.407,2,-681.858887,4.041204,1


In [17]:
train_meta.head()

Unnamed: 0,object_id,ra,decl,gal_l,gal_b,ddf,hostgal_specz,hostgal_photoz,hostgal_photoz_err,distmod,mwebv,target
0,615,349.046051,-61.943836,320.79653,-51.753706,1,0.0,0.0,0.0,,0.017,92
1,713,53.085938,-27.784405,223.525509,-54.460748,1,1.8181,1.6267,0.2552,45.4063,0.007,88
2,730,33.574219,-6.579593,170.455585,-61.548219,1,0.232,0.2262,0.0157,40.2561,0.021,42
3,745,0.189873,-45.586655,328.254458,-68.969298,1,0.3037,0.2813,1.1523,40.7951,0.007,90
4,1124,352.711273,-63.823658,316.922299,-51.059403,1,0.1934,0.2415,0.0176,40.4166,0.024,90


In [310]:
class AstroP():
    def __init__(self, partition, train = False, feature_defs = None):
        self.partition = partition
        # Read in data
        self.data = pd.read_csv(f'{PATH}p{partition}/test_set.csv')
        self.metadata = pd.read_csv(f'{PATH}p{partition}/test_set_metadata.csv')
        
        # If training, extract the target and run deep feature synthesis
        if train:
            self.target = np.array(self.metadata.pop('target'))
        
        # Make an entityset
        self.es = self.make_entityset()
        
        if not train:
            assert(feature_defs is not None), "Feature defs must be provided with testing set"
            
        self.feature_defs = feature_defs
        
        # If features have already been calculated for this partition
        if os.path.exists(f'{PATH}p{partition}/fm.csv'):
            self.existing_feature_matrix = pd.read_csv(f'{PATH}p{partition}/fm.csv')
            if len(self.existing_feature_matrix) == len(self.metadata):
                if "Unnamed: 0" in self.existing_feature_matrix:
                    self.existing_feature_matrix.drop(columns  = ["Unnamed: 0"], inplace = True)
                self.feature_matrix = self.existing_feature_matrix.copy()
                self.calculated_features = list(self.feature_matrix.columns)
            else:
                self.calculated_features = []
                self.existing_feature_matrix = None
                self.feature_matrix = None
        else:
            self.calculated_features = []
            self.existing_feature_matrix = None
            self.feature_matrix = None
            
        if os.path.exists(f'{PATH}p{partition}/features.txt'):
            self.calculated_feature_defs = ft.load_features(f'{PATH}p{partition}/features.txt')
        else:
            self.calculated_feature_defs = []

        
    def make_entityset(self):
        es = ft.EntitySet(id = 'astro')
        es.entity_from_dataframe(entity_id = 'data', dataframe = self.data,
                                 make_index = True, index = 'index', time_index = 'mjd', 
                                 variable_types = {'detected': vtypes.Categorical,
                                                   'passband': vtypes.Categorical})
        es.entity_from_dataframe(entity_id = 'objects', dataframe = self.metadata,
                                 index = 'object_id',
                                 variable_types = {'ddf': vtypes.Categorical})
        
        es.add_relationship(ft.Relationship(es['objects']['object_id'], es['data']['object_id']))
        return es

    def deep_feature_synthesis(self, features_only = True):
        if features_only:
            self.feature_defs = ft.dfs(entityset = self.es, 
                                       agg_primitives = ['last', 'min', 'max', 'count', 'sum', 'std'],
                                       trans_primitives = ['time_since_previous', 'cum_max', 'cum_sum', 'cum_mean'],
                                       target_entity = 'objects', 
                                       features_only = features_only)
        else:
            self.feature_matrix, self.feature_defs = ft.dfs(entityset = self.es,
                                                            agg_primitives = ['last', 'min', 'max', 'count', 'sum', 'std'],
                                                            trans_primitives = ['time_since_previous', 'cum_max', 'cum_sum', 'cum_mean'],
                                                            target_entity = 'objects', 
                                                            features_only = features_only)
            
    def calculate_features(self):
        assert(self.feature_defs is not None), "Feature definitions must be provided"
        # Remove the features that have already been calculated
        self.feature_defs = [f for f in self.feature_defs if f.get_name() not in self.calculated_features]
        
        if self.feature_defs != []:
        
            # Calcuate the feature matrix
            self.feature_matrix = ft.calculate_feature_matrix(self.feature_defs, entityset = self.es).reset_index()
            
            if self.existing_feature_matrix is not None:
                self.existing_feature_matrix = self.existing_feature_matrix[[c for c in self.existing_feature_matrix if c not in self.feature_matrix]]
                self.feature_matrix = pd.concat([self.feature_matrix, self.existing_feature_matrix], axis = 1)
            
            # Add the features to those already calculated for this feature matrix
            self.calculated_features = list(self.feature_matrix.columns)
            self.calculated_feature_defs.extend(self.feature_defs)
            
        else:
            print('All Features Already Calculated for Partition')
            self.feature_matrix = self.existing_feature_matrix.copy()
    
    def save_features(self):
        assert(self.feature_matrix is not None), "Feature matrix must be calculated"
                
        self.feature_matrix.to_csv(f'{PATH}p{self.partition}/fm.csv')
        ft.save_features(self.calculated_feature_defs, f'{PATH}p{self.partition}/features.txt')
        
        
    def make_predictions(self, trained_model, fit_imputer):
        if 'target' in self.feature_matrix:
            self.feature_matrix.drop(columns = ['target'], inplace = True)
            
        self.train_features = fit_imputer.transform(self.feature_matrix.drop(columns = ['object_id']))
        self.predictions = trained_model.predict_proba(self.train_features)
        
        self.prediction_df = pd.DataFrame(columns = ['object_id',
                                                     'class_6',
                                                     'class_15',
                                                     'class_16',
                                                     'class_42',
                                                     'class_52',
                                                     'class_53',
                                                     'class_62',
                                                     'class_64',
                                                     'class_65',
                                                     'class_67',
                                                     'class_88',
                                                     'class_90',
                                                     'class_92',
                                                     'class_95',
                                                     'class_99'])
        
        self.prediction_df['object_id'] = self.feature_matrix['object_id']
        self.prediction_df.loc[:, 'class_6': 'class_95'] = self.predictions
        self.prediction_df['class_99'] = 0
        
        self.prediction_df.to_csv(f'{PATH}p{self.partition}/predictions.csv', index = False)

In [311]:
trainp = AstroP('train', train = True)
trainp.deep_feature_synthesis()

In [312]:
trainp.calculated_features[-10:]

['SUM(data.CUM_SUM(flux by object_id))',
 'SUM(data.CUM_SUM(flux_err by object_id))',
 'SUM(data.CUM_MEAN(flux by object_id))',
 'SUM(data.CUM_MEAN(flux_err by object_id))',
 'STD(data.CUM_MAX(flux by object_id))',
 'STD(data.CUM_MAX(flux_err by object_id))',
 'STD(data.CUM_SUM(flux by object_id))',
 'STD(data.CUM_SUM(flux_err by object_id))',
 'STD(data.CUM_MEAN(flux by object_id))',
 'STD(data.CUM_MEAN(flux_err by object_id))']

In [313]:
trainp.calculate_features()

All Features Already Calculated for Partition


In [314]:
features = ft.load_features(f'{PATH}ptrain/features.txt')
features[-10].get_name()

'SUM(data.CUM_SUM(flux by object_id))'

In [315]:
feature_matrix = pd.read_csv(f'{PATH}ptrain/fm.csv')
feature_matrix.head()

Unnamed: 0,object_id,ra,decl,gal_l,gal_b,hostgal_specz,hostgal_photoz,hostgal_photoz_err,distmod,mwebv,target,ddf,LAST(data.mjd),LAST(data.flux),LAST(data.flux_err),LAST(data.detected),LAST(data.passband),MIN(data.mjd),MIN(data.flux),MIN(data.flux_err),MAX(data.mjd),MAX(data.flux),MAX(data.flux_err),COUNT(data),SUM(data.mjd),SUM(data.flux),SUM(data.flux_err),STD(data.mjd),STD(data.flux),STD(data.flux_err),LAST(data.CUM_MAX(flux by object_id)),LAST(data.CUM_MAX(flux_err by object_id)),LAST(data.CUM_SUM(flux by object_id)),LAST(data.CUM_SUM(flux_err by object_id)),LAST(data.CUM_MEAN(flux by object_id)),LAST(data.CUM_MEAN(flux_err by object_id)),MIN(data.CUM_MAX(flux by object_id)),MIN(data.CUM_MAX(flux_err by object_id)),MIN(data.CUM_SUM(flux by object_id)),MIN(data.CUM_SUM(flux_err by object_id)),MIN(data.CUM_MEAN(flux by object_id)),MIN(data.CUM_MEAN(flux_err by object_id)),MAX(data.CUM_MAX(flux by object_id)),MAX(data.CUM_MAX(flux_err by object_id)),MAX(data.CUM_SUM(flux by object_id)),MAX(data.CUM_SUM(flux_err by object_id)),MAX(data.CUM_MEAN(flux by object_id)),MAX(data.CUM_MEAN(flux_err by object_id)),SUM(data.CUM_MAX(flux by object_id)),SUM(data.CUM_MAX(flux_err by object_id)),SUM(data.CUM_SUM(flux by object_id)),SUM(data.CUM_SUM(flux_err by object_id)),SUM(data.CUM_MEAN(flux by object_id)),SUM(data.CUM_MEAN(flux_err by object_id)),STD(data.CUM_MAX(flux by object_id)),STD(data.CUM_MAX(flux_err by object_id)),STD(data.CUM_SUM(flux by object_id)),STD(data.CUM_SUM(flux_err by object_id)),STD(data.CUM_MEAN(flux by object_id)),STD(data.CUM_MEAN(flux_err by object_id))
0,615,349.046051,-61.943836,320.79653,-51.753706,0.0,0.0,0.0,,0.017,92,1,60624.2132,157.0802,8.453112,1,5,59750.4229,-1100.440063,2.13051,60624.2132,660.626343,12.845472,352,21188790.0,-43330.143249,1577.92539,310.024487,394.109851,1.744747,660.626343,12.845472,-43330.143249,1577.92539,-123.096998,4.482743,-544.810303,3.622952,-47609.703104,3.622952,-680.622314,3.622952,660.626343,12.845472,-544.810303,1577.92539,-123.096998,6.093141,188975.718945,4255.575773,-11562060.0,281899.617014,-86522.016147,1625.08361,286.469679,0.986632,11859.31152,456.138608,122.080641,0.243272
1,713,53.085938,-27.784405,223.525509,-54.460748,1.8181,1.6267,0.2552,45.4063,0.007,88,1,60674.0798,-8.669188,2.216094,0,0,59825.26,-14.735178,0.639458,60674.0798,14.770886,9.115748,350,21089870.0,-498.17276,825.86699,310.247414,6.471144,1.509888,14.770886,9.115748,-498.17276,825.86699,-1.423351,2.35962,9.110147,1.013889,-498.17276,1.013889,-1.423351,1.013889,14.770886,9.115748,657.694251,825.86699,9.110147,2.857584,4936.586924,3038.174287,95747.45,145632.756253,981.928803,826.13714,1.187667,1.059203,314.762389,240.883371,2.295335,0.131081
2,730,33.574219,-6.579593,170.455585,-61.548219,0.232,0.2262,0.0157,40.2561,0.021,42,1,60652.166,-1.497761,5.210212,0,5,59798.3205,-19.159811,0.695106,60652.166,47.310059,11.281384,330,19873240.0,748.253237,815.450286,302.781791,8.022239,1.721134,47.310059,11.281384,748.253237,815.450286,2.267434,2.471061,1.177371,1.3643,-21.964732,1.3643,-1.178117,1.261774,47.310059,11.281384,752.289347,815.450286,2.423176,2.852247,7373.074188,2999.147222,65308.34,136359.215481,266.444608,817.54139,17.189027,2.247293,286.501217,241.159072,0.964939,0.134242
3,745,0.189873,-45.586655,328.254458,-68.969298,0.3037,0.2813,1.1523,40.7951,0.007,90,1,60624.0722,2.976642,4.417903,0,5,59770.3662,-15.494463,0.56717,60624.0722,220.795212,55.892746,351,21130140.0,3127.131254,897.007002,306.144145,27.558208,3.537324,220.795212,55.892746,3127.131254,897.007002,8.909206,2.555576,0.161398,0.619084,-5.122443,0.619084,-0.720523,0.610841,220.795212,55.892746,3145.846778,897.007002,14.895624,2.778598,51375.481456,7028.121168,627552.3,151328.66046,2718.72488,843.419415,100.22952,17.822295,1417.156478,262.779735,5.851403,0.227629
4,1124,352.711273,-63.823658,316.922299,-51.059403,0.1934,0.2415,0.0176,40.4166,0.024,90,1,60624.2132,6.331686,7.496435,0,5,59750.4229,-16.543753,0.695277,60624.2132,143.600189,11.38369,352,21188790.0,2515.287161,969.057329,310.024487,20.051722,1.933837,143.600189,11.38369,2515.287161,969.057329,7.145702,2.753004,1.929243,1.531847,-21.471966,1.531847,-3.545689,1.531847,143.600189,11.38369,2515.287161,969.057329,7.376462,4.111077,19640.701538,3818.555806,234799.8,172192.258916,794.732089,987.490122,63.265464,0.950609,960.678845,280.168342,3.167277,0.17029


In [316]:
trainp.deep_feature_synthesis()
trainp.feature_defs[-10:]

[<Feature: SUM(data.CUM_SUM(flux by object_id))>,
 <Feature: SUM(data.CUM_SUM(flux_err by object_id))>,
 <Feature: SUM(data.CUM_MEAN(flux by object_id))>,
 <Feature: SUM(data.CUM_MEAN(flux_err by object_id))>,
 <Feature: STD(data.CUM_MAX(flux by object_id))>,
 <Feature: STD(data.CUM_MAX(flux_err by object_id))>,
 <Feature: STD(data.CUM_SUM(flux by object_id))>,
 <Feature: STD(data.CUM_SUM(flux_err by object_id))>,
 <Feature: STD(data.CUM_MEAN(flux by object_id))>,
 <Feature: STD(data.CUM_MEAN(flux_err by object_id))>]

In [317]:
trainp.es

Entityset: astro
  Entities:
    data [Rows: 1421705, Columns: 7]
    objects [Rows: 7848, Columns: 11]
  Relationships:
    data.object_id -> objects.object_id

In [318]:
trainp.es['data']

Entity: data
  Variables:
    index (dtype: index)
    object_id (dtype: id)
    mjd (dtype: numeric_time_index)
    flux (dtype: numeric)
    flux_err (dtype: numeric)
    detected (dtype: categorical)
    passband (dtype: categorical)
  Shape:
    (Rows: 1421705, Columns: 7)

In [319]:
train_meta.head()

Unnamed: 0,object_id,ra,decl,gal_l,gal_b,ddf,hostgal_specz,hostgal_photoz,hostgal_photoz_err,distmod,mwebv,target
0,615,349.046051,-61.943836,320.79653,-51.753706,1,0.0,0.0,0.0,,0.017,92
1,713,53.085938,-27.784405,223.525509,-54.460748,1,1.8181,1.6267,0.2552,45.4063,0.007,88
2,730,33.574219,-6.579593,170.455585,-61.548219,1,0.232,0.2262,0.0157,40.2561,0.021,42
3,745,0.189873,-45.586655,328.254458,-68.969298,1,0.3037,0.2813,1.1523,40.7951,0.007,90
4,1124,352.711273,-63.823658,316.922299,-51.059403,1,0.1934,0.2415,0.0176,40.4166,0.024,90


In [320]:
pd.options.display.max_columns = 100
feature_matrix.head()

Unnamed: 0,object_id,ra,decl,gal_l,gal_b,hostgal_specz,hostgal_photoz,hostgal_photoz_err,distmod,mwebv,target,ddf,LAST(data.mjd),LAST(data.flux),LAST(data.flux_err),LAST(data.detected),LAST(data.passband),MIN(data.mjd),MIN(data.flux),MIN(data.flux_err),MAX(data.mjd),MAX(data.flux),MAX(data.flux_err),COUNT(data),SUM(data.mjd),SUM(data.flux),SUM(data.flux_err),STD(data.mjd),STD(data.flux),STD(data.flux_err),LAST(data.CUM_MAX(flux by object_id)),LAST(data.CUM_MAX(flux_err by object_id)),LAST(data.CUM_SUM(flux by object_id)),LAST(data.CUM_SUM(flux_err by object_id)),LAST(data.CUM_MEAN(flux by object_id)),LAST(data.CUM_MEAN(flux_err by object_id)),MIN(data.CUM_MAX(flux by object_id)),MIN(data.CUM_MAX(flux_err by object_id)),MIN(data.CUM_SUM(flux by object_id)),MIN(data.CUM_SUM(flux_err by object_id)),MIN(data.CUM_MEAN(flux by object_id)),MIN(data.CUM_MEAN(flux_err by object_id)),MAX(data.CUM_MAX(flux by object_id)),MAX(data.CUM_MAX(flux_err by object_id)),MAX(data.CUM_SUM(flux by object_id)),MAX(data.CUM_SUM(flux_err by object_id)),MAX(data.CUM_MEAN(flux by object_id)),MAX(data.CUM_MEAN(flux_err by object_id)),SUM(data.CUM_MAX(flux by object_id)),SUM(data.CUM_MAX(flux_err by object_id)),SUM(data.CUM_SUM(flux by object_id)),SUM(data.CUM_SUM(flux_err by object_id)),SUM(data.CUM_MEAN(flux by object_id)),SUM(data.CUM_MEAN(flux_err by object_id)),STD(data.CUM_MAX(flux by object_id)),STD(data.CUM_MAX(flux_err by object_id)),STD(data.CUM_SUM(flux by object_id)),STD(data.CUM_SUM(flux_err by object_id)),STD(data.CUM_MEAN(flux by object_id)),STD(data.CUM_MEAN(flux_err by object_id))
0,615,349.046051,-61.943836,320.79653,-51.753706,0.0,0.0,0.0,,0.017,92,1,60624.2132,157.0802,8.453112,1,5,59750.4229,-1100.440063,2.13051,60624.2132,660.626343,12.845472,352,21188790.0,-43330.143249,1577.92539,310.024487,394.109851,1.744747,660.626343,12.845472,-43330.143249,1577.92539,-123.096998,4.482743,-544.810303,3.622952,-47609.703104,3.622952,-680.622314,3.622952,660.626343,12.845472,-544.810303,1577.92539,-123.096998,6.093141,188975.718945,4255.575773,-11562060.0,281899.617014,-86522.016147,1625.08361,286.469679,0.986632,11859.31152,456.138608,122.080641,0.243272
1,713,53.085938,-27.784405,223.525509,-54.460748,1.8181,1.6267,0.2552,45.4063,0.007,88,1,60674.0798,-8.669188,2.216094,0,0,59825.26,-14.735178,0.639458,60674.0798,14.770886,9.115748,350,21089870.0,-498.17276,825.86699,310.247414,6.471144,1.509888,14.770886,9.115748,-498.17276,825.86699,-1.423351,2.35962,9.110147,1.013889,-498.17276,1.013889,-1.423351,1.013889,14.770886,9.115748,657.694251,825.86699,9.110147,2.857584,4936.586924,3038.174287,95747.45,145632.756253,981.928803,826.13714,1.187667,1.059203,314.762389,240.883371,2.295335,0.131081
2,730,33.574219,-6.579593,170.455585,-61.548219,0.232,0.2262,0.0157,40.2561,0.021,42,1,60652.166,-1.497761,5.210212,0,5,59798.3205,-19.159811,0.695106,60652.166,47.310059,11.281384,330,19873240.0,748.253237,815.450286,302.781791,8.022239,1.721134,47.310059,11.281384,748.253237,815.450286,2.267434,2.471061,1.177371,1.3643,-21.964732,1.3643,-1.178117,1.261774,47.310059,11.281384,752.289347,815.450286,2.423176,2.852247,7373.074188,2999.147222,65308.34,136359.215481,266.444608,817.54139,17.189027,2.247293,286.501217,241.159072,0.964939,0.134242
3,745,0.189873,-45.586655,328.254458,-68.969298,0.3037,0.2813,1.1523,40.7951,0.007,90,1,60624.0722,2.976642,4.417903,0,5,59770.3662,-15.494463,0.56717,60624.0722,220.795212,55.892746,351,21130140.0,3127.131254,897.007002,306.144145,27.558208,3.537324,220.795212,55.892746,3127.131254,897.007002,8.909206,2.555576,0.161398,0.619084,-5.122443,0.619084,-0.720523,0.610841,220.795212,55.892746,3145.846778,897.007002,14.895624,2.778598,51375.481456,7028.121168,627552.3,151328.66046,2718.72488,843.419415,100.22952,17.822295,1417.156478,262.779735,5.851403,0.227629
4,1124,352.711273,-63.823658,316.922299,-51.059403,0.1934,0.2415,0.0176,40.4166,0.024,90,1,60624.2132,6.331686,7.496435,0,5,59750.4229,-16.543753,0.695277,60624.2132,143.600189,11.38369,352,21188790.0,2515.287161,969.057329,310.024487,20.051722,1.933837,143.600189,11.38369,2515.287161,969.057329,7.145702,2.753004,1.929243,1.531847,-21.471966,1.531847,-3.545689,1.531847,143.600189,11.38369,2515.287161,969.057329,7.376462,4.111077,19640.701538,3818.555806,234799.8,172192.258916,794.732089,987.490122,63.265464,0.950609,960.678845,280.168342,3.167277,0.17029


In [321]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer

imputer = SimpleImputer()
train_features = imputer.fit_transform(feature_matrix.drop(columns = ['object_id', 'target']))
model = RandomForestClassifier(n_estimators = 100, max_depth = 20)
model.fit(train_features, np.array(train_meta['target']))

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=20, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=None,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)

In [322]:
trainp.make_predictions(model, imputer)

In [323]:
partitions = list(range(10))

for p in partitions:
    astrop = AstroP(partition=p, train = False, feature_defs = trainp.feature_defs)
    astrop.calculate_features()
    astrop.save_features()
    astrop.make_predictions(model, imputer)

All Features Already Calculated for Partition
All Features Already Calculated for Partition


In [324]:
astrop.feature_matrix.head()

Unnamed: 0,object_id,ra,decl,gal_l,gal_b,hostgal_specz,hostgal_photoz,hostgal_photoz_err,distmod,mwebv,ddf,LAST(data.mjd),LAST(data.flux),LAST(data.flux_err),LAST(data.detected),LAST(data.passband),MIN(data.mjd),MIN(data.flux),MIN(data.flux_err),MAX(data.mjd),MAX(data.flux),MAX(data.flux_err),COUNT(data),SUM(data.mjd),SUM(data.flux),SUM(data.flux_err),STD(data.mjd),STD(data.flux),STD(data.flux_err),LAST(data.CUM_MAX(flux by object_id)),LAST(data.CUM_MAX(flux_err by object_id)),LAST(data.CUM_SUM(flux by object_id)),LAST(data.CUM_SUM(flux_err by object_id)),LAST(data.CUM_MEAN(flux by object_id)),LAST(data.CUM_MEAN(flux_err by object_id)),MIN(data.CUM_MAX(flux by object_id)),MIN(data.CUM_MAX(flux_err by object_id)),MIN(data.CUM_SUM(flux by object_id)),MIN(data.CUM_SUM(flux_err by object_id)),MIN(data.CUM_MEAN(flux by object_id)),MIN(data.CUM_MEAN(flux_err by object_id)),MAX(data.CUM_MAX(flux by object_id)),MAX(data.CUM_MAX(flux_err by object_id)),MAX(data.CUM_SUM(flux by object_id)),MAX(data.CUM_SUM(flux_err by object_id)),MAX(data.CUM_MEAN(flux by object_id)),MAX(data.CUM_MEAN(flux_err by object_id)),SUM(data.CUM_MAX(flux by object_id)),SUM(data.CUM_MAX(flux_err by object_id)),SUM(data.CUM_SUM(flux by object_id)),SUM(data.CUM_SUM(flux_err by object_id)),SUM(data.CUM_MEAN(flux by object_id)),SUM(data.CUM_MEAN(flux_err by object_id)),STD(data.CUM_MAX(flux by object_id)),STD(data.CUM_MAX(flux_err by object_id)),STD(data.CUM_SUM(flux by object_id)),STD(data.CUM_SUM(flux_err by object_id)),STD(data.CUM_MEAN(flux by object_id)),STD(data.CUM_MEAN(flux_err by object_id))
0,20009,33.574219,-6.579593,170.455585,-61.548219,0.4098,0.383,0.0287,41.5737,0.021,1,60652.166,-9.577223,5.21041,0,5,59798.3205,-11.557432,0.694576,60652.166,56.558254,11.295143,330,19873240.0,1268.682081,816.53099,302.781791,10.124093,1.721242,56.558254,11.295143,1268.682081,816.53099,3.844491,2.474336,-0.175194,1.363742,-17.855209,1.363742,-0.645806,1.26161,56.558254,11.295143,1301.508107,816.53099,5.954076,2.85228,11814.523448,3001.328446,204597.685022,136563.654092,858.936602,818.402263,22.971386,2.253521,596.373661,241.651931,2.41084,0.134996
1,33009,347.861847,-61.943836,321.519104,-51.424048,,1.298,0.2619,44.8009,0.017,1,60624.2132,1.492075,7.434762,0,5,59750.4229,-20.470024,0.668844,60624.2132,21.552124,11.289636,352,21188790.0,248.948654,953.378348,310.024487,4.028392,1.924182,21.552124,11.289636,248.948654,953.378348,0.70724,2.708461,-1.207479,1.50616,-3.335152,1.50616,-1.207479,1.50616,21.552124,11.289636,269.126899,953.378348,2.219693,4.060926,7123.347749,3785.35433,66959.574261,169642.349453,409.446104,973.735469,4.159731,0.946251,79.178867,275.450372,0.539142,0.170176
2,51009,33.398438,-4.331149,167.226341,-59.936551,0.3056,0.5203,0.5404,42.3648,0.018,1,60652.166,9.963378,5.200578,0,5,59798.3205,-19.047565,0.690592,60652.166,23.039862,11.249372,330,19873240.0,354.066028,810.973602,302.781791,4.217869,1.717779,23.039862,11.249372,354.066028,810.973602,1.072927,2.457496,0.014427,1.35594,-26.921005,1.35594,-1.560131,1.251436,23.039862,11.249372,354.066028,810.973602,1.087207,2.838865,4251.072789,2990.711026,24001.348116,135638.423405,59.195105,813.21609,7.453385,2.241016,126.042869,239.839413,0.519596,0.133681
3,53009,52.207031,-28.29155,224.208534,-55.300157,,0.7537,0.0439,43.3437,0.007,1,60674.0798,2.030457,1.832701,0,0,59825.26,-15.010591,0.467387,60674.0798,44.233864,9.080844,350,21089870.0,891.676439,772.0424,310.247414,8.431158,1.536201,44.233864,9.080844,891.676439,772.0424,2.547647,2.205835,-0.109828,0.929863,-16.048998,0.929863,-2.450985,0.929863,44.233864,9.080844,899.390831,772.0424,3.863937,2.77844,9892.42813,3024.433759,142231.731168,136486.890928,555.150489,776.725082,15.861669,1.068654,406.781657,225.130046,1.512367,0.132407
4,74009,359.814819,-44.399834,330.775011,-69.801007,,0.5705,0.0577,42.6064,0.009,1,60624.0722,14.379038,4.435518,0,5,59770.3662,-12.704075,0.57597,60624.0722,35.827271,56.024967,351,21130140.0,965.966927,898.516172,306.144145,6.999946,3.546588,35.827271,56.024967,965.966927,898.516172,2.752043,2.559875,0.608991,0.622723,-4.586205,0.622723,-0.655172,0.614921,35.827271,56.024967,965.966927,898.516172,2.752043,2.788307,5717.037814,7037.671935,63118.169987,151287.860511,275.028796,843.85754,10.138179,17.875275,276.664799,262.812233,0.783502,0.228618


In [325]:
import findspark
findspark.init('/usr/local/spark-2.3.2-bin-hadoop2.7/')
import pyspark

In [326]:
spark_conf = pyspark.SparkConf()
# spark_conf.set('num.executors', 1)
spark_conf.set('spark.executor.memory', '56G')
spark_conf.getAll()

[('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.memory', '56G'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [327]:
sc.stop()
sc = pyspark.SparkContext(master = 'spark://ip-172-31-79-46.ec2.internal:7077',
                                     appName = 'astro', conf = spark_conf)

In [328]:
def partition_prediction(p):
    astrop = AstroP(partition=p, train = False, feature_defs = trainp.feature_defs)
    astrop.calculate_features()
    astrop.save_features()
    astrop.make_predictions(model, imputer)
    return 1

In [None]:
r = sc.parallelize(list(range(1000)), numSlices = 1000).\
   map(partition_prediction).collect()
sc.stop()

In [None]:
preds = []
for p in range(1000):
    preds.append(pd.read_csv(f'{PATH}p{p}/predictions.csv', skiprows = [0], header = None))
    print(f'{round(100* (p / 1000), 2)}% complete.', end = '\r')
    
    
preds = pd.concat(preds)
preds.head()

In [None]:
header = list(pd.read_csv(f'{PATH}p0/predictions.csv', nrows = 1).columns)
header