# 1. Setting up your Spark instance

In [1]:
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Install java
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz ## Install Apache Spark
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=f31336abd79f323434aedbe92bfb0be76444406a2ab20f29d7cf88f37c079dc0
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
!pip install pyspark==3.4.1 # make sure spark and pyspark versions are the same

Collecting pyspark==3.4.1
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=551d12a2c2ff427afb56017ebe49b87f0d1d7a43621a6427f37641733c902dfe
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
Successfully installed pyspark-3.4.1


In [3]:
import os
from pyspark.sql import SparkSession
# Define Java and Spark home path in Google Colab
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql import DataFrame
from pyspark.sql.functions import max, col
from pyspark.sql import functions as F
from pyspark.sql.functions import corr
from pyspark.sql.window import Window
from functools import reduce
from pyspark.sql import DataFrame
import random

In [5]:
spark = SparkSession.builder \
    .appName("PySpark SQL") \
    .getOrCreate()

# Check if SparkContext is available
print(spark.sparkContext)

<SparkContext master=local[*] appName=PySpark SQL>


In [6]:
co2_emissions = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv('/content/drive/MyDrive/Colab Notebooks/CO2 emissions per capita per country.csv')

In [7]:
global_temp = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("/content/drive/MyDrive/Colab Notebooks/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv")

In [8]:
flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/content/drive/MyDrive/Colab Notebooks/2015-summary.csv")

In [None]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [None]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#46971 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#46971 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=35447]
      +- FileScan csv [DEST_COUNTRY_NAME#46969,ORIGIN_COUNTRY_NAME#46970,count#46971] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [None]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [None]:
sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  """)

In [None]:
dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

In [None]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#46969], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46969, 5), ENSURE_REQUIREMENTS, [plan_id=35469]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#46969], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#46969] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#46969], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46969, 5), ENSURE_REQUIREMENTS, [plan_id=35482]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#46969], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#46969] Batched: false, DataFilters: [], Format: CSV

In [None]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [None]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [None]:
maxSql = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY sum(count) DESC
  LIMIT 5
  """)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [None]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [None]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#47073L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#46969,destination_total#47073L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#46969], functions=[sum(count#46971)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46969, 5), ENSURE_REQUIREMENTS, [plan_id=35684]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#46969], functions=[partial_sum(count#46971)])
            +- FileScan csv [DEST_COUNTRY_NAME#46969,count#46971] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




# 2. Climate Change: Project Tasmania

### (1) a. Forwhichcountryandduringwhatyear,thehighestaveragetemperature was observed?

In [9]:
global_temp.show()

+----------+-------------------+-----------------------------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+-------------------+-----------------------------+-------+
|1743-11-01| 4.3839999999999995|                        2.294|  Åland|
|1743-12-01|               null|                         null|  Åland|
|1744-01-01|               null|                         null|  Åland|
|1744-02-01|               null|                         null|  Åland|
|1744-03-01|               null|                         null|  Åland|
|1744-04-01|               1.53|                         4.68|  Åland|
|1744-05-01|  6.702000000000001|                        1.789|  Åland|
|1744-06-01| 11.609000000000002|                        1.577|  Åland|
|1744-07-01|             15.342|                         1.41|  Åland|
|1744-08-01|               null|                         null|  Åland|
|1744-09-01|             11.702|                        1.517|  Åland|
|1744-

In [None]:
global_temp.createOrReplaceTempView("global_temperature")
result = spark.sql("""
SELECT Country, YEAR(dt) as Year, AverageTemperature
FROM global_temperature
WHERE AverageTemperature IS NOT NULL
ORDER BY AverageTemperature DESC
LIMIT 1
""")
result.show()

+-------+----+------------------+
|Country|Year|AverageTemperature|
+-------+----+------------------+
| Kuwait|2012| 38.84200000000001|
+-------+----+------------------+



The maximum average temperature was observed in 2012, at Kuwait

### b. Analyzethedatabycountryovertheyears,andnamewhicharethetop 10 countries with the biggest change in average temperature.

In [None]:
change_temp_query =spark.sql("""
SELECT Country,
       MAX(AverageTemperature) - MIN(AverageTemperature) AS temperature_change
FROM global_temperature
GROUP BY Country
ORDER BY temperature_change DESC
LIMIT 10
""")
change_temp_query.show()

+------------+------------------+
|     Country|temperature_change|
+------------+------------------+
|  Kazakhstan|            49.163|
|    Mongolia|48.157999999999994|
|      Russia|             47.47|
|      Canada|            43.532|
|  Uzbekistan|            42.698|
|Turkmenistan|            40.579|
|     Finland|            40.332|
|     Belarus|            39.338|
|     Ukraine|            39.021|
|     Estonia|38.882999999999996|
+------------+------------------+



### (2) a. Mergethetwodatasetsbycountry,andkeepthedatafrom1960to2014

In [10]:
co2_emissions.show()

+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|        Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       19

In [None]:
co2_emissions.createOrReplaceTempView("co2_emissions")

In [25]:

# List of columns to exclude, exclude 2014 since the dt doesn't have 2014
exclude_columns = ['2015', '2016', '2017', '2018','2014']

# Select all columns except the ones to exclude
selected_columns = [col for col in co2_emissions.columns if col not in exclude_columns]

# Now select these columns from co2_emissions DataFrame
co2_emissions_selected = co2_emissions.select(*selected_columns)

# Create or replace temp views with the selected columns
co2_emissions_selected.createOrReplaceTempView("co2_emissions_selected")
global_temp.createOrReplaceTempView("global_temperature")

# Now the 'merged_data_query' should work as the view 'co2_emissions_selected' exists
merged_data_query = """
SELECT
    sub.dt,
    sub.AverageTemperature,
    sub.AverageTemperatureUncertainty,
    sub.Country,
    ce.*
FROM (
    SELECT
        YEAR(dt) as dt,
        AVG(AverageTemperature) as AverageTemperature,
        AVG(AverageTemperatureUncertainty) as AverageTemperatureUncertainty,
        Country
    FROM global_temperature
    WHERE YEAR(dt) BETWEEN 1960 AND 2014
    GROUP BY YEAR(dt), Country
) as sub
JOIN co2_emissions_selected ce
ON sub.Country = ce.`Country Name`
ORDER BY dt ASC
"""

# Execute the query
merged_data = spark.sql(merged_data_query)

# Show the results
merged_data.show()


+----+-------------------+-----------------------------+------------------+------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|  dt| AverageTemperature|AverageTemperatureUncertainty|           Country|      Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|     

### b. WhatisthecorrelationbetweenCO2emissionsandtemperaturechange?

I will make lists of both average temperature each year and the CO2 emission each year. So the corr function can work on it directly.

In [26]:
# between MAX and MIN 'AverageTemperature'
temp_diff_by_year = merged_data.groupBy('dt').agg(
    F.abs(F.max('AverageTemperature') - F.min('AverageTemperature')).alias('temp_diff')
)

# Order the results by 'dt'
temp_diff_by_year = temp_diff_by_year.orderBy('dt')

# Collect the results into a list
temp_diff_list = [(row['dt'], row['temp_diff']) for row in temp_diff_by_year.collect()]

In [22]:
temp_diff_list

[(1960, 46.165416666666665),
 (1961, 47.85983333333334),
 (1962, 46.14616666666667),
 (1963, 46.82333333333334),
 (1964, 47.218250000000005),
 (1965, 45.95291666666667),
 (1966, 47.66191666666667),
 (1967, 47.138916666666674),
 (1968, 47.3625),
 (1969, 47.548),
 (1970, 47.72741666666667),
 (1971, 48.319833333333335),
 (1972, 47.90708333333333),
 (1973, 48.38816666666666),
 (1974, 46.673),
 (1975, 48.22275),
 (1976, 46.741333333333344),
 (1977, 47.169666666666664),
 (1978, 47.51133333333333),
 (1979, 47.62083333333333),
 (1980, 47.02483333333333),
 (1981, 47.25316666666667),
 (1982, 48.037749999999996),
 (1983, 49.7235),
 (1984, 48.18366666666666),
 (1985, 46.363416666666666),
 (1986, 46.929583333333326),
 (1987, 48.357083333333335),
 (1988, 47.39533333333333),
 (1989, 48.16499999999999),
 (1990, 47.759),
 (1991, 47.66075000000001),
 (1992, 48.5835),
 (1993, 48.233666666666664),
 (1994, 47.704249999999995),
 (1995, 47.49925),
 (1996, 46.55291666666667),
 (1997, 47.26716666666667),
 (199

In [28]:
# Initialize a list to hold the average emissions for each year
average_emissions = []

years = [str(year) for year in range(1960, 2014)]

# Calculate the average for each year and add it to the list
for year in years:
    avg_col = F.avg(year).alias(f'avg_{year}')
    avg_emission = co2_emissions_selected.select(avg_col).collect()[0][f'avg_{year}']
    average_emissions.append((year, avg_emission))


In [None]:
average_emissions

[('1960', 2.0441783908177085),
 ('1961', 2.157479864538861),
 ('1962', 2.2488049222051276),
 ('1963', 2.7634205084285703),
 ('1964', 2.912680068142856),
 ('1965', 3.0316694686896533),
 ('1966', 3.0446996531970454),
 ('1967', 3.111186310413793),
 ('1968', 3.309332967596057),
 ('1969', 3.9191244998226593),
 ('1970', 4.197480374409761),
 ('1971', 4.421934022213595),
 ('1972', 4.488119374057692),
 ('1973', 4.805842462769229),
 ('1974', 4.499457508288458),
 ('1975', 4.366105995028844),
 ('1976', 4.3566088581490385),
 ('1977', 4.486631686408652),
 ('1978', 4.511017135326925),
 ('1979', 4.56301375180288),
 ('1980', 4.464373396841345),
 ('1981', 3.9935487841105752),
 ('1982', 3.872467809014423),
 ('1983', 3.726820254807689),
 ('1984', 3.8243866911634634),
 ('1985', 3.917695550951922),
 ('1986', 3.90544671704306),
 ('1987', 3.9426031941578925),
 ('1988', 4.077311975177032),
 ('1989', 4.213261544602872),
 ('1990', 4.082445945293023),
 ('1991', 4.12124666359447),
 ('1992', 4.479572514066392),
 ('

In [29]:
# Convert the lists into DataFrames
average_temp_df = spark.createDataFrame(temp_diff_list, schema=['year', 'temp_diff'])
average_emissions_df = spark.createDataFrame(average_emissions, schema=['year', 'average_emissions'])

# Join the DataFrames on the year column
merged_data = average_temp_df.join(average_emissions_df, "year")

# Calculate the Pearson correlation coefficient between average_temp and average_emissions
correlation = merged_data.stat.corr('temp_diff', 'average_emissions')
print(f"The correlation coefficient between average temperature and CO2 emissions is: {correlation}")

The correlation coefficient between average temperature and CO2 emissions is: -0.015756138801512273
