[AIRFLOW-3175] Fix docstring format in airflow/jobs.py (#4025)
These docstrings could not be parsed properly by Sphinx.
XD-DENG authored and ashb committed Oct 9, 2018
1 parent 4e94997 · commit 3d50d94
Showing 1 changed file: airflow/jobs.py (23 additions, 12 deletions)
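The fix applies two reStructuredText rules throughout the file: a `:param:`/`:return:` field list must be separated from the preceding description by a blank line, and a field description that wraps onto a following line must be indented relative to its `:param ...:` marker, otherwise Sphinx folds those lines into ordinary paragraph text. A minimal sketch of the corrected shape, echoing the `pickle_dags` parameter from the hunks below (the method body and summary line are abridged for illustration):

def process_file(file_path, pickle_dags=False):
    """
    Process the given file.

    :param file_path: the file to process
    :type file_path: unicode
    :param pickle_dags: whether to pickle the DAGs found in the file and
        save them to the DB
    :type pickle_dags: bool
    """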
@@ -349,10 +349,10 @@ def _launch_process(result_queue,
         :param file_path: the file to process
         :type file_path: unicode
         :param pickle_dags: whether to pickle the DAGs found in the file and
-        save them to the DB
+            save them to the DB
         :type pickle_dags: bool
         :param dag_id_white_list: if specified, only examine DAG ID's that are
-        in this list
+            in this list
         :type dag_id_white_list: list[unicode]
         :param thread_name: the name to use for the process that is launched
         :type thread_name: unicode
@@ -424,6 +424,7 @@ def start(self):
     def terminate(self, sigkill=False):
         """
         Terminate (and then kill) the process launched to process the file.
+
         :param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
         :type sigkill: bool
         """
@@ -452,6 +453,7 @@ def pid(self):
     def exit_code(self):
         """
         After the process is finished, this can be called to get the return code
+
         :return: the exit code of the process
         :rtype: int
         """
@@ -463,6 +465,7 @@ def exit_code(self):
     def done(self):
         """
         Check if the process launched to process this file is done.
+
         :return: whether the process is finished running
         :rtype: bool
         """
@@ -544,16 +547,18 @@ def __init__(
+
         :param dag_ids: if specified, only schedule tasks with these DAG IDs
         :type dag_ids: list[unicode]
         :param subdir: directory containing Python files with Airflow DAG
-        definitions, or a specific path to a file
+            definitions, or a specific path to a file
         :type subdir: unicode
         :param num_runs: The number of times to try to schedule each DAG file.
-        -1 for unlimited within the run_duration.
+            -1 for unlimited within the run_duration.
         :type num_runs: int
         :param processor_poll_interval: The number of seconds to wait between
-        polls of running processors
+            polls of running processors
         :type processor_poll_interval: int
         :param run_duration: how long to run (in seconds) before exiting
         :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
-        file, whether to serialize the DAG object to the DB
+            file, whether to serialize the DAG object to the DB
         :type do_pickle: bool
         """
         # for BaseJob compatibility
@@ -782,7 +787,7 @@ def update_import_errors(session, dagbag):
     def create_dag_run(self, dag, session=None):
         """
         This method checks whether a new DagRun needs to be created
-        for a DAG based on scheduling interval
+        for a DAG based on scheduling interval.
         Returns DagRun if one is scheduled. Otherwise returns None.
         """
         if dag.schedule_interval and conf.getboolean('scheduler', 'USE_JOB_SCHEDULE'):
@@ -990,7 +995,7 @@ def _change_state_for_tis_without_dagrun(self,
         :param new_state: set TaskInstances to this state
         :type new_state: State
         :param simple_dag_bag: TaskInstances associated with DAGs in the
-        simple_dag_bag and with states in the old_state will be examined
+            simple_dag_bag and with states in the old_state will be examined
         :type simple_dag_bag: SimpleDagBag
         """
         tis_changed = 0
@@ -1061,7 +1066,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         dag concurrency, executor state, and priority.
         :param simple_dag_bag: TaskInstances associated with DAGs in the
-        simple_dag_bag will be fetched from the DB and executed
+            simple_dag_bag will be fetched from the DB and executed
         :type simple_dag_bag: SimpleDagBag
         :param executor: the executor that runs task instances
         :type executor: BaseExecutor
@@ -1374,7 +1379,7 @@ def _execute_task_instances(self,
         3. Enqueue the TIs in the executor.
         :param simple_dag_bag: TaskInstances associated with DAGs in the
-        simple_dag_bag will be fetched from the DB and executed
+            simple_dag_bag will be fetched from the DB and executed
         :type simple_dag_bag: SimpleDagBag
         :param states: Execute TaskInstances in these states
         :type states: Tuple[State]
@@ -1483,7 +1488,7 @@ def _log_file_processing_stats(self,
         Print out stats about how files are getting processed.
         :param known_file_paths: a list of file paths that may contain Airflow
-        DAG definitions
+            DAG definitions
         :type known_file_paths: list[unicode]
         :param processor_manager: manager for the file processors
         :type stats: DagFileProcessorManager
@@ -1789,7 +1794,7 @@ def process_file(self, file_path, pickle_dags=False, session=None):
         :param file_path: the path to the Python file that should be executed
         :type file_path: unicode
         :param pickle_dags: whether serialize the DAGs found in the file and
-        save them to the db
+            save them to the db
         :type pickle_dags: bool
         :return: a list of SimpleDags made from the Dags found in the file
         :rtype: list[SimpleDag]
@@ -2028,6 +2033,7 @@ def _update_counters(self, ti_status):
         """
         Updates the counters per state of the tasks that were running. Can re-add
         to tasks to run in case required.
+
         :param ti_status: the internal status of the backfill job tasks
         :type ti_status: BackfillJob._DagRunTaskStatus
         """
@@ -2072,6 +2078,7 @@ def _manage_executor_state(self, running):
         """
         Checks if the executor agrees with the state of task instances
         that are running
+
         :param running: dict of key, task to verify
         """
         executor = self.executor
@@ -2103,6 +2110,7 @@ def _get_dag_run(self, run_date, session=None):
         Returns a dag run for the given run date, which will be matched to an existing
         dag run if available or create a new dag run otherwise. If the max_active_runs
         limit is reached, this function will return None.
+
         :param run_date: the execution date for the dag run
         :type run_date: datetime
         :param session: the database session object
"""
Returns a map of task instance key to task instance object for the tasks to
run in the given dag run.
:param dag_run: the dag run to get the tasks from
:type dag_run: models.DagRun
:param session: the database session object
Expand Down Expand Up @@ -2227,6 +2236,7 @@ def _process_backfill_task_instances(self,
Process a set of task instances from a set of dag runs. Special handling is done
to account for different task instance states that could be present when running
them in a backfill process.
:param ti_status: the internal status of the job
:type ti_status: BackfillJob._DagRunTaskStatus
:param executor: the executor to run the task instances
Expand Down Expand Up @@ -2464,6 +2474,7 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
Computes the dag runs and their respective task instances for
the given run dates and executes the task instances.
Returns a list of execution dates of the dag runs that were executed.
:param run_dates: Execution dates for dag runs
:type run_dates: list
:param ti_status: internal BackfillJob status structure to tis track progress
Expand Down
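To see why the pre-fix layout failed, the docstrings can be run through docutils, the reStructuredText parser Sphinx builds on. The following is a sketch for illustration only, not part of the commit; it assumes docutils is installed and reuses the terminate() docstring from the diff above. With the blank line in place the parameters parse as a field_list node; without it they are folded into the summary paragraph.

from docutils.core import publish_doctree

fixed = (
    "Terminate (and then kill) the process launched to process the file.\n"
    "\n"
    ":param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.\n"
    ":type sigkill: bool\n"
)

broken = (
    "Terminate (and then kill) the process launched to process the file.\n"
    ":param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.\n"
    ":type sigkill: bool\n"
)

# The fixed docstring yields a <field_list> element; the broken one does not.
for label, text in [("fixed", fixed), ("broken", broken)]:
    tree = publish_doctree(text)
    has_fields = "field_list" in tree.pformat()
    print(label, "->", "field_list parsed" if has_fields else "folded into paragraph")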
