In [None]:
# Environment setup, run as shell commands from the notebook:
#   1. install the headless OpenJDK 8 JDK with apt-get (quiet, output discarded),
#   2. download the Spark 3.1.2 / Hadoop 3.2 binary tarball from the Apache archive,
#   3. unpack the tarball into the current working directory,
#   4. install the findspark package with pip.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
# Export the locations of the Java 8 JDK and of the Spark 3.1.2 distribution
# so that later cells can find them through the process environment.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [None]:
# findspark.init() is called before any pyspark import in this cell;
# presumably it is what makes the Spark distribution referenced by SPARK_HOME
# importable as `pyspark` -- TODO confirm against the findspark documentation.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F  
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql import DataFrame 
import random
import pandas as pd
import os

In [None]:
# Reuse the active Spark session if one exists, otherwise start a new one
# with the default configuration.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Open the Colab upload prompt only when the temperature CSV is not already
# present at the expected path.
from google.colab import files
temp_file = "/content/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv"
if not os.path.isfile(temp_file):
    files.upload()

Saving GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv to GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv


In [None]:
# Load the temperature CSV; the first line of the file supplies the column names.
df = spark.read.option("header", True).csv("/content/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv")

In [None]:
from pyspark.sql.types import FloatType
# Replace the AverageTemperature column with a float-typed version of itself
# so it can be aggregated and compared numerically.
df = df.withColumn("AverageTemperature", col("AverageTemperature").cast(FloatType()))

In [None]:
# Preview the first 20 rows of the temperature data.
df.show(20)

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|1743-11-01|             4.384|                        2.294|  Åland|
|1743-12-01|              null|                         null|  Åland|
|1744-01-01|              null|                         null|  Åland|
|1744-02-01|              null|                         null|  Åland|
|1744-03-01|              null|                         null|  Åland|
|1744-04-01|              1.53|                         4.68|  Åland|
|1744-05-01|             6.702|                        1.789|  Åland|
|1744-06-01|            11.609|                        1.577|  Åland|
|1744-07-01|            15.342|                         1.41|  Åland|
|1744-08-01|              null|                         null|  Åland|
|1744-09-01|            11.702|                        1.517|  Åland|
|1744-10-01|        

In [None]:
# Print the column names and types to confirm the AverageTemperature cast.
df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: float (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- Country: string (nullable = true)



## 1-a
Kuwait on 2012-07-01 has the highest average temperature.

In [None]:
# Step 1: compute the highest AverageTemperature over the whole dataset.
hottest = df.agg({"AverageTemperature": "max"}).first()
max_value = hottest["max(AverageTemperature)"]

# Step 2: show every row whose temperature equals that maximum.
result = df.where(col("AverageTemperature") == max_value)
result.show()

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|2012-07-01|            38.842|                        0.464| Kuwait|
+----------+------------------+-----------------------------+-------+



## 1-b
These are the top 10 countries with the biggest change in average temperature. For each country, I took the highest and lowest values and computed the absolute difference between them.

In [None]:
# Per-country temperature range, computed in a single aggregation pass rather
# than two separate groupBy results that are then joined back on Country.
# F.max / F.min / F.abs are the Spark column functions, spelled out so the
# cell does not depend on the star import shadowing Python's builtins.
final_df = (
    df.groupBy("Country")
    .agg(
        F.max("AverageTemperature").alias("max_value"),
        F.min("AverageTemperature").alias("min_value"),
    )
    .withColumn("abs_diff", F.abs(F.col("max_value") - F.col("min_value")))
    # Largest range first, so limit(10) keeps the ten biggest changes.
    .orderBy("abs_diff", ascending=False)
)
top_10_rows = final_df.limit(10)
top_10_rows.show()

+------------+---------+---------+---------+
|     Country|max_value|min_value| abs_diff|
+------------+---------+---------+---------+
|  Kazakhstan|   25.562|  -23.601|49.163002|
|    Mongolia|   20.716|  -27.442|48.157997|
|      Russia|   16.893|  -30.577|    47.47|
|      Canada|   14.796|  -28.736|43.531998|
|  Uzbekistan|   30.375|  -12.323|42.697998|
|Turkmenistan|   32.136|   -8.443|40.579002|
|     Finland|   19.132|    -21.2|   40.332|
|     Belarus|   22.811|  -16.527|   39.338|
|     Ukraine|   24.297|  -14.724|   39.021|
|     Estonia|   22.332|  -16.551|38.883003|
+------------+---------+---------+---------+



## 2-a

In [None]:
# Open the Colab upload prompt only when the CO2 CSV is not already present
# at the expected path.
temp_file = "/content/CO2 emissions per capita per country.csv"
if not os.path.isfile(temp_file):
    files.upload()

In [None]:
# Load the CO2 CSV; the first line of the file supplies the column names.
df_co2 = spark.read.option("header", True).csv("/content/CO2 emissions per capita per country.csv")

In [None]:
# Preview the first 20 rows of the CO2 data (wide layout: one column per year).
df_co2.show(20)

+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|        Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       19

In [None]:
# Rename 'Country Name' to 'Country' so it matches the temperature data's
# column name, which is used as a join key later in the notebook.
df_co2 = df_co2.withColumnRenamed('Country Name','Country')

In [None]:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

In [None]:
def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format.

    Each column named in ``value_vars`` is turned into rows: for every input
    row, one output row is produced per value column, carrying the column's
    name in ``var_name`` and the column's value in ``value_name``.

    Args:
        df: The wide DataFrame to unpivot.
        id_vars: Names of the columns to keep unchanged on every output row.
            Any iterable of names is accepted (list, tuple, generator, ...).
        value_vars: Names of the columns to unpivot into rows.
        var_name: Name of the output column holding the source column name.
        value_name: Name of the output column holding the source column value.

    Returns:
        A DataFrame with the ``id_vars`` columns followed by ``var_name`` and
        ``value_name``.
    """
    # Materialise id_vars up front: the signature promises any iterable, but
    # concatenating a non-list (e.g. a tuple) with a list raises TypeError.
    id_cols = list(id_vars)

    # Build array<struct<var_name: str, value_name: ...>>, one struct per value column.
    # NOTE(review): all value columns end up in one array, so they presumably
    # need a common type -- confirm before melting mixed-type columns.
    name_value_pairs = array(*[
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars])

    # Explode the array so each (name, value) struct lands on its own row.
    exploded = df.withColumn("_vars_and_vals", explode(name_value_pairs))

    # Pull the two struct fields back out as top-level columns.
    long_cols = [
        col("_vars_and_vals")[field].alias(field)
        for field in (var_name, value_name)]
    return exploded.select(*id_cols, *long_cols)

In [None]:
# Unpivot the yearly columns '1960' through '2018' into one row per
# (Country, year) pair; melt names the new columns 'variable' and 'value'.
year_columns = [str(yr) for yr in range(1960, 2019)]
df_co2 = melt(df_co2, id_vars=['Country'], value_vars=year_columns)

In [None]:
# Give the generic columns produced by melt their domain names.
df_co2 = (
    df_co2
    .withColumnRenamed('variable', 'Year')
    .withColumnRenamed('value', 'CO2')
)

In [None]:
# Preview the CO2 data in its long (Country, Year, CO2) layout.
df_co2.show()

+-------+----+----+
|Country|Year| CO2|
+-------+----+----+
|  Aruba|1960|null|
|  Aruba|1961|null|
|  Aruba|1962|null|
|  Aruba|1963|null|
|  Aruba|1964|null|
|  Aruba|1965|null|
|  Aruba|1966|null|
|  Aruba|1967|null|
|  Aruba|1968|null|
|  Aruba|1969|null|
|  Aruba|1970|null|
|  Aruba|1971|null|
|  Aruba|1972|null|
|  Aruba|1973|null|
|  Aruba|1974|null|
|  Aruba|1975|null|
|  Aruba|1976|null|
|  Aruba|1977|null|
|  Aruba|1978|null|
|  Aruba|1979|null|
+-------+----+----+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import year

In [None]:
# Reduce the dated readings to one mean temperature per country and year:
# derive Year from the dt column, then average within each (Country, Year).
df = (
    df
    .withColumn('Year', year(col('dt')))
    .groupBy('Country', 'Year')
    .agg({'AverageTemperature': 'avg'})
)

In [None]:
# Preview the yearly average temperature per country.
df.show()

+-------------------+----+-----------------------+
|            Country|Year|avg(AverageTemperature)|
+-------------------+----+-----------------------+
|            Albania|1821|      12.23716672261556|
|            Albania|1943|      13.19725015759468|
|            Albania|1960|     13.335083365440369|
|            Andorra|1867|     11.277666707833609|
|            Andorra|1970|     11.376583377520243|
|             Angola|1926|     21.922666549682617|
|             Angola|1942|     21.892083326975506|
|             Angola|1979|     22.144749959309895|
|             Angola|1987|     22.749916394551594|
|           Anguilla|1931|      26.97083330154419|
|           Anguilla|1998|     27.740833282470703|
|Antigua And Barbuda|1866|     26.054166793823242|
|Antigua And Barbuda|1875|      25.93950017293294|
|          Argentina|2010|       15.0788334608078|
|              Aruba|1913|     27.351500193277996|
|          Australia|1868|     21.447583119074505|
|            Austria|1941|     

In [None]:
# Keep only the (Year, Country) pairs present in both the temperature and
# the CO2 data.
final_df = df.join(df_co2, on=["Year", "Country"], how="inner")

This is the result of merging the two datasets.

In [None]:
# Preview the joined temperature / CO2 data.
final_df.show()

+----+------------------+-----------------------+-----------+
|Year|           Country|avg(AverageTemperature)|        CO2|
+----+------------------+-----------------------+-----------+
|1960|           Albania|     13.335083365440369|1.258194928|
|1979|            Angola|     22.144749959309895|0.636944237|
|1987|            Angola|     22.749916394551594|  0.5184278|
|2010|         Argentina|       15.0788334608078|4.558499612|
|2012|        Azerbaijan|     13.086749908824762| 3.82487717|
|1987|        Bangladesh|      25.64241631825765|0.120625234|
|1990|          Botswana|     22.944833119710285|  1.9613582|
|1997|            Brazil|      25.55941645304362|1.793828678|
|1969|              Cuba|     25.657833258310955|2.015669739|
|2009|              Cuba|     26.022333304087322|2.641408453|
|1999|            Cyprus|     20.073749939600628|6.886143297|
|1981|          Dominica|     26.686333179473877|0.486500829|
|1962|Dominican Republic|      25.74033323923747|0.353028929|
|1960|  

## 2-b
This is the correlation between CO2 emissions and temperature change. Here, temperature is measured as each country's yearly average temperature, and CO2 as its emissions per capita. Since the correlation is -0.234, there is a weak negative relationship between the two. Within the time range of this dataset, this might indicate a decreasing trend of CO2 overall, although a correlation this weak is not strong evidence on its own.

In [None]:
# Discard every row that has a null in any column before computing statistics.
final_df = final_df.na.drop()

In [None]:
from pyspark.sql.functions import rand
# Give the aggregated column a plain identifier, make both measures
# float-typed, then compute their correlation as the cell's output.
final_df = (
    final_df
    .withColumnRenamed('avg(AverageTemperature)', 'avg_AverageTemperature')
    .withColumn("avg_AverageTemperature", col("avg_AverageTemperature").cast(FloatType()))
    .withColumn("CO2", col("CO2").cast(FloatType()))
)
final_df.stat.corr("avg_AverageTemperature", "CO2")

-0.2344769653462897

-----------------------------------------------------------

## Run all the above in google cloud using a cluster

### The process to run my code above in Google Cloud as a Spark job
1. Authorize Cloud Shell and enable Compute Engine
2. Create the Google Cloud Storage bucket for the Dataproc cluster
3. Create the Dataproc cluster with Jupyter and Component Gateway

##### Screenshot of Clusters

![picture](https://drive.google.com/uc?export=view&id=18MGOSGczBiqY8q3eviV6vPf1y7jha5Z2)

##### Screenshot of jupyter code in google cloud

![picture](https://drive.google.com/uc?export=view&id=1pozIhfaRmJNOKhx5yLagFBKTrfr_XvMN)

-------------------------------------------------------------------------------