In [None]:
import urllib.parse
import base64
import json
import socket
import time
import pandas as pd

# ============================================================
# Load previously generated VMESS configuration dataset
# ============================================================

# Read the workbook produced by the previous pipeline step.
# NOTE: the frame keeps the name `vless_df` because later cells refer to it,
# even though the workbook being loaded holds VMESS links.
vless_df = pd.read_excel(
    io='/home/alireza/github/My-automated-life/Config_Automation/data/2)Scrapped_VMESS_Configs_json.xlsx',
)

# ============================================================
# Proxy Link Parsing Utilities
# ============================================================

def extract_host_port(proxy_link):
    """
    Extract target host and port from VLESS or VMESS proxy links.

    Supports:
    - vless://user@host:port?...   (host may be a bracketed IPv6 literal)
    - vmess://<base64-json>        (JSON object with "add" and "port" keys)

    Args:
        proxy_link: Raw proxy link. Anything that is not a string is rejected.

    Returns:
        (host, port) as (str, int); IPv6 hosts are returned WITHOUT the
        surrounding brackets so they can be passed straight to the socket API.
        (None, None) if the link is unsupported, malformed, has an empty host,
        or has a port outside 1-65535.
    """
    if not isinstance(proxy_link, str):
        return None, None

    proxy_link = proxy_link.strip()

    # ----------------------------
    # VLESS parsing
    # ----------------------------
    if proxy_link.startswith("vless://"):
        try:
            netloc = urllib.parse.urlparse(proxy_link).netloc

            # Expect userinfo before '@'. A host can never contain '@',
            # so everything after the LAST '@' is the host:port part.
            _userinfo, at_sign, host_port = netloc.rpartition("@")
            if not at_sign:
                return None, None

            host, colon, port_text = host_port.rpartition(":")
            if not colon:
                return None, None

            # "[2001:db8::1]" -> "2001:db8::1"; sockets reject the brackets.
            if host.startswith("[") and host.endswith("]"):
                host = host[1:-1]

            port = int(port_text)

        except ValueError:
            # Raised by urlparse (invalid netloc) or int() (non-numeric port).
            return None, None

        if not host or not 1 <= port <= 65535:
            return None, None
        return host, port

    # ----------------------------
    # VMESS parsing
    # ----------------------------
    if proxy_link.startswith("vmess://"):
        try:
            # Strip scheme
            base64_config = proxy_link[len("vmess://"):]

            # Restore the '=' padding that share links usually omit
            base64_config += "=" * (-len(base64_config) % 4)

            # Decode Base64 -> JSON
            json_config = base64.urlsafe_b64decode(base64_config).decode('utf-8')
            vmess_data = json.loads(json_config)

            if not isinstance(vmess_data, dict):
                return None, None

            # Extract address and port
            address = vmess_data.get("add", "")
            port_value = vmess_data.get("port", "")

            if not isinstance(address, str) or not address or not port_value:
                return None, None

            if address.startswith("[") and address.endswith("]"):
                address = address[1:-1]

            port = int(port_value)

        except Exception:
            # Deliberately broad: the payload is arbitrary scraped data and
            # can fail in base64, UTF-8, JSON or integer conversion.
            return None, None

        if not address or not 1 <= port <= 65535:
            return None, None
        return address, port

    return None, None

# ============================================================
# TCP Ping Functions
# ============================================================

def tcp_ping(host, port, timeout=2):
    """
    Perform a single TCP connection attempt and measure latency.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port number.
        timeout: Timeout in seconds passed to socket.create_connection.

    Returns:
        latency in milliseconds (float), measured until the connection is
        established
        None if unreachable or if host/port are not usable
    """
    try:
        # perf_counter is monotonic; time.time() can jump when the system
        # clock is adjusted and would then report bogus latencies.
        start = time.perf_counter()
        with socket.create_connection((host, port), timeout=timeout):
            return (time.perf_counter() - start) * 1000
    except (OSError, ValueError, OverflowError, TypeError):
        # OSError covers timeouts, refusals and DNS failures; the others are
        # raised for malformed host/port values.
        return None

def batch_tcp_ping(proxy_links, timeout=2, max_workers=10):
    """
    Perform TCP ping on a list of proxy links using a thread pool.

    Args:
        proxy_links: Iterable of proxy link strings.
        timeout: Per-connection timeout in seconds, forwarded to tcp_ping.
        max_workers: Upper bound on concurrent connection attempts.
            Values below 1 fall back to a single worker.

    Returns:
        List of latencies in milliseconds, in the same order as the input.
        Entries are None for links that cannot be parsed or reached.
    """
    import concurrent.futures

    links = list(proxy_links)
    if not links:
        return []

    def _ping_link(link):
        # Parse first so unparseable links never open a socket.
        host, port = extract_host_port(link)
        if host is None:
            return None
        return tcp_ping(host, port, timeout)

    # Never spawn more threads than there are links to test.
    worker_count = max(1, min(max_workers, len(links)))

    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as pool:
        # Executor.map yields results in input order, not completion order.
        return list(pool.map(_ping_link, links))

def enhanced_tcp_ping(host, port, timeout=3, retries=2):
    """
    TCP ping with retries and granular error handling.

    Retries connection attempts to handle transient network failures.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port number.
        timeout: Per-attempt timeout in seconds.
        retries: Total number of connection attempts (0 means no attempt).

    Returns:
        Latency in milliseconds (float) of the first successful attempt,
        or None if every attempt fails.
    """
    for _attempt in range(retries):
        try:
            # Monotonic clock: immune to wall-clock adjustments.
            start = time.perf_counter()
            with socket.create_connection((host, port), timeout=timeout):
                return (time.perf_counter() - start) * 1000

        except socket.timeout:
            # Possibly transient: try again.
            continue
        except (ConnectionRefusedError, socket.gaierror):
            # Port closed or name does not resolve: retrying will not help.
            return None
        except OSError:
            # Other network-level errors (unreachable, reset, ...) may be
            # transient, so they are retried.
            continue
        except (ValueError, OverflowError, TypeError):
            # Malformed host/port fails identically on every attempt.
            return None

    return None

# ============================================================
# Main Execution Loop
# ============================================================

# One latency entry per row of vless_df["links"], in row order
# (None marks unparseable or unreachable links).
latencies = []
successful_count = 0
failed_count = 0

for link in vless_df["links"]:
    host, port = extract_host_port(link)

    # Guard clause: links without a usable endpoint count as failures.
    if host is None or port is None:
        print(f"Could not parse: {link}")
        latencies.append(None)
        failed_count += 1
        continue

    print(f"Testing {host}:{port}...", end=" ")
    latency = enhanced_tcp_ping(host, port)
    latencies.append(latency)

    if latency is None:
        print("✗ Failed")
        failed_count += 1
    else:
        print(f"✓ {latency:.2f} ms")
        successful_count += 1

print(f"\nSummary: {successful_count} successful, {failed_count} failed")

# ============================================================
# Store and Rank Results
# ============================================================

# Record the measured latency next to each link
vless_df["tcp_latency_ms"] = latencies

# Rank fastest first (unreachable rows sink to the bottom), renumber the
# rows, then discard the rows that have no latency.
vless_df = (
    vless_df
    .sort_values(by="tcp_latency_ms", na_position="last")
    .reset_index(drop=True)
    .dropna(subset=["tcp_latency_ms"])
)

vless_df


Testing 95.216.183.113:42152... ✓ 146.67 ms
Testing tr1-smart.adelping.com:80... ✓ 353.36 ms
Testing 140.248.137.99:80... ✓ 106.58 ms
Testing Cpannel.iranucshop.ir:2085... ✓ 1436.09 ms
Testing 157.173.206.47:57449... ✗ Failed
Testing servers.xvon.in:2087... ✗ Failed
Testing 62.60.245.128:24448... ✗ Failed
Testing 172.233.229.91:14102... ✓ 409.10 ms
Testing 213.176.121.114:443... ✓ 59.41 ms
Testing 213.176.121.158:443... ✓ 42.64 ms
Testing 2.188.212.39:8080... ✓ 37.26 ms
Testing nl.pingooo.org:2053... ✓ 226.53 ms
Testing us.pingooo.org:80... ✓ 255.48 ms
Testing 57.129.28.216:443... ✓ 162.17 ms
Testing Cpannel.iranucshop.ir:2085... ✗ Failed
Testing 47.250.45.234:43063... ✓ 351.35 ms
Testing 91.134.8.101:443... ✓ 133.94 ms
Testing dll.avaaaal.ir:8443... ✓ 6313.73 ms
Testing digitalocean.com:8080... ✗ Failed
Testing 172.67.204.84:8080... ✓ 353.98 ms
Testing 85.17.192.110:16623... ✗ Failed
Testing daily.pouya2.ir:6024... ✗ Failed
Testing 15.235.50.172:443... ✓ 258.34 ms
Testing 165.140.216.

Unnamed: 0.1,Unnamed: 0,links,json_config,tcp_latency_ms
0,728,vmess://eyJhZGQiOiI1LjgzLjE1MC4xMDgiLCJob3N0Ij...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",25.811672
1,729,vmess://eyJhZGQiOiI1LjgzLjE1MC4xMDgiLCJob3N0Ij...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",26.057720
2,723,vmess://eyJhZGQiOiI1LjgzLjE1MC4xMDgiLCJob3N0Ij...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",26.614904
3,155,vmess://ew0KICAidiI6ICIyIiwNCiAgInBzIjogIlx1RD...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",28.021574
4,70,vmess://eyJhZGQiOiI4LjYuMTEyLjAiLCJhaWQiOiIwIi...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",28.148651
...,...,...,...,...
288,120,vmess://eyJhZGQiOiJzcGVlZHRlc3QubmV0IiwiYWlkIj...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",13271.260262
289,100,vmess://eyJhZGQiOiJjYWxsbWV2cG44LmNhbGxtZXNob3...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",15422.569036
290,794,vmess://ewogICAgImFkZCI6ICJoZ3Ryb2phbi56YWJjLm...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",15426.237106
291,84,vmess://eyJhZGQiOiJzZXJ2ZXJzLnh2b24uaW4iLCJhaW...,"{""log"": {""loglevel"": ""warning""}, ""inbounds"": [...",20100.023031


In [None]:
import asyncio
import pandas as pd
import json
import subprocess
import socket
import time
import tempfile
import copy
from playwright.async_api import async_playwright

# ============================================================
# System-level configuration
# ============================================================

# Location of the system Chrome binary that Playwright is told to launch
# (passed as `executable_path` when the browser is started).
CHROME_PATH = "/usr/bin/google-chrome"

# ============================================================
# Sites visited through each proxy to check real-world usability.
# Maps a short label (used as the result-column prefix) to its URL.
# ============================================================

TEST_SITES = dict(
    youtube="https://www.youtube.com",
    telegram="https://web.telegram.org",
    instagram="https://www.instagram.com",
    chatgpt="https://chat.openai.com",
)

# ============================================================
# Test a single VMESS/Xray config using:
# 1) Local SOCKS proxy via Xray
# 2) Real browser traffic via Playwright
# ============================================================

def _stop_xray_process(proc, grace_seconds=5):
    """Terminate the Xray subprocess, force-killing it if it does not exit in time."""
    if proc.poll() is not None:
        return  # already exited
    proc.terminate()
    try:
        proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


async def test_single_config_async(config_dict, local_port):
    """
    Launch Xray with a single config, expose SOCKS proxy,
    then verify real browsing capability using Playwright.

    Args:
        config_dict: Xray configuration as a dict. It is deep-copied, so the
            caller's object is never modified. Non-dict input is rejected.
        local_port: Local port assigned to every "socks" inbound of the config.

    Returns:
        dict with "proxy_alive" (bool). When the proxy came up it also holds,
        for each entry of TEST_SITES, "<name>_latency_ms" (float or None) and
        "<name>_status" (HTTP status of the main response, or None when the
        navigation failed or produced no response).

    Raises:
        OSError: if the `xray` executable cannot be started.

    The Xray process and the temporary config file are always cleaned up,
    including when Playwright raises.
    """
    import os

    result = {"proxy_alive": False}

    # Validate input
    if not isinstance(config_dict, dict):
        return result

    # Clone config to avoid mutating original object
    config = copy.deepcopy(config_dict)

    # Override SOCKS inbound port to avoid collisions
    for inbound in config.get("inbounds", []):
        if inbound.get("protocol") == "socks":
            inbound["port"] = local_port

    # Write config to a temporary JSON file (removed in the finally below)
    with tempfile.NamedTemporaryFile("w", delete=False, suffix=".json") as f:
        json.dump(config, f, indent=2)
        config_path = f.name

    proc = None
    try:
        # Launch Xray as a subprocess
        proc = subprocess.Popen(
            ["xray", "run", "-c", config_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        # ----------------------------------------------------
        # Wait until SOCKS port becomes reachable (max 15s)
        # ----------------------------------------------------
        deadline = time.monotonic() + 15
        while time.monotonic() < deadline:
            # Xray already exited (e.g. rejected the config): stop waiting.
            if proc.poll() is not None:
                break
            try:
                with socket.create_connection(("127.0.0.1", local_port), timeout=1):
                    result["proxy_alive"] = True
                    break
            except OSError:
                await asyncio.sleep(0.5)

        # Abort if proxy failed to initialize
        if not result["proxy_alive"]:
            return result

        # ----------------------------------------------------
        # Real traffic test via Playwright browser
        # ----------------------------------------------------
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                executable_path=CHROME_PATH,
                proxy={"server": f"socks5://127.0.0.1:{local_port}"},
                headless=True
            )
            try:
                context = await browser.new_context()
                page = await context.new_page()

                # Visit each target site and measure navigation latency
                for name, url in TEST_SITES.items():
                    try:
                        t0 = time.perf_counter()
                        # 30s navigation timeout
                        response = await page.goto(url, timeout=30000)
                        latency_ms = (time.perf_counter() - t0) * 1000

                        result[f"{name}_latency_ms"] = latency_ms
                        # Record the real HTTP status: goto() does not raise
                        # on 4xx/5xx responses, so 200 cannot be assumed.
                        result[f"{name}_status"] = (
                            response.status if response is not None else None
                        )

                    except Exception:
                        # Best effort: any navigation failure marks the site
                        # as unreachable through this proxy.
                        result[f"{name}_latency_ms"] = None
                        result[f"{name}_status"] = None
            finally:
                await browser.close()

        return result

    finally:
        # Always stop Xray and remove the temporary config file.
        if proc is not None:
            _stop_xray_process(proc)
        try:
            os.unlink(config_path)
        except OSError:
            pass

# ============================================================
# Batch testing for entire DataFrame
# Each config gets its own SOCKS port
# ============================================================

async def test_vless_df_async(df, start_port=10808, max_concurrency=None):
    """
    Run full async tests for all configs in DataFrame.

    Each config:
    - Runs Xray on a unique local port (start_port + row position)
    - Is tested concurrently using asyncio

    Args:
        df: DataFrame with a "json_config" column holding config dicts.
        start_port: Local port used for the first row.
        max_concurrency: Optional cap on the number of configs tested at the
            same time. None (default) starts every test at once.

    Returns:
        A new DataFrame: `df` with a fresh 0..n-1 index, plus the result
        columns of each row's own test placed on that same row.
    """
    # Address rows by position so the result does not depend on df's index.
    configs = df["json_config"].tolist()
    total = len(configs)

    limiter = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def _run_indexed(position, config):
        # Return the row position with the result so it can be put back in
        # place no matter when the test finishes.
        port = start_port + position
        if limiter is None:
            outcome = await test_single_config_async(config, port)
        else:
            async with limiter:
                outcome = await test_single_config_async(config, port)
        return position, outcome

    pending = [
        _run_indexed(position, config)
        for position, config in enumerate(configs)
    ]

    ordered_results = [None] * total
    completed = 0

    # Progress is reported in completion order, but every result is stored
    # at its original row position.
    for finished in asyncio.as_completed(pending):
        position, outcome = await finished
        ordered_results[position] = outcome
        completed += 1
        print(f"✅ Finished {completed}/{total}")

    # Merge results back into original DataFrame
    results_df = pd.DataFrame(ordered_results)
    return pd.concat([df.reset_index(drop=True), results_df], axis=1)

# ============================================================
# Execution
# ============================================================

# Convert JSON strings back into Python dicts
vless_df["json_config"] = vless_df["json_config"].apply(json.loads)

# Run async test loop (Jupyter-compatible)
vless_df = await test_vless_df_async(vless_df)

# Keep only configs that successfully initialized proxy
vless_df = vless_df[vless_df["proxy_alive"]].reset_index(drop=True)

# Persist final validated configs
vless_df.to_excel(
    '/home/alireza/github/My-automated-life/Config_Automation/data/3)Config_Test_Results.xlsx'
)

vless_df
