# Data Engineering Capstone Project

### ETL Pipeline - Immigration and Temperature Data

The goal of this project is to build an ETL pipeline over city temperature data, airport details, US city population by gender, and immigration statistics for different cities. The data is first extracted from its source format and converted into small chunks of JSON files using Apache Spark. These files are then stored in an Amazon S3 bucket and loaded into the Redshift cluster's Postgres-compatible database using Apache Airflow, which fully automates the process.

The tables created in the Redshift cluster are then used to answer questions about how immigration behavior relates to the temperature of a location.

The project follows these steps:

- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I first take all the provided data, clean it in the `csv_to_json.py` file, and convert each CSV file into a set of JSON files in the format required to build the dimension and fact tables.
- After converting the CSV files to JSON, run the `upload_file_dag.py` DAG to upload the prepared JSON files to the S3 bucket.
- The S3 data is then loaded into the Redshift cluster database, which is accessed through a Postgres connection.
- To prepare for that, run the `create_table_dag.py` DAG, which creates the empty tables that will receive the JSON data.
- Then run the `Capstone_dag.py` DAG to load the JSON file data into the created tables on the Redshift cluster.
- For the final solution, the `capstone_project.ipynb` file queries the loaded tables to answer the project questions.
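
The load step from S3 into Redshift is typically done with a `COPY` statement. The following is a minimal sketch of how such a statement could be built, not the code used in `Capstone_dag.py`. The helper name `build_copy_sql`, the bucket name, and the IAM role ARN are hypothetical; the `us-west-2` region is taken from the cluster hostname used later in this notebook.

```python
def build_copy_sql(table: str, s3_path: str, iam_role: str, region: str = "us-west-2") -> str:
    """Return a Redshift COPY statement that loads JSON files from S3 into a table."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        f"REGION '{region}'\n"
        f"FORMAT AS JSON 'auto';"
    )


# Hypothetical bucket and role, for illustration only.
sql = build_copy_sql(
    table="city_temp",
    s3_path="s3://example-capstone-bucket/city_temp/",
    iam_role="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(sql)
```

With `JSON 'auto'`, Redshift matches JSON keys to column names, so the keys written by the conversion step would need to line up with the columns created by `create_table_dag.py`.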

#### Tools and Technologies used
The tools used in this project include:
- __Apache Spark__ - Used to convert the large CSV files into small chunks of JSON files.
- __Apache Airflow__ - Used to automate every stage: uploading the JSON files to S3, creating the tables, and running the ETL process that loads data into the dimension and fact tables.
- __Amazon Redshift__ - The database. It stores the data loaded from S3, which is then added to the fact and dimension tables.
- __Amazon S3__ - The storage system. It holds the JSON files generated by Spark, which are uploaded from the local filesystem.
    
#### Describe and Gather Data 
The datasets used and sources include:
- __I94 Immigration Data__: This data is retrieved from the US National Tourism and Trade Office and the source can be found [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
- __World Temperature Data__: This dataset came from Kaggle and you can find out about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
- __U.S. City Demographic Data__: This data comes from OpenDataSoft [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- __Airport Code Table__: This is a simple table of airport codes and corresponding cities that is retrieved from [here](https://datahub.io/core/airport-codes#data)


In [65]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [66]:
%sql postgresql://awsuser:awsPassw0rd@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev

'Connected: awsuser@dev'

### Step 2: Explore and Assess the Data
#### Explore the Data & Cleaning Steps
While converting the data from CSV to JSON, I removed all unnecessary data and duplicate values, and wrote only the cleaned data to the JSON files.
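
The cleaning described above can be sketched in plain Python. This is a standard-library stand-in for illustration, not the Spark code in `csv_to_json.py`. The function names, the chunk size, and the sample rows (including the `dt` values) are assumptions.

```python
import csv
import io
import json
from typing import Dict, Iterable, Iterator, List


def clean_rows(rows: Iterable[Dict[str, str]], keep: List[str]) -> Iterator[Dict[str, str]]:
    """Keep only the wanted columns, skip rows with empty values, drop duplicates."""
    seen = set()
    for row in rows:
        record = {col: (row.get(col) or "").strip() for col in keep}
        if not all(record.values()):
            continue  # a required field is empty
        key = tuple(record[col] for col in keep)
        if key in seen:
            continue  # exact duplicate of an earlier record
        seen.add(key)
        yield record


def to_json_chunks(records: Iterable[Dict[str, str]], chunk_size: int) -> List[str]:
    """Group records into newline-delimited JSON strings of at most chunk_size records."""
    chunks, current = [], []
    for record in records:
        current.append(json.dumps(record))
        if len(current) == chunk_size:
            chunks.append("\n".join(current))
            current = []
    if current:
        chunks.append("\n".join(current))
    return chunks


# Made-up sample rows: one duplicate and one row with a missing temperature.
sample_csv = io.StringIO(
    "dt,AverageTemperature,City,Country\n"
    "2013-01-01,27,Ambattur,India\n"
    "2013-01-01,27,Ambattur,India\n"
    "2013-01-01,,Arnhem,Netherlands\n"
    "2013-01-01,15,Bari,Italy\n"
)
records = clean_rows(csv.DictReader(sample_csv), keep=["AverageTemperature", "City", "Country"])
chunks = to_json_chunks(records, chunk_size=1)
print(len(chunks))  # 2
```

Each chunk is newline-delimited JSON, a layout that Redshift can load directly from S3.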

In [37]:
%sql SELECT * FROM city_temp LIMIT 5;

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


averagetemperature,city,country
27,Ambattur,India
8,Arnhem,Netherlands
15,Bari,Italy
16,Beibei,China
22,Birigui,Brazil


In [38]:
%sql SELECT * FROM immigration LIMIT 5;

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


i94yr,i94mon,i94cit,i94port,i94mode,i94visa
2016.0,4.0,746.0,NEW YORK,1.0,2.0
2016.0,4.0,148.0,SAN FRANCISCO,1.0,2.0
2016.0,4.0,135.0,LOS ANGELES,1.0,2.0
2016.0,4.0,690.0,MIAMI,1.0,2.0
2016.0,4.0,582.0,NEW YORK,1.0,2.0


In [39]:
%sql SELECT * FROM cities_demographics LIMIT 5;

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


city,male_population,female_population,total_population,state_code
Winston-Salem,112520,128712,241232,NC
South Gate,47758,48641,96399,CA
Bloomington,43318,43118,86436,MN
Boynton Beach,33701,40271,73972,FL
Green Bay,52517,52704,105221,WI


In [40]:
%sql SELECT * FROM airport_code LIMIT 5;

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


code,name,country,state
12TN,Riley Creek Airport,US,TN
AU-MUP,[Duplicate] Mulga Park Airport,AU,NT
ID-WA98,Kobok Airport,ID,MU
OK72,Trust Landing Airport,US,OK
YKSC,Kingscote Airport,AU,SA


In [41]:
%sql SELECT * FROM immigration_city_temp LIMIT 5;

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


i94yr,i94mon,i94cit,i94mode,i94visa,averagetemperature,city,country
2016.0,4.0,111.0,1.0,1.0,15,San Diego,United States
2016.0,4.0,438.0,1.0,2.0,12,Los Angeles,Chile
2016.0,4.0,201.0,1.0,2.0,12,Los Angeles,Chile
2016.0,4.0,108.0,1.0,2.0,12,Los Angeles,Chile
2016.0,4.0,687.0,1.0,2.0,12,Los Angeles,Chile


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### city_temp Dimension Table
- The city temperature table is prepared from `GlobalLandTemperaturesByCity.csv`, which is converted into JSON chunk files named `city_temp` and stored in the S3 bucket.

In [60]:
%sql SELECT ORDINAL_POSITION, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'city_temp'

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
3 rows affected.


ordinal_position,column_name,data_type,character_maximum_length,is_nullable
3,country,character varying,256,YES
2,city,character varying,256,YES
1,averagetemperature,character varying,256,YES


#### immigration Dimension Table
- The immigration table is prepared from `immigration_data_sample.csv` and the `city_code.json` file, which are converted into JSON chunk files named `immigration` and stored in the S3 bucket.

In [61]:
%sql SELECT ORDINAL_POSITION, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'immigration'

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
6 rows affected.


ordinal_position,column_name,data_type,character_maximum_length,is_nullable
6,i94visa,character varying,256,YES
5,i94mode,character varying,256,YES
4,i94port,character varying,256,YES
3,i94cit,character varying,256,YES
2,i94mon,character varying,256,YES
1,i94yr,character varying,256,YES


#### cities_demographics Dimension Table
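- The city demographics table is prepared from `us-cities-demographics.csv`, which is converted into JSON chunk files named `city_demographics` and stored in the S3 bucket. Note that the Redshift table itself is named `cities_demographics` (see the queries in Step 2 and Step 4), so the schema query below, which filters on `city_demographics`, returns no rows.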

In [62]:
%sql SELECT ORDINAL_POSITION, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'city_demographics'

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
0 rows affected.


ordinal_position,column_name,data_type,character_maximum_length,is_nullable


#### airport_code Dimension Table
- The airport code table is prepared from `airport-codes_csv.csv`, which is converted into JSON chunk files named `airport_code` and stored in the S3 bucket.

In [63]:
%sql SELECT ORDINAL_POSITION, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'airport_code'

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
4 rows affected.


ordinal_position,column_name,data_type,character_maximum_length,is_nullable
4,state,character varying,256,YES
3,country,character varying,256,YES
2,name,character varying,256,YES
1,code,character varying,256,YES


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

#### immigration_city_temp Fact Table
- This table is built by combining the `city_temp` and `immigration` tables.
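
One way to build such a fact table is an `INSERT ... SELECT` with a join. The sketch below uses an in-memory SQLite database as a stand-in for Redshift so that it can run anywhere. The join condition (upper-cased `city` matched against `i94port`) is an assumption inferred from the sample rows in Step 2, not the confirmed logic of `Capstone_dag.py`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE city_temp (averagetemperature TEXT, city TEXT, country TEXT);
CREATE TABLE immigration (i94yr TEXT, i94mon TEXT, i94cit TEXT,
                          i94port TEXT, i94mode TEXT, i94visa TEXT);
CREATE TABLE immigration_city_temp (i94yr TEXT, i94mon TEXT, i94cit TEXT,
                                    i94mode TEXT, i94visa TEXT,
                                    averagetemperature TEXT, city TEXT, country TEXT);
""")

# A few rows in the shape of the sample output shown in Step 2.
conn.executemany(
    "INSERT INTO city_temp VALUES (?, ?, ?)",
    [("15", "San Diego", "United States"), ("12", "Los Angeles", "Chile")],
)
conn.executemany(
    "INSERT INTO immigration VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("2016.0", "4.0", "111.0", "SAN DIEGO", "1.0", "1.0"),
        ("2016.0", "4.0", "438.0", "LOS ANGELES", "1.0", "2.0"),
        ("2016.0", "4.0", "746.0", "NEW YORK", "1.0", "2.0"),
    ],
)

# Assumed join: port city name (upper case) against the temperature city name.
conn.execute("""
INSERT INTO immigration_city_temp
SELECT i.i94yr, i.i94mon, i.i94cit, i.i94mode, i.i94visa,
       t.averagetemperature, t.city, t.country
FROM immigration AS i
JOIN city_temp AS t ON UPPER(t.city) = i.i94port
""")

fact_rows = conn.execute(
    "SELECT city, country, i94cit FROM immigration_city_temp ORDER BY city"
).fetchall()
print(fact_rows)
```

A join on city name alone pairs the `LOS ANGELES` port with Los Angeles, Chile, which is consistent with the sample output in Step 2. Adding a country condition to the join would be one way to avoid such matches.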

In [64]:
%sql SELECT ORDINAL_POSITION, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'immigration_city_temp'

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
8 rows affected.


ordinal_position,column_name,data_type,character_maximum_length,is_nullable
8,country,character varying,256,YES
7,city,character varying,256,YES
6,averagetemperature,character varying,256,YES
5,i94visa,character varying,256,YES
4,i94mode,character varying,256,YES
3,i94cit,character varying,256,YES
2,i94mon,character varying,256,YES
1,i94yr,character varying,256,YES


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [48]:
# Which states have the largest population, in total and by gender?
%sql select sum(total_population) as total_population,sum(male_population) as male_population, \
                 sum(female_population) as female_population, state_code \
                 from cities_demographics group by state_code order by total_population desc LIMIT 5

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


total_population,male_population,female_population,state_code
24822460,12278281,12544179,CA
14299983,7063571,7236412,TX
9815626,4692055,5123571,NY
6796738,3236773,3487375,FL
4562312,2218541,2343771,IL


In [49]:
# Which ports of entry receive the most travelers? (i94port holds the port city; the query aliases it as country.)
%sql SELECT  i94port AS country, COUNT(*) AS country_visitors \
                 FROM immigration  GROUP BY i94port ORDER BY country_visitors DESC LIMIT 10

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
10 rows affected.


country,country_visitors
NEW YORK,154
MIAMI,111
LOS ANGELES,106
SAN FRANCISCO,55
CHICAGO,45
NEWARK TETERBORO,45
HONOLULU,38
ATLANTA,37
ORLANDO,36
HOUSTON,30


In [47]:
# Which visa type and mode of travel are most common among immigrants?
%sql select top 5 count(*) as total_imigrants,i94visa,i94mode \
                 from immigration group by i94visa,i94mode order by total_imigrants desc;

 * postgresql://awsuser:***@redshift.c7h8pkc1mpfg.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


total_imigrants,i94visa,i94mode
788,2.0,1.0
151,1.0,1.0
23,2.0,3.0
13,3.0,1.0
6,2.0,2.0
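
The numeric codes in this result are easier to read once decoded. The sketch below maps them to labels. The label values are an assumption based on the I94 label descriptions distributed with the immigration dataset and should be checked against that file; the `decode` helper is hypothetical.

```python
# Assumed code-to-label mappings; verify against the I94 label description file.
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94_VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(raw: str, labels: dict) -> str:
    """Turn a stored value such as '2.0' into its label, or flag it as unknown."""
    code = int(float(raw))
    return labels.get(code, f"Unknown ({code})")


# Rows copied from the query output above: (total_imigrants, i94visa, i94mode).
top_rows = [
    (788, "2.0", "1.0"),
    (151, "1.0", "1.0"),
    (23, "2.0", "3.0"),
    (13, "3.0", "1.0"),
    (6, "2.0", "2.0"),
]
decoded = [(n, decode(visa, I94_VISA), decode(mode, I94_MODE)) for n, visa, mode in top_rows]
for row in decoded:
    print(row)
```

Under these assumed labels, the most common combination in the sample is a pleasure visa with arrival by air.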


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
The data quality checks are already run as part of the `capstone_dags.py` file.
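
As an illustration of a source/count check, the sketch below reports tables that contain no rows. It is not the check implemented in the DAG file; the function name `find_empty_tables` is hypothetical, and SQLite stands in for Redshift so the example is runnable.

```python
import sqlite3
from typing import Iterable, List


def find_empty_tables(conn: sqlite3.Connection, tables: Iterable[str]) -> List[str]:
    """Return the names of tables that have zero rows."""
    empty = []
    for table in tables:
        # Table names are interpolated, so they must come from a trusted list.
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        if count == 0:
            empty.append(table)
    return empty


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE city_temp (averagetemperature TEXT, city TEXT, country TEXT)")
conn.execute("CREATE TABLE airport_code (code TEXT, name TEXT, country TEXT, state TEXT)")
conn.execute("INSERT INTO city_temp VALUES ('27', 'Ambattur', 'India')")

empty = find_empty_tables(conn, ["city_temp", "airport_code"])
print(empty)  # ['airport_code']
```

In a pipeline, a non-empty result would typically fail the task so that an incomplete load is noticed before the tables are queried.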

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in README.md file also.

## Data Dictionary
### city_temp Dimension Table

| Column                       | Data Type                    | Description                  |
| ---------------------------- | ---------------------------- | ---------------------------- | 
| averagetemperature           | varchar(256)                 | average temperature of city  |
| city                         | varchar(256)                 | city name                    |
| country                      | varchar(256)                 | country name                 |

### immigration Dimension Table

| Column                       | Data Type                    | Description                  | 
| ---------------------------- | ---------------------------- | ---------------------------- |
| i94yr                        | varchar(256)                 | 4 digit year                 | 
| i94mon                       | varchar(256)                 | numeric month value          | 
| i94cit                       | varchar(256)                 | country of citizenship code  | 
| i94mode                      | varchar(256)                 | mode of immigration          |
| i94visa                      | varchar(256)                 | visa type                    | 
| i94port                      | varchar(256)                 | port of entry city name      |

### cities_demographics Dimension Table

| Column                       | Data Type                    | Description                  | 
| ---------------------------- | ---------------------------- | ---------------------------- |
| city                         | varchar(256)                 | city name                    | 
| state_code                   | varchar(256)                 | state code                   | 
| male_population              | varchar(256)                 | male population              |
| female_population            | varchar(256)                 | female population            | 
| total_population             | varchar(256)                 | total population             |

### airport_code Dimension Table

| Column                       | Data Type                    | Description                  | 
| ---------------------------- | ---------------------------- | ---------------------------- |
| code                         | varchar(256)                 | airport code                 | 
| name                         | varchar(256)                 | airport name                 | 
| country                      | varchar(256)                 | country name                 |
| state                        | varchar(256)                 | state or region code         | 

### immigration_city_temp Fact Table

| Column                       | Data Type                    | Description                  | 
| ---------------------------- | ---------------------------- | ---------------------------- |
| i94yr                        | varchar(256)                 | 4 digit year                 | 
| i94mon                       | varchar(256)                 | numeric month value          | 
| i94cit                       | varchar(256)                 | country of citizenship code  | 
| i94mode                      | varchar(256)                 | mode of immigration          |
| i94visa                      | varchar(256)                 | visa type                    | 
| averagetemperature           | varchar(256)                 | average temperature of city  |
| city                         | varchar(256)                 | city name                    | 
| country                      | varchar(256)                 | country name                 | 


### Step 5: Complete Project Write Up
#### Possible Scenarios and Approach
- If the data increased by 100x:
    - Increase the number of compute nodes in the Redshift cluster to speed up read and write operations.
    - Compress the data stored in S3.
- If the pipelines needed to run daily by 7am:
    - Schedule the DAGs like a cron job by setting `start_date` to a datetime and `schedule_interval` to `@daily`, which runs at midnight, or to a cron expression early enough for the run to finish before 7am.
- If the database needed to be accessed by 100+ people:
    - Scale the Redshift cluster, for example with elastic resize, for better performance.
    - Set up clusters in multiple regions so that users read data from their nearest region, which makes data access faster.
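
For the daily-run scenario, the scheduling settings could look like the sketch below. The values are assumptions chosen for illustration: the `dag_id`, the start date, the retry settings, and the 05:00 cron expression, which leaves two hours before the 7am deadline. The dictionary uses only the standard library so it runs without Airflow; the commented lines show where it would be passed to a DAG, assuming an Airflow version that accepts `schedule_interval`.

```python
from datetime import datetime, timedelta

# Hypothetical settings for a DAG that must finish before 7am every day.
dag_settings = {
    "dag_id": "capstone_dag",
    "start_date": datetime(2016, 1, 1),
    "schedule_interval": "0 5 * * *",  # every day at 05:00
    "catchup": False,  # do not backfill runs between start_date and today
    "default_args": {
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
}

# With Airflow installed, the settings would be used like this:
# from airflow import DAG
# dag = DAG(**dag_settings)

minute, hour, *_ = dag_settings["schedule_interval"].split()
print(f"Scheduled daily at {int(hour):02d}:{int(minute):02d}")
```

How much time to leave before 7am depends on how long the load actually takes, so the start hour should be set from observed run times.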