Skip to content

Commit

Permalink
preprocessing split into 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Meyer committed Feb 25, 2018
1 parent 88e23bb commit a60e75c
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 34 deletions.
107 changes: 73 additions & 34 deletions trufflepig/main.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import argparse
import concurrent
import gc
import logging
import os
import gc
import concurrent
import time

import pandas as pd

import trufflepig.model as tpmo
import trufflepig.preprocessing as tppp
import trufflepig.bchain.getdata as tpgd
import trufflepig.bchain.postdata as tppd
import trufflepig.model as tpmo
import trufflepig.preprocessing as tppp
import trufflepig.utils as tfut
from trufflepig import config
import argparse
import time

from trufflepig.utils import configure_logging

logger = logging.getLogger(__name__)


def parse_args():
"""Parses command line arguments"""
parser = argparse.ArgumentParser(description='TrufflePig Bot')
parser.add_argument('--broadcast', action="store_false",
default=True)
Expand All @@ -27,31 +28,70 @@ def parse_args():
return args.broadcast, args.now


def configure_logging(directory, current_datetime):
if not os.path.isdir(directory):
os.makedirs(directory)

filename = 'trufflepig_{time}.txt'.format(time=current_datetime.strftime('%Y-%m-%d'))
filename = os.path.join(directory, filename)

format=('%(asctime)s %(processName)s:%(name)s:'
'%(funcName)s:%(lineno)s:%(levelname)s: %(message)s')
handlers = [logging.StreamHandler(), logging.FileHandler(filename)]
logging.basicConfig(level=logging.INFO, format=format,
handlers=handlers)


def large_mp_preprocess(directory, current_datetime, steem_kwargs, data_directory):
configure_logging(directory, current_datetime)
def large_mp_preprocess(log_directory, current_datetime, steem_kwargs, data_directory,
days, offset_days, ncores=16):
"""Helper function to spawn in child process"""
configure_logging(log_directory, current_datetime)
post_frame = tpgd.load_or_scrape_training_data(steem_kwargs, data_directory,
current_datetime=current_datetime,
days=8,
offset_days=8,
ncores=16)
days=days,
offset_days=offset_days,
ncores=ncores)
return tppp.preprocess(post_frame, ncores=3)


def load_and_preprocess_2_frames(log_directory, current_datetime, steem_kwargs,
data_directory, offset_days=8,
days=5, days2=5, ncores=16):
""" Function to load and preprocess the time span split into 2
for better memory footpring
Parameters
----------
log_directory: str
current_datetime: datetime
steem_kwargs: dict
data_directory: str
offset_days: int
days: int
days2: int
ncores: int
Returns
-------
DataFrame
"""
# hack for better memory footprint,
# see https://stackoverflow.com/questions/15455048/releasing-memory-in-python
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
post_frame = executor.submit(large_mp_preprocess,
log_directory=log_directory,
current_datetime=current_datetime,
steem_kwargs=steem_kwargs,
data_directory=data_directory,
days=days, offset_days=offset_days,
ncores=ncores).result()
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
post_frame2 = executor.submit(large_mp_preprocess,
log_directory=log_directory,
current_datetime=current_datetime,
steem_kwargs=steem_kwargs,
data_directory=data_directory,
days=days2, offset_days=8 + days,
ncores=ncores).result()

post_frame = pd.concat([post_frame, post_frame2], axis=0)
# We need to reset the index because due to concatenation
# the defualt indices are duplicates!
post_frame.reset_index(inplace=True, drop=True)
logger.info('Combining 2 frames into 1')
post_frame = tppp.filter_duplicates(post_frame)
return post_frame


def main():
"""Main loop started from command line"""

no_broadcast, current_datetime = parse_args()

Expand All @@ -78,14 +118,13 @@ def main():
tppd.create_wallet(steem_kwargs, config.PASSWORD, config.POSTING_KEY)

if not tpmo.model_exists(current_datetime, model_directoy):
# hack for better memory footprint,
# see https://stackoverflow.com/questions/15455048/releasing-memory-in-python
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
post_frame = executor.submit(large_mp_preprocess,
log_directory,
current_datetime,
steem_kwargs,
data_directory).result()

post_frame = load_and_preprocess_2_frames(
log_directory=log_directory,
current_datetime=current_datetime,
steem_kwargs=steem_kwargs,
data_directory=data_directory
)
logger.info('Garbage collecting')
gc.collect()
else:
Expand Down
22 changes: 22 additions & 0 deletions trufflepig/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,25 @@ def clean_up_directory(directory, keep_last=25):
else:
logger.info('Found only {} files, less than the number to keep '
'({})'.format(nfiles, keep_last))


def configure_logging(directory, current_datetime):
""" Configures logging to stdout and file
Parameters
----------
directory: str
current_datetime: datetime
"""
if not os.path.isdir(directory):
os.makedirs(directory)

filename = 'trufflepig_{time}.txt'.format(time=current_datetime.strftime('%Y-%m-%d'))
filename = os.path.join(directory, filename)

format=('%(asctime)s %(processName)s:%(name)s:'
'%(funcName)s:%(lineno)s:%(levelname)s: %(message)s')
handlers = [logging.StreamHandler(), logging.FileHandler(filename)]
logging.basicConfig(level=logging.INFO, format=format,
handlers=handlers)

0 comments on commit a60e75c

Please sign in to comment.