初期設定

In [None]:
%run /notebooks/startup.py

スナップショットの作成

In [None]:
SNAPSHOT_PATH = "networks/example-filters/current" #対象フォルダの指定
SNAPSHOT_NAME = "current" #スナップショット名

セッションの作成

In [None]:
bf = Session(host="localhost") #ホストの指定（batfishの動いているローカルを指定）
bf.set_network("network-example-filters") #ネットワーク名の指定
bf.init_snapshot(SNAPSHOT_PATH, name=SNAPSHOT_NAME, overwrite=True) #スナップショットの初期化

ACLで無駄な行があるかを検索

In [None]:
aclAns = bf.q.filterLineReachability(nodes="rtr-with-acl").answer()
show(aclAns.frame().sort_values(by="Unreachable_Line"))

特定のIPからの疎通確認

In [None]:
flow_name = HeaderConstraints(srcIps="10.10.10.1",#送信元アドレス
                            dstIps="218.8.104.58",#宛先アドレス
                            applications=["dns"])#アプリケーション
answer = bf.q.testFilters(headers=flow_name,
                        nodes="rtr-with-acl",#解析するACLを持つ機器のホスト名
                        filters="acl_in").answer()#解析するACLの名前
show(answer.frame())

特定のサブネットからの疎通確認

In [None]:
traffic_name = HeaderConstraints(srcIps="10.10.10.0/24",#送信元サブネットを記入
                                dstIps="218.8.104.58",#宛先IPを記入
                                ipProtocols=["tcp","udp"])#プロトコルを指定
answer = bf.q.searchFilters(headers=traffic_name,
                            action="deny", #"deny"or"permit"を記入
                            filters="acl_in").answer()
show(answer.frame())

トレースルートの確認

In [None]:
SNAPSHOT_PATH = "networks/example" #config格納のパスを指定

# Import packages
%run startup.py
bf = Session(host="localhost")

NETWORK_NAME = "example_network"
SNAPSHOT_NAME = "example_snapshot"

bf.set_network(NETWORK_NAME)
bf.init_snapshot(SNAPSHOT_PATH, name=SNAPSHOT_NAME, overwrite=True)


In [None]:
# start the traceroute from the Loopback0 interface of as3core1 to host1
headers = HeaderConstraints(dstIps='host1') #宛先のIP or ホスト名を指定
tracert = bf.q.traceroute(startLocation="as3core1[Loopback0]",  headers=headers).answer().frame() #開始ホスト名[開始ポート]を指定

show(tracert)

ルーティングテーブルの表示

In [None]:
routes_all = bf.q.routes().answer().frame()
show(routes_all)

サブネットの一覧表示

In [None]:
ip = bf.q.ipOwners().answer().frame()
show(ip)

In [None]:
import ipaddress

# サブネット一覧を表示
subnets = bf.q.ipOwners().answer().frame()
# 表示はIP/MASKの形式
df = subnets.loc[:,['IP','Mask']]

# IPアドレスとサブネットマスクを結合してIPアドレス/サブネットマスクの形式に変換する関数
def to_cidr(ip, mask):
    return f"{ip}/{mask}"

# applyメソッドを使って各行のIPアドレスとサブネットマスクを結合し、配列に保存
ip_subnet_array = df.apply(lambda row: to_cidr(row['IP'], row['Mask']), axis=1).to_numpy()

# 結果を表示
print(ip_subnet_array)

In [None]:
import ipaddress

# サブネット一覧を表示
subnets = bf.q.ipOwners().answer().frame()
# 表示はIP/MASKの形式
df = subnets.loc[:, ['IP', 'Mask', 'Interface']]  # 'Interface'列も取得する

# IPアドレスとサブネットマスクを結合してIPアドレス/サブネットマスクの形式に変換する関数
def to_cidr(row):
    # "Interface"が"Loopback0"の場合は処理をスキップしてNoneを返す
    if row['Interface'] == 'Loopback0':
        return None
    return f"{row['IP']}/{row['Mask']}"

# applyメソッドを使って各行のIPアドレスとサブネットマスクを結合し、配列に保存
ip_subnet_array = df.apply(to_cidr, axis=1).dropna().to_numpy()

# ネットワークをまとめる
def aggregate_networks(ip_subnet_array):
    networks = []
    for item in ip_subnet_array:
        network = ipaddress.IPv4Network(item, strict=False)
        networks.append(network)
    aggregated_networks = ipaddress.collapse_addresses(networks)
    return [str(net) for net in aggregated_networks]

# 結果を表示
print("結果:", aggregate_networks(ip_subnet_array))


Vlan情報の出力

In [None]:
SNAPSHOT_PATH = "input/test" #対象フォルダの指定
SNAPSHOT_NAME = "test" #スナップショット名
bf = Session(host="localhost") #ホストの指定（batfishの動いているローカルを指定）
bf.set_network("test") #ネットワーク名の指定
bf.init_snapshot(SNAPSHOT_PATH, name=SNAPSHOT_NAME, overwrite=True) #スナップショットの初期化

In [None]:
result = bf.q.switchedVlanProperties().answer().frame()
show(result)

In [None]:
result = bf.q.interfaceProperties().answer().frame()
show(result)

In [None]:
import ipaddress

# サブネット一覧を表示
interface_info = bf.q.interfaceProperties().answer().frame()
# 表示はIP/MASKの形式
df = interface_info.loc[:, ['Interface', 'All_Prefixes']]

# "Interface"に"Vlan"が含まれる行のみを抽出
vlan_df = df.query('Interface.astype("str").str.contains("Vlan", na = False)', engine='python')

vlan_df


In [None]:
import ipaddress

# サブネット一覧を表示
interface_info = bf.q.interfaceProperties().answer().frame()
# 表示はIP/MASKの形式
df = interface_info.loc[:, ['Interface', 'All_Prefixes']]

# "Interface"に"Vlan"が含まれる行のみを抽出
vlan_df = df.query('Interface.astype("str").str.contains("Vlan", na=False)', engine='python')

network_addresses = []
for index, row in vlan_df.iterrows():
    # "router[]"を削除して、インターフェース名を取得
    interface_name = str(row['Interface']).replace('router[', '').rstrip(']')
    # "All_Prefixes"列からIPアドレスを取得して、ネットワークアドレスに変換
    all_prefixes = row['All_Prefixes']
    if all_prefixes:
        # ネットワークアドレスを取得
        network_address = all_prefixes[0]
        network_addresses.append([interface_name, network_address])
    else:
        network_addresses.append([interface_name, None])

network_addresses


ACLの出力

In [None]:
SNAPSHOT_PATH = "input/test" #対象フォルダの指定
SNAPSHOT_NAME = "test" #スナップショット名
bf = Session(host="localhost") #ホストの指定（batfishの動いているローカルを指定）
bf.set_network("test") #ネットワーク名の指定
bf.init_snapshot(SNAPSHOT_PATH, name=SNAPSHOT_NAME, overwrite=True) #スナップショットの初期化

In [None]:
result = bf.q.testFilters(headers=HeaderConstraints(dstIps='192.168.20.0/24', ipProtocols= ['tcp'])).answer().frame()

result = result.sort_values(by=['Filter_Name', 'Flow'])

result

In [None]:
result = bf.q.searchFilters(headers=HeaderConstraints()).answer().frame()
result