### 17.1.1 순차적으로 내려받는 스크립트 
- 동시성을 고려하지 않은 코드이다
- 원저자의 사이트에 flag 정보가 없어서 10000img.com 에서 이미지를 받아오는 예제로 대체

In [8]:
import os
import time
import sys
import time
import requests
from bs4 import BeautifulSoup


COUNT = 25

BASE_URL = 'http://10000img.com'
DEST_DIR = 'downloads/'

def get_img():    
    res = requests.get(f'{BASE_URL}/ran.php')
    soup = BeautifulSoup(res.text, 'html.parser')
    img_src = soup.find('img').get('src')
    img_url = f'{BASE_URL}/{img_src}'
    suffix = img_src.split('.')[-1]
    filename = str(int(time.time()* 1000000)) + '.' + suffix
    res = requests.get(img_url)
    return res.content, filename


def save_img(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def main():
    t0 = time.time()    
    for n in range(COUNT):
        img, filename = get_img()
        save_img(img, filename)
    
    elapsed = time.time() - t0
    print(f'걸린 시간 {elapsed} 초')

main()

걸린 시간 10.768370151519775 초


In [3]:
import os
import time
from collections import namedtuple

import requests
from bs4 import BeautifulSoup as bs

DEST_DIR = 'downloads/'
IMG_PREFIX = 'sequantial'

Category = namedtuple('Category', 'uid subuid')

cate10 = [1000, 1001, 69, 115, 116, 119, 112]
cate11 = [1101, 1000, 86, 120, 89, 117, 87]
cate22 = [2203, 2201, 203, 201, 2209, 2202]
cate21 = [2103, 201, 2102]
cate16 = [1601, 84, 51, 113, 40]

categories = []
categories += [Category(uid=10, subuid=subuid) for subuid in cate10]
categories += [Category(uid=11, subuid=subuid) for subuid in cate11]
categories += [Category(uid=16, subuid=subuid) for subuid in cate16]
categories += [Category(uid=21, subuid=subuid) for subuid in cate21]
categories += [Category(uid=22, subuid=subuid) for subuid in cate22]

def get_top_banner(category):
    category_url = f'https://page.kakao.com/main?categoryUid={category.uid}&subCategoryUid={category.subuid}'
    res = requests.get(category_url)
    soup = bs(res.text, 'html.parser')
    divs = soup.findAll('div', {'class': {'topBanner'}})
    if (len(divs)):
        img_src = divs[0].find('img').get('src')       
        res = requests.get(f'https:{img_src}')
        return res.content, f'{IMG_PREFIX}-{category.uid}-{category.subuid}-topbanner.png'
    else:
        print(f'no top banner on {category.uid} in {category.subuid}')
        return None, None

def save_img(img, filename):
    if img and filename:
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)

def download_all(category):    
    for category in categories:
        img, filename = get_top_banner(category)
        print(f'{filename} image download')
        save_img(img, filename)
    return len(category)

def strtime(timestamp):
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(timestamp))
        
def main():
    start = time.time()    
    print(f'[{strtime(start)}] start!', )
    count = download_all(categories)
    end = time.time()
    elapsed = end - start
    print(f'[{strtime(end)}] topbanner img downloaded in {elapsed:.3}s')

main()

[2019-06-03 09:25:30] start!
sequantial-10-1000-topbanner.png image download
sequantial-10-1001-topbanner.png image download
sequantial-10-69-topbanner.png image download
sequantial-10-115-topbanner.png image download
sequantial-10-116-topbanner.png image download
sequantial-10-119-topbanner.png image download
sequantial-10-112-topbanner.png image download
sequantial-11-1101-topbanner.png image download
sequantial-11-1000-topbanner.png image download
sequantial-11-86-topbanner.png image download
sequantial-11-120-topbanner.png image download
sequantial-11-89-topbanner.png image download
sequantial-11-117-topbanner.png image download
sequantial-11-87-topbanner.png image download
sequantial-16-1601-topbanner.png image download
sequantial-16-84-topbanner.png image download
sequantial-16-51-topbanner.png image download
sequantial-16-113-topbanner.png image download
sequantial-16-40-topbanner.png image download
sequantial-21-2103-topbanner.png image download
sequantial-21-201-topbanner.png 

### 17.1.2 concurrent.futures 로 내려받기
- concurrent.futures 패키지의 가장 큰 특징은 ThreadPoolExecutor와 ProcessPoolExecutor 클래스이다. 콜러블 객체를 서로다른 스레드나 프로세스에서 실행할 수 있게 해주는 인터페이스를 구현한것. 
- 작업자 스레드(프로세스)를 관리하는 풀과 실행할 작업을 담은 큐를 가지고 있다. 
- 간단한 작업을 하는경우에는 내부 동작을 알 필요는 없다. 
- ThreadPoolExecutor.map() 메서드로 동시에 내려받는 작업을 해보자.

In [11]:
from concurrent import futures
import os
import time
import sys
import time
import requests
from bs4 import BeautifulSoup


MAX_WORKERS = 20
COUNT = 25

BASE_URL = 'http://10000img.com'
DEST_DIR = 'downloads/'

def save_one(n):
    img, filename = get_img()
    save_img(img, filename)

def main():
    t0 = time.time()    
    workers = min(MAX_WORKERS, COUNT)
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(save_one, range(20))
    
    elapsed = time.time() - t0
    print(f'걸린 시간 {elapsed} 초')

main()

걸린 시간 3.046466827392578 초


In [12]:
import time
from concurrent import futures


MAX_WORKERS = 10
IMG_PREFIX = 'threadpool'

def download_one(category):
    img, filename = get_top_banner(category)
    print(f'{filename} image download')
    save_img(img, filename)

def download_all(category):    
    """
    원래 이렇게 되어 있던 부분을 비동기로 변경하면 됨
    map의 인터페이스는 다음과 같다. 
    map(func, *iterables, timeout=None, chunksize=1)
        for category in categories:
            img, filename = get_top_banner(category)
            print(f'{filename} image download')
            save_img(img, filename)
    """
    workers = min(MAX_WORKERS, len(category))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, categories)
    return len(category)

        
def main():
    start = time.time()    
    print(f'[{strtime(start)}] thread pool executore start!', )
    count = download_all(categories)
    end = time.time()
    elapsed = end - start
    print(f'[{strtime(end)}] topbanner img downloaded in {elapsed:.3}s')

main()

[2019-06-01 15:10:36] thread pool executore start!
threadpool-11-1000-topbanner.png image download
threadpool-10-1000-topbanner.png image download
threadpool-10-115-topbanner.png image download
threadpool-11-1101-topbanner.png image download
threadpool-10-1001-topbanner.png image download
threadpool-10-119-topbanner.png image download
threadpool-10-116-topbanner.png image download
threadpool-10-112-topbanner.png image download
threadpool-10-69-topbanner.png image download
threadpool-11-86-topbanner.png image download
threadpool-16-1601-topbanner.png image download
threadpool-11-120-topbanner.png image download
threadpool-11-89-topbanner.png image download
threadpool-11-117-topbanner.png image download
threadpool-11-87-topbanner.png image download
threadpool-16-113-topbanner.png image download
threadpool-16-51-topbanner.png image downloadthreadpool-16-84-topbanner.png image download

threadpool-16-40-topbanner.png image download
threadpool-21-2103-topbanner.png image download
no top ban

### 17.1.3 future 는 어디에 있나

In [9]:
import time
from concurrent import futures


MAX_WORKERS = 10
IMG_PREFIX = 'threadpool-with-future3'

def download_one(category):
    img, filename = get_top_banner(category)
    print(f'{filename} image download')
    save_img(img, filename)
    return filename

def download_all(category):    
    """
    원래 map을 썻었는데 submit 을 사용해서 Future 객체를 받아보자
    workers = min(MAX_WORKERS, len(category))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, categories)

    """

    workers = min(MAX_WORKERS, len(category))
    with futures.ThreadPoolExecutor(workers) as executor:
        cate_futures = []
        cate_map = {}

        for idx, category in enumerate(categories):
            future = executor.submit(download_one, category)
            print(future)
            cate_futures.append(future)
            cate_map[future] = idx
            print(f'스케줄링 오케 {category}: {future}')
            
        results = []
        for future in futures.as_completed(cate_map):
            res = future.result()
            print(f'{future} : {res!r}')
            results.append(res)
    return len(category)

        
def main():
    start = time.time()    
    print(f'[{strtime(start)}] thread pool executore start!', )
    count = download_all(categories)
    end = time.time()
    elapsed = end - start
    print(f'[{strtime(end)}] topbanner img downloaded in {elapsed:.3}s')

main()

[2019-06-03 09:43:20] thread pool executore start!
<Future at 0x10b79c6d8 state=running>
스케줄링 오케 Category(uid=10, subuid=1000): <Future at 0x10b79c6d8 state=running>
<Future at 0x10c030518 state=running>
스케줄링 오케 Category(uid=10, subuid=1001): <Future at 0x10c030518 state=running>
<Future at 0x10b79c278 state=running>
스케줄링 오케 Category(uid=10, subuid=69): <Future at 0x10b79c278 state=running>
<Future at 0x10c030940 state=running>
스케줄링 오케 Category(uid=10, subuid=115): <Future at 0x10c030940 state=running>
<Future at 0x10bb1bc50 state=running>
스케줄링 오케 Category(uid=10, subuid=116): <Future at 0x10bb1bc50 state=running>
<Future at 0x10bb1b198 state=running>
스케줄링 오케 Category(uid=10, subuid=119): <Future at 0x10bb1b198 state=running>
<Future at 0x10b684710 state=running>
스케줄링 오케 Category(uid=10, subuid=112): <Future at 0x10b684710 state=running>
<Future at 0x10b684b70 state=running>
스케줄링 오케 Category(uid=11, subuid=1101): <Future at 0x10b684b70 state=running>
<Future at 0x10b79ce48 state=runnin

## 17.3 concurrent.futures 로 프로세스 실행하기

In [10]:

IMG_PREFIX = 'processpool-with-future'

def download_all(category):    
    """
    쓰레드 풀을 프로세스 풀로 변경해보자. 원래코드는 아래와 같다. 
    한줄만 변경하면 되니 정말 편한것 같다. 
    with futures.ThreadPoolExecutor(workers) as executor:     
    """

    workers = min(MAX_WORKERS, len(category))
    with futures.ProcessPoolExecutor() as executor:
        cate_futures = []
        for category in categories:
            future = executor.submit(download_one, category)
            cate_futures.append(future)
            print(f'프로세스 풀 스케줄링 오케 {category}: {future}')
            
        results = []
        for future in futures.as_completed(cate_futures):
            res = future.result()
            print(f'{future} : {res!r}')
            results.append(res)
    return len(category)

        
def main():
    start = time.time()    
    print(f'[{strtime(start)}] process pool executore start!', )
    count = download_all(categories)
    end = time.time()
    elapsed = end - start
    print(f'[{strtime(end)}] topbanner img downloaded in {elapsed:.3}s')

main()

[2019-06-03 09:47:01] process pool executore start!
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=1000): <Future at 0x10b9262e8 state=running>
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=1001): <Future at 0x10af5dc50 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=69): <Future at 0x10c08c550 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=115): <Future at 0x10ac94828 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=116): <Future at 0x10ac94710 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=119): <Future at 0x10bb99d30 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=10, subuid=112): <Future at 0x10bb99ac8 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=11, subuid=1101): <Future at 0x10bb99eb8 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=11, subuid=1000): <Future at 0x10bb99fd0 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=11, subuid=86): <Future at 0x10bb99f60 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=11, subuid=120): <Future at 0x10c2cdfd0 state=pending>
프로세스 풀 스케줄링 오케 Category(uid=11, s


계산이 많은 경우에 ProcessPool을 사용하면 빨라지는지 테스트 해보자 

In [13]:
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def prime_process_pool_main():
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    elapsed = time.time() - start
    print(f'elapsed time {elapsed:.2}')


prime_process_pool_main()

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
elapsed time 0.88


In [16]:
def prime_thread_pool_main():
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    elapsed = time.time() - start
    print(f'elapsed time {elapsed:.2}')

prime_thread_pool_main()

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
elapsed time 2.9


In [19]:
def main():
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    elapsed = time.time() - start
    print(f'elapsed time {elapsed:.2}')

main()

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
elapsed time 0.79


## 17.4 Executor.map() 실험

In [14]:
from time import sleep, strftime
from concurrent import futures
def display(*args):
    print(strftime('[%H:%M:%S]'), end= ' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}) : doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results : ', results)
    display('Waiting for individual results : ')
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))
        
main()

[18:56:22] Script starting
[18:56:22] loiter(0) : doing nothing for 0s...
[18:56:22][18:56:22] loiter(0): done.
 	loiter(1) : doing nothing for 1s...
[18:56:22] 		loiter(2) : doing nothing for 2s...
[18:56:22][18:56:22] 			loiter(3) : doing nothing for 3s...
 results :  <generator object Executor.map.<locals>.result_iterator at 0x10b11c830>
[18:56:22] Waiting for individual results : 
[18:56:22] result 0: 0
[18:56:23] 	loiter(1): done.
[18:56:23][18:56:23] result 1: 10
 				loiter(4) : doing nothing for 4s...
[18:56:24] 		loiter(2): done.
[18:56:24] result 2: 20
[18:56:25] 			loiter(3): done.
[18:56:25] result 3: 30
[18:56:27] 				loiter(4): done.
[18:56:27] result 4: 40


## 17.5 진행 상황 출력하고 에러를 처리하며 내려받기

In [15]:
import time
from tqdm import tqdm_notebook as tqdm

for i in tqdm(range(1000)):
    time.sleep(.01)

HBox(children=(IntProgress(value=0, max=1000), HTML(value='')))




In [16]:
import time
import collections
from collections import namedtuple
from enum import Enum

Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')

from tqdm import tqdm_notebook as tqdm

def get_top_banner(category):
    category_url = f'https://page.kakao.com/main?categoryUid={category.uid}&subCategoryUid={category.subuid}'
    res = requests.get(category_url)
    if res.status_code != 200:
        res.raise_for_status()
        
    soup = bs(res.text, 'html.parser')
    divs = soup.findAll('div', {'class': {'topBanner'}})
    if (len(divs)):
        img_src = divs[0].find('img').get('src')        
        return res.content, f'{IMG_PREFIX}-{category.uid}-{category.subuid}-topbanner.png'
    else:
        print(f'no top banner on {category.uid} in {category.subuid}')
        return None, None
    

def download_one(category, verbose=False):
    try:
        img, filename = get_top_banner(category)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPS.not_found
            msg = 'page not found'
        else:
            raise
    else:
        save_img(img, filename)
        status = HTTPStatus.ok
        msg = 'ok'
    
    if verbose:
        print(category, msg)
    return Result(status, category)


def download_all(categories, verbose=False, max_req=1000):
    counter = collections.Counter()
    
    if not verbose:
        categories = tqdm(categories)
    
    for category in categories:
        try:
            res = download_one(category, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = f'HTTP error {exc.response.status_code} - {exc.response.reason}'
            print(error_msg)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
            print(error_msg)
        else:
            error_msg = ' '
            status = res.status
        
        if error_msg:
            status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                print(f'*** Error for {category} : {error_msg}')
            
    return counter

def strtime(timestamp):
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(timestamp))


def main():
    start = time.time()    
    print(f'[{strtime(start)}] process pool executore start!', )
    
    # 에러 상황 재현을 위해 넣음
    # categories.append(Category(uid=100, subuid=11111))
    count = download_all(categories)
    end = time.time()
    elapsed = end - start
    print(f'[{strtime(end)}] topbanner img downloaded in {elapsed:.3}s')

main()

[2019-06-03 09:57:11] process pool executore start!


HBox(children=(IntProgress(value=0, max=28), HTML(value='')))

no top banner on 22 in 2203

[2019-06-03 09:57:17] topbanner img downloaded in 6.58s


In [17]:
import collections
from tqdm import tqdm_notebook as tqdm

IMG_PREFIX = 'threadpool-with-error'

def download_all(categories, verbose=False, concur_req=10):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        todo_map = {} # map 으로 future를 저장한다.
        
        for category in categories:
            future = executor.submit(download_one, category, verbose)
            todo_map[future] = category
        
        done_iter = futures.as_completed(todo_map)
        
        if not verbose:
            done_iter = tqdm(done_iter, total=len(categories))
        
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                res = exc.response
                error_msg = f'HTTP {res.status_code} - {res.reason}'
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            
            if error_msg:
                status = HTTPStatus.error
            
            counter[status] += 1
            
            if verbose and error_msg:
                category = todo_map[future]
                print(f'*** Error for {category} : {error_msg}')
        return counter

main()       

[2019-06-03 09:57:48] process pool executore start!


HBox(children=(IntProgress(value=0, max=28), HTML(value='')))

no top banner on 22 in 2203

[2019-06-03 09:57:50] topbanner img downloaded in 2.18s
