#Earth Surface Temperature Analytics

This notebook contains the code and results of an investigation in which we answer questions and make predictions based on the datasets GlobalLandTemperaturesByCountry.csv and GlobalLandTemperaturesByCity.csv, obtained in December 2020 from Climate Change: Earth Surface Temperature Data, created by the Berkeley Earth Surface Temperature Study (https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). As the investigation of these datasets is restricted to First World Countries, we also use the dataset FirstWorldCountries2020.csv, obtained in January 2021 from https://worldpopulationreview.com/country-rankings/first-world-countries. Due to the large scale of the data used in this project, which calls for distributed Big Data processing, we use SparkSQL and/or the DataFrame API (PySpark) from Apache Spark to create, transform and perform analytics on Spark DataFrames, executed on a cluster on Databricks. Please ensure that the datasets used in this project are imported to the Databricks File System (DBFS).

The goals of this project which are covered in this notebook are:

1.   Investigate which First World Countries (top five) have had the biggest change in average land temperature between 1850 and 2013.
2.   Investigate which Cities (top five) of the five First World Countries from Goal 1 have seen the biggest change in average land temperature between 1850 and 2013.
3.   Predict the average land temperature of the City found to have the greatest change in average land temperature from Goal 2 for the years 2014-2020.

The timeframe of 1850 to 2013 is selected because of the increased industrialisation throughout the world since the mid-19th century.

In order to achieve the goals stated above, a data analytic pipeline is used. The pipeline for this project comprises the following stages:


1.   Data Collection
2.   Data Transformation
3.   Feature Engineering (Goal 3 only)
4.   Algorithm Selection (Goal 3 only, for Predictive Analysis), i.e. Supervised Machine Learning Algorithms Linear Regression, Decision Tree Regression and Random Forest Regression
5.   Training (Goal 3 only, for Predictive Analysis)
6.   Testing (Goal 3 only, for Predictive Analysis)
7.   Descriptive or Predictive Analysis
8.   Data Visualisation

Due to the nature of the goals, i.e. Goal 2 depends on the results of Goal 1 and Goal 3 depends on the results of Goal 2, we shall implement the data analytic pipeline three times, once for each goal.

NOTE: The features AverageTemperature and AverageTemperatureUncertainty are measured in degrees Celsius (°C).

##Goal 1:  Top five First World Countries that have had the biggest change in average land temperature between 1850 and 2013

In this section we aim to discover, using the Global Land Temperatures By Country dataset and the First World Countries 2020 dataset, the top five First World Countries that have had the biggest change in average land temperature between 1850 and 2013. To achieve this aim, we will use the data analytic pipeline described at the beginning of this notebook.

###Stage 1: Data Collection

Read in the Global Land Temperatures By Country dataset and the First World Countries 2020 dataset as Spark DataFrames.

In [0]:
# Read Global Land Temperatures By Country dataset from /FileStore/tables/GlobalLandTemperaturesByCountry.csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

country_land_temp_sdf = (sqlContext.read.format("csv").
  option("header", "true").
  option("delimiter", ",").                      
  option("nullValue", "NA").                      
  option("inferSchema", True).
  load("/FileStore/tables/GlobalLandTemperaturesByCountry.csv"))

country_land_temp_sdf.printSchema()

In [0]:
# See Global Land Temperatures By Country data
display(country_land_temp_sdf)

dt,AverageTemperature,AverageTemperatureUncertainty,Country
1743-11-01T00:00:00.000+0000,4.384,2.294,Åland
1743-12-01T00:00:00.000+0000,,,Åland
1744-01-01T00:00:00.000+0000,,,Åland
1744-02-01T00:00:00.000+0000,,,Åland
1744-03-01T00:00:00.000+0000,,,Åland
1744-04-01T00:00:00.000+0000,1.53,4.68,Åland
1744-05-01T00:00:00.000+0000,6.702000000000001,1.789,Åland
1744-06-01T00:00:00.000+0000,11.609000000000002,1.577,Åland
1744-07-01T00:00:00.000+0000,15.342,1.41,Åland
1744-08-01T00:00:00.000+0000,,,Åland


In [0]:
# Read First World Countries 2020 dataset from  /FileStore/tables/FirstWorldCountries2020.csv
first_world_countries_sdf = (sqlContext.read.format("csv").
  option("header", "true").
  option("delimiter", ",").                      
  option("nullValue", "NA").                      
  option("inferSchema", True).
  load("/FileStore/tables/FirstWorldCountries2020.csv"))

first_world_countries_sdf.printSchema()

In [0]:
# See First World Countries 2020 data
display(first_world_countries_sdf)

country,humanDevelopmentIndex,pop2020
Norway,0.953,5421.241
Switzerland,0.944,8654.622
Australia,0.939,25499.884
Ireland,0.938,4937.786
Germany,0.936,83783.942
Iceland,0.935,341.243
Hong Kong,0.933,7496.981
Sweden,0.933,10099.265
Singapore,0.932,5850.342
Netherlands,0.931,17134.872


###Stage 2: Data Transformation

Taking country_land_temp_sdf, extract year and month from dt and select the required features.

In [0]:
# Extract year and month from dt and select features

from pyspark.sql.functions import month, year

country_land_temp_sdf_cleaned = country_land_temp_sdf.select(year(country_land_temp_sdf['dt']).alias('year'),
                                                             month(country_land_temp_sdf['dt']).alias('month'),
                                                             country_land_temp_sdf['AverageTemperature'],
                                                             country_land_temp_sdf['Country'])

country_land_temp_sdf_cleaned.show()

In [0]:
# Check that year and month are of data type int
country_land_temp_sdf_cleaned.dtypes

We want years 1850-2013.
Filter country_land_temp_sdf_cleaned for years greater than 1849.

In [0]:
# Filter year > 1849
country_land_temp_sdf_cleaned = country_land_temp_sdf_cleaned.filter(country_land_temp_sdf_cleaned['year'] > 1849)[['year', 'month', 'AverageTemperature', 'Country']]

country_land_temp_sdf_cleaned.show()

Drop any null values from country_land_temp_sdf_cleaned.

In [0]:
# Drop nulls
country_land_temp_sdf_cleaned = country_land_temp_sdf_cleaned.dropna()

country_land_temp_sdf_cleaned.createOrReplaceTempView("country_land_temp_sdf_cleaned")

country_land_temp_sdf_cleaned.show()

Now we take first_world_countries_sdf. To select the First World Countries from this DataFrame, we keep the countries whose humanDevelopmentIndex is greater than the average.

First calculate the average of humanDevelopmentIndex in first_world_countries_sdf.

In [0]:
# Calculate average of humanDevelopmentIndex

from pyspark.sql.functions import avg, col

average_hdi = first_world_countries_sdf.select(avg(col('humanDevelopmentIndex')).alias('avgHumanDevelopmentIndex')).collect()
average_hdi[0]

Then from first_world_countries_sdf, filter rows where the feature humanDevelopmentIndex is greater than avgHumanDevelopmentIndex and select the feature country, renamed as FirstWorldCountry.

In [0]:
# Filter humanDevelopmentIndex > avgHumanDevelopmentIndex and select FirstWorldCountry
first_world_countries_sdf_cleaned = first_world_countries_sdf.filter(first_world_countries_sdf['humanDevelopmentIndex'] > average_hdi[0]['avgHumanDevelopmentIndex']).\
                                                            select(first_world_countries_sdf['country'].alias('FirstWorldCountry'))

first_world_countries_sdf_cleaned.createOrReplaceTempView("first_world_countries_sdf_cleaned")

first_world_countries_sdf_cleaned.show()
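
The two steps above (compute the average, then keep the rows above it) can be sketched in plain Python. This is only an illustration on the ten preview rows displayed in Stage 1, so the average here is not the one computed over the full dataset; the names `preview`, `avg_hdi` and `above_average` are introduced for this sketch.

```python
# Ten preview rows of FirstWorldCountries2020.csv: (country, humanDevelopmentIndex).
# Only the rows shown in Stage 1, so the result is illustrative, not the notebook's.
preview = [
    ("Norway", 0.953), ("Switzerland", 0.944), ("Australia", 0.939),
    ("Ireland", 0.938), ("Germany", 0.936), ("Iceland", 0.935),
    ("Hong Kong", 0.933), ("Sweden", 0.933), ("Singapore", 0.932),
    ("Netherlands", 0.931),
]

# Step 1: average humanDevelopmentIndex over all rows
avg_hdi = sum(hdi for _, hdi in preview) / len(preview)

# Step 2: keep the countries strictly above that average
above_average = [country for country, hdi in preview if hdi > avg_hdi]

print(round(avg_hdi, 4))
print(above_average)
```

Note that the comparison is strict, so a country whose index equals the average would be excluded.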

Now merge country_land_temp_sdf_cleaned with first_world_countries_sdf_cleaned on Country=FirstWorldCountry, projecting year, month, AverageTemperature and Country, ordered by Country, year and month (ascending).

In [0]:
# Merge
fw_country_land_temp_sdf = sqlContext.sql("select year, month, AverageTemperature, Country FROM country_land_temp_sdf_cleaned join first_world_countries_sdf_cleaned ON Country=FirstWorldCountry ORDER BY Country, year, month")

fw_country_land_temp_sdf.show()

Check whether any FirstWorldCountry in first_world_countries_sdf_cleaned failed to match a Country in country_land_temp_sdf_cleaned in the merge operation.

In [0]:
# Number of missing countries

from pyspark.sql.functions import countDistinct

# Number of countries in first_world_countries_sdf_cleaned
num_first_world_countries = first_world_countries_sdf_cleaned.select(countDistinct('FirstWorldCountry').alias('num_of_FWC')).collect()
# Number of countries in fw_country_land_temp_sdf
num_countries = fw_country_land_temp_sdf.select(countDistinct('Country').alias('num_of_countries')).collect()

# Calculate the difference
num_missing_countries = num_first_world_countries[0]['num_of_FWC'] - num_countries[0]['num_of_countries']

# Print
missing_countries = "%d countries from first_world_countries_sdf_cleaned are missing in fw_country_land_temp_sdf after the merge operation" %(num_missing_countries)
print(missing_countries)

There are 7 countries in the First World Countries 2020 dataset that are not in the Global Land Temperatures By Country dataset. What are they and why are they missing?

Do another merge operation on the same DataFrames, but this time do a right join and keep only the rows without a match, i.e. where Country is null.

In [0]:
# Right merge
missing_country_sdf = sqlContext.sql("select DISTINCT Country, FirstWorldCountry FROM country_land_temp_sdf_cleaned right join first_world_countries_sdf_cleaned ON Country=FirstWorldCountry WHERE Country IS NULL")

missing_country_sdf.show()
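
A right join that keeps only the unmatched rows behaves like a set difference. The following plain-Python sketch shows the idea; the country names are hypothetical sample values chosen for illustration, not the notebook's actual output.

```python
# Hypothetical sample values, used only to illustrate the unmatched-rows idea
first_world = {"Norway", "Ireland", "Sweden", "Maldives"}
temperature_countries = {"Norway", "Ireland", "Sweden", "Åland"}

# Countries on the right-hand side of the join that found no partner
missing = sorted(first_world - temperature_countries)

# Same count as in the previous cell: all first world countries minus the matched ones
num_missing = len(first_world) - len(first_world & temperature_countries)

print(missing, num_missing)
```

Countries that appear only in the temperature data (here the sample value Åland) do not show up in the result, because the right join preserves only the rows of first_world_countries_sdf_cleaned.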

A possible reason for the absence of these countries from the Global Land Temperatures By Country dataset is that they had not yet been founded when the records for this dataset were being kept. Take for example the Maldives, which was not founded until 1965 (https://www.britannica.com/place/Maldives/History).

###Stage 3: Descriptive Analysis

Taking fw_country_land_temp_sdf, calculate the yearly average temperature for each First World Country and project to a new DataFrame fw_country_yearly_land_temp_sdf with features year, Country and YearlyAverageTemperature.

In [0]:
# YearlyAverageTemperature for each First World Country
fw_country_yearly_land_temp_sdf = fw_country_land_temp_sdf.groupBy('year', 'Country').agg(avg(col('AverageTemperature')).alias('YearlyAverageTemperature')).orderBy('Country', 'year')

fw_country_yearly_land_temp_sdf.show()

Taking fw_country_yearly_land_temp_sdf, calculate the max and min average temperature for each First World Country and project each computation to new DataFrames max_fw_country_land_temp_sdf and min_fw_country_land_temp_sdf. Then merge these new DataFrames and call this maxmin_fw_country_land_temp_sdf.

In [0]:
# max_fw_country_land_temp_sdf

from pyspark.sql.functions import min, max

max_fw_country_land_temp_sdf = fw_country_yearly_land_temp_sdf.groupBy('Country').agg(max(col('YearlyAverageTemperature')).alias('MaxAverageTemperature')).orderBy('Country')

max_fw_country_land_temp_sdf.createOrReplaceTempView("max_fw_country_land_temp_sdf")

max_fw_country_land_temp_sdf.show()

In [0]:
# min_fw_country_land_temp_sdf

min_fw_country_land_temp_sdf = fw_country_yearly_land_temp_sdf.groupBy('Country').agg(min(col('YearlyAverageTemperature')).alias('MinAverageTemperature')).orderBy('Country').\
                                select(col('Country').alias('Country_x'), 'MinAverageTemperature')

min_fw_country_land_temp_sdf.createOrReplaceTempView("min_fw_country_land_temp_sdf")

min_fw_country_land_temp_sdf.show()

In [0]:
# Merge
maxmin_fw_country_land_temp_sdf = sqlContext.sql("select Country, maxAverageTemperature, minAverageTemperature FROM max_fw_country_land_temp_sdf join min_fw_country_land_temp_sdf ON Country=Country_x ORDER BY Country")

maxmin_fw_country_land_temp_sdf.show()

Taking maxmin_fw_country_land_temp_sdf, calculate the difference of max and min average temperature for each First World Country and project to a new DataFrame diff_fw_country_land_temp_sdf. We use the absolute value function to ensure the values are positive.

Additionally, as we want to see the First World Countries that have had the biggest change in average land temperature between 1850 and 2013, we order the feature diffAverageTemperature in descending order.

In [0]:
# diff_fw_country_land_temp_sdf

from pyspark.sql.functions import abs

diff_fw_country_land_temp_sdf = maxmin_fw_country_land_temp_sdf.select('Country',
                                                                       abs(col('maxAverageTemperature') - col('minAverageTemperature')).alias('diffAverageTemperature')).orderBy('diffAverageTemperature', ascending=False)

diff_fw_country_land_temp_sdf.show()
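
The chain of steps in this stage (yearly average, then max and min per country, then their difference) can be sketched in plain Python as follows. The rows are hypothetical and the variable names are introduced for this sketch only. It also makes explicit that the "change" measured here is the range of the yearly averages (warmest year minus coldest year), not the difference between the 1850 and 2013 values.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical monthly rows: (year, Country, AverageTemperature)
rows = [
    (1850, "A", 4.0), (1850, "A", 6.0),
    (1851, "A", 7.0), (1851, "A", 9.0),
    (1850, "B", 20.0), (1850, "B", 22.0),
    (1851, "B", 21.0), (1851, "B", 22.0),
]

# Step 1: yearly average per (Country, year)
monthly = defaultdict(list)
for yr, country, temp in rows:
    monthly[(country, yr)].append(temp)
yearly = {key: mean(temps) for key, temps in monthly.items()}

# Step 2: max and min of the yearly averages per country, and their difference
per_country = defaultdict(list)
for (country, _), avg_temp in yearly.items():
    per_country[country].append(avg_temp)
diff = {c: max(v) - min(v) for c, v in per_country.items()}

# Step 3: order by the difference, largest first
ranking = sorted(diff.items(), key=lambda item: item[1], reverse=True)
print(ranking)
```

Because the maximum is never smaller than the minimum, the difference is already non-negative; the absolute value in the notebook acts as a safeguard. In this sketch max and min are computed in one pass per country, so no merge of two intermediate tables is needed.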

We want the top five First World Countries that have had the biggest change in average land temperature between 1850 and 2013.

Keep the first five rows by filtering on diffAverageTemperature > 5.3, a threshold read off the ordered output above.

(We could use .show(5), but that only prints the rows; the filter returns a DataFrame that we can display in the next section.)

In [0]:
# Filter first five rows of diff_fw_country_land_temp_sdf
diff_fw_country_land_temp_sdf = diff_fw_country_land_temp_sdf.filter(col('diffAverageTemperature') > 5.3)

diff_fw_country_land_temp_sdf.show()
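
The threshold 5.3 only works for this particular data. A way to take the top five without a hard-coded threshold is sketched below in plain Python; the first five values are those of the Goal 1 result table in Stage 4, and the sixth row is a hypothetical addition to show that it is dropped. On an ordered Spark DataFrame, `limit(5)` would play the analogous role, which is offered here as a suggestion rather than what the notebook does.

```python
import heapq

# (Country, diffAverageTemperature): five rows from the Goal 1 result table,
# plus one hypothetical row that should not make the top five.
diffs = [
    ("Denmark", 5.3412500000000005),
    ("Hypothetical Country", 5.1),
    ("Saudi Arabia", 11.82575),
    ("Kazakhstan", 5.3135),
    ("Finland", 5.604791666666667),
    ("Uzbekistan", 5.590000000000002),
]

# Take the five largest differences, largest first, without any threshold
top_five = heapq.nlargest(5, diffs, key=lambda row: row[1])

for country, diff in top_five:
    print(country, round(diff, 2))
```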

###Stage 4: Data Visualisation

For the final stage, we display the data that shows the top five First World Countries that have had the biggest change in average land temperature between 1850 and 2013.

In [0]:
# Visualise the data in diff_fw_country_land_temp_sdf
display(diff_fw_country_land_temp_sdf)

Country,diffAverageTemperature
Saudi Arabia,11.82575
Finland,5.604791666666667
Uzbekistan,5.590000000000002
Denmark,5.3412500000000005
Kazakhstan,5.3135


Therefore in descending order, Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan have had the biggest change in average land temperature, i.e. 11.83°C, 5.60°C, 5.59°C, 5.34°C and 5.31°C respectively, between 1850 and 2013.

##Goal 2: Top five Cities of the five First World Countries from Goal 1 that have seen the biggest change in average land temperature between 1850 and 2013

In this section we aim to discover, using the results found in Goal 1 and the Global Land Temperatures By City dataset, the top five cities of the First World Countries Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan that have seen the biggest change in average land temperature between 1850 and 2013. To achieve this aim, we will use the data analytic pipeline described at the beginning of this notebook.

###Stage 1: Data Collection

Read in the Global Land Temperatures By City dataset as a Spark DataFrame.

In [0]:
# Read Global Land Temperatures By City dataset from /FileStore/tables/GlobalLandTemperaturesByCity.csv

city_land_temp_sdf = (sqlContext.read.format("csv").
  option("header", "true").
  option("delimiter", ",").                      
  option("nullValue", "NA").                      
  option("inferSchema", True).
  load("/FileStore/tables/GlobalLandTemperaturesByCity.csv"))

city_land_temp_sdf.printSchema()

In [0]:
# See Global Land Temperatures By City data
display(city_land_temp_sdf)

dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
1743-11-01T00:00:00.000+0000,6.068,1.737,Århus,Denmark,57.05N,10.33E
1743-12-01T00:00:00.000+0000,,,Århus,Denmark,57.05N,10.33E
1744-01-01T00:00:00.000+0000,,,Århus,Denmark,57.05N,10.33E
1744-02-01T00:00:00.000+0000,,,Århus,Denmark,57.05N,10.33E
1744-03-01T00:00:00.000+0000,,,Århus,Denmark,57.05N,10.33E
1744-04-01T00:00:00.000+0000,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
1744-05-01T00:00:00.000+0000,10.644,1.283,Århus,Denmark,57.05N,10.33E
1744-06-01T00:00:00.000+0000,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
1744-07-01T00:00:00.000+0000,16.082,1.396,Århus,Denmark,57.05N,10.33E
1744-08-01T00:00:00.000+0000,,,Århus,Denmark,57.05N,10.33E


###Stage 2: Data Transformation

Taking city_land_temp_sdf, extract year and month from dt and select the required features.

In [0]:
# Extract year and month from dt and select features

city_land_temp_sdf_cleaned = city_land_temp_sdf.select(year(city_land_temp_sdf['dt']).alias('year'),
                                                       month(city_land_temp_sdf['dt']).alias('month'),
                                                       city_land_temp_sdf['AverageTemperature'],
                                                       city_land_temp_sdf['City'],      
                                                       city_land_temp_sdf['Country'])

city_land_temp_sdf_cleaned.show()

We want years 1850-2013 and the cities of the First World Countries Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan.
Filter city_land_temp_sdf_cleaned for years greater than 1849 and then filter for the countries mentioned.

In [0]:
# Filter year > 1849 and countries Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan
city_land_temp_sdf_cleaned = city_land_temp_sdf_cleaned.filter(city_land_temp_sdf_cleaned['year'] > 1849)

city_land_temp_sdf_cleaned = city_land_temp_sdf_cleaned.filter((col('Country') == 'Saudi Arabia') | (col('Country') == 'Finland') | (col('Country') == 'Uzbekistan') | (col('Country') == 'Denmark') | (col('Country') == 'Kazakhstan'))

city_land_temp_sdf_cleaned.show()

Drop any null values from city_land_temp_sdf_cleaned.

In [0]:
# Drop nulls
city_land_temp_sdf_cleaned = city_land_temp_sdf_cleaned.dropna()

city_land_temp_sdf_cleaned.show()

###Stage 3: Descriptive Analysis

Taking city_land_temp_sdf_cleaned, calculate the yearly average temperature for each city and project to a new DataFrame city_yearly_land_temp_sdf with features year, City, Country and YearlyAverageTemperature.

In [0]:
# YearlyAverageTemperature for each City
city_yearly_land_temp_sdf = city_land_temp_sdf_cleaned.groupBy('year', 'City', 'Country').agg(avg(col('AverageTemperature')).alias('YearlyAverageTemperature')).orderBy('Country', 'City', 'year')

city_yearly_land_temp_sdf.show()

Taking city_yearly_land_temp_sdf, calculate the max and min average temperature for each city and project each computation to new DataFrames max_city_land_temp_sdf and min_city_land_temp_sdf. Then merge these new DataFrames and call this maxmin_city_land_temp_sdf.

In [0]:
# max_city_land_temp_sdf

max_city_land_temp_sdf = city_yearly_land_temp_sdf.groupBy('City', 'Country').agg(max(col('YearlyAverageTemperature')).alias('MaxAverageTemperature')).orderBy('Country', 'City')

max_city_land_temp_sdf.createOrReplaceTempView("max_city_land_temp_sdf")

max_city_land_temp_sdf.show()

In [0]:
# min_city_land_temp_sdf

min_city_land_temp_sdf = city_yearly_land_temp_sdf.groupBy('City', 'Country').agg(min(col('YearlyAverageTemperature')).alias('MinAverageTemperature')).orderBy('Country', 'City').\
                                select(col('City').alias('City_x'), col('Country').alias('Country_x'), 'MinAverageTemperature')

min_city_land_temp_sdf.createOrReplaceTempView("min_city_land_temp_sdf")

min_city_land_temp_sdf.show()

In [0]:
# Merge
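# Join on both City and Country so that equal city names in different countries are not paired
maxmin_city_land_temp_sdf = sqlContext.sql("select City, Country, maxAverageTemperature, minAverageTemperature FROM max_city_land_temp_sdf join min_city_land_temp_sdf ON City=City_x AND Country=Country_x ORDER BY Country, City")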

maxmin_city_land_temp_sdf.show()
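
When merging per-city tables, the join key should identify a city uniquely. A minimal sketch in plain Python, with hypothetical rows, of how joining on the city name alone differs from joining on city and country:

```python
# Hypothetical rows: the same city name occurs in two different countries
max_rows = [("Springfield", "X", 12.0), ("Springfield", "Y", 20.0)]
min_rows = [("Springfield", "X", 8.0), ("Springfield", "Y", 15.0)]

# Join on the city name only: every max row pairs with every min row of that name
city_only = [
    (city, country, mx, mn)
    for city, country, mx in max_rows
    for city_x, country_x, mn in min_rows
    if city == city_x
]

# Join on city and country: exactly one row per real city
city_and_country = [
    (city, country, mx, mn)
    for city, country, mx in max_rows
    for city_x, country_x, mn in min_rows
    if city == city_x and country == country_x
]

print(len(city_only), len(city_and_country))
```

The name-only join yields spurious rows that mix the maximum of one city with the minimum of another, which would inflate the computed difference. Whether the five countries considered here actually contain such duplicate names is not checked in this notebook.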

Taking maxmin_city_land_temp_sdf, calculate the difference of max and min average temperature for each city and project to a new DataFrame diff_city_land_temp_sdf. We use the absolute value function to ensure the values are positive.

Additionally, as we want the cities of the First World Countries Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan that have seen the biggest change in average land temperature between 1850 and 2013, we order the feature diffAverageTemperature in descending order.

In [0]:
# diff_city_land_temp_sdf

diff_city_land_temp_sdf = maxmin_city_land_temp_sdf.select('City', 'Country',
                                                           abs(col('maxAverageTemperature') - col('minAverageTemperature')).alias('diffAverageTemperature')).orderBy('diffAverageTemperature', ascending=False)

diff_city_land_temp_sdf.show()

We want the top five cities of the First World Countries Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan that have seen the biggest change in average land temperature between 1850 and 2013.

Keep the first five rows by filtering on diffAverageTemperature > 5.8, a threshold read off the ordered output above.

(We could use .show(5), but that only prints the rows; the filter returns a DataFrame that we can display in the next section.)

In [0]:
# Filter first five rows of diff_city_land_temp_sdf
diff_city_land_temp_sdf = diff_city_land_temp_sdf.filter(col('diffAverageTemperature') > 5.8)

diff_city_land_temp_sdf.show()

###Stage 4: Data Visualisation

For the final stage, we display the data that shows the top five cities of the First World Countries Saudi Arabia, Finland, Uzbekistan, Denmark and Kazakhstan that have seen the biggest change in average land temperature between 1850 and 2013.

In [0]:
# Visualise the data in diff_city_land_temp_sdf
display(diff_city_land_temp_sdf)

City,Country,diffAverageTemperature
Atyrau,Kazakhstan,6.522625000000001
Nukus,Uzbekistan,6.355083333333335
Aqtöbe,Kazakhstan,6.284541666666668
Semey,Kazakhstan,5.958541666666668
Aktau,Kazakhstan,5.875416666666666


Therefore in descending order, the cities Atyrau, Nukus, Aqtöbe, Semey and Aktau have seen the biggest change in average land temperature, i.e. 6.52°C, 6.36°C, 6.28°C, 5.96°C and 5.88°C respectively, between 1850 and 2013.

##Goal 3: Predict the average land temperature of the City found to have the greatest change in average land temperature from Goal 2 for the years 2014-2020.

In this section we aim to predict, using the results found in Goal 2 and the Global Land Temperatures By City dataset, the average land temperature of the City found to have the greatest change in average temperature, i.e. Atyrau in Kazakhstan, for the years 2014-2020. To achieve this aim, we will use the data analytic pipeline described at the beginning of this notebook.

###Stage 1: Data Collection

From Goal 2, we were able to transform the Global Land Temperatures By City dataset to the DataFrame city_yearly_land_temp_sdf. This DataFrame is a good starting point for Goal 3.

In [0]:
# See city_yearly_land_temp_sdf
display(city_yearly_land_temp_sdf)

year,City,Country,YearlyAverageTemperature
1850,Aalborg,Denmark,7.075500000000001
1851,Aalborg,Denmark,7.520083333333333
1852,Aalborg,Denmark,8.046666666666667
1853,Aalborg,Denmark,6.796916666666667
1854,Aalborg,Denmark,7.750583333333334
1855,Aalborg,Denmark,6.1284166666666655
1856,Aalborg,Denmark,6.761750000000002
1857,Aalborg,Denmark,8.356416666666666
1858,Aalborg,Denmark,7.809916666666667
1859,Aalborg,Denmark,8.449833333333332


###Stage 2: Data Transformation

As we are only interested in predicting the average land temperature of the city Atyrau for the years 2014-2020, we filter the city_yearly_land_temp_sdf for this city.

In [0]:
# Filter rows in city_yearly_land_temp_sdf for the city Atyrau
atyrau_yearly_land_temp_sdf = city_yearly_land_temp_sdf.filter(city_yearly_land_temp_sdf['City'] == 'Atyrau')

atyrau_yearly_land_temp_sdf.show()

Our feature is year and our label is YearlyAverageTemperature.

In [0]:
atyrau_yearly_land_temp_sdf = atyrau_yearly_land_temp_sdf.select('year', 'YearlyAverageTemperature')

atyrau_yearly_land_temp_sdf.show()

###Stage 3: Feature Engineering

The feature column year must be wrapped in a vector column, as the MLlib algorithms expect their features as a vector.

In [0]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = ["year"], outputCol = "features")

new_atyrau_yearly_land_temp_sdf = assembler.transform(atyrau_yearly_land_temp_sdf)

###Stage 4: Algorithm Selection

The algorithms that will be used to achieve Goal 3 are the Supervised Machine Learning Algorithms Linear Regression, Decision Tree Regression and Random Forest Regression from MLlib, Apache Spark's scalable machine learning library (https://spark.apache.org/mllib/).

####Stage 4.1: Defining the Linear Regression Algorithm

In [0]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="YearlyAverageTemperature")

####Stage 4.2: Defining the Decision Tree Regression Algorithm

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor

dtr = DecisionTreeRegressor(featuresCol="features", labelCol="YearlyAverageTemperature")

####Stage 4.3: Defining the Random Forest Regression Algorithm

In [0]:
from pyspark.ml.regression import RandomForestRegressor

rfr = RandomForestRegressor(featuresCol="features",labelCol="YearlyAverageTemperature")

###Stage 5: Training

Split the data into training data and test data (ratio approximately 80% : 20%). No seed is passed to randomSplit, so the split differs between runs.

In [0]:
# Split data into train and test set
train, test  = new_atyrau_yearly_land_temp_sdf.randomSplit([0.8,0.2])
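
To make such a split repeatable, a seed can be fixed (randomSplit accepts an optional seed argument). The plain-Python sketch below illustrates the idea of a seeded, per-row random split; it is a simplified stand-in and not Spark's implementation, and the function name `random_split` and the seed value are assumptions made for this example.

```python
import random

def random_split(rows, train_weight=0.8, seed=42):
    """Assign each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)  # fixed seed, so the split is repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

years = list(range(1850, 2014))  # one row per year, 1850-2013

train_a, test_a = random_split(years, seed=42)
train_b, test_b = random_split(years, seed=42)

print(len(train_a), len(test_a))
print(train_a == train_b and test_a == test_b)
```

Because every row is assigned independently, the resulting sizes are only close to 80% and 20%, which matters for a dataset as small as one row per year.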

Next train each of the three algorithms.

####Stage 5.1: Training the Linear Regression Algorithm

In [0]:
# Train Linear Regression algorithm
lr_model = lr.fit(train)

####Stage 5.2: Training the Decision Tree Regression Algorithm

In [0]:
# Train Decision Tree Regression algorithm
dtr_model = dtr.fit(train)

####Stage 5.3: Training the Random Forest Regression Algorithm

In [0]:
# Train Random Forest Regression algorithm
rfr_model = rfr.fit(train)

###Stage 6: Testing

Before testing each of the three models, create evaluators for the metrics Mean Absolute Error (MAE), Root Mean Square Error (RMSE) and R-squared.

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean Absolute Error Evaluator
evaluator_mae = RegressionEvaluator(metricName="mae", predictionCol="prediction",labelCol="YearlyAverageTemperature")

# Root Mean Square Error Evaluator
evaluator_rmse = RegressionEvaluator(metricName="rmse", predictionCol="prediction",labelCol="YearlyAverageTemperature")

# R-squared Evaluator
evaluator_r2 = RegressionEvaluator(metricName="r2", predictionCol="prediction",labelCol="YearlyAverageTemperature")
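
For reference, the three metrics named above can be written out in plain Python using their standard definitions. The labels and predictions are hypothetical values chosen so the results are easy to check by hand.

```python
from math import sqrt

def mae(labels, preds):
    """Mean Absolute Error: average absolute difference."""
    return sum(abs(y - p) for y, p in zip(labels, preds)) / len(labels)

def rmse(labels, preds):
    """Root Mean Square Error: square root of the average squared difference."""
    return sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))

def r2(labels, preds):
    """R-squared: 1 - (residual sum of squares / total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical labels and predictions
labels = [1.0, 2.0, 3.0]
preds = [1.0, 2.0, 4.0]

print(mae(labels, preds), rmse(labels, preds), r2(labels, preds))
```

Lower MAE and RMSE are better, while R-squared is better the closer it is to 1 and becomes negative when a model does worse than always predicting the mean label.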

Now test and evaluate each of the three models.

####Stage 6.1: Testing the Linear Regression Model

In [0]:
# Test Linear Regression model
test_lr = lr_model.transform(test)

# Print MAE, RMSE and R2
print("linear regression mae:", evaluator_mae.evaluate(test_lr))
print("linear regression rmse:", evaluator_rmse.evaluate(test_lr))
print("linear regression r2:", evaluator_r2.evaluate(test_lr))

####Stage 6.2: Testing the Decision Tree Regression Model

In [0]:
# Test Decision Tree Regression model
test_dtr = dtr_model.transform(test)

# Print MAE, RMSE and R2
print("decision tree regressor mae:", evaluator_mae.evaluate(test_dtr))
print("decision tree regressor rmse:", evaluator_rmse.evaluate(test_dtr))
print("decision tree regressor r2:", evaluator_r2.evaluate(test_dtr))

####Stage 6.3: Testing the Random Forest Regression Model

In [0]:
# Test Random Forest Regression model
test_rfr = rfr_model.transform(test)

# Print MAE, RMSE and R2
print("random forests regressor mae:", evaluator_mae.evaluate(test_rfr))
print("random forests regressor rmse:", evaluator_rmse.evaluate(test_rfr))
print("random forests regressor r2:", evaluator_r2.evaluate(test_rfr))

###Stage 7: Predictive Analysis

In this stage of the pipeline, we take each of our three models and predict the average land temperature of the city Atyrau for the years 2014-2020.

First we create a DataFrame (series) with the feature year with values 2014, 2015, 2016, 2017, 2018, 2019 and 2020.

In [0]:
# prepare data (named `years` so that the function `year` imported from pyspark.sql.functions is not shadowed)
years = [2014, 2015, 2016, 2017, 2018, 2019, 2020]
l = list(zip(years))
l

In [0]:
# pyspark dataframe
from pyspark.sql import Row

rdd = sc.parallelize(l)
rows = rdd.map(lambda z: Row(year=int(z[0])))
pred_sdf = spark.createDataFrame(rows)

# display data
display(pred_sdf)

# Feature Engineering: vector assembler
va_pred_sdf = assembler.transform(pred_sdf)

year
2014
2015
2016
2017
2018
2019
2020


Make predictions based on each of the three models.

####Stage 7.1: Predictions based on the Linear Regression Model

In [0]:
lr_pred = lr_model.transform(va_pred_sdf)
lr_pred_res = lr_pred.select("year", "prediction")
lr_pred_res.cache()

####Stage 7.2: Predictions based on the Decision Tree Regression Model

In [0]:
dtr_pred = dtr_model.transform(va_pred_sdf)
dtr_pred_res = dtr_pred.select("year", "prediction")
dtr_pred_res.cache()

####Stage 7.3: Predictions based on the Random Forest Regression Model

In [0]:
rfr_pred = rfr_model.transform(va_pred_sdf)
rfr_pred_res = rfr_pred.select("year", "prediction")
rfr_pred_res.cache()

###Stage 8: Data Visualisation

Let's see the predictions!

####Stage 8.1: Visualisation of Predictions based on the Linear Regression Model

In [0]:
display(lr_pred_res)

year,prediction
2014,9.32451463157836
2015,9.33732976786327
2016,9.35014490414818
2017,9.362960040433094
2018,9.375775176718005
2019,9.388590313002917
2020,9.401405449287823
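
With a single feature, a linear regression model is a straight line, so consecutive yearly predictions should differ by a constant amount equal to the fitted slope. The short check below recovers that slope from the predictions displayed above; the variable names are introduced for this sketch.

```python
# Linear Regression predictions as displayed above: (year, prediction)
predictions = [
    (2014, 9.32451463157836),
    (2015, 9.33732976786327),
    (2016, 9.35014490414818),
    (2017, 9.362960040433094),
    (2018, 9.375775176718005),
    (2019, 9.388590313002917),
    (2020, 9.401405449287823),
]

# Year-on-year differences between consecutive predictions
steps = [b[1] - a[1] for a, b in zip(predictions, predictions[1:])]

# Their average is the slope of the fitted line in °C per year
slope = sum(steps) / len(steps)

print(round(slope, 4))
```

The differences agree up to floating-point rounding, giving a slope of roughly 0.0128 °C per year for this particular training split.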


####Stage 8.2: Visualisation of Predictions based on the Decision Tree Regression Model

In [0]:
display(dtr_pred_res)

year,prediction
2014,10.14809375
2015,10.14809375
2016,10.14809375
2017,10.14809375
2018,10.14809375
2019,10.14809375
2020,10.14809375


####Stage 8.3: Visualisation of Predictions based on the Random Forest Regression Model

In [0]:
display(rfr_pred_res)

year,prediction
2014,10.060988422619053
2015,10.060988422619053
2016,10.060988422619053
2017,10.060988422619053
2018,10.060988422619053
2019,10.060988422619053
2020,10.060988422619053
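
The Decision Tree and Random Forest predictions are the same for every year from 2014 to 2020. This is expected of tree-based models: each leaf predicts a constant, and every year beyond the training range falls into the same leaf. The sketch below shows this with a depth-1 regression tree written in plain Python on hypothetical data; it is a simplified illustration, not Spark's DecisionTreeRegressor.

```python
def fit_stump(xs, ys):
    """Fit a depth-1 regression tree: one threshold and two constant leaf means."""
    pairs = sorted(zip(xs, ys))
    best = None
    for i in range(1, len(pairs)):
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        # Sum of squared errors if we split between pairs[i-1] and pairs[i]
        sse = (sum((y - left_mean) ** 2 for y in left)
               + sum((y - right_mean) ** 2 for y in right))
        threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

# Hypothetical training data: a steady warming trend from 2000 to 2013
train_years = list(range(2000, 2014))
train_temps = [8.0 + 0.1 * (y - 2000) for y in train_years]

predict = fit_stump(train_years, train_temps)

# Every future year lies right of the threshold, so all land in the same leaf
future = [predict(y) for y in range(2014, 2021)]
print(future)
```

Even though the training data has a clear upward trend, the tree cannot extend it beyond the last observed year, whereas the linear model can. A random forest averages many such trees, so it shows the same behaviour.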
