Skip to content

Commit

Permalink
Close #37 : Merge in pczarn SOA record code
Browse files Browse the repository at this point in the history
  • Loading branch information
campadrenalin committed Sep 13, 2013
2 parents 34f6a0d + 5a15ac6 commit 37c274d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 11 deletions.
55 changes: 55 additions & 0 deletions pymads/record.py
Expand Up @@ -17,11 +17,21 @@

import struct
import logging
from collections import namedtuple
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
from persei import String, RawData, RawDataDecorator
from pymads import const
from pymads import utils

soa_namedtuple = namedtuple(
'SOAType',
['mname', 'rname', 'serial', 'refresh', 'retry', 'expire', 'minimum']
)

class SOAType(soa_namedtuple):
def __str__(self):
return "%s.\t%s.\t%d\t%d\t%d\t%d\t%d" % self

class Record(object):
''' Represents a DNS record. '''

Expand Down Expand Up @@ -119,6 +129,20 @@ def __repr__(self):
self.rdata,
)

def __str__(self):
text = '%s.\t%d\t%s\t%s\t%s' % (
self.domain_name,
self.rttl,
self.rclass,
self.rtype,
self.rdata
)

if self.packtype == 'domain':
text += '.'

return text

@property
def packtype(self):
if self.rtype in ('A',):
Expand All @@ -127,6 +151,8 @@ def packtype(self):
return 'IPv6'
elif self.rtype in ('NS', 'CNAME'):
return 'domain'
elif self.rtype in ('SOA',):
return 'zone'
else:
logging.warn('unknown record type ' + self.rtype)
return 'unknown'
Expand Down Expand Up @@ -162,6 +188,22 @@ def pack_rdata_domain(self):
'''
return utils.labels2str(self.rdata.split('.'))

def pack_rdata_zone(self):
'''
Pack a record that holds global parameters of a zone.
'''
if type(self.rdata) is dict:
if set(self.rdata.keys()) != set(SOAType._fields):
raise TypeError("invalid SOA record")
self.rdata = SOAType(*[self.rdata[f] for f in SOAType._fields])
elif type(self.rdata) in (tuple, list):
self.rdata = SOAType(*self.rdata)

packed = utils.labels2str(self.rdata.mname.split('.'))
packed += utils.labels2str(self.rdata.rname.split('.'))
packed += struct.pack("!IiiiI", *self.rdata[2:])
return packed

@RawDataDecorator(args=False)
def unpack_rdata(self, data, offset, length):
'''
Expand Down Expand Up @@ -202,6 +244,19 @@ def unpack_rdata_offset_domain(self, data, offset, length):
for x in utils.str2labels(data, offset)[1]
)

def unpack_rdata_offset_zone(self, data, offset, length):
'''
Unpack a record that holds global parameters of a zone.
'''
offset, mname = utils.str2labels(data, offset)
offset, rname = utils.str2labels(data, offset)

return SOAType(
'.'.join(String(x).export() for x in mname),
'.'.join(String(x).export() for x in rname),
*struct.unpack("!IiiiI", data[offset:offset+20].export())
)

def pack(self):
'''
Formats the resource fields to be used in the response packet.
Expand Down
19 changes: 18 additions & 1 deletion pymads/tests/test_packet.py
Expand Up @@ -60,7 +60,7 @@ def test_cycle_request(self):

def test_cycle_response(self):
from pymads.request import Request
from pymads.record import Record
from pymads.record import Record, SOAType

req = Request(25, [], 'AAAA')
req.name = 'example.com'
Expand All @@ -74,6 +74,23 @@ def test_cycle_response(self):

self.do_test_pack_cycle(resp)

rec3 = Record(
'example.com',
SOAType(
'ns.example.com',
'dns-admin.example.com',
2003080800,
172800,
1209600,
900,
3600
),
'SOA'
)
resp = req.respond(0, [rec, rec2, rec3])

self.do_test_pack_cycle(resp)

def test_parse_pointers(self):
from pymads.request import Request
from pymads.record import Record
Expand Down
67 changes: 57 additions & 10 deletions pymads/tests/test_resolution.py
Expand Up @@ -73,16 +73,7 @@ def do_test_record(self, *records, **kwargs):
]

for record in records:
success_text = '%s.\t%d\t%s\t%s\t%s' % (
record.domain_name,
record.rttl,
record.rclass,
record.rtype,
record.rdata
)

if record.packtype == 'domain':
success_text += '.'
success_text = str(record)

self.assertIn(
success_text,
Expand Down Expand Up @@ -152,6 +143,62 @@ def test_CNAME(self):

self.do_test_record(*expected_records)

def test_SOA(self):
'''
Attempt to store and resolve a SOA record.
'''
from pymads.record import SOAType
from pymads.sources.dns import DummyDnsSource

# Response for @8.8.8.8 SOA www.theuselessweb.com
packet = RawData((0x0,0x1,0x81,0x80,0x0,0x1,0x0,0x3,0x0,0x0,0x0,0x0,0x3,0x77,0x77,0x77,0xd,0x74,0x68,0x65,0x75,0x73,0x65,0x6c,0x65,0x73,0x73,0x77,0x65,0x62,0x3,0x63,0x6f,0x6d,0x0,0x0,0x6,0x0,0x1,0xc0,0xc,0x0,0x5,0x0,0x1,0x0,0x0,0xe,0x10,0x0,0x37,0x3,0x77,0x77,0x77,0xd,0x74,0x68,0x65,0x75,0x73,0x65,0x6c,0x65,0x73,0x73,0x77,0x65,0x62,0x3,0x63,0x6f,0x6d,0x14,0x73,0x33,0x2d,0x77,0x65,0x62,0x73,0x69,0x74,0x65,0x2d,0x75,0x73,0x2d,0x65,0x61,0x73,0x74,0x2d,0x31,0x9,0x61,0x6d,0x61,0x7a,0x6f,0x6e,0x61,0x77,0x73,0xc0,0x1e,0xc0,0x33,0x0,0x5,0x0,0x1,0x0,0x0,0x0,0x3c,0x0,0x2,0xc0,0x49,0xc0,0x49,0x0,0x6,0x0,0x1,0x0,0x0,0x3,0x84,0x0,0x48,0x7,0x6e,0x73,0x2d,0x31,0x39,0x31,0x39,0x9,0x61,0x77,0x73,0x64,0x6e,0x73,0x2d,0x34,0x37,0x2,0x63,0x6f,0x2,0x75,0x6b,0x0,0x11,0x61,0x77,0x73,0x64,0x6e,0x73,0x2d,0x68,0x6f,0x73,0x74,0x6d,0x61,0x73,0x74,0x65,0x72,0x6,0x61,0x6d,0x61,0x7a,0x6f,0x6e,0xc0,0x1e,0x0,0x0,0x0,0x1,0x0,0x0,0x1c,0x20,0x0,0x0,0x3,0x84,0x0,0x12,0x75,0x0,0x0,0x1,0x51,0x80))

self.chain = Chain([DummyDnsSource(packet)])
self.server.config['chains'] = [self.chain]
records = self.chain.get_domain_string('www.theuselessweb.com')

# Test input parsing
uwstr = 'www.theuselessweb.com'
awstr = 's3-website-us-east-1.amazonaws.com'
fstr = uwstr + '.' + awstr

expected_records = [
Record(uwstr,fstr, 'CNAME',3600),
Record(fstr, awstr,'CNAME',60),
Record(awstr, (
"ns-1919.awsdns-47.co.uk",
"awsdns-hostmaster.amazon.com",
1,
7200,
900,
1209600,
86400), 'SOA', 900)
]

self.assertEquals(
records,
expected_records,
)

self.do_test_record(*expected_records)
keys = ('mname', 'rname',
'serial', 'refresh', 'retry', 'expire', 'minimum')
values = ('ns.example.com', 'dns-admin.example.com',
2003080800, 172800, 1209600, 900, 3600)

# Test every valid input
for rdata in (values, dict(zip(keys, values)), SOAType(*values)):
record = Record('example.com', rdata, rtype = 'SOA')

self.setup_chain(record)
self.do_test_record(record)

# Test invalid dict input
self.assertRaises(
TypeError,
lambda: Record('example.com', dict(zip(keys, values[:-1])), 'SOA')
)

def test_async(self):
'''
Run server and consumer threads seperately.
Expand Down

0 comments on commit 37c274d

Please sign in to comment.