In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=2fed75b6eb1525bf124374089863bd6fadcd3bf051ef14e1aba86a0a986c4f88
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor, LinearRegression, RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.types import *

In [3]:
spark_context = SparkContext('local')
spark = SparkSession(spark_context)
spark

In [15]:
path_to_csv = '/content/greenhouse_gas_inventory_data_data.csv'
data = spark.read.format('csv') \
            .option('header', 'true') \
            .option('delimiter', ',') \
            .option('inferSchema', 'true') \
            .load(path_to_csv)
data.show()

+---------------+----+----------------+--------------------+
|country_or_area|year|           value|            category|
+---------------+----+----------------+--------------------+
|      Australia|2014|393126.946994288|carbon_dioxide_co...|
|      Australia|2013| 396913.93653029|carbon_dioxide_co...|
|      Australia|2012|  406462.8477036|carbon_dioxide_co...|
|      Australia|2011|403705.528313991|carbon_dioxide_co...|
|      Australia|2010|406200.993184341|carbon_dioxide_co...|
|      Australia|2009| 408448.47899963|carbon_dioxide_co...|
|      Australia|2008|404237.828214077|carbon_dioxide_co...|
|      Australia|2007|398816.453543549|carbon_dioxide_co...|
|      Australia|2006|391134.100909449|carbon_dioxide_co...|
|      Australia|2005|385581.132806466|carbon_dioxide_co...|
|      Australia|2004|381519.261592783|carbon_dioxide_co...|
|      Australia|2003|368345.977425107|carbon_dioxide_co...|
|      Australia|2002|361861.387896028|carbon_dioxide_co...|
|      Australia|2001|35

In [16]:
data.count()

8406

In [17]:
data.printSchema()

root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- value: double (nullable = true)
 |-- category: string (nullable = true)



In [18]:
stages = []
label_str_idx = StringIndexer(
    inputCol = 'value',
    outputCol = 'label',
    handleInvalid = 'keep'
)
stages += [label_str_idx]

categorical_cols = ['country_or_area', 'category']
numeric_cols = ['year']
for col in categorical_cols:
    string_indexer = StringIndexer(
        inputCol = col,
        outputCol = col + '_indexer',
        handleInvalid = 'keep'
    )
    encoder = OneHotEncoder(
        inputCol = string_indexer.getOutputCol(),
        outputCol = col + '_class_vec'
    )
    stages += [string_indexer, encoder]

assembler_cols = [col + '_class_vec' for col in categorical_cols] \
               + numeric_cols

stages += [VectorAssembler(
    inputCols = assembler_cols,
    outputCol = 'features'
)]

In [19]:
train_data, test_data = data.randomSplit([0.75, 0.25])

In [24]:
models = []
models.append(('LinearRegression', LinearRegression(featuresCol='features', labelCol='label', maxIter=100)))
models.append(('DecisionTreeRegressor', DecisionTreeRegressor(featuresCol='features', labelCol='label')))
models.append(('RandomForestRegressor', RandomForestRegressor(featuresCol='features', labelCol='label', numTrees=50, maxDepth=8)))
models.append(('GBTRegressor', GBTRegressor(featuresCol='features', labelCol='label', maxIter=50)))

In [25]:
for name, model in models:
    stages += [model]
    pipeline = Pipeline(stages=stages)
    pipeline_fit = pipeline.fit(train_data)
    prediction = pipeline_fit.transform(test_data)
    #model = model.fit(train_data)
    #prediction = model.transform(test_data)
    evaluator = RegressionEvaluator(
        predictionCol = 'prediction',
        labelCol = 'label',
        metricName = 'rmse'
    )
    rmse = evaluator.evaluate(prediction.select('prediction', 'label'))
    print(f'{name}\t RMSE: {round(rmse, 4)}')
    del stages[-1]

LinearRegression	 RMSE: 2903.9398
DecisionTreeRegressor	 RMSE: 2927.9107
RandomForestRegressor	 RMSE: 2873.2255
GBTRegressor	 RMSE: 2888.3004
