In [21]:
from SophosFirewallAPI import Firewall, EQ, NOT, LIKE
import json

# Firewall Credentials
# JSON File
# {
#     "firewall_ip": "<FIREWALL_IP_ADDRESS>",
#     "username": "<USER_NAME>",
#     "port" : "<FIREWALL_PORT>"
#     "password": "<PASSWORD>",
#      "certificate_verify": <true|false>,
#     "password_encrypted": <true|false>
# }

firewall_info = json.load(open("Credentials\\firewall_access.json"))
username = firewall_info["username"]
password = firewall_info["password"]
firewall_ip = firewall_info["firewall_ip"]
port = firewall_info["port"]
certificate_verify = firewall_info["certificate_verify"]
password_encrypted = firewall_info["password_encrypted"]


def dict_print(data_dict):
    """Converts a dictionary to a JSON string with indentation and prints it."""
    # Convert dictionary to JSON string with indentation
    json_string = json.dumps(data_dict, indent=4)

    # Print the JSON string
    print(json_string)


firewall = Firewall(username, password, firewall_ip, port, certificate_verify=certificate_verify, password_encrypted=password_encrypted)

In [None]:
# Demo create, read, update, delete
#

print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 1", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))
print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 2", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))
print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 3", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))
print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 4", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))

print("\nREAD :: ")
response = firewall.read("IPHost", "TEST", LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

print("\nUPDATE :: ", firewall.update("IPHost", {"Name": "TEST 1", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.101"}))
print("UPDATE :: ", firewall.update("IPHost", {"Name": "TEST 2", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.102"}))
print("UPDATE :: ", firewall.update("IPHost", {"Name": "TEST 3", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.103"}))
print("UPDATE :: ", firewall.update("IPHost", {"Name": "TEST 4", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.104"}))

print("\nREAD :: ")
response = firewall.read("IPHost", "TEST", LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

print(
    "\nDELETE :: ",
)
response = firewall.delete("IPHost", "TEST 1")
print("DELETE :: TEST 1 ->", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
response = firewall.delete("IPHost", "TEST 2")
print("DELETE :: TEST 2 ->", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")

print("\nREAD :: ")
response = firewall.read("IPHost", "TEST", LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

print("\nDELETE :: ")
response = firewall.delete("IPHost", "TEST", LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")

print("\nREAD :: ")
response = firewall.read("IPHost", "TEST", LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")


print("\nREAD :: FirewallRule")
response = firewall.read("FirewallRule")
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

In [None]:
# Demo create, read, update, delete
# Using context manager
#
with Firewall(username, password, firewall_ip, port, certificate_verify=certificate_verify, password_encrypted=password_encrypted) as firewall:

    print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 1", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))
    print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 2", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))
    print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 3", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))
    print("CREATE :: ", firewall.create("IPHost", {"Name": "TEST 4", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.100"}))

    print("\nREAD :: ")
    response = firewall.read("IPHost", "TEST", LIKE)
    print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
    for index, item in enumerate(response["data"], start=1):
        print(f"{index:03}: {item}")

    print("\nUPDATE :: ", firewall.update("IPHost", {"Name": "TEST 1", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.101"}))
    print("UPDATE :: ", firewall.update("IPHost", {"Name": "TEST 2", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.102"}))
    print("UPDATE :: ", firewall.update("IPHost", {"Name": "TEST 3", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.103"}))
    print("UPDATE :: ", firewall.update("IPHost", {"Name": "TEST 4", "IPFamily": "IPv4", "HostType": "IP", "IPAddress": "172.16.17.104"}))

    print("\nREAD :: ")
    response = firewall.read("IPHost", "TEST", LIKE)
    print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
    for index, item in enumerate(response["data"], start=1):
        print(f"{index:03}: {item}")

    print(
        "\nDELETE :: ",
    )
    response = firewall.delete("IPHost", "TEST 1")
    print("DELETE :: TEST 1 ->", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
    response = firewall.delete("IPHost", "TEST 2")
    print("DELETE :: TEST 2 ->", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")

    print("\nREAD :: ")
    response = firewall.read("IPHost", "TEST", LIKE)
    print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
    for index, item in enumerate(response["data"], start=1):
        print(f"{index:03}: {item}")

    print("\nDELETE :: ")
    response = firewall.delete("IPHost", "TEST", LIKE)
    print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")

    print("\nREAD :: ")
    response = firewall.read("IPHost", "TEST", LIKE)
    print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
    for index, item in enumerate(response["data"], start=1):
        print(f"{index:03}: {item}")


    print("\nREAD :: FirewallRule")
    response = firewall.read("FirewallRule")
    print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
    for index, item in enumerate(response["data"], start=1):
        print(f"{index:03}: {item}")

In [24]:
# Initialising firewall object with custom settings
#
firewall = Firewall(username, password, firewall_ip, port, certificate_verify=certificate_verify, password_encrypted=password_encrypted)

In [None]:
# FirewallRUle will be filtered for NAT string in Name
entity_type = "FirewallRule"
entity_name = "AZURE"

print(f"\nREAD :: {entity_type} entity_type with {entity_name} in the 'Name' ")
response = firewall.read(entity_type, entity_name, LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

In [26]:
entity_type = "FirewallRule"

Data = {
    "Name": "NEW Firewall Rule",
    "Description": "NEW Firewall Rule Created from API",
    "IPFamily": "IPv4",
    "Status": "Enable",
    "Position": "Top",
    "PolicyType": "Network",
    # "After": {"Name": "NEW Rule"}, # depends on "Position"
    "NetworkPolicy": {
        "Action": "Accept",
        "LogTraffic": "Enable",
        #     "SkipLocalDestined": "Disable",
        "SourceZones": {"Zone": ["LAN", "DMZ"]},
        "DestinationZones": {"Zone": "WAN"},
        "SourceNetworks": {"Network": ["192.168.30.0/24", "192.168.10.0/24"]},
        # "DestinationNetworks": {"Network": ["192.168.30.0/24", "192.168.10.0/24"]},
        #     "Schedule": "All The Time",
        #     "Services": {"Service": "RDP"},   # Do not include for "Any service"
        #     "DestinationNetworks": {"Network": "#Port2:1"},
        #     "DSCPMarking": "0",
        #     "WebFilter": "None",
        #     "WebCategoryBaseQoSPolicy": None,
        #     "BlockQuickQuic": "Disable",
        #     "ScanVirus": "Disable",
        #     "ZeroDayProtection": "Disable",
        #     "ProxyMode": "Disable",
        #     "DecryptHTTPS": "Disable",
        #     "ApplicationControl": "None",
        #     "ApplicationBaseQoSPolicy": None,
        #     "IntrusionPrevention": "None",
        #     "TrafficShappingPolicy": "None",
        #     "ScanSMTP": "Enable",
        #     "ScanSMTPS": "Enable",
        #     "ScanIMAP": "Enable",
        #     "ScanIMAPS": "Enable",
        #     "ScanPOP3": "Enable",
        #     "ScanPOP3S": "Enable",
        #     "ScanFTP": "Enable",
        #     "SourceSecurityHeartbeat": "Disable",
        #     "MinimumSourceHBPermitted": "No Restriction",
        #     "DestSecurityHeartbeat": "Disable",
        #     "MinimumDestinationHBPermitted": "No Restriction",
    },
}

In [None]:
print("create :: ", firewall.create(entity_type, Data))

In [None]:
print("read :: ", firewall.read(entity_type, Data["Name"]))

In [None]:
print("update :: ", firewall.update(entity_type, Data))

In [None]:
print("delete :: ", firewall.delete(entity_type, Data["Name"]))

In [None]:
# Read all FirewallRules
#
entity_type = "FirewallRule"

print(f"\nREAD :: {entity_type}")
response = firewall.read(entity_type)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

In [None]:
# 'DNAT' in the 'Name' of the 'FirewallRule'
#
response = firewall.read(entity_type, "DNAT", LIKE)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

In [33]:
# WebFilterURLGroup
entity_type = "WebFilterURLGroup"

# WebFilterURLGroup Template. Use read(entity_type) method to obtain
entity_data_1 = {
    "Name": "TEST 1",
    "Description": "TEST 1",
    "URLlist": {"URL": ["www.example.com", "test.com", "best.com"]},
}


entity_data_2 = {
    "Name": "TEST 2",
    "Description": "TEST 2",
    "URLlist": {"URL": ["www.example.com", "test.com", "best.com"]},
}

In [None]:
response = firewall.read(entity_type)

print("Current WebfilterURLGroups")
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

In [None]:
entity_type = "WebFilterURLGroup"
entity_name = "TEST"

print("create :: ", firewall.create(entity_type, entity_data_1))
print("create :: ", firewall.create(entity_type, entity_data_2))

response = firewall.read(entity_type)
print("\nCurrent WebfilterURLGroups", end="")
print(" -> Code:", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

print(f"\nCurrent WebfilterURLGroups with partial match the {entity_name}")
response = firewall.read(entity_type, entity_name, LIKE)
print("Status:", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

print(f"\nCurrent WebfilterURLGroups Delete all entity with partial match the {entity_name}")
print("delete :: ", firewall.delete(entity_type, entity_name, LIKE))  # Delete all entity with partial match the entity_name

response = firewall.read(entity_type)
print("\nCurrent WebfilterURLGroups", end="")
print(" -> Code:", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

print("\nDelete again :: ", firewall.delete(entity_type, entity_name, LIKE))

In [None]:
# Read all IPHost entities with entity_name (Internet) in the Name
#
entity_type = "IPHost"
entity_name = "Internet"

print(f"\nREAD :: {entity_type} entity with {entity_name} in the 'Name'")
response = firewall.read(entity_type, entity_name, LIKE)
print("Status:", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:03}: {item}")

In [None]:
for item in open("entity_type.txt", "r"):
    print("\n\n", item, end="")
    if item.strip() not in [
        "Certificate",
        "CertificateAuthority",
        "SelfSignedCertificateAuthority",
        "CRL",
        "FormTemplate",
        "FourEyeAuthenticationDeAnonymize",
        "IviewCustomLogo",
    ]:
        response = firewall.read(item.strip())
        print("Status:", f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")

        for index, item in enumerate(response["data"], start=1):
            print(f"{index:02}: {item}")

In [None]:
response = firewall.read("IPHost", "##ALL_RW", EQ)
print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:02}: {item}")

In [None]:
from SophosFirewallAPI import Firewall, EQ, NOT, LIKE

firewall = Firewall(username, password, hostname=firewall_ip)

response = firewall.read("FirewallRule")

print(f"Status: {response['status']} | Message: {response['message']} | Data: {response['data'] if response['data'] else 'No data available'}")
for index, item in enumerate(response["data"], start=1):
    print(f"{index:02}: {item}")