In [15]:
import os
import sys

sys.path.insert(0, os.path.dirname(os.getcwd()))

# Importing the Firewall API
from firewall_api import Firewall, LIKE, EQ, NOT
import json
import ipaddress
import csv

# Firewall Credentials
# JSON File
# {
#     "firewall_ip": "<FIREWALL_IP_ADDRESS>",
#     "username": "<USER_NAME>",
#     "port" : "<FIREWALL_PORT>"
#     "password": "<PASSWORD",
#     "password_encrypted": <true|false>
# }

firewall_info = json.load(open("../Credentials/firewall_access.json"))
username = firewall_info["username"]
password = firewall_info["password"]
firewall_ip = firewall_info["firewall_ip"]
port = firewall_info["port"]
password_encrypted = firewall_info["password_encrypted"]

firewall = Firewall(
    username,
    password,
    firewall_ip,
    port,
    certificate_verify=False,
    password_encrypted=True,
)

In [16]:
# Test connection
print(f"Connection Test ::  {firewall.read("Login")}\n")

Connection Test ::  {'status': '216', 'message': 'Operation Successful.', 'data': [{'status': 'Authentication Successful'}]}



In [None]:
# Function to create and log entities
def create_entity(firewall, entity_type, entity_data, print_data=False, print_result=False):
    if print_data:
        print(f"Creating {entity_type} with data: {entity_data}")
    response = firewall.create(entity_type, entity_data)
    if print_result:
        print(f"CREATE :: {response}\n")
    return response


# Function to update and log entities
def update_entity(firewall, entity_type, entity_data, print_data=False, print_result=False):
    if print_data:
        print(f"Updating {entity_type} with data: {entity_data}")
    response = firewall.update(entity_type, entity_data)
    if print_result:
        print(f"UPDATE :: {response}\n")
    return response


# Function to read and log entities
def read_entity(firewall, entity_type, entity_data={}, filter_selector=LIKE, print_data=False, print_result=False):
    if print_data:
        print(f"Reading {entity_type} with filter: {entity_data} (Filter criteria: {filter_selector})")
    response = firewall.read(entity_type, entity_data, filter_selector)
    if print_result:
        print(f"READ :: {response}\n")
    return response


# Function to delete and log entities
def delete_entity(firewall, entity_type, entity_data, filter_selector=EQ, print_data=False, print_result=False):
    if print_data:
        print(f"Deleting {entity_type} with filter: {entity_data} (Filter criteria: {filter_selector})")
    response = firewall.delete(entity_type, entity_data, filter_selector)
    if print_result:
        print(f"DELETE :: {response}\n")
    return response

In [None]:
# Load objects from CSV
with open("IPaddresses.csv", "r", encoding="UTF8", newline="") as csv_file:
    list_of_objects = list(csv.DictReader(csv_file))

# Common constants
PREFIX = "BFA_"  # Prefix for all objects
COMMON_DESCRIPTION = "Created by Integrity360 Sophos Firewall API"

In [None]:
# Create IPHostGroup
ip_host_group_name = f"{PREFIX}Brute-force attack Group"
create_entity(
    firewall,
    "IPHostGroup",
    {
        "Name": ip_host_group_name,
        "IPFamily": "IPv4",
        "Description": COMMON_DESCRIPTION,
    },
)

In [None]:
# Process each object from the CSV
for item in list_of_objects:
    entity_type = None
    entity_data = {}

    if item["type"] == "ipv4":
        entity_type = "IPHost"
        entity_data["IPFamily"] = "IPv4"

        if "-" in item["data"]:  # IP range
            start_ip, end_ip = item["data"].replace(" ", "").split("-")
            entity_data.update(
                {
                    "Name": f"{PREFIX}IPR_{item['data']}",
                    "HostType": "IPRange",
                    "StartIPAddress": start_ip,
                    "EndIPAddress": end_ip,
                }
            )
        else:  # Single IP or Network
            network = ipaddress.IPv4Network(item["data"])
            if str(network.netmask) == "255.255.255.255":  # Single IP
                entity_data.update(
                    {
                        "Name": f"{PREFIX}IPH_{item['data']}",
                        "HostType": "IP",
                        "IPAddress": str(network.network_address),
                    }
                )
            else:  # Network
                entity_data.update(
                    {
                        "Name": f"{PREFIX}IPNW_{item['data']}",
                        "HostType": "Network",
                        "IPAddress": str(network.network_address),
                        "Subnet": str(network.netmask),
                    }
                )

        entity_data["HostGroupList"] = {"HostGroup": ip_host_group_name}

    else:
        print(f"Unsupported type: {item['type']} - Skipping item.")
        continue  # Skip unsupported types

    # Add common properties and create entity
    entity_data["Description"] = COMMON_DESCRIPTION
    create_entity(firewall, entity_type, entity_data, print_data=True)

In [None]:
entity_type = "LocalServiceACL"
rule_name = f"{PREFIX}Brute-force attack ACL"

In [None]:
entity_data = {
    "RuleName": rule_name,
    "Description": "Added according to Sophos recommendations: https://support.sophos.com/support/s/article/KBA-000009932",
    "Position": "Top",
    "IPFamily": "IPv4",
    "SourceZone": "WAN",
    "Hosts": {"Host": ip_host_group_name},
    "Services": {
        "Service": [
            "HTTPS",
            "SSH",
            "DNS",
            "Ping",
            "SSLVPN",
            "SNMP",
            "IPsec",
            "SMTPRelay",
            "RED",
            "UserPortal",
            "DynamicRouting",
            "VPNPortal",
            "40",
        ]
    },
    "Action": "drop",
}

create_entity(firewall, entity_type, entity_data, print_data=True)

In [None]:
entity_name = rule_name
entity_type = "LocalServiceACL"
read_entity(firewall, entity_type, entity_name, print_data=True)

In [None]:
for item in enumerate(read_entity(firewall, "LocalServiceACL", print_data=True).get('data',[])):
    print(item)


In [None]:
firewall.read("LocalServiceACL", f"{PREFIX}", filter_selector=LIKE)

In [None]:
#
#   DELETE
#
# firewall = Firewall(username, password, firewall_ip, port, certificate_verify, password_encrypted)
#

entity_name = rule_name
entity_type = "LocalServiceACL"
delete_entity(firewall, entity_type, entity_name,EQ, print_data=True)

In [None]:
entity_name = "BFA_"

entity_type = "LocalServiceACL"
delete_entity(firewall, entity_type, entity_name, LIKE, print_data=True)


In [None]:

entity_type = "IPHostGroup"
delete_entity(firewall, entity_type, entity_name, LIKE, print_data=True)

entity_type = "IPHost"
delete_entity(firewall, entity_type, entity_name, LIKE, print_data=True)