In [1]:
import os

import findspark

# findspark has to put Spark on sys.path before pyspark can be imported.
# A non-empty SPARK_HOME overrides the install location; the original path is
# kept as the fallback so the notebook still runs unchanged where it was written.
findspark.init(os.environ.get('SPARK_HOME') or '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

# Reuse an existing session if one is already running, otherwise create it.
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()

In [2]:
# Load the pre-processed CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    "aoti_processed_file.csv",
    header=True,
    inferSchema=True,
)

In [3]:
# Print the column names and inferred types of the loaded DataFrame.
df.printSchema()

root
 |-- No: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- PM2.5: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- PM2.5_lv: double (nullable = true)
 |-- PM10_lv: double (nullable = true)
 |-- SO2_lv: double (nullable = true)
 |-- NO2_lv: double (nullable = true)
 |-- CO_lv: double (nullable = true)
 |-- O3_lv: double (nullable = true)



In [4]:
# Rename the target column so its name no longer contains a dot
# (presumably because Spark parses "PM2.5" as nested-field access — confirm).
df = df.withColumnRenamed(existing="PM2.5", new="PM25")
df.printSchema()

# Summary statistics of the target column on its own.
PM25 = df.select("PM25")
PM25.describe().show()

root
 |-- No: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- PM25: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- PM2.5_lv: double (nullable = true)
 |-- PM10_lv: double (nullable = true)
 |-- SO2_lv: double (nullable = true)
 |-- NO2_lv: double (nullable = true)
 |-- CO_lv: double (nullable = true)
 |-- O3_lv: double (nullable = true)

+-------+-----------------+
|summary|             PM25|
+-------+-----------------+
|  count|            28894|
|   mean|54.04170696773637|
| stddev|50.19011249088544|
|    min|              3.0|
|    max|            252.0|
+-------+-----------------+



In [5]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [6]:
# Pack the five remaining pollutant readings into a single vector column named
# "features", which is the column LinearRegression reads by default.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        "PM10",
        "SO2",
        "NO2",
        "CO",
        "O3",
    ],
)
output = assembler.transform(df)
output.printSchema()

root
 |-- No: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- PM25: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- PM2.5_lv: double (nullable = true)
 |-- PM10_lv: double (nullable = true)
 |-- SO2_lv: double (nullable = true)
 |-- NO2_lv: double (nullable = true)
 |-- CO_lv: double (nullable = true)
 |-- O3_lv: double (nullable = true)
 |-- features: vector (nullable = true)



In [7]:
# Keep only what the regression needs: the assembled feature vector and the
# PM25 label.
final_data = output.select(["features", "PM25"])
final_data.show()

+--------------------+----+
|            features|PM25|
+--------------------+----+
|[4.0,4.0,7.0,300....| 4.0|
|[8.0,4.0,7.0,300....| 8.0|
|[7.0,5.0,10.0,300...| 7.0|
|[6.0,11.0,11.0,30...| 6.0|
|[3.0,12.0,12.0,30...| 3.0|
|[5.0,18.0,18.0,40...| 5.0|
|[3.0,18.0,32.0,50...| 3.0|
|[6.0,19.0,41.0,50...| 3.0|
|[6.0,16.0,43.0,50...| 3.0|
|[8.0,12.0,28.0,40...| 3.0|
|[6.0,9.0,12.0,400...| 3.0|
|[6.0,9.0,14.0,400...| 3.0|
|[6.0,7.0,13.0,300...| 3.0|
|[6.0,7.0,12.0,400...| 3.0|
|[9.0,7.0,11.0,400...| 6.0|
|[15.0,7.0,14.0,40...| 8.0|
|[19.0,9.0,13.0,40...| 9.0|
|[23.0,11.0,15.0,4...|10.0|
|[20.0,8.0,20.0,50...|11.0|
|[14.0,12.0,30.0,5...| 8.0|
+--------------------+----+
only showing top 20 rows



In [8]:
# Baseline linear regression on the full data set: PM25 is the label and the
# assembled "features" column (the estimator's default) is the input.
lr = LinearRegression(labelCol="PM25")

# Fit the model to the data.
lrModel = lr.fit(final_data)

# The coefficient vector has one entry per assembled input column.
print("Coefficients: {}".format(str(lrModel.coefficients)))
print('\n')
print("Intercept:{}".format(str(lrModel.intercept)))

Coefficients: [0.48711049881974533,-0.36115157065540504,0.10968736026230073,0.033537979649147266,0.1176260387646341]


Intercept:-20.337780961159506


In [9]:
# Training-set diagnostics for the model fitted above.
trainingSummary = lrModel.summary

# Per-row residuals (only the first 20 rows are displayed).
trainingSummary.residuals.show()

# Root mean squared error on the training data.
print("RMSE: " + str(trainingSummary.rootMeanSquaredError))

# Coefficient of determination on the training data.
print("r2: " + str(trainingSummary.r2))

+--------------------+
|           residuals|
+--------------------+
|   3.947534847045034|
|   5.999092851766054|
|   5.988796995512839|
|   7.650755596767347|
|   6.363551303619687|
|     6.2500738360119|
|   2.216890445298599|
|  0.9529065484865029|
| -0.5851749615335855|
|  0.3483415843496651|
| 0.46496713027950776|
|  0.3632184485195431|
|   2.751522516091857|
| -0.7278401660898375|
|  0.8028896589485939|
|-0.33120937599214884|
| -0.4476608696980158|
| -0.6579223666615199|
| -2.7117761933361955|
| -1.2651201327727684|
+--------------------+
only showing top 20 rows

RMSE: 23.091530562941383
r2: 0.7883179012780994


In [10]:
# List every parameter of the estimator together with its description,
# default and (where set) current value.
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: PM25)
maxIter: max number of iterations (>= 0). (default: 100)
predictionCol: prediction column name. (default: prediction)
regParam: regularization parameter (>= 0). (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto'. (default: auto)
standardization: whether to standardize the training features before fitting the model. (default: True)
tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefin

In [11]:
# Refit on the full data set without an intercept (regression through the
# origin) and with the regularisation parameter explicitly set to zero.
lr = LinearRegression(
    labelCol='PM25',
    fitIntercept=False,
    regParam=0,
)
lrModel = lr.fit(final_data)
trainingSummary = lrModel.summary

# NOTE(review): r2 for a no-intercept fit is presumably computed against an
# uncentred total sum of squares, so it may not be comparable with the r2 of
# the intercept model — confirm before reading a higher value as a better fit.
print("RMSE: " + str(trainingSummary.rootMeanSquaredError))
print("r2: " + str(trainingSummary.r2))

RMSE: 23.91406912213034
r2: 0.8948641888223404


In [12]:
# Set training set and testing set.
# A fixed seed keeps the 80/20 split, and every metric computed from it,
# reproducible when the notebook is re-run.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

train_data.describe().show()

test_data.describe().show()

+-------+-----------------+
|summary|             PM25|
+-------+-----------------+
|  count|            23195|
|   mean| 53.8937379445469|
| stddev|50.14229394632353|
|    min|              3.0|
|    max|            252.0|
+-------+-----------------+

+-------+------------------+
|summary|              PM25|
+-------+------------------+
|  count|              5699|
|   mean|54.643942709600076|
| stddev| 50.38419484977075|
|    min|               3.0|
|    max|             252.0|
+-------+------------------+



In [13]:
# No-intercept model, this time trained on the training split only.
lr = LinearRegression(fitIntercept=False, labelCol='PM25')

lrModel = lr.fit(train_data)

print("Coefficients: " + str(lrModel.coefficients))
print('\n')
print("Intercept:" + str(lrModel.intercept))

Coefficients: [0.5229084713543641,-0.4295762479741102,-0.09354202832905938,0.029997610291327793,-0.016776246942747007]


Intercept:0.0


In [14]:
# Evaluate the fitted model against the testing split.
test_results = lrModel.evaluate(test_data)

# Show the per-row residuals for the testing data, i.e. the difference between
# the observed and the predicted value (only the first 20 rows are displayed).
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| 21.294050822930405|
|-4.1319088621213105|
|-3.8797445904438224|
| 10.241724686627023|
|  8.217531320461472|
|  13.90000522017873|
|-3.3388147199265337|
| -3.095649974045994|
| -7.155599837217949|
|-10.119263032177331|
|-3.0432141633926815|
| -2.556703002053018|
| -2.932895888120875|
| 0.5152536954265519|
| 1.0212747289317754|
| 0.5658013378956941|
| -2.937247099519496|
| -5.189192469556083|
|-2.5412120586847067|
|  -5.13400323653911|
+-------------------+
only showing top 20 rows



In [15]:
# Print Root Mean Squared Error on the testing split.
print("RMSE: {}".format(test_results.rootMeanSquaredError))

# Print R-Squared on the testing split.
print("R2: {}".format(test_results.r2))

RSME: 24.482794561799658
R2: 0.8914919771243914
