In [1]:
from pyspark.sql import Row
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev, log, log10
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import lit
#sc = SparkContext()
#spark = SparkSession(sc)
spark = SparkSession.builder.master('local[*]').config('spark.executor.memory','30g').appName('my-cool-app').getOrCreate()

In [2]:
spark.conf.get('spark.executor.memory')

'30g'

In [3]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler, VectorSlicer, PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes, LogisticRegression
import time
from pyspark.ml.functions import vector_to_array
from pyspark.ml.clustering import KMeans
import re
from functools import reduce

In [4]:
# Functions for Feature Engineering
def encode_cat_features(df, cat_features):

  indexed_cols = [''.join([col_name, '_indexed']) for col_name in cat_features]
  encoded_cols = [''.join([col_name, '_encoded']) for col_name in cat_features]
  string_indexers = [StringIndexer(inputCol=cat_features[i], outputCol=indexed_cols[i]) for i in range(len(cat_features))]
    
  encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)
  
  pipline = Pipeline(stages=string_indexers + [encoder])
  
  encoded_df = pipline.fit(df).transform(df)
  encoded_df = encoded_df.drop(*indexed_cols + cat_features)

  return encoded_df

def normalize_features(df, cols, normalizer, output_cols, if_drop=True):
  """
  """
  normalizer_lst = []
  vectorized_cols = []
  vector_assembers = []
  
  if isinstance(cols, list):
    cols = {'cols': cols}
  
  if isinstance(output_cols, str):
    output_cols = {'cols': output_cols}
  
  for k, v in cols.items():
    
    temp_normalizer = normalizer.copy()
    vectorized_col = ''.join([output_cols[k], '_v'])
    vector_assember = VectorAssembler(inputCols=v, outputCol=vectorized_col)
    
    temp_normalizer.setInputCol(vectorized_col)
    temp_normalizer.setOutputCol(output_cols[k])
    
    normalizer_lst.append(temp_normalizer)
    vectorized_cols.append(vectorized_col)
    vector_assembers.append(vector_assember)
  
  pipline = Pipeline(stages=vector_assembers + normalizer_lst)
  normalized_df = pipline.fit(df).transform(df).drop(*vectorized_cols)
  
  if if_drop:
    
    for k, v in cols.items():
      
      normalized_df = normalized_df.drop(v)
  
  return normalized_df

def add_pca_features(df, g_cols, c_cols, k=40):
  
  ## normalize g-col and c-col
  std_scaler = StandardScaler(withMean=True)
  
  input_cols = {
    'g_cols': g_cols, 
    'c_cols': c_cols}
  
  output_cols = {
    'g_cols': 'g_normalized', 
    'c_cols': 'c_normalized'}
  
  normalized_df = normalize_features(df, input_cols, std_scaler, output_cols, if_drop=False)
  
  ## perform PCA on g-cols and c-cols
  g_col_pca = PCA(k=k, inputCol='g_normalized', outputCol='g_col_pca')
  c_col_pca = PCA(k=k, inputCol='c_normalized', outputCol='c_col_pca')
  
  pipeline = Pipeline(stages=[g_col_pca, c_col_pca])
  pca_df = pipeline.fit(normalized_df).transform(normalized_df)
  
  return pca_df
  
def add_stats_features(df, g_cols, c_cols):
  
  @udf('double')
  def cols_sum(*lst):

    return sum(lst)

  @udf('double')
  def cols_mean(*lst):

    n = len(lst)
    s = sum(lst)

    return s / n

  @udf('double')
  def cols_var(*lst):

    n = len(lst)
    s = sum(lst) / n
    total = 0

    for x in lst:

      total += (x - s)**2 

    return total / n
  
  @udf('double')
  def cols_min(*lst):
    
    return min(lst)
  
  @udf('double')
  def cols_max(*lst):
    
    return max(lst)
  
  stats_dict = {
    'min_stats': cols_min,
    'max_stats': cols_max,
    'var_stats': cols_var,
    'mean_stats': cols_mean,
    'sum_stats': cols_sum
  }
  
  for name, func in stats_dict.items():
    
    df = df.withColumn(''.join(['g_cols_', name]), func(*[col(g_col) for g_col in g_cols]))
    df = df.withColumn(''.join(['c_cols_', name]), func(*[col(c_col) for c_col in c_cols]))
  
  return df

def add_kmeans_features(df, g_cols, c_cols, k=2, num_iter=10):
  
  kmeans_g = KMeans(k=k, featuresCol=g_cols, predictionCol='g_col_k_mean', seed=16)
  kmeans_c = KMeans(k=k, featuresCol=c_cols, predictionCol='c_col_k_mean', seed=16)
  
  kmeans_df = kmeans_g.fit(df).transform(df)
  kmeans_df = kmeans_c.fit(kmeans_df).transform(kmeans_df)
  
  return kmeans_df

def feature_engineering(df, num_cluster=2, num_comp=40, num_iter=10):
  
  ## get g-col and c-col
  g_cols = list(filter(lambda v: re.match('g-.+', v), df.columns))
  c_cols = list(filter(lambda v: re.match('c-.+', v), df.columns))
  
  ## PCA
  pca_df = add_pca_features(df, g_cols, c_cols, num_comp)

  ## stats features on g and c cols
  stats_df = add_stats_features(pca_df, g_cols, c_cols)
  
  ## add k-means features
  kmeans_df = add_kmeans_features(stats_df, g_cols='g_normalized', c_cols='c_normalized', k=num_cluster, num_iter=num_iter)
  
  return kmeans_df

In [5]:
from tqdm import tqdm
# Multilabel Classifier
class MultiLabelClassifier:
    
    def __init__(self, clf, labels, feature_col,  
                 hyperparameters={}, 
                 predict_col=['probability','prediction'],
                 method=lambda prob_col, pred_col: float(pred_col if len(prob_col) == 1 else prob_col[1])):
        '''
        Initialize a multilabelclassifier
        clf: the model to use
        labels: a list of labels to predict
        feature_col: the feature column
        predict_col: the prediction column where the prediction is located
        hyperparameters: all optional hyperparameters that can tune
        method: a method of how to get the final prediction for one class
        '''
        self.clf = clf
        self.labels = labels
        self.feature_col = feature_col
        self.predict_col = predict_col
        self.hyperparameters = hyperparameters
        self.method = method
        self._trained_clfs = []
        self.res = None

    def fit(self, train):
        train.cache()
        self._trained_clfs = [self.clf(labelCol=label, featuresCol=self.feature_col, **self.hyperparameters)
                              .fit(train) 
                              for label in tqdm(self.labels)]
        train.unpersist()
        return self

    def transform(self, x_test):
        # convert method to udf
        get_predict = udf(self.method,FloatType())
        #target assembler
        va = VectorAssembler(inputCols=self.labels, outputCol='targets')
        ## transform this vector self.output_col to an array
        select_cols = [self.feature_col, 'targets', 'sig_id']
        res = va.transform(x_test).select(*select_cols)
        for i, clf in tqdm(enumerate(self._trained_clfs)):
            res = clf.transform(res)
            new_col = self.labels[i]
            res = res.withColumn(new_col, get_predict(*self.predict_col))
            select_cols.append(new_col)
            res = res.select(*select_cols)
        self.res = res
        return res.select(*select_cols[2:])
    
    def score(self):
        #target assembler
        va = VectorAssembler(inputCols=self.labels, outputCol='predicts')
        ## transform this vector self.output_col to an array
        df = va.transform(self.res).select('targets', 'predicts')
        df = df.withColumn('targets', vector_to_array('targets'))
        df = df.withColumn('predicts', vector_to_array('predicts'))
        import math
        @udf('double')
        def log_loss(y, y_hat):
            r = 0
            cut = 1e-15
            for t, p in zip(y, y_hat):
                p = max(min(p, 1-cut),cut)
                r += t * math.log(p) + (1 - t) * math.log(1 - p)
            return r/len(y)
        df = df.select(log_loss('targets','predicts').alias('log_loss'))
        return df.select((-_mean(col('log_loss'))).alias('score'))
        

In [6]:
from pyspark.ml import Transformer
class ProbTransformer(Transformer):
    
    def __init__(self, outputCol,
                 dropCols=['rawPrediction','probability','prediction'],
                 predict_col=['probability','prediction'],
                 method=lambda prob_col, pred_col: float(pred_col if len(prob_col) == 1 else prob_col[1])):
        self.outputCol = outputCol
        self.dropCols = dropCols
        self.predict_col = predict_col
        self.method = method
    
    def transform(self, data):
        get_predict = udf(self.method,FloatType())
        return data.withColumn(self.outputCol, get_predict(*self.predict_col)).drop(*self.dropCols)

In [7]:
from pyspark.ml.evaluation import Evaluator
class MultilabelEvaluator(Evaluator):
    
    def __init__(self, method=lambda prob_col, pred_col: float(pred_col if len(prob_col) == 1 else prob_col[1]), predictionCol=['probability', 'prediction'], labelCol="label"):
        
        self.predictionCol = predictionCol
        self.labelCol = labelCol
        self.method = method
        
    def _evaluate(self, dataset):
        
        get_predict = udf(self.method,FloatType())
        dataset = dataset.withColumn('pred_prob', get_predict(*self.predictionCol))
        cut = 1e-15
        new_dataset = dataset.select((-col(self.labelCol) * log(col('pred_prob') + cut) - (1.0 - col(self.labelCol)) * log(1.0 - col('pred_prob') + cut)).alias('log_loss'))        
        score = new_dataset.select(_mean(col('log_loss')).alias('score')).collect()[0]['score']
        return score
        
    def isLargerBetter(self):
        return False

In [8]:
def baseline(model, train_data, test_data):
    print('Start training')
    start = time.time()
    clf = model.fit(train_data)
    print('Train finished with time:', time.time() - start)
    
    print('Start training prediction')
    start = time.time()
    train_pred = clf.transform(train_data)
    print('Training prediction finished! time:', time.time() - start)

    print('Start training scoring')
    s = time.time()
    clf.score().show()
    print('Calculation finished with time:', time.time() - s)
    
    print('Start test prediction')
    start = time.time()
    validation_pred = clf.transform(test_data)
    print('Validation prediction finished! time:', time.time() - start)

    print('Start test scoring')
    s = time.time()
    clf.score().show()
    print('Calculation finished with time:', time.time() - s)


In [9]:
#file_path = '/FileStore/tables/'
file_path = './'
sample_id = 'sig_id'
train_df = spark.read.csv(file_path+'train_features.csv', header=True, inferSchema=True)
target_df = spark.read.csv(file_path+'train_targets_scored.csv', header=True, inferSchema=True)
train_drug_df = spark.read.csv(file_path+'train_drug.csv', header=True, inferSchema=True)
target_nonscored_df = spark.read.csv(file_path+'train_targets_nonscored.csv', header=True, inferSchema=True)
test_df = spark.read.csv(file_path+'test_features.csv', header=True, inferSchema=True)


In [10]:
## add indicator column to both train and test so we can combine them later
train_df = train_df.withColumn('is_test', lit(0))
test_df = test_df.withColumn('is_test', lit(1))

## Combine train and test df
full_df = train_df.union(test_df)

In [11]:
## encode features
target_cols = ['cp_type', 'cp_dose']
encoded_df = encode_cat_features(full_df, target_cols)

In [12]:
## feature engineering
fe_df = feature_engineering(encoded_df, num_comp=20, num_iter=5)

## select all the feature columns

pca_cols = list(filter(lambda v: re.match('.+_pca', v), fe_df.columns))
stats_cols = list(filter(lambda v: re.match('.+_stats', v), fe_df.columns))
k_means_cols = list(filter(lambda v: re.match('.+_k_mean', v), fe_df.columns))
cat_cols = list(filter(lambda v: re.match('.+_encoded', v), fe_df.columns)) + ['cp_time']

## stack them to a single feature vector
vector_assember_train = VectorAssembler(inputCols=pca_cols + stats_cols + k_means_cols + cat_cols, outputCol='all_features')
fe_df = vector_assember_train.transform(fe_df)

## normalize all the features
normalizer = StandardScaler(withMean=True)
cols = ['all_features']
output_cols = 'features'
fe_df = normalize_features(fe_df, cols, normalizer, output_cols, if_drop=False)

## split train, test df
fe_train = fe_df.filter(fe_df['is_test'] == 0)
final_test = fe_df.filter(fe_df['is_test'] == 1).select(['sig_id', 'features'])

## join training target with training features
labels = target_df.drop('sig_id').columns
final_train = fe_train.join(target_df, ['sig_id']).select(*(['sig_id','features']+labels))

#### 

In [13]:
## train test split
(train, validation) = final_train.randomSplit([0.8, 0.2], 16)


In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import time

s = time.time()
# Cross Validation
labels = target_df.drop(sample_id).columns
train = train.select(*[sample_id, 'features']+labels)
label_probs = [label+'_prob' for label in labels]
feature_col = 'features'
#hyperparameters = {'maxIter':10}
hyperparameters = {}
clf = DecisionTreeClassifier
fold_num = 3

stages = []
for label in labels:
    model = clf(labelCol=label, featuresCol=feature_col, **hyperparameters)
    pipeline = Pipeline(stages=[model])
    paramGrid = ParamGridBuilder() \
    .addGrid(model.maxDepth, [2, 5]) \
    .addGrid(model.impurity, ['gini', 'entropy']) \
    .build()
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=MultilabelEvaluator(labelCol=label),
                              numFolds=2,
                              parallelism=3)  # use 3+ folds in practice
    probTransformer = ProbTransformer(outputCol=label+'_prob')
    stages.append(crossval)
    stages.append(probTransformer)
pipeline = Pipeline(stages=stages)
model = pipeline.fit(train)
prediction = model.transform(train)
prediction.select(*[sample_id]+label_probs).show()
print(time.time()-s)

Py4JJavaError: An error occurred while calling o133161.collectToPython.
: org.apache.spark.SparkException: Job 4385 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:979)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:977)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:977)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2257)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2170)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:1973)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1973)
	at org.apache.spark.SparkContext.$anonfun$new$35(SparkContext.scala:631)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:385)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3450)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3447)
	at jdk.internal.reflect.GeneratedMethodAccessor260.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/nick/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nick/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/home/nick/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 61714)
Traceback (most recent call last):
  File "/home/nick/miniconda3/lib/python3.8/socketserver.py", line 316, in _handle_reque

In [11]:
# baseline
labels = target_df.drop(sample_id).columns
# DecisionTreeClassifier baseline
model = MultiLabelClassifier(DecisionTreeClassifier, labels, 'features')
baseline(model, train, validation)

Start training


100%|██████████| 206/206 [06:00<00:00,  1.75s/it]
0it [00:00, ?it/s]

Train finished with time: 361.6467852592468
Start training prediction


206it [01:25,  2.40it/s]


Training prediction finished! time: 86.2536518573761
Start training scoring


0it [00:00, ?it/s]

+--------------------+
|               score|
+--------------------+
|0.015354268249097913|
+--------------------+

Calculation finished with time: 40.25636100769043
Start test prediction


206it [01:28,  2.32it/s]


Validation prediction finished! time: 89.02509927749634
Start test scoring
+------------------+
|             score|
+------------------+
|0.0345501050800465|
+------------------+

Calculation finished with time: 18.61404776573181


In [12]:
# LogisticRegression baseline
model = MultiLabelClassifier(LogisticRegression, labels, 'features')
baseline(model, train, validation)

Start training


100%|██████████| 206/206 [22:57<00:00,  6.69s/it]
0it [00:00, ?it/s]

Train finished with time: 1377.9056544303894
Start training prediction


206it [01:27,  2.35it/s]


Training prediction finished! time: 87.99577140808105
Start training scoring


0it [00:00, ?it/s]

+--------------------+
|               score|
+--------------------+
|0.014305106546989223|
+--------------------+

Calculation finished with time: 41.924421310424805
Start test prediction


206it [02:43,  1.26it/s]


Validation prediction finished! time: 164.35324668884277
Start test scoring
+--------------------+
|               score|
+--------------------+
|0.019969526627343505|
+--------------------+

Calculation finished with time: 28.378750562667847


In [13]:
# LogisticRegression baseline
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(final_train)
scaledData = scalerModel.transform(final_train)
(train, validation) = scaledData.randomSplit([0.8, 0.2], 16)

hyperparameters = {'smoothing': 1,
                  'modelType':"multinomial"}
model = MultiLabelClassifier(NaiveBayes, labels, 'scaledFeatures')
baseline(model, train, validation)

Start training


100%|██████████| 206/206 [07:45<00:00,  2.26s/it]
0it [00:00, ?it/s]

Train finished with time: 466.04742336273193
Start training prediction


206it [01:26,  2.38it/s]


Training prediction finished! time: 86.83528208732605
Start training scoring


0it [00:00, ?it/s]

+--------------------+
|               score|
+--------------------+
|0.020495972786880496|
+--------------------+

Calculation finished with time: 31.32410764694214
Start test prediction


206it [01:25,  2.41it/s]


Validation prediction finished! time: 85.70474433898926
Start test scoring
+--------------------+
|               score|
+--------------------+
|0.020215978410364932|
+--------------------+

Calculation finished with time: 18.266181230545044


In [13]:
file_path = './'
sample_id = 'sig_id'
train = spark.read.orc(file_path+'train')
validation = spark.read.orc(file_path+'validation')
target_df = spark.read.csv(file_path+'train_targets_scored.csv', header=True, inferSchema=True)