This repository has been archived by the owner. It is now read-only.
Merge pull request #1 from Futurewei-io/master
radibnia77 committed Sep 18, 2020
2 parents be89aa5 + 0f81929 commit 755123dd8009d8d169b2b748c40529787d2dda9b
Showing 9 changed files with 248 additions and 0 deletions.
@@ -0,0 +1,51 @@
Apache Airflow is a data-pipeline tool that organizes a project into multiple tasks and provides scheduling and per-task log monitoring.

### DAG
A DAG (Directed Acyclic Graph) is written as a Python script that defines the DAG structure, its tasks, and their dependencies.
A typical DAG file has five sections: package imports, default_args, the DAG definition, tasks, and the task dependencies (procedures).
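A minimal DAG file showing the five sections might look like the following sketch (the dag id, dates, and commands are illustrative, not taken from the repository):

```python
# 1. package importing
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# 2. default_args
default_args = {
    'owner': 'example',
    'depends_on_past': False,
    'start_date': dt.datetime(2020, 7, 22),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 3. dag definition
dag = DAG('example_dag', default_args=default_args, schedule_interval=None)

# 4. tasks
first = BashOperator(task_id='first', bash_command='echo first', dag=dag)
second = BashOperator(task_id='second', bash_command='echo second', dag=dag)

# 5. procedures (dependencies) of tasks
first >> second
```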

### TASK
A DAG can include any number of tasks. Any Python file that can run independently can serve as a task, and one Python file can be reused by multiple tasks. A task definition specifies the operator type, the connection id, the task id, and the DAG it is assigned to; optional settings include driver memory, the number of executors, and so on. While a DAG runs, each task writes its own log.

The DAG defines the operator type for each task. The most common operators include PythonOperator, BashOperator, and SparkSubmitOperator. The full list of operator types and their parameters can be found in the Airflow documentation.
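For example, a Spark task with the fields described above might be declared like this (the script path, ids, connection name, and tuning values are assumptions):

```python
import datetime as dt
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG('example_dag', start_date=dt.datetime(2020, 7, 22), schedule_interval=None)

# Operator type, connection id, task id, and the owning DAG are the core fields;
# driver memory and executor count are optional tuning knobs.
clean = SparkSubmitOperator(
    application='/path/to/clean.py',  # hypothetical script path
    conn_id='spark_default',
    task_id='din_clean',
    driver_memory='8g',
    num_executors=10,
    dag=dag,
)
```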

Apache Airflow should be installed on a working server that can run spark-submit and has a connection to the database. The webserver and scheduler can be started with:

airflow webserver -p 8080
airflow scheduler

### DIN-Model Pre-Processing Execution

To run DIN-Model pre-processing on Airflow, follow the steps below.

1. Download the code from GitHub (under Device Cloud Service /blue-marlin-model/din_model/din_model/pipeline). Securely copy the second-to-last 'din_model' folder to the AIRFLOW_HOME directory. The DAG Python files are all under the /blue-marlin-models/din_model/din_model/dags directory; securely copy them to the Airflow DAG directory (airflow/dags). You can check that a DAG file is valid by running it with python (e.g. `python airflow/dags/<dag_file>.py`); if the DAG is valid, the command returns nothing.

2. Launch the Airflow webserver and open the UI dashboard to check that this DAG appears in the list of DAGs without any error shown at the top. Then enable the DAG.

3. Click the “Trigger Dag” icon in the right-most column, named “Links”, then click “Trigger”.

4. Go to “Browse > Task Instances” to watch the status of this DAG-run. Click each task id to view its status and logs.
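The validity check from step 1 can also be done programmatically: Airflow's DagBag parses every DAG file in a folder and collects any import errors (the folder path below is an assumption):

```python
from airflow.models import DagBag

# Parse all DAG files under the given folder and report any import errors.
bag = DagBag(dag_folder='/home/airflow/airflow/dags')  # hypothetical path
for path, err in bag.import_errors.items():
    print(path, err)
```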

max_active_runs can be set within a DAG. It is the maximum number of active DAG-runs; beyond this number, the scheduler does not create new active DAG-runs.
To cap the number of task instances within a DAG, set concurrency (int) on the DAG, which determines how many task instances are allowed to run concurrently.
Tasks can overlap between DAG-runs, causing them to compete for resources and to duplicate or overwrite each other's work.

To prevent overlapping between DAG-runs:
1. Set max_active_runs for the DAG to 1, so that only one DAG-run is active at a time.
2. Set depends_on_past to True, so that a task does not execute unless its previous run completed.
3. Finally, make the DAG use a pool with one slot.
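In DAG terms, these settings translate to keyword arguments like the following (the option names come from the Airflow API; the values are illustrative):

```python
# Settings that prevent overlapping DAG-runs.
default_args = {
    'owner': 'din_model',
    'depends_on_past': True,   # a task waits for its own previous run to finish
}

dag_kwargs = {
    'max_active_runs': 1,  # at most one active DAG-run at a time
    'concurrency': 1,      # at most one task instance running at a time
    # A pool is set per task/operator, e.g. pool='din_pool' with one slot.
}
```

These would be passed as DAG(..., default_args=default_args, **dag_kwargs), with the one-slot pool created in the Airflow UI under Admin > Pools.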

### Error Handling
Each task inside a DAG has its own log record. In addition, an independent task can be set up to handle failed tasks or operations.
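One way to add such a task is a handler that runs only when an upstream task fails, using Airflow's trigger_rule (a sketch; the ids and the callback body are assumptions):

```python
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('example_dag', start_date=dt.datetime(2020, 7, 22), schedule_interval=None)

def handle_failure(**context):
    # Inspect the context or send an alert; the real handling is pipeline-specific.
    print('an upstream task failed')

# 'one_failed' fires as soon as at least one upstream task has failed.
error_handler = PythonOperator(
    task_id='handle_failure',
    python_callable=handle_failure,
    provide_context=True,
    trigger_rule='one_failed',
    dag=dag,
)
```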

### Add-ons
Apache Airflow can run a DAG's tasks from another server by installing the Airflow Celery executor. Please refer to the Airflow Celery documentation for more details.

Empty file.
@@ -0,0 +1,57 @@
from airflow import DAG
import datetime as dt
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta

default_args = {
    'owner': 'din_model',
    'depends_on_past': False,
    'start_date': dt.datetime(2020, 7, 22),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The DAG id and schedule below are assumptions; the original arguments were
# collapsed in this diff view.
dag = DAG(
    'din_model_preprocessing',
    default_args=default_args,
    schedule_interval=None,
)

def sparkOperator(application, task_id, **kwargs):
    # Shared wrapper: every task submits a Spark job with the same driver config.
    return SparkSubmitOperator(
        application=application,
        task_id=task_id,
        conf={'spark.driver.maxResultSize': '4g'},
        dag=dag,
        **kwargs,
    )

clean = sparkOperator('', 'din_clean')
logs = sparkOperator('', 'din_logs')
region_adding = sparkOperator('', 'din_add_region')
trainready = sparkOperator('', 'din_trainready')
# forecasting = sparkOperator(
# '',
# 'din_forecasting',
# py_files='/home/airflow/airflow/din_model/trainer/'
# )

# The original arguments were collapsed in this diff view; the task id is
# assumed from the variable name.
tfrecords = sparkOperator('', 'din_tfrecords')
clean >> logs >> region_adding >> trainready >> tfrecords
@@ -0,0 +1,137 @@
from os.path import join as path_join
from airflow import DAG
import datetime as dt
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import timedelta

default_args = {
    'owner': 'din_model_trainer',
    'depends_on_past': False,
    'start_date': dt.datetime(2020, 7, 22),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

# The DAG id and schedule below are assumptions; the original arguments were
# collapsed in this diff view.
dag = DAG(
    'din_model_training',
    default_args=default_args,
    schedule_interval=None,
)

class CommandsBuilder(object):
    class CommandBuilder(object):
        def __init__(self, command):
            super(CommandsBuilder.CommandBuilder, self).__init__()
            self._command = command
            self._params = []

        def add_param(self, param):
            self._params.append(param)
            return self

        def build(self):
            return '{} {}'.format(self._command, ' '.join(self._params))

    def __init__(self, commands):
        super(CommandsBuilder, self).__init__()
        self._commands = commands

    def build(self):
        # prefix = ''
        commands = ' ; '.join(self._commands)
        # return '{} bash --login -c \'{}\''.format(prefix, commands)
        return commands

def sshOperator(task_id, command):
    # The ssh_conn_id below is an assumption; the original arguments were
    # collapsed in this diff view.
    return SSHOperator(
        ssh_conn_id='ssh_default',
        task_id=task_id,
        command=command,
        dag=dag,
    )
# 'python /home/faezeh/Desktop/bluemarlin-models/din_model/din_model/trainer/{}'.format(file)

HOME = '/home/faezeh'
DIN_MODEL_PATH = path_join(HOME, 'Desktop/bluemarlin-models/din_model/din_model')
TRAINER_PATH = path_join(DIN_MODEL_PATH, 'trainer')
CONDA_HOME = path_join(HOME, 'anaconda3')
CONDA_BIN = path_join(CONDA_HOME, 'bin')
CONDA_PATH = path_join(CONDA_BIN, 'conda')

init_commands = [
    '. ' + path_join(CONDA_HOME, 'etc/profile.d/'),
    'conda activate py3.6',
    'python --version',
]

# The command lists below were collapsed in this diff view; their contents
# are elided.
build_commands = [
]

train_commands = [
]

save_model_commands = [
]

rest_client_commands = [
]

tensorflow_commands = [
]

build = sshOperator('build_dataset', CommandsBuilder(init_commands + build_commands).build())
train = sshOperator('train', CommandsBuilder(init_commands + train_commands).build())
save_model = sshOperator('save_model',
CommandsBuilder(init_commands + save_model_commands).build())

# tensorflow = sshOperator('tensorflow', CommandsBuilder(init_commands + tensorflow_commands).build())

# rest_client = sshOperator('rest_client', CommandsBuilder(init_commands + rest_client_commands).build())

# build
build >> train >> save_model # >> tensorflow >> rest_client
@@ -3,3 +3,6 @@
For all announcements, please follow our Twitter:

For Blue Marlin events, please go to

For Blue Marlin meeting notes, please go to
