Project: Data Pipelines with Airflow

A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to their data warehouse ETL pipelines and has come to the conclusion that the best tool to achieve this is Apache Airflow.

They have decided to bring you into the project and expect you to create high-grade data pipelines that are dynamic and built from reusable tasks, can be monitored, and allow easy backfills. They have also noted that data quality plays a big part when analyses are executed on top of the data warehouse, and they want to run tests against their datasets after the ETL steps have been executed to catch any discrepancies.

The source data resides in S3 and needs to be processed in Sparkify's data warehouse in Amazon Redshift. The source datasets consist of JSON logs that describe user activity in the application and JSON metadata about the songs the users listen to.

Song Dataset

The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID. For example, here are filepaths to two files in this dataset.

s3://my-personal-bucket/song_data/A/B/C/TRABCEI128F424C983.json
s3://my-personal-bucket/song_data/A/A/B/TRAABJL12903CDCF1A.json

And below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like.

{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}

Log Dataset

The second dataset consists of log files in JSON format generated by an event simulator based on the songs in the dataset above. These simulate activity logs from a music streaming app based on specified configurations.
The log files in the dataset you'll be working with are partitioned by year and month. For example, here are filepaths to two files in this dataset.

s3://my-personal-bucket/log_data/2018/11/2018-11-12-events.json
s3://my-personal-bucket/log_data/2018/11/2018-11-13-events.json

And below is an example of what the data in a log file looks like.

{"artist":"The Kooks","auth":"Logged In","firstName":"Sara","gender":"F","itemInSession":0,"lastName":"Johnson","length":132.25751,"level":"paid","location":"Winston-Salem, NC","method":"PUT","page":"NextSong","registration":1540809153796.0,"sessionId":152,"song":"Eddie's Gun","status":200,"ts":1541260356796,"userAgent":"\"Mozilla\/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit\/537.51.2 (KHTML, like Gecko) Version\/7.0 Mobile\/11D257 Safari\/9537.53\"","userId":"95"}

Project structure

Files used in the project:

  1. All the data resides in my personal AWS S3 bucket.
  2. The dags folder contains the DAG definition with all the imports and task templates in place.
  3. sql_creation_queries.py contains all the queries needed to build the database structure.
  4. create_tables.py creates the sparkify database tables in Redshift using the above queries.
  5. dwh.cfg needs to be filled in with the credentials that allow the above script to connect to Redshift.
  6. plugins/helpers/sql_queries.py contains the insert statements that populate the database.
  7. plugins/operators/ contains the custom operators (tasks) that compose the complete DAG.
  8. README.md provides an overview of the whole project.

Schema for Song Play Analysis

I processed the data contained in the S3 bucket into staging tables, which I later used to build the main database.

Staging Tables

  • staging_events: contains all the fields and rows from the log JSON files.
  • staging_songs: contains all the fields and rows from the song JSON files.

Using the songs and events staging tables, I created a star schema database optimized for queries on song play analysis, which is composed of the following tables.

Fact Table

songplays - records in log data associated with song plays:

  • play_id int PRIMARY KEY;
  • start_time date NOT NULL;
  • user_id int NOT NULL ;
  • level ENUM ('paid', 'free');
  • song_id text;
  • artist_id text;
  • session_id int;
  • location text;
  • user_agent text.

Dimension Tables

users - users in the app:

  • user_id int PRIMARY KEY;
  • first_name text;
  • last_name text;
  • gender ENUM ('M', 'F');
  • level ENUM ('paid', 'free').

songs - songs in music database:

  • song_id text PRIMARY KEY;
  • title text;
  • artist_id text NOT NULL;
  • year int;
  • duration float NOT NULL.

artists - artists in music database:

  • artist_id text PRIMARY KEY;
  • name text;
  • location text;
  • latitude float;
  • longitude float.

time - timestamps of records in songplays broken down into specific units:

  • start_time date PRIMARY KEY;
  • hour int;
  • day int;
  • week int;
  • month int;
  • year int;
  • weekday text.
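
To make the schema concrete, here is a minimal sketch of how one of these DDL statements might look inside sql_creation_queries.py. The column names follow the tables above; the IDENTITY surrogate key and the use of VARCHAR for level (Redshift has no native ENUM type) are assumptions, not necessarily the exact code used in the project.

songplays_table_create = ("""
    CREATE TABLE IF NOT EXISTS songplays (
        play_id     INT IDENTITY(0,1) PRIMARY KEY,  -- surrogate key (IDENTITY is an assumption)
        start_time  DATE NOT NULL,
        user_id     INT NOT NULL,
        level       VARCHAR,   -- 'paid' / 'free'
        song_id     TEXT,
        artist_id   TEXT,
        session_id  INT,
        location    TEXT,
        user_agent  TEXT
    );
""")

# The staging and dimension tables are defined the same way and collected in a
# list so that create_tables.py can loop over them.
create_table_queries = [songplays_table_create]  # plus the staging and dimension tables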

Airflow Pipeline

Before running the project, it is necessary to run the create_tables.py script once to create the destination tables in Redshift.
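
As a rough sketch, create_tables.py could look something like the following; the [CLUSTER] config section, its key order, and the create_table_queries import are assumptions based on the project description, not the exact implementation.

import configparser
import psycopg2

from sql_creation_queries import create_table_queries  # assumed export name

def main():
    # Read the Redshift credentials from dwh.cfg
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    # Assumed [CLUSTER] section holding host, dbname, user, password, port (in that order)
    conn = psycopg2.connect(
        "host={} dbname={} user={} password={} port={}".format(
            *config['CLUSTER'].values()))
    cur = conn.cursor()

    # Run each CREATE TABLE statement defined in sql_creation_queries.py
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

    conn.close()

if __name__ == '__main__':
    main()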

To execute the pipeline we need to establish a connection to AWS. To make the code reusable and protect my credentials, I used Airflow's admin page to create custom connections that store my private information as well as the connection details.

This is what the DAG looks like in the Airflow UI (see the dag graph screenshot); keep reading for a description of the operator groups.
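
As a rough sketch of how the DAG is wired together in code (Airflow 1.10-style imports; the DAG name, schedule, default arguments, and custom operator names are illustrative assumptions, not necessarily those used in the dags folder):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'sparkify',
    'start_date': datetime(2018, 11, 1),
    'depends_on_past': False,            # each run is independent of the previous one
    'retries': 3,                        # retry failed tasks before giving up
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'sparkify_etl',
    default_args=default_args,
    description='Load and transform data in Redshift with Airflow',
    schedule_interval='@hourly',
    catchup=False,                       # backfills are triggered explicitly
)

start_operator = DummyOperator(task_id='Begin_execution', dag=dag)
end_operator = DummyOperator(task_id='Stop_execution', dag=dag)

# The custom operators described below (staging, fact/dimension loads, data
# quality) are instantiated here and chained to mirror the graph in the UI:
# start_operator >> [stage_events, stage_songs] >> load_songplays_table
# load_songplays_table >> [load_users, load_songs, load_artists, load_time]
# [load_users, load_songs, load_artists, load_time] >> run_quality_checks >> end_operator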

Operators

Operators create the necessary tables, stage the data, transform the data, and run checks on data quality.
Connections and Hooks are configured using Airflow's built-in functionality.
All of the operators and tasks run SQL statements against the Redshift database.

Stage Operators

The stage operator loads JSON- and CSV-formatted files from S3 into Amazon Redshift. It builds and runs a SQL COPY statement based on the parameters provided, which specify where in S3 the files are located and what the target table is.
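
A simplified sketch of such an operator is shown below. The class name, parameter names, and connection IDs are illustrative; the real operator in plugins/operators/ may handle credentials and CSV formatting differently.

from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

COPY_SQL = """
    COPY {table}
    FROM '{s3_path}'
    ACCESS_KEY_ID '{access_key}'
    SECRET_ACCESS_KEY '{secret_key}'
    FORMAT AS JSON '{json_option}'
"""

class StageToRedshiftOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_conn_id="aws_credentials", redshift_conn_id="redshift",
                 table="", s3_path="", json_option="auto", *args, **kwargs):
        super(StageToRedshiftOperator, self).__init__(*args, **kwargs)
        self.aws_conn_id = aws_conn_id
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.s3_path = s3_path
        self.json_option = json_option

    def execute(self, context):
        # Credentials come from the custom Airflow connections mentioned above
        credentials = AwsHook(self.aws_conn_id).get_credentials()
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)

        # Empty the staging table, then COPY the files from S3 into it
        redshift.run("DELETE FROM {}".format(self.table))
        redshift.run(COPY_SQL.format(
            table=self.table,
            s3_path=self.s3_path,
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            json_option=self.json_option))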

Fact and Dimension Load Operators

The dimension and fact operators make use of the SQL helper class to run data transformations. They take as input a SQL statement from the helper class and the database connection against which to run the query, as well as a target table that will hold the results of the transformation.
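
A minimal sketch of a dimension-load operator, assuming a truncate-then-insert pattern (the fact operator is analogous but only appends rows); names and parameters are illustrative.

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class LoadDimensionOperator(BaseOperator):
    @apply_defaults
    def __init__(self, redshift_conn_id="redshift", table="", select_sql="",
                 truncate=True, *args, **kwargs):
        super(LoadDimensionOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.select_sql = select_sql   # a SELECT statement from the SQL helper class
        self.truncate = truncate

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        if self.truncate:
            # Dimension tables are emptied before each load in this sketch
            redshift.run("TRUNCATE TABLE {}".format(self.table))
        # Insert the transformed rows produced by the helper query
        redshift.run("INSERT INTO {} {}".format(self.table, self.select_sql))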

Data Quality Operator

The data quality operator is used to run checks on the data itself. Its main functionality is to receive one or more SQL-based test cases along with the expected results and execute the tests. For each test, the actual result is compared against the expected result, and if they do not match, the operator raises an exception and the task is retried until it eventually fails.

For example, one test could be a SQL statement that checks whether a certain column contains NULL values by counting all the rows that have NULL in that column. Since we do not want any NULLs, the expected result would be 0, and the test would compare the SQL statement's outcome to that expected result.
The tests are not hardcoded: the user can test the tables against any condition and query by instantiating the operator with the proper arguments.
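
A minimal sketch of how such checks might be expressed, assuming each check is passed as a dict pairing a SQL statement with its expected result (the parameter shape is an assumption):

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class DataQualityOperator(BaseOperator):
    @apply_defaults
    def __init__(self, redshift_conn_id="redshift", checks=None, *args, **kwargs):
        super(DataQualityOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        # e.g. [{'sql': 'SELECT COUNT(*) FROM users WHERE user_id IS NULL',
        #        'expected': 0}]
        self.checks = checks or []

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for check in self.checks:
            records = redshift.get_records(check['sql'])
            actual = records[0][0]
            if actual != check['expected']:
                # Raising makes the task fail; Airflow retries it per the DAG's
                # retry settings before marking it failed for good.
                raise ValueError(
                    "Data quality check failed: {} returned {}, expected {}".format(
                        check['sql'], actual, check['expected']))
            self.log.info("Data quality check passed: %s", check['sql'])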

Begin and End Operators

They don't perform any real work; they enhance the visual representation of the DAG in the Airflow UI and mark clear boundaries for its execution.