<a href="https://colab.research.google.com/github/deenyse/VSB_ZSU/blob/main/Copy_of_UASS_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Ukázka generování sítě z XML dat

In [None]:
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional

import networkx as nx


def parse_xml(filepath: str):
    """Parse an e-mail archive XML file into account and message records.

    Args:
        filepath: Path of the XML file to read.

    Returns:
        A tuple ``(accounts, messages)``.

        ``accounts`` maps every non-empty ``smtpAddress`` found on a child of
        the root's ``Accounts`` element to that child's attribute dict. It is
        empty when the document has no ``Accounts`` element.

        ``messages`` holds one dict per ``Message`` child of the root with the
        keys ``"sender"`` (the ``Sender`` attribute or ``None``), ``"sent"``
        (the parsed ``Sent`` attribute or ``None``), ``"recipients"`` (a list
        of ``{"type", "email"}`` dicts) and ``"attributes"`` (a copy of the
        element's attributes).

    Raises:
        ValueError: If a ``Sent`` attribute is not in ISO 8601 format.
    """
    root = ET.parse(filepath).getroot()

    accounts = {}
    accounts_el = root.find("Accounts")
    # find() returns None when the element is missing; iterating None would
    # raise TypeError, so treat a missing section as "no accounts".
    for acc in (accounts_el if accounts_el is not None else ()):
        addr = acc.attrib.get("smtpAddress")
        if addr:
            accounts[addr] = acc.attrib

    messages = []
    for msg in root.findall("Message"):
        sent_raw = msg.attrib.get("Sent")
        # fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
        # so normalise it to an explicit UTC offset first.
        sent = (
            datetime.fromisoformat(sent_raw.replace("Z", "+00:00"))
            if sent_raw
            else None
        )

        recipients = []
        for rec in msg.findall("Recipient"):
            email = (rec.text or "").strip()
            # Skip empty and whitespace-only recipients instead of recording
            # an entry with an empty address.
            if email:
                recipients.append(
                    {"type": rec.attrib.get("Type", "To"), "email": email}
                )

        messages.append(
            {
                "sender": msg.attrib.get("Sender"),
                "sent": sent,
                "recipients": recipients,
                "attributes": msg.attrib.copy(),
            }
        )

    return accounts, messages


def build_graph(accounts: Dict[str, Dict[str, str]], messages: List[Dict[str, Any]]) -> nx.DiGraph:
    """Build a weighted, directed sender -> recipient graph from parsed mail.

    Args:
        accounts: Mapping of e-mail address to its attribute dict. Every
            address becomes a node carrying those attributes plus ``domain``
            (the part after the last ``@``).
        messages: Message dicts with the keys ``"sender"``, ``"sent"``
            (a ``datetime`` or ``None``), ``"recipients"`` (dicts with an
            ``"email"`` key) and ``"attributes"``.

    Returns:
        A ``DiGraph`` with one edge per (sender, recipient) pair. Each edge
        stores ``weight`` (number of message/recipient occurrences),
        ``messages`` (list of the message attribute dicts), and
        ``first_date`` / ``last_date`` / ``first_year`` / ``last_year``,
        which stay ``None`` while no message on the edge has a timestamp.
        Messages whose sender is not in ``accounts`` are ignored.
    """
    G = nx.DiGraph()

    for addr, attrs in accounts.items():
        domain = addr.split("@")[-1] if addr else None
        G.add_node(addr, domain=domain, **attrs)

    for msg in messages:
        sender = msg["sender"]
        if sender not in accounts:
            continue

        sent = msg["sent"]
        sent_year = sent.year if sent is not None else None
        msg_attrs = msg["attributes"]

        for rec in msg["recipients"]:
            recipient = rec["email"]

            if not G.has_edge(sender, recipient):
                G.add_edge(sender, recipient,
                           weight=1,
                           first_date=sent,
                           last_date=sent,
                           first_year=sent_year,
                           last_year=sent_year,
                           messages=[msg_attrs])
                continue

            edge = G[sender][recipient]
            edge["weight"] += 1
            edge["messages"].append(msg_attrs)

            if sent is None:
                continue

            # The date keys always exist but hold None when every earlier
            # message on this edge lacked a timestamp; comparing a datetime
            # with None raises TypeError, so test for None explicitly.
            if edge.get("last_date") is None or sent > edge["last_date"]:
                edge["last_date"] = sent
                edge["last_year"] = sent_year
            if edge.get("first_date") is None or sent < edge["first_date"]:
                edge["first_date"] = sent
                edge["first_year"] = sent_year

    return G

def create_email_ego_subgraph(G, ego_email: str, distance: int = 1) -> nx.DiGraph:
    """Return the ego network of one e-mail address.

    Args:
        G: Graph whose nodes are e-mail addresses.
        ego_email: Address at the centre of the ego network.
        distance: Maximum number of hops from ``ego_email`` for a node to be
            included.

    Returns:
        The subgraph of ``G`` induced by ``ego_email`` and every node within
        ``distance`` hops of it, as produced by ``networkx.ego_graph``.

    Raises:
        networkx.NodeNotFound: If ``ego_email`` is not a node of ``G``.
    """
    if ego_email not in G:
        raise nx.NodeNotFound(f"{ego_email!r} is not a node of the graph")

    # undirected=True follows incoming as well as outgoing edges, so people
    # who wrote to the ego are included along with people the ego wrote to.
    return nx.ego_graph(G, ego_email, radius=distance, undirected=True)

def export_graph(G: nx.DiGraph, path="email_network_weighted.gexf"):
    """Write ``G`` to a GEXF file after making its attributes serialisable.

    The conversion is applied to ``G.copy()``, not to ``G`` itself. On every
    edge the ``messages`` list is replaced by its length in
    ``message_count``, and ``first_date`` / ``last_date`` datetimes are
    converted to ISO 8601 strings. Attributes whose value is ``None`` are
    removed from nodes and edges.

    Args:
        G: Graph to export.
        path: Destination file path.
    """
    H = G.copy()

    for _, _, data in H.edges(data=True):
        if isinstance(data.get("messages"), list):
            data["message_count"] = len(data["messages"])
            del data["messages"]

        for key in ("first_date", "last_date"):
            if isinstance(data.get(key), datetime):
                data[key] = data[key].isoformat()

        # Edges built only from messages without a timestamp carry None
        # dates/years; None is not a value type the GEXF writer accepts.
        for key in [k for k, v in data.items() if v is None]:
            del data[key]

    for _, data in H.nodes(data=True):
        for key in [k for k, v in data.items() if v is None]:
            del data[key]

    nx.write_gexf(H, path)
    print(f"Graph exported to {path}")

accounts, messages = parse_xml("teamnet_anonymized.xml")
G = build_graph(accounts, messages)

# Address whose ego network should be exported. It is empty in this template,
# so the export below is skipped until a real address from the graph is set.
ego_email = ""
ego_net = create_email_ego_subgraph(G, ego_email, 1) if ego_email in G else None

# export_graph() starts with G.copy(), which fails on None, so only export
# when an ego network was actually produced.
if ego_net is not None:
    export_graph(ego_net, "ego_network.gexf")
else:
    print(f"Ego network not available for {ego_email!r}; skipping export")

Výpis atributů

In [None]:
def print_node_attributes(G: nx.DiGraph, limit: Optional[int] = 10):
    """Print the nodes of ``G`` with their attribute dicts, one per line.

    Each line has the form ``"<position>. <node>: <attributes>"`` with
    positions starting at 1.

    Args:
        G: Graph whose nodes are listed.
        limit: Maximum number of nodes to print. ``None`` or ``0`` prints
            every node.
    """
    for position, (node, data) in enumerate(G.nodes(data=True), start=1):
        print(f"{position}. {node}: {data}")
        if limit and position >= limit:
            break

# List the nodes of G with their attributes (default limit of 10 lines).
print_node_attributes(G)

def print_edge_attributes(G: nx.DiGraph, limit: Optional[int] = 10):
    """Print the edges of ``G`` with their attribute dicts, one per line.

    Each line has the form ``"<position>. <source> → <target>: <attributes>"``
    with positions starting at 1.

    Args:
        G: Graph whose edges are listed.
        limit: Maximum number of edges to print. ``None`` or ``0`` prints
            every edge.
    """
    for position, (u, v, data) in enumerate(G.edges(data=True), start=1):
        print(f"{position}. {u} → {v}: {data}")
        if limit and position >= limit:
            break

# List the edges of G with their attributes (default limit of 10 lines).
print_edge_attributes(G)

1. name00001@domain0001.com: {'domain': 'domain0001.com', 'smtpAddress': 'name00001@domain0001.com'}
2. name00002@domain0002.cz: {'domain': 'domain0002.cz', 'smtpAddress': 'name00002@domain0002.cz'}
3. name00003@domain0003.cz: {'domain': 'domain0003.cz', 'smtpAddress': 'name00003@domain0003.cz'}
4. name00004@domain0004.cz: {'domain': 'domain0004.cz', 'smtpAddress': 'name00004@domain0004.cz'}
5. name00005@domain0005.cz: {'domain': 'domain0005.cz', 'smtpAddress': 'name00005@domain0005.cz'}
6. name00006@domain0004.cz: {'domain': 'domain0004.cz', 'smtpAddress': 'name00006@domain0004.cz'}
7. name00007@domain0004.cz: {'domain': 'domain0004.cz', 'smtpAddress': 'name00007@domain0004.cz'}
8. name00008@domain0004.cz: {'domain': 'domain0004.cz', 'smtpAddress': 'name00008@domain0004.cz'}
9. name00009@domain0004.cz: {'domain': 'domain0004.cz', 'smtpAddress': 'name00009@domain0004.cz'}
10. name00010@domain0004.cz: {'domain': 'domain0004.cz', 'smtpAddress': 'name00010@domain0004.cz'}
1. name00001@dom

Ukázka, jak vyfiltrovat vrcholy a hrany podle parametru

In [None]:
# Node filter: keep nodes whose attribute `attr` is present and at least `x`.
x = 3
attr = ''
nodes = [
    node
    for node, node_data in G.nodes(data=True)
    if node_data.get(attr) is not None and node_data[attr] >= x
]

# Edge filter: keep edges whose "weight" is present and at least `y`.
y = 1
edges = [
    (src, dst)
    for src, dst, edge_data in G.edges(data=True)
    if edge_data.get("weight") is not None and edge_data["weight"] >= y
]

# Assemble the filtered graph: selected nodes first, then every selected edge
# whose two endpoints both survived the node filter. Attributes are copied
# over from G.
H = nx.Graph()
for node in nodes:
    H.add_node(node, **G.nodes[node])

for src, dst in edges:
    if H.has_node(src) and H.has_node(dst):
        H.add_edge(src, dst, **G.edges[src, dst])

def create_x(G, atrributes, distance:int = 1):
    """Placeholder for a filtered-network builder; currently does nothing.

    TODO: Process only e-mails from a short time period (one year / month).
    TODO: Take a list of domains and consider only e-mails whose sender
    belongs to a domain in that list.

    Returns:
        None. The arguments are not used yet.
    """
    return None