Skip to content
1 change: 1 addition & 0 deletions diag_manager/diag_data.F90
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ MODULE diag_data_mod
!! routine is called with the optional time_init parameter.
LOGICAL :: use_mpp_io = .false. !< false is fms2_io (default); true is mpp_io
LOGICAL :: use_refactored_send = .false. !< Namelist flag to use refactored send_data math functions.
LOGICAL :: auto_merge_nc = .false. !< Namelist flag to automatically merge netCDF files.

! <!-- netCDF variable -->

Expand Down
174 changes: 170 additions & 4 deletions diag_manager/diag_manager.F90
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ MODULE diag_manager_mod
& get_ticks_per_second
USE mpp_mod, ONLY: mpp_get_current_pelist, mpp_pe, mpp_npes, mpp_root_pe, mpp_sum

USE mpp_mod, ONLY: input_nml_file
USE mpp_mod, ONLY: input_nml_file, mpp_sync

USE fms_mod, ONLY: error_mesg, FATAL, WARNING, NOTE, stdout, stdlog, write_version_number,&
& fms_error_handler, check_nml_error, lowercase
Expand All @@ -230,7 +230,7 @@ MODULE diag_manager_mod
& use_cmor, issue_oor_warnings, oor_warnings_fatal, oor_warning, pack_size,&
& max_out_per_in_field, flush_nc_files, region_out_use_alt_value, max_field_attributes, output_field_type,&
& max_file_attributes, max_axis_attributes, prepend_date, DIAG_FIELD_NOT_FOUND, diag_init_time, diag_data_init,&
& use_mpp_io, use_refactored_send
& use_mpp_io, use_refactored_send, auto_merge_nc
USE diag_data_mod, ONLY: fileobj, fileobjU, fnum_for_domain, fileobjND
USE diag_table_mod, ONLY: parse_diag_table
USE diag_output_mod, ONLY: get_diag_global_att, set_diag_global_att
Expand All @@ -239,6 +239,7 @@ MODULE diag_manager_mod
USE fms_diag_outfield_mod, ONLY: fmsDiagOutfieldIndex_type, fmsDiagOutfield_type
USE fms_diag_fieldbuff_update_mod, ONLY: fieldbuff_update, fieldbuff_copy_missvals, &
& fieldbuff_copy_fieldvals
use netcdf_io_mod, ONLY: filepath_list_type, partitioned_global_files, partitioned_section_files, append_to_filepath_list

#ifdef use_netCDF
USE netcdf, ONLY: NF90_INT, NF90_FLOAT, NF90_CHAR
Expand All @@ -250,6 +251,10 @@ MODULE diag_manager_mod
use diag_axis_mod, only: DIAG_AXIS_UGDOMAIN
!----------

use iso_fortran_env, only: compiler_version
use iso_c_binding, only : c_int, c_char, c_ptr, c_null_ptr, c_null_char, c_new_line
use, intrinsic :: iso_c_binding, only: c_int, c_char

IMPLICIT NONE

PRIVATE
Expand All @@ -258,7 +263,8 @@ MODULE diag_manager_mod
& need_data, DIAG_ALL, DIAG_OCEAN, DIAG_OTHER, get_date_dif, DIAG_SECONDS,&
& DIAG_MINUTES, DIAG_HOURS, DIAG_DAYS, DIAG_MONTHS, DIAG_YEARS, get_diag_global_att,&
& set_diag_global_att, diag_field_add_attribute, diag_field_add_cell_measures,&
& get_diag_field_id, diag_axis_add_attribute, CMOR_MISSING_VALUE, null_axis_id
& get_diag_field_id, diag_axis_add_attribute, CMOR_MISSING_VALUE, null_axis_id,&
& exec_mppnccombine
PUBLIC :: CENTER, NORTH, EAST !< Used for diag_axis_init
! Public interfaces from diag_grid_mod
PUBLIC :: diag_grid_init, diag_grid_end
Expand Down Expand Up @@ -369,6 +375,28 @@ MODULE diag_manager_mod
MODULE PROCEDURE diag_field_add_attribute_i1d
END INTERFACE diag_field_add_attribute

! ----- interface to the C function -----
interface
  ! All three routines are implemented in C. Their string argument is declared as an
  ! assumed-size array of c_char (the interoperable form of a C "char *"); a scalar
  ! character actual argument is passed by sequence association, so callers keep
  ! passing ordinary NUL-terminated character strings.
  ! NOTE(review): the C sources are not visible from here, so no intent is asserted
  ! on the string arguments -- add intent(in) once the C side is confirmed read-only.

  !> Run mppnccombine for the partitioned files that share the base name outfile.
  !! Callers in this module treat any non-zero return value as a failure.
  function exec_mppnccombine(outfile) bind(C)
    use, intrinsic :: iso_c_binding, only: c_int, c_char
    implicit none
    character(kind=c_char), dimension(*) :: outfile !< NUL-terminated base name of the file
    integer(c_int) :: exec_mppnccombine
  end function exec_mppnccombine

  !> Number of partitioned files that belong to the base name outfile.
  !! Callers in this module treat a return value of -1 as a failure.
  function num_partitioned_files(outfile) bind(C)
    use, intrinsic :: iso_c_binding, only: c_int, c_char
    implicit none
    character(kind=c_char), dimension(*) :: outfile !< NUL-terminated base name of the file
    integer(c_int) :: num_partitioned_files
  end function num_partitioned_files

  !> Smallest numeric IO PE suffix among the partitioned files matching pattern.
  !! Callers in this module treat a return value of -1 as a failure.
  function smallest_pix_suffix(pattern) bind(C)
    use, intrinsic :: iso_c_binding, only: c_int, c_char
    implicit none
    character(kind=c_char), dimension(*) :: pattern !< NUL-terminated base name of the file
    integer(c_int) :: smallest_pix_suffix
  end function smallest_pix_suffix
end interface

!> @addtogroup diag_manager_mod
!> @{
CONTAINS
Expand Down Expand Up @@ -3687,12 +3715,150 @@ SUBROUTINE diag_manager_end(time)
DO file = 1, num_files
CALL closing_file(file, time)
END DO

! barrier to make sure all io PEs are done closing all files to be combined.
call mpp_sync()

! combine partitioned netcdf files into single file
if ( auto_merge_nc ) call combine_files()

if (allocated(fileobjU)) deallocate(fileobjU)
if (allocated(fileobj)) deallocate(fileobj)
if (allocated(fileobjND)) deallocate(fileobjND)
if (allocated(fnum_for_domain)) deallocate(fnum_for_domain)
END SUBROUTINE diag_manager_end

!> @brief Combine the partitioned netCDF diagnostic files into single files via mppnccombine.
!!
!! The routine walks the two file lists exported by netcdf_io_mod and builds the list of
!! output files this PE is responsible for:
!!  - partitioned_global_files: distributed round-robin, file number f (0-based) is combined
!!    by the IO PE whose 0-based order among the writers equals mod(f, niopes);
!!  - partitioned_section_files: each file is combined by the IO PE that carries the smallest
!!    IO PE suffix among the writers of that file.
!! After an mpp_sync barrier, exec_mppnccombine is called for every selected file. Any failure
!! reported by the C helpers, or a file name that does not look like "<name>.NNNN", is FATAL.
subroutine combine_files()
  integer, parameter :: suffix_len = 4 ! Number of digits in the IO PE suffix (e.g. 0000, 0001)
  integer :: pix        ! The IO PE index within the set of IO PEs (read from the file suffix).
  integer :: niopes     ! Number of IO PEs participating in writing of global files
  integer :: f          ! File index for the global diagnostic files
  integer :: pix_order  ! 0-based order of the IO PE in the list of all IO PEs writing the file.
  integer(c_int) :: smallest_pix ! Smallest IO PE index of the IO PEs writing the current section file.
  integer(c_int) :: ireturn      ! Return code from mppnccombine
  type(filepath_list_type), pointer :: current
  type(filepath_list_type), pointer :: files_to_combine ! list of files to be combined by this PE
  character(len=:), allocatable :: filepath
  character(kind=c_char, len=256) :: outfile

  niopes = 0
  pix_order = -1
  f = 0
  files_to_combine => null() ! initialize the list of files to be combined by this PE

  ! Part 1 : Global diag files
  ! Loop through the global diagnostic files and hand them out in a round-robin fashion,
  ! based on the order of this IO PE (pix_order) and the file index (f).
  current => partitioned_global_files
  do while (associated(current))
    filepath = trim(adjustl(current%path))
    call strip_pe_suffix(filepath, outfile)

    ! The number of writers, pix and pix_order are the same for all global files,
    ! so they are determined from the first global file only.
    if (niopes == 0) then
      niopes = int(num_partitioned_files(outfile))
      ! A count of zero would make mod(f, niopes) below undefined, so reject it together
      ! with the -1 failure code.
      if (niopes <= 0) then
        call error_mesg('diag_manager_mod::combine_files', 'num_partitioned_files failed', FATAL)
      end if
      pix = read_pe_suffix(filepath)
      pix_order = get_pix_order(filepath, niopes, pix)
    end if

    if (mod(f, niopes) == pix_order) then
      call append_to_filepath_list(outfile, files_to_combine)
    end if

    current => current%next
    f = f + 1
  end do

  ! Part 2 : Section diag files
  current => partitioned_section_files
  do while (associated(current))
    filepath = trim(adjustl(current%path))
    call strip_pe_suffix(filepath, outfile)

    ! IO PE index (pix) of this file, taken from the file suffix (e.g., 0000, 0001, etc.)
    pix = read_pe_suffix(filepath)

    ! smallest IO PE index of the set of IO PEs writing the current section file
    smallest_pix = smallest_pix_suffix(outfile)
    if (smallest_pix == -1) then
      call error_mesg('diag_manager_mod::combine_files', 'smallest_pix_suffix failed', FATAL)
    end if

    if (pix == smallest_pix) then
      call append_to_filepath_list(outfile, files_to_combine)
    end if

    current => current%next
  end do

  ! sync all PEs before combining files
  call mpp_sync()

  ! Part 3 : Combine all files in the list
  ! NOTE(review): the nodes of files_to_combine are not released here; how they are allocated
  ! is not visible from this routine -- confirm whether netcdf_io_mod offers a cleanup.
  current => files_to_combine
  do while (associated(current))
    ireturn = exec_mppnccombine(current%path)
    if (ireturn /= 0) then
      call error_mesg('diag_manager_mod::combine_files', &
                    & 'mppnccombine failed for file ' // trim(current%path), FATAL)
    end if
    current => current%next
  end do

contains

  !> Store path without its trailing ".NNNN" IO PE suffix, NUL-terminated, in base.
  subroutine strip_pe_suffix(path, base)
    character(len=*), intent(in) :: path                ! trimmed name of one partitioned file
    character(kind=c_char, len=*), intent(out) :: base  ! NUL-terminated name without the suffix
    integer :: base_len ! length of path without the separator and the suffix

    base_len = len(path) - (suffix_len + 1)
    if (base_len < 1) then
      call error_mesg('diag_manager_mod::combine_files', &
                    & 'unexpected partitioned file name: ' // path, FATAL)
    end if
    ! The NUL terminator must fit as well, otherwise the C routines would read past the name.
    if (base_len + 1 > len(base)) then
      call error_mesg('diag_manager_mod::combine_files', &
                    & 'partitioned file name is too long: ' // path, FATAL)
    end if
    base = path(1:base_len) // c_null_char
  end subroutine strip_pe_suffix

  !> Return the IO PE index encoded in the last suffix_len characters of path.
  function read_pe_suffix(path) result(io_pe)
    character(len=*), intent(in) :: path ! trimmed name of one partitioned file
    integer :: io_pe
    integer :: ios ! status of the internal read

    io_pe = -1
    ios = -1
    if (len(path) >= suffix_len) then
      read(path(len(path)-suffix_len+1:len(path)), *, iostat=ios) io_pe
    end if
    if (ios /= 0) then
      call error_mesg('diag_manager_mod::combine_files', &
                    & 'cannot read the IO PE suffix of file ' // path, FATAL)
    end if
  end function read_pe_suffix

end subroutine combine_files

!> Given a filename and pix (process id for an IO PE), return the order of the pix
!! in the list of all IO PEs writing the file. The order is determined by the
!! order of the file suffixes (e.g., 0000, 0001, etc.) in the file name. For example,
!! In a list of files with suffixes [0001, 0003, 0004], the order of pix 0003
!! is 1, and the order of pix 0004 is 2.
!!
!! The existing suffixes are discovered with inquire, probing the candidates
!! 0 .. mpp_npes()-1 in increasing order. If pix is not among the first num_files
!! existing suffixes, the error is FATAL.
function get_pix_order(filename, num_files, pix) result(pix_order)
  character(len=*), intent(in) :: filename !< Name of one partitioned file, ending in the 4-digit suffix
  integer, intent(in) :: num_files         !< Number of partitioned files (IO PEs) writing the file
  integer, intent(in) :: pix               !< 0-based id of the IO PE
  ! local
  integer :: pix_order       ! 0-based order of the pix in the list of all IO PEs writing the file
  character(len=4) :: suffix ! 0000, 0001, etc.
  integer :: npes            ! total number of all PEs
  integer :: i, f
  logical :: exists

  npes = mpp_npes()
  pix_order = -1

  i = 0
  do f = 0, num_files-1
    ! Increment i until a file with the suffix i is found
    exists = .false.
    do while (i < npes)
      write (suffix, '(i4.4)') i
      inquire(file=filename(1:len(filename)-4)//suffix, exist=exists)
      if (exists) exit
      i = i + 1
    end do

    ! Stop when the candidates are exhausted (i is then not the suffix of an existing file
    ! and must not be compared with pix), or when the next existing suffix is already
    ! larger than pix: suffixes are visited in increasing order, so pix cannot match later.
    if ((.not. exists) .or. (i > pix)) exit

    ! The file with the suffix i exists; if it matches the pix, then pix is
    ! the f.th IO PE in the list of all IO PEs writing the file
    if (pix == i) then
      pix_order = f
      exit
    end if

    i = i + 1
  end do

  if (pix_order == -1) then
    write(*,*) 'Error: pix ', pix, ' not found in file ', filename
    call error_mesg('diag_manager_mod::get_pix_order', 'pix not found in file', FATAL)
  end if

end function get_pix_order

!> @brief Replaces diag_manager_end; close just one file: files(file)
SUBROUTINE closing_file(file, time)
INTEGER, INTENT(in) :: file
Expand Down Expand Up @@ -3789,7 +3955,7 @@ SUBROUTINE diag_manager_init(diag_model_subset, time_init, err_msg)
& max_num_axis_sets, max_files, use_cmor, issue_oor_warnings,&
& oor_warnings_fatal, max_out_per_in_field, flush_nc_files, region_out_use_alt_value, max_field_attributes,&
& max_file_attributes, max_axis_attributes, prepend_date, use_mpp_io, field_log_separator,&
& use_refactored_send
& use_refactored_send, auto_merge_nc

! If the module was already initialized do nothing
IF ( module_is_initialized ) RETURN
Expand Down
2 changes: 1 addition & 1 deletion diag_manager/diag_output.F90
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ MODULE diag_output_mod
& get_domainUG, get_diag_axis_name
USE diag_data_mod, ONLY: pack_size, diag_fieldtype, diag_global_att_type, CMOR_MISSING_VALUE, diag_atttype, files
USE time_manager_mod, ONLY: get_calendar_type, valid_calendar_types
USE fms_mod, ONLY: error_mesg, mpp_pe, write_version_number, fms_error_handler, FATAL, note
USE fms_mod, ONLY: error_mesg, write_version_number, fms_error_handler, FATAL, note

#ifdef use_netCDF
USE netcdf, ONLY: NF90_INT, NF90_FLOAT, NF90_CHAR
Expand Down
Loading