# Decision Tree

In [2]:
# thêm lớp SparkSession từ gói pyspark.sql dể sử dụng các chức năng như tạo dataframe, thao tác trên dữ liệu
from pyspark.sql import SparkSession

# tạo một đối tượng SparkSession bằng phương thức "builder", chạy trên một luồng cục bộ
# ứng dụng được đặt tên là "SparkMLLib_DecisionTree"
spark = SparkSession.builder.master("local[*]").appName("SparkMLLib_DecisionTree").getOrCreate()

24/05/20 23:49:18 WARN Utils: Your hostname, godwolf-2-7 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/05/20 23:49:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/20 23:49:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# chỉ định "print" trong file sẽ được sử dụng theo Python3.x
from __future__ import print_function

# thêm các lớp Pipeline, DecisionTreeClassifier, StringIndexer, VectorIndexer, MulticlassClassificationEvaluator từ gói pyspark.ml 
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# tạo 1 dataframe từ dữ liệu trong tệp tin có định dạng LibSVM
data = spark.read.format("libsvm").load("/home/godwolf/Downloads/spark-3.5.1-bin-hadoop3/python/TH4/TH6_Attachments/sample_libsvm_data.txt")

# hiển thị một số hàng đầu tiên của dataframe
data.show()

# đếm số lượng hàng trong dataframe
data.count()

24/05/20 23:49:27 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
                                                                                

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



100

In [4]:
# tạo đối tượng "StringIndexer" để chuyển đổi cột label từ chuỗi thành số nguyên, cột đàu ra được đặt tên là indexedLabel 
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# tạo đối tượng "VectorIndexer" để xác định và chuyển đổi cột featuré thành dạng số nguyên, cột đầu ra được đặt tên là indexedFeatures
# maxCategories để xác định rằng nếu một đặc trưng ít hơn 4 danh mục thì sẽ được xem là 1 đặc trưng và được đổi thành số nguyên
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

                                                                                

In [5]:
data.show()

data.count()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



100

In [6]:
# chia dữ liệu thành 2 phần: tập huấn luyện và tâp kiểm tra. 70% giữ để huấn luyện và 30% giữ để kiểm tra
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# tạo đối tượng DecisionTreeClassifier để định nghĩa mô hình cây quyết định với giá trị đầu vào dự đoán là indexedLabel, indexedFeatures
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# tạo đối tượng Pipeline xử lý dữ liệu và mô hình hóa 
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# huấn luyện mô hình trên dữ liệu huấn luyện trainingData, hàm fit() sẽ thực hiện các công đoạn trong Pineline
model = pipeline.fit(trainingData)


[Stage 13:>                                                         (0 + 1) / 1]

                                                                                

In [7]:
# sử dụng mô hình đã được huấn luyện để thực hiện dự đoán trên dữ liệu kiểm tra testData
predictions = model.transform(testData)

# chọn 3 cột "prediction", "indexedLabel", "features" từ dataframe và hiển thị các giá trị của 3 cột
predictions.select("prediction", "indexedLabel", "features").show()

# tạo đối tượng MulticlassClassificationEvaluator để đánh giá hiệu suất của mô hình
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

# tính toán độ chính xác của mô hình
accuracy = evaluator.evaluate(predictions)

# hiển thị độ lỗi trên tập dữ liệu 
print("Test Error = %g " % (1.0 - accuracy))

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[98,99,100,1...|
|       1.0|         1.0|(692,[121,122,123...|
|       1.0|         1.0|(692,[123,124,125...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[151,152,153...|
|       1.0|         1.0|(692,[153,154,155...|
|       0.0|         1.0|(692,[154,155,156...|
|       1.0|         1.0|(692,[155,156,180...|
|       1.0|         1.0|(692,[234,235,237...|
|       0.0|         0.0|(692,[100,101,102...|
|       0.0|         0.0|(692,[123,124,125...|
|       0.0|         0.0|(692,[124,125,126...|
|       0.0|         0.0|(692,[125,126,127...|
|       0.0|         0.0|(692,[125,126,153...|
|       0.0|         0.0|(692,[126,127,128...|
|       0.0| 

# =================================

# Kmeans

In [8]:
spark_1 = SparkSession.builder.master("local[*]").appName("SparkMLLib_KMeans").getOrCreate()

24/05/21 00:15:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset_1 = spark_1.read.format("libsvm").load("/home/godwolf/Downloads/spark-3.5.1-bin-hadoop3/python/TH4/TH6_Attachments/sample_kmeans_data.txt")

dataset_1.show()

dataset_1.count()

24/05/21 00:18:28 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



6

In [14]:
# tạo đối tượng KMeans, thiết lập số lượng cụm cần phân là 2, thiết lập seed=1 để đảm bảo tính nhất quán của kết quả
kmeans = KMeans().setK(2).setSeed(1)

# huấn luyện mô hình trên dữ liệu dataset_1
model = kmeans.fit(dataset_1)

In [16]:
# sử dụng mô hình kmean đã huấn luyện để dự đoán cụm cho dữ liệ
predictions = model.transform(dataset_1)

# đánh giá chất lượng việc phân cụm
evaluator = ClusteringEvaluator()

# tính toán chỉ số silhouette để đánh giá chát lượng của việc phân cụm
silhouette = evaluator.evaluate(predictions)

# in ra đoạn text với giá trị của chỉ số silhouette
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9997530305375207


In [17]:
# hiển thị các trung tâm của các cụm
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]


# =================================

# NaiveBayes

In [18]:
spark_2 = SparkSession.builder.master("local[*]").appName("SparkMLLib_NaiveBayes").getOrCreate()

24/05/21 00:31:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [21]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm").load("/home/godwolf/Downloads/spark-3.5.1-bin-hadoop3/python/TH4/TH6_Attachments/sample_libsvm_data.txt")

data.show()
data.count()

24/05/21 00:35:01 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



100

In [22]:
# chia dữ liệu thành 2 tập dữ liệu: tập huân luyện (60%) và tập kiểm tra (40%)
splits = data.randomSplit([0.6, 0.4], 1234)

# gán tập dữ liêu đầu tiên trong danh sách splits cho biến train
train = splits[0]

# gán tập dữ liệu thứ hai trong danh sách splits cho biến test
test = splits[1]

# định nghĩa mô hình NaiveBayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# huấn luyện mô hình trên tập dữ liệu huấn luyện
model = nb.fit(train)

                                                                                

In [23]:
# sử dụng mô hình naivebayes đã được huấn luyện để dự đoán cho dữ liệu trong tập kiểm tra
predictions = model.transform(test)

# hiển thị kết quả sau khi dự đoán
predictions.show()

# đánh giá hiệu suất của mô hình
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# tính toán độ chính xác của mô hình
accuracy = evaluator.evaluate(predictions)

# in ra đoạn text và chỉ số độ chính xác của mô hình
print("Test set accuracy = " + str(accuracy))

+-----+--------------------+--------------------+-----------+----------+
|label|            features|       rawPrediction|probability|prediction|
+-----+--------------------+--------------------+-----------+----------+
|  0.0|(692,[95,96,97,12...|[-172664.79564650...|  [1.0,0.0]|       0.0|
|  0.0|(692,[98,99,100,1...|[-176279.15054306...|  [1.0,0.0]|       0.0|
|  0.0|(692,[122,123,124...|[-189600.55409526...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-274673.88337431...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-183393.03869049...|  [1.0,0.0]|       0.0|
|  0.0|(692,[125,126,127...|[-256992.48807619...|  [1.0,0.0]|       0.0|
|  0.0|(692,[126,127,128...|[-210411.53649773...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-170627.63616681...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-212157.96750469...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-183253.80108550...|  [1.0,0.0]|       0.0|
|  0.0|(692,[128,129,130...|[-246528.93739632...|  

# =================================

# TFIDF

In [25]:
spark_3 = SparkSession.builder.master("local[*]").appName("SparkMLLib_TFIDF").getOrCreate()

In [26]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [27]:
# tạo 1 dataframe từ dữ liệu văn bản
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

In [31]:
# tạo đối tượng tokenizer với giá trị đầu vào là giá trị của cột sentence và dữ liệu đầu ra sẽ được đưa vào cột words
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

# sử dụng tokenizer đã được tạo để biến đổi dữ liệu văn bản trong dataframe sau đó lưu vào wordsData
wordsData = tokenizer.transform(sentenceData)

# hiển thị kết quả là dữ liệu ban đầu với thêm cột words chứa các từ đã được tách ra 
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [32]:
# tạo đối tượng HashingTF với giá trị đầu vào là giá trị trong cột "words" đã được tách chữ, giá trị đầu ra sẽ được lưu vào cột rawFeatures
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)

# sử dụng hashingTF đã được tạo để biến đổi dữ liệu trong wordsData sau đó lưu két quả vào biến featurizedData
featurizedData = hashingTF.transform(wordsData)

# hiển thị kết quả là dữ liệu wordsData với thêm cột mới chứa vector đặc trưng  
featurizedData.show()

+-----+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|(20,[6,8,13,16],[...|
|  0.0|I wish Java could...|[i, wish, java, c...|(20,[0,2,7,13,15,...|
|  1.0|Logistic regressi...|[logistic, regres...|(20,[3,4,6,11,19]...|
+-----+--------------------+--------------------+--------------------+



In [33]:
# tạo đối tượng IDF với giá trị đầu vào là giá trị của cột rawFeatures và giá trị đầu ra sẽ lưu vào côt features
idf = IDF(inputCol="rawFeatures", outputCol="features")

# sử dụng đối tượng idf đã được tạo để huấn luyện mô hình IDF trên tập dữ liệu featurizedData 
idfModel = idf.fit(featurizedData)

# sử dụng mô hình idf đã được huấn luyện để biến đổi dữ liệu trong featurizedData
rescaledData = idfModel.transform(featurizedData)

                                                                                

In [34]:
# hiển thị cột label và cột features trong dataframe rescaledData
rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+

