### Использование NiFi API и библиотеки Python NiPyAPI для управления потоком данных 

#### 1. Установка необходимых библиотек

#### 2. Импорт библиотеки взаимодействия с NiFi через API и присваивание хоста и порта

In [1]:
"https://nipyapi.readthedocs.io/en/latest/index.html"
"https://github.com/Chaffelson/nipyapi/tree/main/nipyapi"

import nipyapi

In [None]:
nipyapi.config.nifi_config.host = 'http://10.129.0.54:8443/nifi-api' # put your nifi host here

#### 3. Установка соединения и запрос на получение списка процессоров

In [3]:
#Команда отмены проверки сертификатов 
nipyapi.config.nifi_config.ssl_ca_cert = False

#Команда получения корневой процесс группы
root_pg = nipyapi.canvas.get_root_pg_id()

#Команда получания списка процессоров из корневой процесс группы
processors = nipyapi.canvas.list_all_processors(root_pg)

#Вывод списка процессоров - имя и ID
for processor in processors:
    print(f"Processor Name: {processor.component.name}, Processor ID: {processor.id}")

print(f"Корневой процесс {root_pg}")

Processor Name: MergeRecord, Processor ID: e3a0fc2c-d8fe-371e-62e1-551a5fe20b84
Processor Name: AttributesToJSON, Processor ID: 13081c36-5f86-3cc4-f6f4-5427b60f326c
Processor Name: Re-Merge, Processor ID: 002fedd7-1834-3efb-7cbc-3eda1bf43e55
Processor Name: SplitRecord, Processor ID: 32dfd48c-c427-38ba-991c-f5e7e8cc65b8
Processor Name: AttributesToCSV, Processor ID: 9ec3d95a-4358-3c7e-77bd-ca32dbb81717
Processor Name: JoinEnrichment, Processor ID: 3c12312e-3fa5-32b1-735a-4daf2323158f
Processor Name: GetFile, Processor ID: 75af742d-cab8-380a-d618-6a3cc3b3d679
Processor Name: FlattenJson, Processor ID: 829d0c85-69a0-3b62-cbb1-3661e663b227
Processor Name: ExecuteSQL, Processor ID: eb4e6916-1775-35a9-7097-7b9a712737db
Processor Name: MergeContent, Processor ID: f94b8768-84fe-31d7-aea6-08cf538afa04
Processor Name: GenerateFlowFile, Processor ID: 595da082-2077-38a1-dd83-602883880cb3
Processor Name: InvokeHTTP, Processor ID: 7ffa9e82-2305-3a95-0645-f2a59bab1023
Processor Name: PutFile, Proces

#### 4. Запуск и остановка процессора по его имени

In [5]:
# Запуск процессора по имени с кучей дополнений
processor_name = 'StartFromAPI'
processors = nipyapi.canvas.list_all_processors()
processor_entity = None
for proc in processors:
    if proc.component.name == processor_name:
        processor_entity = proc
        break

if processor_entity:
    # Запуск процессора
    switcher = False # True - запуск процессора и False - остановка
    nipyapi.canvas.schedule_processor(processor_entity, switcher)  
    print(f"Процессор {processor_name} сейчас запущен - {switcher}")
else:
    print(f"Процессор {processor_name} не найден.")

Процессор StartFromAPI сейчас запущен - False


In [6]:
# Однократный запуск процессора (RunOnce)
import time
import nipyapi

# Укажите имя процессора
processor_name = 'StartFromAPI'

# Получение списка всех процессоров
processors = nipyapi.canvas.list_all_processors()
processor_entity = None

# Поиск процессора по имени
for proc in processors:
    if proc.component.name == processor_name:
        processor_entity = proc
        break

if processor_entity:
    # Запуск процессора
    nipyapi.canvas.schedule_processor(processor_entity, True)
    print(f"Процессор {processor_name} запущен")

    # Проверка состояния процессора и ожидание завершения работы
    while processor_entity.status.aggregate_snapshot.active_thread_count > 0:
        time.sleep(1)
        processor_entity = nipyapi.canvas.get_processor(processor_entity.id, 'id')

    # Остановка процессора
    nipyapi.canvas.schedule_processor(processor_entity, False)
    print(f"Процессор {processor_name} остановлен")
else:
    print(f"Процессор {processor_name} не найден.")

Процессор StartFromAPI запущен
Процессор StartFromAPI остановлен


#### 5. Изменить атрибут процессора по его ID
- Список всех атрибутов объекта canvas

In [6]:
import nipyapi

# Print all attributes and methods of the canvas module
print(dir(nipyapi.canvas))

['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '_raise', 'absolute_import', 'create_connection', 'create_controller', 'create_funnel', 'create_port', 'create_process_group', 'create_processor', 'create_remote_process_group', 'delete_connection', 'delete_controller', 'delete_funnel', 'delete_port', 'delete_process_group', 'delete_processor', 'delete_remote_process_group', 'exception_handler', 'get_bulletin_board', 'get_bulletins', 'get_component_connections', 'get_controller', 'get_controller_type', 'get_flow', 'get_funnel', 'get_pg_parents_ids', 'get_process_group', 'get_process_group_status', 'get_processor', 'get_processor_type', 'get_remote_process_group', 'get_root_pg_id', 'get_variable_registry', 'list_all_by_kind', 'list_all_connections', 'list_all_controller_types', 'list_all_controllers', 'list_all_funnels', 'list_all_input_ports', 'list_all_output_ports', 'list_all_process_groups', 'list_all_processor_types

#### 6. Информация о текущем потоке

In [7]:
from nipyapi import canvas, config

# Set up the NiFi connection
config.nifi_config.host = 'https://10.129.0.54:8443/nifi-api'

print(canvas.get_flow())

{'permissions': {'can_read': True, 'can_write': True},
 'process_group_flow': {'breadcrumb': {'breadcrumb': {'id': '33699316-0199-1000-14fc-27edac8e126e',
                                                      'name': 'NiFi Flow',
                                                      'version_control_information': None},
                                       'id': '33699316-0199-1000-14fc-27edac8e126e',
                                       'parent_breadcrumb': None,
                                       'permissions': {'can_read': True,
                                                       'can_write': True},
                                       'versioned_flow_state': None},
                        'flow': {'connections': [{'bends': [],
                                                  'bulletins': None,
                                                  'component': {'available_relationships': ['success'],
                                                                'back_pre

#### 7. Справочная информация

In [8]:
import pydoc

# Получение справочной информации о модуле canvas
pydoc.help('nipyapi.canvas')

Help on module nipyapi.canvas in nipyapi:

NAME
    nipyapi.canvas - For interactions with the NiFi Canvas.

FUNCTIONS
    create_connection(source, target, relationships=None, name=None)
        Creates a connection between two objects for the given relationships

        Args:
            source: Object to initiate the connection, e.g. ProcessorEntity
            target: Object to terminate the connection, e.g. FunnelEntity
            relationships (list): list of strings of relationships to connect, may
                be collected from the object 'relationships' property (optional)
            name (str): Defaults to None, String of Name for Connection (optional)

        Returns:
            (ConnectionEntity): for the created connection

    create_controller(parent_pg, controller, name=None)
        Creates a new Controller Service in a given Process Group of the given
            Controller type, with the given Name

        Args:
            parent_pg (ProcessGroupEntity): Ta

#### 8. Автоматическое построение простого потока и его запуск

In [8]:
from nipyapi import canvas, config, nifi
from random import randrange

# 1. Настройка подключения
config.nifi_config.host = 'http://10.129.0.54:8080/nifi-api' 

# 2. Случайные координаты на canvas
location = (randrange(0, 4000), randrange(0, 4000))

# 3. Конфигурации процессоров
processor_GenFlowFile_config = {
    "properties": {
        "generate-ff-custom-text": "Hello World from Big Data School"
    },
    "schedulingPeriod": "10 sec",
    "schedulingStrategy": "TIMER_DRIVEN"
}

processor_PutFile_config = {
    "properties": {
        "Directory": "/home/ubuntu/test_pg",
        "Conflict Resolution Strategy": "replace",
        "Maximum File Count": "100"
    },
    "autoTerminatedRelationships": ["failure", "success"]
}

# 4. Получение root PG
root_pg_id = canvas.get_root_pg_id()
root_pg = canvas.get_process_group(root_pg_id, identifier_type='id')

# 5. Создание новой PG
new_pg = canvas.create_process_group(
    parent_pg=root_pg,
    new_pg_name='Test_process_group',
    location=location,
    comment='Demo group'
)
print(f"Создана процессная группа: {new_pg.id}")

# 6. Получение типов процессоров
gen_type = canvas.get_processor_type('org.apache.nifi.processors.standard.GenerateFlowFile', identifier_type='name')
put_type = canvas.get_processor_type('org.apache.nifi.processors.standard.PutFile', identifier_type='name')

# 7. Создание процессоров
p1 = canvas.create_processor(parent_pg=new_pg, processor=gen_type, location=(200, 200), config=processor_GenFlowFile_config)
p2 = canvas.create_processor(parent_pg=new_pg, processor=put_type, location=(200, 400), config=processor_PutFile_config)

# 8. Соединение процессоров
canvas.create_connection(p1, p2, relationships=['success'])
print("Процессоры соединены.")

# 9. Запуск PG
canvas.schedule_process_group(new_pg.id, scheduled=True)
print("Процессная группа запущена.")

Создана процессная группа: 3e1d8af0-0199-1000-5863-0790af5cef79
Процессоры соединены.
Процессная группа запущена.


In [6]:
from nipyapi import canvas, config

config.nifi_config.host = 'http://10.129.0.254:8443/nifi-api' 

pg_id = canvas.get_root_pg_id()
pg = canvas.get_process_group(pg_id, identifier_type='id')
print(pg.id, pg.component.name)

018380c4-0199-1000-566f-3c43fff5b0a3 NiFi Flow


#### 9. Координаты расположения существующих объектов на канве

In [22]:
from nipyapi import canvas, config

# Настройка подключения к NiFi API
config.nifi_config.host = 'https://10.129.0.84:8443/nifi-api'

# Получение ID корневой группы процессов
root_id = canvas.get_root_pg_id()

# Получение всех процессоров в корневой группе процессов
processors = canvas.list_all_processors(root_id)

# Вывод имен и расположения всех процессоров
for processor in processors:
    name = processor.component.name
    position = processor.component.position
    print(f"Processor Name: {name}, Location: (x: {position.x}, y: {position.y})")

Processor Name: UpdateAttribute, Location: (x: 3112.6260513529605, y: 1794.5897424961859)
Processor Name: AttributesToJSON, Location: (x: 1008.0, y: 392.0)
Processor Name: ForkEnrichment, Location: (x: 1478.690983221707, y: 859.1398666752482)
Processor Name: GenerateFlowFile, Location: (x: -184.0, y: -96.0)
Processor Name: GenerateFlowFile, Location: (x: 200.0, y: 200.0)
Processor Name: QueryDatabaseTableRecord, Location: (x: 424.0, y: 200.0)
Processor Name: LogAttribute, Location: (x: 1656.0, y: 776.0)
Processor Name: EvaluateJsonPath, Location: (x: -248.0, y: 424.0)
Processor Name: SplitToJsons, Location: (x: 384.0, y: 392.0)
Processor Name: PutFile, Location: (x: 2000.0, y: 2200.0)
Processor Name: JoltTransformJSON, Location: (x: -184.0, y: 368.0)
Processor Name: UpdateAttribute, Location: (x: 408.0, y: 248.0)
Processor Name: GenerateFlowFile, Location: (x: 200.0, y: 200.0)
Processor Name: HandleHttpResponse, Location: (x: 968.0, y: -160.0)
Processor Name: GenerateFlowFile, Location