In [None]:
import numpy as np
import requests
import time

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation

from pyspark import SQLContext, SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.linalg import Vectors

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *

from sklearn.cluster import KMeans
from sklearn import metrics
from sklearn.model_selection import StratifiedKFold

%pylab inline

In [None]:
# Read the feature matrix (x) and the label vector (y) from their NumPy dumps.
x, y = (np.load(path) for path in ('MCx.npy', 'MCy.npy'))

In [None]:
# Create the Spark entry points used by the rest of the notebook.
# getOrCreate reuses an already-running SparkContext, so re-executing this cell
# in a live kernel no longer fails with "Cannot run multiple SparkContexts".
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)
# SQLContext wrapper; later cells call spark.createDataFrame on it.
spark = SQLContext(sc)

In [None]:
# Calculate the rank of each feature.
def _feature_rank(column):
    """Cluster one feature column into two groups with k-means and return the
    homogeneity score of those clusters against the labels `y`."""
    clusterer = KMeans(init='k-means++', n_clusters=2, n_init=10)
    assignments = clusterer.fit_predict(column.reshape(-1, 1))
    # The homogeneity score is used as the rank of the feature.
    return metrics.homogeneity_score(y, assignments)


# One score per column of x, in column order.
R = [_feature_rank(x[:, idx]) for idx in range(x.shape[1])]

In [None]:
# Show (feature index, rescaled score) pairs ordered by ascending rank.
# The index column is sized from R instead of a hard-coded 43 so the cell works
# for any number of features, and np.argsort is used explicitly rather than
# relying on the name injected by %pylab.
np.column_stack((np.arange(len(R)), (np.array(R) - 0.5) * 2))[np.argsort(np.array(R)), :]

In [None]:
# Arrange the feature indices according to their ranks (ascending score).
Rnk = np.asarray(R).argsort()

In [None]:
# Initiate the cross-validation splitter.
# shuffle=True without a seed produced different folds (and therefore different
# accuracies) on every run; a fixed random_state makes the splits reproducible.
kfolds = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)

In [None]:
# Per each set of ranks, use cross-validation to calculate accuracy and the
# time spent predicting.
def _to_labeled_rows(features, labels):
    """Pair every feature row with its integer label as (label, DenseVector).

    Returns a list (not a lazy map object) so the result is usable on both
    Python 2 and Python 3.
    """
    return [(int(float(label)), Vectors.dense(row))
            for row, label in zip(features, labels)]


def _build_classifier(n_features, n_classes=9):
    """Build and compile the dense softmax network trained on every fold.

    n_features: width of the input layer.
    n_classes:  width of the softmax output layer (number of classes).
    """
    model = Sequential()
    model.add(Dense(128, input_dim=n_features, activation='relu', use_bias=True))
    model.add(Dropout(0.5))
    model.add(Dense(64, activation='relu', use_bias=True))
    model.add(Dropout(0.5))
    model.add(Dense(32, activation='relu', use_bias=True))
    model.add(Dropout(0.5))
    # The number of neurons is equal to the number of classes.
    model.add(Dense(n_classes, activation='softmax', use_bias=True))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['categorical_accuracy'])
    return model


smr = []
for j in range(Rnk.shape[0]):
    # Keep every feature from position j of the ranking onwards.
    fd = x[:, Rnk[j:]]
    pp = 0.0  # accuracy summed over the folds
    et = 0.0  # prediction time (seconds) summed over the folds; reset per feature subset
    for train, test in kfolds.split(fd, y):
        TrD = spark.createDataFrame(_to_labeled_rows(fd[train], y[train]), schema=["label", "features"])
        TsD = spark.createDataFrame(_to_labeled_rows(fd[test], y[test]), schema=["label", "features"])
        model = _build_classifier(fd.shape[1])
        # NOTE(review): num_epoch=0 is kept from the original; presumably this
        # performs no training passes -- confirm it is intended. Raising it will
        # likely also require one-hot encoded labels for categorical_crossentropy.
        trainer = SingleTrainer(keras_model=model, worker_optimizer='adam', loss='categorical_crossentropy', num_epoch=0)
        trained_model = trainer.train(TrD)
        predictor = ModelPredictor(keras_model=trained_model)
        # Time the prediction itself (the original only timed the constructor).
        st = time.time()
        fold_rows = predictor.predict(TsD).collect()
        et += time.time() - st
        # Take true and predicted labels from the same collected rows so the
        # comparison does not depend on row order.
        # Assumes the predictor writes the class scores to a "prediction" vector
        # column and that labels are class indices aligned with the softmax
        # outputs -- TODO confirm.
        true_labels = [row["label"] for row in fold_rows]
        predicted_labels = [int(np.argmax(row["prediction"].toArray())) for row in fold_rows]
        pp += metrics.accuracy_score(true_labels, predicted_labels)
    pp = pp / kfolds.n_splits
    # Predictions of the last fold, kept under the original name without
    # running a second prediction pass.
    gg = fold_rows
    # Calculate the time required to predict a label per each object in uS.
    # NOTE(review): 2.540047 is presumably the number of objects in millions;
    # verify against the dataset size.
    smr.append([j, pp, et / 2.540047])

In [None]:
# Display the collected summary: one [j, pp, scaled et] entry per feature subset.
smr