
Notable Features

SriDev SriKanth edited this page Feb 14, 2019 · 9 revisions

Table of Contents

* [Overview](#overview)
* [Abstraction Layer](#abstraction_layer)
	* [Executor](#executor)
	* [Task](#task)
* [Modules](#modules)
* [Pipeline Layer - DAG](#pipeline-layer)
	* [Simple Pipeline](#simple-pipeline)
	* [Complex Pipeline](#complex-pipeline)

The entire task is designed to consist of 4 modules, 1 abstraction layer and 1 pipeline layer (DAG), so that modules can be added or extended easily, which keeps the design maintainable and flexible.

Overview

The abstraction layer can run every module independently of the others. A single module can also be executed in parallel, with different configurations, in separate Docker containers to improve performance.

Abstraction Layer

The Abstraction Layer consists of 2 objects:

Executor

The Executor is an abstraction that connects to a configuration file. It is a custom-built object that creates a Task object (see next section) from that configuration file (see task abstraction diagram). A task configuration file is assigned to the executor based on the arguments passed when it is run from the Docker console.
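A minimal sketch of the executor's role, assuming the task configuration path is passed with a `--config` command-line flag and that the JSON names its module under a `module` key and its parameters under a `params` key. The flag, both keys, the `MODULES` registry and the `main` entry point are hypothetical; the page does not show the project's actual argument names.

```python
import argparse
import json
from typing import Any, Callable, Dict, List, Optional

# Hypothetical registry mapping a module name (as written in the task
# configuration) to the callable that implements that module.
MODULES: Dict[str, Callable[[Dict[str, Any]], None]] = {
    "extract_and_clean": lambda params: print("extracting with", params),
    "sample_users": lambda params: print("sampling with", params),
}


class Executor:
    """Reads one task configuration file and builds the task it describes."""

    def __init__(self, config_path: str) -> None:
        self.config_path = config_path

    def load_config(self) -> Dict[str, Any]:
        with open(self.config_path, encoding="utf-8") as fh:
            return json.load(fh)

    def create_task(self) -> Callable[[], None]:
        config = self.load_config()
        module_name = config["module"]  # hypothetical key
        if module_name not in MODULES:
            raise ValueError(f"unknown module: {module_name!r}")
        params = config.get("params", {})  # hypothetical key
        module = MODULES[module_name]
        return lambda: module(params)


def main(argv: Optional[List[str]] = None) -> None:
    # e.g. `python run.py --config configs/sample_users.json` inside the container
    parser = argparse.ArgumentParser(description="Run one ETL task")
    parser.add_argument("--config", required=True, help="path to the task JSON")
    args = parser.parse_args(argv)
    Executor(args.config).create_task()()
```

Keeping the module lookup in a registry means adding a module only requires a new entry, which matches the goal of keeping the number of modules extensible.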

Task

The core of the abstraction layer is a custom-built Task object that can run any module given its configuration. The following are some of the useful attributes of a Task object:

  • Status - Every task has a state (IDLE, RUNNING, DENIED, SUCCESS, FAILURE), so the status of the job is known at any point in time.
  • Configuration - A JSON file that specifies which module to run and the parameters needed to run the task.
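The status attribute can be modelled as a small state machine. The sketch below assumes, since the page does not say, that DENIED marks a task that was never allowed to start (for example because a dependency failed); `TaskState`, `Task`, `deny` and `run` are illustrative names, not the project's actual classes.

```python
from enum import Enum
from typing import Any, Callable, Dict, Optional


class TaskState(Enum):
    IDLE = "IDLE"
    RUNNING = "RUNNING"
    DENIED = "DENIED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class Task:
    """Wraps one module call and tracks its state (illustrative sketch)."""

    def __init__(self, name: str, module: Callable[[Dict[str, Any]], None],
                 config: Dict[str, Any]) -> None:
        self.name = name
        self.module = module
        self.config = config
        self.state = TaskState.IDLE
        self.error: Optional[BaseException] = None

    def deny(self) -> None:
        # Assumption: a task that is never allowed to start ends up DENIED.
        if self.state is TaskState.IDLE:
            self.state = TaskState.DENIED

    def run(self) -> TaskState:
        if self.state is not TaskState.IDLE:
            raise RuntimeError(f"task {self.name!r} is {self.state.value}, not IDLE")
        self.state = TaskState.RUNNING
        try:
            self.module(self.config)
        except Exception as exc:  # record the failure instead of crashing the caller
            self.error = exc
            self.state = TaskState.FAILURE
        else:
            self.state = TaskState.SUCCESS
        return self.state
```

Catching the exception inside `run` lets a caller such as a pipeline inspect `state` and `error` afterwards, for example to send a failure notification.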

Modules

As explained above, there are 4 modules in this exercise. Every module is called by the Task object, which is defined by the task configuration file. The modules and their functions are:

  • Extract and Clean - A module to parse, clean and extract data from the tar file. Configuration parameters allow extracting specific files or JSON columns rather than every file and column in the tar file. Each record in the newline-delimited JSON is parsed line by line to avoid memory failures, since the files being processed are huge (up to 5 GB). The extracted, cleaned and clipped data is written back out as a newline-delimited JSON file.

  • Sample Users - A module to clip X users from the extracted and cleaned user JSON file and save the results to a CSV file with headers. X is defined in the task's configuration file. The JSON file is parsed in chunks of lines to reduce the number of IO operations. Note: if required, the chunk size can be set to 1 to parse line by line.

  • Sample Users' Reviews - A module to select all reviews written by the sampled users and write the results to a CSV file with headers. The parsing procedure is the same as in the Sample Users module.

  • Sample Users with no review in last year - A module to select the IDs of all sampled users who have not written any review since a specified date X. The date X is set in the configuration file and can be changed at any time. The parsing procedure is the same as in the Sample Users module.
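The parsing strategies above (streaming newline-delimited JSON one record at a time, reading it in chunks of lines, and filtering reviews by date) can be sketched with the standard library alone. The field names `user_id` and `date`, the ISO `YYYY-MM-DD` date prefix, taking the first X users as the sample, dropping malformed lines as the "clean" step, and every function name here are assumptions for illustration, not the project's code.

```python
import csv
import json
import tarfile
from datetime import date
from itertools import islice
from typing import IO, Dict, Iterable, Iterator, List, Optional

Record = Dict[str, object]


def iter_records(lines: Iterable, columns: Optional[List[str]] = None) -> Iterator[Record]:
    """Parse newline-delimited JSON one line at a time, keeping only `columns`."""
    for line in lines:
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # assumption: malformed lines are dropped during cleaning
        if columns is not None:
            record = {key: record.get(key) for key in columns}
        yield record


def iter_tar_member(tar_path: str, member: str,
                    columns: Optional[List[str]] = None) -> Iterator[Record]:
    """Stream one NDJSON file out of a (possibly compressed) tar without unpacking it all."""
    with tarfile.open(tar_path, "r:*") as tar:
        fh = tar.extractfile(member)
        if fh is None:
            raise ValueError(f"{member!r} is not a regular file")
        yield from iter_records(fh, columns)


def chunked(lines: Iterable, size: int) -> Iterator[list]:
    """Yield lists of up to `size` items; size=1 gives line-by-line processing."""
    it = iter(lines)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def sample_users(lines: Iterable, x: int, chunk_size: int = 1000) -> List[Record]:
    """Return the first `x` user records, reading the input in chunks of lines."""
    sampled: List[Record] = []
    if x <= 0:
        return sampled
    for chunk in chunked(lines, chunk_size):
        for record in iter_records(chunk):
            sampled.append(record)
            if len(sampled) == x:
                return sampled
    return sampled


def reviews_for(user_ids: Iterable[str], reviews: Iterable[Record]) -> Iterator[Record]:
    """Keep only reviews written by the sampled users."""
    wanted = set(user_ids)
    return (r for r in reviews if r.get("user_id") in wanted)


def users_without_reviews_since(user_ids: List[str], reviews: Iterable[Record],
                                since: date) -> List[str]:
    """Sampled user IDs with no review dated on or after `since`."""
    wanted = set(user_ids)
    active = set()
    for review in reviews:
        uid = review.get("user_id")
        # assumption: review dates start with an ISO "YYYY-MM-DD" prefix
        if uid in wanted and date.fromisoformat(str(review["date"])[:10]) >= since:
            active.add(uid)
    return [uid for uid in user_ids if uid not in active]


def write_csv(rows: Iterable[Record], fh: IO[str], fieldnames: List[str]) -> None:
    """Write rows to CSV with a header line."""
    writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)
```

Because every step is a generator or consumes its input incrementally, only one chunk of lines is held in memory at a time, which is what makes multi-gigabyte inputs manageable.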

Pipeline Layer - DAG

Although the Task object can run every module independently, some ETL steps depend on other tasks finishing first, hence the need for a pipeline. The pipeline layer is a custom-built DAG object that orchestrates Task objects in parallel or in sequence, as defined by a DAG configuration file. The DAG configuration is a JSON file listing the tasks, each with the path to its task configuration and its dependencies. An example configuration looks like this:

{
  "decompress_and_clean": {
    "path": "{base}/configs/decompress_and_clean.json"
  },
  "sample_users": {
    "path": "{base}/configs/sample_users.json",
    "dependencies": [
      "decompress_and_clean"
    ]
  }
}
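One way to load such a configuration is to fill in `{base}` with `str.format` and validate the dependencies before anything runs, using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). Treating `{base}` as a `str.format` placeholder, rejecting unknown or cyclic dependencies up front, and the helper names `load_dag` and `execution_order` are assumptions, not the project's DAG class.

```python
import json
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def load_dag(raw_json: str, base: str) -> Dict[str, dict]:
    """Parse a DAG config, resolve {base} in each path and validate dependencies."""
    dag = json.loads(raw_json)
    for name, spec in dag.items():
        spec["path"] = spec["path"].format(base=base)
        spec.setdefault("dependencies", [])  # tasks without the key have no upstream
        unknown = [dep for dep in spec["dependencies"] if dep not in dag]
        if unknown:
            raise ValueError(f"{name!r} depends on unknown task(s): {unknown}")
    return dag


def execution_order(dag: Dict[str, dict]) -> List[str]:
    """Return one valid sequential order; raises CycleError on circular dependencies."""
    graph = {name: spec["dependencies"] for name, spec in dag.items()}
    return list(TopologicalSorter(graph).static_order())
```

For the example configuration, `execution_order` places `decompress_and_clean` before `sample_users`.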

Simple Pipeline

A simple pipeline is a series of tasks that run in sequence: each task depends on the previous one and begins only after it has finished (see the simple pipeline diagram).
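A sequential pipeline can be sketched as a loop that runs tasks in order and stops at the first failure. Marking the remaining tasks as DENIED is an assumption about how that state is used, and `run_sequential` with its `(name, callable)` input format is a hypothetical interface.

```python
from typing import Callable, Dict, List, Tuple


def run_sequential(tasks: List[Tuple[str, Callable[[], None]]]) -> Dict[str, str]:
    """Run (name, callable) pairs in order; later tasks are DENIED after a failure."""
    states = {name: "IDLE" for name, _ in tasks}
    failed = False
    for name, fn in tasks:
        if failed:
            states[name] = "DENIED"  # an upstream task failed, so this one never starts
            continue
        states[name] = "RUNNING"
        try:
            fn()
        except Exception:
            states[name] = "FAILURE"
            failed = True
        else:
            states[name] = "SUCCESS"
    return states


if __name__ == "__main__":
    def fail() -> None:
        raise RuntimeError("boom")

    print(run_sequential([("extract", lambda: None), ("sample", fail),
                          ("reviews", lambda: None)]))
    # prints {'extract': 'SUCCESS', 'sample': 'FAILURE', 'reviews': 'DENIED'}
```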

Complex Pipeline

Task dependencies are useful, but some tasks are completely independent of each other. A complex pipeline handles this case: it runs independent tasks in parallel and waits until all of them have completed before moving to the next step in the pipeline (see the complex pipeline diagram).
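The stage-by-stage behaviour (run every task whose dependencies are met in parallel, then wait for the whole batch before moving on) can be sketched with `graphlib.TopologicalSorter` (Python 3.9+) and a thread pool from `concurrent.futures`. Using threads, the `run_in_stages` name and the dependency-mapping format are assumptions; the project's own DAG object may schedule tasks differently.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter
from typing import Callable, Dict, Iterable, List


def run_in_stages(
    tasks: Dict[str, Callable[[], None]],
    dependencies: Dict[str, Iterable[str]],
    max_workers: int = 4,
) -> List[List[str]]:
    """Run independent tasks in parallel, one stage at a time.

    Returns the executed stages, each sorted by task name. An exception raised
    by any task propagates and stops the pipeline before the next stage.
    """
    sorter = TopologicalSorter({name: dependencies.get(name, ()) for name in tasks})
    sorter.prepare()  # also detects dependency cycles
    stages: List[List[str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorted(sorter.get_ready())
            # Submit the whole stage, then block until every task in it is done.
            futures = [pool.submit(tasks[name]) for name in ready]
            for future in futures:
                future.result()  # re-raises the task's exception, if any
            sorter.done(*ready)
            stages.append(ready)
    return stages
```

Waiting for the whole stage is simpler than starting each task the moment its own dependencies finish, at the cost of some idle time when tasks in a stage have very different durations.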

Note: A more advanced version could be built with Airflow. This is a fully custom, Pythonic implementation intended to showcase object-oriented programming in action.

Notify Failures by Email

When the pipeline is run on a scheduler, we need to be notified when something goes wrong. Hence, every task failure is sent as an email to a configured set of recipients. The recipient email addresses can be updated in:

Data_Engineering_Task/container_folder/newyoker_task/etl/__init__.py
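A failure email can be composed with the standard library's `email.message.EmailMessage` and sent with `smtplib`. The recipient list, sender address, SMTP host and the function names below are placeholders; the real recipients live in the `__init__.py` file above, whose variable names are not shown on this page.

```python
import smtplib
import traceback
from email.message import EmailMessage
from typing import List

# Hypothetical settings; the project keeps its real recipients in etl/__init__.py.
NOTIFY_RECIPIENTS: List[str] = ["data-team@example.com"]
SENDER = "etl-pipeline@example.com"
SMTP_HOST = "localhost"


def build_failure_email(task_name: str, error: BaseException,
                        recipients: List[str]) -> EmailMessage:
    """Compose a plain-text email describing a failed task, including its traceback."""
    msg = EmailMessage()
    msg["Subject"] = f"[ETL] Task {task_name} FAILED"
    msg["From"] = SENDER
    msg["To"] = ", ".join(recipients)
    details = "".join(traceback.format_exception(type(error), error, error.__traceback__))
    msg.set_content(f"Task '{task_name}' finished with state FAILURE.\n\n{details}")
    return msg


def notify_failure(task_name: str, error: BaseException) -> None:
    """Send the failure email through the configured SMTP server."""
    msg = build_failure_email(task_name, error, NOTIFY_RECIPIENTS)
    with smtplib.SMTP(SMTP_HOST) as smtp:
        smtp.send_message(msg)
```

Separating message construction from sending keeps the email content testable without an SMTP server.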

A sample failure notification looks like the one in the accompanying screenshot.
