Import Libraries

In [None]:
import scapy.all as scapy
from collections import defaultdict
import matplotlib.pyplot as plt
from IPython.display import display
import csv
import os
import time

Packet Callback function

In [None]:
# Global variables
packet_count = 0
protocol_count = defaultdict(int)
packet_details = []

def packet_callback(packet):
    global packet_count
    global packet_details
    packet_count += 1
    
    if packet.haslayer(scapy.IP):
        ip_layer = packet.getlayer(scapy.IP)
        protocol = ip_layer.proto
        
        # Manual mapping of protocol numbers to names
        protocol_names = {
            1: "ICMP",
            2: "IGMP",
            6: "TCP",
            17: "UDP",
            41: "IPv6",
            50: "ESP",
            51: "AH",
            58: "ICMPv6"
        }
        
        protocol_name = protocol_names.get(protocol, "Unknown")
        protocol_count[protocol_name] += 1

        # Collect packet details including timestamp
        packet_info = {
            'Packet Number': packet_count,
            'Source IP': ip_layer.src,
            'Destination IP': ip_layer.dst,
            'Protocol': protocol_name,
            'Size (bytes)': len(packet),
            'Time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(packet.time))
        }
        packet_details.append(packet_info)

        print(f"Packet {packet_count}: {ip_layer.src} -> {ip_layer.dst} | Protocol: {protocol_name} | Size: {len(packet)} bytes | Time: {packet_info['Time']}")


Function to start Sniffing

In [None]:
def start_sniffing(interface):
    print(f"Starting packet capture on interface {interface}...")
    scapy.sniff(iface=interface, prn=packet_callback, store=0)

Function to display Statistics

In [None]:
def display_statistics():
    global protocol_count, packet_details
    print("\n--- Protocol Statistics ---")
    
    # Print statistics to the console
    for protocol, count in protocol_count.items():
        print(f"{protocol}: {count} packets")
    
    # Save to CSV
    try:
        with open('protocol_statistics.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["Protocol", "Count"])
            for protocol, count in protocol_count.items():
                writer.writerow([protocol, count])
        print("Statistics saved to 'protocol_statistics.csv'.")
    except Exception as e:
        print(f"Error saving statistics to CSV: {e}")
        
    # Print current working directory
    print(f"Current working directory: {os.getcwd()}")
    
    # Check if packet_details is empty
    if not packet_details:
        print("No packet details to save.")
        return

    # Visualization (optional)
    protocols = list(protocol_count.keys())
    counts = list(protocol_count.values())

    plt.bar(protocols, counts)
    plt.xlabel('Protocol')
    plt.ylabel('Count')
    plt.title('Packet Protocol Distribution')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

Check Available Interface

In [None]:
import psutil

def list_network_interfaces():
    interfaces = psutil.net_if_addrs()
    print("Available interfaces:")
    for iface in interfaces:
        print(iface)

# Main execution
list_network_interfaces()


Sniffing and Displaying Statistics

In [None]:
print()
print("Enter the interface name: ")
interface = input()  # Replace with your network interface name
try:
    start_sniffing(interface)
except KeyboardInterrupt:
    print("\nCapture stopped.")
    display_statistics()

Creating CSV File

In [None]:
csv_file = 'D:/4th semester/CN_EL/Network_Traffic_Analyzer/protocol_statistics.csv'
try:
        with open(csv_file, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=['Packet Number', 'Source IP', 'Destination IP', 'Protocol', 'Size (bytes)', 'Time'])
            writer.writeheader()
            writer.writerows(packet_details)
        print(f"Statistics saved to '{csv_file}'.")
except Exception as e:
        print(f"Error saving statistics to CSV: {e}")