## Co-Random-Forest Implementation

## Initialization

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType,StringType
# Local Spark driver using every available core ("local[*]").
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("CW")
)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# change the path here to the path of your dataset
path = "./new_feature.csv"
# The CSV has a header row; Spark infers the column types.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv(path)


import numpy as np
from numpy import allclose
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors
from  pyspark.ml.classification import RandomForestClassifier


# Pack every column except the label into a single "features" vector.
assembler = VectorAssembler(inputCols=df.drop('Class').columns, outputCol="features")
df = assembler.transform(df)

# Encode the string label 'Class' as the numeric 'ClassIndex' column.
indexer = StringIndexer(inputCol='Class', outputCol='ClassIndex')
df = indexer.fit(df).transform(df)
# NOTE: buildClassifier also reads this global `df` (its 'ClassIndex' column).
df = df.select('features', 'ClassIndex')

# change the parameters here to set the labeled/unlabeled ratio and the training/test ratio
unlabel_ratio = 0.95
test_ratio = 0.10
num_of_Base_Classifier = 6  # NOTE(review): unused; the ensemble size comes from coforest_conf
# Set RAND_SEED to an int to reproduce a previous run's splits; None draws a fresh
# random seed each time (the original behaviour). The seed is printed either way.
RAND_SEED = None
rand_seed = RAND_SEED if RAND_SEED is not None else np.random.randint(99999)
print("split seed:", rand_seed)

train, test = df.randomSplit([1 - test_ratio, test_ratio], seed=rand_seed)
L_train, U_train = train.randomSplit([1 - unlabel_ratio, unlabel_ratio], seed=rand_seed)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/30 15:04:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

## Implementation of the Co-Forest algorithm

In [2]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import concat, col, lit,struct,udf, array
from pyspark.sql.types import IntegerType
from pyspark.sql import Row
import functools
import random
import math
import time

def current_milli_time():
    """Return the current wall-clock time as an integer count of milliseconds since the epoch."""
    return int(round(time.time() * 1e3))

class CoForest_Config:
    """Hyper-parameters and run-time state for Co-Forest training.

    Args:
        num_classifiers: number of base classifiers in the ensemble.
        m_numFeatures: feature-count setting; when < 1, buildClassifier replaces it.
        m_seed: seed passed to Python's `random` and to the base estimator.
        m_KValue: stored as given (buildClassifier does not read this attribute).
        m_threshold: confidence a vote share must exceed for an instance to count.
        max_iter: upper bound on the number of co-training rounds.
        m_classifiers: fitted models; filled in by buildClassifier.
    """

    def __init__(self, num_classifiers=6, m_numFeatures=0, m_seed=1, m_KValue=0,
                 m_threshold=0.75, max_iter=10000, m_classifiers=None):
        # Settings supplied by the caller.
        self.num_classifiers = num_classifiers
        self.m_numFeatures = m_numFeatures
        self.m_seed = m_seed
        self.m_KValue = m_KValue
        self.m_threshold = m_threshold
        self.max_iter = max_iter
        self.m_classifiers = m_classifiers
        # Populated during training by buildClassifier.
        self.m_numOriginalLabeledInsts = 0
        self.num_class = 0
# you can change the configuration of training here
coforest_conf = CoForest_Config()  # the name must be exactly coforest_conf

# Module-level copies of settings that the per-record helpers below read instead of
# the config object. num_class = 5 is only a starting value: buildClassifier
# reassigns num_class and num_classifiers when training starts.
num_class = 5
num_classifiers, m_threshold = coforest_conf.num_classifiers, coforest_conf.m_threshold
        
def getPredictionOnLabelData(Instances,m_classifiers):
    """Run every base classifier on the labelled records and pair the outputs with each record.

    Args:
        Instances: RDD whose elements are indexed as (row, weight, in_bag_flags) --
            presumably the `labeleds_outofbag` RDD built by
            resampleWithWeightsforallclassify; confirm against the caller. Only
            element [0] (a Row) is fed to the models.
        m_classifiers: non-empty list of fitted models exposing `evaluate(DataFrame)`.

    Returns:
        RDD of (predictions, weight, in_bag_flags, row) tuples sorted by each record's
        original position, where predictions[i] is a Row whose "outcome" field is a
        struct of classifier i's (prediction, probability).
    """
    # Key every record by its position so it can be re-joined with the model outputs.
    index_Ins = Instances.zipWithIndex().map(lambda x: (x[1],x[0]))
    L_data = Instances.map(lambda x:x[0]).toDF()
    predic=m_classifiers[0].evaluate(L_data)
    predic=predic.predictions.select("prediction","probability")
    columns = [col("prediction"),col("probability")] 
    # (position, (outcome_of_classifier_0,))
    # NOTE(review): relies on evaluate() keeping L_data's row order so positions line up.
    predicRDD=predic.withColumn("outcome", struct(columns)).select("outcome").rdd.zipWithIndex().map(lambda x: (x[1],(x[0],)))
    for i in range(1,len(m_classifiers)):
        predic.unpersist()
        predic=m_classifiers[i].evaluate(L_data).predictions.select("prediction","probability")
        tempRDD=predic.withColumn("outcome", struct(columns)).select("outcome").rdd.zipWithIndex().map(lambda x: (x[1],x[0]))
        # Append classifier i's outcome to the tuple accumulated for each position.
        predicRDD=predicRDD.join(tempRDD).map(lambda x:(x[0],(*x[1][0],x[1][1])))
    outCome=predicRDD.join(index_Ins)
    # (position, (predictions, (row, weight, flags))) -> (predictions, weight, flags, row)
    outCome=outCome.sortBy(lambda x: x[0]).map(lambda x:(x[1][0],x[1][1][1],x[1][1][2],x[1][1][0]))
    # release Memory
    # NOTE(review): nothing in this function calls cache()/persist(), so the
    # unpersist() calls below look like no-ops; the take(1) result is discarded.
    outCome.take(1)
    index_Ins.unpersist()
    predic.unpersist()
    L_data.unpersist()
    predicRDD.unpersist()
    return outCome

def blockSample(iter1):
    """Append one bootstrap draw count to every record of a partition.

    Records are handled in blocks of 10000. At the start of each block a fresh
    offset -> draw-count table is generated with gen_randomindexblock. The record at
    offset k within its block gets that count, or 0 if k was never drawn.

    Args:
        iter1: iterator of (row, counts) pairs, where counts is a list.

    Yields:
        (row, counts + [draws]) for every input record, in input order.
    """
    numblock = 10000  # block size
    draws = {}
    for position, record in enumerate(iter1):
        offset = position % numblock
        if offset == 0:
            # First record of a new block: draw a fresh sample for the block.
            draws = gen_randomindexblock(numblock)
        yield (record[0], record[1] + [draws.get(offset, 0)])

def gen_randomindexblock(numblock):
    """Draw `numblock` row offsets uniformly, with replacement, from [0, numblock).

    Args:
        numblock: block size (number of draws and number of valid offsets).

    Returns:
        dict mapping each offset drawn at least once to how many times it was drawn.
        The counts sum to `numblock`, i.e. a bootstrap sample of one block.
    """
    result = {}
    for _ in range(numblock):
        # randrange excludes `numblock`. random.randint(0, numblock) is inclusive and
        # could return `numblock`, an offset no row in the block has, wasting a draw.
        offset = random.randrange(numblock)
        result[offset] = result.get(offset, 0) + 1
    return result

def blockSample2(classify_i, iter1):
    """Expand records into classifier `classify_i`'s in-bag sample.

    Args:
        classify_i: index into each record's list of draw counts.
        iter1: iterator of (row, counts) pairs.

    Yields:
        (row, 1.0) once per time the row was drawn for classifier `classify_i`;
        rows drawn zero times are skipped.
    """
    for record in iter1:
        times = record[1][classify_i]  # number of draws for this classifier
        if times > 0:
            # Repeat the row once per draw, each with weight 1.0.
            yield from ((record[0], 1.0) for _ in range(times))

def resampleWithWeightsforallclassify(Config, Dataset):
    """Build one bootstrap ("in-bag") sample per base classifier, plus out-of-bag flags.

    Args:
        Config: CoForest_Config; only `num_classifiers` is read.
        Dataset: RDD of labelled rows.

    Returns:
        (labeleds_inofbag, labeleds_outofbag), where:
        labeleds_inofbag -- list with one RDD per classifier of (row, 1.0) pairs. Each
            row is repeated as many times as it was drawn for that classifier.
        labeleds_outofbag -- RDD of (row, 1.0, in_bag_flags); in_bag_flags[i] is True
            when the row was drawn at least once for classifier i.
    """
    from pyspark import StorageLevel

    draws = Dataset.map(lambda x: (x, []))
    # Append one independent draw count per classifier to every row.
    for _ in range(Config.num_classifiers):
        draws = draws.mapPartitions(blockSample)
    # blockSample draws from the unseeded `random` module on the executors, so each
    # recomputation of `draws` would produce different counts. Persisting it keeps the
    # in-bag samples used for training consistent with the out-of-bag flags later
    # used to measure error. The RDD stays persisted for the rest of the session.
    draws = draws.persist(StorageLevel.MEMORY_AND_DISK)

    labeleds_outofbag = draws.map(lambda x: (x[0], 1.0, [c > 0 for c in x[1]]))
    # functools.partial binds the classifier index now, not when the task runs.
    labeleds_inofbag = [
        draws.mapPartitions(functools.partial(blockSample2, i))
        for i in range(Config.num_classifiers)
    ]
    return (labeleds_inofbag, labeleds_outofbag)

def reduceError(input1, input2):
    """Combine two partial (error_weight, counted_weight) pairs by adding them element-wise."""
    total_err = input1[0] + input2[0]
    total_count = input1[1] + input2[1]
    return (total_err, total_count)

def measureError(m_threshold, Instances, id1):
    """Estimate the out-of-bag error of the ensemble that excludes classifier `id1`.

    Args:
        m_threshold: only instances whose vote confidence exceeds this are counted.
        Instances: RDD of (predictions, weight, in_bag_flags, row) tuples
            (presumably from getPredictionOnLabelData).
        id1: index of the classifier to leave out.

    Returns:
        Weighted error rate err / count. Returns NaN (the IEEE value of 0/0) when no
        instance passed the threshold, instead of raising ZeroDivisionError. In
        buildClassifier, NaN fails `err[i] < err_prime[i]`, so that classifier is
        simply not updated this round.
    """
    mapFunc = functools.partial(errorSample, m_threshold, id1)
    (err, count) = Instances.mapPartitions(mapFunc).reduce(reduceError)
    if count == 0:
        return float("nan")
    return err / count

def errorSample(m_threshold, id1, iter1):
    """Accumulate weighted out-of-bag errors over one partition.

    Each record is indexed as (predictions, weight, in_bag_flags, row). A record
    counts only when the out-of-bag vote of every classifier except `id1` exceeds
    `m_threshold`. It counts as an error when the winning class differs from
    row["ClassIndex"].

    Yields:
        a single (error_weight, counted_weight) pair for the partition.
    """
    wrong = 0
    counted = 0
    for record in iter1:
        distr = outOfBagDistributionForInstanceExcluded((record[0], record[2]), id1)
        if not getConfidence(distr) > m_threshold:
            continue  # vote not confident enough to be counted
        counted += record[1]
        if int(maxIndex(distr)) != int(record[3]["ClassIndex"]):
            wrong += record[1]
    yield (wrong, counted)
    
def outOfBagDistributionForInstanceExcluded(Instance, idExcluded):
    """Class-vote distribution from the classifiers for which `Instance` is out of bag.

    Each qualifying base classifier casts a one-hot vote for its predicted class. Its
    probability vector is not used; the "probability" field of the outcome struct
    could be used instead if soft voting is ever wanted.

    Args:
        Instance: (predictions, in_bag_flags). predictions[i]["outcome"]["prediction"]
            is classifier i's predicted class index. in_bag_flags[i] is False when the
            instance was not in classifier i's bootstrap sample.
        idExcluded: index of a classifier whose vote is ignored.

    Returns:
        list of length `num_class` (module global) normalized to sum to 1, or all
        zeros when no classifier voted.
    """
    votes = [0] * num_class
    for i in range(num_classifiers):
        out_of_bag = Instance[1][i] == False  # noqa: E712 - flags are plain bools
        if out_of_bag and i != idExcluded:
            predicted = Instance[0][i]["outcome"]["prediction"]
            votes[int(predicted)] += 1
    if sum(votes) != 0:
        return normalize(votes)
    return votes

def normalize(inputs):
    """Return a new list with `inputs` scaled so the values sum to 1.

    An empty input gives `[]`. Raises ZeroDivisionError when a non-empty input
    sums to zero.
    """
    total = sum(inputs)
    return [value / total for value in inputs]

def getConfidence(p):
    """Return the largest value in `p`, i.e. the value at its first arg-max position."""
    best = np.argmax(np.array(p))
    return p[best]

def maxIndex(inputs):
    """Return the index of the first maximum of `inputs` (numpy.argmax semantics).

    The result is a numpy integer; callers convert it with int()/float() where needed.
    """
    return np.argmax(np.array(inputs))


def sampleWithWeightsforoneclassify(Dataset, Datasetnum, samplenum, seed=None):
    """Draw roughly `samplenum` rows from `Dataset` without replacement.

    Args:
        Dataset: DataFrame (or any object with a compatible `sample` method).
        Datasetnum: number of rows in `Dataset`.
        samplenum: desired number of rows.
        seed: sampling seed; defaults to the current time in milliseconds.

    Returns:
        Dataset.sample(...) with the fraction clipped to [0, 1]. An empty dataset
        (Datasetnum <= 0) uses fraction 0.0 instead of dividing by zero.
        DataFrame.sample does not guarantee the exact row count.
    """
    if seed is None:
        seed = current_milli_time()
    if Datasetnum <= 0:
        fraction = 0.0
    else:
        fraction = min(1.0, max(0.0, float(samplenum) / float(Datasetnum)))
    return Dataset.sample(withReplacement=False, fraction=fraction, seed=seed)
    
def getPredictionOnUnLabelData(Instances, m_classifiers):
    """Run every base classifier on `Instances` and pair the predictions with each row.

    Args:
        Instances: DataFrame with the columns the fitted models expect (this notebook
            passes rows with 'features' and 'ClassIndex').
        m_classifiers: non-empty list of fitted models exposing `evaluate(DataFrame)`.

    Returns:
        RDD of (predictions, 1.0, row) tuples in the row order of `Instances`.
        predictions[i]["outcome"] is a struct of classifier i's (prediction, probability).

    Raises:
        ValueError: if `m_classifiers` is empty.
    """
    if not m_classifiers:
        raise ValueError("m_classifiers must contain at least one classifier")
    # Key rows by position so they can be re-joined with each model's output.
    index_Ins = Instances.rdd.zipWithIndex().map(lambda x: (x[1], (1.0, x[0])))
    columns = [col("prediction"), col("probability")]
    predicRDD = None
    for classifier in m_classifiers:
        predic = classifier.evaluate(Instances).predictions.select("prediction", "probability")
        # (position, outcome_row); relies on evaluate() preserving the input row order.
        tempRDD = (predic.withColumn("outcome", struct(columns)).select("outcome")
                   .rdd.zipWithIndex().map(lambda x: (x[1], x[0])))
        if predicRDD is None:
            predicRDD = tempRDD.map(lambda x: (x[0], (x[1],)))
        else:
            # Append this classifier's outcome to the tuple accumulated so far.
            predicRDD = predicRDD.join(tempRDD).map(lambda x: (x[0], (*x[1][0], x[1][1])))
    outCome = predicRDD.join(index_Ins)
    # (position, (predictions, (1.0, row))) -> (predictions, 1.0, row), in input order
    outCome = outCome.sortBy(lambda x: x[0]).map(lambda x: (x[1][0], x[1][1][0], x[1][1][1]))
    return outCome


def distributionForInstanceExcluded(Instance, idExcluded):
    """Normalized hard-vote distribution from every classifier except `idExcluded`.

    Instance[0][i]["outcome"]["prediction"] is read as classifier i's predicted class.
    The result has length `num_class` (module global). Raises ZeroDivisionError
    (from normalize) when no classifier casts a vote.
    """
    votes = [0.0] * num_class
    voters = (k for k in range(num_classifiers) if k != idExcluded)
    for k in voters:
        votes[int(Instance[0][k]["outcome"]["prediction"])] += 1
    return normalize(votes)
    
def isHighConfidence(Instance, idExcluded):
    """Decide whether the ensemble minus classifier `idExcluded` is confident about `Instance`.

    Compares against the module-level `m_threshold`.

    Returns:
        (True, predicted_class_index, confidence) when the largest class share of
        the votes exceeds m_threshold. The instance can then be added to the labelled
        set with that label. Otherwise returns (False, -1, 0.0).
    """
    # Combined vote of all classifiers except idExcluded: the share of votes per class.
    distr = distributionForInstanceExcluded(Instance, idExcluded)
    # The largest share serves as the confidence.
    confidence = getConfidence(distr)
    if not confidence > m_threshold:
        return (False, -1, 0.0)  # not confident enough to be labelled
    # The class with the largest share becomes the pseudo-label.
    return (True, maxIndex(distr), confidence)

def sampleUnlabel(classify_i, iter1):
    """Keep only the records the classifiers other than `classify_i` are confident about.

    Yields:
        (record[2], pseudo_label, confidence) for each confident record. record[2] is
        presumably the original row from getPredictionOnUnLabelData's
        (predictions, 1.0, row) tuples.
    """
    decisions = ((record, isHighConfidence(record, classify_i)) for record in iter1)
    for record, (confident, label, weight) in decisions:
        if confident:
            yield (record[2], label, weight)
            
def modelClassify(Instances, m_classifiers):
    """Label each row of `Instances` by majority vote of `m_classifiers`.

    Args:
        Instances: DataFrame with 'features' and 'ClassIndex' columns.
        m_classifiers: non-empty list of fitted models exposing `evaluate(DataFrame)`.
            NOTE(review): voting reads the global `num_classifiers`, so the list
            length should match it.

    Returns:
        DataFrame of Rows with features, ClassIndex and the voted `prediction` (float).
        Row order is not guaranteed.

    Raises:
        ValueError: if `m_classifiers` is empty.
    """
    if not m_classifiers:
        raise ValueError("m_classifiers must contain at least one classifier")
    index_Ins = Instances.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
    predicRDD = None
    for classifier in m_classifiers:
        predic = classifier.evaluate(Instances).predictions.select("prediction")
        # (position, Row(prediction)); relies on evaluate() preserving the input row order.
        tempRDD = predic.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
        if predicRDD is None:
            predicRDD = tempRDD.map(lambda x: (x[0], (x[1],)))
        else:
            # Append this classifier's prediction to the tuple accumulated so far.
            predicRDD = predicRDD.join(tempRDD).map(lambda x: (x[0], (*x[1][0], x[1][1])))
    # (position, voted label as float)
    predicRDD = predicRDD.mapPartitions(getLabelBasedOnVoting)
    outCome = index_Ins.join(predicRDD).map(
        lambda x: Row(features=x[1][0]["features"], ClassIndex=x[1][0]["ClassIndex"],
                      prediction=x[1][1])).toDF()
    return outCome

def getLabelBasedOnVoting(iter1):
    """Map (key, per-classifier predictions) pairs to (key, majority-vote class as float)."""
    for key, predictions in ((x[0], x[1]) for x in iter1):
        winner = maxIndex(distributionForInstance(predictions))
        yield (key, float(winner))
            
def distributionForInstance(Instance):
    """Normalized hard-vote distribution over all `num_classifiers` base classifiers.

    Instance[i]["prediction"] is read as classifier i's predicted class index. The
    result has length `num_class` (module global).
    """
    predicted = [int(Instance[i]["prediction"]) for i in range(num_classifiers)]
    counts = [0.0] * num_class
    for cls in predicted:
        counts[cls] += 1
    return normalize(counts)
            

def buildClassifier(Config, labeledSet, unlabeledSet):
    """Train a Co-Forest ensemble on labelled plus unlabelled data.

    First, each base classifier is fitted on its own bootstrap sample of
    `labeledSet`. Then each round does the following for every classifier i:
    measure the out-of-bag error of the ensemble *without* i. If it improved, the
    other classifiers pseudo-label a random subsample of `unlabeledSet`, and the
    confident rows are collected. Classifier i is refitted on `labeledSet` plus
    those rows when the error/size trade-off allows it. Training stops after a round
    that changes nothing, or after `Config.max_iter` rounds.

    Args:
        Config: CoForest_Config. Reads num_classifiers, m_seed, m_numFeatures,
            m_threshold and max_iter. Writes m_numOriginalLabeledInsts,
            m_numFeatures (when < 1), num_class and m_classifiers.
        labeledSet: DataFrame with 'features' and 'ClassIndex' columns.
        unlabeledSet: DataFrame with the same columns. Its ClassIndex values are not
            used for training; pseudo-labels replace them.

    Returns:
        (number_of_rounds_run, list_of_fitted_models)

    Side effects:
        Sets the module globals num_classifiers, num_class and m_threshold, which the
        per-record helpers read. Reads the global `df` to count the classes. Prints
        the round number after each round.
    """
    global num_classifiers, num_class, m_threshold

    err = np.zeros(Config.num_classifiers)
    err_prime = np.zeros(Config.num_classifiers)
    s_prime = np.zeros(Config.num_classifiers)
    s = np.zeros(Config.num_classifiers)

    # Keep the helpers' module-level settings in sync with this config. Without this,
    # isHighConfidence would keep using a stale m_threshold after the config changed.
    num_classifiers = Config.num_classifiers
    m_threshold = Config.m_threshold

    random.seed(Config.m_seed)
    Config.m_numOriginalLabeledInsts = labeledSet.count()
    m_KValue = Config.m_numFeatures
    if m_KValue < 1:
        # NOTE(review): with the two-column frames used here this is log2(2)+1 = 2,
        # not a per-feature count, and the value is not passed to the estimator.
        m_KValue = int(math.log2(len(labeledSet.columns))) + 1
        Config.m_numFeatures = m_KValue

    # Classes are counted on the global `df` (presumably so that a class missing
    # from the small labelled split still gets a vote slot).
    Config.num_class = df.groupBy("ClassIndex").count().count()
    num_class = Config.num_class

    randSeeds = np.random.randint(99999, size=Config.num_classifiers)
    m_classifiers = []
    # you can change the configuration of the base classifier here
    classifier_config = RandomForestClassifier(numTrees=1, maxDepth=12, maxBins=32, labelCol='ClassIndex',
                                               featuresCol='features', seed=Config.m_seed, bootstrap=False)

    (labeleds_inofbag, labeleds_outofbag) = resampleWithWeightsforallclassify(Config, labeledSet.rdd)
    for i in range(Config.num_classifiers):
        classifier_config.setSeed(int(randSeeds[i]))
        m_classifiers.append(classifier_config.fit(labeleds_inofbag[i].map(lambda x: x[0]).toDF()))
        err_prime[i] = 0.5
        s_prime[i] = 0

    Config.m_classifiers = m_classifiers
    # unlabeledSet never changes, so count it once rather than per classifier per round.
    num_unlabeled = unlabeledSet.count()
    bChanged = True
    Iteration = 0
    Li = [None] * Config.num_classifiers

    while bChanged and Iteration < Config.max_iter:
        bChanged = False
        bUpdate = [False] * Config.num_classifiers
        # Cached because measureError scans it once per classifier.
        predic_data = getPredictionOnLabelData(labeleds_outofbag, m_classifiers).cache()
        for i in range(Config.num_classifiers):
            err[i] = measureError(Config.m_threshold, predic_data, i)
            Li[i] = None
            if not (err[i] < err_prime[i] and num_unlabeled > 0):
                continue
            if s_prime[i] == 0:
                s_prime[i] = min(num_unlabeled / 10, 1000)
            if err[i] > 0:
                numWeightsAfterSubsample = int(math.ceil(err_prime[i] * s_prime[i] / err[i] - 1))
            else:
                # A zero error makes the ratio infinite, and math.ceil(inf) raises
                # OverflowError. Request the whole unlabeled set instead; any request
                # of at least its size is capped at fraction 1.0 when sampling.
                numWeightsAfterSubsample = num_unlabeled

            subSamplingSet = sampleWithWeightsforoneclassify(unlabeledSet, num_unlabeled, numWeightsAfterSubsample)
            tmp_func = functools.partial(sampleUnlabel, i)
            # (row, pseudo_label, confidence) for rows the other classifiers agree on
            un_predic_data = getPredictionOnUnLabelData(subSamplingSet, m_classifiers).mapPartitions(tmp_func)
            # sum() gives 0 for an empty RDD, where reduce() would raise ValueError.
            s[i] = un_predic_data.map(lambda x: x[2]).sum()
            Li_i_size = un_predic_data.count()
            if Li_i_size == 0:
                # Nothing was pseudo-labelled, and toDF() cannot infer a schema
                # from an empty RDD.
                continue
            Li[i] = un_predic_data.map(lambda x: Row(features=x[0]["features"], ClassIndex=float(x[1]))).toDF()
            if s_prime[i] < float(Li_i_size):
                if err[i] * s[i] < err_prime[i] * s_prime[i]:
                    bUpdate[i] = True

        # update classifiers
        for i in range(Config.num_classifiers):
            if bUpdate[i]:
                bChanged = True
                # Use classifier i's own seed; the estimator otherwise keeps the last
                # seed set in the initial fitting loop.
                classifier_config.setSeed(int(randSeeds[i]))
                # Match columns by name: the Row-built Li[i] need not share
                # labeledSet's column order.
                newDataSet = labeledSet.unionByName(Li[i])
                m_classifiers[i] = classifier_config.fit(newDataSet)
                err_prime[i] = err[i]
                s_prime[i] = s[i]
        Iteration += 1

        predic_data.unpersist()
        print("The iteration: ", Iteration)

    return Iteration, m_classifiers

## Training

In [3]:
# Development notes:
# 1. Confidence was always 1 -- fixed; normalize had a bug.
# 2. Pseudo-labelled unlabeled rows must carry the ClassIndex predicted by the other
#    classifiers, not their true label -- fixed; toDF() failed to infer the schema
#    until ClassIndex was cast to float.
# 3. TODO: memory use is high; apply Spark memory-tuning techniques and unpersist()
#    to reduce it.

coforest_conf.max_iter = 100  # upper bound on co-training rounds
iter_times, CORF_classifiers = buildClassifier(coforest_conf, L_train, U_train)

22/04/30 15:04:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

The iteration:  1


                                                                                

The iteration:  2


                                                                                

The iteration:  3


## Test

In [4]:
# Majority-vote predictions of the trained ensemble on the held-out test split.
outcome = modelClassify(test, CORF_classifiers)
outcome.show()

+--------------------+----------+----------+
|            features|ClassIndex|prediction|
+--------------------+----------+----------+
|[-108.55273845353...|       3.0|       1.0|
|[-11.633480773034...|       1.0|       1.0|
|[-7.6028835753394...|       4.0|       4.0|
|[-3.9311887350913...|       3.0|       3.0|
|[4.93261867119043...|       4.0|       4.0|
|[8.66099912148604...|       1.0|       1.0|
|[10.6187336459507...|       1.0|       1.0|
|[12.4764321663229...|       3.0|       3.0|
|[16.3147134254692...|       3.0|       2.0|
|[21.1733697878802...|       2.0|       2.0|
|[27.9307098171451...|       4.0|       4.0|
|[29.5026653660435...|       1.0|       1.0|
|[30.5157508836321...|       4.0|       2.0|
|[33.1589097915249...|       1.0|       1.0|
|[36.3775090462241...|       1.0|       1.0|
|[38.7207549754442...|       1.0|       1.0|
|[41.4118672978134...|       4.0|       2.0|
|[44.4927384799594...|       3.0|       3.0|
|[48.2790218950527...|       2.0|       2.0|
|[49.66461



In [5]:
# 1 where the ensemble's prediction matches the true label, else 0.
a = outcome.withColumn("outCome", F.when(F.col("ClassIndex") == F.col("prediction"), 1).otherwise(0))
# The mean of the 0/1 column is the accuracy. Aggregating the whole column avoids
# depending on which row Spark happens to return first from a groupBy, which could
# be the outCome = 0 group (sum 0).
accuracy = a.agg(F.avg("outCome")).first()[0]
accuracy

0.7802633260897354

## Evaluation

In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# All evaluators compare the voted `prediction` with the true `ClassIndex`.
_evaluator_kwargs = dict(predictionCol="prediction", labelCol="ClassIndex")
evaluator_acc = MulticlassClassificationEvaluator(metricName="accuracy", **_evaluator_kwargs)
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", **_evaluator_kwargs)
evaluator_pre = MulticlassClassificationEvaluator(metricName="weightedPrecision", **_evaluator_kwargs)
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", **_evaluator_kwargs)

# Each metric is computed on `a`, the test predictions from the accuracy cell.
for template, evaluator in (
    ('pyspark accuracy: %.6f', evaluator_acc),
    ('pyspark f1-score: %.6f', evaluator_f1),
    ('pyspark precision: %.6f', evaluator_pre),
    ('pyspark recall: %.6f', evaluator_recall),
):
    print(template % evaluator.evaluate(a))

pyspark accuracy: 0.780263
pyspark f1-score: 0.779555
pyspark precision: 0.783554
pyspark recall: 0.780263


## Stop

In [8]:
# Stop the session first, then the underlying SparkContext.
for handle in (spark, sc):
    handle.stop()