In [1]:
from collections import defaultdict
from datetime import datetime
import json
import os
import re
from urllib.parse import urlsplit, urlunsplit, parse_qs, urlencode

import requests
from lxml import html as etree

In [2]:
# When True, get() serves pages from ./_cache if a cached copy exists and
# stores freshly downloaded pages there; when False every call hits the network.
USE_CACHE = True

In [3]:
# Browser-like User-Agent header sent with every request made by get().
UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'
# Host that fix_url() fills in for links lacking one.
BASE_HOST = 'www.bundesrat.de'
# Prefix that get_sessions() prepends to the relative links it finds.
BASE_URL = 'https://www.bundesrat.de/'
# Path (relative to BASE_URL) of the page get_sessions() scrapes.
START_PATH = 'DE/plenum/to-plenum/to-plenum-node.html'
# Agenda page of one session; {num} is the session number.
TO_URL = 'https://www.bundesrat.de/SharedDocs/TO/{num}/to-node.html'
# Detail page of one agenda item ("TOP"); {top} is the item number as built by parse_top().
TOP_URL = 'https://www.bundesrat.de/SharedDocs/TO/{num}/tops/{top}.html?view=render[StandardBody]'

# Captures the digits in front of ". Plenarsitzung".
SESSION_NUM_RE = re.compile(r'(\d+)\. Plenarsitzung')
# Captures a date written as DD.MM.YYYY.
DATE_RE = re.compile(r'(\d{2}\.\d{2}\.\d{4})')
# Captures a time written as HH:MM that is followed by " Uhr".
TIME_RE = re.compile(r'(\d{2}:\d{2}) Uhr')
# Captures an item number with an optional letter suffix, e.g. "TOP 2a" -> "2a".
TOP_NUMBER_RE = re.compile(r'TOP (\d+[a-z]?)')
# Captures a letter-only item label, e.g. "TOP a" -> "a" (the letter is optional in the pattern).
TOP_SIMPLE_NUMBER_RE = re.compile(r'TOP ([a-z]?)')
# Splits a document link title into doc_id (optional), doc_kind, doc_filetype
# and doc_filesize; the parts are separated by two whitespace characters, e.g.:
# BR 3/18(B)  Beschlussdrucksache  (PDF, 112KB)
DOC_TITLE = re.compile(r'(?:(?P<doc_id>[A-Z]{2,}.*)\s{2})?(?P<doc_kind>.*)\s{2}\((?P<doc_filetype>[A-Z]{3,}),?\s+(?P<doc_filesize>\d+[KMGT]B)\)')

# Names of the German federal states that get_states() searches for in free text.
STATES = [
    'Schleswig-Holstein',
    'Hamburg',
    'Mecklenburg-Vorpommern',
    'Niedersachsen',
    'Bremen',
    'Berlin',
    'Brandenburg',
    'Sachsen',
    'Sachsen-Anhalt',
    'Thüringen',
    'Hessen',
    'Nordrhein-Westfalen',
    'Rheinland-Pfalz',
    'Saarland',
    'Baden-Württemberg',
    'Bayern'
]

In [4]:
# Create the cache directory up front so get() can write into it;
# exist_ok=True makes re-running this cell harmless.
if USE_CACHE:
    os.makedirs('./_cache', exist_ok=True)

def get(url):
    """Return the body of ``url`` as text, using the on-disk cache if enabled.

    With ``USE_CACHE`` set, the cache file name is derived from the part of
    the URL after ``bundesrat.de/`` with the query string dropped and ``/``
    replaced by ``_``. URLs that differ only in their query string therefore
    share one cache entry.

    Args:
        url: Absolute URL on bundesrat.de.

    Returns:
        The response body as ``str``.

    Raises:
        IndexError: If caching is enabled and ``url`` does not contain
            ``bundesrat.de/``.
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
    """
    filename = None
    if USE_CACHE:
        filename = url.split('bundesrat.de/')[1].split('?')[0].replace('/', '_')
        filename = os.path.join('./_cache', filename)
        if os.path.exists(filename):
            # Explicit encoding: the platform default is not UTF-8 everywhere.
            # NOTE: a cache written earlier under a different default encoding
            # should be deleted once.
            with open(filename, encoding='utf-8') as f:
                return f.read()
    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.get(url, headers={'User-Agent': UA}, timeout=30)
    # Fail loudly instead of caching an error page as if it were real content.
    response.raise_for_status()
    text = response.text
    if USE_CACHE:
        # The directory may be missing if USE_CACHE was switched on after the
        # setup cell ran.
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(text)
    return text

In [5]:
def get_sessions():
    """Yield one dict per plenary session found on the start page.

    Every table on the page is scanned row by row. A row that does not have
    exactly two ``<td>`` cells ends the scan of that table. Rows whose first
    cell does not mention a "<n>. Plenarsitzung" are skipped.

    Yields:
        dict with ``num`` (int), ``timestamp`` (ISO 8601 string) and ``url``.
    """
    page = etree.fromstring(get(BASE_URL + START_PATH))
    for table in page.xpath('.//table'):
        for row in table.xpath('.//tr'):
            cells = row.xpath('./td')
            if len(cells) != 2:
                # Not the two-column session table: give up on this table.
                break
            title_cell, when_cell = cells
            href = title_cell.xpath('.//a')[0].attrib['href']
            # The link is relative to BASE_URL; cut off the session id suffix.
            url = BASE_URL + href.split(';jsessionid=')[0]
            number_match = SESSION_NUM_RE.search(title_cell.text_content())
            if number_match is None:
                continue
            when_text = when_cell.text_content()
            day = DATE_RE.search(when_text).group(1)
            clock = TIME_RE.search(when_text).group(1)
            timestamp = datetime.strptime('{} {}'.format(day, clock), '%d.%m.%Y %H:%M')
            yield {
                'num': int(number_match.group(1)),
                'timestamp': timestamp.isoformat(),
                'url': url
            }


In [6]:
JSESSIONID_PATH = ';jsessionid='

def fix_url(url):
    """Normalise a link scraped from bundesrat.de into a clean absolute URL.

    * A missing scheme becomes ``https``; a missing host becomes ``BASE_HOST``.
    * A ``;jsessionid=...`` suffix is cut off the path.
    * For links on ``BASE_HOST`` the ``nn`` query parameter is removed.
    * Links with a scheme other than http/https (``mailto:``, ``javascript:``
      and the like) are returned unchanged, because they have no host to fill
      in.

    Args:
        url: Absolute or site-relative link as found in an ``href``/``src``.

    Returns:
        The normalised URL string.
    """
    result = urlsplit(url)
    if result.scheme not in ('', 'http', 'https'):
        # Forcing a host onto e.g. 'mailto:a@b.de' would produce the bogus
        # 'mailto://www.bundesrat.de/a@b.de'.
        return url
    parts = list(result)
    if result.scheme == '':
        parts[0] = 'https'
    if result.netloc == '':
        parts[1] = BASE_HOST
    if JSESSIONID_PATH in result.path:
        # Remove session id
        parts[2] = result.path.split(JSESSIONID_PATH)[0]

    if parts[1] == BASE_HOST:
        # Remove useless 'nn' query param. keep_blank_values=True keeps
        # parameters such as '?flag=' that parse_qs would otherwise drop.
        qs = parse_qs(result.query, keep_blank_values=True)
        qs.pop('nn', None)
        parts[3] = urlencode(qs, doseq=True)
    return urlunsplit(parts)


In [7]:
# Example: a relative link gains scheme and host and loses both its
# ';jsessionid=' suffix and its 'nn' query parameter; the fragment is kept.
fix_url('DE/plenum/bundesrat-kompakt/18/964/964-pk.html;jsessionid=93D6631A016E47B9E032236FE39F9ED1.2_cid382?nn=4352766#top-2a')

'https://www.bundesrat.de/DE/plenum/bundesrat-kompakt/18/964/964-pk.html#top-2a'

In [8]:
def text_extract(elements):
    """Return the text content of all elements, newline-joined and stripped."""
    pieces = [element.text_content() for element in elements]
    joined = '\n'.join(pieces)
    return joined.strip()


def press_release(elements):
    """Extract the press release link(s) of an agenda item.

    ``<ul>`` elements contribute their links as the ``links`` list. Links
    found below any other element are merged into the result itself, so the
    first such link supplies ``title`` and ``url``.

    Args:
        elements: Elements collected under the press release heading.

    Returns:
        dict that may contain ``title``, ``url`` and ``links``.
    """
    data = {}
    for el in elements:
        if el.tag == 'ul':
            data['links'] = list(link_list_extract(el))
        else:
            # link_extract() returns a *list* of link dicts. Passing that list
            # to dict.update() would treat each two-key dict as a key/value
            # pair and store {'title': 'url'}. Merge the dicts themselves
            # instead, and let the first link found win.
            for link in link_extract(el):
                for key, value in link.items():
                    data.setdefault(key, value)
    return data

def extract_links(element, transform=None):
    """Yield a dict for every ``<a>`` below ``element`` that has an ``href``.

    Args:
        element: Element to search (XPath ``.//a``).
        transform: Optional callable that receives the link dict and returns
            the value to yield in its place.

    Yields:
        ``{'title': <link text>, 'url': <normalised href>}``, or whatever
        ``transform`` returns for it.
    """
    for a in element.xpath('.//a'):
        href = a.attrib.get('href')
        if href is None:
            # Named anchors (<a name="...">) carry no target; skip them
            # instead of failing with a KeyError.
            continue
        data = {
            'title': a.text_content(),
            'url': fix_url(href)
        }
        if transform:
            data = transform(data)
        yield data
        
def link_list_extract(elements, transform=None):
    """Yield the links of every item in ``elements``, in order.

    Each item is passed to extract_links() together with ``transform``.
    """
    for item in elements:
        for link in extract_links(item, transform=transform):
            yield link
    
def link_extract(elements):
    """Return all links found via link_list_extract() as a list."""
    return [link for link in link_list_extract(elements)]

# Multiplier for each size prefix that DOC_TITLE accepts ([KMGT]B).
FILESIZE_PREFIX = {
    'K': 1024 ** 1,
    'M': 1024 ** 2,
    'G': 1024 ** 3,
    # DOC_TITLE also matches 'TB'; without this entry such a title raised KeyError.
    'T': 1024 ** 4,
}

def document_link_transform(data):
    """Enrich a link dict with the fields encoded in its document title.

    If ``data['title']`` matches ``DOC_TITLE``, the named groups
    (``doc_id``, ``doc_kind``, ``doc_filetype``, ``doc_filesize``) are added
    to ``data``, plus ``doc_filesize_bytes`` computed from the size string.

    Args:
        data: Link dict with at least a ``title`` key; modified in place.

    Returns:
        The same ``data`` dict.
    """
    title = data['title']
    match = DOC_TITLE.search(title)
    if match is not None:
        data.update(match.groupdict())
        filesize = data.get('doc_filesize')
        if filesize:
            # e.g. '112KB' -> amount '112', prefix 'K'
            amount, prefix = filesize[:-2], filesize[-2]
            data['doc_filesize_bytes'] = int(amount) * FILESIZE_PREFIX[prefix]
    return data

def document_links(elements):
    """Return the links below ``elements``, each passed through document_link_transform()."""
    links = link_list_extract(elements, transform=document_link_transform)
    return [link for link in links]

def url_extract(element):
    """Return only the ``url`` of every link that link_extract() finds."""
    urls = []
    for link in link_extract(element):
        urls.append(link['url'])
    return urls

def get_states(text, states=None):
    """Yield every state name that occurs in ``text``.

    Matching is by substring, so inflected forms such as "Bayerns" still
    count. A name that is part of a longer state name ("Sachsen" inside
    "Sachsen-Anhalt") is only reported when it also occurs outside that
    longer name.

    Args:
        text: Free text to search.
        states: Names to look for; defaults to the module-level ``STATES``.

    Yields:
        Matching names in the order given by ``states``.
    """
    if states is None:
        states = STATES
    for state in states:
        remaining = text
        for other in states:
            if other != state and state in other:
                # Blank out the longer name so it cannot cause a false hit.
                remaining = remaining.replace(other, ' ')
        if state in remaining:
            yield state

def states_involved(els):
    """Return the section text together with the state names found in it."""
    section_text = text_extract(els)
    found = [state for state in get_states(section_text)]
    return {'text': section_text, 'states': found}

def get_committees(els):
    """Collect the committees listed as ``<abbr>`` children of ``els``.

    An ``<abbr>`` whose text is ``fdf`` is not a committee itself; it marks
    the committee listed immediately before it as leading.

    Args:
        els: Elements whose direct ``<abbr>`` children are inspected.

    Returns:
        list of dicts with ``name`` (the abbr's title attribute),
        ``abbreviation`` (its text) and ``leading`` (bool).
    """
    committees = []
    for el in els:
        for abbr in el.xpath('./abbr'):
            text = abbr.text_content()
            if text == 'fdf':
                # A marker with no preceding committee has nothing to flag;
                # ignore it rather than failing with an IndexError.
                if committees:
                    committees[-1]['leading'] = True
                continue
            committees.append({
                'name': abbr.attrib['title'],
                'abbreviation': text,
                'leading': False
            })
    return committees

def related_tops(els):
    """Return the item numbers linked from the first ``<ul>`` in ``els``.

    The numbers are the link titles with the ``'TOP '`` prefix removed.
    Returns ``None`` when ``els`` contains no ``<ul>``.
    """
    first_list = next((el for el in els if el.tag == 'ul'), None)
    if first_list is None:
        return None
    return [
        link['title'].replace('TOP ', '')
        for link in link_list_extract(first_list)
    ]


def speech_parser(elements):
    """Yield one dict per speech teaser found in the first of ``elements``.

    Only ``elements[0]`` is searched for ``div.rack-teaser`` blocks.

    Args:
        elements: Non-empty list of elements; the first holds the teasers.

    Yields:
        dict with ``name``, ``party`` and ``url``, plus ``state``,
        ``ministry``, ``image_url`` and ``image_credit`` when the markup
        provides them.
    """
    element = elements[0]
    for speech in element.xpath('.//div[@class="rack-teaser"]'):
        data = {
            'name': speech.xpath('.//h3')[0].text_content(),
            'party': speech.xpath('.//p[not(@class)]')[0].text_content(),
            'url': fix_url(speech.xpath('.//a')[0].attrib['href'])
        }
        # State and ministry are optional parts of a teaser.
        state = speech.xpath('.//p[@class="bundesland"]')
        if state:
            data['state'] = state[0].text_content()
        ministry = speech.xpath('.//p[@class="ressort"]')
        if ministry:
            data['ministry'] = ministry[0].text_content()

        image = speech.xpath('.//img')
        if image:
            image = image[0]
            data['image_url'] = fix_url(image.attrib['src'])
            # The credit lives in the title attribute; an image without one
            # must not abort the scrape, so treat it as optional too.
            credit = image.attrib.get('title')
            if credit is not None:
                data['image_credit'] = credit
        yield data
        
def speech_parser_list(elements):
    """Return everything speech_parser() yields for ``elements`` as a list."""
    return [speech for speech in speech_parser(elements)]

    
# Maps the text of an <h3> heading on an agenda item detail page to the
# section key under which parse_top_detail() collects the following elements.
TOP_HEADINGS = {
    'Beschlusstenor': 'beschlusstenor',
    'BundesratKOMPAKT': 'press',
    'Vorgang in DIP': 'dip',
    'Drucksachen': 'documents',
    'Länderbeteiligung': 'states_involved',
    'Ausschusszuweisung': 'committee',
    'Bemerkungen': 'notes',
    'Gesetzeskategorie': 'law_category',
}

# Maps a section key to the function that converts the list of elements
# collected for that section into the value stored in the result.
# 'related_tops', 'speeches' and 'links' have no heading; parse_top_detail()
# assigns them based on the element's tag and class.
TOP_PARSERS = {
    'notes': text_extract,
    'beschlusstenor': text_extract,
    'press': press_release,
    'dip': url_extract,
    'documents': document_links,
    'states_involved': states_involved,
    'committee': get_committees,
    'related_tops': related_tops,
    'links': document_links,
    'speeches': speech_parser_list,
    'law_category': text_extract,
}

def parse_top_detail(root):
    """Split an agenda item detail page into sections and parse each one.

    The direct children of the ``top-content-full`` container are walked in
    order. An ``<h3>`` starts a new section (its text is looked up in
    ``TOP_HEADINGS``); certain ``div``/``ul`` classes start the heading-less
    sections ``related_tops``, ``speeches`` and ``links``. Every other
    element is added to the section that is currently open.

    Args:
        root: Parsed document (or any object with a compatible ``xpath``).

    Yields:
        ``(section_key, parsed_value)`` for every section that has a parser
        in ``TOP_PARSERS``. Sections without one are reported via ``print``
        and skipped.
    """
    heading_elements = defaultdict(list)
    heading = None
    for element in root.xpath('.//div[contains(@class, "top-content-full")]/*'):
        # Not every element carries a class attribute; .get() avoids a KeyError.
        css_class = element.attrib.get('class')
        if element.tag == 'h3':
            title = element.text_content().strip()
            # Unknown headings keep their own text as key, so they end up in
            # the "no parser for ..." report below instead of raising KeyError.
            heading = TOP_HEADINGS.get(title, title)
        elif element.tag == 'div' and css_class == 'related-tops':
            heading = 'related_tops'
            heading_elements[heading].append(element)
        elif element.tag == 'div' and css_class == 'ts-members':
            heading = 'speeches'
            heading_elements[heading].append(element)
        elif element.tag == 'ul' and css_class == 'link-list doc-list' and heading != 'documents':
            heading = 'links'
            heading_elements[heading].append(element)
        else:
            heading_elements[heading].append(element)

    for kind, element_list in heading_elements.items():
        parser = TOP_PARSERS.get(kind)
        if parser is None:
            print('no parser for', kind)
            continue
        yield (kind, parser(element_list))

        
def parse_top(session_number, top_element, top_type='normal'):
    """Build the dict for one agenda item, including its detail page data.

    Args:
        session_number: Session number used to build the detail URL.
        top_element: Header element of the item on the agenda page.
        top_type: ``'normal'`` for numbered items, ``'simple'`` for
            letter-only items. Any other value keeps the raw heading text
            as the number.

    Returns:
        dict with ``number`` and ``title``, updated with everything
        parse_top_detail() yields for the item's detail page.
    """
    label = top_element.xpath('.//h2[@class="top-number"]')[0].text_content()
    if top_type == 'normal':
        top_number = TOP_NUMBER_RE.search(label).group(1)
    elif top_type == 'simple':
        # Their internal representation of simple top numbers
        top_number = '999' + TOP_SIMPLE_NUMBER_RE.search(label).group(1)
    else:
        top_number = label

    title_link = top_element.xpath('.//div[@class="top-header-content-box"]//a')[0]
    data = {'number': top_number, 'title': title_link.text_content()}
    print(top_number)
    detail_url = TOP_URL.format(num=session_number, top=top_number)
    detail_root = etree.fromstring(get(detail_url))
    data.update(dict(parse_top_detail(detail_root)))
    return data
        

# Maps an <h2> section heading on the agenda page to the top_type that
# parse_top() should use for the items following it.
TOP_SECTIONS = {
    'Beschlüsse im vereinfachten Verfahren': 'simple'
}

def get_session_tops(session_number):
    """Yield the parsed agenda items of one session.

    The children of the agenda container are walked in order: a ``<ul>``
    holds items, an ``<h2>`` switches the item type for everything after it.

    Args:
        session_number: Number of the session whose agenda page is fetched.

    Yields:
        One dict per agenda item, as returned by parse_top().

    Raises:
        KeyError: If an ``<h2>`` heading is not listed in ``TOP_SECTIONS``.
        Exception: If a child element is neither ``<ul>`` nor ``<h2>``.
    """
    root = etree.fromstring(get(TO_URL.format(num=session_number)))
    sections = root.xpath('.//div[@class="module type-1 tops"]/div/*')
    top_type = 'normal'
    for section in sections:
        if section.tag == 'ul':
            top_elements = section.xpath('.//div[@class="top-header"]')
            for top_element in top_elements:
                top = parse_top(session_number, top_element, top_type=top_type)
                yield top
        elif section.tag == 'h2':
            # Strip like parse_top_detail() does, so that surrounding
            # whitespace in the markup cannot defeat the lookup.
            section_heading = section.text_content().strip()
            try:
                top_type = TOP_SECTIONS[section_heading]
            except KeyError:
                raise KeyError(
                    'Unexpected section heading {!r}'.format(section_heading)
                ) from None
        else:
            raise Exception('Unexpected section tag {}'.format(section.tag))

def get_session_details(sessions):
    """Attach the list of agenda items to every session dict.

    Each dict in ``sessions`` receives a ``tops`` key; the dicts are modified
    in place and the same ``sessions`` object is returned.
    """
    for entry in sessions:
        number = entry['num']
        print('Session', number)
        entry['tops'] = [top for top in get_session_tops(number)]
    return sessions

In [9]:
# Collect the session overview, then add the agenda items to each session.
sessions = get_session_details(list(get_sessions()))

964
1
2a
2b
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
999a
999b
999c
999d
999e
965
1
2
3
4
5
6
7
8
9
10
11
12a
12b
13a
13b
13c
14a
14b
15
16
17
18
19
20
21
22
23
24
25
26
27
999a
966
1
2
3
4
5
6
7
8
9
10a
10b
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32a
32b
33
34
35
36
37a
37b
38
39
40
41
42
999a
967
1
2
3a
3b
4
5
6
7
8
9
10
11
12a
12b
13
14
15
16
17
18
19
20a
20b
21
22
23
24
25
26
27
28a
28b
28c
29
30
31
32
33
34
35
36a
36b
37
38
39
999a
968
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20a
20b
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37a
37b
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
999a
999b
999c
999d
999e
969
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21a
21b
21c
21d
21e
22a
22b
23
24a
24b
25
26
27
28
29
30
31
32
33
34a
34b
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
999a
999b
999c
999d
999e
999f
999g
999h
999i
999j
999k
999l
999m
999n
999o

In [10]:
# Write the scraped sessions, including their agenda items, to sessions.json.
with open('sessions.json', 'w') as out_file:
    out_file.write(json.dumps(sessions))