In [None]:
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Point PySpark/findspark at the JDK installed by apt-get and at the Spark
# distribution extracted into /content.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [None]:
import findspark
# Expected to locate the Spark install via SPARK_HOME (set in the previous cell);
# it runs before any pyspark import so that pyspark becomes importable.
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
# NOTE(review): the star imports below pull every public name of these modules
# into the notebook namespace; several later lines re-import names they already cover.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *  # duplicate of the star import two lines up
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql.functions import array, col, explode, lit, struct
from typing import Iterable
from pyspark.sql.functions import desc
# Shadows Python's builtin max: from here on `max(...)` builds a Spark aggregate column.
from pyspark.sql.functions import max
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from pyspark.sql.functions import year

# NOTE(review): random and pandas are not used in the cells shown here.
import random
import pandas as pd

# Getting Started with Spark



In [None]:
from google.colab import drive

# Mount Google Drive so the CSV inputs under /content/drive/... are readable.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
# Reuse the running SparkSession if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [None]:
# 2015 flight summary with columns DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count.
# header=True takes column names from the first line; inferSchema=True lets
# Spark scan the data and type `count` as int.
flightData2015 = spark.read.csv(
    "/content/drive/Shareddrives/BigData/flight-data/csv/2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# First three rows as a list of Row objects (head(n) is take(n)).
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [None]:
# Plan of a global sort: note the range-partitioning Exchange (a shuffle).
flightData2015.orderBy("count").explain()

== Physical Plan ==
*(1) Sort [count#18 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#18 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#32]
   +- FileScan csv [DEST_COUNTRY_NAME#16,ORIGIN_COUNTRY_NAME#17,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/Shareddrives/BigData/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [None]:
# Shuffles (sorts, aggregations) produce 200 partitions by default, which is far
# more than this small dataset needs; use 5 instead.
spark.conf.set(key="spark.sql.shuffle.partitions", value="5")

In [None]:
# Two rows with the smallest count; rows tied on count may come back in any order.
flightData2015.orderBy("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [None]:
# Expose the DataFrame to SQL as a temporary view named flight_data_2015.
flightData2015.createOrReplaceTempView(name="flight_data_2015")

In [None]:
# spark.sql returns an ordinary DataFrame, so SQL and DataFrame code mix freely.
sqlWay = spark.sql(
    "SELECT DEST_COUNTRY_NAME, count(1) "
    "FROM flight_data_2015 "
    "GROUP BY DEST_COUNTRY_NAME"
)

In [None]:
# The same per-destination row count written with the DataFrame API.
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)


In [None]:
# Physical plan of the SQL version.
sqlWay.explain(mode="simple")

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#61]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/Shareddrives/BigData/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [None]:
# Physical plan of the DataFrame-API version.
dataFrameWay.explain(mode="simple")

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#80]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/Shareddrives/BigData/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




Maximum number of flights for any single route (the largest `count` in any origin–destination row)

In [None]:
 spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [None]:
# DataFrame-API version of the same query; F.max is Spark's aggregate function.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

Top five destination countries by total number of flights

In [None]:
# Total flights per destination country, largest five first.
maxSql = spark.sql("""
SELECT
    DEST_COUNTRY_NAME,
    sum(count) AS destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY destination_total DESC
LIMIT 5
""")

In [None]:
# The query is limited to 5 rows, so show all of them.
maxSql.show(n=5)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [None]:
 flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [None]:
# Plan of the top-five pipeline: sort + limit become one TakeOrderedAndProject step.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)


== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#104L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#16,destination_total#104L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[sum(cast(count#18 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#207]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_sum(cast(count#18 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#16,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/Shareddrives/BigData/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




## Dataset 1


In [None]:
# Returns the session already created earlier; kept so this section can run on its own.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Optional alternative to mounting Drive: upload the CSV files by hand.
# Left disabled because the cells below read from /content/drive/MyDrive/...
# #To upload files, use the following command
# from google.colab import files
# files.upload()

In [None]:
# Land temperature by country, one row per country and dt (first day of a month).
df1 = spark.read.csv(
    "/content/drive/MyDrive/BigData/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first 20 rows.
df1.show(n=20)

+----------+-------------------+-----------------------------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+-------------------+-----------------------------+-------+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|
|1743-12-01|               null|                         null|  Åland|
|1744-01-01|               null|                         null|  Åland|
|1744-02-01|               null|                         null|  Åland|
|1744-03-01|               null|                         null|  Åland|
|1744-04-01|               1.53|                         4.68|  Åland|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|
|1744-07-01|             15.342|                         1.41|  Åland|
|1744-08-01|               null|                         null|  Åland|
|1744-09-01|             11.702|                        1.517|  Åland|
|1744-

In [None]:
# count / mean / stddev / min / max for every column.
df1.describe().show(truncate=True)

+-------+----------+------------------+-----------------------------+-----------+
|summary|        dt|AverageTemperature|AverageTemperatureUncertainty|    Country|
+-------+----------+------------------+-----------------------------+-----------+
|  count|    577462|            544811|                       545550|     577462|
|   mean|      null| 17.19335423293583|           1.0190569003757597|       null|
| stddev|      null|10.953966445121187|           1.2019303866334272|       null|
|    min|1743-11-01|           -37.658|         0.052000000000000005|Afghanistan|
|    max|2013-09-01| 38.84200000000001|                       15.003|      Åland|
+-------+----------+------------------+-----------------------------+-----------+



### For which country and in what year was the highest average temperature observed?


In [None]:
# Hottest months first; nulls sort last under a descending order.
df1.orderBy(desc("AverageTemperature")).show()

+----------+------------------+-----------------------------+--------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|             Country|
+----------+------------------+-----------------------------+--------------------+
|2012-07-01| 38.84200000000001|                        0.464|              Kuwait|
|2000-07-01| 38.70500000000001|                        0.467|              Kuwait|
|2010-07-01| 38.49500000000001|                        0.498|              Kuwait|
|1998-08-01|            38.436|                        0.353|              Kuwait|
|2000-08-01|            38.315|                        0.593|              Kuwait|
|2013-07-01|            38.234|                        0.454|              Kuwait|
|2011-07-01| 38.15600000000001|                        0.375|              Kuwait|
|1999-08-01|            38.153|                        0.442|              Kuwait|
|2006-08-01|              38.1|                         0.79|              Kuwait|
|200

In [None]:
# The single hottest month in the data (raises IndexError if df1 were empty).
df1.orderBy(desc("AverageTemperature")).take(1)[0]

Row(dt='2012-07-01', AverageTemperature=38.84200000000001, AverageTemperatureUncertainty=0.464, Country='Kuwait')

### Analyze the data by country over the years, and name the top 10 countries with the biggest change in average temperature.


In [None]:
# Per-country grouping reused by the max/min aggregations below.
grouped_df = df1.groupby("Country")

In [None]:
# Highest monthly average temperature recorded for each country.
max_temp = grouped_df.agg(
    F.max("AverageTemperature").alias("MaxAverageTemperature")
)
max_temp.show()

+-------------+---------------------+
|      Country|MaxAverageTemperature|
+-------------+---------------------+
|         Chad|               33.415|
|     Anguilla|               29.636|
| Kingman Reef|               29.129|
|     Paraguay|   29.499000000000002|
|       Russia|               16.893|
|Palmyra Atoll|   29.148000000000003|
|        Yemen|   32.736999999999995|
|      Senegal|               32.871|
|       Sweden|   17.930999999999997|
|     Kiribati|   28.715999999999998|
|       Guyana|                28.26|
|        Burma|   28.485000000000007|
|      Eritrea|               31.494|
|       Jersey|               19.701|
|  Philippines|               29.158|
|     Djibouti|               35.175|
|        Tonga|                27.39|
|     Malaysia|                28.26|
|    Singapore|   28.880000000000006|
|         Fiji|                 27.9|
+-------------+---------------------+
only showing top 20 rows



In [None]:
# Lowest monthly average temperature recorded for each country.
min_temp = grouped_df.agg(
    F.min("AverageTemperature").alias("MinAverageTemperature")
)
min_temp.show()

+-------------+---------------------+
|      Country|MinAverageTemperature|
+-------------+---------------------+
|         Chad|               18.099|
|     Anguilla|               23.241|
| Kingman Reef|   24.791999999999998|
|     Paraguay|   14.014000000000001|
|       Russia|              -30.577|
|Palmyra Atoll|               24.795|
|        Yemen|               18.928|
|      Senegal|               21.537|
|       Sweden|              -16.608|
|     Kiribati|   24.151999999999997|
|       Guyana|               23.474|
|        Burma|               16.095|
|      Eritrea|   21.348000000000003|
|       Jersey|   0.6929999999999996|
|  Philippines|               22.984|
|     Djibouti|   23.023000000000003|
|        Tonga|               19.295|
|     Malaysia|               23.533|
|    Singapore|   24.031999999999996|
|         Fiji|               21.283|
+-------------+---------------------+
only showing top 20 rows



In [None]:
# Put each country's max and min side by side. Joining on the column name
# (instead of an equality expression) keeps a single `Country` column, so
# later references to df1_merged["Country"] are not ambiguous.
df1_merged = max_temp.join(min_temp, on="Country")
df1_merged.show()

+-------------+---------------------+-------------+---------------------+
|      Country|MaxAverageTemperature|      Country|MinAverageTemperature|
+-------------+---------------------+-------------+---------------------+
|         Chad|               33.415|         Chad|               18.099|
|     Anguilla|               29.636|     Anguilla|               23.241|
| Kingman Reef|               29.129| Kingman Reef|   24.791999999999998|
|     Paraguay|   29.499000000000002|     Paraguay|   14.014000000000001|
|       Russia|               16.893|       Russia|              -30.577|
|Palmyra Atoll|   29.148000000000003|Palmyra Atoll|               24.795|
|        Yemen|   32.736999999999995|        Yemen|               18.928|
|      Senegal|               32.871|      Senegal|               21.537|
|       Sweden|   17.930999999999997|       Sweden|              -16.608|
|     Kiribati|   28.715999999999998|     Kiribati|   24.151999999999997|
|       Guyana|                28.26| 

In [None]:
# Spread between each country's highest and lowest monthly average.
# NOTE(review): both extremes are taken over individual months (df1 is grouped
# only by Country), so this range includes the seasonal summer/winter swing,
# not just change over the years; comparing yearly means would isolate the trend.
df1_merged = df1_merged.withColumn(
    "Difference",
    col("MaxAverageTemperature") - col("MinAverageTemperature"),
)
df1_merged.orderBy(desc("Difference")).show(10)

+------------+---------------------+------------+---------------------+------------------+
|     Country|MaxAverageTemperature|     Country|MinAverageTemperature|        Difference|
+------------+---------------------+------------+---------------------+------------------+
|  Kazakhstan|   25.561999999999998|  Kazakhstan|              -23.601|            49.163|
|    Mongolia|   20.715999999999998|    Mongolia|  -27.441999999999997|48.157999999999994|
|      Russia|               16.893|      Russia|              -30.577|             47.47|
|      Canada|               14.796|      Canada|              -28.736|            43.532|
|  Uzbekistan|               30.375|  Uzbekistan|              -12.323|            42.698|
|Turkmenistan|               32.136|Turkmenistan|               -8.443|            40.579|
|     Finland|               19.132|     Finland|                -21.2|            40.332|
|     Belarus|               22.811|     Belarus|              -16.527|            39.338|

## Dataset 2


In [None]:
# CO2 emissions per capita in wide format: one row per country, one column per year.
df2 = spark.read.csv(
    "/content/drive/MyDrive/BigData/CO2 emissions per capita per country.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first 20 rows.
df2.show(n=20)

+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|        Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       19

In [None]:
# Summary statistics for every year column.
df2.describe().show(truncate=True)

+-------+------------+------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------

In [None]:
# Use the same key name as the temperature data so the two can be joined later.
df2 = df2.withColumnRenamed(existing="Country Name", new="Country")
df2.show()

+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|             Country|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       19

Reshaping Data From Wide to Long

In [None]:

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert a :class:`DataFrame` from wide to long format.

    Each input row yields one output row per column in ``value_vars``.

    Args:
        df: Input DataFrame in wide format.
        id_vars: Columns copied unchanged onto every output row. Any iterable
            (list, tuple, generator) is accepted.
        value_vars: Columns to unpivot. Their values are packed into a single
            array before exploding, so Spark must be able to coerce them to a
            common type.
        var_name: Output column holding the original column name (a string).
        value_name: Output column holding that column's value.

    Returns:
        DataFrame with columns ``[*id_vars, var_name, value_name]``.

    Raises:
        ValueError: If ``value_vars`` is empty.
    """
    # Materialise both iterables: list + tuple raises TypeError, and a
    # generator could only be consumed once.
    id_cols = list(id_vars)
    value_cols = list(value_vars)
    if not value_cols:
        raise ValueError("value_vars must contain at least one column")

    # array<struct<var_name: str, value_name: ...>>, one struct per value column.
    vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_cols))

    # explode yields one row per struct in the array.
    exploded = df.withColumn("_vars_and_vals", explode(vars_and_vals))

    cols = id_cols + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return exploded.select(*cols)

In [None]:
# Preview the long format: one (Country, year column, value) row per year 1960-2018.
melt(df2, id_vars=['Country'],
     value_vars=[str(y) for y in range(1960, 2019)]).show()

+-------+--------+-----+
|Country|variable|value|
+-------+--------+-----+
|  Aruba|    1960| null|
|  Aruba|    1961| null|
|  Aruba|    1962| null|
|  Aruba|    1963| null|
|  Aruba|    1964| null|
|  Aruba|    1965| null|
|  Aruba|    1966| null|
|  Aruba|    1967| null|
|  Aruba|    1968| null|
|  Aruba|    1969| null|
|  Aruba|    1970| null|
|  Aruba|    1971| null|
|  Aruba|    1972| null|
|  Aruba|    1973| null|
|  Aruba|    1974| null|
|  Aruba|    1975| null|
|  Aruba|    1976| null|
|  Aruba|    1977| null|
|  Aruba|    1978| null|
|  Aruba|    1979| null|
+-------+--------+-----+
only showing top 20 rows



In [None]:
# Wide -> long over the year columns 1960..2018; the year name is kept as a string.
long_df2 = melt(
    df2,
    id_vars=['Country'],
    value_vars=[str(y) for y in range(1960, 2019)],
)

In [None]:
# Inspect one country. Filter with long_df2's own column: a Column taken from a
# different DataFrame (e.g. df2) only resolves through Spark's lineage tracking
# and breaks easily (e.g. after self-joins).
long_df2.where(long_df2["Country"] == 'Afghanistan').show()

+-----------+--------+-----------+
|    Country|variable|      value|
+-----------+--------+-----------+
|Afghanistan|    1960|0.046059897|
|Afghanistan|    1961|0.053604304|
|Afghanistan|    1962|0.073764791|
|Afghanistan|    1963|0.074232685|
|Afghanistan|    1964|0.086292452|
|Afghanistan|    1965|0.101467397|
|Afghanistan|    1966|0.107636955|
|Afghanistan|    1967|0.123734289|
|Afghanistan|    1968| 0.11549774|
|Afghanistan|    1969| 0.08682346|
|Afghanistan|    1970|0.150290627|
|Afghanistan|    1971|0.166042044|
|Afghanistan|    1972| 0.13076385|
|Afghanistan|    1973|0.136279785|
|Afghanistan|    1974|0.155649444|
|Afghanistan|    1975|0.168928649|
|Afghanistan|    1976|0.154787206|
|Afghanistan|    1977|0.182963616|
|Afghanistan|    1978|0.163159571|
|Afghanistan|    1979|0.168376671|
+-----------+--------+-----------+
only showing top 20 rows



In [None]:
# The melted column-name column holds the year.
long_df2 = long_df2.withColumnRenamed(existing='variable', new='Year')

In [None]:
 long_df2 = long_df2.withColumnRenamed('value', 'CO2')

In [None]:
# Re-check one country after the renames, filtering with long_df2's own column
# rather than a Column object borrowed from df2.
long_df2.where(long_df2["Country"] == 'Afghanistan').show()

+-----------+----+-----------+
|    Country|Year|        CO2|
+-----------+----+-----------+
|Afghanistan|1960|0.046059897|
|Afghanistan|1961|0.053604304|
|Afghanistan|1962|0.073764791|
|Afghanistan|1963|0.074232685|
|Afghanistan|1964|0.086292452|
|Afghanistan|1965|0.101467397|
|Afghanistan|1966|0.107636955|
|Afghanistan|1967|0.123734289|
|Afghanistan|1968| 0.11549774|
|Afghanistan|1969| 0.08682346|
|Afghanistan|1970|0.150290627|
|Afghanistan|1971|0.166042044|
|Afghanistan|1972| 0.13076385|
|Afghanistan|1973|0.136279785|
|Afghanistan|1974|0.155649444|
|Afghanistan|1975|0.168928649|
|Afghanistan|1976|0.154787206|
|Afghanistan|1977|0.182963616|
|Afghanistan|1978|0.163159571|
|Afghanistan|1979|0.168376671|
+-----------+----+-----------+
only showing top 20 rows



In [None]:
# (rows, columns) before and after dropping country-years with no CO2 value.
shape_before = (long_df2.count(), len(long_df2.columns))
print(shape_before)
long_df2_1 = long_df2.where(col("CO2").isNotNull())
print((long_df2_1.count(), len(long_df2_1.columns)))
long_df2_1.show()

(15576, 3)
(12249, 3)
+-------+----+-----------+
|Country|Year|        CO2|
+-------+----+-----------+
|  Aruba|1986|2.868319392|
|  Aruba|1987|7.235198033|
|  Aruba|1988|10.02617921|
|  Aruba|1989| 10.6347326|
|  Aruba|1990|26.37450321|
|  Aruba|1991| 26.0461298|
|  Aruba|1992| 21.4425588|
|  Aruba|1993|22.00078616|
|  Aruba|1994|21.03624511|
|  Aruba|1995|20.77193616|
|  Aruba|1996|20.31835337|
|  Aruba|1997|20.42681771|
|  Aruba|1998|20.58766915|
|  Aruba|1999|20.31156677|
|  Aruba|2000|26.19487524|
|  Aruba|2001|25.93402441|
|  Aruba|2002|25.67116178|
|  Aruba|2003|26.42045209|
|  Aruba|2004|26.51729342|
|  Aruba|2005|27.20070778|
+-------+----+-----------+
only showing top 20 rows



In [None]:
# Summary statistics of the cleaned long CO2 table.
long_df2_1.describe().show(truncate=True)

+-------+-----------+------------------+-----------------+
|summary|    Country|              Year|              CO2|
+-------+-----------+------------------+-----------------+
|  count|      12249|             12249|            12249|
|   mean|       null| 1988.365091027839|4.215330745598858|
| stddev|       null|15.877200914933532|6.913356573485818|
|    min|Afghanistan|              1960|     -0.020100465|
|    max|   Zimbabwe|              2014|      99.46300047|
+-------+-----------+------------------+-----------------+



In [None]:
# Prepare the temperature data for merging: add a Year column, drop missing
# readings and average to one value per country and year.


In [None]:
# Re-inspect the raw temperature rows before deriving a Year column.
df1.show(n=20)

+----------+-------------------+-----------------------------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+-------------------+-----------------------------+-------+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|
|1743-12-01|               null|                         null|  Åland|
|1744-01-01|               null|                         null|  Åland|
|1744-02-01|               null|                         null|  Åland|
|1744-03-01|               null|                         null|  Åland|
|1744-04-01|               1.53|                         4.68|  Åland|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|
|1744-07-01|             15.342|                         1.41|  Åland|
|1744-08-01|               null|                         null|  Åland|
|1744-09-01|             11.702|                        1.517|  Åland|
|1744-

In [None]:
# Preview an integer Year column extracted from the dt date string.
df1.withColumn('Year', year('dt')).show()

+----------+-------------------+-----------------------------+-------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|Year|
+----------+-------------------+-----------------------------+-------+----+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|1743|
|1743-12-01|               null|                         null|  Åland|1743|
|1744-01-01|               null|                         null|  Åland|1744|
|1744-02-01|               null|                         null|  Åland|1744|
|1744-03-01|               null|                         null|  Åland|1744|
|1744-04-01|               1.53|                         4.68|  Åland|1744|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|1744|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|1744|
|1744-07-01|             15.342|                         1.41|  Åland|1744|
|1744-08-01|               null|                         null|  Åland|1744|
|1744-09-01|

In [None]:
# Same Year preview as the previous cell (duplicate; safe to delete).
df1.withColumn('Year', year(col('dt'))).show()

+----------+-------------------+-----------------------------+-------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|Year|
+----------+-------------------+-----------------------------+-------+----+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|1743|
|1743-12-01|               null|                         null|  Åland|1743|
|1744-01-01|               null|                         null|  Åland|1744|
|1744-02-01|               null|                         null|  Åland|1744|
|1744-03-01|               null|                         null|  Åland|1744|
|1744-04-01|               1.53|                         4.68|  Åland|1744|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|1744|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|1744|
|1744-07-01|             15.342|                         1.41|  Åland|1744|
|1744-08-01|               null|                         null|  Åland|1744|
|1744-09-01|

In [None]:
# Working copy of the temperature data with a Year column taken from dt.
df1_1 = df1.withColumn('Year', year('dt'))

In [None]:
# (rows, columns) before and after dropping months without a temperature reading.
print((df1_1.count(), len(df1_1.columns)))
df1_1 = df1_1.where(col('AverageTemperature').isNotNull())
print((df1_1.count(), len(df1_1.columns)))

(577462, 5)
(544811, 5)


In [None]:
# Preview the filtered monthly rows.
df1_1.show(n=20)

+----------+-------------------+-----------------------------+-------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|Year|
+----------+-------------------+-----------------------------+-------+----+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|1743|
|1744-04-01|               1.53|                         4.68|  Åland|1744|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|1744|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|1744|
|1744-07-01|             15.342|                         1.41|  Åland|1744|
|1744-09-01|             11.702|                        1.517|  Åland|1744|
|1744-10-01|              5.477|                        1.862|  Åland|1744|
|1744-11-01|              3.407|                        1.425|  Åland|1744|
|1744-12-01|             -2.181|                        1.641|  Åland|1744|
|1745-01-01|              -3.85|                        1.841|  Åland|1745|
|1745-02-01|

In [None]:
# One mean temperature per country and year. F.avg yields the column name
# "avg(AverageTemperature)", the same as the dict form, which later cells use.
df1_1 = df1_1.groupBy('Country', 'Year').agg(F.avg('AverageTemperature'))

In [None]:
# Preview the per-country, per-year means.
df1_1.show(n=20)

+-------------------+----+-----------------------+
|            Country|Year|avg(AverageTemperature)|
+-------------------+----+-----------------------+
|            Albania|1821|     12.237166666666667|
|            Albania|1943|     13.197250000000002|
|            Albania|1960|     13.335083333333332|
|            Andorra|1867|     11.277666666666667|
|            Andorra|1970|     11.376583333333334|
|             Angola|1926|     21.922666666666668|
|             Angola|1942|     21.892083333333332|
|             Angola|1979|               22.14475|
|             Angola|1987|     22.749916666666664|
|           Anguilla|1931|     26.970833333333335|
|           Anguilla|1998|      27.74083333333333|
|Antigua And Barbuda|1866|      26.05416666666667|
|Antigua And Barbuda|1875|                25.9395|
|          Argentina|2010|     15.078833333333336|
|              Aruba|1913|                27.3515|
|          Australia|1868|      21.44758333333333|
|            Austria|1941|     

### Merging two datasets (by Country & Year)

In [None]:
# Join CO2 per capita with mean temperature on (Country, Year).
# melt leaves Year as a string while df1_1.Year comes from year(), so cast it
# to int first. Joining on the key names yields a single Country and a single
# Year column instead of duplicates that would make later references ambiguous.
# NOTE(review): this is an inner join on exact country names; countries spelled
# differently in the two sources are silently dropped - worth checking.
df_merged = (
    long_df2_1
    .withColumn("Year", col("Year").cast("int"))
    .join(df1_1, on=["Country", "Year"])
)
df_merged.show()

+------------------+----+-----------+------------------+----+-----------------------+
|           Country|Year|        CO2|           Country|Year|avg(AverageTemperature)|
+------------------+----+-----------+------------------+----+-----------------------+
|           Albania|1960|1.258194928|           Albania|1960|     13.335083333333332|
|            Angola|1979|0.636944237|            Angola|1979|               22.14475|
|            Angola|1987|  0.5184278|            Angola|1987|     22.749916666666664|
|         Argentina|2010|4.558499612|         Argentina|2010|     15.078833333333336|
|        Azerbaijan|2012| 3.82487717|        Azerbaijan|2012|               13.08675|
|        Bangladesh|1987|0.120625234|        Bangladesh|1987|     25.642416666666666|
|          Botswana|1990|  1.9613582|          Botswana|1990|     22.944833333333335|
|            Brazil|1997|1.793828678|            Brazil|1997|     25.559416666666667|
|              Cuba|1969|2.015669739|              Cub

In [None]:
# Summary statistics of the merged CO2/temperature table.
df_merged.describe().show(truncate=True)

+-------+-----------+------------------+-----------------+-----------+------------------+-----------------------+
|summary|    Country|              Year|              CO2|    Country|              Year|avg(AverageTemperature)|
+-------+-----------+------------------+-----------------+-----------+------------------+-----------------------+
|  count|       7844|              7844|             7844|       7844|              7844|                   7844|
|   mean|       null|1987.7497450280468|4.625203378869329|       null|1987.7497450280468|     19.577945866267207|
| stddev|       null|15.578953121957158|7.723208722989336|       null|15.578953121957158|      9.042534487332055|
|    min|Afghanistan|              1960|     -0.020100465|Afghanistan|              1960|     -20.44683333333333|
|    max|   Zimbabwe|              2013|      99.46300047|   Zimbabwe|              2013|     30.744749999999996|
+-------+-----------+------------------+-----------------+-----------+------------------

### Correlation between CO2 emissions per capita and average annual temperature


In [None]:
# Make CO2 numeric before computing statistics. Its type after melt is whatever
# common type Spark chose for the year columns - TODO confirm with printSchema().
# Cast to double rather than float: float32 keeps only about 7 significant
# digits and would truncate values such as 0.046059897.
df_merged = df_merged.withColumn("CO2", col("CO2").cast("double"))

In [None]:
# Pearson correlation over all country-year rows between CO2 per capita and
# that year's mean temperature level (a level, not a temperature change).
df_merged.corr("CO2", "avg(AverageTemperature)")

-0.2344769651324569