In [1]:
# !pip install vncorenlp
# !pip install angdetect
# !pip install underthesea

In [2]:
from datetime import datetime
# This funtion will format "publishedAtDate" column for later usage
def formatToDatetime(date_string):
    date_string_without_fraction = date_string[:-5] + 'Z'
    date_format = "%Y-%m-%dT%H:%M:%SZ"
    return datetime.strptime(date_string_without_fraction, date_format)

# How to apply to dataframe
# df['date_column'] = df['date_column'].apply(lambda x: convert_to_datetime(x))

In [3]:
from langdetect import detect
def removeOtherLanguage(data):
    for i in range(len(data)):
        try: 
            if detect(str(data.loc[i, 'text'])) != "vi":
                #print(str(data.loc[i, 'text']), i)
                data = data.drop(i)
        except:
            #print(str(data.loc[i, 'text']))
            data = data.drop(i)
    return data

In [4]:
def unicodeReplace(text):
    replacements = {
        "òa": "oà", "óa": "oá", "ỏa": "oả", "õa": "oã", "ọa": "oạ",
        "òe": "oè", "óe": "oé", "ỏe": "oẻ", "õe": "oẽ", "ọe": "oẹ",
        "ùy": "uỳ", "úy": "uý", "ủy": "uỷ", "ũy": "uỹ", "ụy": "uỵ",
        "Ủy": "Uỷ", "\n": "." , "\t": "."  # Add more replacements as needed
    }
    for old, new in replacements.items():
        text = text.replace(old, new)
    return text

def unicode(data):
    data['text'] = data['text'].apply(unicodeReplace)
    return data

In [5]:
import re

def remove_emojis(text):
    emoji_pattern = re.compile("["
                               u"\U0001F600-\U0001F64F"  # emoticons
                               u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                               u"\U0001F680-\U0001F6FF"  # transport & map symbols
                               u"\U0001F700-\U0001F77F"  # alchemical symbols
                               u"\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
                               u"\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
                               u"\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
                               u"\U0001FA00-\U0001FA6F"  # Chess Symbols
                               u"\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
                               u"\U00002702-\U000027B0"  # Dingbats
                               u"\U000024C2-\U0001F251" 
                               "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)

In [6]:
# Define the base Vietnamese alphabet without tone marks
vietnamese_alphabet = "aăâbcdđeêghiklmnoôơpqrstuưvwxy"
vietnamese_letter_with_tone = "áàạãảắằẵẳặấầẩẫậéèẻẽẹềếểễệòóỏõọồốổỗộờớởỡợúùũủụứừửữựíìĩỉịýỳỹỷỵ"

# Create uppercase Vietnamese letters with tone marks
uppercase_vietnamese_letters_with_tone = [char.upper() for char in vietnamese_letter_with_tone]
uppercase_vietnamese_alphabet = vietnamese_alphabet.upper()

# Combine the lists into strings
lowercase_string = vietnamese_alphabet + "".join(vietnamese_letter_with_tone)
uppercase_string = uppercase_vietnamese_alphabet + "".join(uppercase_vietnamese_letters_with_tone)
allcase_string = lowercase_string + uppercase_string

In [7]:
from string import punctuation
punctuation = "!\"#$%&'()*+,./:;<=>?@[\]^_`{|}~"

In [8]:
def stickyPreprocess(data):
    def processText(text):
        result = []
        for letter_id in range(len(text) - 2):
            prev, letter, after = text[letter_id], text[letter_id + 1], text[letter_id + 2]

            if letter in punctuation:
                if prev in allcase_string:
                    result.append(letter_id + 1)
                if after in allcase_string:
                    result.append(letter_id + 2)

            if letter in uppercase_string and prev in lowercase_string and letter_id != 0:
                result.extend([letter_id, letter_id + 1])

        for index in reversed(result):
            text = text[:index] + " " + text[index:]

        return text

    data['text'] = data['text'].apply(processText)
    return data

In [9]:
from vncorenlp import VnCoreNLP
annotator = VnCoreNLP("../vncorenlp/VnCoreNLP-1.1.1.jar", annotators="wseg", max_heap_size='-Xmx500m')

In [10]:
import pandas as pd

abbreviation_dict = '../vncorenlp/abbreviation_dictionary_vn.xlsx'
df = pd.read_csv('../vncorenlp/abbreviation_dictionary_vn.csv')
abbreviation_dict = df.set_index("abbreviation")["meaning"].to_dict()

In [11]:
def abbreviationPreprocess(data):
    def replaceWord(word, dictionary):
        return dictionary.get(word, word)

    def processText(text):
        annotator_text = annotator.tokenize(text)

        tokens = [it for sublist in annotator_text for it in sublist if it != '_']
        tokens = [replaceWord(it.lower(), abbreviation_dict) for it in tokens]

        sentences = [' '.join(sublist) for sublist in annotator_text]

        return pd.Series([' '.join(tokens), sentences], index=['text', 'sentences'])

    data[['text', 'sentences']] = data['text'].apply(processText)

    return data

In [12]:
import underthesea

def sentimentCal(sentences):
    sentiments = [underthesea.sentiment(text) for text in sentences]
    return sentiments

In [13]:
import joblib
model = joblib.load('../data_result/model.joblib')
selected_columns = ["pos_prop", "neg_prop", "reviewLength", "reviewHour"]

In [14]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pymongo import MongoClient
import requests
import pandas as pd
import json
import time
import gzip

In [15]:
scala_version = '2.12'  # your scala version
spark_version = '3.5.0' # your spark version
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.6.0' #your kafka version
]
spark = SparkSession.builder.master("local").appName("PREDICT_RATINGS_OF_GOOGLE_LOCAL_REVIEWS_IE212_O11_GROUP7").config("spark.jars.packages", ",".join(packages)).getOrCreate()
spark

In [16]:
# Usage of KafkaConsumer class
bootstrap_servers = 'localhost:9092'
topic_name = 'PREDICT_RATINGS_OF_GOOGLE_LOCAL_REVIEWS_IE212_O11_GROUP7'

kafkaDf = spark.read.format("kafka").option("kafka.bootstrap.servers", bootstrap_servers)\
                .option("subscribe", topic_name)\
                .option("startingOffsets", "earliest").load()

In [17]:
# Connect to local server
client = MongoClient("mongodb://127.0.0.1:27017/")

# Create database called animals
mydb = client["ie212_o11_group7"]

# Create Collection (table) called shelterA
collection = mydb.reviews

In [20]:
from time import sleep
from IPython.display import display, clear_output
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

# Khởi tạo SparkSession
spark = SparkSession.builder.appName("PREDICT_RATINGS_OF_GOOGLE_LOCAL_REVIEWS_IE212_O11_GROUP7").getOrCreate()

# Định nghĩa schema cho dữ liệu JSON
json_schema = StructType([
            StructField("reviewId", StringType(), True),
            StructField("placeId", StringType(), True),
            StructField("title", StringType(), True),
            StructField("location/lat", DoubleType(), True),
            StructField("location/lng", DoubleType(), True),
            StructField("categories", StringType(), True),
            StructField("categoryName", StringType(), True),
            StructField("reviewerId", StringType(), True),
            StructField("name", StringType(), True),
            StructField("stars", IntegerType(), True),
            StructField("text", StringType(), True),
            StructField("publishedAtDate", StringType(), True),
            StructField("last_update_time", DoubleType(), True)
            ])

# Đọc dữ liệu từ Kafka topic và chuyển đổi thành DataFrame
kafka_stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", topic_name) \
    .load()

kafkaDf = spark.read.format("kafka").option("kafka.bootstrap.servers", bootstrap_servers)\
                .option("subscribe", topic_name)\
                .option("startingOffsets", "earliest").load()

# Chuyển đổi giá trị từ JSON string sang struct với schema đã định nghĩa
json_stream_df = kafkaDf.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", json_schema).alias("data")) \
    .select("data.*")

# Hàm callback để cập nhật average_rating
def update_average_rating(row):
    # Kết nối tới MongoDB
    client = MongoClient("mongodb://127.0.0.1:27017/")
    mydb = client["ie212_o11_group7"]
    places_collection = mydb.places
    
    # Lấy thông tin từ row
    place_id = row['placeId']
    stars = row['stars'] or 0
    predict_rating = row['Predict_rating']
    
    # Lấy thông tin từ MongoDB
    place_info = places_collection.find_one({'placeId': place_id})
    
    if place_info:
        total_rows = place_info['total_rows']

        current_average_rating = place_info['average_rating']
        new_average_rating = (current_average_rating * total_rows + stars) / (total_rows + 1)

        # Tính toán average_rating_new
        current_average_predict_rating = place_info['average_predict_rating']
        new_average_predict_rating = (current_average_predict_rating * total_rows + predict_rating) / (total_rows + 1)
        
        # Cập nhật dữ liệu trong MongoDB
        places_collection.update_one({'placeId': place_id},
                                     {'$set': {'average_rating': new_average_rating}})

        # Cập nhật dữ liệu trong MongoDB
        places_collection.update_one({'placeId': place_id},
                                     {'$set': {'average_predict_rating': new_average_predict_rating,
                                               'total_rows': total_rows + 1}})

last_consumer_time = 0
while True:
    try:
        # Sắp xếp DataFrame theo thời gian cập nhật giảm dần và chỉ lấy dòng đầu tiên
        newest_review_df = json_stream_df.sort(col("last_update_time").desc()).limit(1)
        temp_time = newest_review_df.select("last_update_time").first()[0]
        if temp_time > last_consumer_time:
            last_consumer_time = temp_time
        else:
            # Nếu thời gian không lớn hơn, tiếp tục vòng lặp
            continue

        # newest_review_df = json_stream_df.limit(1)
        newest_review_df_pandas = newest_review_df.toPandas()

        # Check if the pandas DataFrame is empty
        if newest_review_df_pandas.empty:
            continue
        newest_review_df_pandas = removeOtherLanguage(newest_review_df_pandas)
        if newest_review_df_pandas.empty:
            continue
        
        newest_review_df_pandas = unicode(newest_review_df_pandas)
        newest_review_df_pandas['text'] = newest_review_df_pandas['text'].apply(remove_emojis)
        newest_review_df_pandas = stickyPreprocess(newest_review_df_pandas)
        newest_review_df_pandas = abbreviationPreprocess(newest_review_df_pandas)
        newest_review_df_pandas["sentiment"] = newest_review_df_pandas["sentences"].apply(sentimentCal)

        newest_review_df_pandas['sentiment'] = newest_review_df_pandas['sentiment'].apply(lambda sentiments: ["neutral" if sentiment is None else sentiment for sentiment in sentiments])

        newest_review_df_pandas["text"] = [item + " ." for item in newest_review_df_pandas["text"]]
        newest_review_df_pandas['publishedAtDate'] = newest_review_df_pandas['publishedAtDate'].apply(lambda x: formatToDatetime(x))
        newest_review_df_pandas["reviewHour"] = [item.hour for item in newest_review_df_pandas["publishedAtDate"]]
        newest_review_df_pandas["reviewLength"] = [len(item) for item in newest_review_df_pandas["text"]]

        count_pos = newest_review_df_pandas['sentiment'].apply(lambda sentiments: sum(sentiment == "positive" for sentiment in sentiments))
        count_neg = newest_review_df_pandas['sentiment'].apply(lambda sentiments: sum(sentiment == "negative" for sentiment in sentiments))

        newest_review_df_pandas['num_sentiments'] = newest_review_df_pandas['sentiment'].apply(lambda sentiments: len(sentiments) if sentiments else 0)

        newest_review_df_pandas['pos_prop'] = count_pos / newest_review_df_pandas['num_sentiments']
        newest_review_df_pandas['neg_prop'] = count_neg / newest_review_df_pandas['num_sentiments']

        X = newest_review_df_pandas[selected_columns].copy()
        newest_review_df_pandas["Predict_rating"] = model.predict(X)

        # newest_review_df = newest_review_df.drop(columns = ['text'], ['pos_prop'], ['neg_prop'], [])
        
        # Chuyển DataFrame thành JSON string và lấy giá trị cột 'value'
        # message_value = newest_review_df.toJSON().first()
        json_string = newest_review_df_pandas.to_json(orient='records')
        json_list = json.loads(json_string)
        first_row = json_list[0]

        # In ra giá trị để kiểm tra
        #print(message_value)
        print(first_row)

        # Insert documents (rows) into the database's collection (table)
        collection.insert_one(first_row)
        
        # Gọi hàm callback để cập nhật average_rating
        update_average_rating(first_row)
        sleep(10)
        # clear_output(wait=True)

    except KeyboardInterrupt:
        print("break")
        break

print("Live view ended...")


{'reviewId': 'ChdDSUhNMG9nS0VJQ0FnSUR0bU5hMmtRRRAB', 'placeId': 'ChIJPZM8uVYpdTERLvi8c7WWMtU', 'title': '1 Dessert & Cafe', 'location/lat': 10.79644, 'location/lng': 106.6857013, 'categories': 'Quán cà phê', 'categoryName': 'Quán cà phê', 'reviewerId': '1.02E+20', 'name': 'Trọng Việt Đinh', 'stars': None, 'text': '1 dessert & cafe 277 hoàng_sa , tân_định . chỉ phù_hợp cho làm_việc trực tuyến . có bán bánh . giá trung_bình + . hơi ồn . .', 'publishedAtDate': 1706866730000, 'last_update_time': 1708631248.0162046, 'sentences': ['1 dessert & cafe 277 Hoàng_sa , tân_định .', 'Chỉ phù_hợp cho làm_việc online .', 'Có bán bánh .', 'Giá trung_bình + .', 'Hơi ồn .'], 'sentiment': ['positive', 'neutral', 'neutral', 'negative', 'negative'], 'reviewHour': 9, 'reviewLength': 124, 'num_sentiments': 5, 'pos_prop': 0.2, 'neg_prop': 0.4, 'Predict_rating': 3.0353781552}
{'reviewId': 'ChdDSUhNMG9nS0VJQ0FnSUR0NmRMNndBRRAB', 'placeId': 'ChIJC8n56g0pdTER0YYOkKUfacU', 'title': '27 ELITE SHOPPING COMPLEX', 'lo

In [None]:
newest_review_df.toDF()

IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (13): reviewId, placeId, title, location/lat, location/lng, categories, categoryName, reviewerId, name, stars, text, publishedAtDate, last_update_time
New column names (0): 