-
Notifications
You must be signed in to change notification settings - Fork 0
/
mpi_implementation.c
120 lines (98 loc) · 4.02 KB
/
mpi_implementation.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>

#include "Helpers/FileHelper.h"
#include "Helpers/FastqHelper.h"
/*
 * Sliding-window parameters forwarded unchanged to Check_For_Minimum_Quality()
 * and Filter_Record(). NOTE(review): presumably the window length and the
 * per-window quality threshold -- confirm against Helpers/FastqHelper.
 */
#define WinLen 20
#define WinThres 19
/* Not referenced in this file. */
#define MaxLength 310

/* Tag used for every master <-> worker message. */
enum { WORK_TAG = 1 };

/*
 * Print "MPI IMPLEMENTATION: <msg><detail>" on stderr and abort every rank.
 * Aborting the whole job (instead of exiting one rank) matters in MPI: a rank
 * that silently dies leaves its peers blocked in MPI_Send/MPI_Recv.
 * `detail` may be NULL.
 */
static void abort_job(const char *msg, const char *detail)
{
    fprintf(stderr, "MPI IMPLEMENTATION: %s%s\n", msg, detail ? detail : "");
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    exit(EXIT_FAILURE); /* not reached if MPI_Abort terminates the job as specified */
}

/*
 * malloc() an array of `count` elements of `elem_size` bytes, aborting the job
 * on overflow or allocation failure. At least one element is allocated so a
 * zero count never yields an ambiguous NULL.
 */
static void *alloc_array_or_abort(size_t count, size_t elem_size)
{
    if (count == 0) {
        count = 1;
    }
    void *p = NULL;
    if (count <= SIZE_MAX / elem_size) {
        p = malloc(count * elem_size);
    }
    if (p == NULL) {
        abort_job("out of memory", NULL);
    }
    return p;
}

/*
 * Inclusive record range [*from, *to] handled by worker `node` (1..workers)
 * when `no_of_records` records are split across `workers` workers: each worker
 * gets no_of_records / workers records and the last one also takes the
 * remainder. Returns the number of records in the range (possibly 0).
 * Used by both the send and the receive loop so they always agree.
 */
static int worker_range(int node, int workers, int no_of_records, int *from, int *to)
{
    int per_node = no_of_records / workers;
    *from = (node - 1) * per_node;
    *to = *from + per_node - 1;
    if (node == workers) {
        *to += no_of_records % workers;
    }
    return *to - *from + 1;
}

/*
 * Master (rank 0): send each worker (ranks 1..size-1) its record range, parse
 * every record locally, then collect each worker's per-record results in rank
 * order. A result of -1 drops the record; any other value is passed to
 * Trim_Record_At() and the record is written to `Fout` as four FASTQ lines.
 * Every parsed record is freed, whether written or dropped.
 */
static void run_master(FILE *Fin, FILE *Fout, const char *in_name,
                       const char *out_name, int size)
{
    MPI_Status status;
    int workers = size - 1;
    printf("MPI IMPLEMENTATION: Started filtering file: %s, with %d slave nodes\n", in_name, workers);

    int no_of_records = Calculate_No_Of_Records(Count_File_Lines(Fin));
    printf("records in this file :%d\n", no_of_records);
    printf("Records Per Node :%d\n", no_of_records / workers);

    /* Array of record handles: size by the element actually stored. */
    size_t record_count = no_of_records > 0 ? (size_t)no_of_records : 0;
    fastq_record *Records = alloc_array_or_abort(record_count, sizeof *Records);

    /* Send each worker its limits, then parse that slice of records here. */
    for (int node = 1; node <= workers; node++) {
        int from, to;
        worker_range(node, workers, no_of_records, &from, &to);
        int data[2] = { from, to };
        MPI_Send(data, 2, MPI_INT, node, WORK_TAG, MPI_COMM_WORLD);
        for (int j = from; j <= to; j++) {
            Records[j] = Parse_Fastq_Record(Create_Next_Fastq_Buffer(Fin));
        }
    }

    /* Collect results from every worker and write the accepted records. */
    int c = 0;
    for (int node = 1; node <= workers; node++) {
        int from, to;
        int range = worker_range(node, workers, no_of_records, &from, &to);
        int *results = alloc_array_or_abort((size_t)range, sizeof *results);
        MPI_Recv(results, range, MPI_INT, node, WORK_TAG, MPI_COMM_WORLD, &status);
        for (int j = 0; j < range; j++) {
            fastq_record record = Records[from + j];
            if (results[j] != -1) { /* -1 marks a low quality record */
                Trim_Record_At(record, results[j]);
                fprintf(Fout, "%s\n%s\n+\n%s\n", record->sequence_name, record->sequence, record->sequence_quality);
                c++;
            }
            /* Free rejected records too; they used to leak. */
            free(record);
        }
        free(results);
    }
    free(Records);
    printf("total records added:%d\n", c);
    printf("MPI IMPLEMENTATION: Ended filtering output file: %s\n", out_name);
}

/*
 * Worker (rank >= 1): receive the inclusive [from, to] record range from the
 * master, read those records through this rank's own handle on the input file,
 * and send back one int per record: -1 if Check_For_Minimum_Quality() rejects
 * it, otherwise the value returned by Filter_Record().
 */
static void run_worker(FILE *Fin)
{
    MPI_Status status;
    int data[2];
    MPI_Recv(data, 2, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
    int from = data[0];
    int to = data[1];
    int range = to - from + 1;

    /* Each record spans four lines; skip all records before `from`.
     * Multiply in long so large offsets cannot overflow int. */
    skip_Lines(Fin, (long)from * 4);

    int *results = alloc_array_or_abort((size_t)range, sizeof *results);
    for (int i = 0; i < range; i++) {
        fastq_record record = Parse_Fastq_Record(Create_Next_Fastq_Buffer(Fin));
        results[i] = -1;
        if (Check_For_Minimum_Quality(record, WinLen, WinThres)) {
            results[i] = Filter_Record(record, WinLen, WinThres);
        }
        free(record);
    }
    MPI_Send(results, range, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
    free(results);
}

/*
 * MPI FASTQ quality filter.
 * Usage: <prog> <input.fastq> <output.fastq>, run with at least 2 MPI
 * processes (rank 0 is the master, every other rank is a worker).
 * Returns EXIT_SUCCESS, or EXIT_FAILURE on bad arguments / too few processes.
 */
int main(int argc, char **argv)
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Validate on every rank before argv[1]/argv[2] are dereferenced, and
     * shut MPI down cleanly so no rank is left waiting on a peer. */
    if (argc != 3) {
        if (rank == 0) {
            printf("the argument count is wrong, format should be <input.fastq> <output.fastq>\n");
        }
        MPI_Finalize();
        return EXIT_FAILURE;
    }
    /* The work split divides by the number of workers (size - 1). */
    if (size < 2) {
        if (rank == 0) {
            fprintf(stderr, "MPI IMPLEMENTATION: needs at least 2 processes (1 master + 1 worker)\n");
        }
        MPI_Finalize();
        return EXIT_FAILURE;
    }

    FILE *Fin = Get_Input_File(argv[1]);
    if (Fin == NULL) {
        abort_job("cannot open input file: ", argv[1]);
    }

    if (rank == 0) {
        /* Only the master writes, so only the master opens the output file.
         * NOTE(review): if Get_Output_File opens in "w" mode, a worker opening
         * it late would truncate output the master had already written. */
        FILE *Fout = Get_Output_File(argv[2]);
        if (Fout == NULL) {
            abort_job("cannot open output file: ", argv[2]);
        }
        run_master(Fin, Fout, argv[1], argv[2], size);
        Close_Files(Fin, Fout);
    } else {
        run_worker(Fin);
        fclose(Fin);
    }

    MPI_Finalize();
    return EXIT_SUCCESS;
}