Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vless protocol in vmess2json.py #14

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 151 additions & 86 deletions vmess2json.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import urllib.parse

vmscheme = "vmess://"
vlscheme = "vless://"
ssscheme = "ss://"

TPL = {}
Expand Down Expand Up @@ -59,33 +60,7 @@
"domainStrategy": "UseIP"
}
}
],
"dns": {
"servers": [
"1.0.0.1",
"localhost"
]
},
"routing": {
"domainStrategy": "IPIfNonMatch",
"rules": [
{
"type": "field",
"ip": [
"geoip:private",
"geoip:cn"
],
"outboundTag": "direct"
},
{
"type": "field",
"domain": [
"geosite:cn"
],
"outboundTag": "direct"
}
]
}
]
}
"""

Expand Down Expand Up @@ -306,11 +281,24 @@
"""


def _decode_if_b64_encoded(text):
for char in text:
if char.isalpha() or char.isdigit() or char in "=/":
continue
return text # Detected non-base64 character
# The string is base64 encoded. Decode it!
blen = len(text)
if blen % 4 > 0:
text += "=" * (4 - blen % 4)
return base64.b64decode(text).decode()

def parseLink(link):
if link.startswith(ssscheme):
return parseSs(link)
elif link.startswith(vmscheme):
return parseVmess(link)
elif link.startswith(vlscheme):
return parseVless(link)
else:
print("ERROR: This script supports only vmess://(N/NG) and ss:// links")
return None
Expand All @@ -323,48 +311,39 @@ def parseSs(sslink):
"port": "",
"id": "",
"aid": "",
"net": "shadowsocks",
"net": "",
"type": "",
"host": "",
"path": "",
"tls": ""
"tls": "",
"protocol": "shadowsocks"
}
if sslink.startswith(ssscheme):
info = sslink[len(ssscheme):]

if info.rfind("#") > 0:
info, _ps = info.split("#", 2)
RETOBJ["ps"] = urllib.parse.unquote(_ps)

if info.find("@") < 0:
# old style link
#paddings
blen = len(info)
if blen % 4 > 0:
info += "=" * (4 - blen % 4)

info = base64.b64decode(info).decode()

atidx = info.rfind("@")
method, password = info[:atidx].split(":", 2)
addr, port = info[atidx+1:].split(":", 2)
else:
atidx = info.rfind("@")
addr, port = info[atidx+1:].split(":", 2)

info = info[:atidx]
blen = len(info)
if blen % 4 > 0:
info += "=" * (4 - blen % 4)
assert(sslink.startswith(ssscheme))

info = base64.b64decode(info).decode()
method, password = info.split(":", 2)
info = sslink[len(ssscheme):]

# Strip the tailing description in URL
if info.rfind("#") > 0:
info, _ps = info.split("#", 2)
RETOBJ["ps"] = urllib.parse.unquote(_ps)

if info.find("@") < 0:
# old style link: The whole link is encoded
info = _decode_if_b64_encoded(info)
atidx = info.rfind("@")
method, password = info[:atidx].split(":", 2)
addr, port = info[atidx+1:].split(":", 2)
else:
atidx = info.rfind("@")
addr, port = info[atidx+1:].split(":", 2)
info = _decode_if_b64_encoded(info[:atidx])
method, password = info.split(":", 2)

RETOBJ["add"] = addr
RETOBJ["port"] = port
RETOBJ["aid"] = method
RETOBJ["id"] = password
return RETOBJ
RETOBJ["add"] = addr
RETOBJ["port"] = port
RETOBJ["aid"] = method
RETOBJ["id"] = password
return RETOBJ


def parseVmess(vmesslink):
Expand All @@ -381,45 +360,128 @@ def parseVmess(vmesslink):
"type": "none",
"host": "",
"path": "",
"tls": ""
"tls": "",
"protocol": "vmess"
}
"""
if vmesslink.startswith(vmscheme):
bs = vmesslink[len(vmscheme):]
#paddings
blen = len(bs)
if blen % 4 > 0:
bs += "=" * (4 - blen % 4)

vms = base64.b64decode(bs).decode()
return json.loads(vms)
else:
raise Exception("vmess link invalid")
assert(vmesslink.startswith(vmscheme))

# vmess link is a base64 encoded json
vms = _decode_if_b64_encoded(vmesslink[len(vmscheme):])
RETOBJ = json.loads(vms)
RETOBJ["protocol"] = "vmess"
return RETOBJ

def parseVless(vlesslink):
# This function also applies to vmess AEAD. Just remove the assertion to allow vmessAEAD link to come in. https://github.com/XTLS/Xray-core/issues/91
assert(vlesslink.startswith(vlscheme))
RETOBJ = {
"v": "2",
"ps": "REMARK",
"add": "SERVER-ADDR",
"port": "SERVER-PORT",
"id": "LONG-ID",
"aid": "", # N/A
"net": "",
"type": "", # mKCP or QUIC header type
"host": "", # h2 or ws host / quic security
"path": "", # h2 or ws path / quic key
"tls": "", # transport layer security, none/tls/xtls
"protocol": "", # vless or vmess
# "optional_users_security": "", # encryption of vmess or vless
# "optional_tls_sni": "",
}

# vless link is either fully encoded, or not encoded at all
vlesslink = vlscheme + _decode_if_b64_encoded(vlesslink[len(vlscheme):])
parsed = urllib.parse.urlparse(vlesslink)

def validate(arg, defval = None):
if arg is not None:
return str(arg)
elif defval is not None:
return ""
else:
raise RuntimeError("failed while validating URL component. The link is incomplete or broken.")

RETOBJ["protocol"] = validate(parsed.scheme)
RETOBJ["id"] = validate(parsed.username)
RETOBJ["add"] = validate(parsed.hostname)
RETOBJ["port"] = validate(parsed.port)
RETOBJ["ps"] = validate(parsed.fragment, "") # TODO: decodeURIComponent

parsed_qs = urllib.parse.parse_qs(parsed.query)
parsed_qs = {k:v[0] for k,v in parsed_qs.items()}
RETOBJ["net"] = validate(parsed_qs.get('type'))
RETOBJ["optional_users_security"] = validate(parsed_qs.get('encryption', 'auto' if parsed.scheme == 'vmess' else 'none'))
RETOBJ["tls"] = validate(parsed_qs.get('security', 'none'))
RETOBJ["optional_tls_sni"] = validate(parsed_qs.get('sni', parsed.hostname))

if RETOBJ["net"] == 'http' or RETOBJ["net"] == 'ws':
RETOBJ["path"] = validate(parsed_qs.get('path', '/'))
RETOBJ["host"] = validate(parsed_qs.get('host', parsed.hostname))
elif RETOBJ["net"] == 'kcp':
RETOBJ["type"] = validate(parsed_qs.get('headerType', 'none'))
if parsed_qs.get('seed', None) is not None:
raise RuntimeError("mKCP seed is not supported by this tool")
elif RETOBJ["net"] == 'quic':
RETOBJ["type"] = validate(parsed_qs.get('headerType', 'none'))
RETOBJ["host"] = validate(parsed_qs.get('quicSecurity', 'none'))
RETOBJ["path"] = validate(parsed_qs.get('key', '')) # TODO: decodeURIComponent
elif RETOBJ["net"] == 'grpc':
raise RuntimeError("gRPC transport layer is not supported by this tool")
elif RETOBJ["net"] == 'tcp':
raise RuntimeError("Unknown 'type' argument. Accepting tcp, http, ws, kcp, quic, grpc, but getting " + RETOBJ["net"])

if parsed_qs.get('alpn', None) is not None:
raise RuntimeError("'alpn' parameter in the link is not supported. ")
if parsed_qs.get('flow', None) is not None:
raise RuntimeError("'flow' parameter in the link is not supported. (because xtls is not supported)")

return RETOBJ



def load_TPL(stype):
s = TPL[stype]
return json.loads(s)

def fill_basic(_c, _v):
def fill_vmess_or_vless(_c, _v):
_outbound = _c["outbounds"][0]
_outbound["protocol"] = _v["protocol"]

_vnext = _outbound["settings"]["vnext"][0]
_vnext["address"] = _v["add"]
_vnext["port"] = int(_v["port"])

_vnext["address"] = _v["add"]
_vnext["port"] = int(_v["port"])
_vnext["users"][0]["id"] = _v["id"]
_vnext["users"][0]["alterId"] = int(_v["aid"])
_vnext_user_0 = _vnext["users"][0]
_vnext_user_0["id"] = _v["id"]
if _v["aid"] == "":
del _vnext_user_0["alterId"]
else:
_vnext_user_0["alterId"] = int(_v["aid"])
if "optional_users_security" in _v:
if _v["protocol"] == "vless":
_vnext_user_0["encryption"] = _v["optional_users_security"]
del _vnext_user_0["security"]
else:
_vnext_user_0["security"] = _v["optional_users_security"]

_outbound["streamSettings"]["network"] = _v["net"]

if _v["tls"] == "tls":
_outbound["streamSettings"]["security"] = "tls"
_outbound["streamSettings"]["tlsSettings"] = {"allowInsecure": True}
if _v["host"] != "":
if "optional_tls_sni" in _v:
_outbound["streamSettings"]["tlsSettings"]["serverName"] = _v["optional_tls_sni"]
elif _v["host"] != "":
_outbound["streamSettings"]["tlsSettings"]["serverName"] = _v["host"]

elif _v["tls"] == "xtls":
raise RuntimeError("transport layer 'xtls' is not supported by this script")

return _c



def fill_shadowsocks(_c, _v):
_ss = load_TPL("out_ss")
_ss["email"] = _v["ps"] + "@ss"
Expand Down Expand Up @@ -481,12 +543,15 @@ def fill_quic(_c, _v):

def vmess2client(_t, _v):
_net = _v["net"]
_protocol = _v["protocol"]
_type = _v["type"]

if _net == "shadowsocks":
if _protocol == "shadowsocks":
return fill_shadowsocks(_t, _v)

_c = fill_basic(_t, _v)
elif _protocol == "vmess" or _protocol == "vless":
_c = fill_vmess_or_vless(_t, _v)
else:
raise RuntimeError("Invalid link causing vmess2client getting an unknown protocol " + _protocol)

if _net == "kcp":
return fill_kcp(_c, _v)
Expand Down