<a href="https://colab.research.google.com/github/Ankita-24-pixel/pong-game/blob/main/Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install tshark -y
!pip install pyshark pandas

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libbcg729-0 libc-ares2 liblua5.2-0 libnl-genl-3-200 libpcap0.8 libsbc1
  libsmi2ldbl libspandsp2 libspeexdsp1 libwireshark-data libwireshark15
  libwiretap12 libwsutil13 wireshark-common
Suggested packages:
  snmp-mibs-downloader geoipupdate geoip-database geoip-database-extra
  libjs-leaflet libjs-leaflet.markercluster wireshark-doc
The following NEW packages will be installed:
  libbcg729-0 libc-ares2 liblua5.2-0 libnl-genl-3-200 libpcap0.8 libsbc1
  libsmi2ldbl libspandsp2 libspeexdsp1 libwireshark-data libwireshark15
  libwiretap12 libwsutil13 tshark wireshark-common
0 upgraded, 15 newly installed, 0 to remove and 35 not upgraded.
Need to get 23.0 MB of archives.
After this operation, 120 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libpcap0.8 amd64 1.10.1-4ubuntu1.22.04.1

In [6]:
# Upload the capture file into the Colab runtime so the analysis cells can read it.
from google.colab import files
# NOTE(review): the recorded output of this cell shows the upload being saved under a
# name with an extra " (1)" suffix, which is not the filename later passed to
# analyze_pcap -- confirm the intended copy of the capture is the one analysed.
uploaded = files.upload()

Saving SIP-Call-with-Proxy-Server (1).pcap to SIP-Call-with-Proxy-Server (1) (1).pcap


In [5]:
import ipaddress
import json
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd
import pyshark

# ---------------- Parse PCAP for SIP Calls ----------------
def parse_pcap(filename):
    """Extract one record per SIP packet from a capture file by running tshark.

    tshark is asked for six tab-separated fields per packet matching the ``sip``
    display filter: epoch time, From user, To user, source IP, destination IP and
    ``sip.session_expires``.

    Args:
        filename: Path of the capture file handed to ``tshark -r``.

    Returns:
        A list of dicts with keys ``timestamp`` (naive local-time ``datetime``),
        ``caller``, ``callee``, ``src_ip``, ``dst_ip`` (strings, ``""`` when the
        field is absent) and ``duration`` (int, ``0`` when the field is absent).
        An empty list is returned when tshark is missing, cannot be run, or exits
        with a non-zero status. Lines that cannot be parsed are reported and
        skipped.
    """
    calls = []
    # Use tshark directly to avoid event loop issues with pyshark.FileCapture in Colab
    # NOTE(review): "duration" is populated from sip.session_expires, which looks like
    # the Session-Expires header rather than a measured call length -- confirm this is
    # the intended source before trusting the duration heuristics downstream.
    tshark_command = [
        "tshark", "-r", filename, "-Y", "sip",
        "-T", "fields",
        "-e", "frame.time_epoch",
        "-e", "sip.from_user",
        "-e", "sip.to_user",
        "-e", "ip.src",
        "-e", "ip.dst",
        "-e", "sip.session_expires"
    ]

    try:
        result = subprocess.run(
            tshark_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except FileNotFoundError:
        print("Error: tshark command not found. Make sure tshark is installed.")
        return []
    except Exception as e:
        print(f"An error occurred while running tshark: {e}")
        return []

    # Decode leniently: one undecodable byte in a SIP header must not discard the
    # whole capture.
    stderr_text = result.stderr.decode("utf-8", errors="replace").strip()

    # Judge success by the exit status, not by whether stderr is empty: tshark can
    # write non-fatal warnings to stderr (for example when it is run as root) while
    # still producing complete output.
    if result.returncode != 0:
        print(f"Tshark error: {stderr_text}")
        return []
    if stderr_text:
        print(f"Tshark warning: {stderr_text}")

    for line in result.stdout.decode("utf-8", errors="replace").splitlines():
        fields = line.split('\t')
        if len(fields) != 6:
            continue
        try:
            ts = datetime.fromtimestamp(float(fields[0]))
            # tshark joins repeated occurrences of a field with ","; keep the first
            # so a duplicated header does not make the whole packet unparsable.
            duration_text = fields[5].split(",")[0].strip()
            duration = int(duration_text) if duration_text else 0
        except (ValueError, IndexError, OverflowError, OSError) as e:
            # OverflowError/OSError cover epoch values outside the platform's range.
            print(f"Error parsing line: {line} - {e}")
            continue

        calls.append({
            "timestamp": ts,
            "caller": fields[1],
            "callee": fields[2],
            "src_ip": fields[3],
            "dst_ip": fields[4],
            "duration": duration
        })

    return calls

# ---------------- Apply Heuristics ----------------
def apply_heuristics(calls):
    """Flag suspicious call records and build blacklists of IPs and callers.

    Three heuristics are evaluated for every record, in input order:

    1. High frequency: the caller has 10 or more records (including this one) whose
       timestamps are less than 5 minutes before this record's timestamp.
    2. Abnormal duration: ``duration`` below 5 or above 7200.
    3. Foreign IP: ``src_ip`` lies outside the RFC 1918 private IPv4 ranges
       (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). An empty ``src_ip`` cannot be
       classified and is not flagged; a non-empty value that is not a valid IP
       address is flagged.

    Args:
        calls: Iterable of dicts with at least the keys ``timestamp``
            (``datetime``), ``caller``, ``src_ip`` and ``duration``. The input
            dicts are not modified.

    Returns:
        A tuple ``(suspects, bad_ips, bad_numbers)``: ``suspects`` is a list of
        copies of the flagged records with an added ``reasons`` list; ``bad_ips``
        and ``bad_numbers`` are sets of the source IPs and callers that triggered
        a heuristic. Empty-string placeholders are never added to either set.
    """
    suspects = []
    call_counter = defaultdict(list)
    bad_ips, bad_numbers = set(), set()

    frequency_window = timedelta(minutes=5)
    # 172.16.0.0/12 spans 172.16.x.x through 172.31.x.x, not only the "172.16." prefix.
    private_networks = (
        ipaddress.ip_network("10.0.0.0/8"),
        ipaddress.ip_network("172.16.0.0/12"),
        ipaddress.ip_network("192.168.0.0/16"),
    )

    def is_foreign(ip_text):
        # When several comma-joined addresses are present, classify the first one.
        candidate = ip_text.split(",")[0].strip()
        if not candidate:
            return False
        try:
            address = ipaddress.ip_address(candidate)
        except ValueError:
            # Unparsable but non-empty: keep treating it as suspicious.
            return True
        return not any(address in network for network in private_networks)

    for call in calls:
        reasons = []
        caller = call["caller"]
        timestamp = call["timestamp"]
        caller_flagged = False

        # Track calls for frequency check
        call_counter[caller].append(timestamp)
        recent_calls = [t for t in call_counter[caller] if timestamp - t < frequency_window]

        # Heuristic 1: High frequency
        if len(recent_calls) >= 10:
            reasons.append("High frequency (<5min)")
            caller_flagged = True

        # Heuristic 2: Abnormal duration
        # NOTE(review): a duration of 0 is also what an absent field looks like in the
        # parsed records, so such records are flagged as short -- confirm this is wanted.
        if call["duration"] < 5:
            reasons.append("Short call <5s")
            caller_flagged = True
        elif call["duration"] > 7200:
            reasons.append("Very long call >2h")
            caller_flagged = True

        # Heuristic 3: Suspicious/foreign IPs (placeholder - in Colab we skip GeoIP)
        if is_foreign(call["src_ip"]):
            reasons.append("Foreign/suspicious IP")
            bad_ips.add(call["src_ip"])

        # An empty caller is a missing field, not a number worth blacklisting.
        if caller_flagged and caller:
            bad_numbers.add(caller)

        if reasons:
            # Copy so the caller's records are not mutated as a side effect.
            suspects.append({**call, "reasons": reasons})

    return suspects, bad_ips, bad_numbers

# ---------------- Save Results ----------------
def save_results(suspects, bad_ips, bad_numbers):
    """Write flagged calls and the derived blacklist to disk and offer them for download.

    When ``suspects`` is non-empty, three files are written to the current working
    directory: ``suspects.csv``, ``suspects.json`` and ``blacklist.json``. If the
    ``google.colab`` package can be imported, each file is then passed to
    ``files.download``; otherwise a message is printed and the files are left in
    the working directory.

    Args:
        suspects: List of flagged call records (dicts) to tabulate.
        bad_ips: Collection of IP strings for the blacklist.
        bad_numbers: Collection of caller strings for the blacklist.

    Returns:
        The ``pandas.DataFrame`` built from ``suspects``, or ``None`` (after
        printing a message) when ``suspects`` is empty.
    """
    if not suspects:
        print("No suspicious calls detected.")
        return None

    df = pd.DataFrame(suspects)
    df.to_csv("suspects.csv", index=False)
    df.to_json("suspects.json", orient="records", indent=2, date_format="iso")

    # Save dynamically built blacklist. Sorting makes the file identical across runs;
    # set iteration order is not stable for strings between interpreter sessions.
    blacklist = {
        "bad_ips": sorted(bad_ips),
        "bad_numbers": sorted(bad_numbers)
    }
    with open("blacklist.json", "w") as f:
        json.dump(blacklist, f, indent=2)

    output_names = ("suspects.csv", "suspects.json", "blacklist.json")
    try:
        from google.colab import files
    except ImportError:
        # Outside Colab there is no browser download helper; the files are already
        # on disk, so report that instead of raising after the work is done.
        print("google.colab is not available; results were written to: "
              + ", ".join(output_names))
        return df

    for name in output_names:
        files.download(name)

    return df

# ---------------- Main ----------------
def analyze_pcap(pcap_file):
    """Run the whole pipeline on one capture file: parse, score, then save.

    Args:
        pcap_file: Path of the capture file forwarded to ``parse_pcap``.

    Returns:
        Whatever ``save_results`` returns for the flagged records.
    """
    parsed_calls = parse_pcap(pcap_file)
    # apply_heuristics yields (suspects, bad_ips, bad_numbers), which is exactly the
    # positional argument order save_results takes.
    heuristic_output = apply_heuristics(parsed_calls)
    return save_results(*heuristic_output)

In [8]:
# NOTE(review): parse_pcap shells out to tshark through subprocess rather than using
# pyshark's event loop, so this patch may no longer be needed -- confirm before removing.
import nest_asyncio
nest_asyncio.apply()

df = analyze_pcap("SIP-Call-with-Proxy-Server (1).pcap")
# analyze_pcap returns None when no record was flagged; guard the preview so the cell
# does not end in an AttributeError in that case.
df.head() if df is not None else None


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Unnamed: 0,timestamp,caller,callee,src_ip,dst_ip,duration,reasons
0,2011-08-01 06:37:30.454022,201,101,10.33.6.101,10.33.6.102,0,[Short call <5s]
1,2011-08-01 06:37:30.478746,201,101,10.33.6.102,10.33.6.101,0,[Short call <5s]
2,2011-08-01 06:37:30.490270,201,101,10.33.6.102,10.33.6.100,0,[Short call <5s]
3,2011-08-01 06:37:30.530925,201,101,10.33.6.100,10.33.6.102,0,[Short call <5s]
4,2011-08-01 06:37:30.549798,201,101,10.33.6.100,10.33.6.102,0,[Short call <5s]
