In [85]:
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler
from pyspark.sql import *
from pyspark import SparkConf

from pyspark.sql import DataFrame
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer

from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time 

In [86]:
import random
# Single seed value for the notebook: it seeds Python's `random` module here and is
# also passed to Spark's `rand()` when the fold groups are assigned further down.
rseed = 1024
random.seed(rseed)

# Wall-clock start; subtracted from `end_time` in the final cell to report the
# total running time.
start_time = time.time()



In [87]:
from pyspark.sql.functions import col

def base_features_gen_pipeline(input_descript_col="descript", input_category_col="category", output_feature_col="features", output_label_col="label"):
    """Build the unfitted pipeline that produces base features and numeric labels.

    Stages, in order:
      1. Tokenizer: ``input_descript_col`` -> ``"words"``
      2. CountVectorizer: ``"words"`` -> ``output_feature_col``
      3. StringIndexer: ``input_category_col`` -> ``output_label_col``

    Args:
        input_descript_col: name of the text column to tokenize.
        input_category_col: name of the category column to index.
        output_feature_col: name of the column receiving the count vector.
        output_label_col: name of the column receiving the indexed label.

    Returns:
        A ``Pipeline`` holding the three stages; the caller is responsible for fitting it.
    """
    stages = [
        Tokenizer(inputCol=input_descript_col, outputCol="words"),
        CountVectorizer(inputCol="words", outputCol=output_feature_col),
        StringIndexer(inputCol=input_category_col, outputCol=output_label_col),
    ]
    return Pipeline(stages=stages)

def gen_meta_features(training_df, nb_0, nb_1, nb_2, svm_0, svm_1, svm_2):
    """Produce out-of-fold base-classifier predictions plus joint prediction columns.

    For every distinct value in the ``group`` column, the six base estimators are
    fitted on all rows *outside* that group and then used to transform the rows
    *inside* it. The per-group outputs are unioned, so each row receives
    predictions from models that were not trained on it.

    Args:
        training_df: DataFrame containing a ``group`` column and whatever feature
            and label columns the six estimators are configured to read.
        nb_0, nb_1, nb_2: estimators whose prediction columns are ``nb_pred_0``,
            ``nb_pred_1`` and ``nb_pred_2``.
        svm_0, svm_1, svm_2: estimators whose prediction columns are
            ``svm_pred_0``, ``svm_pred_1`` and ``svm_pred_2``.

    Returns:
        The union of all per-group transformed DataFrames with three extra columns
        ``joint_pred_i = 2 * nb_pred_i + svm_pred_i`` for ``i`` in 0..2.

    Raises:
        ValueError: if ``training_df`` contains no non-null ``group`` value.
    """
    # Iterate over the group ids that actually exist rather than assuming they are
    # exactly 0..k-1: with non-contiguous or offset ids, range(k) would skip some
    # groups and hold out nothing for others. Sorting keeps the fold order (and so
    # the row order of the union) the same as before for ids 0..k-1.
    group_ids = sorted(
        row["group"]
        for row in training_df.select("group").distinct().collect()
        if row["group"] is not None
    )
    if not group_ids:
        raise ValueError("training_df has no 'group' values; cannot generate meta features")

    # The stage list is identical for every fold, so build the pipeline once.
    pipe = Pipeline(stages=[nb_0, nb_1, nb_2, svm_0, svm_1, svm_2])

    result = None
    for group_id in group_ids:
        train_group = training_df.filter(training_df["group"] != group_id)
        test_group = training_df.filter(training_df["group"] == group_id)

        fold_predictions = pipe.fit(train_group).transform(test_group)
        result = fold_predictions if result is None else result.union(fold_predictions)

    # Combine each NB/SVM prediction pair into a single value: 2 * nb + svm.
    for i in range(3):
        result = result.withColumn(
            "joint_pred_{}".format(i),
            2 * col("nb_pred_{}".format(i)) + col("svm_pred_{}".format(i)),
        )
    return result
    


def test_prediction(test_df, base_features_pipeline_model, gen_base_pred_pipeline_model, gen_meta_feature_pipeline_model, meta_classifier):
    """Run the full fitted stack on ``test_df`` and return the final predictions.

    Steps:
      1. Apply the fitted base-feature pipeline to ``test_df``.
      2. Apply the fitted base-classifier pipeline to obtain the ``nb_pred_i`` and
         ``svm_pred_i`` columns.
      3. Add ``joint_pred_i = 2 * nb_pred_i + svm_pred_i`` for ``i`` in 0..2.
      4. Apply the fitted meta-feature pipeline, then the meta classifier.

    Returns:
        DataFrame restricted to the columns ``id``, ``label`` and ``final_prediction``.
    """
    with_base_preds = gen_base_pred_pipeline_model.transform(
        base_features_pipeline_model.transform(test_df)
    )

    with_joint_preds = with_base_preds
    for i in range(3):
        with_joint_preds = with_joint_preds.withColumn(
            "joint_pred_{}".format(i),
            2 * col("nb_pred_{}".format(i)) + col("svm_pred_{}".format(i)),
        )

    meta_input = gen_meta_feature_pipeline_model.transform(with_joint_preds)
    predictions = meta_classifier.transform(meta_input)
    return predictions.select("id", "label", "final_prediction")
    

In [88]:

def gen_binary_labels(df):
    """Add one-vs-rest indicator columns ``label_0``, ``label_1`` and ``label_2``.

    Each ``label_k`` is the comparison ``df['label'] == k`` cast to double.

    Args:
        df: DataFrame with a ``label`` column.

    Returns:
        ``df`` with the three indicator columns appended.
    """
    for class_id in range(3):
        indicator_col = 'label_{}'.format(class_id)
        df = df.withColumn(indicator_col, (df['label'] == class_id).cast(DoubleType()))
    return df

# Create a Spark Session
# Master "local[*]" with application name "lab3"; getOrCreate() reuses an
# already-running session if there is one.
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load data
# Both files are read as tab-separated CSV with a header row and inferred column
# types. Paths are relative to the notebook's working directory.
train_data = spark.read.load("proj2train.csv", format="csv", sep="\t", inferSchema="true", header="true")
test_data = spark.read.load("proj2test.csv", format="csv", sep="\t", inferSchema="true", header="true")



In [89]:
# build the pipeline from task 1.1
# (Tokenizer -> CountVectorizer -> StringIndexer, using the default column names)
base_features_pipeline = base_features_gen_pipeline()
# Fit the pipeline using train_data
# The fitted model is kept so the same transformation can be applied to the test data later.
base_features_pipeline_model = base_features_pipeline.fit(train_data)
# Transform the train_data using fitted pipeline
training_set = base_features_pipeline_model.transform(train_data)

# Sanity check: the schema should now include `words`, `features` and `label`.
training_set.printSchema()
training_set.select("id", "features", "label").show(5)

root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- descript: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)

+---+--------------------+-----+
| id|            features|label|
+---+--------------------+-----+
|  0|(5421,[1,18,31,39...|  1.0|
|  1|(5421,[0,1,15,20,...|  0.0|
|  2|(5421,[3,109,556,...|  0.0|
|  3|(5421,[1,2,3,5,6,...|  1.0|
|  4|(5421,[2,3,4,8,11...|  1.0|
+---+--------------------+-----+
only showing top 5 rows



In [90]:
# assign random groups and binarize the labels
# The seeded random value is scaled by 5 and truncated to an integer to give each
# row a fold id in `group` (0-4, assuming rand() yields values in [0, 1) -- confirm
# against the Spark docs). gen_meta_features later holds out one group at a time.
training_set = training_set.withColumn('group', (rand(rseed)*5).cast(IntegerType()))
# Adds the one-vs-rest columns label_0 / label_1 / label_2 read by the base classifiers.
training_set = gen_binary_labels(training_set)

training_set.printSchema()
training_set.show(5)

root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- descript: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- group: integer (nullable = true)
 |-- label_0: double (nullable = false)
 |-- label_1: double (nullable = false)
 |-- label_2: double (nullable = false)

+---+--------+--------------------+--------------------+--------------------+-----+-----+-------+-------+-------+
| id|category|            descript|               words|            features|label|group|label_0|label_1|label_2|
+---+--------+--------------------+--------------------+--------------------+-----+-----+-------+-------+-------+
|  0|    MISC|I've been there t...|[i've, been, ther...|(5421,[1,18,31,39...|  1.0|    4|    0.0|    1.0|    0.0|
|  1|    FOOD|Stay away from th...|[stay, away, from...|(5421,[0,1,15,20,...|  0.0|    4|    1.0|    0.

In [91]:
# define base models
# One NaiveBayes and one LinearSVC per binary label column (label_0..label_2).
# Every model reads `features` and writes its own suffixed output columns.
nb_0, nb_1, nb_2 = [
    NaiveBayes(
        featuresCol='features',
        labelCol='label_{}'.format(i),
        predictionCol='nb_pred_{}'.format(i),
        probabilityCol='nb_prob_{}'.format(i),
        rawPredictionCol='nb_raw_{}'.format(i),
    )
    for i in range(3)
]
svm_0, svm_1, svm_2 = [
    LinearSVC(
        featuresCol='features',
        labelCol='label_{}'.format(i),
        predictionCol='svm_pred_{}'.format(i),
        rawPredictionCol='svm_raw_{}'.format(i),
    )
    for i in range(3)
]
base_classifiers = [nb_0, nb_1, nb_2, svm_0, svm_1, svm_2]

# build pipeline to generate predictions from base classifiers, will be used in task 1.3
# (fitted on the whole training set, unlike the per-fold fits done for task 1.2)
gen_base_pred_pipeline = Pipeline(stages=base_classifiers)
gen_base_pred_pipeline_model = gen_base_pred_pipeline.fit(training_set)

# task 1.2
# Out-of-fold base predictions plus joint prediction columns for the meta classifier.
meta_features = gen_meta_features(training_set, *base_classifiers)
# meta_features.printSchema()

In [92]:
# build onehotencoder and vectorassembler pipeline
# Nine prediction columns (nb, svm and joint for each of the three labels) are
# one-hot encoded into vec0..vec8 and then assembled into a single `meta_features` vector.
prediction_cols = [
    '{}_pred_{}'.format(source, i)
    for source in ('nb', 'svm', 'joint')
    for i in range(3)
]
encoded_cols = ['vec{}'.format(i) for i in range(len(prediction_cols))]
onehot_encoder = OneHotEncoderEstimator(inputCols=prediction_cols, outputCols=encoded_cols)
vector_assembler = VectorAssembler(inputCols=encoded_cols, outputCol='meta_features')
gen_meta_feature_pipeline = Pipeline(stages=[onehot_encoder, vector_assembler])
gen_meta_feature_pipeline_model = gen_meta_feature_pipeline.fit(meta_features)
meta_features = gen_meta_feature_pipeline_model.transform(meta_features)

# train the meta classifier
lr_model = LogisticRegression(
    featuresCol='meta_features',
    labelCol='label',
    predictionCol='final_prediction',
    maxIter=20,
    regParam=1.,
    elasticNetParam=0,
)
meta_classifier = lr_model.fit(meta_features)

# task 1.3
pred_test = test_prediction(
    test_data,
    base_features_pipeline_model,
    gen_base_pred_pipeline_model,
    gen_meta_feature_pipeline_model,
    meta_classifier,
)
print("Schema of pred_test : ")
pred_test.printSchema()
# NOTE(review): the timer stops before evaluate() below runs; if Spark defers the
# test-set transformation until then, that cost is not included -- confirm intent.
end_time = time.time()
print("total running time: ", end_time-start_time)
# Evaluation
# The evaluator is created with predictionCol="prediction"; the param map passed
# to evaluate() overrides it with the stack's `final_prediction` column.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='f1')
print("evaluate score of testing data is: ")
print(evaluator.evaluate(pred_test, {evaluator.predictionCol: 'final_prediction'}))
spark.stop()

Schema of pred_test : 
root
 |-- id: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- final_prediction: double (nullable = false)

total running time:  162.86369013786316
evaluate score of testing data is: 
0.7483312619309965
