Skip to content

Commit

Permalink
New parallel data loading
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Meyer committed Feb 11, 2018
1 parent b25bf36 commit a8c1215
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
29 changes: 22 additions & 7 deletions integration_tests/bchain/getdata_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import tempfile

import pandas as pd
from pandas.testing import assert_frame_equal
from steem import Steem
from steem.blockchain import Blockchain

Expand All @@ -20,6 +21,11 @@ def bchain(steem):
return Blockchain(steem)


@pytest.fixture
def temp_dir(tmpdir_factory):
    """Provide a fresh, uniquely numbered temporary directory for a test."""
    directory = tmpdir_factory.mktemp('test', numbered=True)
    return directory


def test_get_headers(steem, bchain):
offset = bchain.get_current_block_num()
now = pd.datetime.utcnow()
Expand Down Expand Up @@ -51,16 +57,25 @@ def test_get_all_posts_between(steem):
assert posts


def test_scrape_date(steem, temp_dir):
    """Scraping a full day twice should hit the on-disk cache the second time.

    The first call scrapes yesterday's posts and stores exactly one file in
    ``temp_dir``; the second call must load that cached frame instead of
    scraping again, yielding an identical, non-empty DataFrame.
    """
    yesterday = (pd.datetime.utcnow() - pd.Timedelta(days=1)).date()

    p1 = tpbg.scrape_or_load_full_day(yesterday, steem, temp_dir, stop_after=25)

    # Exactly one pickle file stored after the first scrape.
    assert len(os.listdir(temp_dir)) == 1

    p2 = tpbg.scrape_or_load_full_day(yesterday, steem, temp_dir, stop_after=25)

    # Second call must not create an additional file (cache was used).
    assert len(os.listdir(temp_dir)) == 1

    assert_frame_equal(p1, p2)
    assert len(p1) > 0
def test_scrape_or_load_data_parallel(temp_dir):
    """Parallel scraping over five days should return one frame per day."""
    node_urls = [config.NODE_URL]
    frames = tpbg.scrape_or_load_training_data_parallel(
        node_urls,
        temp_dir,
        days=5,
        stop_after=10,
        ncores=5,
    )
    assert len(frames) == 5
50 changes: 50 additions & 0 deletions trufflepig/bchain/getdata.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import os
import multiprocessing as mp
from collections import OrderedDict

import pandas as pd
from steem import Steem
from steem.blockchain import Blockchain
from steem.post import Post
import json
Expand Down Expand Up @@ -248,3 +250,51 @@ def scrape_or_load_full_day(date, steem, directory, overwrite=False,
logger.info('Storing file {} to disk'.format(filename))
post_frame.to_pickle(filename, compression='gzip')
return post_frame


def scrape_or_load_full_day_mp(date, node_urls, directory, overwrite=False,
                               store=True,
                               stop_after=None):
    """Worker entry point: build a process-local Steem client and delegate.

    Each subprocess constructs its own ``Steem`` instance from ``node_urls``
    (client objects are not shared across process boundaries) and then calls
    ``scrape_or_load_full_day`` with the remaining arguments unchanged.
    """
    client = Steem(nodes=node_urls)
    return scrape_or_load_full_day(date=date, steem=client,
                                   directory=directory,
                                   overwrite=overwrite, store=store,
                                   stop_after=stop_after)


def config_mp_logging():
    """Initialize root logging at INFO level (used as a pool initializer)."""
    log_level = logging.INFO
    logging.basicConfig(level=log_level)


def scrape_or_load_training_data_parallel(node_urls, directory,
                                          days=20, offset=8,
                                          ncores=10,
                                          current_datetime=None,
                                          stop_after=None):
    """Scrape (or load cached) post data for several days in parallel.

    Parameters
    ----------
    node_urls : list of str
        Steem node URLs; each worker process builds its own client.
    directory : str
        Directory where daily pickle files are cached.
    days : int
        Number of consecutive days to fetch.
    offset : int
        Extra days subtracted (together with ``days``) from
        ``current_datetime`` to locate the first day of the window.
    ncores : int
        Number of worker processes in the pool.
    current_datetime : datetime or None
        Reference "now"; defaults to ``pd.datetime.utcnow()``.
    stop_after : int or None
        Optional per-day post limit, mainly for testing.

    Returns
    -------
    list of pandas.DataFrame
        One frame per day, in chronological order.
    """
    # 'fork' keeps imported module state available in the workers.
    ctx = mp.get_context('fork')
    pool = ctx.Pool(ncores, initializer=config_mp_logging)

    if current_datetime is None:
        current_datetime = pd.datetime.utcnow()

    start_datetime = current_datetime - pd.Timedelta(days=days + offset)

    async_results = []
    for day in range(days):
        next_date = (start_datetime + pd.Timedelta(days=day)).date()
        result = pool.apply_async(scrape_or_load_full_day_mp,
                                  args=(next_date, node_urls, directory,
                                        False, True, stop_after))
        async_results.append(result)

    pool.close()

    # BUG FIX: the original used `async` as the loop variable, which is a
    # reserved keyword in Python 3.7+ and raises a SyntaxError.
    frames = []
    for async_result in async_results:
        # Generous timeout: scraping a full day can take hours.
        frames.append(async_result.get(timeout=3600 * 6))

    pool.join()

    return frames

0 comments on commit a8c1215

Please sign in to comment.