In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark import SparkContext, SparkConf
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import when

In [2]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Co-training") \
    .getOrCreate()

sc = spark.sparkContext

In [4]:
df = spark.read.format('csv').option("header",'True').load("traincotraining.csv")
#read the training data

In [5]:
from pyspark.sql.functions import col
for col in df.columns:
     df = df.withColumn(col, df[col].astype("float"))

In [6]:
va_x = VectorAssembler(inputCols = ["I","V","L","F","C","M","A","G","T","S","hdHydro_mean"], outputCol = "features_x")
va_y = VectorAssembler(inputCols = ["W","Y","P","H","E","Q","D","N","K","R","helical_mean"], outputCol = "features_y")
df = va_x.transform(df)
df = va_y.transform(df)
# create the features

In [7]:
labeled_x = df.filter('Cytoplasm_x != -1')
labeled_y = df.filter('Cytoplasm_y != -1')
unlabeled = df.subtract(labeled_x).subtract(labeled_y)
# divide into 3 dataframe
ratio = labeled_x.filter("Cytoplasm_x == 0").count() / labeled_x.filter("Cytoplasm_x == 1").count()
# the initial ratio

In [12]:
MAXITER = 3
import time

In [13]:
for i in range(MAXITER):
    
    print(f'Iteration: {i} - labeled size x: {labeled_x.count()},labeled size y:{labeled_y.count()},unlabeled size : {unlabeled.count()}')
    
    training = time.time()
    
    # build model
    rf_x = RandomForestClassifier(labelCol='Cytoplasm_x',featuresCol="features_x",maxDepth=3,numTrees=16,seed=1234567)
    rf_y = RandomForestClassifier(labelCol='Cytoplasm_y',featuresCol="features_y",maxDepth=3,numTrees=16,seed=1234567)
    pipe_x = Pipeline (stages = [rf_x])
    pipe_y = Pipeline (stages = [rf_y])
    # training
    model_x = pipe_x.fit(labeled_x)
    model_y = pipe_y.fit(labeled_y)
    
    print(f'1.Training time: {time.time()-training}')
    # predict
    pred_unlabeled_x = model_x.transform(unlabeled).cache()
    pred_unlabeled_y = model_y.transform(unlabeled).cache()
    
    pred_unlabeled_x = pred_unlabeled_x.withColumn('xs', vector_to_array('probability'))
    pred_unlabeled_x = pred_unlabeled_x.withColumn('class_0_prob_x', F.col('xs')[0])
    pred_unlabeled_x = pred_unlabeled_x.withColumn('class_1_prob_x', F.col('xs')[1])
    
    pred_unlabeled_y = pred_unlabeled_y.withColumn('ys', vector_to_array('probability'))
    pred_unlabeled_y = pred_unlabeled_y.withColumn('class_0_prob_y', F.col('ys')[0])
    pred_unlabeled_y = pred_unlabeled_y.withColumn('class_1_prob_y', F.col('ys')[1])
    
    zero_x = pred_unlabeled_x.filter('class_0_prob_x >= 0.6')
    one_x = pred_unlabeled_x.filter('class_1_prob_x >= 0.6')
    zero_y = pred_unlabeled_y.filter('class_0_prob_y >= 0.6')
    one_y = pred_unlabeled_y.filter('class_1_prob_y >= 0.6')
    
    class_0_x = zero_x.count()
    class_1_x = one_x.count()
    class_0_y = zero_y.count()
    class_1_y = one_y.count()
    # collect the credible data 
    if 0 in (class_0_x, class_1_x, class_0_y, class_1_y):
        print("i = ", i, "break \n")
        break

    if class_0_x > class_1_x:
        ratio_x = round(class_1_x * ratio) / class_0_x
        class_0_x = round(class_1_x * ratio)
        to_add_class0_y = zero_x.sample(ratio_x)
        to_add_class1_y = one_x
    else:
        ratio_x = round(class_0_x / ratio) / class_1_x
        class_1_x = round(class_0_x / ratio) 
        to_add_class0_y = zero_x
        to_add_class1_y = one_x.sample(ratio_x)
        
    if class_0_y > class_1_y:
        ratio_y = round(class_1_y * ratio) / class_0_y
        class_0_y = round(class_1_y * ratio)
        to_add_class0_x = zero_y.sample(ratio_y)
        to_add_class1_x = one_y
    else:
        ratio_y = round(class_0_x / ratio)/ class_1_y
        class_1_y = round(class_0_y / ratio) 
        to_add_class0_x = zero_y
        to_add_class1_x = one_y.sample(ratio_y)
    
    print(f'2.Confidences time: {time.time()-training}')
    # fix the dataframe as the column should be same when union and substract
    to_add_class0_y = to_add_class0_y.drop('rawPrediction','probability','xs','class_0_prob_x','class_1_prob_x','prediction')
    to_add_class1_y = to_add_class1_y.drop('rawPrediction','probability','xs','class_0_prob_x','class_1_prob_x','prediction')
    to_add_class0_x = to_add_class0_x.drop('rawPrediction','probability','ys','class_0_prob_y','class_1_prob_y','prediction')
    to_add_class1_x = to_add_class1_x.drop('rawPrediction','probability','ys','class_0_prob_y','class_1_prob_y','prediction')

    delete_xy = to_add_class0_x.union(to_add_class1_x).union(to_add_class0_y).union(to_add_class1_y).distinct()
    unlabeled = unlabeled.subtract(delete_xy)
    
    to_add_class0_x = to_add_class0_x.withColumn("Cytoplasm_x",to_add_class0_x.Cytoplasm_x*0)
    to_add_class1_x = to_add_class1_x.withColumn("Cytoplasm_x",to_add_class1_x.Cytoplasm_x*(-1))
    to_add_class0_y = to_add_class0_y.withColumn("Cytoplasm_y",to_add_class0_y.Cytoplasm_y*0)
    to_add_class1_y = to_add_class1_y.withColumn("Cytoplasm_y",to_add_class1_y.Cytoplasm_y*(-1))
    
    update = time.time()
    labeled_x = labeled_x.union(to_add_class0_x).union(to_add_class1_x)
    labeled_y = labeled_y.union(to_add_class0_y).union(to_add_class1_y)
   
    
    pred_unlabeled_x.unpersist()
    pred_unlabeled_y.unpersist()
    end = time.time()
    print(f'3.Updating sets time: {time.time()-update}')

Iteration: 0 - labeled size x: 367,labeled size y:367,unlabeled size : 13944
1.Training time: 2.359300374984741
2.Confidences time: 11.482417345046997
3.Updating sets time: 0.09477710723876953
Iteration: 1 - labeled size x: 1429,labeled size y:1147,unlabeled size : 12165
1.Training time: 22.318379878997803
2.Confidences time: 81.86222696304321
3.Updating sets time: 0.4574544429779053
Iteration: 2 - labeled size x: 3668,labeled size y:5691,unlabeled size : 6354
1.Training time: 80.32440090179443
2.Confidences time: 199.50757265090942
3.Updating sets time: 2.3138020038604736


## we need to build a classifier assembler

In [14]:
# train 
model_x = pipe_x.fit(labeled_x)
model_y = pipe_y.fit(labeled_y)

In [15]:
# read the test dataset
test = spark.read.format('csv').option("header", 'true').load("testselftraining.csv") 

In [16]:
# preprocess the test
numerical_colstest = test.columns
test = test.select([F.col(c).cast("float").alias(c) for c in numerical_colstest])
test = test.withColumn('Cytoplasm',when(test.Cytoplasm.between(2,6),1).otherwise(0))test = va_x.transform(test)
test = va_y.transform(test)

In [17]:
# predict the test data
final_pred = model_x.transform(test)
final_pred = final_pred.withColumn('xs', vector_to_array('probability'))
final_pred = final_pred.withColumn('class_0_prob_x', F.col('xs')[0])
final_pred = final_pred.withColumn('class_1_prob_x', F.col('xs')[1])
final_pred = final_pred.drop('probability').drop('rawPrediction').drop('xs').drop('prediction')

In [18]:
final_pred = model_y.transform(final_pred)
final_pred = final_pred.withColumn('ys', vector_to_array('probability'))
final_pred = final_pred.withColumn('class_0_prob_y', F.col('ys')[0])
final_pred = final_pred.withColumn('class_1_prob_y', F.col('ys')[1])
final_pred = final_pred.drop('probability').drop('rawPrediction').drop('ys').drop('x').drop('U')

In [19]:
from pyspark.sql.functions import col
for col in final_pred.columns:
    final_pred = final_pred.withColumn("prediction", final_pred.prediction.astype("float"))

In [20]:
from pyspark.sql import functions as F
final_pred = final_pred.withColumn('prediction', 
    F.when(((F.col('class_1_prob_x')>=F.col('class_0_prob_x')) &(F.col('class_1_prob_x')>=F.col('class_0_prob_y'))) | ((F.col('class_1_prob_y')>=F.col('class_0_prob_x')) &(F.col('class_1_prob_y')>=F.col('class_0_prob_y'))), 1)
    .otherwise(0))

In [21]:
final_pred= final_pred.withColumn('TP', 
    when((final_pred['Cytoplasm']== 1) & (final_pred['prediction']==1), 1)
    .otherwise(0))
final_pred = final_pred.withColumn('TN', 
    when((final_pred['Cytoplasm']== 0) & (final_pred['prediction']==0), 1)
    .otherwise(0))
final_pred = final_pred.withColumn('FP', 
    when((final_pred['Cytoplasm']== 1) & (final_pred['prediction']==0), 1)
    .otherwise(0))
final_pred = final_pred.withColumn('FN', 
    when((final_pred['Cytoplasm']== 0) & (final_pred['prediction']==1), 1)
    .otherwise(0))

In [22]:
TP = final_pred.filter('TP==1').count()
TN = final_pred.filter('TN==1').count()
FP = final_pred.filter('FP ==1').count()
FN = final_pred.filter('FN==1').count()
acc = (TP+TN)/(TP+TN+FP+FN)
S = TP/(TP+TN)
E = TN/(TN+FP)

In [23]:
acc,S,E

(0.45112781954887216, 0.48333333333333334, 0.3563218390804598)