Написание на Luigi пайплайна для скачивания данных из OpenStreetMap
---

Представим, что мы хотим автоматизировать процесс скачивания данных. Разобъём на 2 подзадачи - первая будет скачвать данные, а вторая обрабатывать и формировать датафрейм.

In [None]:
import luigi
import requests
import json
import pandas as pd
import datetime

### Пример пайплайна для  данных osm

Скачать данные по выбранной категории объектов

Обработка данных

Сохранение для дальнейшего использлвания

Основными параметрами данного пайплайна будет территория, заданная боксом города, категория места, а также атрибут OSM.
Скачаем места по Москве:

In [None]:
# класс отвечающий за выгрузуку данных
class TaskOSMLoading(luigi.Task):
    # определяем параметры: атрибуты, категории и bounding box
    attr = luigi.Parameter()
    category = luigi.Parameter()
    bbox = luigi.Parameter()
    
    # выбираем куда мы будем сохранять данные (json)
    def output(self):
        return luigi.LocalTarget("osm_data/data_osm_{}.json".format(self.category))
    
    # в методе run формируем get-запрос
    def run(self):
        
        overpass_url = "https://overpass.kumi.systems/api/interpreter"
        overpass_query = """
            [out:json];
            ({}[{}]({});
            );
             out body;
                         """.format(self.attr, self.category, self.bbox)
        
        response = requests.get(overpass_url, 
                                params={'data': overpass_query})
        data = response.json()
        
        # сохраняем результат запроса в файл
        with self.output().open('w') as f:
            json.dump(data, f)

Создадим следующую задачу, которая будет принимать на вход данные от этого класса и выдавать некоторый датафрейм.

Теперь `categories = luigi.ListParameter()` определяется как лист. Это нужно чтобы мы могли скачивать не одну какую то категорию, а несколько мест, например магазины, офисы, остановки...

В классе можно определять вспомогательные методы, которые затем можно переиспользовать в `run()`:

In [None]:
class TaskOSMProcessing(luigi.Task):
    # определяем параметры: атрибуты, категории и bounding box
    attr = luigi.Parameter()
    categories = luigi.ListParameter()
    bbox = luigi.Parameter()
    #date = luigi.DateParameter(default = datetime.datetime.now()) 
    
    # метод возвращает не одну зависимую задачу, а список задач - создаём цикл по категориям
    def requires(self):
        # сначала скачиваются все места по всем категориям, а затем будет произведена обработка данных
        return [TaskOSMLoading(self.attr, category, self.bbox) for category in self.categories]
    
    # определяем куда мы сохраняем итоговый результат
    def output(self):
        return luigi.LocalTarget('osm_data/results_s.csv')
    
    # чтобы не перегружать метод run выносим часть кода сюда
    def get_tag(self, x, key_name):
        if str(key_name) in x:
            tmp = x[key_name]
            return tmp
        else:
            return 'unknown'
    
    # в методе run вся основная логика обработки данных
    def run(self):
        
        all_osm = pd.DataFrame()
        # проходимся по всем скачанным файлам
        for _input in self.input():
            with _input.open('r') as raw_file:
                # преобразовываем их в датафрейм
                data = pd.DataFrame((json.load(raw_file))['elements'])
                all_osm = all_osm.append(data)
        
        # вытаскиваем нужную категорию
        all_osm['amenity'] = all_osm['tags'].apply(lambda x: self.get_tag(x, 'amenity'))
        
        # сохраняем результат в csv
        with self.output().open('w') as f:
            all_osm[['id','lat','lon','amenity']].to_csv(f, encoding='utf8')

Запускаем нашу задачу. Для этого определяем какие параметры мы передаём:
- node - потому что вытаскиваем места
- далее лист категорий
- bounding box Москвы

In [None]:
if __name__ == '__main__':
    luigi.build([TaskOSMProcessing('node',['amenity','shop','office','bus_stop'],
                                   "55.1422,36.8031,56.0212,37.9674")])

DEBUG: Checking if TaskOSMProcessing(attr=node, categories=["amenity", "shop", "office", "bus_stop"], bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=amenity, bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=shop, bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=office, bbox=55.1422,36.8031,56.0212,37.9674) is complete
DEBUG: Checking if TaskOSMLoading(attr=node, category=bus_stop, bbox=55.1422,36.8031,56.0212,37.9674) is complete
INFO: Informed scheduler that task   TaskOSMProcessing_node_55_1422_36_8031____amenity____sho_badb57d548   has status   PENDING
INFO: Informed scheduler that task   TaskOSMLoading_node_55_1422_36_8031__bus_stop_cc6b2eb261   has status   PENDING
INFO: Informed scheduler that task   TaskOSMLoading_node_55_1422_36_8031__office_a8ea274f7d   has status   PENDING
INFO: Informed scheduler that task   Ta

<img src=luigi_dependencies.png>

#### Плюсы Luigi 

- Luigi можно запускать для тестирования пайплайна в режиме `--local-sheduler`, также можно запускать через центральный планировщик с аргументом `--central-sheduler`  
- Довольно простой интерфейс - по факту для написания пайплайна нужно оперировать только тремя сущностями;
- Неплохая документация;
- Визуализация графа задач;
- Перезапуск задачи не приводит к потере данных предыдущих задач;
- Удобная обработка аргументов, передаваемые через терминал;

### Ограничения Luigi

- Не поддерживает real-time обработку
- Отсутствует запуск задач по расписанию

### Другие библиотеки

- `airflow` - библиотека от airbnb, сейчас выложена в открытом доступе
- `celery` - подходит для real-time обработки