In [1]:
from scapy.all import *

from hparser import parse_short_header, parse_long_header, parse_first_layers
from traceback import format_exc
from scapy.layers.inet import UDP, IP
from config import ETH_HLEN, UDP_HLEN


In [3]:
pcap = rdpcap('quic_pkt.pcap')
a = pcap[0]
port = 6121  # Initial port

# `a` is one captured packet; the recorded run of this cell sent exactly one
# packet, i.e. the loop body executed once.
for packet in a:
    # Only frames that carry UDP can hold a QUIC datagram.
    if "UDP" not in packet:
        continue

    # QUIC datagram = UDP layer bytes minus the 8-byte UDP header.
    quic_bytes = bytes(packet["UDP"])[8:]

    try:
        # Parse the header against the whole captured frame, with the offset
        # reported by parse_first_layers, exactly as the header-dump cells do.
        # Summing ETH_HLEN + ihl + UDP_HLEN by hand is wrong because scapy's
        # IP.ihl counts 32-bit words, not bytes.
        frame_bytes = bytes(packet)
        _, payload_offset = parse_first_layers(frame_bytes)
        first_byte = frame_bytes[payload_offset]
        if first_byte & 0x80:  # long header form
            parsed_header, _ = parse_long_header(first_byte, frame_bytes, payload_offset)
        else:
            parsed_header, _ = parse_short_header(first_byte, frame_bytes, payload_offset)
    except IndexError as exc:
        # Truncated / unexpected frame: report it instead of hiding it, and do
        # not replay anything for this packet (same as before: nothing sent).
        print(f"Skipping packet, QUIC header could not be parsed: {exc!r}")
        continue

    # NOTE(review): these two assignments only annotate the parsed dict.
    # Nothing below reads parsed_header, so the datagram that is sent still
    # carries the connection IDs from the capture.
    # TODO: patch the IDs into new_quic_bytes if a real rewrite is intended.
    parsed_header["src_conn_id_hex"] = "994befda"
    parsed_header["dest_conn_id_hex"] = "994befda"

    # Re-wrap the (currently unmodified) QUIC bytes in fresh IP/UDP headers.
    new_quic_bytes = bytearray(quic_bytes)
    quic_packet = IP(src="10.10.10.69", dst="10.10.10.45") / UDP(sport=443, dport=port) / Raw(load=new_quic_bytes)

    # Single transmission (the former `for i in range(1,2)` ran exactly once).
    send(quic_packet, verbose=True, count=1)
    #time.sleep(1)

.
Sent 1 packets.


In [10]:
from hparser import parse_long_header 
from hparser import parse_short_header
from hparser import parse_first_layers
from config import ETH_HLEN, UDP_HLEN


# Provided functions for parsing QUIC headers
def parse_quic_packet(packet_bytearray: bytearray) -> None:
    """Print the connection IDs and packet type found in a raw frame's QUIC header.

    Args:
        packet_bytearray: Raw bytes of one captured frame. It is handed to
            ``parse_first_layers``, whose returned offset is used as the index
            of the first QUIC byte.

    The top bit (0x80) of that first byte selects ``parse_long_header``;
    otherwise ``parse_short_header`` is used. If any of the five reported keys
    is missing from the parsed header, the missing key is printed and every
    field is reported as ``None``.
    """
    # The lower-layer summary is not needed here, only the QUIC start offset.
    # (A destination-port filter on 6121 used to live here and is disabled.)
    _layers, quic_start = parse_first_layers(packet_bytearray)

    flags_byte = packet_bytearray[quic_start]
    header_parser = parse_long_header if flags_byte & 0x80 else parse_short_header
    header, _ = header_parser(flags_byte, packet_bytearray, quic_start)

    # Keys are read in this order; the first missing one is the one reported.
    wanted = (
        "src_conn_id_int",
        "src_conn_id",
        "dest_conn_id_int",
        "dest_conn_id",
        "packet_type",
    )
    try:
        values = [header[key] for key in wanted]
    except KeyError as missing:
        print(f"Error parsing header: {missing}")
        values = [None] * len(wanted)

    scid_int, scid_hex, dcid_int, dcid_hex, kind = values

    print(f"Source Connection ID (int): {scid_int}")
    print(f"Source Connection ID (hex): {scid_hex}")
    print(f"Destination Connection ID (int): {dcid_int}")
    print(f"Destination Connection ID (hex): {dcid_hex}")
    print(f"Packet Type: {kind}")
    print("-" * 20)  # separator between packets



# Slurp the capture file's raw bytes into pcap_data.
# NOTE(review): nothing in this cell reads pcap_data afterwards; the packets
# below come from rdpcap(), which opens the file again on its own.
with open('quic_traffic.pcap', 'rb') as capture_file:
    pcap_data = capture_file.read()

# Index 4 picks a single packet out of the capture (the recorded output shows
# one Initial header); its raw bytes are handed to the header dumper.
for packet in rdpcap("quic_traffic.pcap")[4]:
    parse_quic_packet(bytes(packet))
 

Source Connection ID (int): 3975027447
Source Connection ID (hex): ecee1af7
Destination Connection ID (int): 2507681006
Destination Connection ID (hex): 95782cee
Packet Type: Initial
--------------------


In [11]:
from typing import Tuple
from hparser import parse_long_header 
from hparser import parse_short_header
from hparser import parse_first_layers
from scapy.all import *
from scapy.all import Packet, sniff
from scapy.layers.inet import UDP


# Provided functions for parsing QUIC headers
def hparse_quic_packet(packet_bytearray: bytearray) -> None:
    """Print a summary of the QUIC header contained in a raw frame.

    Args:
        packet_bytearray: Raw bytes of one captured frame. It is handed to
            ``parse_first_layers``, whose returned offset is used as the index
            of the first QUIC byte.

    Packets whose parsed ``packet_type`` is ``"1RTT"`` are reported with spin
    bit, packet-number length and packet number; every other type is reported
    with both connection IDs (int and hex), the type and the version. A key
    missing from the parsed header is reported as
    ``Error parsing header: <key>`` and nothing else is printed.
    """
    # Only the QUIC start offset is needed from the lower-layer parse.
    # Disabled destination-port filter, kept for reference:
    #if layers_info["port_dst"] != 6121 :
        #print(f"Skipping packet: Non-standard UDP port ({layers_info['port_dst']})")
        #return
    _layers, quic_start = parse_first_layers(packet_bytearray)

    # Top bit of the first QUIC byte set -> long header, clear -> short header.
    flags_byte = packet_bytearray[quic_start]
    if flags_byte & 0x80:
        header, _ = parse_long_header(flags_byte, packet_bytearray, quic_start)
    else:
        header, _ = parse_short_header(flags_byte, packet_bytearray, quic_start)

    # Collect every line first so that a missing key suppresses the whole
    # report, then emit the lines in one pass.
    try:
        kind = header["packet_type"]
        if kind == "1RTT":
            spin = header.get("spin_bit")
            pn_length = header["packet_number_length"]
            pn = header["packet_number"]
            report = [
                f"Spin Bit: {spin}",
                f"Packet Number Length: {pn_length}",
                f"Packet Number: {pn}",
                f"Packet Type: {kind}",
            ]
        else:
            quic_version = header.get("version")
            scid_int = header["src_conn_id_int"]
            scid_hex = header["src_conn_id"]
            dcid_int = header["dest_conn_id_int"]
            dcid_hex = header["dest_conn_id"]
            report = [
                f"Source Connection ID (int): {scid_int}",
                f"Source Connection ID (hex): {scid_hex}",
                f"Destination Connection ID (int): {dcid_int}",
                f"Destination Connection ID (hex): {dcid_hex}",
                f"Packet Type: {kind}",
                f"Version: {quic_version}",
            ]
    except KeyError as missing:
        print(f"Error parsing header: {missing}")
        return

    for line in report:
        print(line)
    print("-" * 20)  # separator between packets

from fparser import parse_frames, parse_frames_rec, generate_result



def fparse_quic_packet(packet_bytearray: bytearray) -> None:
    """Parse the frames of a raw captured frame and print one summary per frame.

    Args:
        packet_bytearray: Raw bytes of one captured frame. It is handed to
            ``parse_first_layers``; everything from the returned offset
            onwards is passed to ``parse_frames``.

    When ``parse_frames`` reports a status other than 200 only its ``reason``
    is printed. Otherwise each frame is printed according to its
    ``frame_name``; unrecognised names are reported as unknown.
    """
    _layers, quic_start = parse_first_layers(packet_bytearray)

    # NOTE(review): the buffer is already sliced at quic_start AND quic_start
    # is passed as the offset - confirm that this is the contract parse_frames
    # expects (as opposed to full buffer + offset, or slice + 0).
    frames, outcome = parse_frames(packet_bytearray[quic_start:], quic_start)

    if outcome["status"] != 200:
        print(f"Error parsing frames: {outcome['reason']}")
        return

    blocked_kinds = ("STREAM_DATA_BLOCKED", "STREAMS_BLOCKED_BIDI", "STREAMS_BLOCKED_UNI")

    for frame in frames:
        kind = frame["frame_name"]

        if kind == "STREAM":
            # STREAM fields are mandatory lookups; a missing key raises.
            print("\n  Stream Frame:")
            print(f"    Stream ID: {frame['stream_id']}")
            print(f"    Stream Type: {frame['stream_type']}")
            print(f"    Data Offset: {frame['offset']}")
            print(f"    FIN Flag: {frame['fin']}")
            print(f"    Data Length: {frame['len']}")
        elif kind == "ACK":
            print("\n  ACK Frame:")
            print(f"    Largest Acknowledged: {frame.get('largest_ack')}")
            print(f"    Acknowledged Blocks: {frame.get('acked')}")
        elif kind in ("CONNECTION_CLOSE", "APPLICATION_CLOSE"):
            print(f"\n  {kind} Frame:")
            print(f"    Error Code: {frame.get('error_code')}")
            print(f"    Reason Phrase: {frame.get('reason_phrase')}")
        elif kind == "PADDING":
            print("\n  Padding Frame:")
            print(f"    Amount of padding: {frame.get('amount')}")
        elif kind in blocked_kinds:
            print("\n  Stream Blocked Frame:")
            print(f"    Stream ID (if applicable): {frame.get('stream_id')}")
            print(f"    Limit: {frame.get('limit')}")
        elif kind == "NEW_CONNECTION_ID" or kind == "RETIRE_CONNECTION_ID":
            print("\n  Connection ID Frame:")
            print(f"    Sequence Number: {frame.get('sequence_number')}")
            # Only NEW_CONNECTION_ID gets the ID and reset token printed.
            if kind == "NEW_CONNECTION_ID":
                print(f"      Connection ID: {frame.get('connection_id')}")
                print(f"      Stateless Reset Token: {frame.get('stateless_reset_token')}")
        else:
            print(f"\n  Unknown Frame ({kind})")

        print("-" * 20)  # separator between frames



# Slurp the capture file's raw bytes into pcap_data.
# NOTE(review): nothing in this cell reads pcap_data afterwards; the packets
# below come from rdpcap(), which opens the file again on its own.
with open('quic_traffic.pcap', 'rb') as capture_file:
    pcap_data = capture_file.read()

# Index 7 picks a single packet out of the capture (the recorded output shows
# one 1RTT header); dump its header first, then attempt to parse its frames.
for packet in rdpcap("quic_traffic.pcap")[7]:
    hparse_quic_packet(bytes(packet))
    fparse_quic_packet(bytes(packet))

Spin Bit: 0
Packet Number Length: 1
Packet Number: 120
Packet Type: 1RTT
--------------------
Error: Frame length (332808892) + payload offset (51) exceeds packet length (73)

  Stream Frame:
    Stream ID: 339775019
    Stream Type: Server-Uni
    Data Offset: 0
    FIN Flag: False
    Data Length: 332808892
--------------------


In [9]:
# from typing import List, Dict
# from util import get_varint
# from flags import flags
# from frame_types import get_frame_name, APPLICATION_CLOSE, \
#     CONNECTION_CLOSE, NEW_CONNECTION_ID, PADDING, \
#     RETIRE_CONNECTION_ID, STREAM, STREAM_DATA_BLOCKED, \
#     STREAMS_BLOCKED_BIDI, STREAMS_BLOCKED_UNI, ACK
# from config import MAX_STREAM_SIZE, SEAL_BYTES, LAST_FRAME_PADDING
# from fparser import parse_frames_rec

# # Import Scapy for PCAP reading
# from scapy.all import rdpcap

# def parse_frames(packet_bytearray: bytearray, payload_offset: int) -> List[Dict]:
#     frames = []
#     result = parse_frames_rec(packet_bytearray, payload_offset, frames)

#     return frames, result


# def generate_result(status, reason, payload_offset: int, remaining_length: int) -> dict:
#     return {
#         "status": status,
#         "reason": reason,
#         "payload_offset": payload_offset,
#         "remaining_length": remaining_length
#     }

# # Open the PCAP file with Scapy
# packets = rdpcap("quic_traffic.pcap")[7]

# # Loop through packets
# for packet in packets:
#     # Extract raw packet data
#     packet_data = bytes(packet)

#     # Define starting offset
#     payload_offset = 0

#     # Parse frames in the packet
#     frames, result = parse_frames(packet_data, payload_offset)

#     # Analyze the results
#     if result["status"] == 200:  # Successfully parsed all frames
#         print(f"Packet successfully parsed. Found {len(frames)} frames.")
#         for frame in frames:
#             print(f"  - Frame Name: {frame['frame_name']}")
#             print(f"    - Frame Type: {frame.get('frame_type', '')}")
#             # Access other frame details based on frame type

#     else:
#         print(f"Error parsing packet: {result['reason']}")

In [None]:
pcap = rdpcap('new_quic.pcap')
a = pcap[0]  #initial packet type
# send(a, verbose =True , count=1)

for packet in a:
    # Only frames that carry UDP can hold a QUIC datagram.
    if UDP not in packet:
        continue

    # Take the QUIC datagram from THIS packet (UDP layer bytes minus the
    # 8-byte UDP header). Previously quic_bytes was never assigned in this
    # cell, so it was either undefined or a stale value left behind by an
    # earlier cell that read a different capture.
    quic_bytes = bytes(packet[UDP])[8:]

    try:
        # Parse the header against the whole captured frame, with the offset
        # reported by parse_first_layers, exactly as the header-dump cells do.
        # Summing ETH_HLEN + ihl + UDP_HLEN by hand is wrong because scapy's
        # IP.ihl counts 32-bit words, not bytes.
        frame_bytes = bytes(packet)
        _, payload_offset = parse_first_layers(frame_bytes)
        first_byte = frame_bytes[payload_offset]
        if first_byte & 0x80:  # long header form
            parsed_header, _ = parse_long_header(first_byte, frame_bytes, payload_offset)
        else:
            parsed_header, _ = parse_short_header(first_byte, frame_bytes, payload_offset)
    except IndexError as exc:
        # Truncated / unexpected frame: report it instead of hiding it, and do
        # not replay anything for this packet (same as before: nothing sent).
        print(f"Skipping packet, QUIC header could not be parsed: {exc!r}")
        continue

    # Test connection IDs (replace with the desired values).
    # NOTE(review): these two assignments only annotate the parsed dict.
    # Nothing below reads parsed_header, so the datagram that is sent still
    # carries the connection IDs from the capture.
    # TODO: patch the IDs into new_quic_bytes if a real rewrite is intended.
    parsed_header["src_conn_id_hex"] = "55a520d4"
    parsed_header["dest_conn_id_hex"] = "92b221ec"

    # Re-wrap the (currently unmodified) QUIC bytes in fresh IP/UDP headers.
    new_quic_bytes = bytearray(quic_bytes)
    quic_packet = IP(src="10.10.10.69", dst="10.10.10.66") / UDP(sport=443, dport=6121) / Raw(load=new_quic_bytes)
    send(quic_packet, verbose=True, count=1)


In [None]:
import scapy.all
from scapy.all import sniff, wrpcap
from typing import List, Dict
from scapy.layers.inet import UDP

import base64
from scapy.all import *
import os
import random

from hparser import parse_short_header, parse_long_header

from config import ETH_HLEN, UDP_HLEN

# Import functions from the provided frame_parser code
from fparser import parse_frames, generate_result


def parse_quic_packets(pcap_file: str) -> List[Dict]:
    """
    Reads packets from a PCAP file and runs the frame parser on every UDP/IPv4 one.

    Args:
        pcap_file: Path to the PCAP file containing network traffic.

    Returns:
        A list with one dictionary per UDP/IPv4 packet in the capture:
            - packet_info: Summary of the packet (from scapy).
            - parsed_frames: List of frames returned by parse_frames.
            - result: Result dictionary returned by parse_frames (may indicate
              errors or status).
        Packets without a UDP or IPv4 layer are skipped.
    """
    parsed_data = []

    # Iterate over the capture named by the argument; the function no longer
    # depends on a notebook-global `packet` being set by the caller.
    for packet in rdpcap(pcap_file):
        if UDP not in packet or IP not in packet:
            continue

        # QUIC datagram = UDP layer bytes minus the 8-byte UDP header.
        quic_payload = bytes(packet[UDP])[8:]

        # IP.ihl counts 32-bit words, so convert it to bytes before summing.
        payload_offset = ETH_HLEN + packet[IP].ihl * 4 + UDP_HLEN

        # NOTE(review): the payload is already sliced to the QUIC datagram AND
        # the full-frame offset is passed - confirm this is the contract
        # parse_frames expects (as opposed to full buffer + offset).
        frames, result = parse_frames(bytearray(quic_payload), payload_offset=payload_offset)
        parsed_data.append({
            "packet_info": packet.summary(),
            "parsed_frames": frames,
            "result": result
        })
    return parsed_data

# Slurp the capture file's raw bytes into pcap_data.
# NOTE(review): nothing in this cell reads pcap_data afterwards.
with open('quic_traffic.pcap', 'rb') as f:
    pcap_data = f.read()

# Parse the whole capture in a single call. Passing the path (not one
# packet's raw bytes) lets the function do the iteration, so parsed_packets
# holds every packet's result and not just the last one.
parsed_packets = parse_quic_packets('quic_traffic.pcap')
 

# Example usage
#parsed_packets = parse_quic_packets('quic_traffic.pcap')
#print(payload_offset)
#print(len(packet_bytearray))
#print(frame_len)


# Print or analyze the parsed data (packet info, frames, and results)
# One report per parsed packet: scapy summary, parsed frames, parser result.
for entry in parsed_packets:
    print(f"Packet Info: {entry['packet_info']}")
    print(f"Parsed Frames: {entry['parsed_frames']}")
    print(f"Parsing Result: {entry['result']}")
    print("-" * 60)
