
Capstone-Project

Objective : The purpose of this project is to create an ETL pipeline (etl.py) that extracts the data (i94_jan16_sub.sas7bdat, usa-cities-demography.csv), converts it to Parquet format, and stores it in an S3 bucket. From there the data is loaded into a Redshift database, where analysts can consume it for statistical or exploratory data analysis.

Architecture Diagram :

The dimensional model I adopted is based on a star schema.

For extraction and transformation I have used the pandas and pyarrow libraries, and to transfer the files to the Amazon S3 bucket I have used the s3fs library. The script uses the psycopg2 package to connect to Redshift and run the SQL statements. Before inserting any data, validations are performed to avoid any redundancy.

[Architecture diagram image]

Dataset :

I have used the data set provided by Udacity; from it I chose two different data sets (i94_jan16_sub.sas7bdat and usa-cities-demography.csv).

1) i94_jan16_sub.sas7bdat has more than 2.8 million records.
2) usa-cities-demography.csv has nearly 3 thousand records.

Explore and Assess the Data :

Based on analyzing the data from 'i94_jan16_sub.sas7bdat', below are some of the assumptions for the required data columns:

Example:

       da.cicid.isnull().sum().sum()   # check for null values
       da.cicid.dtype                  # check the data type
  • CICID is a unique number for each immigrant. (No null values found.)
  • I94res is the country from which one has travelled. (No null values found.)
  • I94addr is where the immigrant resides in the USA. (Null values found.)
  • arrdate is the date of arrival. (Converted to timestamp format.)
  • visatype is the type of visa one holds. (No null values found.)

I have created two defaultdicts for the columns I94res and I94addr and loaded them into data frames (country and usa_states, respectively). This helps us eliminate null values or bad data that are not listed in the dictionaries. In the future, if we need to validate a new state or country, we can add the values to the dictionary and process the data.
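
A minimal sketch of this mapping idea, with hypothetical names and a small subset of codes (the real dictionaries in etl.py hold the full lists): any code that is not present in the defaultdict falls back to "Other".

    from collections import defaultdict

    import pandas as pd

    # Hypothetical subset of the I94 code mappings.
    state_codes = defaultdict(lambda: "Other", {"CA": "California", "NY": "New York"})
    country_codes = defaultdict(lambda: "Other", {135.0: "United Kingdom", 245.0: "China"})

    df = pd.DataFrame({"i94addr": ["CA", "XX", None], "i94res": [135.0, 999.0, 245.0]})
    df["usa_state"] = df["i94addr"].map(lambda code: state_codes[code])
    df["country"] = df["i94res"].map(lambda code: country_codes[code])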

Converted the arrdate to a timestamp:

    df['converted_date'] = pd.to_timedelta(df.arrdate, unit="d") + pd.Timestamp(1960, 1, 1)

The tools utilized in this project are the same ones we have been learning during the course of the Nanodegree.

  • Python
    • pyarrow
    • pandas
    • collections
    • s3fs
  • AWS S3
  • AWS Redshift

Data Model :

[Data model diagram image]

| Table                    | Type            |
| ------------------------ | --------------- |
| immigration_stagging     | Staging Table   |
| usa_demographic_stagging | Staging Table   |
| dim_visa                 | Dimension Table |
| dim_date                 | Dimension Table |
| dim_usa_state            | Dimension Table |
| dim_country              | Dimension Table |
| fact_immigration         | Fact Table      |

| Table            | Primary Key  | Distribution Style | Sort Key     |
| ---------------- | ------------ | ------------------ | ------------ |
| dim_visa         | visa_id      | All                | visa_id      |
| dim_date         | date_id      | All                | date_id      |
| dim_usa_state    | usa_state_id | All                | usa_state_id |
| dim_country      | country_id   | All                | country_id   |
| fact_immigration | cicid        | Key                | cicid        |

Table Creation:

Tables are created by executing the script create_tables.py. The script calls sql_queries.py, which contains all the required DDL statements.
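
As a hedged illustration of the kind of DDL statement sql_queries.py holds for the model above (the visa_type column and its size are assumptions):

    # One of the dimension tables, distributed to all nodes and sorted on its key.
    dim_visa_table_create = """
    CREATE TABLE IF NOT EXISTS dim_visa (
        visa_id   INT NOT NULL PRIMARY KEY,
        visa_type VARCHAR(20)
    )
    DISTSTYLE ALL
    SORTKEY (visa_id);
    """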

ETL Pipeline:

etl.py is used to extract and transform the data from the files. It establishes a connection to the database, extracts the required information from the files in the specified path, and stores the data in the appropriate staging tables. It checks for duplicates before inserting records into the fact and dimension tables. The code is modularized, and comments explaining each function are included.

1) upload_immigration_data_S3: This function fetches the immigration data and stores it in the S3 bucket. It applies transformations such as checking the data types and the date format, and uses the defaultdicts to map the country and US state.

  • The valid US states are loaded into a dictionary; if the column value is not in the dictionary, it is replaced with "Other".

  • The valid countries and their respective country codes are loaded into a dictionary; if the column value is not in the dictionary, it is replaced with "Other".

  • Once all the required transformations are done, the data is loaded into a pandas data frame, converted to the parquet file format, and moved into the S3 bucket (see the sketch after this list).

  • The parquet files are loaded into two folders, Source and Result. The Source folder contains all the files, while the Result folder holds only the delta files.
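
A minimal sketch of that parquet upload, assuming a hypothetical bucket name and folder layout:

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    import s3fs

    # Tiny stand-in for the transformed immigration data frame.
    df = pd.DataFrame({"cicid": [1.0, 2.0], "usa_state": ["California", "Other"]})

    fs = s3fs.S3FileSystem()  # AWS credentials are read from the environment
    table = pa.Table.from_pandas(df)
    # "my-capstone-bucket" and the Source/ prefix are placeholders.
    pq.write_table(table, "my-capstone-bucket/Source/immigration.parquet", filesystem=fs)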

2) upload_usa_demography_S3: Fetches usa-cities-demography.csv and loads it into the S3 bucket.

3) load_staging_tables: Loads the data from the S3 bucket into the Redshift staging tables.
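
For illustration, a hedged sketch of the kind of COPY statement this step could run (bucket, IAM role, and connection details are placeholders):

    import psycopg2

    copy_immigration_stagging = """
    COPY immigration_stagging
    FROM 's3://my-capstone-bucket/Result/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/myRedshiftRole'
    FORMAT AS PARQUET;
    """

    conn = psycopg2.connect("host=<cluster-endpoint> dbname=dev user=awsuser password=<password> port=5439")
    cur = conn.cursor()
    cur.execute(copy_immigration_stagging)
    conn.commit()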

4) insert_tables: Inserts the data into the dimension and fact tables.
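
A sketch of the duplicate-safe insert pattern this step could use, reusing the connection above (the staging column names are assumptions):

    insert_dim_visa = """
    INSERT INTO dim_visa (visa_id, visa_type)
    SELECT DISTINCT s.i94visa, s.visatype
    FROM immigration_stagging s
    WHERE NOT EXISTS (
        SELECT 1 FROM dim_visa d WHERE d.visa_id = s.i94visa
    );
    """
    cur.execute(insert_dim_visa)
    conn.commit()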

5) validation_records: Validates the record counts of the immigration_stagging and fact_immigration tables.
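
One way such a count check could look, again reusing the connection above:

    validation_query = """
    SELECT 'immigration_stagging' AS table_name, COUNT(*) AS cnt FROM immigration_stagging
    UNION ALL
    SELECT 'fact_immigration', COUNT(*) FROM fact_immigration;
    """
    cur.execute(validation_query)
    for table_name, cnt in cur.fetchall():
        print(table_name, cnt)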

If we are running this in the AWS cloud environment, etl.py can be deployed as an AWS Lambda function and scheduled with the CloudWatch service.

Steps to execute the code :

  1. Open a new terminal

  2. Install the libraries listed below

    • !pip install pyarrow
    • !pip install s3fs
  3. First, execute create_tables.py.

    python create_tables.py

  4. Next, execute etl.py.

    python etl.py


Re-run create_tables.py whenever you change sql_queries.py, or before you execute etl.py.

Example query for analysis (most common visa type per state):

    select state, visa_type
    from (
        select state,
               visa_type,
               rank() over (partition by state order by agg_tot desc) agg_values
        from (
            select b.visa_type, c.state, count(1) agg_tot
            from public.fact_immigration a, public.dim_visa b, public.dim_usa_state c
            where a.visa_id = b.visa_id
              and a.usa_state_id = c.usa_state_id
            group by b.visa_type, c.state
        )
    )
    where agg_values = 1;

Scenarios

  1. The data was increased by 100x.
  • In this project we have used Python for data processing, S3 for storage, and Redshift as the database. We can replace the Python script (etl.py) with Spark and hand all the data processing over to it, which makes the process robust and very fast. The technology stack would then be 1) Spark for data processing, 2) S3 for file storage, 3) Redshift as the database.

  • If we deploy the project in the AWS environment, we can also try AWS Glue, a fully managed ETL service. If so, the new tech stack will be (AWS Glue, AWS S3, AWS Redshift / AWS Athena).

  2. The pipelines would run daily by 7 am every day.

  3. The database needed to be accessed by 100+ people.
  • Redshift is highly scalable, so this will not be a problem.
  • We can also use AWS Athena, as it is a serverless query service. One advantage is that the data would not need to be loaded out of S3; Athena queries it in place, which keeps the entire process fast and efficient for data consumers to gain insights. The stack would be (see the sketch after this list):
    1. Apache Spark
    2. S3 (data partitioned by year, month, day)
    3. Athena
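
If the Spark route were taken, a minimal PySpark sketch of writing the immigration data to S3 partitioned by year, month, and day (bucket name and paths are placeholders; converted_date is the column derived earlier):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("immigration_etl").getOrCreate()

    # Read the Parquet files produced by the pipeline (placeholder path).
    df = spark.read.parquet("s3a://my-capstone-bucket/Source/")

    # Derive partition columns from the arrival date.
    df = (df.withColumn("year", F.year("converted_date"))
            .withColumn("month", F.month("converted_date"))
            .withColumn("day", F.dayofmonth("converted_date")))

    # Write back to S3, partitioned so query engines can prune by date.
    (df.write.mode("overwrite")
       .partitionBy("year", "month", "day")
       .parquet("s3a://my-capstone-bucket/partitioned/"))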
