# Load everything

In [1]:
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

import findspark  # Get rid of this in DataBricks
findspark.init()  # Get rid of this in DataBricks
from pyspark.sql import Row
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType

from pyspark.sql.functions import udf
from pyspark.sql import functions as F 
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev, log, log10
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import lit

from pyspark.ml.feature import StringIndexer

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation


from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorSlicer
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.ml.feature import PCA
from pyspark.ml.classification import GBTClassifier


In [2]:
from tqdm import tqdm

In [3]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.functions import vector_to_array


In [4]:
config = SparkConf().setAll([('spark.executor.memory', '30g'), ('spark.executor.cores', '4'), ('spark.cores.max', '6'), ('spark.driver.memory','8g')])
config.setAppName("proj")
config.set("spark.dynamicAllocation.minExecutors", "2");
config.set("spark.dynamicAllocation.maxExecutors", "2");
config.set("spark.dynamicAllocation.initialExecutors", "2"); # the number must be between the min and max

# config.set("spark.sql.execution.arrow.enabled", "true")

sc = SparkContext(conf=config)  # start a new sc with the current config
spark = SparkSession(sc)
sqlc=SQLContext(sc)
print(sc.getConf().getAll())  # print all the configuration

[('spark.dynamicAllocation.initialExecutors', '2'), ('spark.executor.memory', '30g'), ('spark.app.id', 'local-1607225993012'), ('spark.executor.id', 'driver'), ('spark.dynamicAllocation.minExecutors', '2'), ('spark.driver.host', '172.18.40.157'), ('spark.executor.cores', '4'), ('spark.dynamicAllocation.maxExecutors', '2'), ('spark.rdd.compress', 'True'), ('spark.driver.port', '38447'), ('spark.driver.memory', '8g'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'proj'), ('spark.cores.max', '6')]


In [5]:
df_train = spark.read.csv('train_feats.csv', header='true', inferSchema= 'true')   # path in HDFS file system
df_label = spark.read.csv('train_targets_scored.csv', header='true', inferSchema= 'true')
df = df_train.join(df_label, on=['sig_id'], how='left_outer')  # Jjoin them together

# Drop vechile

In [6]:
df = df.filter(df.cp_type == 'trt_cp')
df = df.drop('cp_type')

# OneHot

In [7]:
indexer = StringIndexer(inputCol="cp_dose", outputCol="cp_dose_cat")
df1 = indexer.fit(df).transform(df)
indexer = StringIndexer(inputCol="cp_time", outputCol="cp_time_cat")
df1 = indexer.fit(df1).transform(df1)
df1 = df1.drop('cp_dose')
df1 = df1.drop('cp_time')

encoder = OneHotEncoder(inputCols=["cp_time_cat", "cp_dose_cat"],
                        outputCols=["cp_time_onehot", "cp_dose_onehot"])

model = encoder.fit(df1)
df1 = model.transform(df1)
df1 = df1.withColumn("cp_time_cols", vector_to_array("cp_time_onehot")).select(df1.columns + [col("cp_time_cols")[i] for i in range(2)])
df1 = df1.withColumn("cp_dose_cols", vector_to_array("cp_dose_onehot")).select(df1.columns + [col("cp_dose_cols")[i] for i in range(1)])
df1 = df1.drop('cp_dose_cat',
 'cp_time_cat',
 'cp_time_onehot',
 'cp_dose_onehot',
)


# Feature Engineering  (credit to Jeff)

In [8]:
gene_feature_names = [name for name in df1.columns if 'g-' in name]
cell_feature_names =  [name for name in df1.columns if 'c-' in name]

df2 = df1.withColumn("gene_max", F.greatest(*gene_feature_names))
df2 = df2.withColumn("gene_min", F.least(*gene_feature_names))
df2 = df2.withColumn("cell_max", F.greatest(*cell_feature_names))
df2 = df2.withColumn("cell_min", F.least(*cell_feature_names))


# Featrue engineering -- gene sequence

In [13]:
# gene_sub_df = df2.select(*gene_feature_names)
# gene_sub_df_T = spark.createDataFrame(gene_sub_df.toPandas().T)

# w = Window.rowsBetween(-10,0)


# gene_sub_df_T_roll = gene_sub_df_T.select(
#     '*', 
#     *( F.avg(i).over(w).alias(i + '_roll') for i in gene_sub_df_T.columns)
# ).drop(*gene_sub_df_T.columns)  # apply rolling average transformation for each sample
# gene_rolled = spark.createDataFrame(gene_sub_df_T_roll.toPandas().T)

# gene_rolled = gene_rolled.select([col(c).alias('g_' + c + '_rolled') for c in gene_rolled.columns])\
#                                 .drop(*gene_rolled.columns)  # just renameing the columns... that's all


# window = Window.orderBy(F.col('monotonically_increasing_id'))
# gene_rolled = gene_rolled.withColumn("monotonically_increasing_id", F.monotonically_increasing_id())\
#                         .withColumn('row_number2', F.row_number().over(window))\
#                         .drop('monotonically_increasing_id')

# gene_rolled.write.parquet('gene_rolled.parquet')

In [9]:
gene_rolled = spark.read.parquet("gene_rolled.parquet")

In [10]:

window = Window.orderBy("monotonically_increasing_id")
df3 = df2.withColumn("monotonically_increasing_id", F.monotonically_increasing_id())\
                        .withColumn('row_number2', F.row_number().over(window))\
                        .drop('monotonically_increasing_id')        

df4 = df3.join(gene_rolled, on = 'row_number2', how = 'left')\
                .drop('row_number2')\
                .drop(*gene_feature_names)  # merging with the main frame


# Drop high correlation features -- mostly in cell features

In [11]:
feature_columns = cell_feature_names  # This came from the previous section
vectorAssembler = VectorAssembler(inputCols = feature_columns, outputCol = 'feats' )
feature_vector = vectorAssembler.transform(df2).select("feats")
# pyspark implementation of determining the correlations
corr_matrix = Correlation.corr(feature_vector, "feats").head()[0]

# Convert the correlation desne matrix and apply mask and to get the indicies where high correlations are observed
# In here, I convert the correlation matrix to numpy, and then use numpy's mask to obtain the lower traingle of the
# matrix. I used numpy becasue pyspark does not have mask.

corr_Array = corr_matrix.toArray()
masked_corr = np.ma.masked_where(np.triu(np.ones_like(corr_Array, dtype=bool)), corr_Array, copy=True)  
idx_high_corr_feats = set(np.argwhere(masked_corr > 0.90)[:,0])  # Set threshold to 90%

# Identify the column to drop and then drop it.
features_to_drop = np.array(feature_columns)[list(idx_high_corr_feats)].tolist()


# Finally

df5 = df4.drop(*features_to_drop)

# Feature trans

???

# Training

In [12]:
final_feature_names = list(set(df5.columns) - set(df_label.columns))

vectorAssembler = VectorAssembler(inputCols = final_feature_names, outputCol = 'feats' )
df6 = vectorAssembler.transform(df5).drop(*final_feature_names)

## PCA

In [13]:
k = 100 
pca = PCA(k = k, inputCol='feats', outputCol='pca_features')
df7 = pca.fit(df6).transform(df6)

---

In [25]:
def train_individual_label(df, label_name):
    
    
    if df.filter(df[label_name] == 1).count() >= 1:
    
        temp_df = df.select('pca_features', label_name)

        # stratify split of the dataframe for train-test split
        seed = 42
        fractions = {1: 0.8, 0: 0.8}
        train_df = temp_df.stat.sampleBy(label_name, fractions, seed, )
        test_df =  temp_df.subtract(train_df)


        # Over/down sampling of the training dataframe, becasue of the imbalanced class label

        activation_samples = train_df.filter(train_df[label_name] == 1)
        non_activation_samples = train_df.filter(train_df[label_name] == 0)
        ratio = activation_samples.count() / non_activation_samples.count()

        default_down_sample_ratio = 0.5  # This can be changed, but for now,i am just setting it as 0.5

        upsample_ratio = default_down_sample_ratio / ratio

        activation_samples_up = activation_samples.sample(True, upsample_ratio, 42)   # Upsample the activation samples

        non_activation_samples_down = non_activation_samples.sample(True, default_down_sample_ratio, 42)  # Meanwhile, downsampling the non-activated samples 


        final_train_df = activation_samples_up.union(non_activation_samples_down).orderBy(F.rand())


        



        


        # Finally, use CV to train the model and get the best parameters

        clf = GBTClassifier(featuresCol='pca_features',  labelCol = label_name, maxIter=10)
        paramGrid = ParamGridBuilder()\
                        .addGrid(clf.stepSize, [0.1,  0.05])\
                        .build()
        
        evaluator = MulticlassClassificationEvaluator(labelCol=label_name, metricName='logLoss')
        crossval = CrossValidator(estimator=clf,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=2)  

        cvModel = crossval.fit(final_train_df)
        prediction_df = cvModel.transform(final_train_df)

        cvModel.save(f"./GBT/{label_name}.model")
        prediction_df.write.save(f"./GBT/{label_name}_prediction_df.parquet", format="parquet")
        final_train_df.write.save(f"./GBT/{label_name}_train_df.parquet", format="parquet")

        
        prediction_test = cvModel.transform(test_df)
        log_loss = evaluator.evaluate(prediction_test)

        with open(f"./GBT/log.log", 'a') as f:
            f.write(str(log_loss) + '\n') 

        return True
    else:
        return False

Stratify 
ref: https://stackoverflow.com/questions/47637760/stratified-sampling-with-pyspark/47672336

In [21]:
with open(f"./GBT/log.log", 'w') as f:
    pass

train_individual_label(df7,  '5-alpha_reductase_inhibitor',)

True

In [None]:
with open(f"./logistics/log.log", 'w') as f:
    pass


temp_dict = {}
for name in tqdm(df_label.columns[1:]):
    temp_dict[name] = train_individual_label(df7,  name)

 33%|███▎      | 67/206 [9:56:46<22:07:23, 572.98s/it]

# SCRAP

In [None]:
gene_rolled.show(1)

In [44]:
gene_sub_df = df2.select(*gene_feature_names)
gene_sub_df_T = spark.createDataFrame(gene_sub_df.toPandas().T)

# w = Window.rowsBetween(-10,0)


# gene_sub_df_T_roll = gene_sub_df_T.select(
#     '*', 
#     *( F.avg(i).over(w).alias(i + '_roll') for i in gene_sub_df_T.columns)
# ).drop(*gene_sub_df_T.columns)  # apply rolling average transformation for each sample
# gene_rolled = spark.createDataFrame(gene_sub_df_T_roll.toPandas().T)

# gene_rolled.write.save(f"gene_rolled_1.parquet", format="parquet")


In [52]:
gene_sub_df_T_roll.toPandas().T

KeyboardInterrupt: 

In [None]:
gene_sub_df_T.columns

In [None]:
gene_sub_df_T.show(1)

In [43]:
gene_sub_df.toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,21938,21939,21940,21941,21942,21943,21944,21945,21946,21947
g-0,1.0620,0.0743,0.6280,-0.5138,-0.3254,-0.6111,2.044,0.2711,-0.3014,-0.0630,...,0.4123,-1.0140,1.7380,-0.1150,0.1420,0.1608,0.1394,-1.3260,0.6660,-0.8598
g-1,0.5577,0.4087,0.5817,-0.2491,-0.4009,0.2941,1.700,0.5133,0.5545,0.2564,...,-0.1551,0.1709,-1.2900,-0.8037,-0.3696,-1.0500,-0.0636,0.3478,0.2324,1.0240
g-2,-0.2479,0.2991,1.5540,-0.2656,0.9700,-0.9901,-1.539,-0.1327,-0.2576,-0.5279,...,1.8100,-0.4291,-0.4533,0.0988,-0.0093,0.2551,-0.1112,-0.3743,0.4392,-0.1361
g-3,-0.6208,0.0604,-0.0764,0.5288,0.6919,0.2277,5.944,2.5950,-0.1390,-0.2541,...,0.5042,1.8750,-1.1640,-0.1301,-0.2495,-0.2239,-0.5080,0.9905,0.2044,0.7952
g-4,-0.1944,1.0190,-0.0323,4.0620,1.4180,1.2810,-2.167,0.6980,-0.6487,-0.0182,...,-1.2380,0.9859,-0.4841,0.2013,-0.0175,-0.2431,-0.4713,-0.7178,0.8531,-0.3611
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
g-767,-0.5582,-0.1214,0.8427,0.5699,1.7660,0.1498,-4.810,-1.8910,1.5170,-0.0956,...,-0.6917,0.2422,-0.4985,-0.5512,0.0186,0.5243,-0.2297,-0.1022,-2.8720,-0.4017
g-768,0.3008,-0.1626,0.5797,0.1996,-1.0020,-0.4674,4.713,1.1250,0.1690,-0.4281,...,-0.2549,0.0374,0.9495,0.1937,-0.4246,-0.0003,0.7221,0.5247,0.1794,1.5410
g-769,1.6490,-0.3340,0.3143,0.4374,-0.7534,0.9579,-5.431,0.6872,0.7831,0.6985,...,0.3235,1.2840,-0.0680,-0.3399,0.0017,0.1715,0.5099,0.5438,0.3109,0.3633
g-770,0.2968,-0.3289,0.8133,0.1588,0.5000,0.1993,4.011,0.0641,-0.4754,-0.8000,...,-0.1167,-0.8051,-0.4224,0.2947,-0.4474,0.8418,-0.1423,-0.1875,-0.3491,-3.1970


In [32]:
string = """Product,Category,Revenue
Thin,phone,6000
Normal,tablet,1500
Mini,tablet,5500
Ultra Thin,phone,5000
Very Thin,phone,6000
Big,tablet,2500
Bendable,phone,3000
Foldable,phone,3000
Pro,tablet,4500
Pro+,tablet,6500"""

with open("example.csv", "w") as f:
    f.write(string)

In [37]:
w = Window.rowsBetween(-10,0)

dfdddd = spark.read.csv('example.csv', header='true',inferSchema=True)   # path in HDFS file system


dfdddd.select(
    '*', 
    F.avg('Revenue').over(w)
).show()

+----------+--------+-------+--------------------------------------------------------------+
|   Product|Category|Revenue|avg(Revenue) OVER (ROWS BETWEEN -10 FOLLOWING AND CURRENT ROW)|
+----------+--------+-------+--------------------------------------------------------------+
|      Thin|   phone|   6000|                                                        6000.0|
|    Normal|  tablet|   1500|                                                        3750.0|
|      Mini|  tablet|   5500|                                             4333.333333333333|
|Ultra Thin|   phone|   5000|                                                        4500.0|
| Very Thin|   phone|   6000|                                                        4800.0|
|       Big|  tablet|   2500|                                             4416.666666666667|
|  Bendable|   phone|   3000|                                             4214.285714285715|
|  Foldable|   phone|   3000|                                         

In [35]:
dfdddd.show()

+----------+--------+-------+
|   Product|Category|Revenue|
+----------+--------+-------+
|      Thin|   phone|   6000|
|    Normal|  tablet|   1500|
|      Mini|  tablet|   5500|
|Ultra Thin|   phone|   5000|
| Very Thin|   phone|   6000|
|       Big|  tablet|   2500|
|  Bendable|   phone|   3000|
|  Foldable|   phone|   3000|
|       Pro|  tablet|   4500|
|      Pro+|  tablet|   6500|
+----------+--------+-------+



In [60]:
my_set = {'Geeks', 'for', 'geeks'} 
  
s = list(my_set)

TypeError: 'list' object is not callable

In [62]:
type(list)

list

In [None]:
cell_feature_names

In [32]:
df.select("cp_type").distinct().show()

+-------+
|cp_type|
+-------+
| trt_cp|
+-------+



In [None]:
fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()


In [6]:
from pyspark.sql.functions import lit


In [7]:
lit(0.8)

Column<b'0.8'>

In [8]:
from pyspark.sql.functions import lit
list = [(2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)]
df = spark.createDataFrame(list, ["x1","x2","x3"])

In [9]:
df

DataFrame[x1: bigint, x2: bigint, x3: bigint]

In [10]:
fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()


In [12]:
fractions

{2147481832: 0.8, 214748183: 0.8}

In [15]:
df.select("x1").distinct().withColumn("fraction", lit(0.8)).show()

+----------+--------+
|        x1|fraction|
+----------+--------+
|2147481832|     0.8|
| 214748183|     0.8|
+----------+--------+



In [None]:
temp_df = features_and_targets.withColumn('target_vector', (vector_to_string(array([features_and_targets[col] for col in target_names])))).select(['sig_id', 'target_vector'])
string_indexer = StringIndexer(inputCol = 'target_vector', outputCol = 'target')
string_indexer_model = string_indexer.fit(temp_df)
temp_df = string_indexer_model.transform(temp_df).drop('target_vector')

data = features_and_targets.join(temp_df, features_and_targets.sig_id == temp_df.sig_id, how = 'inner').drop(temp_df.sig_id)