In [0]:
# Location and format of the raw weather-history dataset.
file_location = "/FileStore/tables/weatherHistory.csv"
file_type = "csv"

# Read through the generic reader so that `file_type` actually selects the
# data source (it was previously assigned but never used).
# `header` and `inferSchema` are CSV reader options: the first row supplies the
# column names, and column types are inferred from the data instead of
# defaulting to string.
df = (
    spark.read.format(file_type)
    .option("header", True)
    .option("inferSchema", True)
    .load(file_location)
)

In [0]:
# Print the inferred column names and types to check how the CSV was parsed.
df.printSchema()

root
 |-- Formatted Date: timestamp (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Apparent Temperature (C): double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed (km/h): double (nullable = true)
 |-- Wind Bearing (degrees): double (nullable = true)
 |-- Visibility (km): double (nullable = true)
 |-- Loud Cover: double (nullable = true)
 |-- Pressure (millibars): double (nullable = true)
 |-- Daily Summary: string (nullable = true)



In [0]:
# Preview the first rows of the loaded DataFrame.
df.show()

+-------------------+-------------+-----------+------------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|     Formatted Date|      Summary|Precip Type|   Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+-------------------+-------------+-----------+------------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|2006-03-31 22:00:00|Partly Cloudy|       rain| 9.472222222222221|      7.3888888888888875|    0.89|           14.1197|                 251.0|15.826300000000002|       0.0|             1015.13|Partly cloudy thr...|
|2006-03-31 23:00:00|Partly Cloudy|       rain| 9.355555555555558|       7.227777777777776|    0.86|           14.2646|                 259.

In [0]:
# Look at a single numeric column on its own.
df.select("Humidity").show()

+--------+
|Humidity|
+--------+
|    0.89|
|    0.86|
|    0.89|
|    0.83|
|    0.83|
|    0.85|
|    0.95|
|    0.89|
|    0.82|
|    0.72|
|    0.67|
|    0.54|
|    0.55|
|    0.51|
|    0.47|
|    0.46|
|     0.6|
|    0.63|
|    0.69|
|     0.7|
+--------+
only showing top 20 rows



In [0]:
# List all column names (left as the bare last expression so it is displayed).
df.columns

Out[6]: ['Formatted Date',
 'Summary',
 'Precip Type',
 'Temperature (C)',
 'Apparent Temperature (C)',
 'Humidity',
 'Wind Speed (km/h)',
 'Wind Bearing (degrees)',
 'Visibility (km)',
 'Loud Cover',
 'Pressure (millibars)',
 'Daily Summary']

In [0]:
## Preview the non-numeric columns (three string columns plus the timestamp)
df.select("Summary", "Daily Summary", "Precip Type", "Formatted Date").show()

+-------------+--------------------+-----------+-------------------+
|      Summary|       Daily Summary|Precip Type|     Formatted Date|
+-------------+--------------------+-----------+-------------------+
|Partly Cloudy|Partly cloudy thr...|       rain|2006-03-31 22:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-03-31 23:00:00|
|Mostly Cloudy|Partly cloudy thr...|       rain|2006-04-01 00:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 01:00:00|
|Mostly Cloudy|Partly cloudy thr...|       rain|2006-04-01 02:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 03:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 04:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 05:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 06:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 07:00:00|
|Partly Cloudy|Partly cloudy thr...|       rain|2006-04-01 08:00:00|
|Partly Cloudy|Partly cloudy thr..

In [0]:
# Enumerate the categories of "Precip Type" to decide how to encode it.
# NOTE(review): the recorded output lists the string 'null' as a category, so
# missing values appear to be stored as text rather than SQL NULL -- confirm.
df.select("Precip Type").distinct().collect()

Out[24]: [Row(Precip Type='rain'), Row(Precip Type='snow'), Row(Precip Type='null')]

In [0]:
# Enumerate the distinct free-text values of "Daily Summary".
df.select("Daily Summary").distinct().collect()

Out[25]: [Row(Daily Summary='Foggy until night.'),
 Row(Daily Summary='Partly cloudy overnight.'),
 Row(Daily Summary='Partly cloudy throughout the day and breezy in the evening.'),
 Row(Daily Summary='Partly cloudy in the afternoon.'),
 Row(Daily Summary='Clear throughout the day.'),
 Row(Daily Summary='Breezy starting overnight continuing until afternoon and mostly cloudy starting overnight continuing until evening.'),
 Row(Daily Summary='Foggy overnight.'),
 Row(Daily Summary='Partly cloudy throughout the day.'),
 Row(Daily Summary='Mostly cloudy starting overnight continuing until evening and breezy starting overnight continuing until morning.'),
 Row(Daily Summary='Mostly cloudy starting overnight continuing until evening.'),
 Row(Daily Summary='Foggy in the morning.'),
 Row(Daily Summary='Partly cloudy throughout the day and breezy starting in the morning continuing until evening.'),
 Row(Daily Summary='Foggy starting overnight continuing until night.'),
 Row(Daily Summary='Partl

In [0]:
# Enumerate the distinct values of "Summary".
df.select("Summary").distinct().collect()

Out[26]: [Row(Summary='Breezy'),
 Row(Summary='Humid and Mostly Cloudy'),
 Row(Summary='Windy and Overcast'),
 Row(Summary='Foggy'),
 Row(Summary='Humid and Partly Cloudy'),
 Row(Summary='Windy and Foggy'),
 Row(Summary='Breezy and Partly Cloudy'),
 Row(Summary='Dry'),
 Row(Summary='Partly Cloudy'),
 Row(Summary='Clear'),
 Row(Summary='Mostly Cloudy'),
 Row(Summary='Breezy and Foggy'),
 Row(Summary='Breezy and Overcast'),
 Row(Summary='Dangerously Windy and Partly Cloudy'),
 Row(Summary='Breezy and Mostly Cloudy'),
 Row(Summary='Windy and Partly Cloudy'),
 Row(Summary='Windy'),
 Row(Summary='Dry and Partly Cloudy'),
 Row(Summary='Windy and Mostly Cloudy'),
 Row(Summary='Overcast'),
 Row(Summary='Humid and Overcast'),
 Row(Summary='Drizzle'),
 Row(Summary='Breezy and Dry'),
 Row(Summary='Windy and Dry'),
 Row(Summary='Light Rain'),
 Row(Summary='Dry and Mostly Cloudy'),
 Row(Summary='Rain')]

In [0]:
# Remove the two text summary columns and the timestamp; none of them are
# used as model features further down. Note that this rebinds `df`.
unused_columns = ["Summary", "Daily Summary", "Formatted Date"]
df = df.drop(*unused_columns)

In [0]:
## Handling Categorical Features

from pyspark.ml.feature import StringIndexer

In [0]:
# Map each distinct "Precip Type" string to a numeric label index so that it
# can be fed to the vector assembler alongside the numeric columns.
indexer = StringIndexer(
    inputCol="Precip Type",
    outputCol="Precip_Type_indexed",
)

# Fit learns the label -> index mapping; transform appends the indexed column.
indexer_model = indexer.fit(df)
df_new = indexer_model.transform(df)


In [0]:
# "Precip Type" is now represented by "Precip_Type_indexed".
# NOTE(review): "Apparent Temperature (C)" is presumably dropped because it is
# too closely tied to the target "Temperature (C)" -- confirm intent.
redundant_columns = ("Precip Type", "Apparent Temperature (C)")
df_new = df_new.drop(*redundant_columns)
df_new.show()

+------------------+--------+------------------+----------------------+------------------+----------+--------------------+-------------------+
|   Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|Precip_Type_indexed|
+------------------+--------+------------------+----------------------+------------------+----------+--------------------+-------------------+
| 9.472222222222221|    0.89|           14.1197|                 251.0|15.826300000000002|       0.0|             1015.13|                0.0|
| 9.355555555555558|    0.86|           14.2646|                 259.0|15.826300000000002|       0.0|             1015.63|                0.0|
| 9.377777777777778|    0.89|3.9284000000000003|                 204.0|           14.9569|       0.0|             1015.94|                0.0|
|  8.28888888888889|    0.83|           14.1036|                 269.0|15.826300000000002|       0.0|             1016.41|                0.0|

In [0]:
# Confirm which columns remain: the target plus the candidate features.
df_new.columns

Out[41]: ['Temperature (C)',
 'Humidity',
 'Wind Speed (km/h)',
 'Wind Bearing (degrees)',
 'Visibility (km)',
 'Loud Cover',
 'Pressure (millibars)',
 'Precip_Type_indexed']

In [0]:
## Pack the predictor columns into one vector column for the model
from pyspark.ml.feature import VectorAssembler

# Every column of df_new except the label "Temperature (C)".
feature_columns = [
    'Humidity',
    'Wind Speed (km/h)',
    'Wind Bearing (degrees)',
    'Visibility (km)',
    'Loud Cover',
    'Pressure (millibars)',
    'Precip_Type_indexed',
]

feature_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="Independent Features",
)

# Appends the "Independent Features" vector column to df_new.
output = feature_assembler.transform(df_new)

In [0]:
# Inspect the assembled feature vectors.
output.select("Independent Features").show()

+--------------------+
|Independent Features|
+--------------------+
|[0.89,14.1197,251...|
|[0.86,14.2646,259...|
|[0.89,3.928400000...|
|[0.83,14.1036,269...|
|[0.83,11.0446,259...|
|[0.85,13.9587,258...|
|[0.95,12.3648,259...|
|[0.89,14.1519,260...|
|[0.82,11.3183,259...|
|[0.72,12.52580000...|
|[0.67,17.5651,290...|
|[0.54,19.7869,316...|
|[0.55,21.94430000...|
|[0.51,20.6885,289...|
|[0.47,15.37550000...|
|[0.46,10.4006,288...|
|[0.6,14.4095,251....|
|[0.63,11.15730000...|
|[0.69,8.5169,163....|
|[0.7,7.6314000000...|
+--------------------+
only showing top 20 rows



In [0]:
# Keep only what the regressor consumes: the feature vector and the label.
model_input_columns = ["Independent Features", "Temperature (C)"]
df_final = output.select(*model_input_columns)

In [0]:
# Preview the (features, label) pairs that will be split into train and test.
df_final.show()

+--------------------+------------------+
|Independent Features|   Temperature (C)|
+--------------------+------------------+
|[0.89,14.1197,251...| 9.472222222222221|
|[0.86,14.2646,259...| 9.355555555555558|
|[0.89,3.928400000...| 9.377777777777778|
|[0.83,14.1036,269...|  8.28888888888889|
|[0.83,11.0446,259...| 8.755555555555553|
|[0.85,13.9587,258...| 9.222222222222221|
|[0.95,12.3648,259...| 7.733333333333334|
|[0.89,14.1519,260...|  8.77222222222222|
|[0.82,11.3183,259...| 10.82222222222222|
|[0.72,12.52580000...| 13.77222222222222|
|[0.67,17.5651,290...|16.016666666666666|
|[0.54,19.7869,316...|17.144444444444446|
|[0.55,21.94430000...|17.800000000000004|
|[0.51,20.6885,289...|17.333333333333332|
|[0.47,15.37550000...| 18.87777777777778|
|[0.46,10.4006,288...|18.911111111111115|
|[0.6,14.4095,251....| 15.38888888888889|
|[0.63,11.15730000...|15.550000000000002|
|[0.69,8.5169,163....|14.255555555555553|
|[0.7,7.6314000000...|13.144444444444442|
+--------------------+------------

In [0]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 75/25 split -- and therefore the fitted coefficients and
# the metrics reported below -- are reproducible across runs on the same
# input data and partitioning. Without a seed every run drew a different split.
SPLIT_SEED = 42
train_data, test_data = df_final.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator under its own name instead of overwriting it
# with the fitted model. `regressor` is still the fitted model, which is what
# the following cells read (.coefficients, .intercept, .evaluate).
lr_estimator = LinearRegression(featuresCol='Independent Features', labelCol='Temperature (C)')
regressor = lr_estimator.fit(train_data)

In [0]:
# Fitted weights, one per entry of the "Independent Features" vector
# (same order as the VectorAssembler's inputCols).
regressor.coefficients

Out[53]: DenseVector([-26.9437, -0.2054, 0.0025, 0.1989, 0.0, -0.0008, -10.601])

In [0]:
# Fitted bias term of the linear model.
regressor.intercept

Out[54]: 33.51686283813776

In [0]:
# Score the held-out split. Despite its name, `preds` is the evaluation
# summary object: later cells read its `.predictions` DataFrame and its
# `.r2`, `.meanAbsoluteError` and `.meanSquaredError` metrics.
preds = regressor.evaluate(test_data)

In [0]:
# Compare actual "Temperature (C)" with the model's "prediction" on the test rows.
preds.predictions.show()

+--------------------+------------------+------------------+
|Independent Features|   Temperature (C)|        prediction|
+--------------------+------------------+------------------+
|(7,[0,3],[0.93,0....| 11.11111111111111| 8.568105685385753|
|(7,[0,3],[0.93,1....|11.161111111111113| 8.769835297630777|
|(7,[0,3,5],[0.61,...|              12.3|18.254210195088888|
|(7,[0,3,5],[0.73,...| 13.58888888888889| 15.02335445104725|
|(7,[0,3,5],[0.75,...|17.200000000000003|14.483717081861535|
|(7,[0,3,5],[0.75,...|13.355555555555554|15.644520649396526|
|(7,[0,3,5],[0.78,...|12.711111111111112| 14.83189855510777|
|(7,[0,3,5],[0.8,1...|13.177777777777777|14.298688725450258|
|(7,[0,3,5],[0.85,...|10.216666666666667|11.736206928467492|
|(7,[0,3,5],[0.87,...|11.555555555555554|11.211457768727513|
|(7,[0,3,5],[0.87,...|15.994444444444444|11.235864646552724|
|(7,[0,3,5],[0.87,...|21.688888888888894|11.665787304111074|
|(7,[0,3,5],[0.87,...|18.333333333333332|12.232468146491716|
|(7,[0,3,5],[0.89,...| 8

In [0]:
# Test-set metrics, displayed as a tuple: (R^2, mean absolute error, mean squared error).
# The label is "Temperature (C)", so MAE is in degrees C and MSE in degrees C squared.

preds.r2, preds.meanAbsoluteError, preds.meanSquaredError

Out[57]: (0.5864082714406205, 4.9694839855224, 37.5059296150884)