In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('BigData').getOrCreate()

In [8]:
# Đọc dữ liệu từ file csv
train_df = spark.read.csv('Train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('Test.csv', header=True, inferSchema=True)

train_df.show()


+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [9]:
# Kiểm tra cấu trúc của dữ liệu
train_df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)


In [10]:
# Liệt kế giá trị null của từng đặc trưng
missing_value = train_df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_df.columns]).toPandas().T
missing_value = missing_value.rename(columns={0: 'count'})
print(missing_value)

                           count
Item_Identifier                0
Item_Weight                 1463
Item_Fat_Content               0
Item_Visibility                0
Item_Type                      0
Item_MRP                       0
Outlet_Identifier              0
Outlet_Establishment_Year      0
Outlet_Size                 2410
Outlet_Location_Type           0
Outlet_Type                    0
Item_Outlet_Sales              0


In [11]:
# Tính giá trị xuất hiện nhiều nhất trong tập dữ liệu
mode_train = train_df.groupBy('Outlet_Size').count().orderBy('count', ascending=False).first()[0]

# Thay thế giá trị null bằng giá trị xuất hiện nhiều nhất cho tập train
train_df = train_df.fillna({'Outlet_Size': mode_train})

# Kiểm tra lại giá trị null
mode_test = test_df.groupBy('Outlet_Size').count().orderBy('count', ascending=False).first()[0]

# Thay thế giá trị null bằng giá trị xuất hiện nhiều nhất cho tập test 
test_df = test_df.fillna({'Outlet_Size': mode_test})

# Kiểm tra số lượng giá trị null còn lại của 'Outlet_Size'
missing_train = train_df.filter(col('Outlet_Size').isNull()).count()
missing_test = test_df.filter(col('Outlet_Size').isNull()).count()

print(missing_train, missing_test)

0 0


In [12]:
from pyspark.sql.functions import mean

# Tính giá trị trung bình của 'Item_Weight'
mean_train = train_df.select(mean(col('Item_Weight'))).collect()[0][0]

mean_test = test_df.select(mean(col('Item_Weight'))).collect()[0][0]


# Thay thế giá trị null bằng giá trị trung bình
train_df = train_df.fillna({'Item_Weight': mean_train})

test_df = test_df.fillna({'Item_Weight': mean_test})

# Kiểm tra số lượng giá trị null còn lại của 'Item_Weight'
missing_train = train_df.filter(col('Item_Weight').isNull()).count()
missing_test = test_df.filter(col('Item_Weight').isNull()).count()

print(missing_train, missing_test)

0 0


In [13]:
# Get the column names and their data types
column_data_types = train_df.dtypes

# Separate numeric and categorical columns
num_columns = [column for column, dtype in column_data_types if dtype in ['int', 'double']]
cat_columns = [column for column, dtype in column_data_types if dtype == 'string']

# Create dataframes for numeric and categorical columns
BM_num = train_df.select(num_columns)
BM_cat = train_df.select(cat_columns)

# Print the column names
print(num_columns)
print(cat_columns)

# Print the value counts for each categorical column
for column in cat_columns[1:]:
    train_df.groupBy(column).count().show()

['Item_Weight', 'Item_Visibility', 'Item_MRP', 'Outlet_Establishment_Year', 'Item_Outlet_Sales']
['Item_Identifier', 'Item_Fat_Content', 'Item_Type', 'Outlet_Identifier', 'Outlet_Size', 'Outlet_Location_Type', 'Outlet_Type']
+----------------+-----+
|Item_Fat_Content|count|
+----------------+-----+
|         low fat|  112|
|         Low Fat| 5089|
|              LF|  316|
|         Regular| 2889|
|             reg|  117|
+----------------+-----+
+--------------------+-----+
|           Item_Type|count|
+--------------------+-----+
|       Starchy Foods|  148|
|        Baking Goods|  648|
|              Breads|  251|
|Fruits and Vegeta...| 1232|
|                Meat|  425|
|         Hard Drinks|  214|
|         Soft Drinks|  445|
|           Household|  910|
|           Breakfast|  110|
|               Dairy|  682|
|         Snack Foods| 1200|
|              Others|  169|
|             Seafood|   64|
|              Canned|  649|
|        Frozen Foods|  856|
|  Health and Hygiene|  520|

In [14]:
from pyspark.sql.functions import when, col

# Đổi tên giá trị 'LF', 'low fat' thành 'Low Fat' và 'reg' thành 'Regular'
train_df = train_df.withColumn('Item_Fat_Content',
                               when(col('Item_Fat_Content') == 'LF', 'Low Fat')
                               .when(col('Item_Fat_Content') == 'low fat', 'Low Fat')
                               .when(col('Item_Fat_Content') == 'reg', 'Regular')
                               .otherwise(col('Item_Fat_Content')))

test_df = test_df.withColumn('Item_Fat_Content',
                             when(col('Item_Fat_Content') == 'LF', 'Low Fat')
                             .when(col('Item_Fat_Content') == 'low fat', 'Low Fat')
                             .when(col('Item_Fat_Content') == 'reg', 'Regular')
                             .otherwise(col('Item_Fat_Content')))

# Kiểm tra lại giá trị của 'Item_Fat_Content'
train_df.groupBy('Item_Fat_Content').count().show()

+----------------+-----+
|Item_Fat_Content|count|
+----------------+-----+
|         Low Fat| 5517|
|         Regular| 3006|
+----------------+-----+


In [15]:
from pyspark.sql.functions import lit
from datetime import date

# Tạo cột 'Outlet_Age' với giá trị là hiện tại trừ đi năm thành lập cửa hàng
train_df = train_df.withColumn('Outlet_Age', lit(date.today().year) - col('Outlet_Establishment_Year'))

test_df = test_df.withColumn('Outlet_Age', lit(date.today().year) - col('Outlet_Establishment_Year'))

# Kiểm tra lại giá trị của 'Outlet_Age'
train_df.select('Outlet_Age').show()
test_df.select('Outlet_Age').show()

+----------+
|Outlet_Age|
+----------+
|        24|
|        14|
|        24|
|        25|
|        36|
|        14|
|        36|
|        38|
|        21|
|        16|
|        24|
|        26|
|        24|
|        26|
|        36|
|        26|
|        14|
|        24|
|        38|
|        19|
+----------+
only showing top 20 rows

+----------+
|Outlet_Age|
+----------+
|        24|
|        16|
|        25|
|        16|
|        38|
|        26|
|        14|
|        38|
|        21|
|        16|
|        16|
|        21|
|        38|
|        24|
|        36|
|        24|
|        21|
|        14|
|        38|
|        25|
+----------+


In [16]:
from pyspark.sql.functions import countDistinct

# Tính số lượng giá trị duy nhất của từng cột
unique_values = {column: BM_cat.select(countDistinct(column)).first()[0] for column in BM_cat.columns}

# In ra số lượng giá trị duy nhất của từng cột
for column, unique_count in unique_values.items():
    print(f"{column}: {unique_count}")

Item_Identifier: 1559
Item_Fat_Content: 5
Item_Type: 16
Outlet_Identifier: 10
Outlet_Size: 3
Outlet_Location_Type: 3
Outlet_Type: 4


In [17]:
from pyspark.ml.feature import StringIndexer

# Liệt kê các cột cần được mã hóa (do có giá trị là string, chuỗi)
columns_to_encode = ['Item_Fat_Content', 'Outlet_Size', 'Outlet_Location_Type']

# Áp dụng StringIndexer (biến đổi chuỗi thành số) cho từng cột trong list
for column in columns_to_encode:
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
    train_df = indexer.fit(train_df).transform(train_df)
    test_df = indexer.fit(test_df).transform(test_df)

# Hiển thị dữ liệu
train_df.show()

+---------------+------------------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------------------+-----------------+--------------------------+
|Item_Identifier|       Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Outlet_Age|Item_Fat_Content_index|Outlet_Size_index|Outlet_Location_Type_index|
+---------------+------------------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------------------+-----------------+--------------------------+
|          FDA15|               9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|       

In [18]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Liệt kê các cột cần được mã hóa (do có giá trị là string, chuỗi)
columns_to_encode = ['Item_Type', 'Outlet_Type']

# Áp dụng StringIndexer (biến đổi chuỗi thành số) và OneHotEncoder (biến đổi số thành vector) cho từng cột trong danh sách cột cần được mã hóa

for column in columns_to_encode:
    # StringIndexer
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
    train_indexed = indexer.fit(train_df).transform(train_df)
    test_indexed = indexer.fit(test_df).transform(test_df)

    # OneHotEncoder
    encoder = OneHotEncoder(inputCols=[column + "_index"], outputCols=[column + "_vec"])
    train_fe = encoder.fit(train_indexed).transform(train_indexed)
    test_fe = encoder.fit(test_indexed).transform(test_indexed)

In [19]:
# Danh sách các cột sẽ bị xóa
columns_to_drop = ['Item_Identifier', 'Outlet_Identifier', 'Outlet_Establishment_Year', 'Outlet_Type', 'Item_Type',
                   'Item_Fat_Content', 'Outlet_Size', 'Outlet_Location_Type']

# Xoá các cột trong danh sách ở trên
train_fe = train_df.drop(*columns_to_drop)

test_fe = test_df.drop(*columns_to_drop)

train_fe.show()

+------------------+---------------+--------+-----------------+----------+----------------------+-----------------+--------------------------+
|       Item_Weight|Item_Visibility|Item_MRP|Item_Outlet_Sales|Outlet_Age|Item_Fat_Content_index|Outlet_Size_index|Outlet_Location_Type_index|
+------------------+---------------+--------+-----------------+----------+----------------------+-----------------+--------------------------+
|               9.3|    0.016047301|249.8092|         3735.138|        24|                   0.0|              0.0|                       2.0|
|              5.92|    0.019278216| 48.2692|         443.4228|        14|                   1.0|              0.0|                       0.0|
|              17.5|    0.016760075| 141.618|          2097.27|        24|                   0.0|              0.0|                       2.0|
|              19.2|            0.0| 182.095|           732.38|        25|                   1.0|              0.0|                       0.0|

In [20]:
from pyspark.ml.feature import VectorAssembler

# Danh sách các cột đầu vào
input_columns = [column for column in train_fe.columns if column != 'Item_Outlet_Sales']

# Khỏw tạo đối tượng VectorAssembler (tạo vector đầu vào)
assembler = VectorAssembler(inputCols=input_columns, outputCol='features')

# Áp dụng VectorAssembler cho tập train và test
train_data = assembler.transform(train_fe)
test_data = assembler.transform(test_fe)

# Now 'features' column is added to the dataframes

In [21]:
# Chia tập dữ liệu thành tập train và test
X_train, X_test = train_data.randomSplit([0.8, 0.2], seed=0)

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
def cross_val(model_name, model, X, y, num_folds):
    """
    :param model_name: Tên của mô hình 
    :param model: Mô hình
    :param X: Tập dữ liệu đầu vào
    :param y: Tên cột chứa giá trị mục tiêu
    :param num_folds: Số lượng folds (folds: tập con)
    :return: 
    """
    
    # tạo lưới tham số (lưới tham số: tập các tham số mà mô hình sẽ thử)
    paramGrid = ParamGridBuilder().build()

    # Khởi tạo đối tượng RegressionEvaluator (đánh giá mô hình hồi quy)
    evaluator = RegressionEvaluator(labelCol=y, predictionCol="prediction")

    # Khởi tạo đối tượng CrossValidator (xác thực chéo)
    crossval = CrossValidator(estimator=model,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=num_folds)

    # Fit mô hình với tập dữ liệu
    cv_model = crossval.fit(X)

    # Dự đoán kết quả trên tập dữ liệu
    predictions = cv_model.transform(X)

    # Tính hệ số xác định (R2)
    r2_score = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    print(f"{model_name} - R2 score: {r2_score:.3f}")
    
    # Tính căn bậc hai của sai số bình phương trung bình (RMSE - Root Mean Squared Error)
    rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
    print(f"{model_name} - RMSE: {rmse:.3f}")
    
    # Tính sai số bình phương trung bình (MSE - Mean Squared Error)
    mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
    print(f"{model_name} - MSE: {mse:.3f}")
    
    # Tính sai số tuyệt đối trung bình (MAE)
    mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
    print(f"{model_name} - MAE: {mae:.3f}")
    
    

In [23]:
from pyspark.ml.regression import LinearRegression


# Khởi tạo đối tượng LinearRegression (hồi quy tuyến tính)
lr = LinearRegression(featuresCol='features', labelCol='Item_Outlet_Sales')

# Huấn luyện mô hình với tập train
lr_model = lr.fit(X_train)  # Use the complete training data with features and target variable

# Dự đoán kết quả trên tập test
predictions = lr_model.transform(X_test)


23/11/11 10:54:31 WARN Instrumentation: [a62c2a71] regParam is zero, which might cause numerical instability and overfitting.
23/11/11 10:54:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/11 10:54:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/11/11 10:54:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [24]:
# In các hệ số
coefficients = lr_model.coefficients
print("Coefficients: " + str(coefficients))

Coefficients: [-3.0748481941624637,-4323.313787355399,15.56516273349169,21.09510706263851,80.34038963135492,-233.64258709995346,-164.18797958431293]


In [25]:
# Hệ số chặn
intercept = lr_model.intercept
print("Intercept: %s" % str(intercept))

Intercept: 16.256254912483193


In [26]:
# Kiểm tra chép
cross_val('Linear Regression', lr, train_data, 'Item_Outlet_Sales', 3)

23/11/11 10:54:32 WARN Instrumentation: [0475d129] regParam is zero, which might cause numerical instability and overfitting.
23/11/11 10:54:33 WARN Instrumentation: [20a24841] regParam is zero, which might cause numerical instability and overfitting.
23/11/11 10:54:33 WARN Instrumentation: [51e88504] regParam is zero, which might cause numerical instability and overfitting.
23/11/11 10:54:33 WARN Instrumentation: [d2e7bd87] regParam is zero, which might cause numerical instability and overfitting.


Linear Regression - R2 score: 0.355
Linear Regression - RMSE: 1369.939
Linear Regression - MSE: 1876731.663
Linear Regression - MAE: 1017.505


In [27]:
# Thêm côt 'residuals' (sai số) cho tập dữ liệu
predictions = predictions.withColumn('residuals', col('Item_Outlet_Sales') - col('prediction'))

actual_pred_df = predictions.select(['Item_Outlet_Sales', 'prediction', 'residuals'])

actual_pred_df.show()

actual_pred_df.write.csv('predictions.csv', header=True, mode='overwrite')

+-----------------+------------------+-------------------+
|Item_Outlet_Sales|        prediction|          residuals|
+-----------------+------------------+-------------------+
|        1670.4922| 1364.971889055528| 305.52031094447193|
|         1927.491|1357.6735791471706|  569.8174208528294|
|        2048.6666|1794.0348828671165| 254.63171713288352|
|         361.5294|1618.4819392455342|-1256.9525392455344|
|        2253.0672| 2290.250717838041|  -37.1835178380411|
|        1995.4026| 505.3286416408073| 1490.0739583591926|
|          539.298| 627.5258896620741| -88.22788966207406|
|         2956.152|2544.2883363543756| 411.86366364562446|
|        2059.9852|2158.8175134138073| -98.83231341380724|
|        4781.7756|3619.5951916148965| 1162.1804083851034|
|        3231.1274|2004.0505271777552| 1227.0768728222447|
|         561.9352|2031.8796157482388| -1469.944415748239|
|         3588.662|2859.7844501407367|  728.8775498592631|
|         2609.936|2636.1476353566536| -26.2116353566534