# Project Title
### Data Engineering Capstone Project

#### Project Summary
The Earth's temperature has been rising over the years, and conclusions drawn from data collected over decades may help governments and scientists make better decisions.

In data analysis, it is important to start from datasets that are easy to work with. The goal of this project is to put all the data in a star schema, which keeps joins fast and makes it easy to complement the fact table with any available dimension table. The idea is also to match as many footprint scores as possible to the cities present in GlobalLandTemperaturesByCity.csv, processing every row stored in the S3 bucket with Spark and writing the result back to S3 in Parquet format.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import monotonically_increasing_id, col, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType, DateType

SparkSession available as 'spark'.



### Step 1: Scope the Project and Gather Data

#### Scope

The final tables of the project should follow the schema below.

![Goal](./img/goal.png)

#### Describe and Gather Data


##### Global Land Temperature
The global land temperature dataset can be found on [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data); this project uses the GlobalLandTemperaturesByCity.csv file. Some points about the data are worth highlighting:

* Early data was collected by technicians using mercury thermometers, so the measurements are sensitive to the time of day at which the readings were taken.

* In the 1940s, the construction of airports caused many weather stations to be moved to other areas.

* In the 1980s, the instruments were changed to electronic thermometers, which are said to have a cooling bias.

Columns:

* dt 
* AverageTemperature
* AverageTemperatureUncertainty
* City
* Country
* Latitude
* Longitude
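The Latitude and Longitude columns store values as text with a hemisphere suffix (for example `57.05N` and `10.33E` in the sample output further down). The plain-Python sketch below shows one way such values could be turned into signed decimal degrees if numeric coordinates were needed. The function name `parse_coordinate` is hypothetical and is not used anywhere in this notebook.

```python
def parse_coordinate(value: str) -> float:
    """Convert a string such as '57.05N' or '55.45W' to signed decimal degrees."""
    value = value.strip()
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"Unknown hemisphere in {value!r}")
    # South and West are negative by convention.
    return -magnitude if hemisphere in "SW" else magnitude


print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("55.45W"))  # -55.45
```

The pipeline in this notebook keeps the original strings, which is enough for building the geopoint dimension.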

##### National Footprint
The second dataset, also available on [Kaggle](https://www.kaggle.com/kingburrito666/national-footprint-accounts), is the National Footprint Accounts, which measure the ecological resource use and resource capacity of nations. The file used is NFA_2017_Edition.csv.

Columns:
* Country
* Year
* Country Code
* Record-type
* The Ecological Footprint of cropland demand
* The Ecological Footprint of grazing land demand
* The Ecological Footprint of forest land demand
* The Ecological Footprint of fishing land demand
* The Ecological Footprint of built-up land demand
* The Ecological Footprint of carbon land demand
* The total Ecological Footprint of demand(sum of all land types)
* Data quality score

In [None]:
import configparser
import os

config = configparser.ConfigParser()
config.read('aws.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['CREDENTIALS']['AWS_SECRET_ACCESS_KEY']
AWS_ACCESS_KEY=config['CREDENTIALS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY=config['CREDENTIALS']['AWS_SECRET_ACCESS_KEY']

In [None]:
spark = SparkSession \
            .builder \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
            .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
            .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY']) \
            .getOrCreate()

##### Import datasets

In [2]:
global_temp_df = spark\
                .read\
                .option('header', 'true')\
                .csv('s3a://capstone-project1/GlobalLandTemperaturesByCity.csv')
global_temp_df.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows

In [3]:
foot_print = spark\
            .read\
            .option('header', 'true')\
            .csv('s3a://capstone-project1/NFA2017Edition.csv')
foot_print.show(5)

+-------+----+------------+------------+-----------------+-----------------+--------------------+-------------------+------------------+----------------+-----------------+------+
|country|year|country_code|      record|        crop_land|     grazing_land|         forest_land|     fishing_ground|     built_up_land|          carbon|            total|QScore|
+-------+----+------------+------------+-----------------+-----------------+--------------------+-------------------+------------------+----------------+-----------------+------+
|Armenia|1992|           1|  AreaPerCap|0.140020292796057|0.199159298449051|  0.0969995651543702| 0.0368169299898536|0.0292578643431679|               0|  0.5022539507325|     5|
|Armenia|1992|           1|   AreaTotHA|           483000|           687000|              334600|             127000|  100925.003051758|               0| 1732525.00305176|     5|
|Armenia|1992|           1|BiocapPerCap|0.276531391460875| 0.13489158801002|  0.0838392733311525| 0.01370

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

1. global_temp_df has 364130 null or NaN values in each of the AverageTemperature and AverageTemperatureUncertainty columns
2. foot_print has no null or NaN values
3. Neither dataset has duplicate rows

#### Cleaning Steps

1. Rows with null or NaN values in the AverageTemperature and AverageTemperatureUncertainty columns were removed

##### Verify the number of rows

In [4]:
num_rows_temp = global_temp_df.count()
num_rows_foot = foot_print.count()

print(num_rows_temp)
print(num_rows_foot)

8599212
99456

##### Verify how many null or NaN values exist in the columns

In [5]:
from pyspark.sql.functions import isnan, when, count, col
global_temp_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in global_temp_df.columns]).show()

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|            364130|                       364130|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+

In [6]:
foot_print.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in foot_print.columns]).show()

+-------+----+------------+------+---------+------------+-----------+--------------+-------------+------+-----+------+
|country|year|country_code|record|crop_land|grazing_land|forest_land|fishing_ground|built_up_land|carbon|total|QScore|
+-------+----+------------+------+---------+------------+-----------+--------------+-------------+------+-----+------+
|      0|   0|           0|     0|        0|           0|          0|             0|            0|     0|    0|     0|
+-------+----+------------+------+---------+------------+-----------+--------------+-------------+------+-----+------+

##### Check for duplicate rows

The results below show that there are no duplicate rows: `sum(count)` is null because no group of identical rows has a count greater than 1.

In [7]:
import pyspark.sql.functions as f
global_temp_df.groupBy(global_temp_df.columns)\
    .count()\
    .where(f.col('count') > 1)\
    .select(f.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+

In [8]:
foot_print.groupBy(foot_print.columns)\
    .count()\
    .where(f.col('count') > 1)\
    .select(f.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+
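The logic of the two duplicate checks above can be sketched in plain Python, which may make the `null` result easier to read. This is only an illustration of the groupBy / count / filter / sum chain on a tiny in-memory list; the function name `duplicated_row_total` and the sample rows are assumptions made for the example.

```python
from collections import Counter


def duplicated_row_total(rows):
    """Return how many rows belong to groups of identical rows, or None if there are none."""
    counts = Counter(rows)                             # groupBy(all columns).count()
    repeated = [n for n in counts.values() if n > 1]   # where(count > 1)
    # A SQL SUM over zero rows is NULL, which Spark displays as "null".
    return sum(repeated) if repeated else None


unique_rows = [("1743-11-01", "Århus"), ("1743-12-01", "Århus")]
with_duplicates = unique_rows + [("1743-11-01", "Århus")]

print(duplicated_row_total(unique_rows))      # None
print(duplicated_row_total(with_duplicates))  # 2
```

A `null` sum therefore means that no row appears more than once.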

##### Removing null values from global_temp_df

In [9]:
global_temp_df = global_temp_df.dropna('any')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The data model chosen for this project is the star schema. It performs well for queries because the data can be analyzed with only a small number of joins. Furthermore, it makes it easy to understand how the data is distributed across the tables.
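As a rough illustration of the idea, the plain-Python sketch below (not part of the Spark pipeline) moves descriptive columns into a dimension table and leaves only a surrogate id in the fact rows. The helper names `build_dimension` and `to_fact` are hypothetical, and the temperatures other than 6.068 are invented for the example.

```python
def build_dimension(rows, key_columns):
    """Assign one surrogate id to each distinct combination of key_columns."""
    dimension = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        dimension.setdefault(key, len(dimension))
    return dimension


def to_fact(rows, dimension, key_columns, id_column):
    """Replace the descriptive key_columns in each row with the surrogate id."""
    fact = []
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        slim = {k: v for k, v in row.items() if k not in key_columns}
        slim[id_column] = dimension[key]
        fact.append(slim)
    return fact


# Sample rows; the temperatures other than 6.068 are invented for illustration.
rows = [
    {"Country": "Denmark", "City": "Århus", "AverageTemperature": 6.068},
    {"Country": "China", "City": "Beian", "AverageTemperature": 1.5},
    {"Country": "Denmark", "City": "Århus", "AverageTemperature": 5.0},
]

keys = ["Country", "City"]
location_dm = build_dimension(rows, keys)
fact = to_fact(rows, location_dm, keys, "location_id")

print(location_dm)
print(fact)
```

Each fact row now carries only measurements and a `location_id`, and the country and city names can be recovered with a single lookup in the dimension.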

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model


1. Extract year, month and day from the dt column in global_temp_df
2. Inner join foot_print and global_temp_df using year and country as a composite key to create globaltemp_footprint_df with a unique id column
3. Create the location_dm dimension table using the Country and City columns and create location_id
4. Insert location_id into globaltemp_footprint_df and drop the Country and City columns
5. Create record_dm with a record_id column
6. Insert record_id into globaltemp_footprint_df and drop the record column
7. Create geopoint_dm with the Latitude and Longitude columns from globaltemp_footprint_df and add a geopoint_id column
8. Add the geopoint_id column to globaltemp_footprint_df and drop the Latitude and Longitude columns
9. Create the time_dm table with dt, year, month and day from globaltemp_footprint_df
10. Drop the year, month and day columns from globaltemp_footprint_df
11. Load globaltemp_footprint_df, location_dm, record_dm, geopoint_dm and time_dm to the S3 bucket (s3://capstone-project1/)
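Steps 1 and 2 above can be sketched in plain Python to show what the composite-key inner join does to the row count. This is a simplified illustration rather than the Spark code used below; the function name `inner_join_on_year_country` is hypothetical, and the footprint totals for Pakistan are made-up values.

```python
def inner_join_on_year_country(temps, footprints):
    """Join temperature rows to footprint rows on the (year, country) composite key."""
    index = {}
    for fp in footprints:
        index.setdefault((fp["year"], fp["country"]), []).append(fp)

    joined = []
    for t in temps:
        year = int(t["dt"][:4])  # step 1: extract the year from the dt string
        # step 2: inner join, so rows without a matching key are dropped
        for fp in index.get((year, t["Country"]), []):
            joined.append({**t, "year": year, "record": fp["record"], "total": fp["total"]})
    return joined


temps = [
    {"dt": "1963-01-01", "Country": "Pakistan", "City": "Ahmadpur East", "AverageTemperature": 13.399},
    {"dt": "1743-11-01", "Country": "Denmark", "City": "Århus", "AverageTemperature": 6.068},
]
footprints = [
    {"year": 1963, "country": "Pakistan", "record": "AreaPerCap", "total": 1.0},  # made-up total
    {"year": 1963, "country": "Pakistan", "record": "AreaTotHA", "total": 2.0},   # made-up total
    {"year": 1992, "country": "Armenia", "record": "AreaPerCap", "total": 0.5022539507325},
]

joined = inner_join_on_year_country(temps, footprints)
for row in joined:
    print(row["dt"], row["City"], row["record"], row["total"])
```

Two things are visible in the result: the 1743 row disappears because the footprint data has no entry for that year, and the 1963 row is repeated once per record type.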

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Extract year, month and day from dt column in global_temp_df

In [10]:
global_temp_df = global_temp_df.select(global_temp_df.columns)\
                                .withColumn('year', year(global_temp_df.dt))\
                                .withColumn('month', month(global_temp_df.dt))\
                                .withColumn('day', dayofmonth(global_temp_df.dt))

#### Inner join foot_print and global_temp_df using year and country as a composite key to create globaltemp_footprint_df with a unique id column

In [11]:
cond = [global_temp_df.year == foot_print.year, global_temp_df.Country == foot_print.country]
globaltemp_footprint_df = global_temp_df.join(foot_print, cond, 'inner')\
                                    .withColumn('globaltempfoot_id', monotonically_increasing_id())\
                                    .drop(foot_print.year)\
                                    .drop(foot_print.country)\
                                    .drop(foot_print.country_code)
globaltemp_footprint_df.show(5)

+----------+------------------+-----------------------------+-------------+--------+--------+---------+----+-----+---+------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------+-----------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|         City| Country|Latitude|Longitude|year|month|day|      record|        crop_land|      grazing_land|       forest_land|    fishing_ground|     built_up_land|           carbon|            total|QScore|globaltempfoot_id|
+----------+------------------+-----------------------------+-------------+--------+--------+---------+----+-----+---+------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------+-----------------+
|1963-01-01|            13.399|                        0.595|Ahmadpur East|Pakistan|  29.74N|   72.00E|1963|    1|  1|  AreaPerCap|0.639166

#### Create location_dm dimension table using Country and City columns and create location_id

In [12]:
location_dm = globaltemp_footprint_df.groupBy(globaltemp_footprint_df.Country, globaltemp_footprint_df.City)\
                                .count()\
                                .withColumn('location_id', monotonically_increasing_id())\
                                .select(['location_id', 'Country','City'])
location_dm.show(5)

+-----------+------------+------+
|location_id|     Country|  City|
+-----------+------------+------+
|          0|Saudi Arabia|  Abha|
|          1|       China| Beian|
|          2|       China|Bozhou|
|          3|    Cameroon|Garoua|
|          4|       India|Jhansi|
+-----------+------------+------+
only showing top 5 rows

#### Insert location_id into globaltemp_footprint_df and drop Country and City columns

In [13]:
cond=[globaltemp_footprint_df.Country == location_dm.Country, globaltemp_footprint_df.City == location_dm.City]
globaltemp_footprint_df = globaltemp_footprint_df.join(location_dm, cond, 'inner')\
                            .drop(globaltemp_footprint_df.Country)\
                            .drop(globaltemp_footprint_df.City)\
                            .drop('Country')\
                            .drop('City')
globaltemp_footprint_df.show(5)

+----------+------------------+-----------------------------+--------+---------+----+-----+---+------------+-----------------+----------------+----------------+-----------------+------------------+----------------+----------------+------+-----------------+-----------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Latitude|Longitude|year|month|day|      record|        crop_land|    grazing_land|     forest_land|   fishing_ground|     built_up_land|          carbon|           total|QScore|globaltempfoot_id|location_id|
+----------+------------------+-----------------------------+--------+---------+----+-----+---+------------+-----------------+----------------+----------------+-----------------+------------------+----------------+----------------+------+-----------------+-----------+
|1987-01-01|            26.346|                        0.266|  28.13S|   55.45W|1987|    1|  1|  AreaPerCap|0.781407768567306|3.30264247643294|1.10928412710825| 2.64820200150033| 0.033785453366

#### Create record_dm with record_id column

In [14]:
record_dm = globaltemp_footprint_df.groupBy(globaltemp_footprint_df.record)\
                    .count()\
                    .withColumn('record_id', monotonically_increasing_id())\
                    .select(['record_id', 'record'])
record_dm.show(5)

+------------+---------------+
|   record_id|         record|
+------------+---------------+
| 42949672960|   EFConsTotGHA|
|154618822656|   EFConsPerCap|
|249108103168|      AreaTotHA|
|352187318272|EFImportsTotGHA|
|455266533376|   EFProdPerCap|
+------------+---------------+
only showing top 5 rows
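The large, non-consecutive values of record_id are expected. According to the Spark documentation, `monotonically_increasing_id` guarantees unique and increasing ids but not consecutive ones: the current implementation puts the partition id in the upper 31 bits and the record number within the partition in the lower 33 bits. The short sketch below decodes such an id in plain Python; the helper name `split_monotonic_id` is made up for this example.

```python
def split_monotonic_id(value: int):
    """Split an id from monotonically_increasing_id into (partition id, row number in partition)."""
    partition_id = value >> 33                  # upper bits hold the partition id
    row_in_partition = value & ((1 << 33) - 1)  # lower 33 bits hold the record number
    return partition_id, row_in_partition


print(split_monotonic_id(42949672960))  # (5, 0)
print(split_monotonic_id(8589934593))   # (1, 1)
```

So the id 42949672960 is simply the first row of partition 5. The ids are still valid surrogate keys; they only look sparse.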

#### Insert record_id into globaltemp_footprint_df and drop record column

In [15]:
globaltemp_footprint_df = globaltemp_footprint_df.join(record_dm, globaltemp_footprint_df.record == record_dm.record, 'inner')\
                        .drop(record_dm.record)\
                        .drop('record')
globaltemp_footprint_df.show(5)

+----------+------------------+-----------------------------+--------+---------+----+-----+---+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------+-----------------+-----------+-----------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|Latitude|Longitude|year|month|day|       crop_land|    grazing_land|     forest_land|  fishing_ground|   built_up_land|          carbon|           total|QScore|globaltempfoot_id|location_id|  record_id|
+----------+------------------+-----------------------------+--------+---------+----+-----+---+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------+-----------------+-----------+-----------+
|1987-01-01|            26.346|                        0.266|  28.13S|   55.45W|1987|    1|  1|39191987.4247696|33745479.3242697|6029992.47939934|3967814.83040835|2720542.51253325|32047029.1293088|117702845.7

#### Create geopoint_dm with the Latitude and Longitude columns from globaltemp_footprint_df and add a geopoint_id column

In [16]:
geopoint_dm = globaltemp_footprint_df.groupBy(globaltemp_footprint_df.Latitude, globaltemp_footprint_df.Longitude)\
                    .count()\
                    .withColumn('geopoint_id', monotonically_increasing_id())\
                    .select(['geopoint_id','Latitude', 'Longitude'])
geopoint_dm.show(5)

+-----------+--------+---------+
|geopoint_id|Latitude|Longitude|
+-----------+--------+---------+
|          0|   4.02N|   74.73W|
|          1|  31.35N|  111.20E|
| 8589934592|  45.81N|   10.38E|
| 8589934593|  45.81N|   31.15E|
| 8589934594|   0.80S|   49.02W|
+-----------+--------+---------+
only showing top 5 rows

#### Add geopoint_id column to globaltemp_footprint_df and drop Latitude and Longitude columns

In [17]:
cond=[globaltemp_footprint_df.Latitude == geopoint_dm.Latitude, globaltemp_footprint_df.Longitude == geopoint_dm.Longitude]
globaltemp_footprint_df = globaltemp_footprint_df.join(geopoint_dm, cond, 'inner')\
                        .drop(geopoint_dm.Longitude)\
                        .drop(geopoint_dm.Latitude)\
                        .drop('Longitude')\
                        .drop('Latitude')

globaltemp_footprint_df.show(5)

+----------+------------------+-----------------------------+----+-----+---+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------+-----------------+-------------+-----------+-----------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|year|month|day|       crop_land|    grazing_land|     forest_land|  fishing_ground|   built_up_land|          carbon|           total|QScore|globaltempfoot_id|  location_id|  record_id|geopoint_id|
+----------+------------------+-----------------------------+----+-----+---+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------+-----------------+-------------+-----------+-----------+
|1992-01-01|3.0980000000000003|                        0.273|1992|    1|  1|442904034.177821|157384300.972959|194720630.346759|58982269.8013754|89233318.0098797|940698224.176475|1883922777.48527|     6|      25769919989|133

#### Create time_dm table with dt, year, month and day from globaltemp_footprint_df

In [18]:
time_dm = globaltemp_footprint_df.groupBy(globaltemp_footprint_df.dt,\
                                         globaltemp_footprint_df.year,\
                                         globaltemp_footprint_df.month,\
                                         globaltemp_footprint_df.day)\
                    .count()\
                    .select(['dt','year', 'month','day'])

time_dm.show(5)

+----------+----+-----+---+
|        dt|year|month|day|
+----------+----+-----+---+
|1991-10-01|1991|   10|  1|
|2000-11-01|2000|   11|  1|
|1963-08-01|1963|    8|  1|
|1973-07-01|1973|    7|  1|
|2010-11-01|2010|   11|  1|
+----------+----+-----+---+
only showing top 5 rows

#### Drop year, month, day columns from globaltemp_footprint_df

In [19]:
globaltemp_footprint_df=globaltemp_footprint_df.drop('year')\
                                                .drop('month')\
                                                .drop('day')

In [20]:
globaltemp_footprint_df.show(5)

+----------+------------------+-----------------------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------+-----------------+-------------+-----------+-----------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|       crop_land|    grazing_land|     forest_land|  fishing_ground|   built_up_land|          carbon|           total|QScore|globaltempfoot_id|  location_id|  record_id|geopoint_id|
+----------+------------------+-----------------------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+------+-----------------+-------------+-----------+-----------+
|1992-01-01|3.0980000000000003|                        0.273|442904034.177821|157384300.972959|194720630.346759|58982269.8013754|89233318.0098797|940698224.176475|1883922777.48527|     6|      25769919989|1331439861772|42949672960|          1|
|1992-02-01|            

#### Load globaltemp_footprint_df, location_dm, record_dm, geopoint_dm, time_dm to S3 bucket (s3://capstone-project1/)

In [21]:
output_data = 's3a://capstone-project1/'

In [22]:
location_dm.write.parquet(output_data + 'location_dm/', mode='overwrite')

In [23]:
record_dm.write.parquet(output_data + 'record_dm/', mode='overwrite')

In [24]:
time_dm.write.parquet(output_data + 'time_dm/', mode='overwrite')

In [25]:
geopoint_dm.write.parquet(output_data + 'geopoint_dm/', mode='overwrite')

In [26]:
globaltemp_footprint_df.write.parquet(output_data + 'globaltemp_footprint_df/', mode='overwrite')

#### 4.2 Data Quality Checks
The data quality checks below confirm that the pipeline ran as expected:
1. Check again that there are no duplicate rows
2. Check that the number of rows before the upload to the S3 bucket equals the number of rows actually uploaded.
 
Run Quality Checks

In [27]:
import pyspark.sql.functions as f
globaltemp_footprint_df.groupBy(globaltemp_footprint_df.columns)\
    .count()\
    .where(f.col('count') > 1)\
    .select(f.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+

In [None]:
from pyspark.sql.functions import isnan, when, count, col
globaltemp_footprint_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in globaltemp_footprint_df.columns]).show()

In [28]:
def data_quality_check(spark, non_uploaded_table, output_data, folder):
    '''
    Verify that a table has the same number of rows before and after the upload to S3.

    spark: the active Spark session
    non_uploaded_table: the DataFrame as it was before being uploaded
    output_data: the S3 path where the data was uploaded
    folder: name of the S3 folder
    '''
    # count the rows of the table before the upload
    count_rows_non_uploaded_table = non_uploaded_table.count()
    # load the uploaded table back from S3
    table_after_upload = spark.read.parquet(output_data + folder + '/')
    # count the rows of the uploaded table
    count_rows_upload_table = table_after_upload.count()

    if count_rows_non_uploaded_table != count_rows_upload_table:
        raise ValueError('Data quality checks failed. Not all rows were copied.')
    else:
        print('Data was copied completely.')

In [29]:
output_data= 's3a://capstone-project1/'

In [30]:
data_quality_check(spark, location_dm, output_data, 'location_dm')

Data was copied completely.

In [31]:
data_quality_check(spark, record_dm, output_data, 'record_dm')

Data was copied completely.

In [32]:
data_quality_check(spark, time_dm, output_data, 'time_dm')

Data was copied completely.

In [33]:
data_quality_check(spark, geopoint_dm, output_data, 'geopoint_dm')

Data was copied completely.

In [34]:
data_quality_check(spark, globaltemp_footprint_df, output_data, 'globaltemp_footprint_df')

Data was copied completely.

#### 4.3 Data dictionary 
![dictionary](./img/dictionary.png)


### Step 5: Complete Project Write Up

#### About the project

This project uses Spark to process data initially stored in S3. An EMR cluster was created in AWS to run the processing. This was a good choice because Spark works in a distributed fashion, and the number and type of instances in the EMR cluster can be scaled up as the data grows. S3 plays a fundamental role here: it stores the datasets consistently and makes all the data easy to access.

As for keeping the data up to date, the dataset can be updated once per year, because the analysis can only be done after the National Footprint dataset has collected data for an entire year.


#### Technologies

* The data model used is a star schema with only four dimension tables (time_dm, location_dm, geopoint_dm, record_dm). The star schema is a great choice for fast analysis that joins tables.

* AWS EMR (Spark cluster): a low-cost, elastic, and easy-to-use way to process large datasets with Spark

* Python: a programming language with many libraries and frameworks for working with data

* Amazon S3: offers scalability, data availability, security, and performance

#### Scenarios

* The data was increased by 100x.
    * The Spark EMR cluster would probably need to be resized, changing the instance type and the number of cores, to process all the data


* The pipelines would be run on a daily basis by 7 am every day
    * Airflow can schedule the whole process to run on the data every morning.


* The database needed to be accessed by 100+ people.
    * A security group would be created and all users added to it so that they can access the data in S3
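
For the daily 7 am scenario, a scheduler such as Airflow is typically given a cron expression. The sketch below is only an illustration in plain Python of what "every day at 7 am" means for the next run time; the constant `DAILY_7AM_CRON` and the function `next_daily_run` are hypothetical names, and no Airflow code is shown because this notebook does not define a DAG.

```python
from datetime import datetime, time, timedelta

# Standard five-field cron expression: minute hour day-of-month month day-of-week
DAILY_7AM_CRON = "0 7 * * *"


def next_daily_run(now: datetime, run_at: time = time(7, 0)) -> datetime:
    """Return the first scheduled run time strictly after `now`."""
    candidate = datetime.combine(now.date(), run_at)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2020, 6, 1, 6, 30)))  # 2020-06-01 07:00:00
print(next_daily_run(datetime(2020, 6, 1, 7, 0)))   # 2020-06-02 07:00:00
```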