In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [3]:
sc =SparkContext()

In [4]:
spark = SparkSession(sc)

### Load data

In [5]:
import pandas as pd

In [6]:
df = pd.read_excel('Du lieu cung cap/CCPP/Folds5x2_pp.ods')
df = spark.createDataFrame(df)

df.show(3)

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
+-----+-----+-------+-----+------+
only showing top 3 rows



In [7]:
df.count()

9568

### Format data

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [9]:
df.columns

['AT', 'V', 'AP', 'RH', 'PE']

In [10]:
assembler = VectorAssembler(
            inputCols = ['AT', 'V', 'AP', 'RH'],
            outputCol = 'features') #input

In [11]:
data_pre = assembler.transform(df)

In [12]:
data_pre.show(2)

+-----+-----+-------+-----+------+--------------------+
|   AT|    V|     AP|   RH|    PE|            features|
+-----+-----+-------+-----+------+--------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...|
+-----+-----+-------+-----+------+--------------------+
only showing top 2 rows



In [13]:
data_pre.select("features").show(2,False)

+---------------------------+
|features                   |
+---------------------------+
|[14.96,41.76,1024.07,73.17]|
|[25.18,62.96,1020.04,59.08]|
+---------------------------+
only showing top 2 rows



### Scale data

In [14]:
from pyspark.ml.feature import MinMaxScaler

In [15]:
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")
# , withStd=True, withMean=False

In [16]:
final_data = scaler.fit(data_pre).transform(data_pre)

In [17]:
final_data.count()

9568

In [18]:
final_data = final_data.na.drop()
final_data.count()

9568

In [19]:
final_data.show(5)

+-----+-----+-------+-----+------+--------------------+--------------------+
|   AT|    V|     AP|   RH|    PE|            features|      ScaledFeatures|
+-----+-----+-------+-----+------+--------------------+--------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...|[0.37252124645892...|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...|[0.66203966005665...|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|[0.09348441926345...|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...|[0.53966005665722...|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....|[0.25524079320113...|
+-----+-----+-------+-----+------+--------------------+--------------------+
only showing top 5 rows



In [20]:
final_data = final_data.select('ScaledFeatures','PE')

### Train model

In [21]:
# Chia tập dữ liệu
train_data, test_data = final_data.randomSplit([0.8,0.2])
train_data.describe().show()

+-------+------------------+
|summary|                PE|
+-------+------------------+
|  count|              7687|
|   mean| 454.1700741511647|
| stddev|17.031163025671304|
|    min|            421.57|
|    max|            495.76|
+-------+------------------+



In [22]:
test_data.describe().show()

+-------+------------------+
|summary|                PE|
+-------+------------------+
|  count|              1881|
|   mean|455.16164274322176|
| stddev|17.194243773684697|
|    min|            420.26|
|    max|            495.21|
+-------+------------------+



In [23]:
# Train model
from pyspark.ml.regression import LinearRegression

In [24]:
lr = LinearRegression(featuresCol = "ScaledFeatures",
                      labelCol = "PE",
                      predictionCol = "prediction")

In [25]:
lrModel = lr.fit(train_data)

In [26]:
print("Coefficiens: {} Intercept: {}" .format(lrModel.coefficients, lrModel.intercept))

Coefficiens: [-69.32167095942808,-13.128799893031823,2.7257279763784066,-11.663991490246497] Intercept: 502.2293576277128


### Evaluate model

In [27]:
test_result = lrModel.evaluate(test_data)

In [28]:
test_result.residuals.show(5)

+-------------------+
|          residuals|
+-------------------+
| -5.027167434374746|
|-0.4985496383010286|
|  2.852908279108817|
|  7.116460376241719|
|-1.4745726761236142|
+-------------------+
only showing top 5 rows



In [29]:
print("RMSE: {}" .format(test_result.rootMeanSquaredError))
print("MSE: {}" .format(test_result.meanSquaredError))
print("r2: {}" .format(test_result.r2))

RMSE: 4.408877540754137
MSE: 19.43820116936625
r2: 0.9342159118408833


In [30]:
# Check test dataset
test_model = lrModel.transform(test_data)
#Inspect results
test_model.select("prediction", "PE").show(5)

+------------------+------+
|        prediction|    PE|
+------------------+------+
| 486.3071674343747|481.28|
|483.47854963830105|482.98|
|487.10709172089116|489.96|
| 481.9335396237583|489.05|
|486.38457267612364|484.91|
+------------------+------+
only showing top 5 rows



In [32]:
# Save model
lrModel.save('lrModel_Cau3')

### Predict with new data

In [33]:
from pyspark.ml.regression import LinearRegressionModel
# # Load model
lrModel2 = LinearRegressionModel.load('lrModel_Cau3')

In [34]:
df_new = pd.read_excel('Du lieu cung cap/CCPP/Folds5x2_pp.ods',sheet_name=2)
df_new = spark.createDataFrame(df_new)

df_new.show(3)

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
| 9.44| 40.0|1015.62|81.16|471.32|
|23.49| 49.3|1003.35|77.96|442.76|
| 4.99|39.04|1020.45|78.89|472.52|
+-----+-----+-------+-----+------+
only showing top 3 rows



In [35]:
# Prepare data
data_pre_new = assembler.transform(df_new)
# Scale data
final_data_new = scaler.fit(data_pre).transform(data_pre)
final_data_new = final_data.select('ScaledFeatures','PE')

In [36]:
result = lrModel.transform(final_data_new)
#Inspect results
result.select("prediction", "PE").show(5)

+------------------+------+
|        prediction|    PE|
+------------------+------+
| 467.2335268609262|463.26|
| 444.1423194680943|444.37|
| 483.3587598977769|488.56|
|450.53683014847707|446.48|
|471.69129526017514| 473.9|
+------------------+------+
only showing top 5 rows

