Skip to content

Commit

Permalink
Parallel perceptron training – work in progress
Browse files Browse the repository at this point in the history
Now it runs, but fails on first job.
  • Loading branch information
tuetschek committed Sep 11, 2014
1 parent 68fc0ff commit b9ffc2c
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 81 deletions.
23 changes: 20 additions & 3 deletions run_tgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
- arguments: [-p prune_threshold] train-das train-ttrees output-model
percrank_train -- train perceptron global ranker
- arguments: [-d debug-output] [-c candgen-model] [-s data-portion] ranker-config train-das train-ttrees output-model
- arguments: [-d debug-output] [-c candgen-model] [-s data-portion] [-j parallel-jobs] [-w parallel-work-dir] \\
ranker-config train-das train-ttrees output-model
sample_gen -- generate using the given candidate generator
- arguments: [-n trees-per-da] [-o oracle-eval-ttrees] [-w output-ttrees] candgen-model test-das
Expand All @@ -26,6 +27,7 @@
import sys
from getopt import getopt
import platform
import os

from alex.components.nlg.tectotpl.core.util import file_stream
from alex.components.nlg.tectotpl.core.document import Document
Expand All @@ -41,6 +43,7 @@
from tgen.planner import SamplingPlanner, ASearchPlanner
from tgen.eval import p_r_f1_from_counts, tp_fp_fn, f1_from_counts, ASearchListsAnalyzer, \
EvalTypes, Evaluator
from tgen.parallel_percrank_train import ParallelPerceptronRanker


def candgen_train(args):
Expand All @@ -61,9 +64,12 @@ def candgen_train(args):


def percrank_train(args):
opts, files = getopt(args, 'c:d:s:')
opts, files = getopt(args, 'c:d:s:j:w:')
candgen_model = None
train_size = 1.0
parallel = False
jobs_number = 0
work_dir = None

for opt, arg in opts:
if opt == '-d':
Expand All @@ -72,6 +78,11 @@ def percrank_train(args):
train_size = float(arg)
elif opt == '-c':
candgen_model = arg
elif opt == '-j':
parallel = True
jobs_number = int(arg)
elif opt == '-w':
work_dir = arg

if len(files) != 4:
sys.exit(__doc__)
Expand All @@ -82,7 +93,13 @@ def percrank_train(args):
rank_config = Config(fname_rank_config)
if candgen_model:
rank_config['candgen_model'] = candgen_model
ranker = PerceptronRanker(rank_config)
if not parallel:
ranker = PerceptronRanker(rank_config)
else:
rank_config['jobs_number'] = jobs_number
if work_dir is None:
work_dir, _ = os.path.split(fname_rank_config)
ranker = ParallelPerceptronRanker(rank_config, work_dir)
ranker.train(fname_train_das, fname_train_ttrees, data_portion=train_size)
ranker.save_to_file(fname_rank_model)

Expand Down
232 changes: 154 additions & 78 deletions tgen/parallel_percrank_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
"""

from __future__ import unicode_literals
from copy import deepcopy
from collections import deque, namedtuple
import sys
from time import sleep
from threading import Thread
import socket
import cPickle as pickle

import numpy as np
from rpyc import Service, connect, async
Expand All @@ -20,41 +22,73 @@
from flect.cluster import Job

from rank import PerceptronRanker
from tgen.logf import log_info


class ParallelTrainingPercRank(PerceptronRanker, Service):
class ServiceConn(namedtuple('ServiceConn', ['host', 'port', 'conn'])):
    """This stores a worker's RPyC connection along with its (host, port) address.

    Subclassed from namedtuple only to attach documentation; `__slots__ = ()`
    keeps instances lightweight (prevents creation of a per-instance __dict__,
    as recommended for namedtuple subclasses).
    """
    __slots__ = ()

def __init__(self, cfg):
super(ParallelTrainingPercRank, self).__init__(cfg)
super(Service, self).__init__()
self.work_dir = cfg['work_dir'] # TODO possibly change this so that it is a separate param

def get_worker_registrar_for(head):
    """Return a class that will handle worker registration for the given head.

    The returned service class closes over `head`: every worker that registers
    is sent the head's pickled plain ranker and recorded in the head's service
    lists.

    @param head: the head ranker (must provide get_plain_percrank(), \
        `services` and `free_services` collections)
    @return: a WorkerRegistrarService class bound to `head`
    """

    # create a dump of the head to be passed to workers
    # (pickled once here, so it is not re-serialized for every registering worker)
    head_dump = pickle.dumps(head.get_plain_percrank(), protocol=pickle.HIGHEST_PROTOCOL)

    class WorkerRegistrarService(Service):
        """RPyC service accepting registration calls from newly started workers."""

        def exposed_register_worker(self, host, port):
            """Register a worker with my head, initialize it.

            @param host: host name on which the worker's own server listens
            @param port: port number of the worker's server
            """
            # initiate connection in the other direction
            log_info('Worker %s:%d connected, initializing training.' % (host, port))
            conn = connect(host, port)
            # initialize the remote server (with training data etc.)
            conn.root.init_training(head_dump)
            # add it to the list of running services
            sc = ServiceConn(host, port, conn)
            head.services.add(sc)
            head.free_services.append(sc)
            log_info('Worker %s:%d initialized.' % (host, port))

    return WorkerRegistrarService


class ParallelPerceptronRanker(PerceptronRanker):

DEFAULT_PORT = 25125

def __init__(self, cfg, work_dir):
    """Initialize the parallel ranker head.

    @param cfg: ranker configuration dictionary (keys of PerceptronRanker, plus \
        'jobs_number', 'data_portions', 'job_memory', 'port', 'poll_interval')
    @param work_dir: working directory in which cluster jobs will be spawned
    """
    # initialize base class
    super(ParallelPerceptronRanker, self).__init__(cfg)
    # initialize myself
    self.work_dir = work_dir
    self.jobs_number = cfg.get('jobs_number', 10)  # number of worker jobs to spawn
    self.data_portions = cfg.get('data_portions', self.jobs_number)  # number of training data slices
    self.job_memory = cfg.get('job_memory', 4)  # per-job memory limit (presumably GB -- TODO confirm)
    self.port = cfg.get('port', self.DEFAULT_PORT)  # port for the head's registrar server
    self.host = socket.getfqdn()
    self.poll_interval = cfg.get('poll_interval', 1)  # seconds between checks for finished requests
    # this will be needed when running
    self.server = None  # worker registrar server (see _init_server)
    self.server_thread = None  # background thread running the registrar server
    self.jobs = None  # spawned cluster jobs
    self.pending_requests = None  # in-flight (service, portion, async request) triples
    self.services = None  # all registered worker services
    self.free_services = None  # workers with no portion currently assigned
    self.results = None

def train(self, das_file, ttree_file, data_portion=1.0):
"""(Head) run the training, start and manage workers."""
"""Run parallel perceptron training, start and manage workers."""
# initialize myself
log_info('Initializing...')
self._init_training(das_file, ttree_file, data_portion)
# run server to process registering clients
self.server = ThreadPoolServer(service=ParallelTrainingPercRank,
nbThreads=1,
port=self.port)
self.services = set()
self.free_services = deque()
self.pending_requests = set()
self.jobs = []
self._init_server()
# spawn training jobs
log_info('Spawning jobs...')
for j in xrange(self.jobs_number):
job = Job(code='run_worker(%s, %d)',
job = Job(code='run_worker("%s", %d)' % (self.host, self.port),
header='from tgen.parallel_percrank_train import run_worker',
name="PRT%02d" % j,
work_dir=self.work_dir)
Expand All @@ -63,24 +97,28 @@ def train(self, das_file, ttree_file, data_portion=1.0):
# wait for free services / assign computation
try:
for iter_no in xrange(1, self.passes + 1):
log_info('Iteration %d...' % iter_no)
cur_portion = 0
results = [None] * self.data_portions
while cur_portion < self.data_portions:
# check if some of the pending computations have finished
for conn, req_portion, req in list(self.pending_requests):
for sc, req_portion, req in list(self.pending_requests):
if req.ready:
log_info('Retrieved finished request %d / %d' % (iter_no, req_portion))
if req.error:
raise Exception('Request computed with error: IT %d PORTION %d, WORKER %s:%d' %
(iter_no, req_portion, conn.host, conn.port))
log_info('Error found on request: IT %d PORTION %d, WORKER %s:%d' %
(iter_no, req_portion, sc.host, sc.port))
results[req_portion] = req.value
self.pending_requests.remove(conn, req_portion, req)
self.free_services.append(conn)
self.pending_requests.remove(sc, req_portion, req)
self.free_services.append(sc)
# check for free services and assign new computation
while cur_portion < self.data_portions and self.free_services:
conn = self.free_services.popleft()
train_func = async(conn.root.training_iter)
sc = self.free_services.popleft()
log_info('Assigning request %d / %d to %s:%d' %
(iter_no, cur_portion, sc.host, sc.port))
train_func = async(sc.conn.root.training_iter)
req = train_func(self.w, iter_no, *self._get_portion_bounds(cur_portion))
self.pending_requests.add((conn, cur_portion, req))
self.pending_requests.add((sc, cur_portion, req))
cur_portion += 1
# sleep for a while
sleep(self.poll_interval)
Expand All @@ -91,79 +129,117 @@ def train(self, das_file, ttree_file, data_portion=1.0):
for job in self.jobs:
job.delete()

def _init_server(self):
    """Initializes a server that registers new workers.

    Also resets the head's bookkeeping collections and starts the registrar
    server in a background daemon thread, so the caller can continue spawning
    jobs and polling for results.
    """
    registrar_class = get_worker_registrar_for(self)
    self.server = ThreadPoolServer(service=registrar_class, nbThreads=1, port=self.port)
    self.services = set()
    self.free_services = deque()
    self.pending_requests = set()
    self.jobs = []
    # daemonized so the registrar thread does not block interpreter exit
    self.server_thread = Thread(target=self.server.start)
    self.server_thread.setDaemon(True)
    self.server_thread.start()

def _get_portion_bounds(self, portion_no):
    """(Head) return the offset and size of the specified portion of the training
    data to be sent to a worker.

    The training trees are split into `self.data_portions` slices whose sizes
    differ by at most one; the remainder is distributed among the first slices.

    @param portion_no: the number of the portion whose bounds should be computed
    @rtype: tuple
    @return: offset and size of the desired training data portion
    """
    base_size, num_larger = divmod(len(self.train_trees), self.data_portions)
    if portion_no < num_larger:
        # one of the portions carrying an extra item
        offset = portion_no * (base_size + 1)
        size = base_size + 1
    else:
        # a base-sized portion, shifted past all the larger ones
        offset = num_larger + portion_no * base_size
        size = base_size
    return offset, size

def exposed_register_worker(self, host, port):
"""(Head) register a worker with this head."""
# initiate connection in the other direction
conn = connect(host, port)
# initialize the remote server (with training data etc.)
conn.root.init_training(self)
# add it to the list of running services
self.services.add(conn)
self.free_services.append(conn)

def exposed_init_training(self, other):
def get_plain_percrank(self):
    """Create a plain (non-parallel) PerceptronRanker carrying this head's
    settings and training data, suitable for pickling and shipping to workers.

    @rtype: PerceptronRanker
    @return: a plain ranker with all attributes needed for worker training iterations
    """
    percrank = PerceptronRanker(cfg=None)  # initialize with 'empty' configuration
    # copy all necessary data from the head
    percrank.w = self.w
    percrank.feats = self.feats
    percrank.vectorizer = self.vectorizer
    percrank.normalizer = self.normalizer
    percrank.alpha = self.alpha
    percrank.passes = self.passes
    percrank.rival_number = self.rival_number
    # bug fix: language was erroneously copied from self.rival_number
    percrank.language = self.language
    percrank.selector = self.selector
    percrank.rival_gen_strategy = self.rival_gen_strategy
    percrank.rival_gen_max_iter = self.rival_gen_max_iter
    percrank.rival_gen_max_defic_iter = self.rival_gen_max_defic_iter
    percrank.rival_gen_beam_size = self.rival_gen_beam_size
    percrank.candgen_model = self.candgen_model
    percrank.train_trees = self.train_trees
    percrank.train_feats = self.train_feats
    percrank.train_sents = self.train_sents
    percrank.train_das = self.train_das
    percrank.asearch_planner = self.asearch_planner
    percrank.sampling_planner = self.sampling_planner
    percrank.candgen = self.candgen
    return percrank


class PercRankTrainingService(Service):

def __init__(self, conn_ref):
    """Initialize the worker-side training service.

    @param conn_ref: connection reference, passed through to the rpyc Service base
    """
    super(PercRankTrainingService, self).__init__(conn_ref)
    # the worker's local PerceptronRanker copy; unpickled from the head's dump
    # in exposed_init_training
    self.percrank = None

def exposed_init_training(self, head_percrank):
"""(Worker) Just deep-copy all necessary attributes from the head instance."""
self.w = deepcopy(other.w)
self.feats = deepcopy(other.feats)
self.vectorizer = deepcopy(other.vectorizer)
self.normalizer = deepcopy(other.normalizer)
self.alpha = deepcopy(other.alpha)
self.passes = deepcopy(other.passes)
self.rival_number = deepcopy(other.rival_number)
self.language = deepcopy(other.rival_number)
self.selector = deepcopy(other.selector)
self.rival_gen_strategy = deepcopy(other.rival_gen_strategy)
self.rival_gen_max_iter = deepcopy(other.rival_gen_max_iter)
self.rival_gen_max_defic_iter = deepcopy(other.rival_gen_max_defic_iter)
self.rival_gen_beam_size = deepcopy(other.rival_gen_beam_size)
self.candgen_model = deepcopy(other.candgen_model)
self.feats = deepcopy(other.feats)
self.train_trees = deepcopy(other.train_trees)
self.train_feats = deepcopy(other.train_feats)
self.train_sents = deepcopy(other.train_sents)
self.train_das = deepcopy(other.train_das)
self.asearch_planner = deepcopy(other.asearch_planner)
self.sampling_planner = deepcopy(other.sampling_planner)
self.candgen = deepcopy(other.candgen)
log_info('Initializing training...')
self.percrank = pickle.loads(head_percrank)
log_info('Training initialized.')

def exposed_training_iter(self, w, iter_no, data_offset, data_len):
"""(Worker) Run one iteration on a part of the training data."""
log_info('Training iteration %d with data portion %d + %d' %
(iter_no, data_offset, data_len))
# import current feature weights
self.w = w
percrank = self.percrank
percrank.w = w
# save rest of the training data to temporary variables, set just the
# required portion for computation
all_train_das = self.train_das
self.train_das = self.train_das[data_offset:data_offset + data_len]
all_train_trees = self.train_trees
self.train_trees = self.train_trees[data_offset:data_offset + data_len]
all_train_feats = self.train_feats
self.train_feats = self.train_feats[data_offset:data_offset + data_len]
all_train_sents = self.train_sents
self.train_sents = self.train_sents[data_offset:data_offset + data_len]
all_train_das = percrank.train_das
percrank.train_das = percrank.train_das[data_offset:data_offset + data_len]
all_train_trees = percrank.train_trees
percrank.train_trees = percrank.train_trees[data_offset:data_offset + data_len]
all_train_feats = percrank.train_feats
percrank.train_feats = percrank.train_feats[data_offset:data_offset + data_len]
all_train_sents = percrank.train_sents
percrank.train_sents = percrank.train_sents[data_offset:data_offset + data_len]
# do the actual computation (update w)
self._training_iter(iter_no)
percrank._training_iter(iter_no)
# return the rest of the training data to member variables
self.train_das = all_train_das
self.train_trees = all_train_trees
self.train_feats = all_train_feats
self.train_sents = all_train_sents
percrank.train_das = all_train_das
percrank.train_trees = all_train_trees
percrank.train_feats = all_train_feats
percrank.train_sents = all_train_sents
# return the result of the computation
return self.w
log_info('Training iteration %d / %d / %d done.' % (iter_no, data_offset, data_len))
return percrank.w


def run_worker(head_host, head_port):
# start the server
server = ThreadPoolServer(service=ParallelTrainingPercRank, nbThreads=1)
server.start()
# start the server (in the background)
log_info('Creating worker server...')
server = ThreadPoolServer(service=PercRankTrainingService, nbThreads=1)
server_thread = Thread(target=server.start)
server_thread.start()
my_host = socket.getfqdn()
log_info('Worker server created at %s:%d. Connecting to head at %s:%d...' %
(my_host, server.port, head_host, head_port))
# notify main about this server
conn = connect(head_host, head_port)
conn.root.register_worker(server.host, server.port)
conn.root.register_worker(my_host, server.port)
conn.close()
# now serve until we're killed
log_info('Worker is registered with the head.')
# now serve until we're killed (the server thread will continue to run)
server_thread.join()


if __name__ == '__main__':
Expand Down

0 comments on commit b9ffc2c

Please sign in to comment.