In [0]:
from pyspark.sql import Row
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev, log, log10
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf, expr, concat, col, count, when, isnan

# Obtain the session through the builder: it reuses a session the runtime
# already started and creates one otherwise, so this cell no longer needs a
# pre-defined global `sc` to exist.
spark = SparkSession.builder.getOrCreate()
# SQLContext is deprecated since Spark 3.0; it is kept (bound to the same
# session) only so that code still referring to `sqlc` keeps working.
sqlc = SQLContext(spark.sparkContext, spark)
# import data (header row present, column types inferred from the values)
train_features = spark.read.csv('/FileStore/tables/train_features.csv', header=True, inferSchema=True)
test_features = spark.read.csv('/FileStore/tables/test_features.csv', header=True, inferSchema=True)
train_targets_scored = spark.read.csv('/FileStore/tables/train_targets_scored.csv', header=True, inferSchema=True)

In [0]:
from pyspark.ml.feature import StringIndexer
# data cleaning and encoding
# 'cp_type' is not used as a feature.
df_train = train_features.drop('cp_type')

# Index 'cp_dose' and then 'cp_time' into new '*_enco' columns; each indexer
# is fitted on the very frame it transforms.
df_train_clean = (
    StringIndexer(inputCol="cp_dose", outputCol="cp_dose_enco")
    .fit(df_train)
    .transform(df_train)
)
df_train_clean = (
    StringIndexer(inputCol="cp_time", outputCol="cp_time_enco")
    .fit(df_train_clean)
    .transform(df_train_clean)
)
# The raw columns are replaced by their indexed counterparts.
df_train_clean = df_train_clean.drop('cp_dose', 'cp_time')

# Attach the scored targets to the features by 'sig_id', then discard the key.
df_clean = df_train_clean.join(train_targets_scored, on="sig_id", how="inner").drop('sig_id')
# Target columns only (no key).
df_target = train_targets_scored.drop('sig_id')

In [0]:
import pyspark.sql.functions as f
# select targets as input
# Names of every scored target column. This definition used to be commented
# out although `l_target_col` is read at module level by later code, which
# made that code fail with a NameError on a fresh kernel.
l_target_col = df_target.schema.names

# 1-based positions (within the target columns) of the targets to model.
select_ind = [13, 93, 141, 147, 3, 59, 157, 114, 80, 101]
select_target = [l_target_col[i - 1] for i in select_ind]
select_target
# Disabled alternative: pick every target whose name contains a given keyword.
#target_type = ['inhibitor', 'antagonist', 'agonist', 'activator', 'agent', 'stimulant', 'blocker', 'diuretic', 'donor', 'steroid', 'laxative', 'medium', 'anti', 'scavenger', 'local', 'secretagogue', 'vitamin', 'analgesic', 'immu']
#DF = spark.createDataFrame(l_target_col, StringType())
#select_target = (DF.filter(DF.value.contains(target_type[0]))).agg(f.collect_list(col('value'))).collect()[0][0]

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier  
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

# model implementation
def _mean_logloss(predictions, label, probability_ind, epsilon):
  """Return the mean binary log loss of `predictions` against column `label`.

  `probability_ind` is the UDF that extracts the class-1 probability from the
  'probability' vector column; `epsilon` is added inside both logarithms.
  """
  scored = predictions.select(probability_ind('probability').alias('p'),
                              f.col(label).alias('label'))
  loss = (-f.col('label') * f.log(f.col('p') + epsilon)
          - (1. - f.col('label')) * f.log(1. - f.col('p') + epsilon))
  return scored.agg(f.mean(loss).alias('logloss')).collect()[0]['logloss']


def mlpc (df_clean, hidden_1, hidden_2, index):
  """Train one binary MLP per selected target and return the mean log loss.

  For each of the first `index` names in the module-level `select_target`
  list, a MultilayerPerceptronClassifier with two hidden layers is fitted on
  the 70% part of a seeded 70/30 random split of `df_clean`, and the binary
  log loss is measured on both parts. The loss of every target is printed as
  it is computed.

  Args:
    df_clean: DataFrame containing the feature columns plus the target
      columns (the columns of the module-level `df_target`).
    hidden_1: number of units in the first hidden layer.
    hidden_2: number of units in the second hidden layer.
    index: how many entries of `select_target` to model, counted from the
      start of the list.

  Returns:
    Tuple (logloss_test, logloss_train), each averaged over the `index`
    modelled targets.

  Raises:
    ValueError: if `index` is not between 1 and len(select_target).
  """
  if not 1 <= index <= len(select_target):
    raise ValueError('index must be between 1 and %d, got %r' % (len(select_target), index))

  # model preparation
  # Every column that is not a target column is treated as a feature. The
  # target names come from df_target (the previously used `l_target_col` was
  # never defined).
  target_cols = set(df_target.columns)
  feature_cols = [x for x in df_clean.schema.names if x not in target_cols]
  # The input width follows the data instead of a hard-coded 874; two output
  # units because each target is a binary label.
  layers = [len(feature_cols), hidden_1, hidden_2, 2]
  assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
  epsilon = 1e-16  # added inside log() so that p == 0 or p == 1 stays finite

  # Probability of class 1. DoubleType keeps the model's full precision; a
  # float32 would round values very close to 1 up to exactly 1.
  probability_ind = udf(lambda v: float(v[1]), DoubleType())

  # split train/test set
  # The seed is fixed, so every target was already getting this same split;
  # split and assemble once instead of once per target.
  train, test = df_clean.randomSplit([0.7, 0.3], 1234)
  train_vec = assembler.transform(train)
  test_vec = assembler.transform(test)

  logloss_train = 0
  logloss_test = 0

  # implement the model by using the index of the selected target list
  for i in range(index):
    label = select_target[i]
    mlp = MultilayerPerceptronClassifier(featuresCol = 'features', labelCol = label, maxIter=100, layers=layers, blockSize=128,seed=1234)
    train_df = train_vec.select('features', label)
    test_df = test_vec.select('features', label)

    # fit the model
    model = mlp.fit(train_df)

    # prediction and loss of training set
    loss_train = _mean_logloss(model.transform(train_df), label, probability_ind, epsilon)
    logloss_train += loss_train
    # print the logloss of this target (not the running total)
    print (loss_train, 'logloss_train', select_ind[i], select_target[i])

    # prediction and loss of testing set
    loss_test = _mean_logloss(model.transform(test_df), label, probability_ind, epsilon)
    logloss_test += loss_test
    # print the logloss of this target (not the running total)
    print (loss_test, 'logloss_test', select_ind[i], select_target[i])

  # calculate the mean: exactly `index` targets were accumulated
  logloss_test /= index
  logloss_train /= index

  return logloss_test, logloss_train

In [0]:
# Model every selected target with two hidden layers of 128 units each.
mlpc(df_clean, hidden_1=128, hidden_2=128, index=len(select_target))