## Library

In [1]:
import os
import re

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import regexp_replace, col, when, split, expr, lower, sum, min, max, concat_ws

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import DenseVector
 

In [3]:
spark = SparkSession.builder \
    .appName("IMDB ML Project") \
    .config("spark.pyspark.python", "python") \
    .getOrCreate()

## Data Proccessing

#### Get Data From Local

In [4]:
data_dir = "../IMBD_ML/Data"
merged_data = None
for folder in os.listdir(data_dir):
    folder_path = os.path.join(data_dir, folder)
    if os.path.isdir(folder_path):
        for file in os.listdir(folder_path):
            if file.startswith("merged_movies_data_") and file.endswith(".csv"):
                file_path = os.path.join(folder_path, file)
                # Read CSV
                data = spark.read.csv(file_path, header=True,inferSchema=True)

                if merged_data is None:
                    merged_data = data
                else:
                    merged_data = merged_data.union(data)
        

In [5]:
merged_data.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Movie Link: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- MPA: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Votes: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- grossWorldWide: double (nullable = true)
 |-- gross_US_Canada: double (nullable = true)
 |-- opening_weekend_Gross: double (nullable = true)
 |-- directors: string (nullable = true)
 |-- writers: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- countries_origin: string (nullable = true)
 |-- filming_locations: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- Languages: string (nullable = true)
 |-- wins: string (nullable = true)
 |-- nominations: string (nullable = true)
 |-- oscars: string (nullable = true)
 |-- release_date: string (nullable = true)



#### Cleaning Data

In [6]:
# Clean names
for col_name in merged_data.columns:
    merged_data = merged_data.withColumn(col_name, lower(col(col_name)))
    merged_data = merged_data.withColumnRenamed(col_name, col_name.lower().strip().replace(" ","_"))

In [7]:
# Extract ID , drop movie link
merged_data = merged_data.withColumn("movie_link", regexp_replace(col("movie_link"), r"/\?ref_=.*$", ""))
merged_data = merged_data.withColumn("id",\
                                    when(col("movie_link").contains("/"),\
                                    split(col("movie_link"), "/").getItem(4)).otherwise(None))
merged_data = merged_data.drop('movie_link')

In [8]:
# Change Duration to numeric
merged_data = merged_data.withColumn("duration", regexp_replace(col("duration"), "h", ""))
merged_data = merged_data.withColumn("duration", regexp_replace(col("duration"), "m", ""))

merged_data = merged_data.withColumn("duration",
    when(col("duration").contains(" "), 
         (split(col("duration"), " ").getItem(0).cast("float") * expr("3600")) +  # Giờ -> Phút
         split(col("duration"), " ").getItem(1).cast("float") * expr("60"))                   # Phút
    .otherwise(col("duration").cast("float") * expr("60"))  # Nếu chỉ có số phút, giữ nguyên
)


In [9]:
# Change to numeric
col_names = ['rating', 'budget', 'votes', 'grossworldwide', 'gross_us_canada', 'opening_weekend_gross']

for col_name in col_names:
    merged_data = merged_data.withColumn(col_name, col(col_name).cast("float"))
    merged_data = merged_data.withColumn(
        col_name,
        when(col(col_name).contains('k'), regexp_replace(col(col_name), 'k', '').cast("float") * 1_000)
        .when(col(col_name).contains('m'), regexp_replace(col(col_name), 'm', '').cast("float") * 1_000_000)
        .when(col(col_name).contains('b'), regexp_replace(col(col_name), 'b', '').cast("float") * 1_000_000_000)
        .otherwise(col(col_name).cast("float"))
    )

In [10]:
# Remove [ ] ' "
col_names = ['directors','writers','stars','genres','countries_origin','filming_locations','production_companies','languages','wins']
for col_name in col_names:
    merged_data = merged_data.withColumn(col_name, 
                                     regexp_replace(col(col_name), r"[\[\]\'\"]", ""))

In [11]:
# Remove number front title
merged_data = merged_data.withColumn("title", regexp_replace(col("title"), r"^\d+\.\s*", ""))

In [12]:
#  Drop Duplicate
merged_data = merged_data.dropDuplicates(['id'])
# Drop unsuitable
merged_data = merged_data.drop(col('release_date'))

#### Recommend model

In [13]:
col_for_rec = ['id','title','directors','stars','genres','languages']
rec_data = merged_data.select(*col_for_rec)
rec_data = rec_data.withColumn('combind',
                               concat_ws(" ", col("directors"),
                                         col("stars"), col("genres"),
                                         col("languages")))
rec_data = rec_data.withColumn(
    'combind',
    regexp_replace(col('combind'),', ','')
)

In [14]:
# Tokenizer
token = Tokenizer(inputCol='combind',outputCol='words')
combind_data = token.transform(rec_data)

# IF-IDF
hashingTF = HashingTF(inputCol = "words", outputCol="rawfeatures", numFeatures = 50000)
tf = hashingTF.transform(combind_data)
idf = IDF(inputCol='rawfeatures',outputCol='features')
tf_idf = idf.fit(tf)

# data final
tf_idf_data = tf_idf.transform(tf).select('id','title','features')

In [15]:
def Get_ID_Movie(movie_name, data):
    res = data.filter(col('title').contains(movie_name)).select('id','title')
    return res if res else None

In [16]:
def Recommend_Movie_Based_ID(id_movie, data, top_rec=5):
    # Lấy vector của phim cần tìm
    movie_vector_row = data.filter(data['id'] == id_movie).select("features").collect()
    
    # Kiểm tra nếu không tìm thấy phim
    if not movie_vector_row:
        print(f"Can't find: '{id_movie}' in dataset!")
        return None
    
    movie_vector = movie_vector_row[0]['features']

    # Cosine similarity function
    def cosine_similarity(vec1, vec2):
        vec1, vec2 = DenseVector(vec1), DenseVector(vec2)
        dot_product = float(vec1.dot(vec2))
        norm1 = float(vec1.norm(2))
        norm2 = float(vec2.norm(2))
        return dot_product / (norm1 * norm2) if norm1 * norm2 != 0 else 0.0
    
    cosine_sim_udf = udf(lambda x : cosine_similarity(x,movie_vector), DoubleType())
    
    # Tính cosine similarity
    top_rec_movie = (
        data.withColumn("similarity_score", cosine_sim_udf(col("features")))
        .orderBy(col("similarity_score").desc())
        .select("id", "title")
    )
    
    return top_rec_movie.limit(top_rec)

In [17]:
def Get_Recommend_Movie(name, data):
    list_id = Get_ID_Movie(name,data).select('id').collect()
    
    if len(list_id) == 1:
        return Recommend_Movie_Based_ID(list_id[0]['id'], data)

    if len(list_id) != 1:
        return None

In [18]:
movie_title = 'summer and smoke'
res = Get_Recommend_Movie(movie_title,tf_idf_data)
if res:
    print(f'I found multiple movies. Here is a recommendation based on: {movie_title}')
    res.show()
else:
    print(f'I cant find this: {movie_title} in database. Please provide more detailed information about the name.')

I found multiple movies. Here is a recommendation based on: summer and smoke


Py4JJavaError: An error occurred while calling o1026.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 141.0 failed 1 times, most recent failure: Lost task 5.0 in stage 141.0 (TID 403) (NguyenTien executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1170)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1089)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:500)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:159)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1126)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2488)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1139)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1121)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1568)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1555)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$1(limit.scala:291)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:285)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.IOException: Cannot run program "python3": CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1170)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1089)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:500)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:159)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1126)
	... 32 more


In [None]:
import sys
print(sys.executable)