Skip to content

Commit

Permalink
Convert progress() function into a class
Browse files Browse the repository at this point in the history
Cleans up some scope issues and is nicer to interact with methods instead of re-writing code in several places.
  • Loading branch information
biologyguy committed Jan 9, 2017
1 parent 5773dbc commit cff0604
Showing 1 changed file with 40 additions and 32 deletions.
72 changes: 40 additions & 32 deletions rdmcl/rdmcl.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,18 +464,13 @@ def mc_psi_pred(seq_obj, args):

def mcmcmc_mcl(args, params):
inflation, gq = args
external_tmp_dir, min_score, seqbuddy, parent_cluster, taxa_separator, sql_broker = params
external_tmp_dir, min_score, seqbuddy, parent_cluster, taxa_separator, sql_broker, progress = params
mcl_tmp_dir = br.TempDir()

mcl_output = Popen("mcl %s/input.csv --abc -te 2 -tf 'gq(%s)' -I %s -o %s/output.groups" %
(external_tmp_dir, gq, inflation, mcl_tmp_dir.path), shell=True, stderr=PIPE).communicate()

with LOCK:
with open("%s/.progress" % in_args.outdir, "r") as ifile:
_progress = json.load(ifile)
_progress['mcl_runs'] += 1
with open("%s/.progress" % in_args.outdir, "w") as ofile:
json.dump(_progress, ofile)
progress.update('mcl_runs', 1)

mcl_output = mcl_output[1].decode()
if re.search("\[mclvInflate\] warning", mcl_output) and min_score:
Expand Down Expand Up @@ -511,14 +506,16 @@ def mcmcmc_mcl(args, params):
return score


def orthogroup_caller(master_cluster, cluster_list, seqbuddy, sql_broker, steps=1000, quiet=True, taxa_separator="-"):
def orthogroup_caller(master_cluster, cluster_list, seqbuddy, sql_broker, progress,
steps=1000, quiet=True, taxa_separator="-"):
"""
Run MCMCMC on MCL to find the best orthogroups
:param master_cluster: The group to be subdivided
:type master_cluster: Cluster
:param cluster_list: When a sequence_ids is finalized after recursion, it is appended to this list
:param seqbuddy: The sequences that are included in the master sequence_ids
:param sql_broker: Multithread SQL broker that can be queried
:param progress: Progress class
:param steps: How many MCMCMC iterations to run TODO: calculate this on the fly
:param quiet: Suppress StdErr
:param taxa_separator: The string that separates taxon names from gene names
Expand All @@ -532,12 +529,8 @@ def save_cluster():
alignment.write("%s/alignments/%s.aln" % (in_args.outdir, master_cluster.name()))
master_cluster.sim_scores.to_csv("%s/sim_scores/%s.scores" % (in_args.outdir, master_cluster.name()),
header=None, index=False, sep="\t")

with open("%s/.progress" % in_args.outdir, "r") as ifile:
_progress = json.load(ifile)
_progress["placed"] += len(master_cluster.seq_ids) if not master_cluster.subgroup_counter else 0
with open("%s/.progress" % in_args.outdir, "w") as _ofile:
json.dump(_progress, _ofile)
update = len(master_cluster.seq_ids) if not master_cluster.subgroup_counter else 0
progress.update("placed", update)
return

master_cluster.set_name()
Expand All @@ -560,7 +553,7 @@ def save_cluster():
try:
with open("%s/max.txt" % temp_dir.path, "w") as ofile:
ofile.write("-1000000000")
mcmcmc_params = ["%s" % temp_dir.path, False, seqbuddy, master_cluster, taxa_separator, sql_broker]
mcmcmc_params = ["%s" % temp_dir.path, False, seqbuddy, master_cluster, taxa_separator, sql_broker, progress]
mcmcmc_factory = mcmcmc.MCMCMC([inflation_var, gq_var], mcmcmc_mcl, steps=steps, sample_rate=1,
params=mcmcmc_params,
quiet=quiet, outfile="%s/mcmcmc_out.csv" % temp_dir.path)
Expand All @@ -575,7 +568,7 @@ def save_cluster():
worst_score = chain.raw_min if chain.raw_min < worst_score else worst_score

mcmcmc_factory.reset_params(["%s" % temp_dir.path, worst_score, seqbuddy,
master_cluster, taxa_separator, sql_broker])
master_cluster, taxa_separator, sql_broker, progress])
mcmcmc_factory.run()
mcmcmc_output = pd.read_csv("%s/mcmcmc_out.csv" % temp_dir.path, "\t")

Expand Down Expand Up @@ -617,21 +610,37 @@ def save_cluster():

# Recursion... Reassign cluster_list, as all clusters are returned at the end of a call to orthogroup_caller
cluster_list = orthogroup_caller(sub_cluster, cluster_list, seqbuddy=seqbuddy_copy, sql_broker=sql_broker,
steps=steps, quiet=quiet, taxa_separator=taxa_separator)
progress=progress, steps=steps, quiet=quiet, taxa_separator=taxa_separator)

save_cluster()
return cluster_list


def progress():
with LOCK:
with open("%s/.progress" % in_args.outdir, "r") as ifile:
try:
class Progress(object):
def __init__(self, out_dir):
self.outdir = out_dir
with open("%s/.progress" % self.outdir, "w") as progress_file:
_progress = {"mcl_runs": 0, "placed": 0, "total": len(group_0_cluster)}
json.dump(_progress, progress_file)

def update(self, key, value):
with LOCK:
with open("%s/.progress" % self.outdir, "r") as ifile:
_progress = json.load(ifile)
return "MCL runs processed: %s. Sequences placed: %s/%s. Run time: " \
% (_progress['mcl_runs'], _progress['placed'], _progress['total'])
except json.decoder.JSONDecodeError:
pass
_progress[key] += value
with open("%s/.progress" % self.outdir, "w") as _ofile:
json.dump(_progress, _ofile)
return

def read(self):
with LOCK:
with open("%s/.progress" % self.outdir, "r") as ifile:
return json.load(ifile)

def __str__(self):
_progress = self.read()
return "MCL runs processed: %s. Sequences placed: %s/%s. Run time: " \
% (_progress['mcl_runs'], _progress['placed'], _progress['total'])


def parse_mcl_clusters(path):
Expand Down Expand Up @@ -1121,17 +1130,16 @@ def check_sequences(seqbuddy):
# Ortholog caller
logging.warning("\n** Recursive MCL **")
final_clusters = []
with open("%s/.progress" % in_args.outdir, "w") as progress_file:
progress_dict = {"mcl_runs": 0, "placed": 0, "total": len(group_0_cluster)}
json.dump(progress_dict, progress_file)
run_time = br.RunTime(prefix=progress, _sleep=0.3, final_clear=True)
progress_tracker = Progress(in_args.outdir)

run_time = br.RunTime(prefix=progress_tracker.__str__, _sleep=0.3, final_clear=True)
run_time.start()
final_clusters = orthogroup_caller(group_0_cluster, final_clusters, seqbuddy=sequences, sql_broker=broker,
steps=in_args.mcmcmc_steps, quiet=True, taxa_separator=in_args.taxa_separator)
progress=progress_tracker, steps=in_args.mcmcmc_steps, quiet=True,
taxa_separator=in_args.taxa_separator)
run_time.end()

with open("%s/.progress" % in_args.outdir, "r") as progress_file:
progress_dict = json.load(progress_file)
progress_dict = progress_tracker.read()
logging.warning("Total MCL runs: %s" % progress_dict["mcl_runs"])
logging.warning("\t-- finished in %s --" % TIMER.split())

Expand Down

0 comments on commit cff0604

Please sign in to comment.