In [3]:
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F  
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql import DataFrame 
import random
import pandas as pd

In [9]:
spark = SparkSession\
        .builder\
        .getOrCreate()

In [7]:
Flight2015 = spark.read.option('header','true').csv('/content/drive/MyDrive/423projects/2015-summary.csv')

In [8]:
Flight2015.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [9]:
flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/content/drive/MyDrive/423projects/2015-summary.csv")

In [10]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [11]:
#Sorting
flightData2015.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#94 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#94 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#82]
   +- FileScan csv [DEST_COUNTRY_NAME#92,ORIGIN_COUNTRY_NAME#93,count#94] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/423projects/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [13]:
#Shuffle
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [14]:
# making data frame into a table
flightData2015.createOrReplaceTempView("flight_data_2015")

In [17]:
# in Python
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#92], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#92, 5), ENSURE_REQUIREMENTS, [id=#111]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#92], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#92] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/423projects/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#92], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#92, 5), ENSURE_REQUIREMENTS, [id=#130]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#92], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#92] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/423projects/2015-summary.csv], PartitionFilters: [], PushedFilters:

In [18]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [51]:
#find the top five destination countries in the data.

maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [20]:

flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()


== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#151L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#92,destination_total#151L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#92], functions=[sum(cast(count#94 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#92, 5), ENSURE_REQUIREMENTS, [id=#205]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#92], functions=[partial_sum(cast(count#94 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#92,count#94] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/423projects/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [10]:
### QUESTION 2
tempdata = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/content/drive/MyDrive/423projects/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv")

In [11]:
tempdata.show(6)

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|1743-11-01|4.3839999999999995|                        2.294|  Åland|
|1743-12-01|              null|                         null|  Åland|
|1744-01-01|              null|                         null|  Åland|
|1744-02-01|              null|                         null|  Åland|
|1744-03-01|              null|                         null|  Åland|
|1744-04-01|              1.53|                         4.68|  Åland|
+----------+------------------+-----------------------------+-------+
only showing top 6 rows



In [47]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [51]:
# Make the DataFrame into a table
tempdata.createOrReplaceTempView("temp_data")

In [14]:
#Q2.a
#find the top five destination countries in the data.
tempdata.sort("AverageTemperature").take(3)

[Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Montenegro'),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Montenegro'),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, Country='Montenegro')]

In [77]:
df_temp = tempdata.groupBy("Country", (year("dt").alias('Year')))\
    .avg("AverageTemperature")\
    .withColumnRenamed("avg(AverageTemperature)", "AVGTEMP")\
    .sort(desc("AVGTEMP"))
df_temp.limit(1)\
        .show()

+--------+----+------------------+
| Country|Year|           AVGTEMP|
+--------+----+------------------+
|Djibouti|2013|30.744749999999996|
+--------+----+------------------+



In [103]:
tempSql2 = spark.sql("""
 SELECT year(dt) , avg(AverageTemperature) as AVGTEMP , Country
 FROM temp_data
 GROUP BY year(dt), Country
 ORDER BY avg(AverageTemperature) DESC
 LIMIT 1
  """)
tempSql2.show()

+----------------------+------------------+--------+
|year(CAST(dt AS DATE))|           AVGTEMP| Country|
+----------------------+------------------+--------+
|                  2013|30.744749999999996|Djibouti|
+----------------------+------------------+--------+



In [23]:
tempavg = tempdata.groupBy("Country", year("dt").alias("year"))\
          .agg(avg("AverageTemperature").alias("AvgTemp"))\
          .sort(desc("AvgTemp"))


In [30]:
tempdiff = tempavg.groupBy("Country")\
            .agg((max("AvgTemp")-min("AvgTemp")).alias("difference"))\
            .sort(desc("difference"))\
            .limit(10)\
            .show() 

+-------------+------------------+
|      Country|        difference|
+-------------+------------------+
|       Canada|23.408714285714286|
|       Russia|19.713666666666665|
|   Kazakhstan|          17.72625|
|North America|17.628666666666668|
|       Kuwait| 15.61866666666667|
|   Uzbekistan|          15.34025|
|United States|15.005666666666666|
| Turkmenistan|14.920500000000002|
|      Algeria| 14.87266666666666|
|       Jordan|14.373499999999998|
+-------------+------------------+



In [31]:
#CO2 Emissions dataset
co2data = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/content/drive/MyDrive/423projects/CO2 emissions per capita per country.csv")

In [32]:
co2data.show(6)

+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       1980|       1981| 

In [37]:
# Melt the CO2 dataframe
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

In [38]:
#df2.createOrReplaceTempView("df2")
year_range = [str(i) for i in range(1960, 2015)]
df2_melt = melt(co2data, id_vars=['Country Name','Country Code'], value_vars=year_range)
df2_melt.take(30)

[Row(Country Name='Aruba', Country Code='ABW', variable='1960', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1961', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1962', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1963', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1964', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1965', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1966', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1967', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1968', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1969', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1970', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1971', value=None),
 Row(Country Name='Aruba', Country Code='ABW', variable='1972', value=None),

In [78]:
df_merge = df2_melt.join(df_temp,
               (df2_melt['Country Name'] == df_temp['Country']) & 
               (df2_melt['Variable'] == df_temp['Year']),
               "inner")
df_merge.show()

+------------+------------+--------+-----------+-----------+----+------------------+
|Country Name|Country Code|variable|      value|    Country|Year|           AVGTEMP|
+------------+------------+--------+-----------+-----------+----+------------------+
| Afghanistan|         AFG|    1971|0.166042044|Afghanistan|1971|14.823500000000001|
| Afghanistan|         AFG|    1977|0.182963616|Afghanistan|1977|14.805416666666668|
| Afghanistan|         AFG|    1979|0.168376671|Afghanistan|1979|14.262083333333335|
| Afghanistan|         AFG|    1980|0.132858608|Afghanistan|1980|14.887333333333332|
| Afghanistan|         AFG|    1984|0.234987713|Afghanistan|1984|14.245833333333332|
| Afghanistan|         AFG|    1985|0.297827727|Afghanistan|1985|14.888749999999996|
| Afghanistan|         AFG|    1990|0.213449805|Afghanistan|1990|14.993333333333332|
| Afghanistan|         AFG|    1994| 0.08003917|Afghanistan|1994|          14.75475|
| Afghanistan|         AFG|    1996|0.066044698|Afghanistan|1996|

In [80]:
df_merge.stat.corr("value","AVGTEMP")

-0.16909003112369522