In [None]:
!pip install "dask[complete]"
!pip install networkx
!pip install matplotlib
!pip install torch_geometric
!pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.6.0+cpu.html
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
!pip install pymongo

In [None]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

# Load LANL
with ProgressBar():
    print("Loading auth logs")
    authLogs = dd.read_csv(
        "/data/LANL/auth.txt",
        names=[
            "time",
            "source user",
            "destination user",
            "source computer",
            "destination computer",
            "authentication type",
            "logon type",
            "authentication orientation",
            "success/failure"
        ],
        header=None
    )

    print("Loading redteam logs")
    redLogs = dd.read_csv("/data/LANL/redteam.txt")

In [None]:
def apply_auth(row):
    row["_index"] = "index"
    row["timestamp"] = "hello"
    row["file"] = "auth"
    return row

s = authLogs.apply(apply_auth, axis=1, meta={'time': 'int64', 'source user': 'object', 'destination user': 'object', 'source computer': 'object', 'destination computer': 'object', 'authentication type': 'object', 'logon type': 'object', 'authentication orientation': 'object', 'success/failure': 'object', '_index': 'object', 'timestamp': 'object', 'file': 'object'})

In [None]:
import pandas as pd

data = {
    "Column1": [1, 2],
    "Column2": ["A", "B"],
    "Column3": [True, False]
}

df = pd.DataFrame(data)
addition = {
    "_index": "Hello",
    "_source": "HJello"
}
for row in df.to_dict(orient="records"):
    for key, value in addition.items():
        row[key] = value
    print(row)


In [None]:
from elasticsearch import Elasticsearch, helpers

# Connect to Elasticsearch
es = Elasticsearch("https://localhost:9200", 
                   api_key="MGZGTDhaVUJHWEpfZm5CYVB1bXo6dXBxVk5ucF9Rc3F6dWh5RjVRVDQzUQ==", 
                   verify_certs=False, 
                   ssl_show_warn=False,
                   headers={"Content-Type": "application/json", "Accept": "application/json"})


In [None]:
class ElasticDataFetcher():
    """
    A generator-like class that retrieves all records from an Elasticsearch index
    and supports the len() function to get the total number of matching records.

    Args:
        es (Elasticsearch): The Elasticsearch client instance.
        index_name (str): The name of the Elasticsearch index.
        query (dict): The query to filter records.
        pagination (int, optional): The number of records to fetch per request. Defaults to 10000.
    """
    def __init__(self, es, index_name, query, pagination=10000):
        self.es = es
        self.index_name = index_name
        self.query = query
        self.pagination = pagination
        self.matchcount = None # The count of matching records
        self._total_count = None  # Cache the total count of matching records

    def __iter__(self):
        search_after = None  # Used for pagination to fetch the next set of records.

        while True:
            # Perform a search request with pagination and sorting.
            resp = self.es.search(
                index=self.index_name,
                query=self.query,
                size=self.pagination,
                search_after=search_after,
                sort=[
                    {"timestamp": "asc"},  # Sort by datetime in ascending order.
                ],
            )

            # If no records are returned, exit the loop.
            if len(resp.body["hits"]["hits"]) == 0:
                return

            # Update the search_after value for the next request.
            search_after = resp.body["hits"]["hits"][-1]["sort"]

            # Yield each record's source data.
            for record in resp.body["hits"]["hits"]:
                yield record["_source"]
    
    def __len__(self):
        # Fetch the length if not previously fetched
        if self.matchcount == None:
            resp = self.es.count(
                index=self.index_name,
                query=self.query
            )
            self.matchcount = resp.body["count"]
        
        return self.matchcount
    
query = {
    "range": {
        "time": {
            "gte": 0,
            "lt": 10
        }
    }
}
data_fetcher = ElasticDataFetcher(es, "lanl", query)

In [None]:
from elasticsearch import Elasticsearch
import torch
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv

def pd_to_graph(auth_df, redteam_df):
    srcs = []
    dests = []
    node_features = []
    edge_attr = []
    node_map = {}
    y = []

    # Merge the auth and redteam df
    # time,user@domain,source computer,destination computer
    auth_df = auth_df.merge(
        redteam_df.assign(is_malicious=1),
        on=['time', "source computer", 'destination computer'],
        how='left'
    )
    auth_df['is_malicious'] = auth_df['is_malicious'].fillna(0).astype(int)

    for _, row in auth_df.iterrows():
        src, dest = row['source computer'], row['destination computer']
        if src not in node_map:
            node_map[src] = len(node_map)
        if dest not in node_map:
            node_map[dest] = len(node_map)
        srcs.append(node_map[src])
        dests.append(node_map[dest])
        edge_attr.append([])
        y.append(row["is_malicious"])
        
    node_features = [k for k, v in sorted(node_map.items(), key=lambda item: item[1])]
    return Data(
        x=torch.zeros(3),
        edge_index=torch.tensor([srcs, dests], dtype=torch.long),
        edge_attr=torch.tensor(edge_attr),
        y=y
    )

def pd_to_temporal(df_auth, df_redteam, start_time, end_time, step):
    temporal = []
    for time in range(start_time, end_time, step):
        auth = df_auth[df_auth["time"] == time]
        redteam = df_redteam[df_redteam["time"] == time]
        temporal.append(pd_to_graph(auth, redteam))
    return temporal


es = Elasticsearch("https://localhost:9200", 
                   api_key="MGZGTDhaVUJHWEpfZm5CYVB1bXo6dXBxVk5ucF9Rc3F6dWh5RjVRVDQzUQ==", 
                   verify_certs=False, 
                   ssl_show_warn=False,)
query = {
    "range": {
        "time": {
            "gte": 1758220,
            "lt": 1758230
        }
    }
}
data_fetcher = ElasticDataFetcher(es, "lanl", query)
df = pd.DataFrame(data_fetcher)

In [None]:
auth_df = df[df["file"] == "auth"]
redteam_df = df[df["file"] == "redteam"]

data = pd_to_temporal(auth_df, redteam_df, 1758220, 1758230, 1)

In [None]:
import networkx as nx
import torch_geometric

g = torch_geometric.utils.to_networkx(data[6], to_undirected=True)
nx.draw(g)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from torch_geometric.loader import DataLoader
from elasticsearch import Elasticsearch

class GNNEncoder(nn.Module):
    def __init__(self, in_channels, hidden_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)
        return x  # node embeddings
    
class RNNEncoder(nn.Module):
    def __init__(self, hidden_channels, num_layers=1):
        super().__init__()
        self.rnn = nn.GRU(hidden_channels, hidden_channels, num_layers=num_layers,  batch_first=True)

    def forward(self, x_seq):
        # x_seq shape: [num_nodes, sequence_length, hidden_channels]
        output, final_state = self.rnn(x_seq)
        return output, final_state  # optional depending on decoder
    
class DotProductDecoder(nn.Module):
    def forward(self, z_src, z_dst):
        return (z_src * z_dst).sum(dim=1)
    
class FullModel(nn.Module):
    def __init__(self, in_channels, hidden_channels):
        super().__init__()
        self.gnn = GNNEncoder(in_channels, hidden_channels)
        self.rnn = RNNEncoder(hidden_channels)
        self.decoder = DotProductDecoder()

    def forward(self, graph_sequence, edge_pairs):
        # Generate the features at each timestamp
        # shape (timestamp, nodes, features)
        graph_features = [self.gnn(data.x, data.edge_index) for data in graph_sequence]

        # Switch the order, so that each node is treated as a batch
        # shape (nodes, timestamp, features)
        graph_features = torch.stack(graph_features)
        graph_features = graph_features.permute(1, 0, 2)
        rnn_output, _ = self.rnn(graph_features)

        # Get the embeddings at time t+1
        # shape (nodes, timestamp (t+1), features)
        predicted_embeddings = rnn_output[:, -1, :]  # use last time step

        # Generate the scores for 
        src = edge_pairs[0]
        dst = edge_pairs[1]
        scores = self.decoder(predicted_embeddings[src], predicted_embeddings[dst])  # shape [num_edges]

        return scores

In [None]:
import torch
from utils.elastic_datafetcher import ElasticDataFetcher, LANL_Data
from torch_geometric.loader import DataLoader
from torch_geometric.utils import to_undirected, negative_sampling

model=FullModel(3, 15)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.BCEWithLogitsLoss()

es = Elasticsearch("https://localhost:9200", 
                   api_key="MGZGTDhaVUJHWEpfZm5CYVB1bXo6dXBxVk5ucF9Rc3F6dWh5RjVRVDQzUQ==", 
                   verify_certs=False, 
                   ssl_show_warn=False,)
lanl_fetcher = LANL_Data(es, "lanl-auth")

lanl_fetcher.get_nodemap()
data = lanl_fetcher.fetch(0, 86400)

In [None]:
data.snapshot(0, 604800)

In [None]:


batches = [([data.snapshot(i+t, i+t) for t in range (4)], data.snapshot(i+4, i+4)) for i in range(1758200, 1758300-5, 5)]
for (graph_seq, y_graph) in batches:
    positive_edges = y_graph.edge_index
    negative_edges = negative_sampling(
        edge_index=y_graph.edge_index,
        num_nodes=y_graph.num_nodes,
        num_neg_samples=positive_edges.size(1)
    )

    edge_pairs = torch.cat([positive_edges, negative_edges], dim=1)  # shape [2, num_edges]
    labels = torch.cat([
        torch.ones(positive_edges.size(1)),   # label 1 for real edges
        torch.zeros(negative_edges.size(1))   # label 0 for fake edges
    ])
    scores = model(graph_seq, edge_pairs)
    loss = loss_fn(scores, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    

In [None]:
data[0]

In [None]:
# from elasticsearch import Elasticsearch
# import time

# old_index = "lanl"
# new_index = "lanl-auth"

# # Step 1: Define the correct mapping
# new_mapping = {
#     "mappings": {
#         "properties": {
#             "time": {"type": "long"},
#             "timestamp": {"type": "date"},
#             "source computer": {"type": "keyword"},
#             "destination computer": {"type": "keyword"},
#             "source user@domain": {"type": "keyword"},
#             "destination user@domain": {"type": "keyword"},
#             "authentication type": {"type": "keyword"},
#             "logon type": {"type": "keyword"},
#             "authentication orientation": {"type": "keyword"},
#             "success/failure": {"type": "keyword"},
#             "file": {"type": "keyword"}
#         }
#     }
# }

# # Step 2: Create the new index with the mapping
# if not es.indices.exists(index=new_index):
#     es.indices.create(index=new_index, body=new_mapping)
#     print(f"✅ Created index: {new_index}")
# else:
#     print(f"⚠️ Index '{new_index}' already exists — skipping creation")

# # Step 3: Reindex asynchronously to track progress
# reindex_body = {
#     "source": {"index": old_index},
#     "dest": {"index": new_index}
# }
# response = es.reindex(body=reindex_body, wait_for_completion=False)
# task_id = response["task"]
# print(f"🚀 Reindex started. Task ID: {task_id}")


In [None]:
from tqdm import tqdm
from utils.elastic_datafetcher import LANLGraphFetcher
from elasticsearch import Elasticsearch
import logging

logging.basicConfig(
    level=logging.WARNING,  # Only show WARNING and above
    format="%(levelname)s: %(message)s"
)
logging.getLogger("elastic_transport.transport").setLevel(logging.WARNING)

es = Elasticsearch("http://localhost:9200", 
                   api_key="MGZGTDhaVUJHWEpfZm5CYVB1bXo6dXBxVk5ucF9Rc3F6dWh5RjVRVDQzUQ==", 
                   verify_certs=False, 
                   ssl_show_warn=False,)
graph_data = LANLGraphFetcher(es, ("lanl-auth", "lanl-redteam") , 0, 1000, prefetch=True)

all_data = []
for data in tqdm(graph_data.grouped_by_second(), "Loading graph data", graph_data.to_sec-graph_data.from_sec):
    all_data.append(data)
all_data

In [None]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Parameters
tsv_path = "/data/LANL/auth.txt"    # path to the input CSV file
out_path = "/data/LANL/auth.parquet"  # path for the output Parquet file
chunksize = 100000              # number of rows per batch

# Ensure output directory exists
o_dir = os.path.dirname(out_path)
if o_dir and not os.path.exists(o_dir):
    os.makedirs(o_dir)

writer = None

# Iterate over CSV in chunks
enumerate_offset = 0
for i, df_chunk in enumerate(pd.read_csv(tsv_path, chunksize=chunksize)):
    # Convert pandas DataFrame chunk to Arrow Table
    table = pa.Table.from_pandas(df_chunk)

    # Initialize ParquetWriter with schema from first chunk\    
    if writer is None:
        writer = pq.ParquetWriter(out_path, table.schema)

    # Write batch to Parquet file
    writer.write_table(table)
    print(f"Wrote chunk {i + 1} (rows {i * chunksize}–{(i + 1) * chunksize})")

# Close the writer when done
if writer is not None:
    writer.close()
    print(f"Parquet file saved to: {out_path}")
else:
    print("No data was written; CSV may be empty.")

# Usage: 
# Once converted, load with pandas for fast querying:
# df = pd.read_parquet(out_path)
# e.g., df[df['column'] == 'value']


In [None]:
import pandas as pd
out_path = "/data/LANL/auth.parquet"  # path for the output Parquet file

df = pd.read_parquet(out_path)
df

In [None]:

from elasticsearch import Elasticsearch, helpers
import pandas as pd
from datetime import datetime, timedelta
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

# Index name and CSV file path
INDEX_NAME = "lanl-redteam"
CSV_FILE_PATH = "/data/LANL/redteam.txt"
REDTEAM_FILE_HEADERS = [
    "time", 
    "user@domain", 
    "source computer", 
    "destination computer"
]
start_of_2015 = datetime(2015, 1, 1)

# Step 1: Delete all documents in the index
def delete_all_documents(index):
    if es.indices.exists(index=index):
        logger.info(f"Deleting all documents from index: {index}")
        es.delete_by_query(
            index=index,
            body={"query": {"match_all": {}}},
            refresh=True,
            conflicts="proceed"
        )
    else:
        logger.warning(f"Index {index} does not exist. Creating it.")
        es.indices.create(index=index)

# Step 2: Load CSV
def read_csv(path):
    logger.info(f"Reading CSV from: {path}")
    return pd.read_csv(path, names=REDTEAM_FILE_HEADERS)

# Step 3: Push documents to Elasticsearch
def index_csv(df, index):
    logger.info(f"Indexing {len(df)} documents into index: {index}")

    def generate_actions():
        for row in df.to_dict(orient="records"):
            # Add custom fields
            row["timestamp"] = start_of_2015 + timedelta(seconds=row["time"])
            row["file"] = "auth"

            yield {
                "_index": index,
                "_source": row
            }

    helpers.bulk(es, generate_actions())
    logger.info("Indexing complete.")

# Execute steps
delete_all_documents(INDEX_NAME)
df = read_csv(CSV_FILE_PATH)
index_csv(df, INDEX_NAME)



In [None]:
from tqdm import tqdm
from utils.elastic_datafetcher import LANLGraphFetcher
from elasticsearch import Elasticsearch
import logging

logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

es = Elasticsearch("http://localhost:9200",
                   api_key="MGZGTDhaVUJHWEpfZm5CYVB1bXo6dXBxVk5ucF9Rc3F6dWh5RjVRVDQzUQ==",
                   verify_certs=False,
                   ssl_show_warn=False,)
graph_data = LANLGraphFetcher(es, ("lanl-auth", "lanl-redteam") , 0, 5, prefetch=True)

for data in tqdm(graph_data, "Loading graph data"):
    print(".", end="")

for data in tqdm(graph_data, "Loading graph data"):
    print(".", end="")

print("Done")

Loading graph data: 100%|██████████| 5/5 [00:00<00:00, 77.16it/s]


.....

Loading graph data: 100%|██████████| 5/5 [00:00<00:00, 78.53it/s]

.....Done





In [None]:
list(b)[:-1]

In [None]:
import torch
from torch_geometric.data import Data
from torch_geometric.transforms import RemoveSelfLoops
from typing import List, Union

from torch_geometric.data import Data, HeteroData
from torch_geometric.data.datapipes import functional_transform
from torch_geometric.transforms import BaseTransform
from torch_geometric.utils import coalesce


class RemoveDuplicatedEdgesTemporal(BaseTransform):
    r"""Removes duplicated edges from a given homogeneous or heterogeneous
    graph. Useful to clean-up known repeated edges/self-loops in common
    benchmark datasets, *e.g.*, in :obj:`ogbn-products`.
    (functional name: :obj:`remove_duplicated_edges`).

    Args:
        key (str or [str], optional): The name of edge attribute(s) to merge in
            case of duplication. (default: :obj:`["edge_weight", "edge_attr"]`)
        reduce (str, optional): The reduce operation to use for merging edge
            attributes (:obj:`"add"`, :obj:`"mean"`, :obj:`"min"`,
            :obj:`"max"`, :obj:`"mul"`). (default: :obj:`"add"`)
    """
    def __init__(
        self,
        key: Union[str, List[str]] = ['edge_attr', 'edge_weight'],
        reduce: str = "add",
    ) -> None:
        if isinstance(key, str):
            key = [key]

        self.keys = key
        self.reduce = reduce

    def forward(
        self,
        data: Union[Data, HeteroData],
    ) -> Union[Data, HeteroData]:

        for store in data.edge_stores:
            keys = [key for key in self.keys if key in store]
            print(store.keys())

            size = [s for s in store.size() if s is not None]
            num_nodes = max(size) if len(size) > 0 else None

            store.edge_index, edge_attrs = coalesce(
                edge_index=store.edge_index,
                edge_attr=[store[key] for key in keys],
                num_nodes=num_nodes,
                reduce=self.reduce,
            )

            for key, edge_attr in zip(keys, edge_attrs):
                store[key] = edge_attr

        return data

data = Data(
    edge_index=torch.tensor([
        [1,2,2],
        [2,2,2],
    ]),
    time=torch.Tensor([1,1,0])
)
print(f"Data before transformation has {data.num_edges} edges and {len(data.time)} time steps")
removeLoops = RemoveDuplicatedEdgesTemporal(key=["edge_index"])
data = removeLoops(data)
print(f"Data after transformation has {data.num_edges} edges and {len(data.time)} time steps")

Data before transformation has 3 edges and 3 time steps
KeysView({'edge_index': tensor([[1, 2, 2],
        [2, 2, 2]]), 'time': tensor([1., 1., 0.])})


IndexError: index 2 is out of bounds for dimension 0 with size 2

In [11]:
data.num_edges

2