In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F  
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql import DataFrame 
import random
import pandas as pd

In [2]:
spark = SparkSession\
        .builder\
        .getOrCreate()

In [5]:
tempData = spark.read.csv('gs://bax423hw2/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv', header = True, inferSchema = True)

                                                                                

In [6]:
CO2Data = spark.read.csv('gs://bax423hw2/CO2 emissions per capita per country.csv', header = True, inferSchema = True)

In [7]:
tempData.show(10)

+----------+------------------+-----------------------------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+----------+------------------+-----------------------------+-------+
|1743-11-01|4.3839999999999995|                        2.294|  Åland|
|1743-12-01|              null|                         null|  Åland|
|1744-01-01|              null|                         null|  Åland|
|1744-02-01|              null|                         null|  Åland|
|1744-03-01|              null|                         null|  Åland|
|1744-04-01|              1.53|                         4.68|  Åland|
|1744-05-01| 6.702000000000001|                        1.789|  Åland|
|1744-06-01|11.609000000000002|                        1.577|  Åland|
|1744-07-01|            15.342|                         1.41|  Åland|
|1744-08-01|              null|                         null|  Åland|
+----------+------------------+-----------------------------+-------+
only showing top 10 

In [8]:
tempData.na.fill(value='0')

DataFrame[dt: string, AverageTemperature: double, AverageTemperatureUncertainty: double, Country: string]

In [9]:
tempData.createOrReplaceTempView("Temperature")

In [10]:
query_1 = spark.sql("""
SELECT 
  t1.Country,
  YEAR(t1.dt) AS Year_Of_Occurence,
  t1.AverageTemperature
FROM
  Temperature t1
WHERE
  AverageTemperature = (SELECT MAX(t2.AverageTemperature) FROM Temperature t2)
""")

query_1.show()

                                                                                

+-------+-----------------+------------------+
|Country|Year_Of_Occurence|AverageTemperature|
+-------+-----------------+------------------+
| Kuwait|             2012| 38.84200000000001|
+-------+-----------------+------------------+



In [11]:
query_2 = spark.sql("""
WITH cte1 AS (
SELECT 
  t1.Country,
  YEAR(t1.dt) AS Year_Of_Occurence,
  AVG(t1.AverageTemperature) as Yearly_Average 
FROM
  Temperature t1
GROUP BY t1.Country, YEAR(t1.dt)
ORDER BY t1.Country,Year_Of_Occurence),

cte2 AS (
SELECT 
  cte1.Country,
  MIN(Year_Of_Occurence) as min_year,
  MAX(Year_Of_Occurence) as max_year
FROM
  cte1
GROUP BY cte1.Country),

cte3 AS (
SELECT 
  cte1.Country,
  cte1.Year_Of_Occurence,
  cte1.Yearly_Average 
FROM
  cte1
  JOIN cte2 ON cte1.Country = cte2.Country 
                AND cte1.Year_Of_Occurence = cte2.min_year
),

cte4 AS (
SELECT 
  cte1.Country,
  cte1.Year_Of_Occurence,
  cte1.Yearly_Average 
FROM
  cte1
  JOIN cte2 ON cte1.Country = cte2.Country 
                AND cte1.Year_Of_Occurence = cte2.max_year
),

cte5 AS (
SELECT
  cte3.Country,
  cte3.Yearly_Average as Tempold,
  cte4.Yearly_Average as TempNew,
  ABS(cte4.Yearly_Average - cte3.Yearly_Average) as Absolute_Change_In_Temp
FROM
  cte3
  JOIN cte4 ON cte3.Country = cte4.Country
ORDER BY Absolute_Change_In_Temp DESC LIMIT 10
)

SELECT * FROM cte5

""")


query_2.show()

                                                                                

+----------+------------------+------------------+-----------------------+
|   Country|           Tempold|           TempNew|Absolute_Change_In_Temp|
+----------+------------------+------------------+-----------------------+
|    Kuwait|             12.02|         27.273375|     15.253375000000002|
|   Ukraine|             1.898|10.913499999999999|                 9.0155|
|Azerbaijan|             5.235|         14.173875|               8.938875|
|   Moldova|3.4150000000000005|           11.9605|      8.545499999999999|
|   Georgia|            2.3055|         10.686625|      8.381124999999999|
|     Syria|12.096000000000002|20.021124999999998|      7.925124999999996|
| Macedonia|             5.431|13.260124999999999|      7.829124999999999|
|   Romania|              3.89|           11.6455|                 7.7555|
|    Serbia|             5.151|         12.843625|               7.692625|
|   Finland|            -3.571| 4.064125000000001|               7.635125|
+----------+-------------

In [13]:
CO2Data.show(10)

22/04/15 06:38:34 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+----+----+----+
|        Country Name|Country Code|       1960|       1961|       1962|       1963|       1964|       1965|       1966|       1967|       1968|       1969|       1970|       1971|       1972|       1973|       1974|       1975|       1976|       1977|       1978|       1979|       19

                                                                                

In [14]:
CO2_df = CO2Data.toPandas()
CO2_df = pd.melt(CO2_df,id_vars=['Country Name', 'Country Code'], var_name='Year',value_name = 'Emission')
CO2Data = spark.createDataFrame(CO2_df)
CO2Data.show(10)

[Stage 27:>                                                         (0 + 1) / 1]

+--------------------+------------+----+-----------+
|        Country Name|Country Code|Year|   Emission|
+--------------------+------------+----+-----------+
|               Aruba|         ABW|1960|        NaN|
|         Afghanistan|         AFG|1960|0.046059897|
|              Angola|         AGO|1960|0.097471604|
|             Albania|         ALB|1960|1.258194928|
|             Andorra|         AND|1960|        NaN|
|          Arab World|         ARB|1960|0.643689011|
|United Arab Emirates|         ARE|1960|0.118757692|
|           Argentina|         ARG|1960|2.367473032|
|             Armenia|         ARM|1960|        NaN|
|      American Samoa|         ASM|1960|        NaN|
+--------------------+------------+----+-----------+
only showing top 10 rows



                                                                                

In [15]:
CO2Data = CO2Data.withColumnRenamed("Country Name","Country") \
    .withColumnRenamed("Country Code","Country_Code")
CO2Data.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Emission: double (nullable = true)



In [16]:
CO2Data1 = CO2Data.na.drop("any")
CO2Data1.show()

+--------------------+------------+----+-----------+
|             Country|Country_Code|Year|   Emission|
+--------------------+------------+----+-----------+
|         Afghanistan|         AFG|1960|0.046059897|
|              Angola|         AGO|1960|0.097471604|
|             Albania|         ALB|1960|1.258194928|
|          Arab World|         ARB|1960|0.643689011|
|United Arab Emirates|         ARE|1960|0.118757692|
|           Argentina|         ARG|1960|2.367473032|
| Antigua and Barbuda|         ATG|1960|0.662642982|
|           Australia|         AUS|1960|8.582936643|
|             Austria|         AUT|1960|4.373318828|
|             Belgium|         BEL|1960|9.941594074|
|               Benin|         BEN|1960|0.066354063|
|        Burkina Faso|         BFA|1960|0.009111902|
|            Bulgaria|         BGR|1960|2.833901121|
|             Bahrain|         BHR|1960|3.544478443|
|        Bahamas, The|         BHS|1960|3.749762618|
|              Belize|         BLZ|1960|0.4779

In [17]:
CO2Data1.createOrReplaceTempView("CO2Data")

In [18]:
query_1.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Year_Of_Occurence: integer (nullable = true)
 |-- AverageTemperature: double (nullable = true)



In [19]:
CO2Data1.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Emission: double (nullable = true)



In [20]:

query_3 = spark.sql("""
SELECT 
  t1.Country,
  YEAR(t1.dt) AS Year_Of_Occurence,
  AVG(t1.AverageTemperature) as Yearly_Average 
FROM
  Temperature t1
GROUP BY t1.Country, YEAR(t1.dt)
ORDER BY t1.Country,Year_Of_Occurence
""")

query_3.show(10)



+-----------+-----------------+------------------+
|    Country|Year_Of_Occurence|    Yearly_Average|
+-----------+-----------------+------------------+
|Afghanistan|             1838|18.379571428571428|
|Afghanistan|             1839|              null|
|Afghanistan|             1840|13.413454545454545|
|Afghanistan|             1841|           13.9976|
|Afghanistan|             1842| 15.15466666666667|
|Afghanistan|             1843|          13.75625|
|Afghanistan|             1844|13.148750000000001|
|Afghanistan|             1845|13.305833333333332|
|Afghanistan|             1846|14.030833333333332|
|Afghanistan|             1847|13.475583333333335|
+-----------+-----------------+------------------+
only showing top 10 rows



                                                                                

In [21]:
query_3.createOrReplaceTempView("Avg_Land_Temp")

In [22]:
query_4 = spark.sql("""
SELECT 
  *
FROM 
  Avg_Land_Temp a
  JOIN CO2Data c ON a.Country = c.Country
                    AND a.Year_Of_Occurence = c.Year
WHERE
  c.Year BETWEEN 1960 AND 2014
""")

query_4.show(10)

[Stage 35:>                                                         (0 + 1) / 1]

+--------------------+-----------------+------------------+--------------------+------------+----+-----------+
|             Country|Year_Of_Occurence|    Yearly_Average|             Country|Country_Code|Year|   Emission|
+--------------------+-----------------+------------------+--------------------+------------+----+-----------+
|         Afghanistan|             1960|13.985416666666667|         Afghanistan|         AFG|1960|0.046059897|
|              Angola|             1960|21.927083333333332|              Angola|         AGO|1960|0.097471604|
|             Albania|             1960|13.335083333333332|             Albania|         ALB|1960|1.258194928|
|United Arab Emirates|             1960|27.785666666666668|United Arab Emirates|         ARE|1960|0.118757692|
|           Argentina|             1960| 14.99366666666667|           Argentina|         ARG|1960|2.367473032|
|           Australia|             1960|21.210500000000003|           Australia|         AUS|1960|8.582936643|
|

                                                                                

In [23]:
query_4.stat.corr("Yearly_Average","Emission")



-0.2344769655945819