Skip to content
Permalink
Browse files
Implement basic parsing of SOCKS messages with parsley
* Does not parse authenticated messages
* XXX Currently parsing of reply messages and methods is broken because of
https://bugs.launchpad.net/parsley/+bug/1085492
  • Loading branch information
hellais committed Dec 1, 2012
1 parent 0082cb6 commit 72475c35db3441c65de243019615e85ac40a476a
Showing with 141 additions and 0 deletions.
  1. +82 −0 txsocksx/parser.py
  2. +59 −0 txsocksx/test/test_parser.py
@@ -0,0 +1,82 @@
import parsley
import struct

socks_grammar = """
# XXX Is this correct?
octet = anything
byteToInt = octet:b
-> ord(b)
byteToIntStr = octet:b
-> str(ord(b))
ver = '\x05' | '\x04'
rsv = '\x00'
IPV4Addr = anything{4}:quads
-> '.'.join(str(ord(q)) for q in quads)
IPV6Addr = <anything{16}>
# XXX notes
# letterOrDigitOrHyphen = letterOrDigit | '-'
# domainLabel = <(letter letterOrDigitOrHyphen{0, 61} letterOrDigit)>
# domainName =
# < (domainLabel '.'?)* >
# XXX make this stricter
SOCKSDomainName =
byteToInt:len <anything*>
SOCKSAddress = (token('\x01') IPV4Addr:addr
-> addr
| token('\x03') IPV6Addr:addr
-> addr
| token('\x04') SOCKSDomainName:domain
-> domain
)
port = anything{2}
# The Client version identified/method selection message
clientVersionMethodMessage =
ver octet:nmethods octet{1, 255}:methods
-> (ver, nmethods, methods)
methods = tokenize('\x00') -> 'No Authentication Required'
| tokenize('\x01') -> 'GSSAPI'
| tokenize('\x02') -> 'Username/Password'
| tokenize('\xFF') -> 'No Acceptable Methods'
# The Server version identified/method selection message
serverVersionMethodMessage =
ver methods -> (ver, method)
cmd = token('\x01') -> 'Connect'
| token('\x02') -> 'Bind'
| token('\x03') -> 'UDP Associate'
clientRequestMessage =
ver cmd rsv SOCKSAddress port
rep = token('\x00') -> 'Suceeded'
| token('\x01') -> 'General SOCKS server failure'
| token('\x02') -> 'Connection not allowed'
| token('\x03') -> 'Network unreachable'
| token('\x04') -> 'Host unreachable'
| token('\x05') -> 'Connection refused'
| token('\x06') -> 'TTL expired'
| token('\x07') -> 'Command not supported'
| token('\x08') -> 'Address type not supported'
serverReplyMessage =
ver rep:reply rsv SOCKSAddress:address port:port
-> (reply, address, port)
"""

SOCKSGrammar = parsley.makeGrammar(socks_grammar, {})

@@ -0,0 +1,59 @@
import struct
import parsley

from unittest import TestCase

from txsocksx.parser import SOCKSGrammar

# From https://gist.github.com/1595135
def IPV4StrToInt(s):
"""
Returns the 32 bits representing an IP address from a string.
"""
return reduce(lambda a,b: a<<8 | b, map(int, s.split(".")))

dummyDomain = 'example.com'
dummyIPV4Addr = '127.0.0.1'
dummyIPV4AddrBytes = struct.pack('!i', IPV4StrToInt(dummyIPV4Addr))

dummyPort = 1080
dummyPortBytes = struct.pack('l', dummyPort)

dummyClientVersionMethodMessageNoAuthV5 = \
'\x05\x01\x00'

dummyServerVersionMethodMessageNoAuthV5 = \
'\x05\x00'

dummySOCKSAddrIPV4 = '\x01' + dummyIPV4AddrBytes
dummySOCKSAddrDomain = '\x04' + '\x08' + dummyDomain

dummyClientRequestMessageConnectDomainV5 = \
'\x05\x01\x00' + dummySOCKSAddrDomain

dummyServerReplyMessageSuccessIPV4 = \
'\x05\x00\x00' + '\x03' + dummyIPV4AddrBytes + dummyPortBytes


class TestSOCKSParser(TestCase):
def test_parse_socks_domain(self):
p = SOCKSGrammar(dummySOCKSAddrDomain)
self.assertEqual(p.SOCKSAddress(),
'example.com')

def test_parse_socks_ipv4(self):
p = SOCKSGrammar(dummySOCKSAddrIPV4)
self.assertEqual(p.SOCKSAddress(),
'127.0.0.1')

def test_parse_client_connect_request_message(self):
p = SOCKSGrammar(dummyClientRequestMessageConnectDomainV5)
self.assertEqual(p.clientRequestMessage(),
('Connect', dummyIPV4Addr, dummyPort))

def test_parse_client_request_message(self):
p = SOCKSGrammar(dummyServerReplyMessageSuccessIPV4)
self.assertEqual(p.clientRequestMessage(),
('Success', dummyIPV4Addr, dummyPort))


0 comments on commit 72475c3

Please sign in to comment.