In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://apache.volia.net/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar -xvf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Tell findspark/pyspark where the apt-installed JDK 8 and the unpacked Spark
# distribution live; both paths must match the setup cell above.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop2.7",
})

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext

# getOrCreate() reuses an already running session/context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once" the way constructing
# a second SparkContext('local') does.
spark = SparkSession.builder.master("local").appName("fw-pg-meteo").getOrCreate()

# The RDD API (sc.textFile) is still used for loading, so expose the underlying context.
sc = spark.sparkContext


In [None]:
from pyspark.sql.types import DoubleType

# Raw export: one header line followed by one comma-separated record per line.
DATA_PATH = "/content/FW_PG_Meteo_2018-2019.txt"

logData = sc.textFile(DATA_PATH).cache()

header = logData.first()


def split_fields(line):
    """Split one comma-separated line into fields with surrounding whitespace removed.

    Splitting on "," and stripping each field treats a bare "," separator and a
    ", " (comma + space) separator the same way, so the header and the data rows
    are parsed consistently.
    """
    return [field.strip() for field in line.split(",")]


# Drop the header row and blank lines; a blank line would otherwise become a
# one-field row that does not match the header's column count.
logData = logData.filter(lambda line: line != header and line.strip() != "")

logData = logData.map(split_fields)
df = logData.toDF(split_fields(header))

df.show()

+----+-----+---+----+------+------+----------+----------+--------------+-----------+-----------------+-----------+-------------+---------+-----------------+-----------------+
|Year|Month|Day|Hour|Minute|Second|     FW_PG|Wind_Speed|Wind_Direction|Temperature|Relative_Humidity|   Pressure|Precipitation|Dew Point|Absolute_Humidity|Specific_Humidity|
+----+-----+---+----+------+------+----------+----------+--------------+-----------+-----------------+-----------+-------------+---------+-----------------+-----------------+
|2018|    1|  1|   0|     0|     0|106.772863|  0.240000|      8.930000|   2.690000|        89.670000|1011.000000|     0.000000| 1.149374|         5.223580|         0.804220|
|2018|    1|  1|   0|     1|     0|105.187933|  0.290000|      8.900000|   2.740000|        90.250000|1011.010000|     0.000000| 1.288370|         5.275076|         0.805798|
|2018|    1|  1|   0|     2|     0|119.689349|  0.100000|      8.110000|   2.600000|        90.110000|1011.000000|     0.0000

# Cast columns to numeric and drop incomplete rows

In [None]:

# Every column comes out of the text split as a string; cast them all to double in a
# single projection instead of calling withColumn once per column in a loop (each
# withColumn call adds another projection to the query plan).
# With Spark's default (non-ANSI) cast, strings that do not parse become null.
df = df.select([df[name].cast(DoubleType()).alias(name) for name in df.columns])


def drop_nan(df):
    """Return ``df`` without any row that has a null or NaN value in any column.

    A single ``na.drop(how="any")`` over all columns removes exactly the rows that
    dropping column by column in a loop would, but builds one filter instead of
    one per column.

    Args:
        df: Spark DataFrame to clean.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    return df.na.drop(how="any", subset=list(df.columns))

# Numeric frame with every row containing a null/NaN removed; this is the input to the model cell.
df_d = drop_nan(df)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.util import MLUtils


# Fixed seed so the train/test split, and therefore the reported metrics, is reproducible.
SPLIT_SEED = 42
LABEL_COL = "FW_PG"
FEATURE_COLS = ['Year', 'Month', 'Day', 'Hour', 'Minute', 'Wind_Speed', 'Wind_Direction',
                'Temperature', 'Relative_Humidity', 'Pressure', 'Dew Point',
                'Absolute_Humidity', 'Specific_Humidity']

# VectorAssembler packs the feature columns into a single vector column (it does not
# index anything); the variable name is kept for compatibility with later cells.
featureIndexer = VectorAssembler(inputCols=FEATURE_COLS, outputCol='indexedFeatures')

# NOTE(review): the rows appear to be per-minute measurements, so a random split puts
# neighbouring minutes in both train and test; test metrics are likely optimistic
# compared with a chronological split — confirm before relying on them.
(trainingData, testData) = df_d.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

dt = DecisionTreeRegressor(featuresCol="indexedFeatures", labelCol=LABEL_COL,
                           maxDepth=25, minInstancesPerNode=5, seed=SPLIT_SEED)
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model. This also runs the feature assembler.
model = pipeline.fit(trainingData)

# Make predictions on both splits so train vs. test error (over-fitting) is visible.
predictions_test = model.transform(testData)
predictions_train = model.transform(trainingData)

# Compare "prediction" against the true label.
evaluator_mse = RegressionEvaluator(
    labelCol=LABEL_COL, predictionCol="prediction", metricName="mse")
evaluator_r = RegressionEvaluator(
    labelCol=LABEL_COL, predictionCol="prediction", metricName="r2")

mse_test = evaluator_mse.evaluate(predictions_test)
print("Mean Squared Error (MSE) on test data = %g" % mse_test)

mse_train = evaluator_mse.evaluate(predictions_train)
print("Mean Squared Error (MSE) on train data = %g" % mse_train)

r_test = evaluator_r.evaluate(predictions_test)
print("R2 on test data = %g" % r_test)

r_train = evaluator_r.evaluate(predictions_train)
print("R2 on train data = %g" % r_train)


Mean Squared Error (MSE) on test data = 386.858
Mean Squared Error (MSE) on train data = 171.669
R2 on test data = 0.92872
R2 on train data = 0.968462
