Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

fixing cluster-id off-by-one -- adding minclustersize csize -- tag_rmdup

  • Loading branch information...
commit bdb2f290af609dc2699f6a3e8032129128009f2d 1 parent 28b1f8c
@avilella authored
Showing with 45 additions and 32 deletions.
  1. +45 −32 modules/Pinball/Cluster.pm
View
77 modules/Pinball/Cluster.pm
@@ -36,7 +36,7 @@ sub fetch_input {
$self->{starttime} = time();
print STDERR "[init] ",time()-$self->{starttime}," secs...\n" if ($self->debug);
- my $tag_rmdup = $self->param('tag_rmdup') || die "'tag_rmdup' is an obligatory parameter";
+ my $tag_filter = $self->param('tag_filter') || die "'tag_filter' is an obligatory parameter";
}
@@ -51,7 +51,14 @@ sub run {
print STDERR "[run init] ",time()-$self->{starttime}," secs...\n" if ($self->debug);
- my $tag_rmdup = $self->param('tag_rmdup');
+ my $clusterfile_topup = $self->param('clusterfile_topup');
+ if (defined $clusterfile_topup) {
+ # We did a manual top_up, so we don't actually call the cluster method,
+ # just skip to write_output
+ return 1;
+ }
+
+ my $tag_filter = $self->param('tag_filter');
my $tag = $self->param('tag');
my $threads = $self->param('cluster_threads') || 1;
my $overlap = $self->param('overlap');
@@ -60,19 +67,22 @@ sub run {
my $sga_executable = $self->param('sga_executable');
my $work_dir = $self->param('work_dir');
- my ($infilebase,$path,$type) = fileparse($self->param('tag_rmdup'));
+ my ($infilebase,$path,$type) = fileparse($self->param('tag_filter'));
my $cmd;
if (!defined $work_dir) {
$work_dir = $path;
}
chdir($work_dir);
+ my $verbose_flag = '-v'; $verbose_flag = '-v' if ($self->debug);
# sga cluster
- # $cmd = "$sga_executable cluster -m $overlap -c $csize -e $erate -t $threads $tag.rmdup.fa -o $tag.d$dust.$csize.$overlap.e$erate.clusters";
+ # $cmd = "$sga_executable cluster $verbose_flag -m $overlap -c $csize -e $erate -t $threads $tag.filter.pass.fa -o $tag.d$dust.$csize.$overlap.e$erate.clusters";
my $clustersfile = $work_dir . "/$tag.clusters";
- $cmd = "$sga_executable cluster -m $overlap -c $csize -e $erate -t $threads $tag_rmdup -o $clustersfile";
+ $cmd = "$sga_executable cluster -m $overlap -c $csize -e $erate -t $threads $tag_filter -o $clustersfile";
print STDERR "$cmd\n" if ($self->debug);
+ $self->db->dbc->disconnect_when_inactive(1);
unless(system("$cmd") == 0) { print("$cmd\n"); $self->throw("error running sga cluster: $!\n"); }
+ $self->db->dbc->disconnect_when_inactive(0);
print STDERR "[cluster] ",time()-$self->{starttime}," secs...\n" if ($self->debug);
if (-e $clustersfile && !-z $clustersfile) {
@@ -92,7 +102,7 @@ sub run {
sub write_output { # nothing to write out, but some dataflow to perform:
my $self = shift @_;
- my $minclustersize = $self->param('minclustersize');
+ my $minclustersize = $self->param('minclustersize') || $self->param('csize');
my $maxclustersize = $self->param('maxclustersize');
my $work_dir = $self->param('work_dir');
my $tag = $self->param('tag');
@@ -101,6 +111,10 @@ sub write_output { # nothing to write out, but some dataflow to perform:
$self->{sizedir} = 200;
my $clustersfile = $self->param('clustersfile');
+
+ my $clusterfile_topup = $self->param('clusterfile_topup');
+ $clustersfile = $clusterfile_topup if (defined $clusterfile_topup);
+ $DB::single=1;1;
my $last_cluster_id = 'none';
my @seq_list;
my @output_ids;
@@ -116,9 +130,9 @@ sub write_output { # nothing to write out, but some dataflow to perform:
if (defined @seq_list) {
if (scalar @seq_list < $maxclustersize && $minclustersize < scalar @seq_list) {
- my $outfile = $self->create_outdir($cluster_id, $work_dir);
+ my $outfile = $self->create_outdir($last_cluster_id, $work_dir);
open OUT, ">$outfile" or die $!; print OUT join('',@seq_list); close OUT;
- print STDERR "[ $readsnum - $cluster_id - $outfile - $diff secs...]\n" if ($self->debug);
+ print STDERR "[ $readsnum - $last_cluster_id - $outfile - $diff secs...]\n" if ($self->debug);
push @output_ids, { 'clst' => $outfile, 'work_dir' => $work_dir, 'tag' => $tag };
}
}
@@ -133,35 +147,34 @@ sub write_output { # nothing to write out, but some dataflow to perform:
$self->param('output_ids', \@output_ids);
my $output_ids = $self->param('output_ids');
- $self->dataflow_output_id($output_ids, 2);
+ my $job_ids = $self->dataflow_output_id($output_ids, 2);
+ print join("\n",@$job_ids), "\n" if ($self->debug);
$self->warning(scalar(@$output_ids).' jobs have been created');
}
-sub create_outdir {
- my $self = shift;
- my $cluster_id = shift;
- my $work_dir = shift;
-
- my $outddir;
- if ($self->{this_sizedir} >= $self->{sizedir}) {
- $self->{this_sizedir} = 0;
- $self->{filenum}++;
- }
- $self->{filenum} = sprintf("%06d", $self->{filenum});
- $self->{filenum} =~ /(\d{2})(\d{2})(\d{2})/; #
- my $dir1 = $1; my $dir2 = $2; my $dir3 = $3;
- my $outdir = $work_dir . "/$dir1/$dir2/$dir3";
- $self->{final_dir} = "$dir1:$dir2:$dir3";
- $outdir =~ s/\/\//\//g;
- unless (-d $outdir) {
- system("mkdir -p $outdir");
- }
- $self->{this_sizedir}++;
- my $outfile = $outdir . "/" . $cluster_id . ".fa";
-
- return $outfile;
+sub write_output {
+ my $self = shift @_;
+
+ my $work_dir = $self->param('work_dir');
+ my $tag = $self->param('tag');
+ my $clustersfile = $self->param('clustersfile');
+
+ my @output_ids;
+ push @output_ids, { 'clustersfile' => $clustersfile, 'work_dir' => $work_dir, 'tag' => $tag };
+ $self->param('output_ids', \@output_ids);
+ my $output_ids = $self->param('output_ids');
+
+ my $job_ids = $self->dataflow_output_id($output_ids, 2);
+ print join("\n",@$job_ids), "\n" if ($self->debug);
+
+ $self->warning(scalar(@$output_ids).' job(s) created'); # warning messages get recorded into 'job_message' table
+
+ ## then flow into the branch#1 funnel; input_id would flow into branch#1 by default anyway, but we request it here explicitly:
+ # $self->dataflow_output_id($self->input_id, 1);
}
+
+
1;
Please sign in to comment.
Something went wrong with that request. Please try again.