# Import thư viện

In [1]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import time
import re
import os
import random

In [2]:
# Create Spark Session
spark = SparkSession.builder \
    .appName("Support Vector Machines") \
    .master("local[*]") \
    .getOrCreate()

# Tiền xử lý

In [3]:
# Preprocessing CSV file
def split_csv(line):
    columns = line.split(",")
    if len(columns) > 4:
        columns[3] = ",".join(columns[3:])
    return columns[:4]

In [4]:
# Cleaning text
def clean_text(text):
    # Remove URLs using a comprehensive regex
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)

    # Remove excessive punctuation (multiple consecutive special characters)
    text = re.sub(r'\?{2,}', ' ', text)

    # Remove numbers
    text = re.sub(r'\d+', '', text)

    # Keep mentions and hashtags, remove other special characters
    text = re.sub(r'[^\w\s@#]', '', text)

    # Convert to lowercase
    text = text.lower()

    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# Đọc dữ liệu

In [5]:
# Random số từ 1-10
input_num = random.randint(1, 10)
input_path = os.path.join(os.getcwd(), f"input/spark_input_{input_num}/tweets.csv")
print("Đường dẫn file:", input_path)

Đường dẫn file: /home/jovyan/input/spark_input_10/tweets.csv


In [6]:
# Read data
rdd = spark.sparkContext.textFile(input_path) \
    .map(split_csv) \
    .map(lambda columns: (
        float(columns[1]),  # Sentiment
        clean_text(columns[3])  # Tweet text cleaned
    ))

# Convert RDD to DataFrame
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
schema = StructType([
    StructField("label", DoubleType(), False),
    StructField("tweet", StringType(), False)
])

df = spark.createDataFrame(rdd, schema)
    
# In ra một số thông tin về DataFrame để kiểm tra
print("\nThông tin về DataFrame:")
print("- Số lượng dòng:", df.count())
print("- Schema:")
df.printSchema()

print("\nMẫu 5 dòng đầu tiên:")
df.show(5, truncate=False)

# Thống kê phân bố nhãn
print("\nPhân bố nhãn sentiment:")
df.groupBy("label").count().orderBy("label").show()



Thông tin về DataFrame:
- Số lượng dòng: 1000000
- Schema:
root
 |-- label: double (nullable = false)
 |-- tweet: string (nullable = false)


Mẫu 5 dòng đầu tiên:
+-----+-----------------------------------------------------------------------------------------------------+
|label|tweet                                                                                                |
+-----+-----------------------------------------------------------------------------------------------------+
|0.0  |is so sad for my apl friend                                                                          |
|0.0  |i missed the new moon trailer                                                                        |
|1.0  |omg its already o                                                                                    |
|0.0  |omgaga im sooo im gunna cry ive been at this dentist since i was suposed just get a crown put on mins|
|0.0  |i think mi bf is cheating on me t_t                        

In [7]:
# Áp dụng TF-IDF
# 1. Tokenization
tokenizer = Tokenizer(inputCol="tweet", outputCol="words")
words_data = tokenizer.transform(df)

# 2. TF (Term Frequency)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurized_data = hashingTF.transform(words_data)

# 3. IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

In [8]:
rescaled_data.show(20, truncate=False)

+-----+------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [9]:
# Check empty tweets
rescaled_data.filter(rescaled_data["tweet"] == "").show(10, truncate=False)
# Check null words
rescaled_data.filter(rescaled_data["words"].isNull()).count()

+-----+-----+-----+-----------+--------+
|label|tweet|words|rawFeatures|features|
+-----+-----+-----+-----------+--------+
+-----+-----+-----+-----------+--------+



0

In [10]:
# Chia training/test
test_size = 0.25

training_data, test_data = rescaled_data.randomSplit([1 - test_size, test_size], seed=1234)

In [11]:
# Huấn luyện mô hình SVM
start_time = time.time()

lsvc = LinearSVC(maxIter=10, regParam=0.1)
model = lsvc.fit(training_data)

# Dự đoán
predictions = model.transform(test_data)

In [12]:
# Đánh giá
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)

# Tính F1-score
prediction_and_labels = predictions.select("prediction", "label").rdd.map(
    lambda row: (float(row['prediction']), float(row['label']))
)
metrics = MulticlassMetrics(prediction_and_labels)

# In kết quả
print("Confusion Matrix:")
print(metrics.confusionMatrix())
print(f"Accuracy: {accuracy}")
print(f"F1 Score: {metrics.weightedFMeasure()}")
print(f"Training time: {time.time() - start_time:.2f} seconds")



Confusion Matrix:
DenseMatrix([[ 84789.,  32107.],
             [ 29027., 103880.]])
Accuracy: 0.7552711536690913
F1 Score: 0.7550391935829374
Training time: 75.82 seconds


In [13]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- tweet: string (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [14]:
# Display the first few rows of predictions
predictions.select('label', 'tweet', 'prediction').show(20, truncate=False)

+-----+--------------------------------------------------------------------------------------------------------------------------------------+----------+
|label|tweet                                                                                                                                 |prediction|
+-----+--------------------------------------------------------------------------------------------------------------------------------------+----------+
|0.0  |# @rockchic aw poor you hope its not oinkflu                                                                                          |0.0       |
|0.0  |# cough # runny nose or stuffy nose # sore throat # body aches # headache more and i would have flu                                   |0.0       |
|0.0  |# todays weather sucks and its perfect close pleaseeee i hope they dont keep my on call                                               |0.0       |
|0.0  |###@ i hate the dentist i dont want to go                            

In [15]:
# Dừng Spark Session
spark.stop()