In [1]:
import logging
import os
import glob
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


[2017-04-26 14:25:11,301] {__init__.py:57} INFO - Using executor SequentialExecutor


In [2]:

class LocalFileSensor(BaseSensorOperator):
    """
    Waits for a file to be present in the local filesystem.

    :param filepath: The path to the file
    :type filepath: str
    :param filesystem_conn_id: a reference to the localfs connection; it is
        stored on the instance but not read by ``poke``
    :type filesystem_conn_id: str
    """
    # Let the path be templated, matching the wildcard sensor in this file.
    template_fields = ('filepath',)

    @apply_defaults
    def __init__(
            self,
            filepath,
            filesystem_conn_id='localfs_default',
            *args, **kwargs):
        super(LocalFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filesystem_conn_id = filesystem_conn_id

    def poke(self, context):
        """
        Report whether ``filepath`` currently points at a regular file.

        :param context: unused by this check
        :return: True when ``os.path.isfile`` accepts the path, else False
        :rtype: bool
        """
        return os.path.isfile(self.filepath)

In [4]:
# Build a sensor for a relative path; the connection id is only stored on the
# instance and plays no part in the file check.
fs = LocalFileSensor(
    task_id='fs',
    filepath='Untitled.ipynb',
    filesystem_conn_id='hive_default',
)

In [6]:
# Run a single check by hand; poke does not use the context argument, so None is enough.
fs.poke(context=None)

True

In [None]:
class WildcardLocalFileSensor(BaseSensorOperator):
    """
    Waits for a file to be present in the local filesystem.

    :param filepath: The path to the file, or a Unix wildcard pattern when
        ``wildcard_match`` is True
    :type filepath: str
    :param wildcard_match: whether the filename should be interpreted as a
        Unix wildcard pattern
    :type wildcard_match: bool
    :param filesystem_conn_id: a reference to the localfs connection; it is
        stored on the instance but not read by ``poke``
    :type filesystem_conn_id: str
    """
    template_fields = ('filepath',)

    @apply_defaults
    def __init__(
            self, filepath,
            wildcard_match=False,
            filesystem_conn_id='localfs_default',
            *args, **kwargs):
        # super() must name this class; naming an unrelated class makes
        # construction fail with a TypeError.
        super(WildcardLocalFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.wildcard_match = wildcard_match
        self.filesystem_conn_id = filesystem_conn_id

    def poke(self, context):
        """
        Check once whether the awaited file is present.

        In wildcard mode ``filepath`` is handed to ``glob.glob`` unchanged and
        any match counts as success; a warning is logged when more than one
        path matches. Otherwise the path is resolved with ``os.path.realpath``
        and must be a regular file.

        :param context: unused by this check
        :return: True when a match is found, else False
        :rtype: bool
        """
        if self.wildcard_match:
            # Keep the pattern verbatim: realpath would not expand wildcards.
            pattern = self.filepath
            logging.info('Poking for file matching : %s', pattern)
            results = glob.glob(pattern)
            if len(results) > 1:
                logging.warning('Several files match the sensor: %s',
                                '\n'.join(results))
            return bool(results)

        path = os.path.realpath(self.filepath)
        logging.info('Poking for file matching : %s', path)
        return os.path.isfile(path)