In [1]:
import bonobo
import requests
from bonobo.config import use_context_processor
from bonobo.config import use

In [23]:
# используем http из bonobo
@use('http')
def extract(http):
    """
        Получаем данные с сервиса
    """
    yield from http.get('http://89.253.237.183:9876/data').json().get('data')


def get_services():
    """
       Указываем тип сервиса
    """
    http = requests.Session()
    return {
        'http': http
    }    
    
    
def with_opened_file(self, context):
    """
        Файл для записи результатов
    """
    with open('output.txt', 'w+',  encoding='utf-8') as f:
        yield f

# на основе контекста, который работыет с файлом
# мы создаем контролиремый процесс записи
@use_context_processor(with_opened_file)
def write_repr_to_file(f, *row):
    """
       Записываем все полученные строки в файл
    """
    f.write(repr(row) + "\n")


def get_graph(**options):
    """
       создаем граф выполнения
    """
    
    graph = bonobo.Graph()
    
    graph.add_chain(
        extract,
        write_repr_to_file,
    )
    return graph




In [24]:
# выполним и посмотрим на результат

bonobo.run(get_graph(), services=get_services())

BonoboWidget()

<bonobo.execution.contexts.graph.GraphExecutionContext at 0x29809a168b0>

### Пару слов о Generators | Iterators

![](iter.png)

In [53]:
some_steps = iter(range(0, 10))
for i in some_steps:
    print(i)

0
1
2
3
4
5
6
7
8
9


In [54]:
next(some_steps)

StopIteration: 

для доступа к содержимому агрегированных объектов без раскрытия их внутреннего представления;

для поддержки нескольких активных обходов одного и того же агрегированного объекта (желательно, но не обязательно);

для предоставления единообразного интерфейса с целью обхода различных агрегированных структур.

In [39]:
# запомним! Основная необходимость самому создавать итератор
# => это использовать в определенный момент одно значение, а не хранить весь список
# например, вы делаете сложное вычисление или создаете тяжелый объекты

# Пример - класс итератор
class SuperCounter:
    
    # создаение максимального объекта (устанавливает пользователь)
    def __init__(self,max=1):
        self.max=max
    
    # делаем указатель итератора
    def __iter__(self):
        self.n=1
        return self
    
    # указываем действия при вызове элемента
    def __next__(self):
        if self.n<=self.max:
            # если элементы ещё существуют в списке
            result=(2**self.n)**3
            self.n+=1
            return result
        else:
            # если закончились элементы
            raise StopIteration
            
            
a = iter(SuperCounter(2))
next(a)

8

In [None]:
# итерируемый объект — это любой объект, от которого встроенная функция iter() может получить итератор

# итератор в python — это любой объект, реализующий метод __next__ без аргументов, который должен вернуть следующий элемент или ошибку StopIteration


In [49]:
# generator
# объект, который создает итерируеую последовательность
# ключевое слово yield (а не return)

def myGenerator():
    i = 0
    
    while (i <= 5):
        # делаем возврат yeild, а не return
        yield i
        
        #и после yield мы можем ещё делать действия
        i+=1
        
# получаем интересный объект
# Объект-генератор реализует интерфейс итератора, соответственно с этим объектом можно работать, как с любым другим итерируемым объектом.
myGenerator()

<generator object myGenerator at 0x0000029809E8BF90>

yeld:

при вызове функции myGenerator создается объект-генератор

в цикле вызывает функция next() с этим итератором пока не будет получено исключение StopIteration

при каждом вызове next выполнение в функции начинается с того места где было завершено в последний раз и продолжается до следующего yield

равнозначно:

```
def myGenerator_1(i):
    if i <= 1:
        yield i
...

def myGenerator_5(i):
    if i <= 5:
        yield i

def myGenerator_end(i):
    raise StopIteration

```

In [50]:
# как его получить

# 1
print(list(myGenerator()))

# 2
for i in myGenerator():
    print(i)

[0, 1, 2, 3, 4, 5]
0
1
2
3
4
5


In [55]:
# действие от события, без остановки основного выполнения
def myGenerator(x):
    while(x>0):
        # исполняем до ожидаемого события
        if x%2==0:
            # действие произошло без блокировани (остаовки) выполнения функции
            yield 'Event'
            
        else:
            pass
        
        x-=1
        
for i in myGenerator(9):
    print(i)

Event
Event
Event
Event


In [51]:
myGenerator().__sizeof__()

96

In [52]:
iter([0, 1, 2, 3, 4, 5]).__sizeof__()

32

### Bonobo ETL

In [149]:
# используем http из bonobo
#Extract
@use('http')
def extract(http):
    """
        Получаем данные с сервиса
    """
    
    # сделаем выбор объекта, который будем возвращать
    yield http.get('http://89.253.237.183:9876/data').json().get('data')
    
    #return http.get('http://89.253.237.183:9876/data').json().get('data')
    
    #dct = http.get('http://89.253.237.183:9876/data').json().get('data')
    #for i in dct.keys():
    #    yield (i, dct[i])
        
    

def get_services():
    """
       Указываем тип сервиса
    """
    http = requests.Session()
    return {
        'http': http
    } 
    


###########
#transform
def transform(*args):
    for i in args[0].keys():
        if 'Zip' in i:
            yield (i, f"100{args[0][i]}")
            
            
#############
#Load
def with_opened_file(self, context):
    """
        Файл для записи результатов
    """
    with open('output.txt', 'w+',  encoding='utf-8') as f:
        yield f

# на основе контекста, который работыет с файлом
# мы создаем контролиремый процесс записи
@use_context_processor(with_opened_file)
def write_repr_to_file(f, *row):
    """
       Записываем все полученные строки в файл
    """
    f.write(",".join(row) + "\n")    
    
    
def get_graph(**options):
    """
       создаем граф выполнения
    """
    
    graph = bonobo.Graph()
    
    graph.add_chain(
        extract,
        transform,
        bonobo.PrettyPrinter(),
        bonobo.Limit(3),
        write_repr_to_file,
        
    )
    return graph




In [150]:
# выполним и посмотрим на результат

bonobo.run(get_graph(), services=get_services())

BonoboWidget()

0,1
'Zip_94041','1000'
'Zip_94063','1000'
'Zip_94107','1000'
'Zip_94301','1000'
'Zip_95113','1001'


<bonobo.execution.contexts.graph.GraphExecutionContext at 0x2980b5be160>

### Bonobo + Kedro

In [157]:
from kedro.context import load_context
from kedro.extras.datasets.api import APIDataSet

def kedro_de_pipeline():
    """
        подключаем нужный pipeline
    """
    context = load_context("../")
    context.run(pipeline_name='de')

def get_graph(**options):
    """
       создаем граф выполнения
    """
    
    graph = bonobo.Graph()
    
    graph.add_chain(
        kedro_de_pipeline        
    )
    return graph


  and should_run_async(code)


In [156]:
bonobo.run(get_graph())

BonoboWidget()

2020-12-17 19:01:36,934 - root - INFO - ** Kedro project p7
2020-12-17 19:01:40,766 - kedro.io.data_catalog - INFO - Loading data from `station_data` (CSVDataSet)...




2020-12-17 19:01:41,007 - kedro.io.data_catalog - INFO - Loading data from `trip_data` (CSVDataSet)...
2020-12-17 19:01:44,719 - kedro.io.data_catalog - INFO - Loading data from `weather_data` (CSVDataSet)...
2020-12-17 19:01:44,758 - kedro.io.data_catalog - INFO - Loading data from `params:station_nes_names` (MemoryDataSet)...
2020-12-17 19:01:44,760 - kedro.io.data_catalog - INFO - Loading data from `params:zip_code` (MemoryDataSet)...
2020-12-17 19:01:44,762 - kedro.io.data_catalog - INFO - Loading data from `params:trip_nes_names` (MemoryDataSet)...
2020-12-17 19:01:44,764 - kedro.io.data_catalog - INFO - Loading data from `params:map_id` (MemoryDataSet)...
2020-12-17 19:01:44,767 - kedro.pipeline.node - INFO - Running node: preparator([params:map_id,params:station_nes_names,params:trip_nes_names,params:zip_code,station_data,trip_data,weather_data]) -> [clean_station_data,clean_trip_data,clean_weather_data]
2020-12-17 19:01:45,186 - kedro.io.data_catalog - INFO - Saving data to `cl

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return super().replace(


2020-12-17 19:01:45,440 - kedro.io.data_catalog - INFO - Saving data to `idx` (MemoryDataSet)...
2020-12-17 19:01:45,443 - kedro.runner.sequential_runner - INFO - Completed 2 out of 6 tasks
2020-12-17 19:01:45,445 - kedro.io.data_catalog - INFO - Loading data from `clean_weather_data` (MemoryDataSet)...
2020-12-17 19:01:45,447 - kedro.pipeline.node - INFO - Running node: cleanning([clean_weather_data]) -> [clean_weather_data_new]
2020-12-17 19:01:45,737 - numexpr.utils - INFO - NumExpr defaulting to 4 threads.
2020-12-17 19:01:45,849 - kedro.io.data_catalog - INFO - Saving data to `clean_weather_data_new` (MemoryDataSet)...
2020-12-17 19:01:45,851 - kedro.runner.sequential_runner - INFO - Completed 3 out of 6 tasks
2020-12-17 19:01:45,852 - kedro.io.data_catalog - INFO - Loading data from `clean_trip_data` (MemoryDataSet)...
2020-12-17 19:01:45,866 - kedro.io.data_catalog - INFO - Loading data from `idx` (MemoryDataSet)...
2020-12-17 19:01:45,870 - kedro.io.data_catalog - INFO - Loadin

<bonobo.execution.contexts.graph.GraphExecutionContext at 0x2981abb0970>