# PySpark ML

In [1]:
from pyspark.sql import SparkSession

# Create (or reuse, if one is already running) the Spark session named
# 'SparkML'; every later cell reaches Spark through `spark`.
spark = (
    SparkSession.builder
    .appName('SparkML')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/12 14:04:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read the weather CSV: the first row supplies the column names and Spark
# infers each column's type from the data instead of reading all strings.
training = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('open-meteo-53.44N20.25E193m.csv')
)
# Preview the first 20 rows (the defaults of show(), spelled out).
training.show(n=20, truncate=True)

+-------------------+----------------+-------+-----------+------------+------------+------------+-------------------+-------------------+
|               time|temperature_2m_c|rain_mm|snowfall_cm|snow_depth_m|weather_code|visibility_m|wind_speed_10m_km_h|wind_speed_80m_km_h|
+-------------------+----------------+-------+-----------+------------+------------+------------+-------------------+-------------------+
|2023-12-12 00:00:00|             1.8|    0.0|        0.0|        0.06|          45|        80.0|                4.2|               10.8|
|2023-12-12 01:00:00|             1.5|    0.0|        0.0|        0.06|           3|       100.0|                3.6|                9.2|
|2023-12-12 02:00:00|             1.1|    0.0|        0.0|        0.06|           3|       100.0|                2.9|                8.4|
|2023-12-12 03:00:00|             0.9|    0.0|        0.0|        0.06|           3|       100.0|                3.4|                8.6|
|2023-12-12 04:00:00|             

In [3]:
# Print the column names and the types Spark inferred while reading the CSV.
training.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- temperature_2m_c: double (nullable = true)
 |-- rain_mm: double (nullable = true)
 |-- snowfall_cm: double (nullable = true)
 |-- snow_depth_m: double (nullable = true)
 |-- weather_code: integer (nullable = true)
 |-- visibility_m: double (nullable = true)
 |-- wind_speed_10m_km_h: double (nullable = true)
 |-- wind_speed_80m_km_h: double (nullable = true)


In [4]:
# List the column names of the loaded DataFrame.
training.columns

['time',
 'temperature_2m_c',
 'rain_mm',
 'snowfall_cm',
 'snow_depth_m',
 'weather_code',
 'visibility_m',
 'wind_speed_10m_km_h',
 'wind_speed_80m_km_h']

In [5]:
from pyspark.ml.feature import VectorAssembler

# Pack the three predictor columns into a single vector column named
# 'Independent Features', in the order listed here.
featureassembler = (
    VectorAssembler()
    .setInputCols(['rain_mm', 'snowfall_cm', 'wind_speed_10m_km_h'])
    .setOutputCol('Independent Features')
)

In [6]:
# Append the assembled 'Independent Features' vector column to the loaded frame.
output = featureassembler.transform(training)
# Preview the first 20 rows (the defaults of show(), spelled out).
output.show(n=20, truncate=True)

+-------------------+----------------+-------+-----------+------------+------------+------------+-------------------+-------------------+--------------------+
|               time|temperature_2m_c|rain_mm|snowfall_cm|snow_depth_m|weather_code|visibility_m|wind_speed_10m_km_h|wind_speed_80m_km_h|Independent Features|
+-------------------+----------------+-------+-----------+------------+------------+------------+-------------------+-------------------+--------------------+
|2023-12-12 00:00:00|             1.8|    0.0|        0.0|        0.06|          45|        80.0|                4.2|               10.8|       [0.0,0.0,4.2]|
|2023-12-12 01:00:00|             1.5|    0.0|        0.0|        0.06|           3|       100.0|                3.6|                9.2|       [0.0,0.0,3.6]|
|2023-12-12 02:00:00|             1.1|    0.0|        0.0|        0.06|           3|       100.0|                2.9|                8.4|       [0.0,0.0,2.9]|
|2023-12-12 03:00:00|             0.9|    0.0|

In [7]:
# Keep only what the regression needs: the feature vector and the label column.
finalized_data = output.select(['Independent Features', 'temperature_2m_c'])
# Preview the first 20 rows (the defaults of show(), spelled out).
finalized_data.show(n=20, truncate=True)

+--------------------+----------------+
|Independent Features|temperature_2m_c|
+--------------------+----------------+
|       [0.0,0.0,4.2]|             1.8|
|       [0.0,0.0,3.6]|             1.5|
|       [0.0,0.0,2.9]|             1.1|
|       [0.0,0.0,3.4]|             0.9|
|       [0.0,0.0,3.6]|             0.9|
|       [0.0,0.0,3.6]|             0.7|
|       [0.0,0.0,4.7]|             0.6|
|       [0.0,0.0,5.4]|             0.6|
|       [0.0,0.0,3.3]|             0.6|
|       [0.0,0.0,5.5]|             0.7|
|       [0.0,0.0,7.4]|             1.0|
|       [0.0,0.0,8.6]|             1.1|
|       [0.0,0.0,8.4]|             1.1|
|       [0.0,0.0,9.0]|             1.1|
|       [0.1,0.0,7.9]|             1.2|
|       [0.0,0.0,7.8]|             1.2|
|       [0.0,0.0,7.7]|             1.2|
|       [0.0,0.0,6.6]|             1.3|
|       [0.0,0.0,6.9]|             1.4|
|       [0.0,0.0,6.6]|             1.4|
+--------------------+----------------+


In [9]:
from pyspark.ml.regression import LinearRegression

# Split roughly 70% / 30% into train and test rows. The seed is fixed so that
# re-running the notebook on the same input yields the same split, and hence
# comparable coefficients and error metrics, instead of new numbers each run.
train_data, test_data = finalized_data.randomSplit([0.7, 0.3], seed=42)

# The unfitted estimator gets its own name so that `regressor` only ever
# refers to the fitted model (re-running this cell stays well-defined).
lr_estimator = LinearRegression(
    featuresCol='Independent Features',
    labelCol='temperature_2m_c',
)

# Fit on the training split; later cells read the fitted model via `regressor`.
regressor = lr_estimator.fit(train_data)

23/12/12 14:07:15 WARN Instrumentation: [c84c5f63] regParam is zero, which might cause numerical instability and overfitting.


In [10]:
# Fitted weights, one per assembled feature, in the assembler's inputCols
# order: rain_mm, snowfall_cm, wind_speed_10m_km_h.
regressor.coefficients

DenseVector([0.7194, -10.4897, 0.1235])

In [11]:
# Intercept term of the fitted linear model.
regressor.intercept

0.3888137575676659

In [12]:
# Score the fitted model on the held-out test split.
pred_results = regressor.evaluate(test_data)
# Show up to 100 prediction rows with untruncated cell contents.
pred_results.predictions.show(n=100, truncate=False)

+--------------------+----------------+-------------------+
|Independent Features|temperature_2m_c|prediction         |
+--------------------+----------------+-------------------+
|[0.0,0.0,3.4]       |0.9             |0.8087458837287183 |
|[0.0,0.0,3.6]       |0.9             |0.8334477735028978 |
|[0.0,0.0,4.2]       |1.8             |0.9075534428254365 |
|[0.0,0.0,4.7]       |0.4             |0.9693081672608853 |
|[0.0,0.0,4.7]       |1.0             |0.9693081672608853 |
|[0.0,0.0,5.2]       |1.1             |1.0310628916963342 |
|[0.0,0.0,5.4]       |0.6             |1.0557647814705138 |
|[0.0,0.0,6.6]       |1.4             |1.203976120115591  |
|[0.0,0.0,7.1]       |-2.0            |1.2657308445510398 |
|[0.0,0.0,7.7]       |-3.2            |1.3398365138735786 |
|[0.0,0.0,8.2]       |0.8             |1.4015912383090274 |
|[0.0,0.0,9.0]       |-3.2            |1.5003987974057456 |
|[0.0,0.0,9.0]       |1.1             |1.5003987974057456 |
|[0.0,0.0,9.4]       |-1.4            |1

In [13]:
# (mean absolute error, mean squared error) of the predictions on test_data.
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(1.8152703348574324, 4.968960063172967)

In [14]:
# Stop the Spark session created in the first cell.
spark.stop()