In [1]:
import struct
import socket
from io import BytesIO

from part_1 import build_query, DNSQuestion, DNSHeader, DNSType

In [2]:
request_message = build_query("blog.horang.dev", DNSType.A)

In [3]:
request_message

b'\x90\xde\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x04blog\x06horang\x03dev\x00\x00\x01\x00\x01'

# 2.1 응답 메시지 파싱

In [4]:
from dataclasses import dataclass 

@dataclass
class DNSRecord:
    name: bytes
    type_: int
    class_: int
    ttl: int
    data: bytes 

- name: 도메인 이름
- type_: A, AAAA, MX, NS, TXT 등 (정수로 인코딩)
- class: 항상 동일 (1). 무시할 것이다. RFC 1035, 3.2.4. CLASS values 부분 참조
- ttl: 쿼리를 캐시할 시간. 무시할 것이다.
- data: 레코드의 내용, 질의한 DNS의 IP 주소와 같은 것들이 포함되어 있다.


# 2.2 헤더 파싱

In [5]:
def parse_header(reader):
    items = struct.unpack("!HHHHHH", reader.read(12))
    # see "a note on BytesIO" for an explanation of `reader` here
    return DNSHeader(*items)

파트 1.2의 header_to_bytes에서의 코드와 동일하다. 각각의 6개의 필드는 2바이트 정수이므로, 총 12바이트를 읽어야 한다.

In [6]:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

sock.sendto(request_message, ("8.8.8.8", 53))

response, _ = sock.recvfrom(1024)
print(response)
print(response.hex())

b'\x90\xde\x81\x80\x00\x01\x00\x05\x00\x00\x00\x00\x04blog\x06horang\x03dev\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00,b\x00\x13\x07chl8469\x06github\x02io\x00\xc0-\x00\x01\x00\x01\x00\x00\x022\x00\x04\xb9\xc7l\x99\xc0-\x00\x01\x00\x01\x00\x00\x022\x00\x04\xb9\xc7m\x99\xc0-\x00\x01\x00\x01\x00\x00\x022\x00\x04\xb9\xc7n\x99\xc0-\x00\x01\x00\x01\x00\x00\x022\x00\x04\xb9\xc7o\x99'
90de8180000100050000000004626c6f6706686f72616e67036465760000010001c00c0005000100002c6200130763686c383436390667697468756202696f00c02d00010001000002320004b9c76c99c02d00010001000002320004b9c76d99c02d00010001000002320004b9c76e99c02d00010001000002320004b9c76f99


In [7]:
reader = BytesIO(response)
parse_header(reader)

DNSHeader(id=37086, flags=33152, num_questions=1, num_answers=5, num_authorities=0, num_additionals=0)

해석하면 아래와 같다.

- id: `63902`, random으로 넣어주었던 값
- flags: `33152`, QR=1, opcode=0, AA=0, TC=0, RD=1, RA=1, Z=0, RCODE=0
- 질의한 질문의 수: `1`
- 응답으로 돌아온 레코드의 수: `5`

# 2.3 domain name 파싱(데이터 압축 처리 미포함)

헤더부분을 파싱하였으니 이제 질문에 대한 응답을 파싱해보자. 질문에 대한 응답은 질문한 도메인 이름과 그에 대한 레코드들이다.

아래를 보면 파트1에서 name 인코딩했던 것과 유사한 byte가 보인다. 

In [8]:
question = reader.read(21)
question

b'\x04blog\x06horang\x03dev\x00\x00\x01\x00\x01'

In [9]:
def decode_name_simple(reader):
    parts = []
    while (length := reader.read(1)[0]) != 0:
        parts.append(reader.read(length))
    return b".".join(parts)

- 1 byte를 읽어서 해당 도메인 이름의 길이를 알아낸다.
- 그 길이만큼 도메인 이름을 읽어낸다.
- 길이가 0이 될 때까지 반복한다.
- 각각의 도메인 이름을 .으로 연결한다.

# 2.4 질의 파싱

In [10]:
def parse_question(reader):
    name = decode_name_simple(reader)
    data = reader.read(4)
    type_, class_ = struct.unpack("!HH", data)
    return DNSQuestion(name, type_, class_)

In [11]:
reader = BytesIO(response)
parse_header(reader)
parse_question(reader)

DNSQuestion(name=b'blog.horang.dev', type_=1, class_=1)

# 2.5 레코드 파싱 

이제 레코드 부분을 파싱해보자. 레코드의 포맷은 [RFC 1035, 4.1.3. Resource record format](https://datatracker.ietf.org/doc/html/rfc1035#section-4.1.3)을 참조하자.

In [12]:
def parse_record(reader):
    name = decode_name_simple(reader)
    # the type, class, TTL, and data length together are 10 bytes (2 + 2 + 4 + 2 = 10)
    # so we read 10 bytes
    data = reader.read(10)
    # HHIH means 2-byte int, 2-byte-int, 4-byte int, 2-byte int
    type_, class_, ttl, data_len = struct.unpack("!HHIH", data) 
    data = reader.read(data_len)
    return DNSRecord(name, type_, class_, ttl, data)

In [13]:
reader = BytesIO(response)
parse_header(reader)
parse_question(reader)
parse_record(reader)

IndexError: index out of range

오류가 발생한 이유는, 도메인 이름이 압축되어 있기 때문이다. record의 데이터부분을 파싱하다보면 길이가 192인 도메인 이름이 나오는데, 도메인 이름의 각 부분의 최대 길이는 63이다. 192는 byte로 11000000로 표현되는데, 이는 압축되었음을 알리는 코드이다.

이는 DNS 응답에 동일한 도메인 이름이 여러번 나오기 때문에 발생한다. 이를 해결하기 위해 압축된 응답을 처리하는 함수를 작성해보자. question 부분을 파싱할 때 이러한 문제가 발생하지 않은 이유는, 도메인 이름이 한 번만 나왔기 때문이다. DNS 압축에 대한 내용은 [RFC 1035, section 4.1.4](https://datatracker.ietf.org/doc/html/rfc1035#section-4.1.4)를 참고해보자.

# 2.6 domain name 파싱(데이터 압축 처리 포함)

In [14]:
def decode_name(reader):
    parts = []
    while (length := reader.read(1)[0]) != 0:
        if length & 0b1100_0000:
            parts.append(decode_compressed_name(length, reader))
            break
        else:
            parts.append(reader.read(length))
    return b".".join(parts)


def decode_compressed_name(length, reader):
    pointer_bytes = bytes([length & 0b0011_1111]) + reader.read(1)
    pointer = struct.unpack("!H", pointer_bytes)[0]
    current_pos = reader.tell()
    reader.seek(pointer)
    result = decode_name(reader)
    reader.seek(current_pos)
    return result

도메인 이름이 압축되어 있는지 확인하기 위하여 길이를 얻을 때마다 처음 2비트가 1인지 확인한다.
- 앞서 말했듯이 DNS 이름 구성 요소의 최대 길이는 63자이므로 압축되지 않은 DNS 이름 부분에서는 상위 2비트가 절대 설정되지 않는다.

만약 2비트가 1이라면, 이는 압축된 이름이라는 것을 의미한다. 이때, 2바이트를 읽어서 포인터를 얻어낸다. 포인터는 DNS 패킷에서의 위치를 나타내는데, 이 위치로 이동하여 다시 decode_name을 호출한다. 이때, 현재 위치를 저장해두었다가 다시 돌아올 수 있도록 해야 한다.

압축된 도메인 이름은 다른 레이블을 따르지 않으므로, 레이블을 압축해제한 후 즉시 리턴한다.

**이 코드는 보안 취약점을 가지고 있다. 이에 대한 내용은 연습문제 3을 참조하자.**

# 2.7 레코드 파싱(압축 처리 포함)

In [15]:
def parse_record(reader):
    name = decode_name(reader)
    data = reader.read(10)
    type_, class_, ttl, data_len = struct.unpack("!HHIH", data)
    data = reader.read(data_len)
    return DNSRecord(name, type_, class_, ttl, data)

In [16]:
reader = BytesIO(response)
parse_header(reader)
parse_question(reader)
parse_record(reader)

DNSRecord(name=b'blog.horang.dev', type_=5, class_=1, ttl=11362, data=b'\x07chl8469\x06github\x02io\x00')

올바르게 파싱된 것을 확인할 수 있다.

# 2.8 DNS 패킷 파싱

지금까지 배운 것들을 종합하여 전체 DNS 패킷을 구문 분석할 수 있다.

이전에는 헤더 1개, 질문 1개, 레코드 1개를 파싱했지만, 실제로는 헤더에는 패킷의 각 섹션에서 예상되는 레코드 수를 알려주는 숫자(num_questions, num_answers, num_additions 및 num_authorities)가 여러 개 있다.

따라서, DNS 패킷의 모든 섹션을 파싱하려면 이 숫자를 사용하여 각 섹션을 반복적으로 파싱해야 한다.

DNS 패킷의 모든 내용(헤더, 질문, 모든 레코드)을 담을 클래스를 만들어 보자.

In [17]:
@dataclass
class DNSPacket:
    header: DNSHeader
    questions: list[DNSQuestion]
    # don't worry about the exact meaning of these 3 record
    # sections for now: we'll use them in Part 3
    answers: list[DNSRecord]
    authorities: list[DNSRecord]
    additionals: list[DNSRecord]

이제 DNS 패킷을 파싱하는 함수를 작성해보자.

In [18]:
def parse_dns_packet(data):
    reader = BytesIO(data)
    header = parse_header(reader)
    questions = [parse_question(reader) for _ in range(header.num_questions)]
    answers = [parse_record(reader) for _ in range(header.num_answers)]
    authorities = [parse_record(reader) for _ in range(header.num_authorities)]
    additionals = [parse_record(reader) for _ in range(header.num_additionals)]

    return DNSPacket(header, questions, answers, authorities, additionals)

In [19]:
packet = parse_dns_packet(response)
packet

DNSPacket(header=DNSHeader(id=37086, flags=33152, num_questions=1, num_answers=5, num_authorities=0, num_additionals=0), questions=[DNSQuestion(name=b'blog.horang.dev', type_=1, class_=1)], answers=[DNSRecord(name=b'blog.horang.dev', type_=5, class_=1, ttl=11362, data=b'\x07chl8469\x06github\x02io\x00'), DNSRecord(name=b'chl8469.github.io', type_=1, class_=1, ttl=562, data=b'\xb9\xc7l\x99'), DNSRecord(name=b'chl8469.github.io', type_=1, class_=1, ttl=562, data=b'\xb9\xc7m\x99'), DNSRecord(name=b'chl8469.github.io', type_=1, class_=1, ttl=562, data=b'\xb9\xc7n\x99'), DNSRecord(name=b'chl8469.github.io', type_=1, class_=1, ttl=562, data=b'\xb9\xc7o\x99')], authorities=[], additionals=[])

In [20]:
for p in packet.answers:
    print(f"name: {p.name}, type: {DNSType(p.type_).name}, data: {p.data}")

name: b'blog.horang.dev', type: CNAME, data: b'\x07chl8469\x06github\x02io\x00'
name: b'chl8469.github.io', type: A, data: b'\xb9\xc7l\x99'
name: b'chl8469.github.io', type: A, data: b'\xb9\xc7m\x99'
name: b'chl8469.github.io', type: A, data: b'\xb9\xc7n\x99'
name: b'chl8469.github.io', type: A, data: b'\xb9\xc7o\x99'


data 부분이 모두 16진수로 출력되는데, 각 type에 따라 다르게 파싱해주어야 한다. A record일 경우 ip를 문자열로 변환해주는 코드를 작성해보자.

In [21]:
def ip_to_string(ip):
    return ".".join([str(x) for x in ip])

In [22]:
for p in packet.answers:
    dns_type = DNSType(p.type_)
    if dns_type == DNSType.A:
        data = ip_to_string(p.data)
    else:
        data = p.data
    print(f"name: {p.name}, type: {dns_type.name}, data: {data}")

name: b'blog.horang.dev', type: CNAME, data: b'\x07chl8469\x06github\x02io\x00'
name: b'chl8469.github.io', type: A, data: 185.199.108.153
name: b'chl8469.github.io', type: A, data: 185.199.109.153
name: b'chl8469.github.io', type: A, data: 185.199.110.153
name: b'chl8469.github.io', type: A, data: 185.199.111.153


# 2.9 최종 테스트

In [23]:
def lookup_domain(domain_name):
    query = build_query(domain_name, DNSType.A)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(query, ("8.8.8.8", 53))

    # get the response
    data, _ = sock.recvfrom(1024)
    packet = parse_dns_packet(data)
    for p in packet.answers:
        dns_type = DNSType(p.type_)
        data = p.data
        if dns_type == DNSType.A:
            data = ip_to_string(data)

        print(f"name: {p.name}, type: {dns_type.name}, data: {data}")

In [24]:
lookup_domain("blog.horang.dev")

name: b'blog.horang.dev', type: CNAME, data: b'\x07chl8469\x06github\x02io\x00'
name: b'chl8469.github.io', type: A, data: 185.199.108.153
name: b'chl8469.github.io', type: A, data: 185.199.109.153
name: b'chl8469.github.io', type: A, data: 185.199.110.153
name: b'chl8469.github.io', type: A, data: 185.199.111.153


In [25]:
lookup_domain("www.facebook.com")

name: b'www.facebook.com', type: CNAME, data: b'\tstar-mini\x04c10r\xc0\x10'
name: b'star-mini.c10r.facebook.com', type: A, data: 157.240.31.35


In [26]:
lookup_domain("www.metafilter.com")

name: b'www.metafilter.com', type: CNAME, data: b'\xc0\x10'
name: b'metafilter.com', type: A, data: 54.203.56.158


A record가 아닌 경우에는 data를 그대로 출력한다.

In [27]:
request_message = build_query("www.metafilter.com", DNSType.A)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

sock.sendto(request_message, ("8.8.8.8", 53))

response, _ = sock.recvfrom(1024)

reader = BytesIO(response)
parse_header(reader)
parse_question(reader)
parse_record(reader)

DNSRecord(name=b'www.metafilter.com', type_=5, class_=1, ttl=103, data=b'\xc0\x10')

CNAME인 경우 data가 도메인 이름이다. 이를 decode_name으로 다시 파싱해주어야 한다.(압축 처리 포함)

In [28]:
def parse_record_cname(reader):
    name = decode_name(reader)
    data = reader.read(10)
    type_, class_, ttl, data_len = struct.unpack("!HHIH", data)
    if type_ == DNSType.CNAME:
        data = decode_name(reader)
    else:
        data = reader.read(data_len)
    return DNSRecord(name, type_, class_, ttl, data)

In [29]:
request_message = build_query("www.metafilter.com", DNSType.A)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

sock.sendto(request_message, ("8.8.8.8", 53))

response, _ = sock.recvfrom(1024)

reader = BytesIO(response)
parse_header(reader)
parse_question(reader)
parse_record_cname(reader)

DNSRecord(name=b'www.metafilter.com', type_=5, class_=1, ttl=300, data=b'metafilter.com')

In [30]:
request_message = build_query("www.facebook.com", DNSType.A)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

sock.sendto(request_message, ("8.8.8.8", 53))

response, _ = sock.recvfrom(1024)

reader = BytesIO(response)
parse_header(reader)
parse_question(reader)
parse_record_cname(reader)

DNSRecord(name=b'www.facebook.com', type_=5, class_=1, ttl=1633, data=b'star-mini.c10r.facebook.com')