In [None]:
# import the required sparkkgml modules and the supporting PySpark / standard-library pieces
import time
from pyspark.sql import SparkSession
from sparkkgml.data_acquisition import DataAcquisition
from sparkkgml.feature_engineering import FeatureEngineering
from sparkkgml.vectorization import Vectorization
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import size,length,concat_ws
from pyspark.sql.types import StringType

In [None]:
# Prepare the SPARQL endpoint URL and the query text; both are handed to
# DataAcquisition.getDataFrame(endpoint=..., query=...) further down.
# NOTE(review): "lmdb" presumably stands for a LinkedMDB dataset — confirm.
endpoint = "https://sparkkgml.arcc.albany.edu/lmdb"
# The query projects: movie, movie_title, genre, director, actor and runtime
# (runtime is the ?movie__down_runtime binding cast to xsd:int).
#   - required patterns: ?movie is typed as a film and has a genre with a genre name
#   - OPTIONAL patterns: title, runtime, actor name, director name
#   - FILTER: keeps only rows whose genre name is 'Spy' or 'Superhero'
query ="""SELECT
        ?movie
        ?movie_title
        ?genre
        ?director
        ?actor
        (<http://www.w3.org/2001/XMLSchema#int>(?movie__down_runtime) as ?runtime)
        WHERE { 
        ?movie <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://data.linkedmdb.org/movie/film> . 
        ?movie <http://data.linkedmdb.org/movie/genre> ?movie__down_genre . ?movie__down_genre <http://data.linkedmdb.org/movie/film_genre_name> ?genre .
       
        
        OPTIONAL { ?movie <http://purl.org/dc/terms/title> ?movie_title . } 
        OPTIONAL { ?movie <http://data.linkedmdb.org/movie/runtime> ?movie__down_runtime . } 
        OPTIONAL { ?movie <http://data.linkedmdb.org/movie/actor> ?movie__down_actor . ?movie__down_actor <http://data.linkedmdb.org/movie/actor_name> ?actor . } 
        OPTIONAL { ?movie <http://data.linkedmdb.org/movie/director> ?movie__down_director .  ?movie__down_director <http://data.linkedmdb.org/movie/director_name> ?director . } 
        
        FILTER (?genre = 'Spy' || ?genre = 'Superhero' )
        }
        """

In [None]:
# Benchmark the whole sparkkgml pipeline: run it N_RUNS times and record, for
# every run, the wall-clock time of each stage, the total time, and the
# train/test accuracy of a RandomForest genre classifier.

N_RUNS = 10   # number of benchmark repetitions
SEED = 42     # base seed; run k splits the data with SEED + k

# DataAcquisition is constructed with a SparkSession. getOrCreate() returns the
# session already active in this environment (if any) and creates one
# otherwise, so the notebook no longer depends on a pre-defined `spark` global.
spark = SparkSession.builder.getOrCreate()

data_acq_times = []
feature_eng_times = []
vectorization_times = []
vectorAssembler_times = []
total_times = []
train_accuracies = []
test_accuracies = []


for run_idx in range(N_RUNS):
    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # durations; time.time() can jump when the system clock is adjusted.
    total_time_start = time.perf_counter()

    # create an instance of DataAcquisition and configure null handling:
    # nulls are replaced by custom values (-1 and ' ')
    # NOTE(review): presumably -1 is used for numeric and ' ' for string columns — confirm.
    dataAcquisitionObject = DataAcquisition(spark)
    dataAcquisitionObject.set_amputationMethod('nullReplacement')
    dataAcquisitionObject.set_nullReplacementMethod('customValue')
    dataAcquisitionObject.set_customValueVariable(-1)
    dataAcquisitionObject.set_customStringValueVariable(' ')

    # retrieve the query result as a Spark DataFrame
    stage_start = time.perf_counter()
    df = dataAcquisitionObject.getDataFrame(endpoint=endpoint, query=query)
    data_acq_times.append(round(time.perf_counter() - stage_start, 2))

    # create an instance of FeatureEngineering and collect the feature
    # descriptions for every column
    featureEngineeringObject = FeatureEngineering()
    stage_start = time.perf_counter()
    df2, features = featureEngineeringObject.getFeatures(df)
    feature_eng_times.append(round(time.perf_counter() - stage_start, 2))

    # keep only rows whose genre list has at most one entry, flatten that list
    # into a plain string, and update the feature description to match
    filtered_df = df2.filter(size(df2.genre) <= 1)
    filtered_df = filtered_df.withColumn('genre', concat_ws(',', filtered_df['genre']))
    features['genre']['isListOfEntries'] = False
    features['genre']['featureType'] = 'Single_Categorical_String'

    # create an instance of the Vectorization module and digitize all features
    vectorizationObject = Vectorization()
    stage_start = time.perf_counter()
    digitized_df = vectorizationObject.vectorize(filtered_df, features)
    vectorization_times.append(round(time.perf_counter() - stage_start, 2))

    # assemble the input features into a single vector column
    # NOTE(review): Spark transformations are lazy, so this timing likely
    # covers only building the plan, not executing it — confirm before
    # interpreting vectorAssembler_times.
    assembler = VectorAssembler(inputCols=["movie_title", "director", "runtime", "actor"], outputCol="features")
    stage_start = time.perf_counter()
    assembled_data = assembler.transform(digitized_df)
    vectorAssembler_times.append(round(time.perf_counter() - stage_start, 2))

    # split the data into training and testing sets (70% training, 30% testing);
    # a per-run seed keeps the runs different from each other but makes a
    # rerun of the notebook repeat the same splits (given the same input data)
    train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=SEED + run_idx)

    # create and train a RandomForestClassifier that predicts the genre
    rf_classifier = RandomForestClassifier(labelCol="genre", featuresCol="features", numTrees=10)
    model = rf_classifier.fit(train_data)

    # make predictions on both train and test data
    train_predictions = model.transform(train_data)
    test_predictions = model.transform(test_data)

    # evaluate accuracy on the train data
    train_evaluator = MulticlassClassificationEvaluator(labelCol="genre", predictionCol="prediction", metricName="accuracy")
    train_accuracy = train_evaluator.evaluate(train_predictions)

    # evaluate accuracy on the test data
    test_evaluator = MulticlassClassificationEvaluator(labelCol="genre", predictionCol="prediction", metricName="accuracy")
    test_accuracy = test_evaluator.evaluate(test_predictions)

    # save the accuracies
    train_accuracies.append(round(train_accuracy, 5))
    test_accuracies.append(round(test_accuracy, 5))

    # record the total time of this run
    total_time_end = time.perf_counter()
    total_times.append(round(total_time_end - total_time_start, 2))

In [None]:
# Report every collected series (per-stage timings, total timings and
# accuracies), one labelled line per series, in a fixed order.
for series_name, series_values in (
    ('data_acq_times', data_acq_times),
    ('feature_eng_times', feature_eng_times),
    ('vectorization_times', vectorization_times),
    ('vectorAssembler_times', vectorAssembler_times),
    ('total_times', total_times),
    ('train_accuracies', train_accuracies),
    ('test_accuracies', test_accuracies),
):
    print(series_name + ':', series_values)