In [1]:
from kafka import KafkaConsumer
import json
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, StringIndexer
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import col, split
from pyspark.ml import Pipeline

In [2]:
kafka_bootstrap_servers = 'localhost:9092'
kafka_topic = 'anime'

In [3]:
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=kafka_bootstrap_servers,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='anime-consumer-group'
)

In [4]:
spark = SparkSession.builder \
    .appName("Content-Based Filtering") \
    .getOrCreate()

In [5]:
model_path = "./models/genre_vectorizer"
count_vectorizer_model = CountVectorizerModel.load(model_path)

In [6]:
data_path = "./data/cleaned/anime_data_cleaned.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)
data = data.dropna(subset=['link', 'title', 'genres', 'rate'])
data = data.withColumnRenamed('link', 'item_id')
data = data.withColumn("rate", col("rate").cast("double"))
data = data.withColumn("genres", split(col("genres"), ","))

data = count_vectorizer_model.transform(data)

In [7]:
item_indexer = StringIndexer(inputCol="item_id", outputCol="item_index")
preprocessing_pipeline = Pipeline(stages=[item_indexer])
preprocessed_pipeline_model = preprocessing_pipeline.fit(data)
preprocessed_data = preprocessed_pipeline_model.transform(data)

In [8]:
def recommend_by_title(title, top_n=5):
    item_data = preprocessed_data.filter(col("title") == title).select("item_id", "genre_vector", "genres").first()
    if not item_data:
        print(f"No anime found with title: {title}")
        return

    item_id = item_data["item_id"]
    genre_vector = DenseVector(item_data["genre_vector"].toArray())

    broadcast_genre_vector = spark.sparkContext.broadcast(genre_vector)

    def calculate_similarity(row):
        target_vector = broadcast_genre_vector.value
        row_vector = DenseVector(row.genre_vector.toArray())
        dot_product = sum(target_vector[i] * row_vector[i] for i in range(len(target_vector)))
        norm_target = sum(x ** 2 for x in target_vector) ** 0.5
        norm_row = sum(x ** 2 for x in row_vector) ** 0.5
        similarity = dot_product / (norm_target * norm_row) if norm_target and norm_row else 0.0
        return row.title, row.item_id, row.genres, similarity

    similar_items_rdd = preprocessed_data.rdd.map(calculate_similarity)

    similar_items = similar_items_rdd.filter(lambda x: x[1] != item_id).takeOrdered(top_n, key=lambda x: -x[3])

    for title, item_id, genres, similarity in similar_items:
        print(f"Title: {title}, Link: {item_id}, Genres: {genres}, Similarity: {similarity}")


In [9]:
for message in consumer:
    record = message.value  # Lấy giá trị của message từ Kafka

    print(f"Consumed record from Kafka: {record}")
    userId = record['user_id']
    item_id = record['item_id']
    titles_watched = record['titles_watched']
    genres = record['genres']
    rate = record['rate']
    
    # Tiền xử lý dữ liệu nhận được từ Kafka (chuyển genres thành vector)
    data = spark.createDataFrame([{
        'user_id': userId,
        'item_id': item_id,
        'title': titles_watched,
        'genres': genres,
        'rate': rate
    }])

    transformed_data = count_vectorizer_model.transform(data)
    print("Recommendation after watch: ", titles_watched)

    recommend_by_title(titles_watched)


Consumed record from Kafka: {'user_id': 1, 'item_id': 'https://animevietsub.page/phim/mushoku-tensei-isekai-ittara-honki-dasu-2nd-season-a4158/', 'titles_watched': 'Thất Nghiệp Chuyển Sinh Part 2', 'genres': ['Fantasy', ' Ecchi', ' Drama', ' Magic'], 'rate': 9.3}
Recommendation after watch:  Thất Nghiệp Chuyển Sinh Part 2
Title: Thất Nghiệp Chuyển Sinh Special, Link: https://animevietsub.page/phim/mushoku-tensei-isekai-ittara-honki-dasu-special-a4489/, Genres: ['Fantasy', ' Ecchi', ' Drama', ' Magic'], Similarity: 1.0
Title: Thất Nghiệp Chuyển Sinh Mùa 2, Link: https://animevietsub.page/phim/mushoku-tensei-isekai-Ittara-honki-dasu-2nd-season-a4627/, Genres: ['Fantasy', ' Ecchi', ' Drama', ' Magic'], Similarity: 1.0
Title: Thất Nghiệp Chuyển Sinh, Link: https://animevietsub.page/phim/that-nghiep-chuyen-sinh-a3940/, Genres: ['Fantasy', ' Drama', ' Magic'], Similarity: 0.8660254037844387
Title: Fate/kaleid liner Prisma☆Illya 2wei! OVA, Link: https://animevietsub.page/phim/fatekaleid-liner

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\Illya\Workspace\Nam4\IE212\DoAn\Anime\venv\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\Illya\Workspace\Nam4\IE212\DoAn\Anime\venv\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\laragon\bin\python\python-3.10\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 