In [None]:
import ipywidgets as widgets
import datetime
import pandas as pd
import warnings
import re
import json
import ipaddress
import logging
import os

# Render DataFrames in full (no row/column/cell truncation) wherever they are
# displayed or interpolated into log messages.
for option_name, option_value in (
    ('display.max_rows', None),
    ('display.max_columns', None),
    ('display.max_colwidth', None),
    ('display.width', 1000),
):
    pd.set_option(option_name, option_value)

# Best-effort debug logging: truncate the previous log file, then send all
# DEBUG-and-above records to it. A failure here must not stop the notebook,
# so it is deliberately ignored.
try:
    with open('_debug.log', 'w'):
        pass
    logging.basicConfig(
        format='%(asctime)s %(message)s',
        filename='_debug.log',
        encoding='utf-8',
        level=logging.DEBUG,
    )
    logging.info("PROGRAM_INITIALIZED" + "#"*100)
except Exception:
    pass

# Silence FutureWarning noise for the rest of the session.
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
class Rule:
    """Parse one firewall rule string into its eight positional fields.

    The rule text is consumed left to right in this order: action, source
    environment, source address, source port, destination environment,
    destination address, destination port, protocol.

    Parsing state lives in the module-level ``working_rule`` string, which every
    successful match trims from the left. Failures are written to ``logging`` and
    flagged by setting ``pass_fail_indicator.value = False``
    (``pass_fail_indicator`` is defined elsewhere in the notebook).

    Attributes:
        rule: The original, unmodified rule text.
        determined: 8-tuple of the parsed fields (``None`` for any field that
            could not be parsed).
        action, from_environment, from_address, from_port, to_environment,
        to_address, to_port, protocol: The individual entries of ``determined``.
    """

    def determine_fields(self):
        """Parse ``working_rule`` and return the eight fields as a tuple.

        Returns:
            ``(action, from_environment, from_address, from_port,
            to_environment, to_address, to_port, protocol)``. A field that could
            not be determined is ``None``; in that case the failure is logged and
            ``pass_fail_indicator.value`` is set to ``False``.
        """
        regex_dict = {
            'action'                     : r'^(add|remove|keep)',
            'environment'                : r'^(testing|test|production|prod|development|dev|staging)',
            'address_singular'           : r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
            'address_range_shorthand'    : r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}-\d{1,3})',
            'address_range_expanded'     : r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
            'address_any'                : r'^(any)',
            'address_cidr'               : r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\s)?/(\s)?\d{1,2})',
            'port_singular'              : r'^(\d{1,5}\s)|any|ANY|Any',
            'port_range'                 : r'^(\d{1,5}-\d{1,5})',
            'port_comma_separated'       : r'^(((\d{1,5})?,( )?\d{1,5}))+',
            'protocol'                   : r'^(tcp|udp)'
        }
        # Order matters: the most specific pattern must be tried first, because a
        # singular address/port pattern also matches the prefix of a range.
        address_patterns = [regex_dict[name] for name in (
            'address_cidr', 'address_any', 'address_range_expanded',
            'address_range_shorthand', 'address_singular')]
        port_patterns = [regex_dict[name] for name in (
            'port_comma_separated', 'port_range', 'port_singular')]

        # Pre-bind every field so the final return cannot raise
        # UnboundLocalError when parsing aborts part-way through.
        action = from_environment = from_address = from_port = None
        to_environment = to_address = to_port = protocol = None
        try:
            action = self.regex_match_and_trim(regex_dict['action'])
            from_environment = self.regex_match_and_trim(regex_dict['environment'])
            from_address = self._first_match(address_patterns)
            from_port = self._first_match(port_patterns)
            to_environment = self.regex_match_and_trim(regex_dict['environment'])
            to_address = self._first_match(address_patterns)
            to_port = self._first_match(port_patterns)
            protocol = self.regex_match_and_trim(regex_dict['protocol'])

            # Explicit check instead of `assert`, which is stripped under `python -O`.
            fields = (action, from_environment, from_address, from_port, to_environment, to_address, to_port, protocol)
            if any(field is None for field in fields):
                logging.error(f"ERROR:Rule:determine_fields:$        failed to determine field")
                pass_fail_indicator.value = False

            logging.info(f"INFO:Rule:determine_fields:$         determined : '{action}','{from_environment}','{from_address}','{from_port}','{to_environment}','{to_address}','{to_port}','{protocol}'")
        except Exception as error:
            logging.error(f"ERROR:Rule:determine_fields:$        exception {error}")
            pass_fail_indicator.value = False
        return action, from_environment, from_address, from_port, to_environment, to_address, to_port, protocol

    @staticmethod
    def _first_match(expressions):
        """Return the stripped text of the first pattern that matches, else ``None``.

        Stops at the first hit. Every successful match trims ``working_rule``, so
        trying the remaining patterns afterwards would let them consume the
        *next* field (e.g. ``address_any`` swallowing a following ``any`` port).
        """
        for expression in expressions:
            matched_text = Rule.regex_match_and_trim(expression)
            if matched_text is not None:
                return matched_text.strip()
        return None

    @staticmethod
    def regex_match_and_trim(expression):
        """Match ``expression`` (case-insensitive) at the start of ``working_rule``.

        On a match the matched prefix is removed from the global
        ``working_rule`` (the remainder is whitespace-stripped) and the matched
        text is returned. Without a match ``working_rule`` is left untouched and
        ``None`` is returned. Exceptions are logged, flag
        ``pass_fail_indicator`` and also result in ``None``.
        """
        try:
            global working_rule
            match = re.compile(expression, re.IGNORECASE).match(working_rule)
            if match:
                working_rule = (working_rule[match.span()[1]::]).strip()
                return match[0]
            return None
        except Exception as error:
            logging.error(f"ERROR:Rule:regex_match_and_trim:$ exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def __init__(self, rule):
        """Parse ``rule`` immediately and expose the fields as attributes."""
        global working_rule
        working_rule = rule  # shared parsing cursor used by regex_match_and_trim
        self.rule = rule
        self.determined = self.determine_fields()
        (self.action, self.from_environment, self.from_address, self.from_port,
         self.to_environment, self.to_address, self.to_port, self.protocol) = self.determined


In [None]:
class Correlation:
    """Locate the ASA, context and access-list that govern a rule's addresses.

    The correlations file is JSON shaped as
    ``{asa: {context: {acl_name: network}}}`` where ``network`` is anything
    ``ipaddress.ip_network`` accepts.

    Attributes:
        correlation: ``(asa_address, asa_context, asa_access_list)``; all three
            are ``None`` when no access-list matched.
        asa_address, asa_context, asa_access_list: The entries of ``correlation``.
    """

    @staticmethod
    def isolate_address(address):
        """Return the first dotted-quad found in ``address``.

        Returns ``None`` when the text contains ``any`` (case-insensitive) or no
        dotted-quad at all. Exceptions (e.g. ``address`` is not a string) are
        logged, flag ``pass_fail_indicator`` and yield ``None``.
        """
        try:
            if 'any' in address.lower():
                return None
            # Dots are escaped so only literal '.' separators are accepted.
            match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', address)
            return match[0] if match else None
        except Exception as error:
            logging.error(f"ERROR:Correlation:isolate_address:$  exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    @staticmethod
    def _find_access_list(correlations_config, address, acl_marker):
        """Return ``(asa, context, acl)`` of the first ACL whose name contains
        ``acl_marker`` and whose network contains ``address``; ``None`` otherwise.
        """
        if not address:
            return None
        candidate = ipaddress.IPv4Address(address)  # converted once, not per ACL
        for asa, contexts in correlations_config.items():
            for context, access_lists in contexts.items():
                for acl, network in access_lists.items():
                    acl_network = ipaddress.ip_network(network)
                    if candidate in acl_network and acl_marker in acl:
                        logging.info(f"INFO:Correlation:correlate_address:$ correlated : {candidate} in {acl_network} at {asa} > {context} > {acl}")
                        return asa, context, acl
        return None

    def correlate_address(self):
        """Correlate the rule to an access-list.

        The source address is tried first against ACLs whose name contains
        ``"VLAN"``; if that finds nothing, the destination address is tried
        against ACLs whose name contains ``"SVI"``.

        Returns:
            ``(asa_address, asa_context, asa_access_list)``, or
            ``(None, None, None)`` when nothing matched.

        Raises:
            OSError / json.JSONDecodeError: the correlations file cannot be read.
            ValueError: an address or a configured network is not valid for the
                ``ipaddress`` module.
        """
        from_address = self.isolate_address(self.from_address)
        to_address = self.isolate_address(self.to_address)
        with open(self.correlations_file, 'r') as file:
            correlations_config = json.load(file)
        correlated = (self._find_access_list(correlations_config, from_address, "VLAN")
                      or self._find_access_list(correlations_config, to_address, "SVI"))
        # Always hand back a 3-tuple so __init__ can index it safely.
        return correlated if correlated is not None else (None, None, None)

    def __init__(self, from_address, to_address, correlations):
        """Store the inputs and run the correlation immediately.

        Args:
            from_address: Source address text of the rule.
            to_address: Destination address text of the rule.
            correlations: Path of the JSON correlations file.
        """
        self.correlations_file = correlations
        self.from_address = from_address
        self.to_address = to_address
        self.correlation = self.correlate_address()
        self.asa_address, self.asa_context, self.asa_access_list = self.correlation


In [None]:
class Grouper:
    """Partition a frame of parsed rules into sets that can share one ASA object-group.

    The constructor runs the whole pipeline: split comma-separated destination
    ports, drop duplicate rules, then peel off (in this order) rules groupable by
    source address, by destination address and by destination port. Whatever is
    left is stored as a single "singular" frame.

    The input frame must provide the columns ``Action``, ``From_Environment``,
    ``From_Address``, ``From_Port``, ``To_Environment``, ``To_Address``,
    ``To_Port``, ``Protocol`` and ``ASA_ACL``.

    Every step logs its own failure and sets ``pass_fail_indicator.value = False``
    (``pass_fail_indicator`` is defined elsewhere in the notebook) instead of raising.

    Attributes:
        groups: ``{'From_Address': [...], 'To_Address': [...], 'To_Port': [...],
            'Host_Singular': [...]}`` - lists of DataFrames.
        master_frame: The rules not yet assigned to a group.
    """

    def split_multiple_ports(self):
        """Replace each rule with comma-separated ``To_Port`` by one rule per port.

        Rules without a comma keep their relative order; the expanded copies are
        placed after them, and the index is renumbered from 0.
        """
        try:
            keep_mask = []
            expanded_rules = []
            for index, rule in self.master_frame.iterrows():
                if ',' not in rule['To_Port']:
                    keep_mask.append(True)
                    continue
                keep_mask.append(False)
                logging.info(f"INFO:Grouper:split_multiple_ports:$  comma-sep ports detected : {rule['Action']} {rule['From_Environment']} {rule['From_Address']} {rule['From_Port']} {rule['To_Environment']} {rule['To_Address']} {rule['To_Port']} {rule['Protocol']}")
                logging.info(f"INFO:Grouper:split_multiple_ports:$  dropping rule at index {index}  --  {rule['Action']} {rule['From_Environment']} {rule['From_Address']} {rule['From_Port']} {rule['To_Environment']} {rule['To_Address']} {rule['To_Port']} {rule['Protocol']}")
                for port in (piece.strip(' ') for piece in rule['To_Port'].split(',')):
                    if not port:
                        continue  # e.g. a leading comma; an empty port is not a rule
                    rule_copy = rule.copy()
                    rule_copy['To_Port'] = port
                    expanded_rules.append(rule_copy)
                    logging.info(f"INFO:Grouper:split_multiple_ports:$  added rule copy with port : {port}")

            # Build the result once (DataFrame.append no longer exists in pandas 2.x)
            # and skip empty pieces so concat never sees an all-empty frame.
            pieces = []
            if any(keep_mask):
                pieces.append(self.master_frame.loc[keep_mask])
            if expanded_rules:
                pieces.append(pd.DataFrame(expanded_rules))
            if pieces:
                self.master_frame = pd.concat(pieces, ignore_index=True)
            else:
                self.master_frame = self.master_frame.iloc[0:0].reset_index(drop=True)
        except Exception as error:
            logging.error(f"ERROR:Grouper:split_multiple_ports:$ exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def remove_duplicates(self):
        """Drop duplicate rules, keeping the last occurrence.

        ``From_Environment`` and ``To_Environment`` are ignored when comparing.
        """
        try:
            duplicate_mask = self.master_frame.duplicated(subset=['Action', 'From_Address', 'From_Port', 'To_Address', 'To_Port', 'Protocol'], keep='last')
            removed_rules = self.master_frame[duplicate_mask]
            self.master_frame = self.master_frame[~duplicate_mask].reset_index(drop=True)
            if len(removed_rules) > 0:
                logging.info(f"INFO:Grouper:remove_duplicates:$     removed duplicates : {len(removed_rules)}")
                logging.info(f"---------------------------------------------REMOVED DUPLICATES-----------------------------------------\n{removed_rules}")
        except Exception as error:
            logging.error(f"ERROR:Grouper:remove_duplicates:$    exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def _collect_groups(self, key_columns, group_name, method_name, banner):
        """Move every set of >= 2 rules that agree on ``key_columns`` into ``self.groups[group_name]``.

        One DataFrame is stored per distinct key combination, in order of first
        appearance, so rules that differ in any key column (for instance the
        destination port) never end up in the same object-group. Rules whose key
        contains a missing value are left in ``master_frame``.
        """
        candidates = self.master_frame[self.master_frame.duplicated(subset=key_columns, keep=False)]
        if candidates.empty:
            return None
        logging.info(f"INFO:Grouper:{method_name}:$      object-groupings of {group_name} discovered")
        grouped_labels = []
        for _, object_group_df in candidates.groupby(key_columns, sort=False):
            self.groups[group_name].append(object_group_df)
            grouped_labels.extend(object_group_df.index)
            logging.info(f"---------------------------------------------{banner}---------------------------------\n{object_group_df}")
        self.master_frame = self.master_frame.drop(index=grouped_labels)

    def group_to_address(self):
        """Group rules that differ only in destination address.

        ``From_Port`` is assumed ephemeral and ignored. Each group can become an
        ASA object-group of TO addresses, e.g.
        'access-list acl-25-VLAN line 1 extended permit TCP host 25.25.25.3 object-group network_object_1 eq 8443'.
        """
        try:
            self._collect_groups(['Action', 'From_Address', 'Protocol', 'ASA_ACL', 'To_Port'], 'To_Address', 'group_to_address', 'TO ADDRESS GROUPING / BY GROUPING')
        except Exception as error:
            logging.error(f"ERROR:Grouper:group_to_address:$     exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def group_from_address(self):
        """Group rules that differ only in source address.

        ``From_Port`` is assumed ephemeral and ignored. Each group can become an
        ASA object-group of FROM addresses, e.g.
        'access-list acl-25-VLAN line 1 extended permit TCP object-group network_object_1 host 25.25.25.3 eq 8443'.
        """
        try:
            self._collect_groups(['Action', 'To_Address', 'Protocol', 'ASA_ACL', 'To_Port'], 'From_Address', 'group_from_address', 'FROM ADDRESS GROUPING / BY GROUPING')
        except Exception as error:
            logging.error(f"ERROR:Grouper:group_from_address:$     exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def group_to_port(self):
        """Group rules that differ only in destination port (one group per address pair)."""
        try:
            self._collect_groups(['Action', 'From_Address', 'To_Address', 'Protocol', 'ASA_ACL'], 'To_Port', 'group_to_port', 'TO PORT GROUPING')
        except Exception as error:
            logging.error(f"ERROR:Grouper:group_to_port:$        exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def group_remainder(self):
        """Store all still-ungrouped rules as one frame under ``'Host_Singular'``."""
        try:
            if self.master_frame.empty:
                return None
            logging.info(f"INFO:Grouper:group_remainder:$      remaining ungrouped rules discovered")
            logging.info(f"------------------------------------------REMAINING UNGROUPED RULES------------------------------------\n{self.master_frame}")
            self.groups['Host_Singular'].append(self.master_frame)
        except Exception as error:
            logging.error(f"ERROR:Grouper:group_remainder:$      exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def __init__(self, dataframe):
        """Run the full grouping pipeline on ``dataframe`` (which is not modified)."""
        self.groups = {
            'From_Address': [],
            'To_Address': [],
            'To_Port': [],
            'Host_Singular': []
        }
        self.master_frame = dataframe
        self.split_multiple_ports()
        self.remove_duplicates()
        self.group_from_address()
        self.group_to_address()
        self.group_to_port()
        self.group_remainder()


In [None]:
class Commands:
    """Turn grouped firewall rules into a flat list of Cisco ASA CLI command strings.

    The constructor generates everything immediately; the result is ``self.commands``.
    Object-group names are built from a prefix and a running index, both stored in
    a JSON settings file (keys ``object_network_prefix`` / ``object_network_index``
    and ``object_service_prefix`` / ``object_service_index``). The index is
    incremented in that file for every object-group created.

    Every generator logs its own failure and sets
    ``pass_fail_indicator.value = False`` (``pass_fail_indicator`` is defined
    elsewhere in the notebook) instead of raising.

    NOTE(review): ports are always rendered as ``eq <To_Port>``; a port range or
    ``any`` in ``To_Port`` is passed through unchanged - confirm that upstream
    never delivers those here.
    """

    def get_settings(self, key):
        """Return ``settings[key]`` from the settings file (``None`` after a logged failure)."""
        try:
            with open(self.settings_file, 'r') as file:
                return json.load(file)[key]
        except Exception as error:
            logging.error(f"ERROR:Commands:get_settings:$        exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def increment_settings(self, key):
        """Add 1 to ``settings[key]`` and write the settings file back.

        The file written is the same one that was read (``self.settings_file``),
        so a custom settings path keeps its own counters.
        """
        try:
            with open(self.settings_file, "r") as file:
                data = json.load(file)
            data[key] += 1
            with open(self.settings_file, "w") as file:
                json.dump(data, file)
            logging.info(f"INFO:Commands:increment_settings:$ incremented settings : {key} in {self.settings_file}")
        except Exception as error:
            logging.error(f"ERROR:Commands:increment_settings:$  exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    @staticmethod
    def cidr_to_netmask(cidr):
        """Convert a prefix length (int or numeric string, 0-32) to a dotted netmask.

        Returns ``None`` after logging when ``cidr`` is not a number in range.
        """
        try:
            cidr = int(cidr)
            if not 0 <= cidr <= 32:
                raise ValueError(f"prefix length out of range: {cidr}")
            mask = (0xffffffff << (32 - cidr)) & 0xffffffff
            return '.'.join(str((mask >> shift) & 0xff) for shift in (24, 16, 8, 0))
        except Exception as error:
            logging.error(f"ERROR:Commands:cidr_to_netmask:$     exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def _split_cidr(self, address):
        """Split ``'a.b.c.d/nn'`` (spaces around '/' allowed) into ``(network, netmask)``."""
        network, prefix_length = address.split("/")[:2]
        return network.strip(), self.cidr_to_netmask(prefix_length)

    def _acl_address(self, address):
        """Render ``address`` for an access-list line; unknown formats fall back to ``host <address>``."""
        return self.address_logic_singular(address) or f"host {address}"

    def _generate_address_group(self, object_group_df, grouped_column, method_name):
        """Emit one network object-group plus the access-list line that uses it.

        ``grouped_column`` ('From_Address' or 'To_Address') names the side whose
        addresses go into the object-group; the other side, protocol, port and
        ACL are taken from the last row of ``object_group_df``.
        """
        command_count = len(self.commands)
        self.increment_settings('object_network_index')
        object_network_prefix = self.get_settings('object_network_prefix')
        object_network_index = self.get_settings('object_network_index')
        object_group_name = f"{object_network_prefix}{object_network_index}"
        self.commands.append(f"changeto context {object_group_df.iloc[0]['ASA_Context']}")
        self.commands.append("conf t")
        self.commands.append(f"object-group network {object_group_name}")
        # dict.fromkeys de-duplicates while keeping row order, so the same
        # address is not added to the object-group twice.
        for address in dict.fromkeys(object_group_df[grouped_column]):
            self.address_logic_object_group(address)
        self.commands.append("exit")
        rule = object_group_df.iloc[-1]
        if grouped_column == 'From_Address':
            endpoints = f"object-group {object_group_name} {self._acl_address(rule['To_Address'])}"
        else:
            endpoints = f"{self._acl_address(rule['From_Address'])} object-group {object_group_name}"
        self.commands.append(f"access-list {rule['ASA_ACL']} line 1 extended permit {rule['Protocol']} {endpoints} eq {rule['To_Port']}")
        if self.remark:
            self.commands.append(f"access-list {rule['ASA_ACL']} line 1 remark {self.remark}")
        self.commands.append("\n")
        command_count = len(self.commands) - command_count
        logging.info(f"INFO:Commands:{method_name}:$ generated {command_count} {grouped_column} grouped commands : \n{'.'*30}{(os.linesep + '.'*30).join(self.commands[-command_count:])}")

    def generate_from_address(self, object_group_df):
        """Emit commands for rules that share everything but the source address."""
        try:
            self._generate_address_group(object_group_df, 'From_Address', 'generate_from_address')
        except Exception as error:
            logging.error(f"ERROR:Commands:generate_from_address:$ exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def generate_to_address(self, object_group_df):
        """Emit commands for rules that share everything but the destination address."""
        try:
            self._generate_address_group(object_group_df, 'To_Address', 'generate_to_address')
        except Exception as error:
            logging.error(f"ERROR:Commands:generate_to_address:$ exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def generate_to_port(self, object_group_df):
        """Emit a service object-group for the rows' ports and one ACL line per address pair."""
        try:
            command_count = len(self.commands)
            self.increment_settings('object_service_index')
            object_service_prefix = self.get_settings('object_service_prefix')
            object_service_index = self.get_settings('object_service_index')
            object_group_name = f"{object_service_prefix}{object_service_index}"
            self.commands.append(f"changeto context {object_group_df.iloc[0]['ASA_Context']}")
            self.commands.append('conf t')
            self.commands.append(f'object-group service {object_group_name}')
            # Unique (protocol, port) pairs in row order: avoids the ASA
            # duplicate-object error and keeps each port with its own protocol.
            service_pairs = dict.fromkeys(zip(object_group_df['Protocol'], object_group_df['To_Port']))
            for protocol, port in service_pairs:
                self.commands.append(f"service-object {protocol} destination eq {port}")
            self.commands.append('exit')
            address_pairs = object_group_df.drop_duplicates(subset=['Action', 'From_Address', 'From_Port', 'To_Address', 'Protocol'], keep='last')
            acl_lines = [
                f"access-list {df_rule['ASA_ACL']} line 1 extended permit object-group {object_group_name} {self._acl_address(df_rule['From_Address'])} {self._acl_address(df_rule['To_Address'])}"
                for _, df_rule in address_pairs.iterrows()
            ]
            # Rows differing only in From_Port render to the same line; emit it once.
            self.commands.extend(dict.fromkeys(acl_lines))
            if self.remark:
                self.commands.append(f"access-list {address_pairs.iloc[-1]['ASA_ACL']} line 1 remark {self.remark}")
            self.commands.append("\n")
            command_count = len(self.commands) - command_count
            logging.info(f"INFO:Commands:generate_to_port:$ generated {command_count} To_Port grouped commands : \n{'.'*30}{(os.linesep + '.'*30).join(self.commands[-command_count:])}")
        except Exception as error:
            logging.error(f"ERROR:Commands:generate_to_port:$    exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def generate_singular(self, object_group_df):
        """Emit one plain access-list line per rule, batched per ASA and context.

        Rules are grouped by ``ASA_Address`` and then ``ASA_Context`` (both in
        order of first appearance). Each ASA gets one ``ssh`` line and each
        context one ``changeto context`` / ``conf t`` pair. If a remark is set,
        it is added once, for the ACL of the last emitted rule. An empty frame
        produces no commands.
        """
        try:
            command_count = len(self.commands)
            records = []
            for row, df_rule in object_group_df.iterrows():
                logging.info(f"INFO:Commands:generate_singular:$ singular command generation iterating rule : {row}")
                record = {
                    "Command": f"access-list {df_rule['ASA_ACL']} line 1 extended permit {df_rule['Protocol']} {self.address_logic_singular(df_rule['From_Address'])} {self.address_logic_singular(df_rule['To_Address'])} eq {df_rule['To_Port']}",
                    "ASA_ACL": df_rule['ASA_ACL'],
                    "ASA_Context": df_rule['ASA_Context'],
                    "ASA_Address": df_rule['ASA_Address'],
                }
                logging.info(f"INFO:Commands:generate_singular:$ singular command generated commands : {record['Command']}")
                records.append(record)
            if not records:
                return None

            # Built once from the records (DataFrame.append no longer exists in
            # pandas 2.x); used only for the debug log.
            logging.info(f"-"*80)
            logging.info(f"\n\n{pd.DataFrame(records, columns=['Command', 'ASA_Address', 'ASA_Context', 'ASA_ACL'])}\n\n")

            # Nested dicts keep insertion order, giving ASA -> context -> rules.
            by_asa = {}
            for record in records:
                by_asa.setdefault(record['ASA_Address'], {}).setdefault(record['ASA_Context'], []).append(record)
            logging.info(f"INFO:Commands:generate_singular:$ grouping complete -- begin command append")

            last_record = records[-1]
            for asa_address, contexts in by_asa.items():
                self.commands.append(f"ssh {asa_address}")
                # The context is (re-)entered after every ssh, even if the
                # previous ASA used a context of the same name.
                for asa_context, context_records in contexts.items():
                    self.commands.append(f"changeto context {asa_context}")
                    self.commands.append("conf t")
                    for record in context_records:
                        self.commands.append(record['Command'])
                        last_record = record
                        logging.info(f"INFO:Commands:generate_singular:$ command append to commands : {record['Command']}")
            if self.remark:
                self.commands.append(f"access-list {last_record['ASA_ACL']} line 1 remark {self.remark}")
            self.commands.append("\n")
            command_count = len(self.commands) - command_count
            logging.info(f"INFO:Commands:generate_singular:$ generated {command_count} singular host commands : \n{'.'*30}{(os.linesep + '.'*30).join(self.commands[-command_count:])}")
        except Exception as error:
            logging.error(f"ERROR:Commands:generate_singular:$   exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def command_handler(self):
        """Run the matching generator for every frame in ``self.groups``."""
        try:
            generators = (
                ('From_Address', self.generate_from_address),
                ('To_Address', self.generate_to_address),
                ('To_Port', self.generate_to_port),
                ('Host_Singular', self.generate_singular),
            )
            for group_name, generate in generators:
                for dataframe in self.groups[group_name]:
                    generate(dataframe)
        except Exception as error:
            logging.error(f"ERROR:Commands:command_handler:$     exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def address_logic_object_group(self, address):
        """Append the ``network-object`` line for one address of an object-group.

        CIDR text becomes ``subnet <network> <netmask>``, a dashed range becomes
        ``range <address>``, anything else ``host <address>``. Exactly one line is
        emitted per address.

        NOTE(review): confirm that the target ASA release accepts the ``subnet``
        and ``range`` keywords (and a dashed range / ``any``) inside
        ``object-group network``.
        """
        try:
            if '/' in address:  # Subnet - checked first so it is not also emitted as a host
                network, netmask = self._split_cidr(address)
                self.commands.append(f"network-object subnet {network} {netmask}")
            elif '-' not in address:  # Singular
                self.commands.append(f"network-object host {address}")
            elif address.count('.') >= 3:  # Range, expanded or shorthand
                self.commands.append(f"network-object range {address}")
        except Exception as error:
            logging.error(f"ERROR:Commands:address_logic_object_group:$ exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def address_logic_singular(self, address):
        """Render one address for direct use in an access-list line.

        Returns ``'any'`` for any-addresses, ``'<network> <netmask>'`` for CIDR
        text, ``'host <address>'`` for a single dotted-quad and the unchanged
        text for dashed ranges. Returns ``None`` for unrecognised input or after
        a logged exception.
        """
        try:
            if address.strip().lower() == 'any':
                return 'any'
            if '/' in address:  # Subnet
                network, netmask = self._split_cidr(address)
                return f"{network} {netmask}"
            dot_count = address.count('.')
            if '-' not in address:
                return f"host {address}" if dot_count == 3 else None  # Singular
            if dot_count >= 3:  # Range, expanded or shorthand
                return address
            return None
        except Exception as error:
            logging.error(f"ERROR:Commands:address_logic_singular:$ exception {error}", exc_info=True)
            pass_fail_indicator.value = False

    def __init__(self, groups, remark, settings_file):
        """Generate all commands right away.

        Args:
            groups: Mapping with the keys 'From_Address', 'To_Address', 'To_Port'
                and 'Host_Singular', each a list of DataFrames.
            remark: Text for an ACL remark line; falsy to emit none.
            settings_file: Path of the JSON settings file holding the
                object-group prefixes and running indexes.
        """
        self.settings_file = settings_file
        self.remark = remark
        self.groups = groups
        self.commands = []
        self.command_handler()


In [None]:
# Service name -> port number. Built once at cell execution rather than on every call.
_PORT_NAMES = {
    'echo': '7', 'discard': '9', 'systat': '11', 'daytime': '13', 'qotd': '17', 'chargen': '19', 'ftp-data': '20', 'ftp': '21',
    'ssh': '22', 'telnet': '23', 'smtp': '25', 'time': '37', 'rlp': '39', 'nameserver': '42', 'nicname': '43', 'domain': '53',
    'bootps': '67', 'bootpc': '68', 'tftp': '69', 'gopher': '70', 'finger': '79', 'http': '80', 'hosts2-ns': '81', 'kerberos': '88',
    'hostname': '101', 'iso-tsap': '102', 'rtelnet': '107', 'pop2': '109', 'pop3': '110', 'sunrpc': '111', 'auth': '113',
    'uucp-path': '117', 'sqlserv': '118', 'nntp': '119', 'ntp': '123', 'epmap': '135', 'netbios-ns': '137', 'netbios-dgm': '138',
    'netbios-ssn': '139', 'imap': '143', 'sql-net': '150', 'sqlsrv': '156', 'pcmail-srv': '158', 'snmp': '161', 'snmptrap': '162',
    'print-srv': '170', 'bgp': '179', 'irc': '194', 'ipx': '213', 'rtsps': '322', 'mftp': '349', 'ldap': '389', 'https': '443',
    'microsoft-ds': '445', 'kpasswd': '464', 'isakmp': '500', 'crs': '507', 'exec': '512', 'login': '513', 'cmd': '514',
    'printer': '515', 'talk': '517', 'ntalk': '518', 'efs': '520', 'ulp': '522', 'timed': '525', 'tempo': '526', 'irc-serv': '529',
    'courier': '530', 'conference': '531', 'netnews': '532', 'netwall': '533', 'uucp': '540', 'klogin': '543', 'kshell': '544',
    'dhcpv6-client': '546', 'dhcpv6-server': '547', 'afpovertcp': '548', 'new-rwho': '550', 'rtsp': '554', 'remotefs': '556',
    'rmonitor': '560', 'monitor': '561', 'nntps': '563', 'whoami': '565', 'ms-shuttle': '568', 'ms-rome': '569',
    'http-rpc-epmap': '593',
    'hmmp-ind': '612', 'hmmp-op': '613', 'ldaps': '636', 'doom': '666', 'msexch-routing': '691', 'kerberos-adm': '749',
    'kerberos-iv': '750', 'mdbs_daemon': '800', 'ftps-data': '989', 'ftps': '990', 'telnets': '992', 'imaps': '993', 'ircs': '994',
    'pop3s': '995', 'activesync': '1034', 'kpop': '1109', 'nfsd-status': '1110', 'nfa': '1155', 'phone': '1167', 'opsmgr': '1270',
    'ms-sql-s': '1433', 'ms-sql-m': '1434', 'ms-sna-server': '1477', 'ms-sna-base': '1478', 'wins': '1512', 'ingreslock': '1524',
    'stt': '1607', 'l2tp': '1701', 'pptconference': '1711', 'pptp': '1723', 'msiccp': '1731', 'remote-winsock': '1745',
    'ms-streaming': '1755', 'msmq': '1801', 'radius': '1812', 'radacct': '1813', 'msnp': '1863', 'ssdp': '1900',
    'close-combat': '1944',
    'nfsd': '2049', 'knetd': '2053', 'mzap': '2106', 'qwave': '2177', 'directplay': '2234', 'ms-olap3': '2382', 'ms-olap4': '2383',
    'ms-olap1': '2393', 'ms-olap2': '2394', 'ms-theater': '2460', 'wlbs': '2504', 'ms-v-worlds': '2525', 'sms-rcinfo': '2701',
    'sms-xfer': '2702', 'sms-chat': '2703', 'sms-remctrl': '2704', 'msolap-ptp2': '2725', 'icslap': '2869', 'cifs': '3020',
    'xbox': '3074', 'ms-dotnetster': '3126', 'ms-rule-engine': '3132', 'msft-gc': '3268', 'msft-gc-ssl': '3269',
    'ms-cluster-net': '3343', 'ms-wbt-server': '3389', 'ms-la': '3535', 'pnrp-port': '3540', 'teredo': '3544', 'p2pgroup': '3587',
    'ws-discovery': '3702', 'dvcprov-port': '3776', 'msfw-control': '3847', 'msdts1': '3882', 'sdp-portmapper': '3935',
    'net-device': '4350',
    'ipsec-msft': '4500', 'llmnr': '5355', 'wsd': '5357', 'rrac': '5678', 'dccm': '5679', 'ms-licensing': '5720',
    'directplay8': '6073', 'ms-do': '7680', 'man': '9535', 'rasadv': '9753', 'imip-channels': '11320',
}

# Matches a service name only as a whole token (not glued to a letter, digit or underscore).
# Alternatives are ordered longest-first so that e.g. 'timed' or 'msft-gc-ssl' wins over the
# shorter 'time' / 'msft-gc' that it starts with.
_PORT_NAME_PATTERN = re.compile(
    r'(?<!\w)(?:'
    + '|'.join(re.escape(name) for name in sorted(_PORT_NAMES, key=len, reverse=True))
    + r')(?!\w)'
)


def port_names_to_port_numbers(raw_rules):
    """Replace well-known service names in the raw rules text with their port numbers.

    Matching is case-insensitive and whole-token: a name is replaced only when it
    is not directly attached to another letter, digit or underscore, so ordinary
    words that merely contain a service name are left alone. When at least one
    name is replaced the returned text is the lower-cased input with the
    substitutions applied; otherwise the input is passed on unchanged.

    The text is then handed to ``dirtywords_to_portnumbers`` and that function's
    result is returned.

    raw_rules: the rules text (a string).
    Returns the converted text, or None when an exception was logged (in which
    case ``pass_fail_indicator`` is set to False).
    """
    try:
        converted, replaced_count = _PORT_NAME_PATTERN.subn(
            lambda match: _PORT_NAMES[match.group(0)], raw_rules.lower()
        )
        # Only adopt the lower-cased text when something was actually replaced
        clean_input = converted if replaced_count else raw_rules
        return dirtywords_to_portnumbers(clean_input)
    except Exception as error:
        logging.error(f"ERROR:__main__:port_names_to_port_numbers:$ exception {error}", exc_info=True)
        pass_fail_indicator.value = False
        
def dirtywords_to_portnumbers(clean_input):
    """Repair service names that an earlier name-to-number pass converted only in part.

    Every key of the table below that occurs in ``clean_input`` is replaced by the
    port number it maps to, in table order. The repaired text is written to
    ``raw_input_textarea.value`` and returned.

    Returns None when an exception was logged (``pass_fail_indicator`` is then
    set to False).
    """
    try:
        # A shorter service name embedded in a longer one gets converted first and leaves
        # a half-converted fragment behind; each fragment maps to the port that was meant.
        # Comment format: actual-service | service-causing-mistake | actual-port-number
        dirty_words = {
            't21':        '69',   # tftp | ftp | 69
            'r23':        '107',  # rtelnet | telnet | 107
            '161trap':    '162',  # snmptrap | snmp | 162
            'm21':        '349',  # mftp | ftp | 349
            '80s':        '443',  # https | http | 443
            'n517':       '518',  # ntalk | talk | 518
            '194-serv':   '529',  # irc-serv | irc | 529
            'k513':       '543',  # klogin | login | 543
            'remot520':   '556',  # remotefs | efs | 556
            '119s':       '563',  # nntps | nntp | 563
            '80-rpc-135': '593',  # http-rpc-epmap | http & epmap | 593
            '389s':       '636',  # ldaps | ldap | 636
            '88-adm':     '749',  # kerberos-adm | kerberos | 749
            '88-iv':      '750',  # kerberos-iv | kerberos | 750
            '21s-data':   '989',  # ftps-data | ftp | 989
            '21s':        '990',  # ftps | ftp | 990
            '23s':        '992',  # telnets | telnet | 992
            '143s':       '993',  # imaps | imap | 993
            '194s':       '994',  # ircs | irc | 994
            '110s':       '995',  # pop3s | pop3 | 995
        }
        for fragment, port_number in dirty_words.items():
            if fragment in clean_input:
                clean_input = clean_input.replace(fragment, port_number)
        # Push the converted text back into the input box
        raw_input_textarea.value = clean_input
        return clean_input
    except Exception as error:
        logging.error(f"ERROR:__main__:dirtywords_to_portnumbers:$ exception {error}", exc_info=True)
        pass_fail_indicator.value = False

In [None]:
# Empty frame carrying only the column headers, so there is a table to show before any rules are generated
rule_dataframe = pd.DataFrame(
    columns=[
        'Action',
        'From_Environment', 'From_Address', 'From_Port',
        'To_Environment', 'To_Address', 'To_Port',
        'Protocol',
        'ASA_Address', 'ASA_Context', 'ASA_ACL',
    ]
)

def generate_commands():
    """Turn the text in ``raw_input_textarea`` into the rule table and the command list.

    Steps, each logged to the debug log:

    1. Every non-blank input line is parsed with ``Rule`` and matched with
       ``Correlation`` (using the ``correlations_file`` global). A line that fails
       either step is logged, flips ``pass_fail_indicator`` to False and is left
       out of the table; the remaining lines are still processed.
    2. The collected rows become the ``rule_dataframe`` global, which is rendered
       into the grid through ``output_dataframe``.
    3. ``Grouper`` groups the table and ``Commands`` (using the ``remark`` and
       ``settings_file`` globals) produces the ``generated_commands`` global.

    ``generated_commands`` is reset to an empty list first, so a failed run never
    leaves the commands of an earlier run behind. Nothing is returned and no
    exception propagates; failures are reported through the log and
    ``pass_fail_indicator``.
    """
    global rule_dataframe
    global generated_commands
    try:
        generated_commands = []
        rule_rows = []
        for line_num, rule in enumerate(raw_input_textarea.value.split('\n'), start=1):
            logging.info("-"*15)
            logging.info(f"INFO:__main__:generate_commands:$    begin line {line_num} : {rule}")

            if not rule.strip():
                # e.g. the trailing newline of a pasted request -- not a rule
                logging.info(f"INFO:__main__:generate_commands:$    line {line_num} is blank -- skipped")
                continue

            try:
                determined = Rule(rule)
            except Exception as error:
                logging.error(f"ERROR:__main__:generate_commands: exception {error} ----> FAILED TO DETERMINE : {rule}", exc_info=True)
                pass_fail_indicator.value = False
                continue  # without this the previous line's result would be reused for this line

            try:
                correlated = Correlation(determined.from_address, determined.to_address, correlations_file)
            except Exception as error:
                logging.error(f"ERROR:__main__:generate_commands: exception {error} ----> FAILED TO CORRELATE : {rule}", exc_info=True)
                pass_fail_indicator.value = False
                continue

            rule_rows.append({
                'Action': determined.action,
                'From_Environment': determined.from_environment,
                'From_Address': determined.from_address,
                'From_Port': determined.from_port,
                'To_Environment': determined.to_environment,
                'To_Address': determined.to_address,
                'To_Port': determined.to_port,
                'Protocol': determined.protocol,
                'ASA_Address': correlated.asa_address,
                'ASA_Context': correlated.asa_context,
                'ASA_ACL': correlated.asa_access_list
            })

        # Build the frame once from all rows (DataFrame.append no longer exists in pandas 2.x)
        rule_dataframe = pd.DataFrame(rule_rows)
        grid[3:, :1] = output_dataframe(rule_dataframe)

        logging.info("-"*15)
        logging.info(f"INFO:__main__:generate_commands:$    determinations and correlations completed -- begin groupings")
        logging.info("-"*15)

        try:
            rule_groupings = Grouper(rule_dataframe).groups
        except Exception as error:
            logging.error(f"ERROR:__main__:generate_commands: exception {error} ----> FAILED WHILE GROUPING", exc_info=True)
            pass_fail_indicator.value = False
            return  # nothing to generate commands from

        logging.info("-"*15)
        logging.info(f"INFO:__main__:generate_commands:$    object groupings completed -- begin generating commands")
        logging.info("-"*15)

        try:
            generated_commands = Commands(rule_groupings, remark, settings_file).commands
        except Exception as error:
            logging.error(f"ERROR:__main__:generate_commands: exception {error} ----> FAILED TO GENERATE COMMANDS", exc_info=True)
            pass_fail_indicator.value = False

    except Exception as error:
        logging.error(f"ERROR:__main__:generate_commands:$   exception {error} ----> UNCAUGHT ERROR", exc_info=True)
        pass_fail_indicator.value = False


In [None]:
#1: raw_input_textarea
# Input box for the pasted firewall request; the placeholder doubles as the usage guide.
raw_input_textarea = widgets.Textarea(
    layout=widgets.Layout(width='auto', height='300px'),
    placeholder='''PASTE RAW FIREWALL REQUEST -- EXPAND TO VIEW DETAILS\n
Add Production X.X.X.X any Production X.X.X.X 8080 TCP
Add Development X.X.X.X/22 any Development X.X.X.X 53 UDP
Add Testing X.X.X.X any Testing X.X.X.X-254 HTTPS TCP
Add Staging X.X.X.X any Staging X.X.X.X 67, 68 UDP
Add Production X.X.X.X any Production X.X.X.X 23 TCP

Standard Format (as directly copied):
Action From_Environment From_Address From_Port To_Environment To_Address To_Port Protocol

Recognizable Formatting:
(All fields separated by any amount of whitespace && ignore case)

ACTIONS: Add || Remove || Ignore
ENVIRONMENTS: Testing  ||  Test  ||  Production  ||  Prod  ||  Development  ||  Dev  ||  Staging  ||  Stag || Ignore
ADDRESSES: 192.168.0.1 (Singular) || 192.168.0.1-150 (Range) || 192.168.0.1-192.168.0.150 (Range-Full) || 192.168.0.1/24 (Subnet)
PORTS: Any || Ephemeral || 443 (Singular) || 8800-8804 (Range) || 22, 80, 443 (Multiport)
PROTOCOLS: TCP || UDP || TCP/UDP || UDP/TCP
''',
)
# Not displayed on its own -- the widget is placed into the grid layout at the end of the notebook.

# Re-runs the service-name conversion with the textarea's current value whenever that value changes
raw_input_textarea_interactive = widgets.interactive(
    port_names_to_port_numbers,
    raw_rules=raw_input_textarea,
)

In [None]:
#2: correlations_text
# Path of the correlations file; when left blank the Generate handler uses "correlations.json".
correlations_text = widgets.Text(
    layout=widgets.Layout(width='auto', height='auto'),
    placeholder='/filepath/to/correlations.json',
    disabled=False
)
# Not displayed on its own -- placed into the grid layout at the end of the notebook.

In [None]:
#3: object_group_text
# Path of the settings file; when left blank the Generate handler uses "settings.json".
object_group_text = widgets.Text(
    layout=widgets.Layout(width='auto', height='auto'),
    placeholder='/filepath/to/settings.json',
    disabled=False,
)
# Not displayed on its own -- placed into the grid layout at the end of the notebook.

In [None]:
#4: remark_text
# Free-text remark handed to the command generator; the placeholder shows a sample with today's date.
remark_text = widgets.Text(
    layout=widgets.Layout(width='auto', height='auto'),
    placeholder=f'TKT000123 <username> {datetime.date.today()}',
    disabled=False,
)
# Not displayed on its own -- placed into the grid layout at the end of the notebook.

In [None]:
#5: generate_button
# Green button that starts a generation run once its click handler is registered.
generate_button = widgets.Button(
    layout=widgets.Layout(width='100%', height='80%'),
    button_style='success',
    description='Generate',
    tooltip='Generate Output',
    disabled=False
)

def generate_button_onclick(_):
    """Click handler for the Generate button.

    Reads the three option fields, falling back to "correlations.json",
    "settings.json" and an empty remark when a field is blank, and publishes them
    through the ``correlations_file`` / ``settings_file`` / ``remark`` globals that
    ``generate_commands`` reads. It then resets ``pass_fail_indicator`` to True,
    runs ``generate_commands`` and writes the resulting commands, one per line,
    into ``output_textarea``.

    ``generated_commands`` is emptied before the run, so when generation fails the
    output box is cleared instead of showing the commands of a previous click.
    The click-event argument is ignored. Failures are logged with a traceback and
    never propagate to the widget framework.
    """
    global remark
    global correlations_file
    global settings_file
    global generated_commands
    logging.info("-"*15)
    logging.info("INFO:__main__:generate_button_onclick:$ generate button clicked")
    generated_commands = []
    try:
        correlations_file = correlations_text.value or "correlations.json"
    except Exception as error:
        logging.error(f"ERROR:__main__:generate_button_onclick:$ failed to load correlations : exception {error}", exc_info=True)
    try:
        settings_file = object_group_text.value or "settings.json"
    except Exception as error:
        logging.error(f"ERROR:__main__:generate_button_onclick:$ failed to load settings : exception {error}", exc_info=True)
    try:
        remark = remark_text.value or ''
        pass_fail_indicator.value = True
        generate_commands()
    except Exception as error:
        logging.error(f"ERROR:__main__:generate_button_onclick:$ command generation failed : exception {error}", exc_info=True)
        pass_fail_indicator.value = False
    try:
        output_textarea.value = '\n'.join(generated_commands)
    except Exception as error:
        logging.error(f"ERROR:__main__:generate_button_onclick:$ command output failed : exception {error}", exc_info=True)
    
# Register the handler so that every click on the Generate button runs generate_button_onclick
generate_button.on_click(generate_button_onclick)
    
# Not displayed on its own -- the button is placed into the grid layout at the end of the notebook.

In [None]:
#6: pass_fail_indicator
# Status indicator: starts out True and is set to False by any step that logs an error.
pass_fail_indicator = widgets.Valid(
    description='Pass/Fail:',
    value=True
)
# Not displayed on its own -- placed into the grid layout at the end of the notebook.

In [None]:
#7 output_dataframe
def output_dataframe(rule_dataframe):
    """Render ``rule_dataframe`` inside a gray-bordered Output widget and return that widget."""
    frame_layout = widgets.Layout(width='auto', height='auto', border='3px solid gray')
    frame_output = widgets.Output(layout=frame_layout)
    # Anything displayed inside the context is captured by the Output widget
    with frame_output:
        display(rule_dataframe)
    return frame_output

In [None]:
#8: output_text
# Box that receives the generated commands, one per line, after each Generate click.
output_textarea = widgets.Textarea(
    value='',
    placeholder='Output Commands',
    layout=widgets.Layout(width='100%', height='100%', border='3px solid gray'),
)
# Not displayed on its own -- placed into the grid layout at the end of the notebook.

In [None]:
# 9-row by 4-column layout holding every widget of the tool
grid = widgets.GridspecLayout(
    9, 4,
    height='auto',
    width='auto',
    grid_gap="10px 10px",
    layout=widgets.Layout(
        grid_template_columns='',
        grid_template_rows='',
        border='3px solid gray',
    ),
)

# Rows 0-2, full width: the request input
grid[0:3, 0:] = raw_input_textarea
# Rows 3 and below, columns 1-2: the generated commands
grid[3:, 1:3] = output_textarea
# Column 3: one control per row, starting at row 3
for grid_row, control in enumerate(
    (correlations_text, object_group_text, remark_text, pass_fail_indicator, generate_button),
    start=3,
):
    grid[grid_row, 3] = control
# Rows 3 and below, column 0: the rule table (the empty placeholder until rules are generated)
grid[3:, :1] = output_dataframe(rule_dataframe)
display(grid)