Permalink
Browse files

replace import futures with import concurrent.futures

  • Loading branch information...
1 parent 7b403b2 commit 613e8554ae23ffee26e20c20d7352fc076eed38e @bianjiang committed Jun 30, 2016
View
@@ -17,7 +17,7 @@
from tweetf0rm.utils import full_stack, node_id, public_ip from tweetf0rm.utils import full_stack, node_id, public_ip
from tweetf0rm.proxies import proxy_checker from tweetf0rm.proxies import proxy_checker
from tweetf0rm.scheduler import Scheduler from tweetf0rm.scheduler import Scheduler
-import time, os, tarfile, futures +import time, os, tarfile, concurrent.futures
def check_config(config): def check_config(config):
if ('apikeys' not in config or 'redis_config' not in config): if ('apikeys' not in config or 'redis_config' not in config):
@@ -128,7 +128,7 @@ def start_server(config, proxies):
if (time.time() - last_archive_ts > 3600): if (time.time() - last_archive_ts > 3600):
logger.info("start archive procedure...") logger.info("start archive procedure...")
- with futures.ProcessPoolExecutor(max_workers=len(buckets)) as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=len(buckets)) as executor:
future_proxies = {executor.submit(tarball_results, ouput_folder, bucket, archive_output, int(time.time()) - 3600): bucket for bucket in buckets} future_proxies = {executor.submit(tarball_results, ouput_folder, bucket, archive_output, int(time.time()) - 3600): bucket for bucket in buckets}
@@ -10,7 +10,7 @@
from .base_handler import BaseHandler from .base_handler import BaseHandler
import multiprocessing as mp import multiprocessing as mp
-import futures, json, copy, time +import concurrent.futures, json, copy, time
from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator
from tweetf0rm.utils import full_stack, get_keys_by_min_value, hash_cmd from tweetf0rm.utils import full_stack, get_keys_by_min_value, hash_cmd
import json import json
@@ -84,7 +84,7 @@ def need_flush(self, bucket):
def flush(self, bucket): def flush(self, bucket):
logger.debug("i'm getting flushed...") logger.debug("i'm getting flushed...")
- with futures.ProcessPoolExecutor(max_workers=1) as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
for k, v in self.buffer[bucket].iteritems(): for k, v in self.buffer[bucket].iteritems():
for s in v: for s in v:
o = json.loads(s) o = json.loads(s)
@@ -10,7 +10,7 @@
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from .base_handler import BaseHandler from .base_handler import BaseHandler
-import futures, os +import concurrent.futures, os
from tweetf0rm.utils import full_stack from tweetf0rm.utils import full_stack
def flush_file(output_folder, bucket, items): def flush_file(output_folder, bucket, items):
@@ -53,7 +53,7 @@ def need_flush(self, bucket):
def flush(self, bucket): def flush(self, bucket):
- with futures.ProcessPoolExecutor(max_workers=3) as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
# for each bucket it's a dict, where the key needs to be the file name; and the value is a list of json encoded value # for each bucket it's a dict, where the key needs to be the file name; and the value is a list of json encoded value
for bucket, items in self.buffer.iteritems(): for bucket, items in self.buffer.iteritems():
View
@@ -7,7 +7,7 @@
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from tweetf0rm.utils import full_stack from tweetf0rm.utils import full_stack
-import requests, futures +import requests, concurrent.futures
def check_proxy(proxy, timeout): def check_proxy(proxy, timeout):
url = "http://twitter.com" url = "http://twitter.com"
@@ -44,7 +44,7 @@ def proxy_checker(proxies):
results = [] results = []
- with futures.ProcessPoolExecutor(max_workers=mp.cpu_count()*10) as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()*10) as executor:
future_to_proxy = {executor.submit(check_proxy, proxy, 30): proxy for proxy in proxies if proxy.values()[0] == 'http'} future_to_proxy = {executor.submit(check_proxy, proxy, 30): proxy for proxy in proxies if proxy.values()[0] == 'http'}
@@ -53,7 +53,7 @@ def proxy_checker(proxies):
logger.info('%d http proxies to check'%(len(future_to_proxy))) logger.info('%d http proxies to check'%(len(future_to_proxy)))
- futures.wait(future_to_proxy) + concurrent.futures.wait(future_to_proxy)
# for future in futures.as_completed(future_to_proxy): # for future in futures.as_completed(future_to_proxy):

0 comments on commit 613e855

Please sign in to comment.