In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from pyspark.ml import Pipeline,PipelineModel

<span style="color: blue; font-size:20px; font-weight:bold ">Tạo SparkSession và cấu hình để tương tác với Kafka và MongoDB</span> 

In [2]:
# spark.sparkContext.stop()

#Spark Session creation configured to interact with Kfka and MongoDB
spark = SparkSession.builder.appName("pyspark-notebook").\
config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
config("spark.mongodb.input.uri","mongodb://intent-mongo-1:27017/twitter_db.tweets").\
config("spark.mongodb.output.uri","mongodb://intent-mongo-1:27017/twitter_db.tweets").\
getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.7/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6c9cbdb3-30f0-44f9-a2ae-c0dadc855965;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in

Tạo một phiên làm việc Spark (SparkSession), cấu hình các thư viện cần thiết để làm việc với Kafka và MongoDB.

In [3]:
# spark.sparkContext.stop()

In [4]:
print(spark.version)

3.0.0


<span style="color: blue; font-size:20px; font-weight:bold ">Đọc Schema từ File JSON</span>

In [5]:
#Read schema file and create schema of string type
json_schema = ''
with open("schema/out/tweet_schema.json") as f:
    new_schema = StructType.fromJson(json.load(f))
    json_schema = new_schema.simpleString()

Đọc schema JSON từ file để định nghĩa cấu trúc dữ liệu của các tweet

<span style="color: blue; font-size:20px; font-weight:bold ">Đọc Dữ Liệu từ Kafka</span>

In [6]:
#Read data from Kafka topic
json_tweets = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "intent-kafka-1:9092")\
  .option("subscribe", "tweets")\
  .option("startingOffsets", "earliest")\
  .load()\
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Thiết lập một stream để đọc dữ liệu từ Kafka, từ topic "twitter_demo".

In [7]:
json_tweets.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [8]:
# Kiểm tra schema đọc từ tệp
print(json_schema)

struct<continuation_token:string,results:array<struct<binding_values:string,bookmark_count:bigint,community_note:string,conversation_id:string,creation_date:string,expanded_url:string,extended_entities:struct<media:array<struct<additional_media_info:struct<monetizable:boolean>,display_url:string,expanded_url:string,ext_media_availability:struct<status:string>,features:struct<large:struct<faces:array<struct<h:bigint,w:bigint,x:bigint,y:bigint>>>,medium:struct<faces:array<struct<h:bigint,w:bigint,x:bigint,y:bigint>>>,orig:struct<faces:array<struct<h:bigint,w:bigint,x:bigint,y:bigint>>>,small:struct<faces:array<struct<h:bigint,w:bigint,x:bigint,y:bigint>>>>,id_str:string,indices:array<bigint>,media_key:string,media_url_https:string,original_info:struct<focus_rects:array<struct<h:bigint,w:bigint,x:bigint,y:bigint>>,height:bigint,width:bigint>,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigin

In [9]:
# #Refine raw data red from Kafka topic
# refined_tweets = json_tweets\
#         .select(from_json("value", json_schema)\
#         .alias("data"))\
#         .where("data.lang='en'and data.created_at is not null and data.text is not null")\
#         .select("data.text",
#                 from_unixtime(col("data.timestamp_ms")/1000,'yyyy-MM-dd HH:mm:ss').alias("timestamp_ms")) #Translate milliseconds to UTC timestamp
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', r'http\S+', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '@\w+', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '#', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', 'RT', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', ':', ''))

In [10]:
# refined_tweets = json_tweets\
#     .select(from_json("value", json_schema).alias("data"))\
#     .selectExpr("data.results as results")\
#     .withColumn("result", explode("results"))\
#     .where("result.language = 'en' and result.creation_date is not null and result.text is not null")\
#     .select("result.text",
#             from_unixtime(col("result.timestamp")/1000, 'yyyy-MM-dd HH:mm:ss').alias("timestamp_ms"))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', r'http\S+', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '@\w+', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '#', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', 'RT', ''))
# refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', ':', ''))

<span style="color: blue; font-size:20px; font-weight:bold ">Tiền Xử Lý Dữ Liệu</span>

In [11]:
# Làm sạch và chuẩn hóa dữ liệu
refined_tweets = json_tweets\
    .select(from_json("value", json_schema).alias("data"))\
    .select("data.results")\
    .selectExpr("explode(results) as tweet")\
    .select("tweet.*")\
    .where("creation_date is not null and text is not null")\
    .select(
        col("text"),
        from_unixtime(col("timestamp")/1000, 'yyyy-MM-dd HH:mm:ss').alias("timestamp_ms")
    )
# Làm sạch văn bản
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', r'http\S+', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '@\w+', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '#', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', 'RT', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', ':', ''))


24/11/21 02:36:18 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Chuyển đổi dữ liệu JSON từ Kafka thành cấu trúc dữ liệu định nghĩa bởi schema.

Lọc các tweet bằng cách chọn các tweet có ngôn ngữ là tiếng Anh và có thời gian tạo và nội dung không null.

Loại bỏ các liên kết, tài khoản, hashtag, ký hiệu RT và dấu : khỏi nội dung tweet để làm sạch dữ liệu.

<span style="color: blue; font-size:20px; font-weight:bold ">Tải Mô Hình và Dự Đoán</span>

In [12]:
import os

# Định nghĩa thư mục
dir = "sentiment/"

# Kiểm tra xem thư mục có tồn tại không
if not os.path.exists(dir):
    os.makedirs(dir)
    print(f"Thư mục {dir} đã được tạo.")

# Tải mô hình từ thư mục
model = PipelineModel.load(dir)

                                                                                

Tải mô hình phân tích cảm xúc từ thư mục sentiment

In [13]:
def process_row(df, epoch_id):
    """Applies model to the df and writes data to MongoDB

    Parameters
    ----------
    df : DataFrame
        Streaming Dataframe
    epoch_id : int
        Unique id for each micro batch/epoch
    """
    predictions = model.transform(df)
    predictions.show()
    predictions.select("timestamp_ms","text","prediction").write.format("mongo").mode("append").save()

Định nghĩa hàm process_row để áp dụng mô hình phân tích cảm xúc lên các dữ liệu stream và lưu kết quả dự đoán vào MongoDB.

<span style="color: blue; font-size:20px; font-weight:bold ">Ghi Kết Quả và Khởi Động Stream</span>

In [None]:
#Writes streaming dataframe to ForeachBatch console which ingests data to MongoDB
refined_tweets \
    .writeStream \
    .option("checkpointLocation", "checkpoint/data") \
    .foreachBatch(process_row).start().awaitTermination()

24/11/21 02:36:30 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
                                                                                

+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|       timestamp_ms|      reviewTokensUf|        reviewTokens|                  cv|            features|       rawPrediction|         probability|prediction|
+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Abe Uta fights ba...|1970-01-20 22:24:01|[abe, uta, fights...|[abe, uta, fights...|(71899,[21,1784,3...|(71899,[21,1784,3...|[0.01983008209077...|[0.50495735807437...|       0.0|
|JudoJuniors 🇹🇯 ...|1970-01-21 01:07:11|[judojuniors, 🇹?...|[judojuniors, 🇹?...|(71899,[687,1784,...|(71899,[687,1784,...|[-0.1641730088018...|[0.45904868581038...|       1.0|
|JudoJuniors 🇹🇯 ...|1970-01-21 01:07:11|[judojuniors, 🇹?...|[judojuniors, 🇹?...|(71899,[687,1784,...|(71

24/11/21 02:36:32 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
                                                                                

In [None]:
# Đọc dữ liệu từ MongoDB
df = spark.read \
    .format("mongo") \
    .option("uri", "mongodb://intent-mongo-1:27017/twitter_db.tweets") \
    .load()

# Hiển thị một số hàng dữ liệu để kiểm tra
df.show()


<span style="color: red; font-size:30px; font-weight:bold ">Mục đích</span>

### File streamlistener xử lý dữ liệu stream từ Kafka, tiền xử lý văn bản, áp dụng mô hình phân tích cảm xúc và lưu kết quả vào MongoDB. Nó cho phép liên tục theo dõi và phân tích các tweet theo thời gian thực.