In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
from pyspark.sql import SQLContext

In [None]:
# Build (or reuse, if one is already active) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("ML_Regression_Example1")
    .getOrCreate()
)

# Get the SparkContext and create an SQLContext

In [None]:
# Low-level SparkContext owned by the session above.
sc = spark.sparkContext

# Legacy SQLContext wrapper (deprecated since Spark 3.0 in favour of SparkSession);
# kept for backward compatibility with code that uses `sqlContext`.
sqlContext = SQLContext(sparkContext=sc)



In [None]:
# Google Colab only: uncomment to upload minute_weather.csv from the local
# machine into the working directory before it is read.
# from google.colab import files
# uploaded = files.upload()


In [None]:
# Path to the weather CSV (relative to the working directory).
file = 'minute_weather.csv'

# Read with the built-in CSV data source through the SparkSession.
# The 'com.databricks.spark.csv' format name and SQLContext are legacy
# Spark 1.x-era APIs; spark.read.csv is the supported equivalent.
# header=True: take column names from the first row.
# inferSchema=True: make an extra pass over the data to choose column types.
df = spark.read.csv(file, header=True, inferSchema=True)

In [None]:
# Column names as read from the CSV header.
df.schema.names

['rowID',
 'hpwren_timestamp',
 'air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'min_wind_direction',
 'min_wind_speed',
 'rain_accumulation',
 'rain_duration',
 'relative_humidity']

In [None]:
# Print each column name with the type chosen by inferSchema, plus its nullability.
df.printSchema()

root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)



In [None]:
# Summary stats (count/mean/stddev/min/max), transposed so each column is a row.
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
rowID,1587257,793628.0,458201.7724491034,0,1587256
air_pressure,1587257,916.8301266904355,3.0515931266797334,905.0,929.5
air_temp,1587257,61.85144042833238,11.833623786835483,31.64,99.5
avg_wind_direction,1586824,161.96537927331576,95.20811970204007,0.0,359.0
avg_wind_speed,1586824,2.774272067979729,2.060757793563034,0.0,32.3
max_wind_direction,1586824,163.40304784903682,92.3672342806411,0.0,359.0
max_wind_speed,1586824,3.3998134008569094,2.4231674336171545,0.1,36.0
min_wind_direction,1586824,166.82637078844283,97.4627462007766,0.0,359.0
min_wind_speed,1586824,2.1331304542923206,1.7453450849327021,0.0,32.0


In [None]:
# NOTE(review): 'air_pressure_9am' is not a column of this dataset (see the
# df.columns output); if re-enabled this presumably should describe 'air_pressure'.
#df.describe('air_pressure_9am').show()

In [None]:
# Number of columns in the raw frame.
len(df.schema.fields)

13

In [None]:
# Row count before dropping rows that contain nulls.
df.count()

1587257

In [None]:
# Drop every row that has a null in any column (how='any' is the default).
df = df.dropna()

In [None]:
# Row count after dropping nulls; the difference from the earlier count is the
# number of rows that had at least one null.
df.count()

1586823

In [None]:
# Pearson correlation between average and maximum wind speed.
df.corr("avg_wind_speed", "max_wind_speed", method="pearson")

0.9850086439429298

In [None]:
# Pearson correlation between average and minimum wind speed.
df.corr("avg_wind_speed", "min_wind_speed", method="pearson")

0.970987039163073

In [None]:
# Pearson correlation between average and max-gust wind direction.
# NOTE(review): directions range 0-359 (see describe output), so 0 and 359 are
# neighbours; linear Pearson correlation understates agreement for circular data.
df.corr("avg_wind_direction", "max_wind_direction", method="pearson")

0.7163162675632868

In [None]:
# Pearson correlation between average and min wind direction.
# NOTE(review): directions are circular (0-359); treat this value with caution.
df.corr("avg_wind_direction", "min_wind_direction", method="pearson")

0.6227626989594485

In [None]:
# Remove the row identifier and timestamp; they are not used as model inputs.
df = df.drop('rowID', 'hpwren_timestamp')

In [None]:
# Preview the first rows of the cleaned frame.
df.show(n=20, truncate=True)

+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+
|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity|
+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+
|       912.3|   63.86|             161.0|           0.8|             215.0|           1.5|              43.0|           0.2|              0.0|          0.0|             39.9|
|       912.3|   64.22|              77.0|           0.7|             143.0|           1.2|             324.0|           0.3|              0.0|          0.0|             43.0|
|       912.3|    64.4|              89.0|           1.2|             112.0|           1.6|              12.0|          

In [None]:
# Disabled experiment: drop the max/min wind columns because they correlate with
# the average columns (see the corr cells above). While this stays commented out,
# all of these columns remain in the feature set.
# for col in ['max_wind_direction','min_wind_direction','max_wind_speed','min_wind_speed']:
#     df = df.drop(col) # Drop each column individually in a loop
# df.show()

# Regression: predicting relative_humidity

In [None]:
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml.feature import VectorAssembler

In [None]:
# (rows, columns) of the cleaned frame.
(df.count(), len(df.schema.names))

(1586823, 11)

In [None]:
# Remaining columns after dropping the identifier and timestamp.
df.schema.names

['air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'min_wind_direction',
 'min_wind_speed',
 'rain_accumulation',
 'rain_duration',
 'relative_humidity']

In [None]:
# Features are every column except the regression target. Exclude the label by
# name instead of slicing off the last column, so the feature list stays correct
# if the column order ever changes (the label is renamed to 'label' later).
featureColumns = [c for c in df.columns if c != 'relative_humidity']

In [None]:
# Feature columns that will be packed into the 'features' vector.
list(featureColumns)

['air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'min_wind_direction',
 'min_wind_speed',
 'rain_accumulation',
 'rain_duration']

In [None]:
# Pack the feature columns, in order, into a single vector column named 'features'.
assembler = VectorAssembler(outputCol="features", inputCols=featureColumns)
assembled = assembler.transform(df)

In [None]:
# Preview the first 10 assembled rows.
assembled.show(n=10)

+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity|            features|
+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+--------------------+
|       912.3|   63.86|             161.0|           0.8|             215.0|           1.5|              43.0|           0.2|              0.0|          0.0|             39.9|[912.3,63.86,161....|
|       912.3|   64.22|              77.0|           0.7|             143.0|           1.2|             324.0|           0.3|              0.0|          0.0|             43.0|[912.3,64.22,77.0...|
|       912.3| 

In [None]:
# Keep only the target (renamed to 'label', the LinearRegression default) and the feature vector.
select_assembled = assembled.selectExpr("relative_humidity AS label", "features")

In [None]:
# 80/20 train/test split with a fixed seed for reproducibility.
# NOTE(review): rows are consecutive minute-level readings, so a random split
# likely places near-identical neighbouring minutes in both sets; test metrics
# may be optimistic compared with a time-based split.
(trainData, testData) = select_assembled.randomSplit([0.8, 0.2], seed=13234)

# Both splits feed several actions (count, show, fit, transform, evaluate).
# Cache them so the CSV scan / dropna / assembler pipeline isn't recomputed for each one.
trainData, testData = trainData.cache(), testData.cache()

In [None]:
# (train rows, test rows); the two should add up to the cleaned row count.
(trainData.count(), testData.count())

(1269970, 316853)

In [None]:
from pyspark.ml.regression import LinearRegression


In [None]:
# Preview the training split.
trainData.show(20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.4|[923.2,52.88,17.0...|
|  1.4|[923.3,52.52,18.0...|
|  1.4|[923.3,52.52,18.0...|
|  1.4|[923.3,52.7,15.0,...|
|  1.4|[924.0,52.16,21.0...|
|  1.4|[924.0,52.34,57.0...|
|  1.4|[924.0,52.52,20.0...|
|  1.4|[924.0,53.06,22.0...|
|  1.5|[918.1,65.84,179....|
|  1.5|[923.2,52.88,26.0...|
|  1.5|[923.2,53.06,16.0...|
|  1.5|[923.2,53.06,19.0...|
|  1.5|[923.3,52.52,29.0...|
|  1.5|[924.0,52.16,16.0...|
|  1.5|[924.0,52.34,16.0...|
|  1.5|[924.0,52.7,15.0,...|
|  1.5|[924.0,53.06,25.0...|
|  1.5|[924.0,53.24,9.0,...|
|  1.6|[918.1,65.84,176....|
|  1.6|[923.2,52.7,101.0...|
+-----+--------------------+
only showing top 20 rows



In [None]:
# Preview the test split.
testData.show(20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.5|[923.2,52.88,4.0,...|
|  1.5|[924.0,52.88,23.0...|
|  1.5|[924.0,53.06,27.0...|
|  1.5|[924.0,53.24,17.0...|
|  1.6|[923.2,53.06,17.0...|
|  1.6|[923.3,52.52,50.0...|
|  1.7|[917.2,77.18,356....|
|  1.7|[917.2,77.36,341....|
|  1.7|[917.7,65.12,210....|
|  1.7|[917.9,64.94,207....|
|  1.7|[918.6,78.08,30.0...|
|  1.7|[924.3,53.06,36.0...|
|  1.8|[916.9,63.68,128....|
|  1.8|[917.2,77.0,343.0...|
|  1.8|[917.2,77.54,0.0,...|
|  1.8|[917.2,77.54,3.0,...|
|  1.8|[917.2,77.54,331....|
|  1.8|[917.2,77.54,346....|
|  1.8|[917.3,77.18,359....|
|  1.8|[917.3,77.36,340....|
+-----+--------------------+
only showing top 20 rows



In [None]:
# Ordinary linear regression; the column names spelled out here are the library defaults.
lr = LinearRegression(featuresCol="features", labelCol="label", predictionCol="prediction")


In [None]:
# Fit the regression on the training split.
model1 = lr.fit(dataset=trainData)


In [None]:
# Fitted weights (one per assembled feature, in featureColumns order) and the bias term.
print(f"Coefficients: {model1.coefficients}")
print(f"Intercept: {model1.intercept}")

Coefficients: [-2.704714514629627,-1.3745025238590267,0.010435642037595607,1.2916432775746927,0.030109846776819636,-2.5167563790933984,0.007688226234587475,0.5654703023674562,-0.2728730955700857,0.003306293317095326]
Intercept: 2608.263072102212


In [None]:
# Training-set diagnostics.
# totalIterations == 0 with objectiveHistory [0.0] (see output) suggests the
# closed-form normal-equation solver was used rather than an iterative one.
trainingSummary = model1.summary
iterations = trainingSummary.totalIterations
history = trainingSummary.objectiveHistory
print(f"numIterations: {iterations:d}")
print(f"objectiveHistory: {history}")
trainingSummary.residuals.show()
print(f"RMSE: {trainingSummary.rootMeanSquaredError:f}")
print(f"r2: {trainingSummary.r2:f}")

numIterations: 0
objectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
|-37.153869506928096|
| -37.61583670041464|
|-37.420708092741826|
|-36.862542523775666|
| -36.52984158687841|
|-37.590904058900016|
|-35.186753739778034|
| -34.97215222702253|
|-40.941532554883906|
|-37.402616647066225|
| -36.44935961127658|
|-36.900211501525064|
| -38.01137372501489|
| -36.32060157146407|
|-35.297045461522885|
|-34.990982332352814|
| -35.34081636760038|
| -33.91191874005608|
| -40.56766708197883|
| -41.60798449957101|
+-------------------+
only showing top 20 rows

RMSE: 17.811709
r2: 0.537880


In [None]:
# Score the hold-out split; adds a 'prediction' column.
predictions = model1.transform(dataset=testData)

In [None]:
# Compare labels with predictions for the first test rows.
predictions.show(20)

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|  1.5|[923.2,52.88,4.0,...| 37.95694105806797|
|  1.5|[924.0,52.88,23.0...| 36.80996509498755|
|  1.5|[924.0,53.06,27.0...|36.548706354930346|
|  1.5|[924.0,53.24,17.0...| 36.12380891353587|
|  1.6|[923.2,53.06,17.0...|38.221102757330755|
|  1.6|[923.3,52.52,50.0...|41.081942758492005|
|  1.7|[917.2,77.18,356....| 26.19375166611826|
|  1.7|[917.2,77.36,341....| 35.96233988700078|
|  1.7|[917.7,65.12,210....| 46.22369581055864|
|  1.7|[917.9,64.94,207....| 45.49516791366068|
|  1.7|[918.6,78.08,30.0...|15.924435596208696|
|  1.7|[924.3,53.06,36.0...| 36.60254253996254|
|  1.8|[916.9,63.68,128....|46.741937062555735|
|  1.8|[917.2,77.0,343.0...| 35.83725856534875|
|  1.8|[917.2,77.54,0.0,...| 21.99347405825256|
|  1.8|[917.2,77.54,3.0,...|22.039176483111987|
|  1.8|[917.2,77.54,331....|  35.4182665578187|
|  1.8|[917.2,77.54,346....|35.559883095

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the hold-out predictions: 'label' holds relative_humidity and
# 'prediction' is the column written by model1.transform.
Evaluator = RegressionEvaluator(labelCol="label", predictionCol='prediction', metricName='rmse')

rmse = Evaluator.evaluate(predictions)
# Also report test-set r2 (overriding metricName for this call only) so it can be
# compared directly with the training-summary r2.
r2 = Evaluator.evaluate(predictions, {Evaluator.metricName: 'r2'})
print("RMSE: %.3f" % rmse)
print("r2: %.3f" % r2)

RMSE: 17.840


In [None]:
# Release Spark resources once the analysis is done; stopping the session
# also stops its underlying SparkContext (sc).
spark.stop()