# Parallel Computation

In [1]:
import itertools
import multiprocessing as mp
# Number of CPUs in the system, as reported by multiprocessing; the value is
# displayed as the cell output.
mp.cpu_count()

12

## Parallelize text file downloads
### Exercise 1
Use `ThreadPoolExecutor` to parallelize the text downloads

In [2]:
import os
import urllib.request as url

# Create the download directory from Python instead of the `%mkdir` shell
# alias: `exist_ok=True` makes the cell safe to re-run when the directory is
# already there.
os.makedirs("books", exist_ok=True)

# Base URL the book paths below are appended to.
source = "https://mmassd.github.io/"
# Relative paths of the books; each one is used both as the remote path under
# `source` and as the local file name.
text = [
    "books/hugo.txt",
    "books/proust.txt",
    "books/zola.txt",
    "books/stendhal.txt"
]

In [3]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def download(t):
    """Download one book.

    Parameters
    ----------
    t : str
        Path of the book relative to `source`; also used as the local
        file name the content is saved to.

    Returns
    -------
    The value returned by `url.urlretrieve` for this download.
    """
    return url.urlretrieve(source + t, filename=t)


with ThreadPoolExecutor() as pool:
    # Executor.map only re-raises a worker's exception when the corresponding
    # result is retrieved, so the iterator is consumed here; otherwise a
    # failed download would go unnoticed.
    list(pool.map(download, text))

In [4]:
# Clean up after the download exercise: enter `books`, delete the .txt files
# it contains, go back up and remove the directory itself.
%cd books
# NOTE(review): `del` is a Windows shell command, so this cell presumably only
# works on Windows — confirm before running elsewhere.
!del *.txt
%cd ..
%rmdir books

D:\ETUDES\M1 MAS\M_Python\S3\BigData\books
D:\ETUDES\M1 MAS\M_Python\S3\BigData


## Parallel map
### Exercise 2
Modify the MapReduce `mapper` function so that it prints the name of the process that runs it.

In [5]:
def mapper(path):
    """Map step: turn the text file at `path` into sorted `(word, 1)` pairs.

    The name of the current process is printed first. Periods are replaced by
    spaces, the text is lower-cased and split on whitespace, and the resulting
    words are sorted before being paired with the count 1.
    """
    print(mp.current_process().name)
    with open(path, 'r') as handle:
        content = handle.read()
    words = content.replace(".", " ").lower().split()
    words.sort()
    return [(w, 1) for w in words]

## Parallel reduce
### Exercise 3
Write a parallel program that runs the three MapReduce functions (`mapper`, `partitioner` and `reducer`) with `ProcessPoolExecutor`. It reads all the “sample*.txt” files, and both the map and the reduce steps run in parallel.

In [6]:
# Import under an alias: a bare `text` would overwrite the `text` list of book
# paths defined for the download exercise, breaking that cell on re-run.
from lorem import text as lorem_text

# Write eight files of random lorem-ipsum text: sample00.txt ... sample07.txt.
for i in range(8):
    with open(f"sample{i:02d}.txt", "w") as f:
        f.write(lorem_text())

In [7]:
%%file iter.py

import multiprocessing as mp
from functools import reduce
import itertools
import glob
from concurrent.futures import ProcessPoolExecutor

def mapper(path):
    """Map step: read the file at `path` and emit sorted `(word, 1)` pairs.

    Prints the name of the process executing the call, then replaces periods
    with spaces, lower-cases the text, splits it on whitespace and pairs every
    word (in sorted order) with the count 1.
    """
    print(mp.current_process().name)
    with open(path, 'r') as source_file:
        raw = source_file.read()
    tokens = raw.replace(".", " ").lower().split()
    return list(zip(sorted(tokens), itertools.repeat(1)))

def partitioner(pairs):
    """Group the values of `(key, value)` pairs by key.

    Parameters
    ----------
    pairs : iterable of (key, value)
        Any iterable (including a one-shot iterator) of 2-item pairs.

    Returns
    -------
    dict
        Maps each key to the list of its values, in the order they were seen.
        Keys appear in order of first occurrence. Empty input gives `{}`.
    """
    partition = dict()
    for key, val in pairs:
        # Append in place; building a new list with `+ [val]` for every pair
        # copies the accumulated values each time and is quadratic per key.
        partition.setdefault(key, []).append(val)
    return partition

def reducer(tuplew):
    """Reduce step: collapse a `(word, counts)` pair into `(word, total)`.

    `tuplew[0]` is returned unchanged and `tuplew[1]` is summed.
    """
    word = tuplew[0]
    occurrences = tuplew[1]
    return word, sum(occurrences)

if __name__ == "__main__":
    # Input files, processed in name order.
    files = sorted(glob.glob('sample0*.txt'))
    with ProcessPoolExecutor() as e:
        # Map: one `mapper` call per file, run in the worker processes.
        mapped = e.map(mapper, files)
        # Shuffle: flatten the per-file pair lists and group the counts by word.
        grouped = partitioner(itertools.chain(*mapped))
        # Reduce: one `reducer` call per (word, counts) item, again in the pool.
        count = e.map(reducer, grouped.items())
    # The results are collected and printed after the `with` block has exited.
    print(list(count))

Writing iter.py


In [8]:
import sys
# Run the script with the interpreter that is running this kernel.
!{sys.executable} iter.py
# Delete the .txt files of the working directory, then the script itself.
# NOTE(review): `del` is a Windows shell command and `-r` does not look like
# one of its switches — confirm this line behaves as intended.
!del -r *.txt
!del iter.py

SpawnProcess-7
SpawnProcess-1
SpawnProcess-2
SpawnProcess-2
SpawnProcess-3
SpawnProcess-4
SpawnProcess-4
SpawnProcess-5
[('adipisci', 55), ('aliquam', 47), ('amet', 54), ('consectetur', 51), ('dolor', 54), ('dolore', 42), ('dolorem', 40), ('eius', 53), ('est', 55), ('etincidunt', 51), ('ipsum', 54), ('labore', 53), ('magnam', 46), ('modi', 49), ('neque', 49), ('non', 47), ('numquam', 54), ('porro', 51), ('quaerat', 46), ('quiquia', 56), ('quisquam', 58), ('sed', 45), ('sit', 43), ('tempora', 45), ('ut', 53), ('velit', 52), ('voluptatem', 46)]


## Increase volume of data

In [9]:
from glob import glob
from urllib.error import HTTPError
from urllib.request import urlopen

from bs4 import BeautifulSoup

base_url = "http://www.thelatinlibrary.com/"
# Only the first links of the home page are used as author pages.
num_author_pages = 49
# Maximum number of book pages to download.
num_pages = 100

# --- Author pages listed on the home page ---------------------------------
with urlopen(base_url) as home_content:
    soup = BeautifulSoup(home_content, "lxml")
author_page_links = soup.find_all("a")
author_pages = [ap["href"] for ap in author_page_links[:num_author_pages]]

# Fetch every author page. The path of each page that was actually retrieved
# is recorded next to its content: skipping a failed page in only one of two
# parallel lists would shift the pairing and match pages with the wrong author.
fetched_author_pages = list()
ap_content = list()
for ap in author_pages:
    try:
        with urlopen(base_url + ap) as response:
            page = response.read()
    except Exception as err:
        # Best effort: report the page and move on instead of hiding the error.
        print("Unable to retrieve " + ap + ": " + str(err))
        continue
    fetched_author_pages.append(ap)
    ap_content.append(page)

# --- Book links found on the author pages ---------------------------------
book_links = list()
for path, content in zip(fetched_author_pages, ap_content):
    # Keep the links whose target contains the author's page name.
    author_name = path.split(".")[0]
    ap_soup = BeautifulSoup(content, "lxml")
    book_links += [link
                   for link in ap_soup.find_all("a", {"href": True})
                   if author_name in link["href"]]

# --- Download the book pages ----------------------------------------------
selected_links = book_links[:num_pages]
for i, bl in enumerate(selected_links):
    # The total is the number of links really selected, which can be smaller
    # than `num_pages`.
    print("Getting content " + str(i + 1) + " of " + str(len(selected_links)), end="\r", flush=True)
    try:
        with urlopen(base_url + bl["href"]) as response:
            content = response.read()
    except HTTPError:
        print("Unable to retrieve " + bl["href"] + ".")
        continue
    with open(f"book-{i:03d}.dat", "wb") as f:
        f.write(content)

# --- Load the downloaded pages back into memory ---------------------------
# Sorted so that the order of `texts` does not depend on directory listing order.
files = sorted(glob('book*.dat'))
texts = list()
for file in files:
    with open(file, 'rb') as f:
        texts.append(f.read())

Getting content 100 of 100

## Extract the text from the HTML and split it at periods to convert it into sentences

In [10]:
sentences = list()

total_docs = len(texts)
for doc_number, html in enumerate(texts, start=1):
    print("Document " + str(doc_number) + " of " + str(total_docs), end="\r", flush=True)
    doc_soup = BeautifulSoup(html, "lxml")
    # <p> tags without a class attribute, leaving out the first and last one.
    body_paragraphs = doc_soup.find_all("p", attrs={"class": None})[1:-1]
    prepared = "".join(p.text.strip().lower() for p in body_paragraphs)
    for chunk in prepared.split("."):
        # Keep letters and whitespace only, then trim the sentence.
        kept = [ch for ch in chunk if ch.isalpha() or ch.isspace()]
        sentences.append("".join(kept).strip())

# Show the first and the last sentence as a quick check of the result.
print(sentences[0])
print(sentences[-1])

post emensos insuperabilis expeditionis eventus languentibus partium animis quas periculorum varietas fregerat et laborum nondum tubarum cessante clangore vel milite locato per stationes hibernas fortunae saevientis procellae tempestates alias rebus infudere communibus per multa illa et dira facinora caesaris galli qui ex squalore imo miseriarum in aetatis adultae primitiis ad principale culmen insperato saltu provectus ultra terminos potestatis delatae procurrens asperitate nimia cuncta foedabat



### Exercise 4
Parallelize this last process using `concurrent.futures`.

In [13]:
def lower(p):
    """Return the text of the parsed tag `p`, stripped and lower-cased."""
    return p.text.strip().lower()


def sent(l):
    """Keep only the alphabetic and whitespace characters of `l`, then strip it."""
    return "".join(c for c in l if c.isalpha() or c.isspace()).strip()


def sent_tokenizer(text):
    """Split one HTML document into cleaned sentences.

    Parameters
    ----------
    text : str or bytes
        HTML content of a document, as accepted by `BeautifulSoup`.

    Returns
    -------
    list of str
        The text of the `<p>` tags without a class attribute (first and last
        one left out), lower-cased, joined, split at periods and cleaned
        with `sent`.
    """
    paragraphs = BeautifulSoup(text, "lxml").find_all("p", attrs={"class": None})
    prepared = "".join(map(lower, paragraphs[1:-1]))
    # Build the list here, inside the worker. Returning a lazy `map` would
    # postpone the cleaning until the result is iterated, i.e. it would run
    # serially in the thread that chains the results together.
    return [sent(part) for part in prepared.split(".")]


with ThreadPoolExecutor() as t:
    # Executor.map yields the results in the order of `texts`.
    f = t.map(sent_tokenizer, texts)
    sentences_thread = list(itertools.chain(*f))

print(sentences_thread[0])

post emensos insuperabilis expeditionis eventus languentibus partium animis quas periculorum varietas fregerat et laborum nondum tubarum cessante clangore vel milite locato per stationes hibernas fortunae saevientis procellae tempestates alias rebus infudere communibus per multa illa et dira facinora caesaris galli qui ex squalore imo miseriarum in aetatis adultae primitiis ad principale culmen insperato saltu provectus ultra terminos potestatis delatae procurrens asperitate nimia cuncta foedabat


In [14]:
# Delete the .dat files of the working directory (`del` is a Windows shell command).
!del *.dat