In [1]:
!pip install pyspark



In [2]:
#Kết nối gg drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Khởi tạo SparkSession
spark = SparkSession.builder.appName("MilkQualityPrediction").getOrCreate()

print("SparkSession đã được khởi tạo thành công!")


SparkSession đã được khởi tạo thành công!


In [5]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Tải dữ liệu từ file CSV
file_path = "/content/drive/MyDrive/Sử dụng spark dự doán chất lượng sữa/milknew.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Hiển thị 5 dòng đầu tiên của dữ liệu
print("DataFrame ban đầu:")
df.show(5)

# Đổi tên các cột để dễ làm việc
df = df.withColumnRenamed("Temprature", "Temperature") \
       .withColumnRenamed("Fat ", "Fat")

# Biến đổi cột mục tiêu 'Grade' từ chuỗi thành số
string_indexer = StringIndexer(inputCol="Grade", outputCol="label")
indexed_df = string_indexer.fit(df).transform(df)

# Xây dựng vector đặc trưng từ các cột đầu vào
feature_cols = ['pH', 'Temperature', 'Taste', 'Odor', 'Fat', 'Turbidity', 'Colour']
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_df = vector_assembler.transform(indexed_df)

# Hiển thị DataFrame sau khi tiền xử lý
print("\nDataFrame sau khi tiền xử lý:")
final_df.show(5)


DataFrame ban đầu:
+---+----------+-----+----+----+---------+------+------+
| pH|Temprature|Taste|Odor|Fat |Turbidity|Colour| Grade|
+---+----------+-----+----+----+---------+------+------+
|6.6|        35|    1|   0|   1|        0|   254|  high|
|6.6|        36|    0|   1|   0|        1|   253|  high|
|8.5|        70|    1|   1|   1|        1|   246|   low|
|9.5|        34|    1|   1|   0|        1|   255|   low|
|6.6|        37|    0|   0|   0|        0|   255|medium|
+---+----------+-----+----+----+---------+------+------+
only showing top 5 rows


DataFrame sau khi tiền xử lý:
+---+-----------+-----+----+---+---------+------+------+-----+--------------------+
| pH|Temperature|Taste|Odor|Fat|Turbidity|Colour| Grade|label|            features|
+---+-----------+-----+----+---+---------+------+------+-----+--------------------+
|6.6|         35|    1|   0|  1|        0|   254|  high|  2.0|[6.6,35.0,1.0,0.0...|
|6.6|         36|    0|   1|  0|        1|   253|  high|  2.0|[6.6,36.0,0.0,

In [8]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression # Import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Import ParamGridBuilder and CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # Import MulticlassClassificationEvaluator


# Chia dữ liệu thành tập huấn luyện và kiểm tra
(training_data, test_data) = final_df.randomSplit([0.8, 0.2], seed=123)

# Khởi tạo mô hình Logistic Regression
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Xây dựng lưới siêu tham số (Hyperparameter Grid)
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Tạo trình đánh giá (Evaluator) với độ chính xác (accuracy)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Thiết lập CrossValidator
cross_val = CrossValidator(estimator=lr,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=3) # Sử dụng 3 folds để kiểm tra chéo

# Huấn luyện mô hình với kiểm tra chéo
cv_model = cross_val.fit(training_data)

# Lấy mô hình tốt nhất từ kết quả kiểm tra chéo
best_model = cv_model.bestModel

# Đánh giá mô hình tốt nhất trên tập dữ liệu kiểm tra
predictions = best_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

# In kết quả
print(f"Siêu tham số tốt nhất: regParam={best_model.getRegParam()}, elasticNetParam={best_model.getElasticNetParam()}")
print(f"Độ chính xác (Accuracy) trên tập kiểm tra: {accuracy:.4f}")

Siêu tham số tốt nhất: regParam=0.01, elasticNetParam=0.5
Độ chính xác (Accuracy) trên tập kiểm tra: 0.8670
