# Historic Global Temperatures 
### Data Engineering Capstone Project
By Kyle McMillan


### Project Summary <a name="summary"></a>
The aim of this project is to create a data lake and an ETL pipeline that store data on historic city temperatures and atmospheric CO<sub>2</sub> PPM values. This data can be used to investigate the rise in global temperatures.   

The project follows these steps:  
[Step 1: Scope the Project and Gathering Data](#step1)  
[Step 2: Exploring and Assessing the Data](#step2)  
[Step 3: Defining the Data Model](#step3)  
[Step 4: Running the Pipelines to Model the Data](#step4)  
[Step 5: Project Write Up](#step5)  

In [49]:
# Imports
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DateType, FloatType
from pyspark.sql.functions import udf, col, to_date, year, month, dayofmonth, weekofyear

In [50]:
#Set the current date
today = datetime.now().date().strftime("%Y-%m-%d")

In [51]:
#Raw data input filepaths.
temp_file_name = 'temp_data/GlobalLandTemperaturesByCity.csv'
co2_file_name = 'co2_data/co2-ppm-daily_json.json'

#Staging parquet output filepaths.
temp_sparkdf_path = f"parquet_files/{today}-city_temp_staging.parquet"
co2_sparkdf_path = f"parquet_files/{today}-global_co2_staging.parquet"

#Final table parquet output filepaths.
fact_temp_table_path = f"parquet_files/{today}-fact_temp_table.parquet"
dim_city_table_path = f"parquet_files/{today}-dim_city_table.parquet"
dim_co2_table_path = f"parquet_files/{today}-dim_co2_table.parquet"
dim_date_table_path = f"parquet_files/{today}-dim_date_table.parquet"
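Every output path above is prefixed with the run date, so repeated runs on the same day overwrite each other (the writes use `mode="overwrite"`), while runs on different days are kept side by side. A minimal sketch of that naming rule as a function; the helper name `dated_parquet_path` is hypothetical, and the date is passed explicitly so the rule can be checked against the paths printed later in this notebook:

```python
from datetime import date
from typing import Optional


def dated_parquet_path(table_name: str, run_date: Optional[date] = None,
                       base_dir: str = "parquet_files") -> str:
    """Build a date-prefixed parquet path such as
    parquet_files/2020-02-26-dim_co2_table.parquet.

    Hypothetical helper mirroring the f-strings above; run_date defaults to today.
    """
    run_date = run_date or date.today()
    return f"{base_dir}/{run_date.strftime('%Y-%m-%d')}-{table_name}.parquet"


# Usage: reproduce two of the paths printed by the 2020-02-26 run.
print(dated_parquet_path("city_temp_staging", date(2020, 2, 26)))
print(dated_parquet_path("fact_temp_table", date(2020, 2, 26)))
```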

### Step 1: Scope the Project and Gathering Data <a name="step1"></a>

#### Scope 
The scope of this project is to create an ETL pipeline for processing, cleaning and storing data. The data consists of historic temperatures for cities (and their countries) around the world, and historic global CO<sub>2</sub> PPM values.

The output of the ETL pipeline is the processed data, stored in a snowflake schema model and saved as parquet files on the local filesystem. 

Tools used: 
- python 
- pandas 
- pyspark


#### Describe and Gather Data 
This project contains datasets from two different sources. 

##### Earth temperature data
Source: [Kaggle dataset](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data#GlobalLandTemperaturesByCity.csv)  
filename: GlobalLandTemperaturesByCity.csv  
Type: CSV  
Size: 508.15 MB  
Shape: 8599212, 7  
Description: This data contains historic monthly average temperatures recorded for cities around the world (each row is dated to the first day of its month). The records start in the 18<sup>th</sup> century, although not every city's record goes back that far. The earliest recordings are sporadic.   
Visualisation of the temperature data:  
![Temp visuals](img/global_temp.PNG)


##### Earth CO<sub>2</sub> levels data
Source: [CO2 PPM - Trends in Atmospheric Carbon Dioxide](https://datahub.io/core/co2-ppm-daily)  
filename: co2-ppm-daily_json.json  
Type: JSON  
Size: 1 MB  
Shape: 19413, 2  
Description: This data contains historic CO<sub>2</sub> PPM levels recorded at a monitoring station in Hawaii. The records go back as far as 1958-03-30. The earliest dates are not recorded daily, but most days have a recorded value.  
Visualisation of the CO<sub>2</sub> PPM level data:  
![CO2 visuals](img/global_co2.PNG)

#### Historic global temperature data
Data viewed as a pandas dataframe.

In [52]:
# Read in the data from CSV file
temp_df = pd.read_csv(temp_file_name)
temp_df.head()

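|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|----|--------------------|-------------------------------|------|---------|----------|-----------|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |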


#### Global CO<sub>2</sub> PPM levels
Data viewed as a pandas dataframe.

In [53]:
# Read in the data from JSON
co2_df = pd.read_json(co2_file_name)
co2_df.head()

|   | date | value |
|---|------|-------|
| 0 | 1958-03-30 | 316.16 |
| 1 | 1958-03-31 | 316.4 |
| 2 | 1958-04-02 | 317.67 |
| 3 | 1958-04-03 | 317.76 |
| 4 | 1958-04-04 | 317.09 |


#### Load the data into Spark

In [54]:
#Create (or reuse) a local Spark session.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

#### Load and display the temperature data

In [55]:
#Set the schema for the temperature data.
temp_schema = StructType([StructField("datestamp", DateType(), False),
                            StructField("avg_temp", FloatType(), True),
                            StructField("avg_temp_uncert", FloatType(), True),
                            StructField("city", StringType(), False),
                            StructField("country", StringType(), False),
                            StructField("latitude", StringType(), False),
                            StructField("longitude", StringType(), False)])

#Load the CSV into spark with the schema.
temp_df_spark = spark.read.csv(temp_file_name, temp_schema, header=True)

In [56]:
#Check the schema.
temp_df_spark.printSchema()

root
 |-- datestamp: date (nullable = true)
 |-- avg_temp: float (nullable = true)
 |-- avg_temp_uncert: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [57]:
#Show the first 5 rows.
temp_df_spark.show(5, truncate=False)

+----------+--------+---------------+-----+-------+--------+---------+
|datestamp |avg_temp|avg_temp_uncert|city |country|latitude|longitude|
+----------+--------+---------------+-----+-------+--------+---------+
|1743-11-01|6.068   |1.737          |Århus|Denmark|57.05N  |10.33E   |
|1743-12-01|null    |null           |Århus|Denmark|57.05N  |10.33E   |
|1744-01-01|null    |null           |Århus|Denmark|57.05N  |10.33E   |
|1744-02-01|null    |null           |Århus|Denmark|57.05N  |10.33E   |
|1744-03-01|null    |null           |Århus|Denmark|57.05N  |10.33E   |
+----------+--------+---------------+-----+-------+--------+---------+
only showing top 5 rows



#### Load and display the CO<sub>2</sub> data

In [58]:
#Load the JSON file into spark.
co2_df_spark = spark.read.json(co2_file_name)

In [59]:
#Check the schema.
co2_df_spark.printSchema()

root
 |-- date: string (nullable = true)
 |-- value: string (nullable = true)



In [60]:
#Show the first 5 rows.
co2_df_spark.show(5, truncate=False)

+----------+------+
|date      |value |
+----------+------+
|1958-03-30|316.16|
|1958-03-31|316.40|
|1958-04-02|317.67|
|1958-04-03|317.76|
|1958-04-04|317.09|
+----------+------+
only showing top 5 rows



#### Write parquet files

In [61]:
# Write city temperature data to parquet file.
temp_df_spark.write.parquet(temp_sparkdf_path, mode="overwrite")
print(f"filepath: {temp_sparkdf_path}")

# Read the parquet file back into a Spark dataframe.
temp_df_spark = spark.read.parquet(temp_sparkdf_path)

filepath: parquet_files/2020-02-26-city_temp_staging.parquet


In [62]:
# Write global co2 data to parquet file.
co2_df_spark.write.parquet(co2_sparkdf_path, mode="overwrite")
print(f"filepath: {co2_sparkdf_path}")

# Read the parquet file back into a Spark dataframe.
co2_df_spark = spark.read.parquet(co2_sparkdf_path)

filepath: parquet_files/2020-02-26-global_co2_staging.parquet


### Step 2: Exploring and Assessing the Data <a name="step2"></a>




#### Exploring the Data 

Quality issues:
* Staging global temperature data:
    * Some temperature values are null.
  
* Staging CO<sub>2</sub> PPM data:
    * No quality issues found.

#### Cleaning the Data

Steps to clean the data:
* Staging global temperature data:
    * Drop rows where `avg_temp` is null.

* Staging CO<sub>2</sub> PPM data:
    * No cleaning required.
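The null check and the drop rule can be sketched on plain Python rows to make the logic explicit. This is an illustration only: the sample rows are a column subset of the first three staging rows shown above (with `None` for null), and the helper names `null_counts` and `drop_null_temps` are hypothetical. In Spark, the per-column counts could be computed in one pass with `df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])`, and the drop is the `where(col("avg_temp").isNotNull())` call in the next cell.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[object]]

# Column subset of the first staging rows (None stands for a null value).
sample_rows: List[Row] = [
    {"datestamp": "1743-11-01", "avg_temp": 6.068, "avg_temp_uncert": 1.737, "city": "Århus"},
    {"datestamp": "1743-12-01", "avg_temp": None, "avg_temp_uncert": None, "city": "Århus"},
    {"datestamp": "1744-01-01", "avg_temp": None, "avg_temp_uncert": None, "city": "Århus"},
]


def null_counts(rows: List[Row]) -> Dict[str, int]:
    """Count null (None) values per column across all rows."""
    counts: Dict[str, int] = {}
    for row in rows:
        for column, value in row.items():
            counts[column] = counts.get(column, 0) + int(value is None)
    return counts


def drop_null_temps(rows: List[Row]) -> List[Row]:
    """Keep only rows with a recorded avg_temp, like where(col("avg_temp").isNotNull())."""
    return [row for row in rows if row["avg_temp"] is not None]


print(null_counts(sample_rows))
print(len(drop_null_temps(sample_rows)))
```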

In [63]:
# Remove null values from the global temperature data.
temp_df_spark_clean = temp_df_spark.where(col("avg_temp").isNotNull())

### Step 3: Defining the Data Model <a name="step3"></a>
#### 3.1 Conceptual Data Model
The model chosen for this project is a snowflake schema. It suits the nature of the data: the temperature values are recorded by date and city, whereas the CO<sub>2</sub> values are recorded by date only.   
The model consists of the following:  
- Fact table:
    - fact_temp_table
- Dimension tables:
    - dim_city_table
    - dim_date_table
    - dim_co2_table

Schema visualisation  
![Snowflake schema](img/tables.PNG)
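To show how the tables fit together, the sketch below loads a few rows into an in-memory SQLite database and joins them. SQLite is only a stand-in here (an assumption for illustration, not part of the pipeline); the same SQL shape would apply in Spark SQL or Redshift. The tables are simplified column subsets of the real schemas, and every row is copied from the notebook outputs. The CO<sub>2</sub> column comes back empty because the CO<sub>2</sub> series starts in 1958; note also that the temperature rows are dated to the first of each month while the CO<sub>2</sub> rows are daily, so an exact date join only matches when a CO<sub>2</sub> reading exists for that first day.

```python
import sqlite3

# In-memory SQLite database standing in for wherever the final tables are queried.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Simplified versions of the four tables (column subsets of the real schemas).
cur.executescript("""
CREATE TABLE fact_temp_table (datestamp TEXT, avg_temp REAL, avg_temp_uncert REAL,
                              city TEXT, country TEXT);
CREATE TABLE dim_city_table  (city TEXT, country TEXT, longitude TEXT, latitude TEXT);
CREATE TABLE dim_date_table  (datestamp TEXT, year INTEGER, month INTEGER, day INTEGER);
CREATE TABLE dim_co2_table   (datestamp TEXT, co2_ppm REAL);
""")

# Rows copied from the notebook outputs.
cur.execute("INSERT INTO fact_temp_table VALUES ('1743-11-01', 6.068, 1.737, 'Århus', 'Denmark')")
cur.execute("INSERT INTO dim_city_table VALUES ('Århus', 'Denmark', '10.33E', '57.05N')")
cur.execute("INSERT INTO dim_date_table VALUES ('1743-11-01', 1743, 11, 1)")
cur.executemany("INSERT INTO dim_co2_table VALUES (?, ?)",
                [("1958-03-30", 316.16), ("1958-03-31", 316.40)])

# Fact -> city on (city, country); fact -> date and date -> CO2 on datestamp.
# LEFT JOIN keeps temperature rows that have no CO2 reading for that date.
query = """
SELECT f.city, f.country, c.latitude, d.year, d.month, f.avg_temp, co2.co2_ppm
FROM fact_temp_table AS f
JOIN dim_city_table AS c ON f.city = c.city AND f.country = c.country
JOIN dim_date_table AS d ON f.datestamp = d.datestamp
LEFT JOIN dim_co2_table AS co2 ON d.datestamp = co2.datestamp
"""
rows = cur.execute(query).fetchall()
print(rows)
conn.close()
```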

#### 3.2 Mapping Out Data Pipelines

1. The ETL script gathers the input data: the temperature CSV and CO2 JSON files.
1. The raw source data is read into Spark dataframes and stored locally as parquet staging files.
1. Each staging parquet file is read back into a Spark dataframe and cleaned as necessary.
1. Data is extracted and used to create the output tables.
1. Quality checks are performed.
1. The final tables are written as parquet files to the local filesystem. 


### Step 4: Running the Pipelines to Model the Data <a name="step4"></a>
#### 4.1 Create the data model
Building the data pipelines to create the data model.

In [64]:
#Extract columns to create the temperature table
fact_temp_table = temp_df_spark_clean.select('datestamp', 'avg_temp', 'avg_temp_uncert','city', 'country')

In [65]:
#Check the schema.
fact_temp_table.printSchema()

root
 |-- datestamp: date (nullable = true)
 |-- avg_temp: float (nullable = true)
 |-- avg_temp_uncert: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)



In [66]:
#Show the first 5 rows.
fact_temp_table.show(5, truncate=False)

+----------+--------+---------------+--------+-------+
|datestamp |avg_temp|avg_temp_uncert|city    |country|
+----------+--------+---------------+--------+-------+
|1907-07-01|14.739  |0.624          |Edmonton|Canada |
|1907-08-01|12.001  |0.603          |Edmonton|Canada |
|1907-09-01|8.319   |0.324          |Edmonton|Canada |
|1907-10-01|6.339   |0.77           |Edmonton|Canada |
|1907-11-01|-1.668  |0.787          |Edmonton|Canada |
+----------+--------+---------------+--------+-------+
only showing top 5 rows



In [67]:
#Extract columns to create the city table and only keep one row for each city.
dim_city_table = temp_df_spark_clean.select('city', 'country', 'longitude','latitude').dropDuplicates()

In [68]:
#Check the schema.
dim_city_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)



In [69]:
#Show the first 5 rows.
dim_city_table.show(5, truncate=False)

+------------+-------+---------+--------+
|city        |country|longitude|latitude|
+------------+-------+---------+--------+
|Erzincan    |Turkey |39.54E   |39.38N  |
|Khorramshahr|Iran   |48.00E   |29.74N  |
|Korla       |China  |85.21E   |40.99N  |
|Lasa        |China  |90.46E   |29.74N  |
|Lille       |France |3.80E    |50.63N  |
+------------+-------+---------+--------+
only showing top 5 rows
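`dim_city_table` stores latitude and longitude as strings with a hemisphere suffix (for example `57.05N` and `10.33E`), which are awkward to filter or plot. A minimal sketch of converting them to signed decimal degrees, assuming every value ends in one of N, S, E or W; `to_signed_degrees` is a hypothetical helper, not part of the pipeline above. In PySpark it could be wrapped with `udf(to_signed_degrees, FloatType())`.

```python
def to_signed_degrees(coordinate: str) -> float:
    """Convert a coordinate such as '57.05N' or '10.33W' to signed decimal degrees.

    North and East are positive; South and West are negative.
    """
    coordinate = coordinate.strip()
    value, hemisphere = float(coordinate[:-1]), coordinate[-1].upper()
    if hemisphere in ("N", "E"):
        return value
    if hemisphere in ("S", "W"):
        return -value
    raise ValueError(f"Unrecognised hemisphere in coordinate: {coordinate!r}")


# Usage with values from the tables above (Århus and Lille).
print(to_signed_degrees("57.05N"), to_signed_degrees("10.33E"))
print(to_signed_degrees("50.63N"), to_signed_degrees("3.80E"))
```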



In [70]:
#Extract columns to create the co2 table
dim_co2_table = co2_df_spark.withColumn('datestamp', to_date(col("date"),'yyyy-MM-dd')) \
            .select('datestamp', co2_df_spark.value.cast('float').alias('co2_ppm'))

In [71]:
#Check the schema.
dim_co2_table.printSchema()

root
 |-- datestamp: date (nullable = true)
 |-- co2_ppm: float (nullable = true)



In [72]:
#Show the first 5 rows.
dim_co2_table.show(5, truncate=False)

+----------+-------+
|datestamp |co2_ppm|
+----------+-------+
|1958-03-30|316.16 |
|1958-03-31|316.4  |
|1958-04-02|317.67 |
|1958-04-03|317.76 |
|1958-04-04|317.09 |
+----------+-------+
only showing top 5 rows



In [73]:
#Create date based information from the original datestamp column.
#DateType values reach the UDF as datetime.date objects; weekday() gives Monday = 0 ... Sunday = 6.
get_weekday = udf(lambda d: d.weekday() if d is not None else None, IntegerType())

#Extract columns to create the date table
dim_date_table = temp_df_spark_clean.select('datestamp',
                                           year(temp_df_spark_clean.datestamp).alias('year'), 
                                           month(temp_df_spark_clean.datestamp).alias('month'), 
                                           dayofmonth(temp_df_spark_clean.datestamp).alias('day'),  
                                           weekofyear(temp_df_spark_clean.datestamp).alias('week')) \
                                           .withColumn('weekday', get_weekday(col('datestamp')))

In [74]:
#Check the schema.
dim_date_table.printSchema()

root
 |-- datestamp: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [75]:
#Show the first 5 rows.
dim_date_table.show(5, truncate=False)

+----------+----+-----+---+----+-------+
|datestamp |year|month|day|week|weekday|
+----------+----+-----+---+----+-------+
|1907-07-01|1907|7    |1  |27  |0      |
|1907-08-01|1907|8    |1  |31  |3      |
|1907-09-01|1907|9    |1  |35  |6      |
|1907-10-01|1907|10   |1  |40  |1      |
|1907-11-01|1907|11   |1  |44  |4      |
+----------+----+-----+---+----+-------+
only showing top 5 rows
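The same date attributes can be reproduced in plain Python, which is a quick way to check the conventions used: `weekday()` counts Monday as 0 and Sunday as 6, and Spark's `weekofyear` returns the ISO 8601 week number, which matches `date.isocalendar()`. Spark's built-in `dayofweek` uses a different convention (Sunday = 1 to Saturday = 7), so it is not a drop-in replacement for the UDF. The helper name `date_attributes` is hypothetical. Note also that, as built above, `dim_date_table` has one row per fact row (8235082 rows in the count below); calling `.dropDuplicates(['datestamp'])` on it would leave one row per date.

```python
from datetime import date
from typing import Dict


def date_attributes(d: date) -> Dict[str, int]:
    """Derive the dim_date_table columns for a single date."""
    return {
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],  # ISO 8601 week number, like Spark's weekofyear
        "weekday": d.weekday(),      # Monday = 0 ... Sunday = 6, like the UDF above
    }


# Usage: the first rows of dim_date_table shown above.
for m in (7, 8, 9, 10, 11):
    print(date(1907, m, 1), date_attributes(date(1907, m, 1)))
```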



Writing the four parquet table files 

In [76]:
fact_temp_table.write.parquet(fact_temp_table_path, partitionBy=['country', 'city'], mode="overwrite")
print(f"Data written to: {fact_temp_table_path}")
dim_city_table.write.parquet(dim_city_table_path, partitionBy=['country', 'city'], mode="overwrite")
print(f"Data written to: {dim_city_table_path}")
dim_co2_table.write.parquet(dim_co2_table_path, mode="overwrite")
print(f"Data written to: {dim_co2_table_path}")
dim_date_table.write.parquet(dim_date_table_path, mode="overwrite")
print(f"Data written to: {dim_date_table_path}")

Data written to: parquet_files/2020-02-26-fact_temp_table.parquet
Data written to: parquet_files/2020-02-26-dim_city_table.parquet
Data written to: parquet_files/2020-02-26-dim_co2_table.parquet
Data written to: parquet_files/2020-02-26-dim_date_table.parquet
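`partitionBy=['country', 'city']` makes Spark write Hive-style subdirectories, nested in the order given (one `country=...` directory, then one `city=...` directory inside it), and the partition columns are stored in the directory names rather than inside the parquet files. When the table is read back with `spark.read.parquet`, Spark restores `country` and `city` from those names. For `dim_city_table` this means roughly one directory, each holding at least one small file, for every one of its 3510 rows, so partitioning that small table may cost more than it saves. The sketch below only builds the directory path for one partition; `partition_dir` is a hypothetical helper, and it ignores the escaping Spark applies to special characters such as `/` or `=` in partition values.

```python
import posixpath
from typing import Sequence, Tuple


def partition_dir(table_path: str, partitions: Sequence[Tuple[str, str]]) -> str:
    """Return the Hive-style directory for one partition, e.g.
    <table_path>/country=Denmark/city=Århus

    Simplified: does not escape special characters the way Spark does.
    """
    parts = [f"{column}={value}" for column, value in partitions]
    return posixpath.join(table_path, *parts)


path = partition_dir("parquet_files/2020-02-26-fact_temp_table.parquet",
                     [("country", "Denmark"), ("city", "Århus")])
print(path)
```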


#### 4.2 Data Quality Checks
Quality checks for this project are:
- Ensure that all tables contain data.
- Check that key columns in the tables contain no null values.
- Confirm that each table's columns have the correct data types.
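The checks in the cells below print their results, so a failing check would not stop the pipeline. A minimal sketch of turning the first two checks into assertions that raise, assuming the row counts and null counts have already been collected (in Spark, with `df.count()` and `df.where(col(c).isNull()).count()`, as in the cells below). The names `DataQualityError`, `check_not_empty` and `check_no_nulls` are hypothetical; the sample numbers are the ones printed by this notebook.

```python
from typing import Dict


class DataQualityError(Exception):
    """Raised when a data quality check fails."""


def check_not_empty(row_counts: Dict[str, int]) -> None:
    """Fail if any table has no rows."""
    empty = [table for table, count in row_counts.items() if count == 0]
    if empty:
        raise DataQualityError(f"Empty tables: {empty}")


def check_no_nulls(null_counts: Dict[str, int]) -> None:
    """Fail if any checked 'table.column' contains null values."""
    with_nulls = {key: n for key, n in null_counts.items() if n > 0}
    if with_nulls:
        raise DataQualityError(f"Null values found: {with_nulls}")


# Counts as printed by this notebook's run.
row_counts = {"fact_temp_table": 8235082, "dim_city_table": 3510,
              "dim_co2_table": 19414, "dim_date_table": 8235082}
null_counts = {"fact_temp_table.avg_temp": 0, "dim_city_table.city": 0,
               "dim_city_table.country": 0, "dim_co2_table.co2_ppm": 0,
               "dim_date_table.datestamp": 0}

check_not_empty(row_counts)
check_no_nulls(null_counts)
print("All quality checks passed")
```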

In [77]:
#Check to see if the tables are empty.
print (f"Number of rows in fact_temp_table: {fact_temp_table.count()}")
print (f"Number of rows in dim_city_table: {dim_city_table.count()}")
print (f"Number of rows in dim_co2_table: {dim_co2_table.count()}")
print (f"Number of rows in dim_date_table: {dim_date_table.count()}")

Number of rows in fact_temp_table: 8235082
Number of rows in dim_city_table: 3510
Number of rows in dim_co2_table: 19414
Number of rows in dim_date_table: 8235082


In [78]:
#Check for null values in certain columns in the tables
print (f"Number of rows in fact_temp_table where avg_temp column has null values: {fact_temp_table.where(col('avg_temp').isNull()).count()}")
print (f"Number of rows in dim_city_table where city column has null values: {dim_city_table.where(col('city').isNull()).count()}")
print (f"Number of rows in dim_city_table where country column has null values: {dim_city_table.where(col('country').isNull()).count()}")
print (f"Number of rows in dim_co2_table where co2_ppm column has null values: {dim_co2_table.where(col('co2_ppm').isNull()).count()}")
print (f"Number of rows in dim_date_table where datestamp column has null values: {dim_date_table.where(col('datestamp').isNull()).count()}")

Number of rows in fact_temp_table where avg_temp column has null values: 0
Number of rows in dim_city_table where city column has null values: 0
Number of rows in dim_city_table where country column has null values: 0
Number of rows in dim_co2_table where co2_ppm column has null values: 0
Number of rows in dim_date_table where datestamp column has null values: 0


In [79]:
#Check the data types of each table
print (f"Data types in the fact_temp_table: {fact_temp_table.dtypes}")
print (f"Data types in the dim_city_table: {dim_city_table.dtypes}")
print (f"Data types in the dim_co2_table: {dim_co2_table.dtypes}")
print (f"Data types in the dim_date_table: {dim_date_table.dtypes}")

Data types in the fact_temp_table: [('datestamp', 'date'), ('avg_temp', 'float'), ('avg_temp_uncert', 'float'), ('city', 'string'), ('country', 'string')]
Data types in the dim_city_table: [('city', 'string'), ('country', 'string'), ('longitude', 'string'), ('latitude', 'string')]
Data types in the dim_co2_table: [('datestamp', 'date'), ('co2_ppm', 'float')]
Data types in the dim_date_table: [('datestamp', 'date'), ('year', 'int'), ('month', 'int'), ('day', 'int'), ('week', 'int'), ('weekday', 'int')]
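The data type check can be made automatic in the same way, by comparing each table's `dtypes` (a list of `(column, type)` pairs) with the expected schema, for example `check_dtypes("dim_co2_table", dim_co2_table.dtypes, expected_co2)`. A sketch with the hypothetical helper `check_dtypes`; the expected values are copied from the output above.

```python
from typing import List, Tuple

Dtypes = List[Tuple[str, str]]


def check_dtypes(table: str, actual: Dtypes, expected: Dtypes) -> None:
    """Raise if a table's (column, type) pairs differ from the expected ones.

    Column order is compared too, since dtypes lists columns in order.
    """
    if actual != expected:
        missing = [pair for pair in expected if pair not in actual]
        unexpected = [pair for pair in actual if pair not in expected]
        raise TypeError(f"{table}: missing {missing}, unexpected {unexpected}")


expected_co2: Dtypes = [("datestamp", "date"), ("co2_ppm", "float")]

# Passes: matches the dim_co2_table dtypes printed above.
check_dtypes("dim_co2_table", [("datestamp", "date"), ("co2_ppm", "float")], expected_co2)

# Fails: a value column left as a string (as in the raw CO2 JSON) is reported.
try:
    check_dtypes("dim_co2_table", [("datestamp", "date"), ("co2_ppm", "string")], expected_co2)
except TypeError as err:
    print(err)
```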


#### 4.3 Data dictionary 
Located in a separate file, "Data Dictionary.md", included with this project.

### Step 5: Project Write Up <a name="step5"></a>

##### Choice of tools:
Python and Spark were chosen for this project because both can scale easily if needed.  

As the data used for this project is not very large, all processing was done on a local machine without connecting to a cluster service such as AWS. The Spark setup could easily be adjusted to run on a cluster if needed, for example if the project were to scale up.

##### Data update frequency
If new temperature measurements are ingested daily, the best time to run the update is after the day has ended in every time zone, i.e. after midnight at UTC-12 (12:00 UTC the following day). This gives every country the chance to record its data for that day before it is pushed to the database.
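The furthest-behind time zone is UTC-12, so a calendar day has ended in every country once midnight has passed there, which is 12:00 UTC on the following day. A small sketch of that calculation with the standard library; the function name `day_closed_everywhere_utc` is hypothetical.

```python
from datetime import date, datetime, time, timedelta, timezone

# UTC-12 is the furthest-behind time zone, so a day ends there last.
LATEST_ZONE = timezone(timedelta(hours=-12))


def day_closed_everywhere_utc(day: date) -> datetime:
    """Return the UTC instant at which `day` has ended in every time zone."""
    midnight_after = datetime.combine(day + timedelta(days=1), time(0, 0), tzinfo=LATEST_ZONE)
    return midnight_after.astimezone(timezone.utc)


print(day_closed_everywhere_utc(date(2020, 2, 26)))
# 2020-02-27 12:00:00+00:00
```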

##### Other scenarios
##### - Data increased by 100x
If the data for this project increased by 100x, I would recommend running the processing on an AWS Spark cluster and using AWS S3 and Redshift for the staging and final tables respectively. All of these systems are designed to scale to terabytes of data.

##### - Daily updates at 7am
This is possible. The previous day's measurements would be gathered and pushed to a data lake or data warehouse before 7am. A dashboard tool such as Tableau can then be connected to the data lake or data warehouse.

##### - 100+ people to access the database.
To achieve this, the tables would need to be hosted in a service like AWS Redshift or in a company-owned database system. If specific types of queries are run regularly, the tables could be changed, or new tables added, to support those queries.