In [0]:
import luigi
import requests
import json
import pandas as pd
import datetime

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc" style="margin-top: 1em;"><ul class="toc-item"><li><span><a href="#Пример-пайплайна-для--данных-osm" data-toc-modified-id="Пример-пайплайна-для--данных-osm-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Пример пайплайна для  данных osm</a></span><ul class="toc-item"><li><span><a href="#Плюсы-Luigi" data-toc-modified-id="Плюсы-Luigi-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Плюсы Luigi</a></span></li></ul></li><li><span><a href="#Ограничения-Luigi" data-toc-modified-id="Ограничения-Luigi-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Ограничения Luigi</a></span></li><li><span><a href="#Другие-библиотеки" data-toc-modified-id="Другие-библиотеки-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Другие библиотеки</a></span></li></ul></div>

### Пример пайплайна для  данных osm

Скачать данные по выбранной категории объектов

Обработка данных

Сохранение для дальнейшего использлвания

Основными параметрами данного пайплайна будет территория, заданная ббоксом города, категория места, а также атрибут OSM.
Скачаем места по Москве:

In [0]:
class TaskOSMLoading(luigi.Task):
    
    attr = luigi.Parameter()
    category = luigi.Parameter()
    bbox = luigi.Parameter()
    
    def output(self):
        return luigi.LocalTarget("osm_data/data_osm_{}.json".format(self.category))
        
    def run(self):
        
        overpass_url = "https://overpass.kumi.systems/api/interpreter"
        overpass_query = """
            [out:json];
            ({}[{}]({});
            );
             out body;
                         """.format(self.attr, self.category, self.bbox)
        
        response = requests.get(overpass_url, 
                                params={'data': overpass_query})
        data = response.json()
        
        with self.output().open('w') as f:
            json.dump(data, f)

В классе можно определять вспомогательные методы, которые затем можно переиспользовать в `run()`:

In [0]:
class TaskOSMProcessing(luigi.Task):
    
    attr = luigi.Parameter()
    categories = luigi.ListParameter()
    bbox = luigi.Parameter()
    #date = luigi.DateParameter(default = datetime.datetime.now()) 
    
    def requires(self):
        return [TaskOSMLoading(self.attr, category, self.bbox) for category in self.categories]
    
    def output(self):
        return luigi.LocalTarget('osm_data/results_s.csv')
    
    def get_tag(self, x, key_name):
        if str(key_name) in x:
            tmp = x[key_name]
            return tmp
        else:
            return 'unknown'
    
    def run(self):
        
        all_osm = pd.DataFrame()
        for _input in self.input():
            with _input.open('r') as raw_file:
                data = pd.DataFrame((json.load(raw_file))['elements'])
                all_osm = all_osm.append(data)
        
        all_osm['amenity'] = all_osm['tags'].apply(lambda x: self.get_tag(x, 'amenity'))
        
        with self.output().open('w') as f:
            all_osm[['id','lat','lon','amenity']].to_csv(f, encoding='utf8')

In [0]:
if __name__ == '__main__':
    luigi.build([TaskOSMProcessing('node',['amenity','shop','office','bus_stop'],
                                   "55.1422,36.8031,56.0212,37.9674")])

DEBUG: Checking if TaskOSMProcessing(attr=node, categories=["amenity", "shop", "office", "bus_stop"], bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=amenity, bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=shop, bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=office, bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=bus_stop, bbox=55.1422,36.8031,56.0212,37.9674) is complete
INFO: Informed scheduler that task   TaskOSMProcessing_node_55_1422_36_8031____amenity____sho_badb57d548   has status   PENDING
INFO: Informed scheduler that task   TaskOSMLoading_node_55_1422_36_8031__bus_stop_cc6b2eb261   has status   PENDING
INFO: Informed scheduler that task   TaskOSMLoading_node_55_1422_36_8031__office_a8ea274f7d   has status   PENDING
INFO: Informed scheduler that task   Ta

<img src=luigi_dependencies.png>

#### Плюсы Luigi 

Luigi можно запускать для тестирования пайплайна в режиме `--local-sheduler`, также можно запускать через центральный планировщик с аргументом `--central-sheduler`  

Довольно простой интерфейс - по факту для написания пайплайна нужно оперировать только тремя сущностями;

Неплохая документация;

Визуализация графа задач;

Перезапуск задачи не приводит к потере данных предыдущих задач;

Удобная обработка аргументов, передаваемые через терминал;

### Ограничения Luigi

Не поддерживает real-time обработку

Отсутствует запуск задач по расписанию

### Другие библиотеки

airflow - библиотека от airbnb, сейчас выложена в открытом доступе

celery - подходит для real-time обработки