In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from functools import reduce
from pyspark.ml.feature import StringIndexer, VectorAssembler,OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col

## Spark Session

In [2]:
def init_spark():
    """Return the SparkSession for this notebook, creating it if none exists.

    The session is named "Big data project" and is configured with 8g of
    executor memory and 8g of driver memory.
    """
    builder = SparkSession.builder.appName("Big data project")
    memory_settings = (
        ("spark.executor.memory", "8g"),
        ("spark.driver.memory", "8g"),
    )
    # Apply the settings in the order listed above.
    for setting_name, setting_value in memory_settings:
        builder = builder.config(setting_name, setting_value)
    return builder.getOrCreate()


# Module-level session handle; the data-loading code reads it as `spark`.
spark = init_spark()

24/04/02 16:27:37 WARN Utils: Your hostname, DESKTOP-Raveena resolves to a loopback address: 127.0.1.1; using 172.21.63.63 instead (on interface eth0)
24/04/02 16:27:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/02 16:27:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Data preparation

In [3]:
# Genres to load; each one is read from data/<genre>.csv.
genres_to_keep = ["action", "adventure", "horror", "crime", "romance", "thriller"]

# One DataFrame per genre: read the CSV (header row, multi-line fields allowed),
# set the "genre" column to the genre name, and keep at most 10000 rows.
dfs = [
    spark.read
    .option("header", "true")
    .option("multiline", "true")
    .csv("data/" + genre_name + ".csv")
    .withColumn("genre", lit(genre_name))
    .limit(10000)
    for genre_name in genres_to_keep
]

def unionAll(dfs):
    """Combine a sequence of DataFrames into one by chaining ``unionAll``.

    Args:
        dfs: Non-empty iterable of DataFrames to combine, in order.

    Returns:
        The result of folding ``left.unionAll(right)`` over ``dfs``. When
        ``dfs`` holds exactly one element, that element is returned as-is.

    Raises:
        TypeError: If ``dfs`` is empty (raised by ``functools.reduce``).
    """
    # Call the method on the instances rather than through the ``DataFrame``
    # class: that name is never imported explicitly in this notebook and is
    # only reachable as a side effect of a wildcard import.
    return reduce(lambda left, right: left.unionAll(right), dfs)

# Stack the per-genre frames collected in `dfs` into a single DataFrame.
df = unionAll(dfs)

                                                                                

## Data Cleaning

In [4]:
# Remove the five columns named below from the combined frame, then display
# the names of the columns that remain.
df=df.drop("certificate","runtime","rating","votes","gross(in $)")
df.columns

['movie_id',
 'movie_name',
 'year',
 'genre',
 'description',
 'director',
 'director_id',
 'star',
 'star_id']

In [5]:
def drop_duplicates(train_df):
    """Remove exact duplicate rows, printing the row count before and after.

    Args:
        train_df: DataFrame to de-duplicate.

    Returns:
        The de-duplicated DataFrame. The input frame itself is not modified,
        so callers must use the returned value.
    """
    print(train_df.count())
    deduplicated_df = train_df.dropDuplicates()
    print(deduplicated_df.count())
    return deduplicated_df


# Keep the de-duplicated frame. Without the assignment the duplicates would
# stay in `df` even though the printed counts suggest they were removed.
df = drop_duplicates(df)
#drop null values
df = df.dropna()

                                                                                

60000




59997


                                                                                

In [6]:
# Persist the cleaned dataset as CSV with a header row. Overwrite mode lets
# the notebook be re-run from the top: with the writer's default save mode the
# write fails once the output path already exists from a previous run.
df.write.mode("overwrite").option("header", "true").csv("./final_dataset5_imdb.csv")

                                                                                

## Utils

In [7]:
def get_data_dataframe(dir="final_dataset.csv"):
    """Load a CSV dataset and return its rows in random order.

    Args:
        dir: Path passed to the CSV reader. Defaults to "final_dataset.csv".

    Returns:
        DataFrame read with a header row and multi-line fields enabled,
        ordered by a temporary random column that is dropped before returning.
        The random column is generated without a seed.
    """
    reader = spark.read.option("header", "true").option("multiline", "true")
    loaded = reader.csv(dir)
    # Attach a random sort key, order by it, then discard the key again.
    return loaded.withColumn("rand", rand()).orderBy("rand").drop("rand")

In [8]:
def print_class_distribution(df):
    """Display the number of rows per value of the "genre" column.

    Args:
        df: DataFrame that has a "genre" column.
    """
    # Group by genre, count each group, and show the resulting table.
    df.groupBy("genre").count().show()

In [9]:
# Reload the dataset from "final_dataset5_imdb.csv" (rows come back in random
# order) and show how many rows each genre has.
final_df_read=get_data_dataframe("final_dataset5_imdb.csv")
# final_df_read.head(5)
print_class_distribution(final_df_read)

                                                                                

+---------+-----+
|    genre|count|
+---------+-----+
|  romance| 9831|
|   horror| 9585|
|adventure| 9434|
|    crime| 9758|
| thriller| 9653|
|   action| 9494|
+---------+-----+



In [10]:
# Load the dataset again, cap it at 50000 rows, and display the row count.
# NOTE(review): no later cell visible here reads `df`; the encoding cell is
# given `final_df_read` instead, so this 50000-row cap does not appear to
# reach the models. Confirm which frame was intended.
df=get_data_dataframe("final_dataset5_imdb.csv").limit(50000)
df.count()

                                                                                

50000

In [11]:
def print_and_return_mapping_of_index_to_label(df,predicting='genre'):
    """Print and return the mapping from numeric label to original class name.

    Args:
        df: DataFrame with a "label" column and the column named by
            ``predicting``.
        predicting: Name of the column holding the original class names.

    Returns:
        dict mapping each distinct ``label`` value to its class name.
    """
    # Collect distinct pairs of (label, class name)
    label_rows = df.select("label", predicting).distinct().collect()
    index_to_name = {}
    for row in label_rows:
        # Read the class name through `predicting` so the function also works
        # when the target column is not called "genre".
        class_name = row[predicting]
        print("Label %s is mapped to genre '%s'" % (row["label"], class_name))
        index_to_name[row["label"]] = class_name
    return index_to_name

## Data Encoding

In [12]:
def get_stages_of_encoding(categoricalColumns=['year', 'description', 'director', 'star'],
                           predicting='genre'):
    """Build the list of pipeline stages that encode the dataset.

    Args:
        categoricalColumns: Names of the string columns to index. Each one is
            indexed into a new column named "<column>Index".
        predicting: Name of the target column, indexed into "label".

    Returns:
        list of stages in this order: one StringIndexer per categorical
        column, the StringIndexer for the target, and a VectorAssembler that
        combines the "<column>Index" columns into "features".
    """
    index_columns = [name + "Index" for name in categoricalColumns]
    feature_indexers = [
        StringIndexer(inputCol=name, outputCol=indexed_name)
        for name, indexed_name in zip(categoricalColumns, index_columns)
    ]
    label_indexer = StringIndexer(inputCol=predicting, outputCol='label')
    assembler = VectorAssembler(inputCols=index_columns, outputCol="features")
    return feature_indexers + [label_indexer, assembler]

In [13]:
def get_encoded_data(df):
    """Fit the encoding pipeline on ``df`` and return the encoded frame.

    The pipeline is built from the module-level ``stages`` list, which must
    be defined before this function is called.

    Args:
        df: DataFrame to fit the pipeline on and to transform.

    Returns:
        The transformed DataFrame restricted to the columns label, features,
        genre, year, description, director and star. Its schema is printed
        before it is returned.
    """
    fitted_pipeline = Pipeline(stages=stages).fit(df)
    encoded = fitted_pipeline.transform(df)
    keep_columns = ['label', 'features', 'genre', 'year', 'description', 'director', 'star']
    encoded = encoded.select(keep_columns)
    encoded.printSchema()
    return encoded

In [14]:
# Build the encoding stages with their default columns, then encode the loaded
# dataset. `stages` has to exist at module level because get_encoded_data
# reads it as a global.
# NOTE(review): the pipeline is fitted on the whole dataset here, before the
# train/test split performed further down. Confirm that is intended.
stages=get_stages_of_encoding()
final_df_read=get_encoded_data(final_df_read)

                                                                                

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- genre: string (nullable = true)
 |-- year: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- star: string (nullable = true)



## Evaluation metrics

In [None]:
def print_statistis(predictions,mapping):
    """Print overall and per-class classification metrics for a prediction frame.

    Args:
        predictions: DataFrame with "prediction" and "label" columns.
        mapping: dict from label value to class name, used for the per-class
            headings. A label missing from it is shown by its raw value.
    """
    # MulticlassMetrics is built from an RDD of (prediction, label) pairs.
    prediction_and_labels = predictions.rdd.map(lambda row: (row.prediction, row.label))

    # Instantiate metrics object
    metrics = MulticlassMetrics(prediction_and_labels)
    confusion_matrix = metrics.confusionMatrix().toArray()

    # Overall statistics: averages over all classes, weighted by class
    # frequency. Passing 1.0 to precision()/recall()/fMeasure() would report
    # the metrics of class 1.0 only, not a summary of the whole test set.
    print("Summary Stats")
    print("Weighted Precision = %s" % metrics.weightedPrecision)
    print("Weighted Recall = %s" % metrics.weightedRecall)
    print("Weighted F1 Score = %s" % metrics.weightedFMeasure())

    # Statistics by class
    labels = predictions.rdd.map(lambda row: row.label).distinct().collect()
    for label in sorted(labels):
        # Fall back to the raw label if the mapping has no entry for it.
        class_name = str(mapping.get(label, label))
        print("______________________"+class_name+"_____________________")
        print("Class %s precision = %s" % (label, metrics.precision(label)))
        print("Class %s recall = %s" % (label, metrics.recall(label)))
        print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

    # Per-class accuracy: diagonal entry divided by the row total of the
    # confusion matrix. Per the Spark documentation, predicted classes are in
    # the columns, so each row total counts the rows of one actual class.
    rows_per_class = confusion_matrix.sum(axis=1)
    print(rows_per_class)
    accuracies = {}
    for index, class_total in enumerate(rows_per_class):
        accuracies[index] = confusion_matrix[index, index] / class_total

    # Print accuracies for each label
    for index, accuracy in accuracies.items():
        print("Accuracy for label %s: %s" % (index, accuracy))

## Model implementation

## Decision Trees

In [15]:
#decision tree
# Split the encoded data into training and testing sets using weights 0.8 and
# 0.2 and a fixed seed of 42.
train_df, test_df = final_df_read.randomSplit([0.8, 0.2], seed=42)

# Show the number of rows in each set
print("Training set count:", train_df.count())
print("Testing set count:", test_df.count())

24/04/02 16:29:37 WARN DAGScheduler: Broadcasting large task binary with size 11.2 MiB
                                                                                

Training set count: 46438


24/04/02 16:29:41 WARN DAGScheduler: Broadcasting large task binary with size 11.2 MiB
[Stage 122:>                                                        (0 + 8) / 8]

Testing set count: 11317


                                                                                

In [17]:
# Train a decision tree on the training split, predict on the test split, and
# print the accuracy.
# NOTE(review): maxBins=60000 is presumably sized to cover the number of
# distinct values in the indexed feature columns. Confirm against the data.
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxBins=60000)
dtModel = dt.fit(train_df)
predictions = dtModel.transform(test_df)
# predictions.show(10)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# Build the label -> genre mapping from the training split, then print the
# metrics for the current `predictions` frame.
mapp=print_and_return_mapping_of_index_to_label(train_df)
print_statistis(predictions,mapp)

## DT with cross validation

In [45]:
#DT with cross validation
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxBins=60000)

# Define the evaluator before the CrossValidator that uses it, so this cell
# does not depend on an `evaluator` variable left over from another cell.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Candidate tree depths to compare.
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .build()

# 5-fold cross validation; the fixed seed makes the fold assignment repeatable,
# matching the random forest cross-validation cell.
cross_validator = CrossValidator(estimator=dt,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=evaluator,
                                 numFolds=5, seed=42)

cv_model = cross_validator.fit(train_df)
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")

In [None]:
# Build the label -> genre mapping from the training split, then print the
# metrics for the current `predictions` frame.
mapp=print_and_return_mapping_of_index_to_label(train_df)
print_statistis(predictions,mapp)

## Random Forest Classifier

In [None]:
# Random forest on the same train/test split.
# maxBins matches the decision tree cells: tree learners need at least as many
# bins as the largest categorical feature has values, and the default is far
# too small for the indexed columns. The seed makes the run repeatable.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxBins=60000, seed=42)
rf_model = rf.fit(train_df)

predictions = rf_model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = {:.2f}".format(accuracy))

In [None]:
# Build the label -> genre mapping from the training split, then print the
# metrics for the current `predictions` frame.
mapp=print_and_return_mapping_of_index_to_label(train_df)
print_statistis(predictions,mapp)

## RF with cross validation

In [44]:
#Random forest with cross validation
# maxBins matches the decision tree cells: tree learners need at least as many
# bins as the largest categorical feature has values.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxBins=60000, seed=42)

#the hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

# cross-validator
cross_validator = CrossValidator(estimator=rf,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=evaluator,
                                 numFolds=5, seed=42)

# model training with the best hyperparameters
cv_model = cross_validator.fit(train_df)

# The estimator is the bare classifier, not a Pipeline, so bestModel is the
# fitted forest itself and has no `.stages` to index into.
best_rf_model = cv_model.bestModel
importances = best_rf_model.featureImportances

# Feature names in the order the VectorAssembler receives them, i.e. the
# default categorical columns of get_stages_of_encoding. Listing the raw CSV
# columns here would pair the importances with the wrong names.
feature_list = ['year', 'description', 'director', 'star']

print("Feature Importances:")
for feature, importance in zip(feature_list, importances.toArray()):
    print(f"{feature}: {importance:.4f}")


# Make predictions on the test data
predictions = cv_model.transform(test_df)

# Evaluate the model
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = {:.2f}".format(accuracy))

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# Build the label -> genre mapping from the training split, then print the
# metrics for the current `predictions` frame.
mapp=print_and_return_mapping_of_index_to_label(train_df)
print_statistis(predictions,mapp)