# Crawler

## dtacrawl

In [1]:
import argparse
import json
import logging
import os
import random
import time
from urllib.parse import urlencode
from urllib.parse import urlsplit
from urllib.parse import urlunsplit
from urllib.request import urlopen

# Log with a wall-clock timestamp so the spacing between downloads is visible in the output.
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO, datefmt="%H:%M:%S")

# Deutsches Textarchiv endpoints: TEI-XML download (by basename),
# DStar query endpoint, and TCF download (by DTA id).
tei_baseurl = 'https://www.deutschestextarchiv.de/book/download_xml'
query_baseurl = 'https://kaskade.dwds.de/dstar/dta/dstar.perl'
tcf_baseurl = 'https://www.deutschestextarchiv.de/book/download_fulltcf'

def tei_url(basename):
    """Return the TEI-XML download URL for the DTA document `basename`."""
    return "/".join((tei_baseurl, basename))

def tcf_url(id):
    """Return the full-TCF download URL for the DTA document with id `id` (a string)."""
    pieces = (tcf_baseurl, id)
    return "/".join(pieces)

def query_url(query):
    """Build the query URL: `query_baseurl` with `query` url-encoded as its query string.

    `query` is anything `urlencode` accepts (a mapping or a sequence of pairs).
    """
    base = urlsplit(query_baseurl)
    return base._replace(query=urlencode(query)).geturl()

# Quick check of the three URL builders; the printed URLs are shown below.
print(tei_url('brehm_thierleben05_1869'))
print(tcf_url('12345'))
print(query_url({'q': 'abc'}))

https://www.deutschestextarchiv.de/book/download_xml/brehm_thierleben05_1869
https://www.deutschestextarchiv.de/book/download_fulltcf/12345
https://kaskade.dwds.de/dstar/dta/dstar.perl?q=abc


In [2]:
def download(url):
    """Fetch `url` and return the response body as bytes.

    Waits a random 0.5-1.5 s before the request, so consecutive requests are
    spaced out, and logs the URL together with the wait time.
    """
    delay = random.uniform(0.5, 1.5)
    time.sleep(delay)
    logging.info(f"downloading {url} after waiting {delay}s")
    response = urlopen(url)
    with response:
        return response.read()

def download_to(url, out):
    """Download `url` and write the response body to the file `out`.

    The body is fetched completely *before* `out` is opened. If the download
    raises, `out` is neither created nor truncated, so no empty or half-written
    files are left behind.
    """
    data = download(url)
    with open(out, 'wb') as f:
        f.write(data)
        
def query(q):
    """Send the query parameters `q` (a dict) to the query endpoint and return the decoded JSON."""
    raw = download(query_url(q))
    return json.loads(raw)


In [3]:
def dtaids(max, q):
    """Collect up to `max` distinct DTA documents matching the query `q`.

    Results are paged with ``limit=max`` starting at offset 1. Several hits can
    belong to the same document, so paging continues until `max` distinct ids
    have been seen or the server returns an empty page (result set exhausted).

    Args:
        max: maximum number of distinct documents to return.
        q: dict of query parameters, e.g. ``{'q': 'Axolotl'}``. It is copied;
           the caller's dict is not modified.

    Returns:
        dict mapping ``meta_.dtaid`` to ``meta_.basename``, in first-seen order.
        It may hold fewer than `max` entries if fewer documents match.
    """
    # Copy, so the paging parameters do not leak into the caller's dict.
    params = dict(q)
    ids = {}
    start = 1
    while len(ids) < max:
        # Same parameter order as before: q..., limit, start, fmt.
        params.update(limit=max, start=start, fmt="json")
        hits = query(params)["hits_"]
        if not hits:
            # No more results: stop instead of re-querying empty pages forever.
            break
        for hit in hits:
            meta = hit["meta_"]
            if meta["dtaid"] not in ids:
                ids[meta["dtaid"]] = meta["basename"]
            if len(ids) == max:
                return ids
        start += max
    return ids

# Demo: the first 3 distinct documents matching "Axolotl", as {dtaid: basename}.
print(dtaids(3, {'q': 'Axolotl'}))

13:38:55: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=3&start=1&fmt=json after waiting 0.962889848950359s
13:38:57: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=3&start=4&fmt=json after waiting 1.1984669045074616s


{'25164': 'haeckel_schoepfungsgeschichte_1868', '16241': 'weismann_keimplasma_1892', '25165': 'brehm_thierleben05_1869'}


In [4]:
    # Sequential crawl: download the TEI and TCF files of the first 3
    # "Axolotl" documents into out/. download_to opens the target with
    # open(..., 'wb'), which fails if the directory is missing, so create it.
    os.makedirs("out", exist_ok=True)
    ids = dtaids(3, {'q': 'Axolotl'})
    for dta_id, basename in ids.items():
        download_to(tei_url(basename), f"out/{dta_id}.tei.xml")
        download_to(tcf_url(dta_id), f"out/{dta_id}.tcf.xml")
    logging.info('done')

13:39:03: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=3&start=1&fmt=json after waiting 0.8940787692882608s
13:39:05: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=3&start=4&fmt=json after waiting 1.1000495896616407s
13:39:07: downloading https://www.deutschestextarchiv.de/book/download_xml/haeckel_schoepfungsgeschichte_1868 after waiting 1.4928216765322873s
13:39:10: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/25164 after waiting 0.5720735605787123s
13:39:28: downloading https://www.deutschestextarchiv.de/book/download_xml/weismann_keimplasma_1892 after waiting 0.5800219578988406s
13:39:30: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/16241 after waiting 0.827034519889987s
13:39:47: downloading https://www.deutschestextarchiv.de/book/download_xml/brehm_thierleben05_1869 after waiting 0.567877499494177s
13:39:52: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/25

## Asynchroner Crawler
* lineares Vorgehen:
  1. lade alle Suchergebnisse herunter
  2. lade die zugeh&ouml;rigen Dateien herunter
* asynchrones Vorgehen:
  * lade die Suchergebnisse herunter
  * parallel dazu lade die Dateien herunter sobald Suchergebnisse vorhanden sind.

### Threads
* asynchrone Pfade durch den Code
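* Threads laufen nebenläufig; in CPython verhindert der Global Interpreter Lock (GIL), dass Python-Code echt parallel auf mehreren CPUs läuft. Blockierende I/O (z. B. Downloads) gibt den GIL aber frei, deshalb lohnen sich Threads für einen Crawler trotzdem.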
* Reihenfolge der Threads ist nicht deterministisch
* Kommunikation zwischen den Threads muss synchronisiert werden (Mutex, atomare Variablen ...)
* unsynchronisierter Zugriff auf gemeinsame Daten f&uuml;hrt zu Problemen

In [30]:
import concurrent.futures

def thread(name, max):
    """Demo worker: log `max` numbered "producing" messages, each after a random 0.5-1.5 s pause."""
    for n in range(max):
        pause = random.uniform(0.5, 1.5)
        time.sleep(pause)
        logging.info(f'{name} producing {n}')

# Run three demo workers concurrently; their log lines interleave nondeterministically.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(thread, "Thread 1", 3),
        executor.submit(thread, "Thread 2", 5),
        executor.submit(thread, "Thread 3", 2),
    ]
# The executor stores a worker's exception in its Future; result() re-raises it
# instead of letting it disappear silently.
for future in futures:
    future.result()
logging.info('done')


12:37:18: Thread 3 producing 0
12:37:18: Thread 2 producing 0
12:37:19: Thread 3 producing 1
12:37:19: Thread 1 producing 0
12:37:19: Thread 2 producing 1
12:37:20: Thread 1 producing 1
12:37:21: Thread 2 producing 2
12:37:21: Thread 1 producing 2
12:37:22: Thread 2 producing 3
12:37:23: Thread 2 producing 4
12:37:23: done


### Wettlaufsituation (Race Condition)

In [31]:
# Shared balance, deliberately accessed WITHOUT any synchronisation.
deposit = 50
    
def withdraw(amount):
    """Repeatedly withdraw `amount` from the global `deposit` while it covers it.

    Deliberately racy: the sleep between the balance check and the update lets
    the other thread withdraw in between, so the final balance can go negative
    (see the -10 in the output below).
    """
    global deposit
    while True:
        if deposit >= amount:
            time.sleep(random.uniform(0.5, 1.5))
            deposit = deposit - amount
            logging.info(f'deposit: {deposit}')
        else:
            return
        
# Two threads withdraw different amounts from the same balance concurrently.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(withdraw, 13)
    executor.submit(withdraw, 17)
logging.info(f'final deposit: {deposit}')


12:37:24: deposit: 33
12:37:24: deposit: 20
12:37:24: deposit: 3
12:37:25: deposit: -10
12:37:25: final deposit: -10


### Synchronisation mit Locks

In [32]:
import threading

# Fresh balance plus a lock that guards every access to it.
locked_deposit = 50
lock = threading.Lock()
    
def withdraw(amount):
    """Withdraw `amount` repeatedly, holding `lock` across the check and the update.

    Because the lock is held from the balance check until after the update, the
    other thread cannot interleave, and the balance never drops below zero.
    The manual acquire()/release() must release on every exit path. An
    exception between the two calls would leave the lock held forever; the
    `with lock:` version in the next cell avoids that.
    """
    global locked_deposit
    global lock  # not strictly needed: `lock` is only read, never rebound
    while True:       
        lock.acquire()
        if locked_deposit >= amount:
            time.sleep(random.uniform(0.5, 1.5))
            locked_deposit = locked_deposit - amount
            logging.info(f'deposit: {locked_deposit}')
            lock.release()
        else:
            # release before leaving, otherwise the other thread blocks forever
            lock.release()
            return
        
# Same two concurrent withdrawals as in the race-condition demo.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(withdraw, 13)
    executor.submit(withdraw, 17)
logging.info(f'final deposit: {locked_deposit}')


12:37:26: deposit: 37
12:37:26: deposit: 24
12:37:27: deposit: 11
12:37:27: final deposit: 11


### Locks mit `with ...`

In [33]:
import threading

# Reset the balance and create a new lock for the `with lock:` demo.
locked_deposit = 50
lock = threading.Lock()
    
def withdraw(amount):
    """Withdraw `amount` from `locked_deposit` until the balance no longer covers it.

    Check and update run under `lock`, so concurrent callers cannot interleave
    between them. `with lock:` releases the lock on every exit path, including
    the early return and exceptions.
    """
    global locked_deposit
    while True:
        with lock:
            if not locked_deposit >= amount:
                return
            time.sleep(random.uniform(0.5, 1.5))
            locked_deposit -= amount
            logging.info(f'deposit: {locked_deposit}')
        
# Same two concurrent withdrawals; the lock keeps the balance from going below zero.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(withdraw, 13)
    executor.submit(withdraw, 17)
logging.info(f'final deposit: {locked_deposit}')


12:37:28: deposit: 37
12:37:29: deposit: 24
12:37:30: deposit: 11
12:37:30: final deposit: 11


### Erzeuger-Verbraucher-Muster (Producer-Consumer) mit Threads
* Erzeuger produzieren Daten
* Verbraucher verarbeiten die Daten weiter
* Verbraucher und Erzeuger laufen in verschiedenen Threads
* Kommunikation (und Synchronisation) der Erzeuger und Verbraucher &uuml;ber Queues
* je nach Anwendung verschiedene Anzahlen von Erzeugern und Verbrauchern

### Queue
* FiFo (First in, first out) Datenstruktur
* Elemente werden in der Reihenfolge herausgenommen, in der sie eingefügt wurden

![Queue](https://upload.wikimedia.org/wikipedia/commons/thumb/5/52/Data_Queue.svg/300px-Data_Queue.svg.png)

Quelle: https://en.wikipedia.org/wiki/Queue_(abstract_data_type)#/media/File:Data_Queue.svg

### Pipeline
* verwendet Pythons `queue.Queue`-Implementierung als Basis (thread-sicher, mit blockierendem `put`/`get`)
* dient der Kommunikation zwischen Erzeuger und Verbraucher
* Schließen der Pipeline signalisiert das Ende der Arbeit 

In [34]:
import queue
class Pipeline(queue.Queue):
    """Bounded FIFO queue of (url, output path) download jobs.

    A producer adds jobs with `add_url` and signals the end of work with
    `close`, which enqueues the sentinel ``(None, None)``. A consumer that
    receives the sentinel puts it back, so every other consumer sees it too.
    """

    def __init__(self, maxsize=10):
        # Bounded (by default 10 jobs), so a fast producer blocks in add_url()
        # instead of queueing arbitrarily many jobs ahead of the consumers.
        # maxsize <= 0 means unbounded, as for queue.Queue.
        super().__init__(maxsize=maxsize)

    def close(self):
        """Signal the end of work to all consumers by enqueueing the sentinel."""
        self.put((None, None))  # insert sentinel
        logging.info('pipeline closed')

    def add_url(self, url, out):
        """Enqueue a job: download `url` and store it at path `out` (blocks while full)."""
        self.put((url, out))

    def get_url(self):
        """Block until a job is available.

        Returns:
            ``(url, out, True)`` for a regular job, or ``(None, None, False)``
            once the pipeline has been closed.
        """
        ret = self.get()
        if ret == (None, None):
            self.put(ret)  # reinsert sentinel for the remaining consumers
            return (None, None, False)
        return (ret[0], ret[1], True)

### Verbraucher
* liest URLs aus der Pipeline
* lädt die entsprechenden Dateien herunter
* mehrere parallele Verbraucher

In [35]:
def consumer(pipeline):
    """Process download jobs from `pipeline` until it is closed.

    A failed download is logged (with traceback) and skipped instead of ending
    the thread. If every consumer died, nobody would drain the bounded
    pipeline, and the producer would block forever in `add_url`.
    """
    while True:
        url, out, ok = pipeline.get_url()
        if not ok:
            return
        try:
            download_to(url, out)
        except Exception:
            logging.exception("download of %s to %s failed", url, out)

### Erzeuger (Producer)
* stellt Suchanfragen
* schreibt die URLs (TCF und TEI) in die Pipeline
* signalisiert Ende der Arbeit an die Verbraucher durch Schließen der Pipeline
* nur ein Erzeuger

In [36]:
def producer(pipeline, out, max, q):
    """Find up to `max` distinct DTA documents matching `q` and queue their downloads.

    For every new document, the TEI and the TCF download are added to
    `pipeline`, targeting ``<out>/<dtaid>.tei.xml`` and ``<out>/<dtaid>.tcf.xml``.

    The pipeline is always closed when the producer finishes, including when a
    query raises. This releases consumers waiting in `get_url` instead of
    leaving them blocked forever. `q` is copied, not modified.
    """
    ids = set()
    params = dict(q)  # keep paging parameters out of the caller's dict
    start = 1
    try:
        while len(ids) < max:
            # Same parameter order as before: q..., limit, start, fmt.
            params.update(limit=max, start=start, fmt="json")
            hits = query(params)["hits_"]
            if not hits:
                break  # result set exhausted before `max` documents were found
            for hit in hits:
                id = hit["meta_"]["dtaid"]
                basename = hit["meta_"]["basename"]
                if id not in ids:
                    ids.add(id)
                    pipeline.add_url(tei_url(basename), os.path.join(out, f'{id}.tei.xml'))
                    pipeline.add_url(tcf_url(id), os.path.join(out, f'{id}.tcf.xml'))
                if len(ids) == max:
                    return
            start += max
    finally:
        pipeline.close()

### Asynchroner Crawler

In [37]:
# Asynchronous crawl: one producer and three consumers share one pipeline.
pipeline = Pipeline()
os.makedirs('out', exist_ok=True)  # consumers write their files into out/
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(producer, pipeline, 'out', 3, {'q': 'Axolotl'})]
    futures.extend(executor.submit(consumer, pipeline) for _ in range(3))
# Re-raise any exception a worker hit; the executor would otherwise drop it silently.
for future in futures:
    future.result()
logging.info('done')

12:37:31: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=3&start=1&fmt=json after waiting 1.2165473930914488s
12:37:33: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=3&start=4&fmt=json after waiting 0.720621800062569s
12:37:34: downloading https://www.deutschestextarchiv.de/book/download_xml/haeckel_schoepfungsgeschichte_1868 after waiting 1.1263338779875214s
12:37:34: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/25164 after waiting 1.2757983404771853s
12:37:35: pipeline closed
12:37:35: downloading https://www.deutschestextarchiv.de/book/download_xml/weismann_keimplasma_1892 after waiting 0.8832671798407791s
12:37:36: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/16241 after waiting 0.6964723714718375s
12:37:38: downloading https://www.deutschestextarchiv.de/book/download_xml/brehm_thierleben05_1869 after waiting 1.3832605756182033s
12:37:49: downloading https://www.deutschestextarchiv.

### Pipeline als Kontextmanagerobjekt
* die `with ... as ...`-Syntax ermöglicht das automatische Schließen von Ressourcen (Dateihandles, ...)
* mit Kontextmanager-Objekten können eigene Klassen mit `with ... as ...` verwendet werden
* Kontextmanager-Objekte müssen in Python zwei Methoden implementieren
  1. `__enter__` wird beim Betreten des `with`-Blocks aufgerufen und gibt das mit `as` referenzierte Objekt zurück
  2. `__exit__` wird beim Verlassen des Blocks aufgerufen – auch wenn eine Exception auftritt – und gibt die Ressource frei; ein wahrer Rückgabewert unterdrückt die Exception (Fehlerbehandlung)
* genaueres in der Python [Dokumentation](https://docs.python.org/3/library/stdtypes.html#typecontextmanager)

In [39]:
class PipelineCM(Pipeline):
    """A Pipeline that can be used in a `with` statement.

    Leaving the `with` block, normally or because of an exception, calls
    `close()`. That hands the end-of-work sentinel to the consumers.
    """

    def __init__(self):
        super().__init__()

    def __enter__(self):
        # `with PipelineCM() as p:` binds the pipeline itself to `p`.
        return self

    def __exit__(self, et, ev, etb):
        self.close()
        # Falsy result: an exception raised inside the block still propagates.
        return False

def producer2(pipeline, out, max, q):
    """Find up to `max` distinct DTA documents matching `q` and queue their downloads.

    `pipeline` must support `with` (e.g. a PipelineCM). Leaving the `with`
    block closes it, whether by reaching `max`, by running out of results, or
    by an exception. The consumers are then told to stop.
    Files target ``<out>/<dtaid>.tei.xml`` and ``<out>/<dtaid>.tcf.xml``.
    `q` is copied, not modified.
    """
    ids = set()
    params = dict(q)  # keep paging parameters out of the caller's dict
    start = 1
    with pipeline as p:
        while len(ids) < max:
            # Same parameter order as before: q..., limit, start, fmt.
            params.update(limit=max, start=start, fmt="json")
            hits = query(params)["hits_"]
            if not hits:
                break  # result set exhausted before `max` documents were found
            for hit in hits:
                id = hit["meta_"]["dtaid"]
                basename = hit["meta_"]["basename"]
                if id not in ids:
                    ids.add(id)
                    p.add_url(tei_url(basename), os.path.join(out, f'{id}.tei.xml'))
                    p.add_url(tcf_url(id), os.path.join(out, f'{id}.tcf.xml'))
                if len(ids) == max:
                    return
            start += max

# Asynchronous crawl with the context-manager pipeline: one producer, three consumers.
pipeline = PipelineCM()
os.makedirs('out', exist_ok=True)  # consumers write their files into out/
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(producer2, pipeline, 'out', 2, {'q': 'Axolotl'})]
    futures.extend(executor.submit(consumer, pipeline) for _ in range(3))
# Re-raise any exception a worker hit; the executor would otherwise drop it silently.
for future in futures:
    future.result()
logging.info('done')

12:40:05: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=2&start=1&fmt=json after waiting 0.6806135555473207s
12:40:07: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=2&start=3&fmt=json after waiting 0.6222352863458483s
12:40:08: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/25164 after waiting 1.1192738121344763s
12:40:08: downloading https://www.deutschestextarchiv.de/book/download_xml/haeckel_schoepfungsgeschichte_1868 after waiting 1.369165716621077s
12:40:08: pipeline closed
12:40:09: downloading https://www.deutschestextarchiv.de/book/download_xml/weismann_keimplasma_1892 after waiting 1.2807447293837313s
12:40:10: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/16241 after waiting 0.6589070988028538s
12:41:12: done


## time

| Name | time - misst die Laufzeit von Programmen |
|:---|:---|
|Überblick| time \[OPTION\]... \[CMD\] \[ARGS\]... |
|Beschreibung | Misst die Laufzeit von Programmen |
| Wichtige Optionen: | |
| -v, --verbose | detaillierte Ausgabe (nur GNU `/usr/bin/time`, nicht das Bash-Schlüsselwort `time`) |

In [17]:
%%bash
# Time the sequential crawler script, for comparison with the async version below.
time python3 dtacrawl.py --max 2 --dir out 'Axolotl'

11:53:05: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=2&start=1&fmt=json after waiting 1.4059197141109405s
11:53:08: downloading https://kaskade.dwds.de/dstar/dta/dstar.perl?q=Axolotl&limit=2&start=3&fmt=json after waiting 1.4318023567791403s
11:53:10: downloading https://www.deutschestextarchiv.de/book/download_xml/haeckel_schoepfungsgeschichte_1868 after waiting 0.6747291360172484s
11:53:12: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/25164 after waiting 0.5035658474273219s
11:53:33: downloading https://www.deutschestextarchiv.de/book/download_xml/weismann_keimplasma_1892 after waiting 1.1299730885032393s
11:53:34: downloading https://www.deutschestextarchiv.de/book/download_fulltcf/16241 after waiting 0.5336663175396948s

real	0m47.864s
user	0m1.821s
sys	0m1.467s


In [19]:
%%bash
# Time the asynchronous crawler script.
# NOTE(review): as the output shows, dtacrawl_async.py currently fails with
# "NameError: name 'producer' is not defined" (script line 108); define or
# import `producer` in the script before relying on this timing.
time python3 dtacrawl_async.py --max 2 --dir out 'Axolotl'

Traceback (most recent call last):
  File "dtacrawl_async.py", line 108, in <module>
    executor.submit(producer, pipeline, args.dir, args.max, {'q': args.query[0]})
NameError: name 'producer' is not defined

real	0m0.133s
user	0m0.123s
sys	0m0.011s


CalledProcessError: Command 'b"time python3 dtacrawl_async.py --max 2 --dir out 'Axolotl'\n"' returned non-zero exit status 1.