In [1]:
# PySpark 
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
# functions
from functions import *
import time
# ML
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import StandardScaler


In [2]:
conf = SparkConf().setAppName("spark").setMaster("local[*,20]").set("spark.driver.memory", "10g")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
iri = False
forward = False
qualified_names = {
    "xsd:QName",
    "prov:QUALIFIED_NAME"
}
label_map = {
    "Valor": 0.0,
    "Instinct": 1.0,
    "Mystic": 2.0
}


22/04/06 15:36:42 WARN Utils: Your hostname, cuiyeshuaideMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.31.122 instead (on interface en0)
22/04/06 15:36:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/06 15:36:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/06 15:36:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
def giao():
    file_and_path_rdd = spark.sparkContext.wholeTextFiles(json_folder)
    
    encoding_rdd = file_and_path_rdd.map(lambda x: (x[0].split("/")[-1], json_to_encoding(x[1],iri,forward,qualified_names)))
    # (file_name, prov_types of nodes)
    if forward:
        types_rdd = encoding_rdd.map(lambda x: (x[0], type_generate(x[1], level, specific_types_node, specific_types_edge)))
        # types_rdd = encoding_rdd.map(lambda x: (x[0], type_generate_mixed(x[1], level, specific_types_node, specific_types_edge)))
    else:
        types_rdd = encoding_rdd.map(lambda x: (x[0], type_generate_R(x[1], level, specific_types_node, specific_types_edge)))
    # (file_name, prov_types occurence in the graph)
    types_count_rdd = types_rdd.map(lambda x: (x[0], count_prov_types(level,x[1])))
    # All prov_types in this collection of graphs
    all_types = types_count_rdd.flatMap(lambda x: x[1].keys()).distinct().collect()
    # Number of distinct prov_types
    types_count = len(all_types)
    print(types_count)
    # index_map for prov_types, prov_type -> index
    index_map = {all_types[i]: i for i in range(types_count)}
    # index -> prov_type
    # Contruct feature vectors for each graph
    sparse_matrix_rdd = types_count_rdd.map(lambda x: (x[0], sparse_matrix(x[1], types_count, index_map)))
    feature_vector_rdd = sparse_matrix_rdd.map(lambda x: (x[0],Vectors.dense(x[1])))
    df_features = spark.createDataFrame(feature_vector_rdd).withColumnRenamed("_1", "file").withColumnRenamed("_2", "features")
    # Standardize features
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
    scaler_model = scaler.fit(df_features)
    df_features = scaler_model.transform(df_features)
    # Change the labels
    df_labels = spark.read.csv(label_csv, header=True)
    df_labels = df_labels.replace(label_map, subset=["label"])
    l = list(label_map.values())
    df_labels = df_labels.where(df_labels.label.isin(l))
    df_labels = df_labels.withColumn("label", df_labels["label"].cast(DoubleType()))
    # Join the features and labels
    df = df_features.join(df_labels, df_features.file == df_labels.graph_file).select(df_features.scaledFeatures, df_labels.label).withColumnRenamed("scaledFeatures", "features")
    # Oversample the training data
    labels = [float(x) for x in label_map.values()]
    count = {}
    for x in labels:
        count[x] = df.filter(df['label'] == x).count()
    maxValue = max(count.values())
    print(maxValue)
    ratio = {}
    for x in labels:
        ratio[x] = maxValue/count[x]
    dataframes = []
    for x in labels:
        if(count[x] == maxValue):
            dataframes.append(df.filter(df['label'] == x))
        else:
            dataframes.append(df.filter(df['label'] == x).sample(withReplacement=True, fraction=ratio[x]))
    train = dataframes[0]
    for dataframe in dataframes[1:]:
        train = train.union(dataframe)
    
    result = []



    start = time.time()
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")

    pipeline = Pipeline(stages=[rf])
    paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [5,10,20,30]).addGrid(rf.maxDepth, [4,5,6]).build()
    # train the model and select the best model using "metricName"(hyperparameter tuning)
    crossval = CrossValidator(
        estimator=pipeline, 
        estimatorParamMaps=paramGrid, 
        evaluator=MulticlassClassificationEvaluator(metricName="accuracy"), 
        numFolds=10,
        collectSubModels=True)
    cvModel = crossval.fit(train)
    end = time.time()
    result.append(max(cvModel.avgMetrics))
    result.append(end-start)
    print(list(zip(cvModel.avgMetrics, paramGrid)))
    print(end-start)

    start = time.time()
    lr = LogisticRegression(labelCol="label", featuresCol="features")
    pipeline = Pipeline(stages=[lr])
    paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [5,10]).addGrid(lr.regParam, [0.2,0.3,0.4]).addGrid(lr.elasticNetParam, [0.6,0.7,0.8]).build()
    # train the model and select the best model using "metricName"(hyperparameter tuning)
    crossval = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
        numFolds=10,
        collectSubModels=True)
    cvModel = crossval.fit(train)
    end = time.time()
    print(end-start)
    result.append(max(cvModel.avgMetrics))
    result.append(end-start)
    print(list(zip(cvModel.avgMetrics, paramGrid)))
    print(result)
    

In [4]:
json_folder = "/Users/cuiyeshuai/Documents/UG modules/Individual Project/provenance-kernel-evaluation-master/datasets/PG-T/*.json"
label_csv = "/Users/cuiyeshuai/Documents/UG modules/Individual Project/provenance-kernel-evaluation-master/datasets/PG-T/graphs.csv"
label_map = {
    "Valor": "0.0",
    "Instinct": "1.0",
    "Mystic": "2.0"
}

In [5]:
specific_types_edge = True
specific_types_node = True
level = 1
giao()

                                                                                

15


                                                                                

417
[(0.8344805505691006, {Param(parent='RandomForestClassifier_24123b47ea89', name='numTrees', doc='Number of trees to train (>= 1).'): 5, Param(parent='RandomForestClassifier_24123b47ea89', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4}), (0.8360934537949072, {Param(parent='RandomForestClassifier_24123b47ea89', name='numTrees', doc='Number of trees to train (>= 1).'): 5, Param(parent='RandomForestClassifier_24123b47ea89', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5}), (0.8337125014139548, {Param(parent='RandomForestClassifier_24123b47ea89', name='numTrees', doc='Number of trees to train (>= 1).'): 5, Param(parent='RandomForestClassifier_24123b47ea89', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1

22/04/06 15:37:46 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/04/06 15:37:46 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/04/06 15:37:49 ERROR Instrumentation: java.lang.IllegalArgumentException
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:259)
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:177)
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:163)
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:284)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:358)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2477)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$1(RDD.scala:860)
	at org.apache.spark.rdd.RDDOperationScope$.wi

Py4JJavaError: An error occurred while calling o35843.fit.
: java.lang.IllegalArgumentException
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:259)
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:177)
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:163)
	at org.apache.xbean.asm9.ClassReader.<init>(ClassReader.java:284)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:358)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2477)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$1(RDD.scala:860)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:859)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1231)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.optim.loss.RDDLossFunction.calculate(RDDLossFunction.scala:61)
	at org.apache.spark.ml.optim.loss.RDDLossFunction.calculate(RDDLossFunction.scala:47)
	at breeze.optimize.CachedDiffFunction.calculate(CachedDiffFunction.scala:24)
	at breeze.optimize.OWLQN$$anon$1.calculate(OWLQN.scala:75)
	at breeze.optimize.OWLQN$$anon$1.calculate(OWLQN.scala:72)
	at breeze.optimize.BacktrackingLineSearch.iterations(BacktrackingLineSearch.scala:30)
	at breeze.optimize.ApproximateLineSearch.minimize(LineSearch.scala:26)
	at breeze.optimize.ApproximateLineSearch.minimize$(LineSearch.scala:26)
	at breeze.optimize.BacktrackingLineSearch.minimize(BacktrackingLineSearch.scala:11)
	at breeze.optimize.OWLQN.determineStepSize(OWLQN.scala:87)
	at breeze.optimize.OWLQN.determineStepSize(OWLQN.scala:17)
	at breeze.optimize.FirstOrderMinimizer.$anonfun$infiniteIterations$1(FirstOrderMinimizer.scala:60)
	at scala.collection.Iterator$$anon$7.next(Iterator.scala:140)
	at breeze.util.IteratorImplicits$RichIterator$$anon$2.next(Implicits.scala:74)
	at org.apache.spark.ml.classification.LogisticRegression.trainImpl(LogisticRegression.scala:1009)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:628)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:495)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:286)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.GeneratedMethodAccessor197.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
specific_types_edge = False
specific_types_node = True
level = 1
giao()

In [None]:
specific_types_edge = True
specific_types_node = False
level = 1
giao()

In [None]:
specific_types_edge = False
specific_types_node = False
level = 1
giao()

In [None]:
specific_types_edge = True
specific_types_node = True
level = 2
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 2
giao()

In [None]:
specific_types_edge = True
specific_types_node = False
level = 2
giao()

In [None]:
specific_types_edge = False
specific_types_node = False
level = 2
giao()

In [None]:
specific_types_edge = True
specific_types_node = True
level = 3
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 3
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 3
giao()

In [None]:
specific_types_edge = False
specific_types_node = False
level = 3
giao()

In [None]:
specific_types_edge = True
specific_types_node = True
level = 4
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 4
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 4
giao()

In [None]:
specific_types_edge = False
specific_types_node = False
level = 4
giao()

In [None]:
specific_types_edge = True
specific_types_node = True
level = 5
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 5
giao()

In [None]:
specific_types_edge = True
specific_types_node = False
level = 5
giao()

In [None]:
specific_types_edge = False
specific_types_node = False
level = 5
giao()

In [None]:
specific_types_edge = True
specific_types_node = True
level = 6
giao()

In [None]:
specific_types_edge = False
specific_types_node = True
level = 6
giao()

In [None]:
specific_types_edge = True
specific_types_node = False
level = 6
giao()

In [None]:
specific_types_edge = False
specific_types_node = False
level = 6
giao()