In [1]:
import time
from multiping import multi_ping
from journal import journal_template_fill
from common import *
from push_config import *
from change_intf import *
from flask import request, Response
from flask import Flask
from config import netbox_api
from config import *
from jinja2 import Environment, FileSystemLoader
from nornir_napalm.plugins.tasks import napalm_get, napalm_configure
import ipaddress
from nornir_utils.plugins.functions import print_result
import json
from nornir.core.task import Task, Result
from nornir_jinja2.plugins.tasks import template_file
from nornir_netmiko.tasks import netmiko_send_command, netmiko_send_config

import re

from credentials import(netbox_url,
                        netbox_token,
                        device_username,
                        device_password)
from nornir import InitNornir
from nornir.core.filter import F
from deepdiff import DeepDiff
from deepdiff import grep

from global_var import(global_id,
                        global_dcim,
                        templates_path)

In [2]:
def create_nornir_session():
    """ 
    Инициализируем nornir, но для "hosts" используем данные из netbox
    :return: nr_session
    """
    nr_session = InitNornir(
        inventory={
            "plugin": "NetBoxInventory2",
            "options": {
                "nb_url": netbox_url,
                "nb_token": netbox_token,
                "group_file": "./inventory/groups.yml",
                "defaults_file": "./inventory/defaults.yml",
            },
        },
    )
    return nr_session

In [3]:
global_id = 0 # ID устройства, нужен для логирования
global_dcim = 'dcim.device' # нужен для логирования (зависит от настроек webhook 'Content types')
templates_path = "./templates/"

Заполняем журнал

In [4]:
def journal_template_fill(comment,level,assigned_object_id=global_id,dcim=global_dcim):
    """ 
    Заполнение шаблона значениями
    :param comment: событие 
    :param level: уровень важности
    :return: journal """
     
    
    if global_id !=0:
        template_file = "netbox_journals.template"
        environment = Environment(loader=FileSystemLoader(templates_path)) # загружаем шаблон для заполнения
        template = environment.get_template(template_file)
        journal = None
        wow = ''
        netbox_level = ''
        
        kind = ['emergency','alert','critical','error','warning','notification','informational','debugging',]
        set_kind = tuple(kind) # преобразуем список в кортеж
        
        if level in kind: 
            log_level = set_kind.index(level)
            
            if log_level in range(2):
                netbox_level = 'danger'
                wow = '!!!'
            
            elif log_level in range(2,5):
                netbox_level = 'warning'
                wow = '!'
            
            elif log_level in range(5,7):
                netbox_level = 'info'
                wow = '.'
        
        else: 
            netbox_level = 'success'
            wow = ''
        
        try:
            journal = template.render( # заполняем шаблон
                                    assigned_object_type = dcim,
                                    assigned_object_id = assigned_object_id,
                                    created_by = '1',
                                    kind = netbox_level,
                                    comments = comment + '{}'.format(wow)
                                    )
        except: print("not all arguments have been transmitted...")
        else: 
        
            if journal != None:
                journal = json.loads(journal)
                # POST journal
                netbox_api.extras.journal_entries.create([journal])
                print(level.upper()+': '+comment+wow)

In [5]:
def tup_to_dict(tup, dict = {}):
    """
    Конвертируем кортеж в словарь
    :param tup: кортеж
    :param dict: словарь
    :return: dict
    """
    for x, y in tup:
        dict.setdefault(x, []).append(y)
    return dict

In [6]:
def parse_interface_name(interface_name):
    """
    Given an interface name, split the string into the type of interface
    and the interface ID.  Use for generating RESTCONF URLs requiring an
    interface specifier.
    :param interface_name: String - name of the interface to parse
    :return: Tuple of (interface type, interface ID)
    """
    interface_pattern = r"^(\D+)(\d+.*)$"
    interface_regex = re.compile(interface_pattern)

    interface_type, interface_id = interface_regex.match(str(interface_name)).groups()

    return interface_type, interface_id

Создаем функцию извлечения IPv4

In [7]:
def configure_interface_ipv4_address(netbox_ip_address='0.0.0.0'):
    """
    Извлекаем IPv4 адреса, маски, сети, префикса, шлюза и возвращем словарь
    :param netbox_ip_address: IPv4 адрес (IP/Prefix)
    :return: ipv4_dic
    """
        
    ipv4_dic = dict()
    try:
        ip_cidr = ipaddress.ip_interface(netbox_ip_address)
        
        ipv4_dic['ip4_address'] = format(ipaddress.IPv4Interface(ip_cidr).ip)
        ipv4_dic['ip4_netmask'] = format(ipaddress.IPv4Interface(ip_cidr).netmask)
        ipv4_dic['ip4_network'] = format(ipaddress.IPv4Interface(ip_cidr).network)
        ipv4_dic['ip4_prefix'] = format(ipaddress.IPv4Network(ipv4_dic['ip4_network']).prefixlen)
        ipv4_dic['ip4_broadcast'] = format(ipaddress.IPv4Network(ipv4_dic['ip4_network']).broadcast_address)
        
        if (ipaddress.IPv4Network(ipv4_dic['ip4_network']).num_addresses) > 1:
            ipv4_dic['ip4_gateway'] = format(list(ipaddress.IPv4Network(ipv4_dic['ip4_network']).hosts())[-1])
        return(ipv4_dic)
    
    except ValueError:
        # > добавляем запись в журнал
        comment,level = 'address/netmask is invalid for IPv4 {}'.format(netbox_ip_address),'error'                
        print(journal_template_fill(comment,level,global_id,global_dcim))
        # <

Создаем функцию получения адреса management интерфейса

In [8]:
def mgmt_address(device_interface):
    """
    Извлечение IPv4 адреса интерефейса управления.
    :param device_interface: ссылка на объект интерфейса pynetbox
    :return: mgmt_ip
    """    

    interface_type, interface_id = parse_interface_name(device_interface.name)
    mgmt_ip = configure_interface_ipv4_address(device_interface.device.primary_ip)['ip4_address']
    
    return(mgmt_ip)

In [9]:
def compare(prechange, postchange):
    """ 
    Сравниваем два объекта и возращаем словарь
    :param prechange: старые данные 
    :param prechange: новые данные
    :return: change
    """   
    compare = DeepDiff(prechange,postchange,exclude_paths="root['last_updated']")
    change = dict()
    change_key = []
    new_value = []

    for key in compare.keys():
        
        if key == 'values_changed' or key == 'type_changes':
            
            for inkey in compare[key].keys():
                change_key.append(re.findall("'([^']*)'", inkey)[0])
                new_value.append(compare[key][inkey]['new_value'])

    change = dict(zip(change_key,new_value)) # объединяем два списка в словарь
    
    if len(change) == 0:
        change = None
    
    return change

In [10]:
def convert_none_to_str(value):
    return '' if value is None else str(value)

Заполняем шаблон

In [11]:
def cisco_config_interface(j2_interface,event='None'):
    """ 
    Заполнение шаблона значениями, если событие "delete", то заполняем по умолчанию
    :param j2_interface: ссылка на объект интерфейса pynetbox 
    :param event: событие
    :return: content
    """   

    def template_fill(*args,**kwargs):
        environment = Environment(loader=FileSystemLoader(templates_path)) # загружаем шаблон для заполнения
        template = environment.get_template(template_file)
        content = None
        try:
            if event == 'shutdown':
                content = template.render( # заполняем шаблон
                                    interface_name = convert_none_to_str(j2_interface.name)
                                )
            else:
                content = template.render( # заполняем шаблон
                                    interface_name = convert_none_to_str(j2_interface.name),
                                    descr = convert_none_to_str(j2_interface.description),
                                    access_vlan = convert_none_to_str(j2_interface.untagged_vlan.vid),
                                    mode = convert_none_to_str(j2_interface.mode.value)
                                )
        except: 
            # > добавляем запись в журнал
            comment,level =  'Not enough data to fill out the template','warning'                
            print(journal_template_fill(comment,level,global_id,global_dcim))
            # <
        else:
            #print("Filling in the template...\n{}".format(content)) 
            print("Filling in the template...")

        return content 
        
    if event == 'shutdown':
        template_file = "cisco_ios_shutdown_interface.template"
        content = template_fill(j2_interface, template_file,event)
    
    elif event != 'delete':    
        template_file = "cisco_ios_access_interface.template"
        content = template_fill(j2_interface, template_file)
    
    else:
        template_file = "cisco_ios_default_interface.template"
        content = template_fill(j2_interface, template_file)

    return content

Получаем через napalm интерфейсы с устройства

In [12]:
def push_config_interface(netbox_interface,content,event='None'):
    """  
    Проверка на доступность устройства (ping)
    Подключение к устройству по IP адресу, принадлежащему интерфейсу управления
    Проверка соответствия портов между netbox и реальным устройством
    Отправка конфигурации на устройство
    :param netbox_interface: ссылка на объект интерфейса pynetbox
    :return: None
    """
    templates_roles = ['access_switch'] # получаем из netbox (произвольные данные)
    device_role = netbox_interface.device.device_role.slug
    
    if device_role in templates_roles:
        attempts = 3 # количество попыток подключения
        attempt_timeout = 5 # время ожидания между попытками в секундах
        fail_count = 0 # количество неудачных попыток
        name = netbox_interface.name
        addrs = []
        filter_query = mgmt_address(netbox_interface) # адрес интерфейса управления
        addrs.append(filter_query)
        
        responses, no_responses = multi_ping(addrs, timeout=0.5, retry=2,ignore_lookup_errors=True)
        print("icmp ping...")
        
        if filter_query in list(responses.keys()): # если адрес интерфейса управления доступен (icmp ping)
            print('{} is available...'.format(addrs))
            
            nr = create_nornir_session()
            sw = nr.filter(hostname = filter_query) # производим отбор по конкретному хосту
            get_int = sw.run(task=napalm_get, getters=['get_interfaces']) # получаем все интерфейсы с устройства в виде словаря
            
            for _ in range(attempts): # попытка подключения к устройству
                print('Attempting to connect {}...'.format(_+1))
        
                if get_int.failed == True: # попытка не удалась
                    fail_count += 1
                    time.sleep(attempt_timeout)
                
                else: # есть подключение
                    print('Connection state is connected...')
            
                    for device in get_int.values():
                        
                        interfaces = device.result['get_interfaces'].keys() # получаем интерфейсы как ключи словаря  
                        if name in (intf for intf in list(interfaces)):
                            print("Find {} for device {}...".format(name, device.host))              
                    
                    result = sw.run(netmiko_send_config,name="Configuration interface.../",config_commands=content)
                    #result = sw.run(netmiko_send_config,name="Configuration interface.../",config_commands=cisco_config_interface(netbox_interface,event).split('\n'))
                    
                    print_result(result)
                    # > добавляем запись в журнал
                    comment,level = 'All operations are performed','success'                
                    print(journal_template_fill(comment,level,global_id,global_dcim))
                    # <
                    break
            if fail_count>=attempts:
                # > добавляем запись в журнал
                comment,level = 'Connection state is failed','error'                
                (journal_template_fill(comment,level,global_id,global_dcim))
                # <
            sw.close_connections()
            print("Connection state is closed.")
        
        elif filter_query in no_responses:
            # > добавляем запись в журнал
            comment,level = '{} is not available'.format(addrs),'warning'                
            (journal_template_fill(comment,level,global_id,global_dcim))
            # <
    else:
        # > добавляем запись в журнал
        comment,level = 'Devices must match the list of {}'.format(templates_roles),'notification'                
        (journal_template_fill(comment,level,global_id,global_dcim))
        # <


========================================

Получаем конфигурацию интерфейса и отправляем её на устройство 

In [13]:
def change_config_intf(netbox_interface,event):
    """ 
    Изменяем настройки порта коммутатора 
    :param netbox_interface: ссылка на объект интерфейса pynetbox
    :param event: shutdown|delete|update|create
    :return: None
    """
    
    interface_name = netbox_interface.name
    
    try:
        content = cisco_config_interface(netbox_interface,event).split('\n')
        print("{} interface {} config...".format(event.capitalize(),netbox_interface))
        push_config_interface(netbox_interface,content,event)
    
    except:
            # > добавляем запись в журнал
            comment,level = 'No data for {} {}'.format(event.lower(),interface_name),'informational'
            print(journal_template_fill(comment,level,global_id,global_dcim))
            # <

Управляем изменениями связанных интерфейсов

In [14]:
def mng_connected_interfaces(user_device_intf):
    """  
    Получаем конфигурацию интерфейса устройства пользователя и передаем её интерфейсу устройства-соседа
    Соединение должно быть point-to-point между интерфейсами (Interface)
    :param user_device_intf: ссылка на объект интерфейса pynetbox
    :return: None
    """
    
    if user_device_intf['connected_endpoints_reachable']: # проверяем, есть ли соединение с другим устройством
        
        interface = ['mtu','mac_address','speed','duplex','description','mode','untagged_vlan'] # произвольный список параметров интерфеса 
        network_device = user_device_intf['connected_endpoints'][0]['id'] # устройство-сосед (сетевое)
        changes = dict()
        change_key = ['id'] # добавляем 'ID' устройства-соседа, используемое как ключ
        new_value = [network_device]

        for value in user_device_intf: # перебираем список кортежей
            
            if value[0] in interface and value[1] != '' and value[1] != None: # проверяем, есть ли значение в списке, определенном нами ранее,
                                                                              # значение должно быть заполнено
                change_key.append(value[0])
                
                if isinstance(value[1], dict): # проверяем, является ли значение словарем
                    new_value.append(list(value[1].values())[0]) # превращаем значение словаря в список
                
                else:
                    new_value.append(value[1])
        print(changes)
        changes = [dict(zip(change_key,new_value))] # объединяем два списка в словарь
        # POST intf
        netbox_api.dcim.interfaces.update(changes) # обновляем данные интерфейса через netbox_api
    
    else:
        # > добавляем запись в журнал
        comment,level = 'Neighbor is not reachable for {}'.format(user_device_intf),'notification'
        print(journal_template_fill(comment,level,global_id,global_dcim))
        # <        

=================================================================

Управляем соединением

In [15]:
def mng_cable():
    """  
    Соединение должно быть point-to-point между интерфейсами (Interface)
    Проверка полученных устройств на соответствие списку (access switch и user device)
    :return: Response(status=204)
    """
        
    devices_keys = ['role','device_id','intf_id'] # список ключей для словаря devices
    devices = []
    devices_names = []
    templates_roles = ['access_switch', 'user_device'] # присваиваем значение из netbox ("произвольные" данные)
    device_roles = []
    regex = "[a|b]_terminations"
    
    # Получаем данные через flask от webhook netbox:
    get_cable = request.json['data']
    get_event = request.json["event"]
    prechange = request.json['snapshots']['prechange']
    postchange = request.json['snapshots']['postchange']
    
    global global_id
    global global_dcim 
    global_dcim = 'dcim.cable'
    global_id = get_cable['id']
    
        
    for key in get_cable.keys(): # заполняем список device_value и объединяем с device_keys в словарь

        if re.match(regex, key) and len(get_cable[key])==1:  # отбираем нужные ключи из словаря по регулярке
                                                                # при условии, что интерфейс один на устройство
            for _ in range(len(get_cable[key])):
                device_id = get_cable[key][_]['object']['device']['id'] # ID устройства из json
                devices_values = []
                devices_names.append(netbox_api.dcim.devices.get(device_id).name) # наименование устройства через netbox_api
                devices_values.append(netbox_api.dcim.devices.get(device_id).device_role.slug) # роль устройства через netbox_api
                devices_values.append(device_id) # id устройства
                devices_values.append(get_cable[key][_]['object']['id']) # ID интерфейса устройства из json
                devices.append(dict(zip(devices_keys,devices_values))) # получаем список из словарей
    
    for device in devices: # заполняем список ролей
        device_roles.append(device['role'])
    
    print("{} cable ID #{} between {}...".format(get_event.upper(),get_cable['id'], devices_names))
    
    if set(device_roles) == set(templates_roles): # проверяем, что получили устройства с разными ролями и соответствущие списку    
        
        for device in devices:
            
            if device['role'] == templates_roles[0]: # нам нужен access switch
                device_intf_id = device['intf_id'] # получаем ID интерфейса access switchа из нами созданного словаря            
        
        get_device_interface = netbox_api.dcim.interfaces.get(device_intf_id) # по ID находим интерфейс через netbox_api
        interface_name = get_device_interface.name
        global_dcim = 'dcim.device'
        global_id = get_device_interface.device.id
        print("Connection between {} and {}, switch access interface ID: {}...".format(device_roles[0],device_roles[1], device_intf_id))
        
        if get_device_interface.mgmt_only: # проверяем, является ли интерфейс management интерфейсов
            # > добавляем запись в журнал
            comment,level = '{} is management interface, no changes will be performed'.format(interface_name),'notification'                
            (journal_template_fill(comment,level,global_id,global_dcim))
            # <
        
        else:
            
            if get_device_interface.enabled == False:
                print('Interface {} was turned off before'.format(interface_name)) 
                pass
            
            elif get_event == "created": # Конфиг интерфейса будет добавлен

                change_config_intf(netbox_interface=get_device_interface,event='create')

            elif get_event == "updated" and compare(prechange,postchange) != None: # Конфиг интерфейса будет изменен

                change_config_intf(netbox_interface=get_device_interface,event='update') 
            
            elif get_event == "deleted": # Конфиг интерфейса будет удален или настроен по умолчанию
                pass # изменить настройки интерфейса в Netbox
                change_config_intf(netbox_interface=get_device_interface,event='delete') # удаляем настройки интерфейса (или настраиваем по умолчанию)
                
            else:
                # > добавляем запись в журнал
                comment,level = 'No data for {} {}'.format(get_event.lower(),interface_name),'informational'
                (journal_template_fill(comment,level,global_id,global_dcim))
                # <

            
    else:
        # > добавляем запись в журнал
        comment,level = 'Devices must match the list of {}'.format(templates_roles),'notification'                
        (journal_template_fill(comment,level,global_id,global_dcim))
        # <
    return Response(status=204)

Управление интерфейсом

In [16]:
def mng_int():
    """  
    Проверяем полученые данные на соответствие определенным условиям и передаем их на устройства
    :return: Response(status=204)
    """
    
    network_devices_roles = ['access_switch'] # присваиваем значение из netbox ("произвольные" данные)
    user_devices_roles = ['user_device'] # присваиваем значение из netbox ("произвольные" данные)
    
    # Получаем данные через flask от webhook netbox:
    event = request.json["event"]
    mng_int_id = request.json['data']['id']
    prechange = request.json['snapshots']['prechange']
    postchange = request.json['snapshots']['postchange']
    
    # Запрашиваем данные через netbox_api:
    get_device_interface = netbox_api.dcim.interfaces.get(mng_int_id)
    interface_name = get_device_interface.name
    device_role = get_device_interface.device.device_role.slug
    
    global global_id
    global global_dcim 
    global_dcim = 'dcim.device'
    global_id = get_device_interface.device.id
    
    print("{} {}...".format(event.upper(), interface_name))
    
    if get_device_interface.mgmt_only: # проверяем, является ли интерфейс management интерфейсом       
        # > добавляем запись в журнал
        comment,level = '{} is management interface, no changes will be performed'.format(interface_name),'notification'        
        (journal_template_fill(comment,level,global_id,global_dcim))
        # <
    
    else:
           
        if compare(prechange,postchange) == None: # проверяем, изменились ли данные
            # > добавляем запись в журнал
            comment,level = 'No data for {} {}'.format(event.lower(),interface_name),'informational'
            (journal_template_fill(comment,level,global_id,global_dcim))
            # <

        elif request.json['data']['enabled'] == False and prechange['enabled'] == True: # проверяем, соответствует ли изменению ВКЛ->ВЫКЛ
            # > добавляем запись в журнал
            comment,level = 'Interface {} is disabled on the device'.format(interface_name),'notification'              
            (journal_template_fill(comment,level,global_id,global_dcim))
            # <
            change_config_intf(get_device_interface,event='shutdown') # вызываем функцию выключения порта устройства
            
        elif request.json['data']['enabled'] == False and prechange['enabled'] == False: # проверяем, соответствует ли изменению ВЫКЛ->ВЫКЛ
            print('Interface {} was turned off before'.format(interface_name))              
                        
        else: 
            
            if device_role in network_devices_roles: # проверяем, является ли устройство сетевым устройством
                change_config_intf(get_device_interface,event='update') # вызываем функцию внесения изменений настроек порта устройства
            
            elif device_role in user_devices_roles: # проверяем, является ли устройство конечным (пользователя)
                mng_connected_interfaces(get_device_interface) # вызываем функцию внесения изменений настроек связанных портов
        
    return Response(status=204)

main.py

In [18]:
# Create a Flask instance
from flask import Flask
from nb_ipam_api import mng_ip

# Create a Flask instance
app = Flask(__name__)
"""
Webhook POST
    Name:'Fixed IP into DHCPd'
    Content types: 'IPAM > IP Address'
"""
app.add_url_rule("/api/fixed_ip",
                 methods=["POST"],
                 view_func=mng_ip)
"""
Webhook POST
    Name:'Change the cable'
    Content types: 'DCIM > Cable'
"""
app.add_url_rule("/api/cable_change",
                methods=['POST'],
                view_func=mng_cable)
"""
Webhook POST
    Name:'Update the interface'
    Content types: 'DCIM > Interfaces'
"""
app.add_url_rule("/api/int_update",
                 methods=['POST'],
                 view_func=mng_int)
    
if __name__ == "__main__": 
    app.run(host='0.0.0.0', port=8080)

 * Serving Flask app '__main__'
[0m * Debug mode: off
[0m

 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8080
 * Running on http://10.68.5.12:8080
[0mPress CTRL+C to quit[0m
[0m

UPDATED Ethernet 0...[0m
[0mINFORMATIONAL: No data for updated Ethernet 0.[0m


10.30.1.226 - - [16/Apr/2023 10:03:43] "[0m

[0m

POST /api/int_update HTTP/1.1[0m" 204 -
[0m

UPDATED Ethernet 0...[0m
[0m{}[0m
[0m

10.30.1.226 - - [16/Apr/2023 10:04:00] "[0mPOST /api/int_update HTTP/1.1[0m" 204 -
[0m

UPDATED Ethernet0/2...[0m
[0mFilling in the template...[0m
[0mUpdate interface Ethernet0/2 config...[0m
[0mINFORMATIONAL: No data for update Ethernet0/2.[0m
[0mNone[0m


10.30.1.226 - - [16/Apr/2023 10:04:01] "[0mPOST /api/int_update HTTP/1.1[0m" 204 -
[0m

[0m