In [None]:
# PySpark 
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
# functions
from functions import *
import time
# ML
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import StandardScaler


In [None]:
# --- Spark session -----------------------------------------------------------
# Master "local[*,20]" runs Spark in-process using every available core; the
# second field is the number of task failures tolerated before a job is aborted.
conf = SparkConf().setAppName("spark").setMaster("local[*,20]").set("spark.driver.memory", "10g")
# getOrCreate() reuses a context that is already running, so this cell can be
# re-executed without restarting the kernel (a plain SparkContext(conf=conf)
# raises ValueError on the second run). Note that `conf` only takes effect when
# the context is actually created here.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# --- Encoding options (read as globals by evaluate()) --------------------------
# Both flags are forwarded to json_to_encoding(); `forward` additionally selects
# type_generate (True) versus type_generate_R (False).
iri = False
forward = False
# Passed to json_to_encoding() as the set of qualified-name type identifiers.
qualified_names = {
    "xsd:QName",
    "prov:QUALIFIED_NAME"
}

# --- Labels --------------------------------------------------------------------
# Textual label in the CSV -> numeric class (kept as strings here; evaluate()
# casts the column to double). Rows whose label is not a key of this map are
# discarded by evaluate().
label_map = {
    "Trusted": "1.0",
    "Uncertain": "0.0"
}

# --- Input locations -----------------------------------------------------------
# Glob of graph JSON files and the CSV with `graph_file` / `label` columns.
json_folder = ".../datasets/CM-Buildings/*.json"
label_csv = ".../datasets/CM-Buildings/graphs.csv"

In [None]:
def _build_labelled_features():
    """Build the joined (features, label) DataFrame for every labelled graph.

    Reads the JSON graphs matched by ``json_folder``, turns each one into a
    vector of type counts, standardises the vectors and joins them with the
    labels read from ``label_csv``. Uses the notebook globals ``iri``,
    ``forward``, ``qualified_names``, ``level``, ``specific_types_node``,
    ``specific_types_edge`` and ``label_map``.

    Prints the number of distinct types found across all graphs.

    Returns:
        DataFrame with columns ``features`` (scaled vector) and ``label`` (double).
    """
    file_and_path_rdd = spark.sparkContext.wholeTextFiles(json_folder)

    # (file_name, encoding) - only the last path component is kept so that it
    # can be matched against the `graph_file` column of the label CSV.
    encoding_rdd = file_and_path_rdd.map(
        lambda x: (x[0].split("/")[-1], json_to_encoding(x[1], iri, forward, qualified_names)))
    # (file_name, prov_types of nodes)
    if forward:
        types_rdd = encoding_rdd.map(lambda x: (x[0], type_generate(x[1], level, specific_types_node, specific_types_edge)))
        # types_rdd = encoding_rdd.map(lambda x: (x[0], type_generate_mixed(x[1], level, specific_types_node, specific_types_edge)))
    else:
        types_rdd = encoding_rdd.map(lambda x: (x[0], type_generate_R(x[1], level, specific_types_node, specific_types_edge)))
    # (file_name, prov_types occurrence in the graph)
    types_count_rdd = types_rdd.map(lambda x: (x[0], count_prov_types(level, x[1])))
    # All prov_types in this collection of graphs
    all_types = types_count_rdd.flatMap(lambda x: x[1].keys()).distinct().collect()
    # Number of distinct prov_types
    types_count = len(all_types)
    print(types_count)
    # index_map for prov_types, prov_type -> index
    index_map = {prov_type: position for position, prov_type in enumerate(all_types)}
    # Construct feature vectors for each graph
    sparse_matrix_rdd = types_count_rdd.map(lambda x: (x[0], sparse_matrix(x[1], types_count, index_map)))
    feature_vector_rdd = sparse_matrix_rdd.map(lambda x: (x[0], Vectors.dense(x[1])))
    df_features = (
        spark.createDataFrame(feature_vector_rdd)
        .withColumnRenamed("_1", "file")
        .withColumnRenamed("_2", "features")
    )
    # Standardize features
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
    df_features = scaler.fit(df_features).transform(df_features)

    # Map textual labels to their numeric form, drop rows with any other label,
    # then cast the column to double.
    df_labels = spark.read.csv(label_csv, header=True)
    df_labels = df_labels.replace(label_map, subset=["label"])
    df_labels = df_labels.where(df_labels.label.isin(list(label_map.values())))
    df_labels = df_labels.withColumn("label", df_labels["label"].cast(DoubleType()))

    # Join the features and labels; the scaled vector becomes the `features` column.
    return (
        df_features
        .join(df_labels, df_features.file == df_labels.graph_file)
        .select(df_features.scaledFeatures, df_labels.label)
        .withColumnRenamed("scaledFeatures", "features")
    )


def _oversample(df, labels, seed=None):
    """Oversample every minority class up to (roughly) the majority class size.

    Prints the size of the largest class.

    Args:
        df: DataFrame with a ``label`` column.
        labels: iterable of numeric label values to balance.
        seed: optional seed for the sampling; ``None`` lets Spark pick one.

    Returns:
        DataFrame: union of the per-label subsets, minority ones resampled
        with replacement.

    Raises:
        ValueError: if any label has no rows (a ratio cannot be computed).
    """
    count = {x: df.filter(df['label'] == x).count() for x in labels}
    maxValue = max(count.values())
    print(maxValue)

    # Previously an empty class surfaced as a bare ZeroDivisionError.
    empty = [x for x in labels if count[x] == 0]
    if empty:
        raise ValueError(
            "Cannot oversample: no rows found for label(s) {} (row counts: {})".format(empty, count))

    subsets = []
    for x in labels:
        subset = df.filter(df['label'] == x)
        if count[x] != maxValue:
            subset = subset.sample(withReplacement=True, fraction=maxValue / count[x], seed=seed)
        subsets.append(subset)

    train = subsets[0]
    for subset in subsets[1:]:
        train = train.union(subset)
    return train


def _cross_validate(estimator, param_grid, train, seed=None):
    """Run 10-fold cross-validation over ``param_grid`` and time it.

    Prints the elapsed wall-clock time in seconds.

    Returns:
        tuple: (best average accuracy over the grid, elapsed seconds).
    """
    start = time.time()
    # train the model and select the best model using "metricName"(hyperparameter tuning)
    crossval = CrossValidator(
        estimator=Pipeline(stages=[estimator]),
        estimatorParamMaps=param_grid,
        evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
        numFolds=10,
        # Sub-models are never inspected, so they are not collected: keeping
        # folds x grid-size fitted models on the driver only costs memory.
        seed=seed)
    cv_model = crossval.fit(train)
    elapsed = time.time() - start
    print(elapsed)
    return max(cv_model.avgMetrics), elapsed


def evaluate(seed=None):
    """Cross-validate four classifiers on the graph feature vectors.

    Builds the labelled feature set, oversamples the minority class, then
    tunes LinearSVC, RandomForest, GBT and LogisticRegression with 10-fold
    cross-validation on accuracy.

    Prints, in order: the number of distinct types, the majority class size,
    the elapsed seconds of each classifier, the result list
    ``[svc_acc, svc_time, rf_acc, rf_time, gbt_acc, gbt_time, lr_acc, lr_time]``
    and finally ``specific_types_edge, specific_types_node, level``.

    Args:
        seed: optional seed applied to the oversampling and to the fold
            split. The default ``None`` keeps the unseeded behaviour.

    Raises:
        ValueError: if one of the labels in ``label_map`` has no joined rows.
    """
    # Cached because the frame is scanned once per label count and again by
    # every cross-validation fold; without it each scan re-reads the JSON files.
    df = _build_labelled_features().cache()
    try:
        labels = [float(x) for x in label_map.values()]
        train = _oversample(df, labels, seed)

        svc = LinearSVC(maxIter=100, threshold=0.0)
        rf = RandomForestClassifier(labelCol="label", featuresCol="features")
        gbt = GBTClassifier(labelCol="label", featuresCol="features")
        lr = LogisticRegression(labelCol="label", featuresCol="features")

        # (estimator, hyper-parameter grid), evaluated in this order.
        candidates = [
            (svc, ParamGridBuilder()
                .addGrid(svc.regParam, [0.001, 0.0001, 0.00001])
                .addGrid(svc.maxIter, [100, 200])
                .build()),
            (rf, ParamGridBuilder()
                .addGrid(rf.numTrees, [5, 10, 20, 30])
                .addGrid(rf.maxDepth, [4, 5, 6])
                .build()),
            (gbt, ParamGridBuilder()
                .addGrid(gbt.maxIter, [5, 10])
                .addGrid(gbt.maxDepth, [3, 4, 5, 6])
                .build()),
            (lr, ParamGridBuilder()
                .addGrid(lr.maxIter, [5, 10])
                .addGrid(lr.regParam, [0.2, 0.3, 0.4])
                .addGrid(lr.elasticNetParam, [0.6, 0.7, 0.8])
                .build()),
        ]

        result = []
        for estimator, param_grid in candidates:
            # Appends (best accuracy, elapsed seconds) for this classifier.
            result.extend(_cross_validate(estimator, param_grid, train, seed))

        print(result)
        print(specific_types_edge, specific_types_node, level)
    finally:
        df.unpersist()

In [None]:
# Run configuration: evaluate() reads these three names as module-level globals,
# so they must be assigned before the call.
level = 0
specific_types_node = specific_types_edge = False

evaluate()