### EDA on World Temperature Data
- month: month
- day: day
- city: city
- state: state
- state_code: US state code
- city_state_code: a combined field that is consist of city and US state code
- avg_temperature: daily average temperature across each US city area

In [1]:
import os
import datetime
import pandas as pd
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 1028)
import configparser

import boto3

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, LongType, MapType

In [2]:
# config = configparser.ConfigParser()
# config.read('/home/workspace/capstone.cfg')

['/home/workspace/capstone.cfg']

In [3]:
# os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
# os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']
# input_data_dir = config['S3']['INPUT_S3_BUCKET']

In [4]:
# Init a SparkSession object
spark = SparkSession \
    .builder \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0') \
    .getOrCreate()

In [5]:
world_temperature_spark_df =spark.read \
    .format('csv') \
    .options(header='true', inferSchema='true', encoding="ISO-8859-1") \
    .load(f'{input_data_dir}/GlobalLandTemperaturesByCity.csv')

In [6]:
# world_temperature_spark_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [7]:
# world_temperature_spark_df.agg(F.max('dt'),F.min('dt')).show()

+-------------------+-------------------+
|            max(dt)|            min(dt)|
+-------------------+-------------------+
|2013-09-01 00:00:00|1743-11-01 00:00:00|
+-------------------+-------------------+



In [8]:
# world_temperature_spark_df.limit(2).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Ãrhus,Denmark,57.05N,10.33E
1,1743-12-01,,,Ãrhus,Denmark,57.05N,10.33E


In [9]:
world_temperature_spark_df = world_temperature_spark_df.filter(F.col('dt') >= datetime.datetime(2000,1,1))

In [10]:
world_temperature_spark_df = world_temperature_spark_df \
    .withColumn('month', F.month(F.col('dt'))) \
    .withColumn('day', F.dayofmonth(F.col('dt'))) \
    .drop(F.col('dt'))

In [11]:
# world_temperature_spark_df.limit(2).toPandas()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,month,day
0,3.065,0.372,Ãrhus,Denmark,57.05N,10.33E,1,1
1,3.724,0.241,Ãrhus,Denmark,57.05N,10.33E,2,1


In [12]:
avg_world_temperature_spark_df = world_temperature_spark_df \
    .groupBy(['month', 'day', 'City', 'Country', 'Latitude', 'Longitude']) \
    .agg(F.mean('AverageTemperature')) \
    .withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature') \
    .withColumn('month', F.col('month').cast('integer')) \
    .withColumn('day', F.col('day').cast('integer'))

In [13]:
# avg_world_temperature_spark_df.limit(2).toPandas()

Unnamed: 0,month,day,City,Country,Latitude,Longitude,AverageTemperature
0,5,1,Adilabad,India,20.09N,78.48E,35.639643
1,9,1,Agadir,Morocco,29.74N,9.23W,22.426692


In [14]:
#  avg_world_temperature_spark_df = avg_world_temperature_spark_df \
#         .select('Country', 'City','month','day', 'Latitude','Longitude', 'AverageTemperature')\
#         .sort(F.col('Country'), F.col('City'), F.col('month'))

In [15]:
# avg_world_temperature_spark_df.limit(25).toPandas()

Unnamed: 0,Country,City,month,day,Latitude,Longitude,AverageTemperature
0,Afghanistan,Baglan,1,1,36.17N,69.61E,-1.9105
1,Afghanistan,Baglan,2,1,36.17N,69.61E,0.461214
2,Afghanistan,Baglan,3,1,36.17N,69.61E,6.821286
3,Afghanistan,Baglan,4,1,36.17N,69.61E,12.45
4,Afghanistan,Baglan,5,1,36.17N,69.61E,18.218
5,Afghanistan,Baglan,6,1,36.17N,69.61E,22.8405
6,Afghanistan,Baglan,7,1,36.17N,69.61E,24.817571
7,Afghanistan,Baglan,8,1,36.17N,69.61E,23.532214
8,Afghanistan,Baglan,9,1,36.17N,69.61E,18.804846
9,Afghanistan,Baglan,10,1,36.17N,69.61E,12.479462


In [16]:
spark.stop()