In [1]:
# Sử dụng thư viện findspark để tìm môi trường spark
import findspark
findspark.init()

In [2]:
# import các thư viện cần thiết
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, Column
import sys

In [3]:
# Khởi tạo spark
sc = SparkContext(appName="PySpark")
spark = SparkSession(sc)

# Đọc dữ liệu

In [4]:
# Khai báo cấu trúc của dữ liệu
my_schema = tp.StructType([
    tp.StructField(name='id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='label', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='tweet', dataType=tp.StringType(), nullable=True)
])


In [5]:
# Đọc file dữ liệu
my_data = spark.read.csv('twitter_sentiments.csv', schema=my_schema, header=True)
my_data.show(2)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
+---+-----+--------------------+
only showing top 2 rows



In [6]:
my_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)



# Tạo các bước xử lý dữ liệu

In [7]:
# Tạo các bước xử lý dữ liệu
# Tách các từ trong câu để tạo thành các token
stage_1 = RegexTokenizer(inputCol='tweet', outputCol='tokens', pattern='\\W')
# Loại bỏ stopwords
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
# Tạo vector
stage_3 = Word2Vec(inputCol='filtered_words',
                   outputCol='vector', vectorSize=100)



# Khởi tạo model

In [8]:
# Khởi tạo model
model = LogisticRegression(featuresCol='vector', labelCol='label')

In [9]:
# Khởi tạo pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model])
# fit pipeline
pipelineFit = pipeline.fit(my_data)


In [10]:
# Dự đoán dữ liệu
pred = pipelineFit.transform(my_data)
print(pred.printSchema())

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

None


In [11]:
# Đánh giá độ chính xác của model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy: ", accuracy)

Accuracy:  0.9404292597459483


# Streaming

In [12]:
# Khởi tạo hàm đánh giá các tweet nhận được khi streaming
def get_prediction(tweet_text):
	try:
		# Lọc những tweet trống
		tweet_text = tweet_text.filter(lambda x: len(x) > 0)
		# Khởi dataframe với tên cột là 'tweet' và các dòng sẽ chứa các tweet
		rowRdd = tweet_text.map(lambda w: Row(tweet=w))
		#Tạo spark dataframe
		wordsDataFrame = spark.createDataFrame(rowRdd)
		# Dự đoán dữ liệu nhận được bằng pipeline của model trước đó
		result  = pipelineFit.transform(wordsDataFrame).select('tweet','prediction')
		# show kết quả dự đoán
		result.show()
		# Lưu kết quả vào thư mục prediction với định dạng csv
		result.repartition(1).write.csv(path="prediction", mode="append", header=True)
	except: 
		# Nếu không nhận được dữ liệu khi streaming hoặc có lỗi xảy ra
		print('No data')

In [13]:
# Khởi tại StreamingContext với thời gian gom dữ liệu là 10 giây/1 lần
ssc = StreamingContext(sc, batchDuration=10)
# Đọc dữ liệu truyền đến từ server có hostname 'localhost' và cổng 12345
lines = ssc.socketTextStream('localhost', 12345)
# Dự đoán từng dòng dữ liệu đọc được từ server
lines.foreachRDD(get_prediction)
# Bắt đầu việc tính toán
ssc.start()
# Chờ lệnh ngắt streaming
ssc.awaitTermination()
# 0 is pos
# 1 is neg

+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
| @user when a fat...|       0.0|
|@user @user thank...|       0.0|
|  bihday your maj...|       0.0|
+--------------------+----------+

No data
+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
|#model   i love u...|       0.0|
| factsguide: soci...|       0.0|
|[2/2] huge fan fa...|       0.0|
| @user camping to...|       0.0|
+--------------------+----------+

+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
|the next school y...|       0.0|
|we won!!! love th...|       0.0|
| @user @user welc...|       0.0|
| â #ireland con...|       0.0|
|we are so selfish...|       0.0|
+--------------------+----------+

+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
|i get to see my d...|       0.0|
|@user #cnn calls ...|       0.0|
|no