# In [None]:

from IPython.display import clear_output

from ipaddress import ip_address, ip_network, IPv4Network, IPv6Network
import panel as pn
import ipywidgets as widgets
import pandas as pd
import altair as alt
import pathlib
import datetime

# Load Panel's 'vega' extension.
pn.extension('vega')

# Names of the CSV columns holding the two endpoints of every flow record.
SOURCE_AND_DEST_IP = ["SOURCE IP ADDRESS", "DESTINATION IP ADDRESS"]

SOURCE_IP = SOURCE_AND_DEST_IP[0]
DEST_IP = SOURCE_AND_DEST_IP[1]

# Build the relative path from separate components rather than embedding a
# backslash: "Task 1\\file.csv" is a directory + file only on Windows, while on
# POSIX it is a single (non-existent) file name.
file_path = pathlib.Path("Task 1") / "20240701_19-20_100l.csv"
# Directory the notebook/script is started from; the CSV is looked up below it.
dir_path = pathlib.Path().resolve()

csv_file = pd.read_csv(dir_path / file_path)
# Module-level flow table read by the helper functions defined below.
data: pd.DataFrame = pd.DataFrame(csv_file)

def get_first_time_flow_string():
    """Return the smallest "START TIME - FIRST SEEN" value of the global ``data``."""
    start_times = data["START TIME - FIRST SEEN"]
    return start_times.min()

def get_last_time_flow_string():
    """Return the largest "START TIME - FIRST SEEN" value of the global ``data``."""
    start_times = data["START TIME - FIRST SEEN"]
    return start_times.max()

def get_first_time_flow():
    """Return the earliest flow start time parsed into a ``datetime.datetime``."""
    earliest = get_first_time_flow_string()
    return datetime.datetime.fromisoformat(earliest)

def get_last_time_flow():
    """Return the latest flow start time parsed into a ``datetime.datetime``."""
    latest = get_last_time_flow_string()
    return datetime.datetime.fromisoformat(latest)


def try_convert_ip_to_int(x):
    """Return the integer value of the IP address *x*, or 0 if it is not one.

    Both IPv4 and IPv6 inputs are accepted by ``ip_address``. Anything that
    cannot be parsed (malformed text, ``None``, NaN, ...) maps to 0 so the
    function can be applied to a whole column without raising.

    Only the parsing errors are swallowed; unrelated failures are no longer
    hidden behind a blanket ``except Exception``.
    """
    try:
        return int(ip_address(x))
    except (TypeError, ValueError):
        # ip_address() reports every unparseable value as ValueError;
        # TypeError is kept as a guard for exotic non-string inputs.
        return 0

def to_datetime(x):
    """Parse the ISO-formatted string *x* into a ``datetime.datetime``."""
    parsed = datetime.datetime.fromisoformat(x)
    return parsed

def time_to_seconds(time_str):
    """Convert a duration such as ``"2m 30s"`` into a number of seconds.

    The string is split on whitespace: the first token is read as minutes
    (surrounding ``m`` characters removed) and the second as seconds
    (surrounding ``s`` characters removed). Further tokens are ignored.
    """
    tokens = time_str.split()
    minute_count = int(tokens[0].strip('m'))
    second_count = int(tokens[1].strip('s'))
    return 60 * minute_count + second_count

# Shared widget style: 'initial' description width, passed to every widget below.
style = {'description_width': 'initial'}

# Button wired (at the bottom of the file) to on_change, which rebuilds the chart.
apply_changes_widget = widgets.Button(
    description='Apply changes',
    tooltip='Apply changes',
    disabled=False,
    button_style='success', # 'success', 'info', 'warning', 'danger' or ''
    style = style,
)

# Number of rows kept in the ranking; the string value is converted with int()
# in on_change.
top_n_widget = widgets.Dropdown(
    options=['10', '20', '30', '40'],
    value='10',
    description='Top N stats',
    disabled=False,
    style = style,
)

# Choose between statistics per single IP address and per pair of addresses.
single_or_pair_widget = widgets.ToggleButtons(
    options=['Single', 'Pair'],
    description='Info about single IP adresss or pair:',
    disabled=False,
    button_style='info', # 'success', 'info', 'warning', 'danger' or ''
    style = style,
)

# Traffic direction; on_change maps 'Received' to SOURCE_IP and 'Sent' to DEST_IP.
src_dst_widget = widgets.ToggleButtons(
    options=['Received', 'Sent'],
    description='Direction:',
    disabled=False,
    button_style='info', # 'success', 'info', 'warning', 'danger' or ''
    style=style,
)

# Metric to aggregate; on_change translates the label into
# "BYTES", "PACKETS", "FLOWS" or "DURATION".
data_type_widget = widgets.ToggleButtons(
    options=['Bytes', 'Packets', 'Flows', 'Duration with dst IP'],
    description='Data type:',
    disabled=False,
    button_style='info', # 'success', 'info', 'warning', 'danger' or ''
    style = style,
)

# Pair of offsets handed to process_by_time_range, which applies them as minutes.
period_widget = widgets.IntRangeSlider(
    min=0,
    max=36000,
    value=[0, 36000],
    step=5,
    description="Time range",
    disabled=False,
    style = style,
)

# Comma-separated IPv4 networks; the default 0.0.0.0/0 covers every IPv4 address.
ipv4_range_widget = widgets.Textarea(
    value='0.0.0.0/0',
    description='List of IPv4 masks, separeted by \",\"',
    disabled=False,
    style = style,
)

# Comma-separated IPv6 networks. The value is passed to process_by_ip_range,
# whose IPv6 filtering is currently commented out.
ipv6_range_widget = widgets.Textarea(
    value='0:0:0:0:0:0:0:0/0',
    description='List of IPv6 masks, separeted by \",\"',
    disabled=False,
    style = style,
    layout = widgets.Layout(width='50%')
)

# Protocol filter. The selected label is compared verbatim with the "PROTOCOL"
# column by process_by_protocol ("ALL" disables the filter), so every option
# must be spelled exactly like the protocol name: the former "TFTFP" entry was
# a typo for "TFTP" and could never match any row.
protocol_widget = widgets.Dropdown(
    options=["ALL", "UDP", "SMTP",
             "DNS", "HTTP", "HTTPS",
             "SMB", "TELNET",
             "RDP", "FTP", "TFTP"
            ],
    description='Protocol',
    disabled=False,
    style = style,
)

# Port filter. Every option other than "ALL" is a comma-separated list of port
# numbers that process_by_port parses and matches against the port column.
port_widget = widgets.Dropdown(
    options=["ALL",
             "8080, 8888, 591, 82",
             "8443, 9443, 4443",
             "5353, 5355",
             "2121, 8021",
             "2525, 26",
             "3390, 3391, 4000",
            ],
    description='Ports',
    disabled=False,
    style = style,
)

# Minimal flow duration (seconds) for the 'Duration with dst IP' data type.
# Starts disabled; on_change_duration enables it and sets the value to "5"
# when that data type is selected, and resets it to "disabled" otherwise.
minimal_duration_widget = widgets.Text(
    value='disabled',
    description='Minimal duration of a flow in seconds',
    disabled=True,
    style = style,
    layout = widgets.Layout(width='50%')
)

# All controls, in the order display_widgets() shows them.
ui_widgets = [
              top_n_widget, single_or_pair_widget, src_dst_widget, data_type_widget,
              minimal_duration_widget, period_widget, ipv4_range_widget, ipv6_range_widget,
              protocol_widget, port_widget, apply_changes_widget,
              ]

def process_flow_pair(curr_data, top_n_n):
    """Count flows per unordered (source, destination) address pair.

    Both endpoints of every row are sorted so that A->B and B->A fall into the
    same pair. The ``top_n_n`` pairs with the most flows are returned as a
    frame with the columns ``'src-dst'`` (``"<addr> - <addr>"``) and
    ``'flows'``.
    """
    endpoints = curr_data[[SOURCE_IP, DEST_IP]]
    unordered_pairs = endpoints.apply(lambda row: tuple(sorted(row)), axis=1)

    pair_counts = unordered_pairs.value_counts().reset_index()
    pair_counts.columns = ['pair', 'flows']

    # Spread the 2-tuples back into two address columns.
    split_pairs = pd.DataFrame(pair_counts['pair'].tolist(), index=pair_counts.index)
    pair_counts[[SOURCE_IP, DEST_IP]] = split_pairs

    busiest = pair_counts.drop(columns=['pair']).nlargest(top_n_n, columns='flows')
    busiest['src-dst'] = busiest[SOURCE_IP] + ' - ' + busiest[DEST_IP]
    return busiest[['src-dst', 'flows']]

def process_duration_single(curr_data, g_data_type, minimal_duration, top_n_n):
    """Rank destination IPs by the total duration of their flows.

    Args:
        curr_data: Flow table; column ``g_data_type`` holds duration strings
            understood by ``time_to_seconds``.
        g_data_type: Name of the duration column.
        minimal_duration: Flows shorter than this many seconds are ignored;
            converted with ``int()``.
        top_n_n: Number of destinations to return.

    Returns:
        A frame with the destination column, ``DURATION`` (summed seconds) and
        ``COUNT`` (number of contributing flows), limited to the ``top_n_n``
        largest ``DURATION`` values.

    Raises:
        ValueError: If ``minimal_duration`` is not an integer literal.
    """
    # Work on a copy so the caller's frame keeps its original duration strings.
    processed_data = curr_data.copy()
    processed_data[g_data_type] = processed_data[g_data_type].apply(time_to_seconds)

    long_enough = processed_data[processed_data[g_data_type] >= int(minimal_duration)]
    per_destination = long_enough.groupby(DEST_IP).agg(
        DURATION=(g_data_type, 'sum'),
        COUNT=(DEST_IP, 'count'),
    ).reset_index()

    # The aggregated column is always called "DURATION", whatever the name of
    # the input column was, so rank by that name explicitly.
    return per_destination.nlargest(top_n_n, columns='DURATION')

def process_by_type(curr_data, g_src_dst, g_data_type, top_n_n, single_or_pair, minimal_duration):
    """Aggregate the filtered flows into the ranking that gets plotted.

    Args:
        curr_data: Filtered flow table.
        g_src_dst: Address column to group by in "Single" mode.
        g_data_type: ``"FLOWS"``, ``"DURATION"`` or the name of a numeric
            column to sum (e.g. ``"BYTES"``, ``"PACKETS"``).
        top_n_n: Number of rows to keep.
        single_or_pair: ``"Pair"`` groups by unordered address pair; any other
            value groups by ``g_src_dst``.
        minimal_duration: Only used for ``"DURATION"`` in single mode.

    Returns:
        A frame whose first column is the label (address or pair) and whose
        second column is the aggregated value, limited to ``top_n_n`` rows.
    """
    if single_or_pair == "Pair":
        if g_data_type == "FLOWS":
            return process_flow_pair(curr_data, top_n_n)

        # Tag a copy with the unordered pair and group THAT frame: the helper
        # column must exist on the frame being grouped, and the caller's frame
        # must not be modified.
        tmp_data = curr_data.copy()
        tmp_data['pair'] = tmp_data[[SOURCE_IP, DEST_IP]].apply(lambda x: tuple(sorted(x)), axis=1)
        return tmp_data.groupby('pair', as_index=False)[g_data_type].sum().nlargest(top_n_n, columns=g_data_type)

    if g_data_type == "FLOWS":
        # One row per flow, so counting rows per address gives the flow count.
        flows_per_ip = curr_data.groupby(g_src_dst)[g_src_dst].count().reset_index(name='flows')
        return flows_per_ip.nlargest(top_n_n, columns='flows')

    if g_data_type == "DURATION":
        return process_duration_single(curr_data, g_data_type, minimal_duration, top_n_n)

    totals = curr_data[[g_src_dst, g_data_type]].groupby(g_src_dst).sum().reset_index()
    return totals.nlargest(top_n_n, columns=g_data_type)

def process_by_ip_range(curr_data, g_src_dst, ipv4_range_str, ipv6_range_str, single_or_pair):
    """Keep only the flows whose address(es) fall into one of the given networks.

    Args:
        curr_data: Flow table to filter.
        g_src_dst: Address column tested in "Single" mode.
        ipv4_range_str: Iterable of network masks such as ``"10.0.0.0/8"``.
            Surrounding whitespace and empty entries are ignored, and host bits
            are tolerated (``"10.1.2.3/8"`` is read as ``"10.0.0.0/8"``).
        ipv6_range_str: Accepted for interface compatibility; IPv6 masks are
            not applied yet (TODO).
        single_or_pair: With ``"Pair"`` both the source and the destination
            address must lie in the same network; otherwise only
            ``g_src_dst`` is tested.

    Returns:
        The matching rows of ``curr_data`` in their original order. A row that
        matches several masks is returned once, not once per mask.

    Raises:
        ValueError: If a non-empty mask is not a valid network.
    """
    is_pair = single_or_pair == "Pair"

    # Convert the address columns once instead of once per mask and comparison.
    if is_pair:
        src_ints = curr_data[SOURCE_IP].apply(try_convert_ip_to_int)
        dst_ints = curr_data[DEST_IP].apply(try_convert_ip_to_int)
    else:
        addr_ints = curr_data[g_src_dst].apply(try_convert_ip_to_int)

    # OR the per-network matches into one mask so overlapping networks cannot
    # duplicate rows, and an empty mask list still yields a frame with columns.
    keep = pd.Series(False, index=curr_data.index)
    for raw_subnet in ipv4_range_str:
        subnet = raw_subnet.strip()
        if not subnet:
            continue

        network = ip_network(subnet, strict=False)
        first, last = int(network[0]), int(network[-1])

        if is_pair:
            in_network = src_ints.between(first, last) & dst_ints.between(first, last)
        else:
            in_network = addr_ints.between(first, last)
        keep = keep | in_network

    return curr_data[keep]

def process_by_time_range(curr_data, start_offset, end_offset):
    """Keep the flows that started inside a window of the capture.

    Both offsets are measured in minutes from the first flow of the capture,
    so the window is ``[first + start_offset, first + end_offset]`` (bounds
    included). The upper bound used to be computed from the LAST flow time,
    which meant that a non-negative ``end_offset`` could never exclude any
    row; anchoring both bounds to the first flow makes the upper handle of the
    range slider effective.

    Args:
        curr_data: Flow table with an ISO-formatted "START TIME - FIRST SEEN"
            column.
        start_offset: Minutes after the first flow at which the window opens.
        end_offset: Minutes after the first flow at which the window closes.

    Returns:
        The rows of ``curr_data`` whose start time lies inside the window.
    """
    first_seen = get_first_time_flow()
    start = first_seen + datetime.timedelta(minutes=start_offset)
    end = first_seen + datetime.timedelta(minutes=end_offset)

    # Parse the column once and reuse it for both comparisons.
    timestamps = curr_data["START TIME - FIRST SEEN"].apply(to_datetime)
    return curr_data[(timestamps >= start) & (timestamps <= end)]


def process_by_protocol(curr_data, protocol):
    """Keep the rows whose "PROTOCOL" column equals *protocol*.

    ``"ALL"`` disables the filter and returns ``curr_data`` itself.
    """
    if protocol == "ALL":
        return curr_data

    # SMTP and DNS currently get no special treatment.
    # Open question from the author: how to detect a dns/smtp server.
    matches_protocol = curr_data["PROTOCOL"] == protocol
    return curr_data[matches_protocol]

def process_by_port(curr_data, ports, g_src_dst):
    """Keep the flows that use one of the given ports.

    Args:
        curr_data: Flow table with "SOURCE PORT" / "DESTINATION PORT" columns.
        ports: ``"ALL"`` (no filtering) or a comma-separated list of port
            numbers such as ``"8080, 8888"``; whitespace around the numbers
            and empty entries are ignored.
        g_src_dst: Selected address column; the source port is tested when it
            is the source address column, the destination port otherwise.

    Returns:
        ``curr_data`` itself for ``"ALL"``, otherwise its matching rows in
        their original order.

    Raises:
        ValueError: If an entry of ``ports`` is not an integer.
    """
    if ports == "ALL":
        return curr_data

    port_dir = "SOURCE PORT" if g_src_dst == SOURCE_IP else "DESTINATION PORT"

    wanted_ports = [int(port) for port in ports.split(',') if port.strip()]

    # A single membership test replaces growing a frame by repeated concat
    # onto an empty DataFrame.
    return curr_data[curr_data[port_dir].isin(wanted_ports)]

def get_traffic_data(g_src_dst, g_data_type, top_n, time_period,
                     ipv4_range_str, ipv6_range_str, single_or_pair,
                     protocol, ports, minimal_duration):
    """Filter the global flow table, aggregate it and return a bar chart.

    The filters are applied in this order: time range, ports, protocol,
    IP networks. The remaining rows are then aggregated by
    ``process_by_type``.

    Args:
        g_src_dst: Address column to group by in "Single" mode.
        g_data_type: ``"BYTES"``, ``"PACKETS"``, ``"FLOWS"`` or ``"DURATION"``.
        top_n: Number of bars to show.
        time_period: ``(start_offset, end_offset)`` for the time filter.
        ipv4_range_str: Comma-separated IPv4 network masks.
        ipv6_range_str: Comma-separated IPv6 network masks.
        single_or_pair: ``"Single"`` or ``"Pair"``.
        protocol: Protocol name or ``"ALL"``.
        ports: Comma-separated port list or ``"ALL"``.
        minimal_duration: Minimal flow duration for ``"DURATION"``.

    Returns:
        An Altair bar chart with the aggregated value on the x axis and the
        address (or address pair) on the y axis, sorted by descending value.
    """
    period_start, period_end = time_period

    processed_data = pd.DataFrame(data)
    processed_data = process_by_time_range(processed_data, period_start, period_end)
    processed_data = process_by_port(processed_data, ports, g_src_dst)
    processed_data = process_by_protocol(processed_data, protocol)
    processed_data = process_by_ip_range(processed_data, g_src_dst, ipv4_range_str.split(','),
                                         ipv6_range_str.split(','), single_or_pair)
    processed_data = process_by_type(processed_data, g_src_dst, g_data_type, top_n,
                                     single_or_pair, minimal_duration)

    # process_by_type always puts the label first and the value second.
    label_column = processed_data.columns[0]
    value_column = processed_data.columns[1]

    # Use the real column name for the tooltip: the value column is not always
    # called like g_data_type (flow counts live in 'flows', not "FLOWS"), and
    # Altair cannot infer a type for a field that is missing from the data.
    tooltips = [value_column]
    if g_data_type == "DURATION" and "COUNT" in processed_data.columns:
        tooltips.append("COUNT")

    # No explicit DataFrame sort is needed: the bar order comes from the
    # sort='-x' encoding below.
    # TODO: re-introduce a point selection on DEST_IP for DURATION charts
    # (previously sketched with alt.selection_point + add_params).
    return alt.Chart(processed_data).mark_bar().encode(
        x=value_column,
        y=alt.Y(label_column, sort='-x'),
        tooltip=tooltips,
        color=alt.Color(value_column,
                        scale=alt.Scale(range=['lightgreen', 'green']))
    ).properties(
        width=600,
        # Taller canvas when many bars have to fit.
        height=1000 if top_n >= 30 else 600,
        autosize=alt.AutoSizeParams(
            type='fit',
            contains='padding'
        ),
    )

def disable_button(button):
    """Clear the widget's button style and mark it as disabled."""
    button.button_style, button.disabled = '', True

def enable_button(button):
    """Give the widget the 'info' button style and mark it as enabled."""
    button.button_style, button.disabled = 'info', False

def on_change_singel_pair(v):
    """Disable the direction toggle in 'Pair' mode and enable it otherwise.

    The argument supplied by the widget framework is ignored.
    """
    pair_selected = single_or_pair_widget.value == 'Pair'
    toggle = disable_button if pair_selected else enable_button
    toggle(src_dst_widget)

def on_change_duration(v):
    """Adapt the controls to the selected data type.

    With 'Duration with dst IP' selected, the minimal-duration field is
    enabled and preset to "5", and the direction and single/pair toggles are
    disabled. For any other data type the field is disabled, reset to
    "disabled", and both toggles are enabled. The argument supplied by the
    widget framework is ignored.
    """
    clear_output(wait=True)

    duration_selected = data_type_widget.value == 'Duration with dst IP'

    minimal_duration_widget.disabled = not duration_selected
    minimal_duration_widget.value = "5" if duration_selected else "disabled"

    toggle = disable_button if duration_selected else enable_button
    toggle(src_dst_widget)
    toggle(single_or_pair_widget)

def display_widgets():
    """Show every control of ``ui_widgets``, in list order."""
    # display() accepts several objects and renders them one after another.
    display(*ui_widgets)

# def update_selection(event):
#     selected = event.new.get(DEST_IP, [])
#     if selected:
#         selected_value = selected[0][DEST_IP] 
#         result_pane.object = f"**Selected:** {selected_value} → {data[selected_value]}"
#     else:
#         result_pane.object = "**No selection**"

def on_change(v):
    """Rebuild the chart from the current widget values and redraw the UI.

    Clears the cell output, translates the widget selections into column
    names, asks ``get_traffic_data`` for the chart, then displays the controls
    followed by the chart. The argument supplied by the button is ignored.
    """
    clear_output(wait=True)

    # NOTE(review): 'Received' selects the SOURCE address column and 'Sent' the
    # destination one - confirm this is the intended direction mapping.
    g_src_dst = SOURCE_IP if src_dst_widget.value == 'Received' else DEST_IP

    # Every label that is not listed here falls back to "DURATION".
    column_by_label = {
        "Bytes": "BYTES",
        "Packets": "PACKETS",
        "Flows": "FLOWS",
    }
    g_data_type = column_by_label.get(data_type_widget.value, "DURATION")

    graph = get_traffic_data(
        g_src_dst,
        g_data_type,
        int(top_n_widget.value),
        period_widget.value,
        ipv4_range_widget.value,
        ipv6_range_widget.value,
        single_or_pair_widget.value,
        protocol_widget.value,
        port_widget.value,
        minimal_duration_widget.value,
    )

    display_widgets()
    display(graph)


# React only to changes of each toggle's selected value. The deprecated
# on_trait_change(handler) listened to EVERY trait, so restyling or disabling
# single_or_pair_widget from on_change_duration re-triggered
# on_change_singel_pair, which re-enabled the direction toggle that had just
# been disabled.
single_or_pair_widget.observe(on_change_singel_pair, names='value')
data_type_widget.observe(on_change_duration, names='value')
# The chart is only rebuilt on demand, when "Apply changes" is clicked.
apply_changes_widget.on_click(on_change)
display_widgets()

