Skip to content

Commit

Permalink
Merge pull request automaticanalysis#151 from tiborauer/master
Browse files Browse the repository at this point in the history
Tibor
  • Loading branch information
jooh committed Jul 30, 2018
2 parents 2ed6c78 + 2434f09 commit 76cfca4
Show file tree
Hide file tree
Showing 46 changed files with 459 additions and 197 deletions.
9 changes: 6 additions & 3 deletions aa_engine/aa_doprocessing_onetask.m
Expand Up @@ -148,13 +148,16 @@
end
end
for d = 1:numel(deps)
if ~strcmp(deps{d}{1},inp.sourcedomain), continue; end % only relevant modules
if ~strcmp(deps{d}{1},inp.sourcedomain) ||... % only relevant modules
~exist(aas_doneflag_getpath_bydomain(sourceaap,inp.sourcedomain,deps{d}{2},sourcenumber),'file') % only finished modules (patch for remote only)
continue;
end
fid = fopen(aas_doneflag_getpath_bydomain(sourceaap,inp.sourcedomain,deps{d}{2},sourcenumber),'r');
lines = textscan(fid,'%s\n');
fclose(fid);
skipped(d) = strcmp(lines{1}{1},'skipped');
end
if all(skipped)
if all(skipped) && inp.isessential
aas_log(aap,false,sprintf('WARNING: No inputs selected for stream %s. --> MODULE %s will be SKIPPED',inp.name, stagename));
close_task(aap,tempdirtodelete);
aas_writedoneflag(aap,doneflag,'skipped');
Expand All @@ -163,7 +166,7 @@

[gotinputs, streamfiles{inpind}]=aas_retrieve_inputs_part1(aap,inp,allinputs,deps);
if isempty(setdiff(gotinputs,allinputs)) && inp.isessential % no new inputs found
aas_log(aap,true,sprintf('No inputs obtained for stream %s!\n\tModule %s might not have crteated it.',inp.name,inp.sourcestagename));
aas_log(aap,true,sprintf('No inputs obtained for stream %s!\n\tModule %s might not have created it.',inp.name,inp.sourcestagename));
end;
allinputs=[allinputs;gotinputs];
end;
Expand Down
10 changes: 6 additions & 4 deletions aa_engine/aaq/aaq_qsub.m
Expand Up @@ -4,6 +4,8 @@
QV = []
end
properties (Hidden)
poolConf = cell(1,3)

jobnotrun = []
jobinfo = struct(...
'InputArguments',{},...
Expand Down Expand Up @@ -39,8 +41,8 @@
try
if ~isempty(aap.directory_conventions.poolprofile)
% Parse configuration
[poolprofile, obj.initialSubmitArguments] = strtok(aap.directory_conventions.poolprofile,':');
if ~isempty(obj.initialSubmitArguments), obj.initialSubmitArguments(1) = []; end
conf = textscan(aap.directory_conventions.poolprofile,'%s','delimiter',':'); for c = 1:numel(conf{1}), obj.poolConf(c) = conf{1}(c); end
[poolprofile, obj.initialSubmitArguments] = obj.poolConf{1:2};

profiles = parallel.clusterProfiles;
if ~any(strcmp(profiles,poolprofile))
Expand Down Expand Up @@ -79,7 +81,7 @@
aas_log(obj.aap,false,' "AdditionalSubmitArgs" must be listed within AdditionalProperties in the cluster profile in order to customise resource requirement and consequential queue selection.');
aas_log(obj.aap,false,' Your jobs will be submitted to th default queue.');
else
obj.pool.AdditionalProperties.AdditionalSubmitArgs = sprintf('%s -l h_cpu=%d:00:00 -l h_rss=%dG',obj.initialSubmitArguments,aaparallel.walltime,aaparallel.memory);
obj.pool.AdditionalProperties.AdditionalSubmitArgs = sprintf('%s -l s_cpu=%d:00:00 -l s_rss=%dG',obj.initialSubmitArguments,aaparallel.walltime,aaparallel.memory);
end
else
obj.pool.IndependentSubmitFcn = obj.SetArg(obj.pool.IndependentSubmitFcn,'walltime',aaparallel.walltime);
Expand Down Expand Up @@ -406,7 +408,7 @@ function close(obj)
aas_log(obj.aap,false,' "AdditionalSubmitArgs" must be listed within AdditionalProperties in the cluster profile in order to customise resource requirement and consequential queue selection.');
aas_log(obj.aap,false,' Your jobs will be submitted to th default queue.');
else
obj.pool.AdditionalProperties.AdditionalSubmitArgs = sprintf('%s -l h_cpu=%d:00:00 -l h_rss=%dG',obj.initialSubmitArguments,walltime,memory);
obj.pool.AdditionalProperties.AdditionalSubmitArgs = sprintf('%s -l s_cpu=%d:00:00 -l s_rss=%dG',obj.initialSubmitArguments,walltime,memory);
end
else
obj.pool.IndependentSubmitFcn = obj.SetArg(obj.pool.IndependentSubmitFcn,'walltime',walltime);
Expand Down
2 changes: 1 addition & 1 deletion aa_engine/aaq/nonDCS/JobClass.m
Expand Up @@ -11,12 +11,12 @@
end

properties (Hidden)
Pool
Folder
latestTaskID = 0
end

properties (Hidden, Access = protected)
Pool
schedulerID = NaN
end

Expand Down
10 changes: 6 additions & 4 deletions aa_engine/aaq/nonDCS/PoolClass.m
Expand Up @@ -6,6 +6,7 @@

reqMemory = 1
reqWalltime = 1
initialConfiguration = ''
end

properties (Hidden)
Expand All @@ -24,7 +25,7 @@
end

methods
function obj = PoolClass(pool,initialSubmitArguments)
function obj = PoolClass(pool,initialSubmitArguments,initialConfiguration)
% Required argument:
% pool - Initializiation object (created by PCT function parcluster) or structure
%
Expand All @@ -46,6 +47,7 @@
% Optionmal argument:
% initialSubmitArguments - Submission string specifying resources other than memory and waltime. Can be empty.
if nargin >= 2, obj.initialSubmitArguments = initialSubmitArguments; end
if nargin >= 3, obj.initialConfiguration = initialConfiguration; end

obj.Type = pool.Type;
obj.JobStorageLocation = pool.JobStorageLocation;
Expand Down Expand Up @@ -76,8 +78,8 @@
if ~isprop(pool.AdditionalProperties,'AdditionalSubmitArgs') && ~isfield(pool.AdditionalProperties,'AdditionalSubmitArgs')
warning(sprintf('WARNING: Propertiy "AdditionalSubmitArgs" not found.\n "AdditionalSubmitArgs" must be listed within AdditionalProperties in the cluster profile in order to customise resource requirement and consequential queue selection.\n Your jobs will be submitted to th default queue.'));
else
datWT = sscanf(regexp(pool.AdditionalProperties.AdditionalSubmitArgs,'h_cpu=[0-9]*','once','match'),'walltime=%d');
datMem = sscanf(regexp(pool.AdditionalProperties.AdditionalSubmitArgs,'h_rss=[0-9]*','once','match'),'mem=%d');
datWT = sscanf(regexp(pool.AdditionalProperties.AdditionalSubmitArgs,'s_cpu=[0-9]*','once','match'),'walltime=%d');
datMem = sscanf(regexp(pool.AdditionalProperties.AdditionalSubmitArgs,'s_rss=[0-9]*','once','match'),'mem=%d');
end
else
datWT = pool.IndependentSubmitFcn{find(strcmp(pool.IndependentSubmitFcn,'walltime'))+1};
Expand Down Expand Up @@ -142,7 +144,7 @@ function updateSubmitArguments(obj)
case 'LSF'
obj.SubmitArguments = sprintf('%s -c %d -M %d -R "rusage[mem=%d:duration=%dh]"',obj.initialSubmitArguments,walltime*60,memory*1000,memory*1000,walltime);
case 'Generic'
obj.SubmitArguments = sprintf('%s -l h_cpu=%d:00:00 -l h_rss=%dG',obj.initialSubmitArguments,walltime,memory);
obj.SubmitArguments = sprintf('%s -l s_cpu=%d:00:00 -l s_rss=%dG',obj.initialSubmitArguments,walltime,memory);
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion aa_engine/aaq/nonDCS/TaskClass.m
Expand Up @@ -70,7 +70,8 @@

% create script
fid = fopen(obj.ShellFile,'w');
fprintf(fid,'export MALLOC_ARENA_MAX=4; matlab -nosplash -nodesktop -logfile %s -r "fid = fopen(''%s'',''w''); fprintf(fid,''%%s\\n'',java.lang.management.ManagementFactory.getRuntimeMXBean.getName.char); fclose(fid); try; %s catch E; save(''%s'',''E''); end; fid = fopen(''%s'',''a''); fprintf(fid,''%%s'',char(datetime(''now'',''Timezone'',''local''))); fclose(fid); quit"',...
if ~isempty(obj.Parent.Pool.initialConfiguration), fprintf(fid,'%s',obj.Parent.Pool.initialConfiguration); end
fprintf(fid,'matlab -nosplash -nodesktop -logfile %s -r "fid = fopen(''%s'',''w''); fprintf(fid,''%%s\\n'',java.lang.management.ManagementFactory.getRuntimeMXBean.getName.char); fclose(fid); try; %s catch E; save(''%s'',''E''); end; fid = fopen(''%s'',''a''); fprintf(fid,''%%s'',char(datetime(''now'',''Timezone'',''local''))); fclose(fid); quit"',...
obj.DiaryFile,obj.ProcessFile,Command,obj.ErrorFile,obj.ProcessFile);
fclose(fid);
end
Expand Down
4 changes: 2 additions & 2 deletions aa_engine/aaq/nonDCS/aaq_qsub_nonDCS.m
Expand Up @@ -14,8 +14,8 @@
pool = obj.pool;
obj.pool = [];
else
poolprofile = obj.poolConf{1};
% assume aap.directory_conventions.poolprofile is a path to settings
poolprofile = strtok(aap.directory_conventions.poolprofile,':');
if ~exist(poolprofile,'file')
aas_log(obj.aap,false,'aap.directory_conventions.poolprofile is either not a file or cannot be found');
return
Expand All @@ -28,7 +28,7 @@
pool.AdditionalProperties.AdditionalSubmitArgs = '';
end
end
obj.pool = PoolClass(pool,obj.initialSubmitArguments);
obj.pool = PoolClass(pool,obj.initialSubmitArguments,obj.poolConf{3});
obj.pool.reqMemory = aaparallel.memory;
obj.pool.reqWalltime = aaparallel.walltime;
end
Expand Down
1 change: 1 addition & 0 deletions aa_engine/aarecipe.m
Expand Up @@ -96,6 +96,7 @@
end

% SPM
if isdeployed, aap.directory_conventions.spmdir = spm('Dir'); end
if ~isempty(aap.directory_conventions.spmdir)
addpath(aap.directory_conventions.spmdir);
spm_jobman('initcfg');
Expand Down
6 changes: 4 additions & 2 deletions aa_engine/aas_connectAApipelines.m
Expand Up @@ -204,8 +204,9 @@
isfield(aap.tasksettings.(mod.name)(mod.index).inputstreams, 'stream')

% Names of input and output streams for this module
inputStreams = aap.tasksettings.(mod.name)(mod.index).inputstreams.stream;
[inputStreams, inputStreamAttr] = aas_getstreams(aap,mod.name,mod.index,'input');
if ~iscell(inputStreams), inputStreams = {inputStreams}; end
if ~iscell(inputStreamAttr), inputStreamAttr = {inputStreamAttr}; end

remoteStreams = struct('stream', {}, 'stagetag', {}, 'sourcedomain', {}, 'sourcemodality', {}, 'host', {}, 'aapfilename', {}, 'allowcache', {});

Expand Down Expand Up @@ -266,7 +267,8 @@
rIQual = find(strcmp(inputStageTag, {remoteOutputs(rI).stagetag}));

if isempty(rIQual)
aas_log(aap, 1, sprintf('Can''t find %s.%s in remotely specified AAPs!', inputStageTag, inputStreamName));
if isfield(inputStreamAttr{iI},'isessential') && ~inputStreamAttr{iI}.isessential, continue;
else aas_log(aap, 1, sprintf('Can''t find %s.%s in remotely specified AAPs!', inputStageTag, inputStreamName)); end
end
rI = rI(rIQual(end));
end
Expand Down
2 changes: 1 addition & 1 deletion aa_engine/aas_log.m
Expand Up @@ -60,7 +60,7 @@ function aas_log(aap,iserr,msg,style)
cd(aap.internal.pwd)
end

if aap.options.verbose ~= -1, error(sprintf(['aa error:\n' msg '\n'])); end % undocumented, for devel only
if aap.options.verbose ~= -1, error('aa:internal','aa error:\n%s\n',msg); end % undocumented, for devel only
else % warnings
if aap.options.verbose == 2, logitem([msg '\n'],style); end
end
Expand Down
14 changes: 13 additions & 1 deletion aa_engine/aas_renamestream.m
Expand Up @@ -27,6 +27,7 @@
if any(odot), origstreamname = origstream(find(odot)+1:end);
else origstreamname = origstream; end

[newstream, newattr] = strtok(newstream,':'); if ~isempty(newattr), newattr(1) = []; end
ndot = newstream == '.';
if any(ndot), newstreamname = newstream(find(ndot)+1:end);
else newstreamname = newstream; end
Expand All @@ -52,7 +53,18 @@
if strcmp(inputstreamname,origstreamname)
ind = i;
elseif strcmp(origstreamname,'append')
ind = i + 1;
ind = i + 1;
stream.CONTENT = newstream;
% defaults
stream.ATTRIBUTE.isrenameable = 1;
stream.ATTRIBUTE.isessential = 1;
if ~isempty(newattr)
listAttr = textscan(newattr,'%s','delimiter',':'); listAttr = listAttr{1};
for a = listAttr'
[key, val] = strtok(listAttr{1},'-'); val = str2double(val);
stream.ATTRIBUTE.(key) = val;
end
end
aap.schema.tasksettings.(stagename)(stageindex).([type 'streams']).stream{ind} = stream;
if isfield(aap,'internal'), aap.internal.aap_initial.schema.tasksettings.(stagename)(stageindex).([type 'streams']).stream{ind} = stream; end
else
Expand Down
11 changes: 6 additions & 5 deletions aa_engine/aas_retrieve_inputs_part1.m
@@ -1,7 +1,7 @@
% Retrieves inputs from a particular stream to a particular destination
%

function [gotinputs streamfiles]=aas_retrieve_inputs_part1(aap,inputstream,gotinputs,deps)
function [gotinputs, streamfiles]=aas_retrieve_inputs_part1(aap,inputstream,gotinputs,deps)
global aaworker

streamfiles=[];
Expand All @@ -21,7 +21,7 @@
fromstreamname=streamname(pos(end)+1:end);
else
fromstreamname=streamname;
end;
end

%% REMOTE STREAM - this is stored on another machine and retrieved with rsync+ssh
% Is it a remote stream?
Expand Down Expand Up @@ -72,6 +72,7 @@
if any(remoteindices == 0)
badIndex = find(remoteindices==0,1,'first');
domainItems = aas_getNames_bydomain(aap, localTree{badIndex+1});
if isempty(domainItems{1}), domainItems{1}{indices(badIndex)} = localTree{badIndex+1}; end
aas_log(aap, false, sprintf('WARNING: Remote AAP doesn''t have %s ''%s''!', localTree{badIndex+1}, domainItems{1}{indices(badIndex)}));
end

Expand Down Expand Up @@ -236,10 +237,10 @@

gotinputs=[gotinputs;fns_dest_full];
else % cleanup
if isCreated, rmdir(dest); end
end;
if isCreated, rmdir(dest,'s'); end
end

end;
end

else
%% LOCALLY - local disc or S3
Expand Down
111 changes: 111 additions & 0 deletions aa_modules/aamod_CoSMoMVPA.m
@@ -0,0 +1,111 @@
function [aap,resp]=aamod_CoSMoMVPA(aap,task,subj)
resp='';

switch task
case 'report'
% localpath = aas_getpath_bydomain(aap,aap.tasklist.currenttask.domain,[subj,sess]);
%
% fdiag = dir(fullfile(localpath,'diagnostic_*.jpg'));
% if isempty(fdiag)
% streams=aas_getstreams(aap,'output');
% for streamind=1:length(streams)
% % obtain output
% outputfnames = aas_getfiles_bystream(aap,aap.tasklist.currenttask.domain,[subj sess],streams{streamind},'output');
%
% % perform diagnostics
% do_diag(outputfnames);
% end
% fdiag = dir(fullfile(localpath,'diagnostic_*.jpg'));
% end
%
% for d = 1:numel(fdiag)
% aap = aas_report_add(aap,subj,'<table><tr><td>');
% imgpath = fullfile(localpath,fdiag(d).name);
% aap=aas_report_addimage(aap,subj,imgpath);
% aap = aas_report_add(aap,subj,'</td></tr></table>');
% end
case 'doit'
%% Prepare data
RSAROOT = fullfile(aas_getsubjpath(aap,subj),'RSA');
aas_makedir(aap,RSAROOT);

inps = aas_getstreams(aap,'input');
inps = inps(logical(cellfun(@(x) exist(aas_getinputstreamfilename(aap,'subject',subj,x),'file'), inps)));
struct_fn = aas_getfiles_bystream(aap,'subject',subj,inps{1});

fnMask = cellfun(@(x) aas_getfiles_bystream(aap,'subject',subj,x), inps(cell_index(inps,'firstlevel_brainmask')),'UniformOutput',false);
fnSPM = cellfun(@(x) aas_getfiles_bystream(aap,'subject',subj,x), inps(cellfun(@(x) ~isempty(regexp(x,'firstlevel_spm$', 'once')), inps)),'UniformOutput',false);
fnTmaps = cellfun(@(x) aas_getfiles_bystream(aap,'subject',subj,x), inps(cell_index(inps,'firstlevel_spmts')),'UniformOutput',false);

if numel(fnSPM) > 1
brain_mask = spm_imcalc(spm_vol(char(fnMask)),fullfile(RSAROOT,'brain_mask.nii'),'min(X)',{1});
else
brain_mask.fname = char(fnMask);
end

ITEMS = aas_getsetting(aap,'itemList');
fnT = {};
for run = 1:numel(fnSPM)
load(fnSPM{run});
conNames = {SPM.xCon.name}';
for it = 1:numel(ITEMS)
ITEMS{it} = cellstr(ITEMS{it});
fnT{end+1} = fnTmaps{run}(sum(cellfun(@(x) cell_index(conNames,x), ITEMS{it})),:);
end
end
spm_file_merge(fnT,fullfile(RSAROOT,'glm_T_stats_perrun.nii'));

%% Initialise Cosmo
oldPath = path;
cosmo_set_path
cosmo_check_external('-tic');

% Data
ds=cosmo_fmri_dataset(fullfile(RSAROOT,'glm_T_stats_perrun.nii'),'mask',brain_mask.fname,...
'targets',repmat(1:numel(ITEMS),1,numel(fnSPM))');
ds=cosmo_fx(ds, @(x)mean(x,1), 'targets', 1);
ds.sa.labels=cellfun(@(x) x{1}, ITEMS, 'UniformOutput', false)';
ds.sa.set=(1:numel(ITEMS))';
cosmo_check_dataset(ds);

% Searchlight
nbrhood=cosmo_spherical_neighborhood(ds,'count',aas_getsetting(aap,'searchlightVox'));

% Model
target_dsm=importdata(aas_getsetting(aap,'bsMatrix'));
measure=@cosmo_target_dsm_corr_measure;
measure_args=struct();
measure_args.target_dsm=target_dsm;

%% Info
aas_log(aap,false,'INFO:Dataset input:'); cosmo_disp(ds);
aas_log(aap,false,'INFO:Searchlight neighborhood definition:'); cosmo_disp(nbrhood);
aas_log(aap,false,'INFO:Target DSM:'); disp(target_dsm);

% imagesc(target_dsm)
% set(gca,'XTick',1:size(ds.samples,1),'XTickLabel',ds.sa.labels,...
% 'YTick',1:size(ds.samples,1),'YTickLabel',ds.sa.labels)

%% Run
ds_rsm_behav=cosmo_searchlight(ds,nbrhood,measure,measure_args);

% cosmo_plot_slices(ds_rsm_behav);

% store results
rsa_fn=fullfile(RSAROOT,'RSAmap.nii');
cosmo_map2fmri(ds_rsm_behav,rsa_fn);

%% Cleanup
path(oldPath);

aap=aas_desc_outputs(aap,'subject',subj,'RSAmap',rsa_fn);

case 'checkrequirements'

otherwise
if isempty(which('cosmo_set_path')), aas_log(aap,true,sprintf('CoSMoMVPA cannot be found!\n Make sure you add <CoSMoMVPA directory>/mvpa to aap.directory_conventions.spmtoolsdir.')); end
reqInps = {'firstlevel_brainmask' 'firstlevel_spm' 'firstlevel_spmts'};
inps = aas_getstreams(aap,'input');
missingInps = reqInps(cellfun(@(x) ~any(cell_index(inps,x)), reqInps));
if ~isempty(missingInps), aas_log(aap,true,['Inputs not specified:' sprintf(' %s',missingInps{:})]); end
end

0 comments on commit 76cfca4

Please sign in to comment.