# Prompt

You are tasked with generating Python scripts that perform real-time BGP analysis using the pybgpstream library. Please adhere to the following guidelines when writing the code:

- Script Structure
Include a __main__ block or a usage example to demonstrate how to run the script.
Implement time-based stop triggers to gracefully stop data collection and processing after a specified duration.
Use Separate Time Variables for Collection Duration and Metrics Interval
Collection Duration Tracking:
Use collection_start_time to track the total duration of data collection.
Metrics Interval Tracking:
Use a separate interval_start_time to track intervals for periodic tasks like printing metrics.
Do not reset collection_start_time inside loops, as it affects the stop condition.

- Key Processing Guidelines
Initialize BGPStream Without Filters:
stream = pybgpstream.BGPStream(
    project="ris-live",
    record_type="updates",
)
Implement Time-Based Stop Triggers:
import time

collection_start_time = time.time()
interval_start_time = collection_start_time
COLLECTION_DURATION = 300

Checking for Stop Conditions Within the Loop:
while True:
    # Check if the total collection duration has been exceeded
    if time.time() - collection_start_time >= COLLECTION_DURATION:
        break

    for rec in stream.records():
        for elem in rec:
            Processing logic goes here
            Check if the collection duration has been exceeded inside nested loops
            if time.time() - collection_start_time >= COLLECTION_DURATION:
                break
        else:
            continue
        break
Graceful Shutdown:
Ensure that the script can be stopped gracefully after the specified duration or when a stop event is triggered.
Clean up resources, close streams, and terminate processes properly.

- Main Loop Processing
Do not use any filter attributes like stream.add_filter() or set filter parameters when initializing BGPStream.
All filtering and processing should occur within the main loop where you iterate over records and elements.
for rec in stream.records():
    for elem in rec:
        Processing logic goes here

Handling Potential Blocking in Data Streams:
Be aware that data streams may block if no new data is received.
Implement mechanisms to periodically check stop conditions even when data is not being received.
Consider using timeouts, non-blocking iterators, or running data collection in a separate thread or process.

- Accessing Element Attributes
Timestamp:
from datetime import datetime

elem_time = datetime.utcfromtimestamp(elem.time)
elem_type = elem.type  'A' for announcements, 'W' for withdrawals

Fields Dictionary:
fields = elem.fields

Prefix:
prefix = fields.get("prefix")
if prefix is None:
    continue

AS Path:
as_path_str = fields.get('as-path', "")
as_path = as_path_str.split()

Peer ASN and Collector:
peer_asn = elem.peer_asn
collector = rec.collector

Communities:
communities = fields.get('communities', [])

Validating and Parsing IP Prefixes:
import ipaddress
try:
    network = ipaddress.ip_network(prefix)
except ValueError:
    continue

Filtering Logic Within the Loop
Filtering for a Specific ASN in AS Path:
target_asn = '3356'
if target_asn not in as_path:
    continue

Filtering for Specific Prefixes:
target_prefixes = ['192.0.2.0/24', '198.51.100.0/24']
if prefix not in target_prefixes:
    continue

- Processing Key Values and Attributes
Counting Announcements and Withdrawals:
from collections import defaultdict
announcements = defaultdict(int)
withdrawals = defaultdict(int)

if elem_type == 'A':
    announcements[prefix] += 1
elif elem_type == 'W':
    withdrawals[prefix] += 1

Detecting AS Path Changes:
prefix_as_paths = {}

if prefix in prefix_as_paths:
    if as_path != prefix_as_paths[prefix]:
        # AS path has changed
        # Handle AS path change
        prefix_as_paths[prefix] = as_path
else:
    prefix_as_paths[prefix] = as_path


- Analyzing Community Attributes:
community_counts = defaultdict(int)

for community in communities:
    community_str = f"{community[0]}:{community[1]}"
    community_counts[community_str] += 1

- Calculating Statistics (e.g., Average MED):
med_values = []

med = fields.get('med')
if med is not None:
    try:
        med_values.append(int(med))
    except ValueError:
        pass

Calculate average MED
if med_values:
    average_med = sum(med_values) / len(med_values)


- Anomaly Detection Tasks
Implement functions or logic within the main loop to detect:
Hijacks:
Compare the observed origin AS with the expected origin AS for target prefixes.
Example:

expected_origins = {'192.0.2.0/24': '64500', '198.51.100.0/24': '64501'}
observed_origin = as_path[-1] if as_path else None
expected_origin = expected_origins.get(prefix)
if expected_origin and observed_origin != expected_origin:
    print(f"Possible hijack detected for {prefix}: expected {expected_origin}, observed {observed_origin}")

Outages:
Monitor for sustained withdrawals of prefixes without re-announcements.
Keep track of withdrawn prefixes and their timestamps.
Example:

from datetime import datetime, timedelta

withdrawals_timestamps = {}

if elem_type == 'W':
    withdrawals_timestamps[prefix] = datetime.utcnow()
elif elem_type == 'A' and prefix in withdrawals_timestamps:
    del withdrawals_timestamps[prefix]

outage_threshold = timedelta(minutes=30)
current_time = datetime.utcnow()
for prefix, withdrawal_time in list(withdrawals_timestamps.items()):
    if current_time - withdrawal_time > outage_threshold:
        # Outage detected
        print(f"Outage detected for {prefix} since {withdrawal_time}")
        del withdrawals_timestamps[prefix]


MOAS (Multiple Origin AS) Conflicts:
Monitor prefixes announced by multiple origin ASNs.
Example:
prefix_origins = defaultdict(set)

origin_asn = as_path[-1] if as_path else None
if origin_asn:
    prefix_origins[prefix].add(origin_asn)
    if len(prefix_origins[prefix]) > 1:
        origins = ', '.join(prefix_origins[prefix])
        print(f"MOAS conflict for {prefix}: announced by ASNs {origins}")

AS Path Prepending:
Detect AS path prepending by identifying consecutive repeated ASNs in the AS path.
Example:
prepending_counts = defaultdict(int)
last_asn = None
consecutive_count = 1

for asn in as_path:
    if asn == last_asn:
        consecutive_count += 1
    else:
        if consecutive_count > 1:
            prepending_counts[last_asn] += consecutive_count - 1
        consecutive_count = 1
    last_asn = asn

Handle the last ASN in the path
if consecutive_count > 1 and last_asn:
    prepending_counts[last_asn] += consecutive_count - 1

Report ASes performing prepending
for asn, count in prepending_counts.items():
    print(f"ASN {asn} prepended {count} times")

Here is your task:

# 1. Monitor Current Trends
Track real-time AS-level trends over period of 1 minute for AS3356. Summarize metrics like announcements, withdrawals, unique prefixes, and top AS paths by time interval.

In [4]:
import time
from datetime import datetime
from collections import defaultdict
import ipaddress
import pybgpstream

# Define constants
COLLECTION_DURATION = 60  # Total duration for data collection (in seconds)
INTERVAL_DURATION = 10     # Metrics interval duration (in seconds)

def main():
    # Initialize BGPStream
    stream = pybgpstream.BGPStream(project="ris-live", record_type="updates")

    # Initialize tracking variables
    collection_start_time = time.time()
    interval_start_time = collection_start_time

    # Metrics storage
    announcements = defaultdict(int)
    withdrawals = defaultdict(int)
    unique_prefixes = set()
    prefix_as_paths = defaultdict(list)

    while True:
        # Check if the total collection duration has been exceeded
        if time.time() - collection_start_time >= COLLECTION_DURATION:
            break

        for rec in stream.records():
            for elem in rec:
                # Access element attributes
                elem_time = datetime.utcfromtimestamp(elem.time)
                elem_type = elem.type  # 'A' for announcements, 'W' for withdrawals
                fields = elem.fields
                
                # Get prefix and validate
                prefix = fields.get("prefix")
                if prefix is None:
                    continue

                # Validate and parse IP Prefix
                try:
                    network = ipaddress.ip_network(prefix)
                except ValueError:
                    continue

                # Track unique prefixes
                unique_prefixes.add(prefix)

                # Count announcements and withdrawals
                if elem_type == 'A':
                    announcements[prefix] += 1
                    as_path_str = fields.get('as-path', "")
                    as_path = as_path_str.split()
                    prefix_as_paths[prefix].append(as_path)

                elif elem_type == 'W':
                    withdrawals[prefix] += 1

                # Check for stop conditions within nested loops
                if time.time() - collection_start_time >= COLLECTION_DURATION:
                    break
            else:
                continue
            break

        # Print metrics at the end of each interval
        if time.time() - interval_start_time >= INTERVAL_DURATION:
            print_metrics(announcements, withdrawals, unique_prefixes, prefix_as_paths)
            interval_start_time = time.time()  # Reset interval timer

    # Final metrics after collection ends
    print_metrics(announcements, withdrawals, unique_prefixes, prefix_as_paths)

def print_metrics(announcements, withdrawals, unique_prefixes, prefix_as_paths):
    print("\nMetrics Summary:")
    print(f"Total Announcements: {sum(announcements.values())}")
    print(f"Total Withdrawals: {sum(withdrawals.values())}")
    print(f"Unique Prefixes: {len(unique_prefixes)}")

    # Calculate and display top AS paths
    as_path_counts = defaultdict(int)
    for as_paths in prefix_as_paths.values():
        for as_path in as_paths:
            as_path_str = "->".join(as_path)
            as_path_counts[as_path_str] += 1

    top_as_paths = sorted(as_path_counts.items(), key=lambda x: x[1], reverse=True)[:5]
    print("Top AS Paths:")
    for path, count in top_as_paths:
        print(f"Path: {path}, Count: {count}")

if __name__ == "__main__":
    main()


Metrics Summary:
Total Announcements: 15766
Total Withdrawals: 502
Unique Prefixes: 3077
Top AS Paths:
Path: 211509->48752->213045->34549->16509->14618, Count: 618
Path: 211509->48752->29632->9002->16509->14618, Count: 568
Path: 48362->1299->6453->4755, Count: 292
Path: 35598->174->6453->4755, Count: 231
Path: 62167->3356->6453->4755, Count: 227

Metrics Summary:
Total Announcements: 15766
Total Withdrawals: 502
Unique Prefixes: 3077
Top AS Paths:
Path: 211509->48752->213045->34549->16509->14618, Count: 618
Path: 211509->48752->29632->9002->16509->14618, Count: 568
Path: 48362->1299->6453->4755, Count: 292
Path: 35598->174->6453->4755, Count: 231
Path: 62167->3356->6453->4755, Count: 227


In [3]:
import pybgpstream
from datetime import datetime, timedelta
from collections import defaultdict
import time

# Constants
TARGET_ASN = '3356'
COLLECTION_DURATION = 60  # Duration in seconds (e.g., 5 minutes)

def main():
    # Initialize BGPStream
    stream = pybgpstream.BGPStream(
        project="ris-live",
        record_type="updates",
    )

    # Start time tracking
    collection_start_time = time.time()
    interval_start_time = collection_start_time
    announcements = defaultdict(int)
    withdrawals = defaultdict(int)
    unique_prefixes = set()
    prefix_as_paths = defaultdict(list)

    # Main loop for real-time processing
    while True:
        # Check for time-based stop condition
        if time.time() - collection_start_time >= COLLECTION_DURATION:
            break
        
        for rec in stream.records():
            for elem in rec:
                try:
                    # Access element attributes
                    elem_time = datetime.utcfromtimestamp(elem.time)
                    elem_type = elem.type  # 'A' for announcements, 'W' for withdrawals
                    fields = elem.fields

                    # Prefix and AS Path handling
                    prefix = fields.get("prefix")
                    if prefix is None:
                        continue

                    as_path_str = fields.get('as-path', "")
                    as_path = as_path_str.strip().split()

                    # Filter for specific ASN in AS Path
                    if TARGET_ASN not in as_path:
                        continue  # Skip this element if target ASN is not in the AS path

                    # Proceed with processing after filtering
                    peer_asn = elem.peer_asn
                    collector = rec.collector
                    communities = fields.get('communities', [])

                    # Update counts for announcements and withdrawals
                    if elem_type == 'A':
                        announcements[prefix] += 1
                        unique_prefixes.add(prefix)
                        prefix_as_paths[prefix].append(as_path)
                    elif elem_type == 'W':
                        withdrawals[prefix] += 1

                    # Print metrics every minute
                    if time.time() - interval_start_time >= 60:
                        print_metrics(announcements, withdrawals, unique_prefixes, prefix_as_paths)
                        interval_start_time = time.time()  # Reset interval start time

                except Exception as e:
                    print(f"Error processing element: {e}")
                    continue  # Continue with the next element

            # Optional: Handle case where no data is received
            if time.time() - collection_start_time >= COLLECTION_DURATION:
                break

    # Final print of metrics after the main loop ends
    print_metrics(announcements, withdrawals, unique_prefixes, prefix_as_paths)

def print_metrics(announcements, withdrawals, unique_prefixes, prefix_as_paths):
    print(f"\nMetrics Summary:")
    print(f"Total Announcements: {sum(announcements.values())}")
    print(f"Total Withdrawals: {sum(withdrawals.values())}")
    print(f"Unique Prefixes: {len(unique_prefixes)}")

    # Calculate and print top AS paths
    top_paths = defaultdict(int)
    for prefix, paths in prefix_as_paths.items():
        for path in paths:
            top_paths[tuple(path)] += 1
    
    print("Top AS Paths:")
    for path, count in sorted(top_paths.items(), key=lambda x: x[1], reverse=True)[:5]:  # Top 5 paths
        print(f"  Path: {' -> '.join(path)}, Count: {count}")

if __name__ == "__main__":
    main()



Metrics Summary:
Total Announcements: 3338
Total Withdrawals: 0
Unique Prefixes: 814
Top AS Paths:
  Path: 20514 -> 2119 -> 3356 -> 58453 -> 9808 -> 56046, Count: 87
  Path: 20514 -> 2119 -> 3356 -> 58453 -> 9808 -> 56040, Count: 85
  Path: 20514 -> 2119 -> 3356 -> 58453 -> 9808 -> 24445, Count: 74
  Path: 20514 -> 2119 -> 3356 -> 58453 -> 9808 -> 56042, Count: 58
  Path: 48070 -> 174 -> 3356 -> 46573, Count: 51
