In [1]:
import numpy as np
from sklearn.model_selection import train_test_split

from pyspark.ml import Pipeline
from pyspark.ml.clustering import LDA
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                   MulticlassClassificationEvaluator)
from pyspark.ml.feature import (RegexTokenizer, Tokenizer, HashingTF, IDF,
                                StopWordsRemover, CountVectorizer, StopWordsRemover, StringIndexer, OneHotEncoder)
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, avg
from pyspark.sql.functions import countDistinct
from pyspark.sql.types import (LongType ,StringType, IntegerType,
                               FloatType, DoubleType, ArrayType)
from pyspark.sql.types import IntegerType

# Create Train-Validation-Test DOI sets

In [3]:
# FOR THE ENSEMBLE EXPERIMENT ONLY.
# Source tables: one row per DOI with its category, and the section texts.
df_doiCateg = spark.sql("SELECT * FROM taxiarchis.doi_categories_4_4_features")
df = spark.sql("SELECT * FROM taxiarchis.section_4_4_ensemble").dropna()

# Encode the string Category as a numeric 'label' column.
indexer = StringIndexer(inputCol="Category", outputCol="label")
indexed = indexer.fit(df_doiCateg).transform(df_doiCateg)

# scikit-learn's splitter works on pandas objects, so collect to the driver.
df_cat = indexed.toPandas()

# First split: 40% of the DOIs for training, the remaining 60% held back.
X_train, X_validation, y_train, y_validation = train_test_split(
    df_cat['DOI'],
    df_cat['label'],
    test_size=0.6,
    random_state=42,
)

# Keep the whole 60% hold-out: it is vectorised later for the second classifier.
second_split_back_up = X_validation

# Second split of the hold-out: 66% validation / 34% test,
# i.e. roughly 40% / 20% of the original DOI set.
X_validation, X_test, y_validation, y_test = train_test_split(
    X_validation,
    y_validation,
    test_size=0.34,
    random_state=42,
)

# Sizes of: train, validation, test, full hold-out.
for split in (X_train, X_validation, X_test, second_split_back_up):
    print(len(split))

# TF-IDF

In [5]:
# Text stages: split 'Section' on non-word characters, drop stop words,
# count terms, then rescale the counts with IDF (terms need >= 2 documents).
regexTokenizer = RegexTokenizer(inputCol="Section", outputCol="tokens", pattern="\\W")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
count_vec = CountVectorizer(inputCol="filtered", outputCol="count_vec")
idf = IDF(inputCol="count_vec", outputCol="features", minDocFreq=2)

# Index the string columns: Category -> label, doc_id -> uni_sec_id.
int_category = StringIndexer(inputCol="Category", outputCol="label")
int_sections = StringIndexer(inputCol="doc_id", outputCol="uni_sec_id")

tfidf_stages = [int_category, int_sections, regexTokenizer, remover, count_vec, idf]
prep_pipeline = Pipeline(stages=tfidf_stages)
pre_processing = prep_pipeline.fit(df)
pre_processed_data = pre_processing.transform(df)

# NOTE: the LDA cell below rebinds final_data (and the stage names above),
# so whichever feature cell ran last decides what the classifiers see.
final_data = pre_processed_data.select(
    "doc_id", "uni_sec_id", "DOI", "label", "features"
).cache()

# LDA

In [7]:
from pyspark.ml.clustering import LDA

# Term-count features for LDA: tokenise 'Section' on non-word characters,
# drop stop words and count terms straight into the 'features' column.
regexTokenizer = RegexTokenizer(inputCol='Section', outputCol="tokens", pattern="\\W")
remover = StopWordsRemover(inputCol='tokens', outputCol='filtered')
count_vec = CountVectorizer(inputCol='filtered', outputCol='features')

int_category = StringIndexer(inputCol='Category', outputCol='label')
# Map each unique document (doc_id) to a numeric id.
int_id = StringIndexer(inputCol='doc_id', outputCol='id')

prep_pipeline = Pipeline(stages=[int_category, int_id, regexTokenizer, remover, count_vec])
pre_processing = prep_pipeline.fit(df)
pre_processed_data = pre_processing.transform(df)

# Keep a single row per indexed document id.
final_data_LDA = pre_processed_data.cache()
final_data_LDA = final_data_LDA.dropDuplicates(['id'])

# Train the LDA model (50 topics, 50 EM iterations).
lda = LDA(k=50, maxIter=50, optimizer="em")
model = lda.fit(final_data_LDA.select('id', 'features'))

# Use the per-document topic distribution as the classifier features.
# NOTE: this rebinds final_data from the TF-IDF cell. doc_id is kept so that
# the later "Check sections order" cell, which aggregates doc_id, still finds
# the column when the LDA features are the ones in use.
transformed = model.transform(final_data_LDA)
final_data = (
    transformed
    .select(['doc_id', 'DOI', 'label', 'topicDistribution'])
    .withColumnRenamed("topicDistribution", "features")
    .cache()
)
display(final_data)

# Classification Algorithms

In [9]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import (RandomForestClassifier, GBTClassifier,
                                      DecisionTreeClassifier)
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.classification import LogisticRegression

# --- Estimators shared by the first- and second-stage experiments ---

# Multinomial Naive Bayes (library defaults).
nb = NaiveBayes()

# Tree models: a single decision tree and a 150-tree random forest,
# both limited to depth 10.
dtc = DecisionTreeClassifier(maxDepth=10)
rfc = RandomForestClassifier(numTrees=150, maxDepth=10)

# Linear SVM wrapped in One-vs-Rest.
lsvc = LinearSVC(maxIter=15, regParam=0.15)
ovr = OneVsRest(classifier=lsvc)

# Elastic-net regularised logistic regression.
lr = LogisticRegression(maxIter=20, regParam=0.1, elasticNetParam=0.1)



## Split Training - Validation set

In [11]:
# Select feature rows by DOI membership in the pandas splits made earlier.

# Training rows for the first-stage classifier.
train = final_data.filter(final_data['DOI'].isin(list(X_train)))

# Validation rows for the first-stage classifier.
validation = final_data.filter(final_data['DOI'].isin(list(X_validation)))

# Whole hold-out (validation + test DOIs): scored by the first classifier to
# build the inputs of the second ("strong") classifier.
sec_classifir_vec = final_data.filter(final_data['DOI'].isin(list(second_split_back_up)))

for subset in (train, validation):
    print(subset.count())

## Evaluation Function

In [13]:
def evaluation_measures(new_df_p):
    '''
    Compute and print the evaluation measures of a predictions DataFrame.

    Each measure is obtained from a MulticlassClassificationEvaluator that
    compares the "label" column with the "prediction" column.

    :param new_df_p: DataFrame holding "label" and "prediction" columns.
    :return: list [accuracy, f1, weightedRecall, weightedPrecision].
    '''
    # (metricName handed to the evaluator, format of the printed line)
    metric_table = (
        ('accuracy', "Accuracy = %g"),
        ('f1', "f1 = %g"),
        ('weightedRecall', "weightedRecall = %g"),
        ('weightedPrecision', "weightedPrecision = %g"),
    )

    ev_m = []
    for metric_name, line_format in metric_table:
        evaluator = MulticlassClassificationEvaluator(
            labelCol="label", predictionCol="prediction", metricName=metric_name)
        score = evaluator.evaluate(new_df_p)
        print(line_format % score)
        ev_m.append(score)

    return ev_m

In [14]:
# Fit the first-stage classifier (decision tree) on the training rows.
paper_class = dtc.fit(train)

# Score the whole hold-out with the first-stage model; its predictions and
# probabilities are the raw material for the second classifier's features.
df_Second = paper_class.transform(sec_classifir_vec)

# Predict labels on the validation rows and report the evaluation measures.
# The call stays last so the returned list is also shown as the cell output.
validation_results = paper_class.transform(validation)
evaluation_measures(validation_results)



# Check sections order

In [16]:
def find_maximum(test_results):
    '''
    Attach the highest class probability to every row and gather the rows
    per DOI, preparing the dataframe for the Majority Vote.

    This variant also collects the 'doc_id' values so the sections behind
    each list can be inspected. A later cell defines ``find_maximum`` again
    without that column; once that cell runs it replaces this definition.

    :param test_results: DataFrame with 'DOI', 'doc_id', 'prediction' and
        'probability' columns (output of a fitted classifier's transform).
    :return: DataFrame with one row per DOI and the list columns
        'list_categ', 'prob_categ' and 'Sec_ids'.
    '''
    # np.max yields a numpy scalar; convert it to a built-in float so the
    # FloatType UDF hands Spark a plain Python value.
    udf_wf_var = udf(lambda x: float(round(np.max(np.array(x)), 10)),
                     returnType=FloatType())
    scored = test_results.withColumn('WF_Var', udf_wf_var('probability')).dropna()

    # NOTE(review): collect_list gives no ordering guarantee, so list positions
    # are not tied to a fixed section order - confirm this is acceptable.
    return scored.groupby("DOI").agg(
        F.collect_list('prediction').alias('list_categ'),
        F.collect_list('WF_Var').alias('prob_categ'),
        F.collect_list('doc_id').alias('Sec_ids'),
    )

# Group the first-stage validation predictions per DOI and show the result.
test_df_sect = find_maximum(validation_results)
display(test_df_sect)

# Prepare Vectorized Dataframe for the Second Classifier

In [18]:
import numpy as np
from pyspark.sql.types import (LongType ,StringType, IntegerType,
                               FloatType, DoubleType, ArrayType)
from pyspark.sql.functions import countDistinct


def find_maximum(test_results):
    '''
    Attach the highest class probability to every row and gather the rows
    per DOI, preparing the dataframe for the Majority Vote.

    This definition replaces the earlier ``find_maximum`` from the
    "Check sections order" cell; it does not collect 'doc_id'.

    :param test_results: DataFrame with 'DOI', 'prediction' and 'probability'
        columns (output of a fitted classifier's transform).
    :return: DataFrame with one row per DOI and the list columns
        'list_categ' (predictions) and 'prob_categ' (top probabilities).
    '''
    # np.max yields a numpy scalar; convert it to a built-in float so the
    # FloatType UDF hands Spark a plain Python value.
    udf_wf_var = udf(lambda x: float(round(np.max(np.array(x)), 10)),
                     returnType=FloatType())
    scored = test_results.withColumn('WF_Var', udf_wf_var('probability')).dropna()

    # NOTE(review): collect_list gives no ordering guarantee, so positions in
    # 'prob_categ' are not tied to a fixed section order - confirm this is
    # acceptable for the downstream feature vector.
    return scored.groupby("DOI").agg(
        F.collect_list('prediction').alias('list_categ'),
        F.collect_list('WF_Var').alias('prob_categ'),
    )


def filter_arrays(ar, array_length=4):
    '''
    Flag whether an array has exactly the expected number of elements.

    :param ar: sequence to check; None is treated as a non-matching array.
    :param array_length: required number of elements (default 4, the value
        that used to be hard-coded here).
    :return: 1 when len(ar) equals array_length, otherwise 0.
    '''
    # A null column value reaches a Python UDF as None; len(None) would raise.
    if ar is None:
        return 0

    return 1 if len(ar) == array_length else 0
    

def clear_corrupt_vectors(new_set):
    '''
    Keep only the rows whose 'prob_categ' array has the expected length.

    Every row is flagged through filter_arrays (1 = length matches,
    0 = it does not) in a 'binary_filter' column, and only the rows
    flagged 1 are returned. The expected length is the one filter_arrays
    checks for by default (4); it must be changed there to test longer
    vectors.

    :param new_set: DataFrame with a 'prob_categ' array column.
    :return: the matching rows, including the 'binary_filter' column.
    '''
    length_flag = udf(filter_arrays, IntegerType())
    flagged = new_set.withColumn('binary_filter', length_flag(new_set.prob_categ))

    return flagged.where(col('binary_filter') == 1)


def vectorize_array(df):
    '''
    Turn the 'prob_categ' array column into a dense ML vector column.

    :param df: DataFrame with 'DOI' and 'prob_categ' columns.
    :return: DataFrame with 'DOI' and the vector column 'features',
        ready to be passed to a classifier.
    '''
    def _as_dense(values):
        return Vectors.dense(values)

    to_vector = udf(_as_dense, VectorUDT())
    features_col = to_vector("prob_categ").alias("features")

    return df.select('DOI', features_col)
    

def prepare_for_strong_classifier(test_results):
    '''
    Build the input of the second ("strong") classifier from the
    first-stage predictions by chaining the helpers defined above.

    :param test_results: predictions DataFrame accepted by find_maximum.
    :return: DataFrame with 'DOI' and a vector 'features' column.
    '''
    # 1. top probability per row, gathered into lists per DOI
    grouped = find_maximum(test_results)

    # 2. drop DOIs whose probability list has an unexpected length
    fixed_length = clear_corrupt_vectors(grouped)

    # 3. convert the remaining lists into ML vectors
    return vectorize_array(fixed_length)
    

# TODO: check that the columns are right and that df_Second is the correct
# dataframe to use here.
second_stage_vectors = prepare_for_strong_classifier(df_Second)

# Inner join with the real categories (the indexed DOI/Category/label table),
# then drop duplicate and incomplete rows.
df_train_2 = (
    second_stage_vectors
    .join(indexed, on=['DOI'], how='inner')
    .dropDuplicates()
    .dropna()
    .cache()
)
display(df_train_2)


# Split Training_Set_2 - Test Set

In [20]:
# Author's note: do NOT run this cell for the second experiment.

# Second-stage training rows: DOIs of the validation split.
validation_2 = df_train_2.filter(df_train_2.DOI.isin(list(X_validation)))

# Second-stage test rows: DOIs of the test split.
test_df_f = df_train_2.filter(df_train_2.DOI.isin(list(X_test)))

# Row counts of the two second-stage sets.
print(validation_2.count())
print(test_df_f.count())

# Train all Second Classifiers

In [22]:
# Short name -> printed title of every second-stage algorithm.
dict_algo = dict(
    nb='Multinomial Naive Bayes',
    lr='Multinomial Logistic Regression',
    dtc='Decision Tree Classifier',
    rfc='Random Forest Classifier',
    ovr='One vs Rest Linear SVM',
)

# Parallel lists: the estimators and their short names, in run order.
algorithms = [dtc, nb, lr, ovr, rfc]
names = ["dtc", "nb", 'lr', "ovr", 'rfc']

# Fit each estimator on the second-stage training rows and evaluate it
# on the second-stage test rows.
for n, algo in zip(names, algorithms):
    print(dict_algo[n])
    paper_class_2 = algo.fit(validation_2)
    df_Second_results = paper_class_2.transform(test_df_f)
    evaluation_measures(df_Second_results)

In [23]:
# Fit the second-stage Naive Bayes model on the second-stage training rows.
paper_class_2 = nb.fit(validation_2)

# Predict labels on the second-stage test rows. The notebook never defines a
# variable called `test`; the test split created above is `test_df_f`, the
# same frame the "train all second classifiers" loop evaluates on.
df_Second_results = paper_class_2.transform(test_df_f)

# Evaluate; kept last so the returned list is also shown as the cell output.
evaluation_measures(df_Second_results)

# Simple majority vote

In [26]:
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql import functions as F
import numpy as np
import random 
from pyspark.sql.types import IntegerType,DoubleType
from pyspark.sql.functions import col


def prepare_rand_majority(df):
    '''
    Gather the classifier predictions of every document into one list.

    :param df: DataFrame with 'DOI' and 'prediction' columns.
    :return: DataFrame with one row per DOI and a 'list_categ' column
        holding that DOI's predictions, ready for the majority vote.
    '''
    votes_per_doi = F.collect_list("prediction").alias('list_categ')
    return df.groupby("DOI").agg(votes_per_doi)


def majority_vote_random(votes):
    '''
    Return the most frequent vote, breaking ties at random.

    :param votes: iterable of hashable votes (e.g. predicted class ids).
    :return: the vote with the highest count; when several votes share the
        highest count one of them is picked with random.choice. Returns
        None when there are no votes.
    '''
    # No votes means no winner (previously this raised inside the UDF).
    if not votes:
        return None

    vote_counts = {}
    for vote in votes:
        vote_counts[vote] = vote_counts.get(vote, 0) + 1

    # Builtin max, not np.max: on Python 3 dict.values() is a view that numpy
    # does not reduce to the largest count, which left `winners` empty.
    max_count = max(vote_counts.values())
    winners = [vote for vote, count in vote_counts.items() if count == max_count]

    return random.choice(winners)

  
def final_prediction_maj_random(df):
    '''
    Produce one final prediction per DOI with a simple majority vote.

    :param df: DataFrame with 'DOI' and 'prediction' columns.
    :return: cached DataFrame with the voted 'prediction' per DOI joined
        (inner, on 'DOI') with the `indexed` table, after dropping
        duplicate and incomplete rows.
    '''
    votes_per_doi = prepare_rand_majority(df)

    # Wrap the voting function as a UDF returning a double.
    pick_winner = udf(majority_vote_random, DoubleType())
    voted = votes_per_doi.withColumn('prediction', pick_winner(col('list_categ')))

    with_truth = voted.join(indexed, on=['DOI'], how='inner')
    return with_truth.dropDuplicates().dropna().cache()