From 0ad6396c165cf5dfb3d46ea72abcf52e97081a6a Mon Sep 17 00:00:00 2001 From: "guoqing.ge" Date: Sun, 12 Apr 2026 22:40:37 -0600 Subject: [PATCH] patch to add flexible time levels and output done marker --- src/framework/mpas_stream_list_types.inc | 15 + src/framework/mpas_stream_manager.F | 1214 ++++++++++++++++--- src/framework/mpas_stream_manager_types.inc | 4 +- src/framework/xml_stream_parser.c | 103 +- 4 files changed, 1150 insertions(+), 186 deletions(-) diff --git a/src/framework/mpas_stream_list_types.inc b/src/framework/mpas_stream_list_types.inc index 30d88faa44..1b37574585 100644 --- a/src/framework/mpas_stream_list_types.inc +++ b/src/framework/mpas_stream_list_types.inc @@ -2,6 +2,18 @@ MPAS_STREAM_LIST_DUPLICATE = 1, & MPAS_STREAM_LIST_NOT_FOUND = 2 + ! Maximum number of segments in the output_timelevels specification + integer, parameter :: MAX_TIMELEVEL_SEGMENTS = 32 + + ! timelevel specification + type MPAS_timelevel_spec_type + integer :: n_segments = 0 ! Number of parsed segments + real(kind=RKIND) :: start_hour(MAX_TIMELEVEL_SEGMENTS) ! Start hour for each segment + real(kind=RKIND) :: end_hour(MAX_TIMELEVEL_SEGMENTS) ! End hour for each segment + real(kind=RKIND) :: interval_minutes(MAX_TIMELEVEL_SEGMENTS) ! Output interval in minutes + logical :: is_parsed = .false. ! True if successfully parsed + end type MPAS_timelevel_spec_type + type MPAS_stream_list_type ! Used by list head @@ -17,6 +29,8 @@ character(len=StrKIND) :: filename character(len=StrKIND) :: filename_template character(len=StrKIND) :: filename_interval + character(len=StrKIND) :: output_timelevels = '' + type (MPAS_timelevel_spec_type) :: timelevel_spec type (MPAS_Stream_type), pointer :: stream => null() integer :: timeLevel = 0 integer :: nRecords @@ -24,6 +38,7 @@ integer :: clobber_mode integer :: gattr_update = 1 integer :: io_type + integer :: output_done_marker = 0 type (MPAS_TimeInterval_type), pointer :: recordInterval => null() type (MPAS_stream_list_type), pointer :: alarmList_in => null() type (MPAS_stream_list_type), pointer :: alarmList_out => null() diff --git a/src/framework/mpas_stream_manager.F b/src/framework/mpas_stream_manager.F index b194a98a69..2df6ab409b 100644 --- a/src/framework/mpas_stream_manager.F +++ b/src/framework/mpas_stream_manager.F @@ -15,6 +15,7 @@ module mpas_stream_manager use mpas_stream_list use mpas_sort use mpas_threading + use mpas_string_utils, only : mpas_split_string public :: MPAS_stream_mgr_init, & @@ -45,7 +46,9 @@ module mpas_stream_manager MPAS_stream_mgr_stream_exists, & MPAS_stream_mgr_get_stream_interval, & MPAS_get_stream_filename, & - MPAS_build_stream_filename + MPAS_build_stream_filename, & + get_output_interval_from_timelevels, & + get_next_timelevel_start !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ! @@ -163,7 +166,7 @@ end subroutine seed_random if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating stream list') - return + return end if ! @@ -173,7 +176,7 @@ end subroutine seed_random if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating input alarm list') - return + return end if ! @@ -183,7 +186,7 @@ end subroutine seed_random if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating output alarm list') - return + return end if ! @@ -221,7 +224,7 @@ subroutine MPAS_stream_mgr_finalize(manager, ierr)!{{{ threadNum = mpas_threading_get_thread_num() - STREAM_DEBUG_WRITE('-- Called MPAS_stream_mgr_finalize()') + STREAM_DEBUG_WRITE('-- Called MPAS_stream_mgr_finalize()') if (present(ierr)) ierr = MPAS_STREAM_MGR_NOERR @@ -281,16 +284,16 @@ end subroutine MPAS_stream_mgr_finalize!}}} !> \author Michael Duda, Doug Jacobsen !> \date 13 June 2014 !> \details - !> Creates a new stream within the stream manager. The "direction" - !> argument may be either MPAS_STREAM_INPUT, MPAS_STREAM_OUTPUT, - !> MPAS_STREAM_INPUT_OUTPUT, or MPAS_STREAM_NONE. The "filename" argument - !> is the template of the filenames that are associated with the stream. - !> Knowing the interval between files, and - !> the filename template, a "referenceTime" argument must be provided to - !> specify the first timestamp appearing in any of the files associated with - !> the stream, thereby determining where the "file breaks" will occur between - !> timestamps. If no "referenceTime" is specified, the start time of the - !> clock associated with the stream handler will be used as the reference + !> Creates a new stream within the stream manager. The "direction" + !> argument may be either MPAS_STREAM_INPUT, MPAS_STREAM_OUTPUT, + !> MPAS_STREAM_INPUT_OUTPUT, or MPAS_STREAM_NONE. The "filename" argument + !> is the template of the filenames that are associated with the stream. + !> Knowing the interval between files, and + !> the filename template, a "referenceTime" argument must be provided to + !> specify the first timestamp appearing in any of the files associated with + !> the stream, thereby determining where the "file breaks" will occur between + !> timestamps. If no "referenceTime" is specified, the start time of the + !> clock associated with the stream handler will be used as the reference !> time. Additionally, the interval between records in the file may be !> specified using the optional "recordInterval" argument; if this argument !> is not supplied, the stream manager will assume that this interval is @@ -301,8 +304,8 @@ end subroutine MPAS_stream_mgr_finalize!}}} !> MPAS_IO_DOUBLE_PRECISION, or MPAS_IO_NATIVE_PRECISION; if this argument is !> not supplied, native precision is assumed. !> The optional argument clobberMode determines how the stream manager will - !> deal with existing files; possible options include MPAS_STREAM_CLOBBER_NEVER, - !> MPAS_STREAM_CLOBBER_APPEND, MPAS_STREAM_CLOBBER_TRUNCATE, + !> deal with existing files; possible options include MPAS_STREAM_CLOBBER_NEVER, + !> MPAS_STREAM_CLOBBER_APPEND, MPAS_STREAM_CLOBBER_TRUNCATE, !> and MPAS_STREAM_CLOBBER_OVERWRITE. The default behavior is to never modify !> existing files (MPAS_STREAM_CLOBBER_NEVER). !> The optional argument ioType specifies the I/O type to use for the @@ -408,14 +411,14 @@ subroutine MPAS_stream_mgr_create_stream(manager, streamID, direction, filename, if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating input alarm list') deallocate(new_stream) - return + return end if call MPAS_stream_list_create(new_stream % alarmList_out, ierr=err_local) if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating output alarm list') deallocate(new_stream) - return + return end if call mpas_pool_create_pool(new_stream % att_pool) call mpas_pool_clone_pool(manager % defaultAtts, new_stream % att_pool) @@ -434,7 +437,7 @@ subroutine MPAS_stream_mgr_create_stream(manager, streamID, direction, filename, STREAM_ERROR_WRITE('Problems while adding stream to list') return end if - + manager % numStreams = manager % numStreams + 1 end if @@ -645,7 +648,7 @@ subroutine MPAS_stream_mgr_destroy_stream(manager, streamID, ierr)!{{{ deallocate(stream % stream) end if deallocate(stream) - + manager % numStreams = manager % numStreams - 1 end if @@ -659,8 +662,8 @@ end subroutine MPAS_stream_mgr_destroy_stream!}}} !> \author Michael Duda !> \date 22 August 2014 !> \details - !> Returns a pointer to the clock associated with the stream manager, - !> in which any stream alarms should be defined before being added to + !> Returns a pointer to the clock associated with the stream manager, + !> in which any stream alarms should be defined before being added to !> the stream manager via the MPAS_stream_mgr_add_alarm() routine. ! !----------------------------------------------------------------------- @@ -1082,7 +1085,7 @@ end subroutine MPAS_stream_mgr_add_stream_fields!}}} ! !----------------------------------------------------------------------- subroutine MPAS_stream_mgr_remove_field(manager, streamID, fieldName, ierr)!{{{ - + implicit none character (len=*), parameter :: sub = 'MPAS_stream_mgr_remove_field' @@ -1242,7 +1245,7 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr ! if ( threadNum == 0 ) then if (direction == MPAS_STREAM_INPUT .or. direction == MPAS_STREAM_INPUT_OUTPUT) then - + ! If alarm is not already defined, we need to create a new alarm node nullify(new_alarm) if (.not. MPAS_stream_list_query(manager % alarms_in, alarmID, new_alarm, ierr=err_local)) then @@ -1252,10 +1255,10 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating stream list for alarm') - return + return end if nullify(new_alarm % next) - + call MPAS_stream_list_insert(manager % alarms_in, new_alarm, ierr=err_local) if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR @@ -1263,7 +1266,7 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr return end if end if - + ! Add specified stream to alarm node stream list allocate(new_xref) new_xref % name = streamID @@ -1274,7 +1277,7 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr STREAM_ERROR_WRITE('Problems while adding stream to alarm stream list') return end if - + ! Add alarm to stream alarm list allocate(new_xref) new_xref % name = alarmID @@ -1286,9 +1289,9 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr return end if end if - + if (direction == MPAS_STREAM_OUTPUT .or. direction == MPAS_STREAM_INPUT_OUTPUT) then - + ! If alarm is not already defined, we need to create a new alarm node nullify(new_alarm) if (.not. MPAS_stream_list_query(manager % alarms_out, alarmID, new_alarm, ierr=err_local)) then @@ -1298,10 +1301,10 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Problems while creating stream list for alarm') - return + return end if nullify(new_alarm % next) - + call MPAS_stream_list_insert(manager % alarms_out, new_alarm, ierr=err_local) if (err_local /= MPAS_STREAM_LIST_NOERR) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR @@ -1309,7 +1312,7 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr return end if end if - + ! Add specified stream to alarm node stream list allocate(new_xref) new_xref % name = streamID @@ -1320,7 +1323,7 @@ subroutine MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, direction, ierr STREAM_ERROR_WRITE('Problems while adding stream to alarm stream list') return end if - + ! Add alarm to stream alarm list allocate(new_xref) new_xref % name = alarmID @@ -1409,7 +1412,7 @@ subroutine MPAS_stream_mgr_remove_alarm(manager, streamID, alarmID, direction, i STREAM_ERROR_WRITE('Output alarm '//trim(alarmID)//' does not exist on stream '//trim(streamID)) end if return - end if + end if if (.not. associated(streamNode)) then if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR STREAM_ERROR_WRITE('Alarm '//trim(alarmID)//' does not have stream '//trim(streamID)//' on its stream list.') @@ -1443,7 +1446,7 @@ end subroutine MPAS_stream_mgr_remove_alarm!}}} !> Resets all alarms used by the stream manager. If the optional argument !> 'streamID' is provided, only alarms associated with streams that match !> the 'streamID' regular expression will be - !> reset. If the optional 'direction' argument is provided, only alarms + !> reset. If the optional 'direction' argument is provided, only alarms !> associated with that direction will be reset. ! !----------------------------------------------------------------------- @@ -1458,12 +1461,13 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ type (MPAS_stream_list_type), pointer :: stream type (MPAS_stream_list_type), pointer :: alarm_cursor + type (MPAS_stream_list_type), pointer :: stream_cursor integer :: local_direction integer :: local_ierr, threadNum logical :: resetAlarms threadNum = mpas_threading_get_thread_num() - + if (present(streamID)) then STREAM_DEBUG_WRITE('-- Called MPAS_stream_mgr_reset_alarms() for stream ' // trim(streamID)) else @@ -1496,7 +1500,7 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ alarm_cursor => stream % alarmList_in % head do while (associated(alarm_cursor)) if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then - call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) end if alarm_cursor => alarm_cursor % next end do @@ -1506,7 +1510,14 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ alarm_cursor => stream % alarmList_out % head do while (associated(alarm_cursor)) if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then - call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + ! For variable output streams, update_variable_output_alarm handles everything + ! (removes old alarm, adds new one with correct timing) + ! For regular streams, just reset the alarm + if (stream % timelevel_spec % is_parsed) then + call update_variable_output_alarm(manager, stream, alarm_cursor % name, ierr=local_ierr) + else + call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + end if end if alarm_cursor => alarm_cursor % next end do @@ -1524,7 +1535,7 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ alarm_cursor => manager % alarms_in % head do while (associated(alarm_cursor)) if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then - call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) end if alarm_cursor => alarm_cursor % next end do @@ -1535,7 +1546,24 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ alarm_cursor => manager % alarms_out % head do while (associated(alarm_cursor)) if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then - call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + ! Check if ANY associated stream uses variable output + ! If so, update_variable_output_alarm handles removal/recreation + ! Otherwise, use standard reset + resetAlarms = .false. ! Reuse this flag to track if any variable stream handled it + stream_cursor => alarm_cursor % streamList % head + do while (associated(stream_cursor)) + ! stream_cursor % xref points to the actual stream + if (stream_cursor % xref % timelevel_spec % is_parsed) then + call update_variable_output_alarm(manager, stream_cursor % xref, alarm_cursor % name, ierr=local_ierr) + resetAlarms = .true. ! A variable stream handled it + end if + stream_cursor => stream_cursor % next + end do + ! For non-variable streams, reset the alarm normally + ! But ONLY if no variable stream already handled it + if (.not. resetAlarms) then + call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + end if end if alarm_cursor => alarm_cursor % next end do @@ -1553,10 +1581,10 @@ end subroutine MPAS_stream_mgr_reset_alarms!}}} !> \author Michael Duda !> \date 30 September 2014 !> \details - !> Tests whether any I/O alarms in a stream manager are ringing; if the optional - !> 'streamID' argument is given, only alarms for that stream are tested; if - !> the optional argument 'direction' is given, only alarms for the specified - !> direction are tested. If any of the tested alarms is ringing, the function + !> Tests whether any I/O alarms in a stream manager are ringing; if the optional + !> 'streamID' argument is given, only alarms for that stream are tested; if + !> the optional argument 'direction' is given, only alarms for the specified + !> direction are tested. If any of the tested alarms is ringing, the function !> returns .true.; otherwise, it returns .false.. !> Note: This function doesn't support streamID regular expressions ! @@ -1576,7 +1604,7 @@ logical function MPAS_stream_mgr_ringing_alarms(manager, streamID, direction, ie integer :: local_ierr, threadNum threadNum = mpas_threading_get_thread_num() - + STREAM_DEBUG_WRITE('-- Called MPAS_stream_mgr_ringing_alarms()') MPAS_stream_mgr_ringing_alarms = .false. @@ -1784,6 +1812,9 @@ subroutine MPAS_stream_mgr_set_property_int(manager, streamID, propertyName, pro case (MPAS_STREAM_PROPERTY_IOTYPE) stream_cursor % io_type = propertyValue + case (MPAS_STREAM_PROPERTY_DONE_MARKER) + stream_cursor % output_done_marker = propertyValue + case default STREAM_ERROR_WRITE('MPAS_stream_mgr_set_property(): No such property $i' COMMA intArgs=(/propertyName/)) STREAM_ERROR_WRITE(' or specified property is not of type integer.') @@ -1797,7 +1828,7 @@ subroutine MPAS_stream_mgr_set_property_int(manager, streamID, propertyName, pro if ( .not. setProperties ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_set_property().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if @@ -1857,11 +1888,26 @@ subroutine MPAS_stream_mgr_set_property_char(manager, streamID, propertyName, pr case (MPAS_STREAM_PROPERTY_FILENAME_INTV) stream_cursor % filename_interval = propertyValue + case (MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS) + stream_cursor % output_timelevels = propertyValue + ! Parse the timelevels string and store it + if (len_trim(propertyValue) > 0) then + call parse_all_timelevels(propertyValue, stream_cursor % timelevel_spec, & + stream_cursor % name, err_local) + if (err_local /= 0) then + if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR + end if + else + ! Saft guard: reset timelevel_spec if output_timelevels is somehow cleared + stream_cursor % timelevel_spec % n_segments = 0 + stream_cursor % timelevel_spec % is_parsed = .false. + end if + case (MPAS_STREAM_PROPERTY_REF_TIME) call mpas_set_time(stream_cursor % referenceTime, dateTimeString=propertyValue) case (MPAS_STREAM_PROPERTY_RECORD_INTV) - + ! The interval between records may not have been allocated if the optional recordInterval ! argument was not provided when the stream was created if (.not. associated(stream_cursor % recordInterval)) then @@ -1882,7 +1928,7 @@ subroutine MPAS_stream_mgr_set_property_char(manager, streamID, propertyName, pr if ( .not. setProperties ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_set_property().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if @@ -1955,7 +2001,7 @@ subroutine MPAS_stream_mgr_set_property_logical(manager, streamID, propertyName, if ( .not. setProperties ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_set_property().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if @@ -2000,7 +2046,7 @@ subroutine MPAS_stream_mgr_get_property_int(manager, streamID, propertyName, pro if (.not. MPAS_stream_list_query(manager % streams, streamID, stream_cursor, ierr=err_local)) then STREAM_ERROR_WRITE('Stream '//trim(streamID)//' does not exist in call to MPAS_stream_mgr_get_property().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if ! @@ -2070,7 +2116,7 @@ subroutine MPAS_stream_mgr_get_property_char(manager, streamID, propertyName, pr if (.not. MPAS_stream_list_query(manager % streams, streamID, stream_cursor, ierr=err_local)) then STREAM_ERROR_WRITE('Stream '//trim(streamID)//' does not exist in call to MPAS_stream_mgr_get_property().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if ! @@ -2084,6 +2130,9 @@ subroutine MPAS_stream_mgr_get_property_char(manager, streamID, propertyName, pr case (MPAS_STREAM_PROPERTY_FILENAME_INTV) propertyValue = stream_cursor % filename_interval + case (MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS) + propertyValue = stream_cursor % output_timelevels + case (MPAS_STREAM_PROPERTY_REF_TIME) call mpas_get_time(stream_cursor % referenceTime, dateTimeString=propertyValue) @@ -2094,9 +2143,9 @@ subroutine MPAS_stream_mgr_get_property_char(manager, streamID, propertyName, pr ! assume that the interval is the shortest interval between alarms on the stream; since ! recordInterval is only used for reading, use the input alarm list in this check. if (.not. associated(stream_cursor % recordInterval)) then - + ! - ! If no direction is specified, return the read interval, since this was the only historic + ! If no direction is specified, return the read interval, since this was the only historic ! use of the recordInterval for a stream. ! if (present(direction)) then @@ -2177,7 +2226,7 @@ subroutine MPAS_stream_mgr_get_property_logical(manager, streamID, propertyName, if (.not. MPAS_stream_list_query(manager % streams, streamID, stream_cursor, ierr=err_local)) then STREAM_ERROR_WRITE('Stream '//trim(streamID)//' does not exist in call to MPAS_stream_mgr_get_property().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if ! @@ -2255,14 +2304,14 @@ subroutine MPAS_stream_mgr_add_pkg(manager, streamID, packageName, ierr)!{{{ ! ! Add package to the packages pool for the stream ! - call mpas_pool_add_package(stream_cursor % pkg_pool, packageName, package) + call mpas_pool_add_package(stream_cursor % pkg_pool, packageName, package) end do if ( .not. addedPackages ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_add_pkg().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if @@ -2320,7 +2369,7 @@ subroutine MPAS_stream_mgr_remove_pkg(manager, streamID, packageName, ierr)!{{{ if ( .not. removedPackage ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_remove_pkg().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if @@ -2384,7 +2433,7 @@ subroutine MPAS_stream_mgr_add_att_int(manager, attName, attVal, streamID, ierr) if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in stream '//trim(stream_cursor % name)//' is not of type integer.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2395,7 +2444,7 @@ subroutine MPAS_stream_mgr_add_att_int(manager, attName, attVal, streamID, ierr) if ( .not. addedAttribute ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_add_att().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else @@ -2413,7 +2462,7 @@ subroutine MPAS_stream_mgr_add_att_int(manager, attName, attVal, streamID, ierr) if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in streamManager is not of type integer.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2483,7 +2532,7 @@ subroutine MPAS_stream_mgr_add_att_real(manager, attName, attVal, streamID, ierr if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in stream '//trim(stream_cursor % name)//' is not of type real.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2494,7 +2543,7 @@ subroutine MPAS_stream_mgr_add_att_real(manager, attName, attVal, streamID, ierr if ( .not. addedAttribute ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_add_att().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else @@ -2513,7 +2562,7 @@ subroutine MPAS_stream_mgr_add_att_real(manager, attName, attVal, streamID, ierr if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in streamManager is not of type real.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2583,7 +2632,7 @@ subroutine MPAS_stream_mgr_add_att_char(manager, attName, attVal, streamID, ierr if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in stream '//trim(stream_cursor % name)//' is not of type character.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2594,7 +2643,7 @@ subroutine MPAS_stream_mgr_add_att_char(manager, attName, attVal, streamID, ierr if ( .not. addedAttribute ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_add_att().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else @@ -2613,7 +2662,7 @@ subroutine MPAS_stream_mgr_add_att_char(manager, attName, attVal, streamID, ierr if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in streamManager is not of type character.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2683,7 +2732,7 @@ subroutine MPAS_stream_mgr_add_att_logical(manager, attName, attVal, streamID, i if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in stream '//trim(stream_cursor % name)//' is not of type logical.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2694,7 +2743,7 @@ subroutine MPAS_stream_mgr_add_att_logical(manager, attName, attVal, streamID, i if ( .not. addedAttribute ) then STREAM_ERROR_WRITE('No stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_add_att().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else @@ -2713,7 +2762,7 @@ subroutine MPAS_stream_mgr_add_att_logical(manager, attName, attVal, streamID, i if (mpas_pool_config_type(att_pool, attName) /= MPAS_POOL_FATAL) then STREAM_ERROR_WRITE('Attribute '//trim(attName)//' in streamManger is not of type logical.') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if call mpas_pool_add_config(att_pool, attName, attVal) else @@ -2737,11 +2786,11 @@ end subroutine MPAS_stream_mgr_add_att_logical!}}} !> stream is only written if any of its alarms are ringing. !> The "timeLevel" argument optionally specifies, for fields with multiple !> time levels, the time level from which fields should be written. - !> The "mgLevel" argument optionally specifies, for fields that exist for + !> The "mgLevel" argument optionally specifies, for fields that exist for !> multiple grid levels, the grid level from which fields should be written. - !> The "forceWriteNow" argument optionally specifies that all streams -- or - !> the stream specified by the "streamID" argument -- should be written by - !> the call regardless of whether any alarms associated with the stream(s) + !> The "forceWriteNow" argument optionally specifies that all streams -- or + !> the stream specified by the "streamID" argument -- should be written by + !> the call regardless of whether any alarms associated with the stream(s) !> are ringing. The "writeTime" argument optionally specifies a time stamp !> to be used for expanding a filename template, when it is not passed in, !> the current time of the streamManager's clock is used to expand filename @@ -2833,7 +2882,7 @@ subroutine MPAS_stream_mgr_write(manager, streamID, timeLevel, mgLevel, forceWri if ( .not. wroteStreams ) then STREAM_ERROR_WRITE('No output stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_write().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else nullify(stream_cursor) @@ -2880,11 +2929,11 @@ end subroutine MPAS_stream_mgr_write !}}} !> stream is only written if any of its alarms are ringing. !> The "timeLevel" argument optionally specifies, for fields with multiple !> time levels, the time level from which fields should be written. - !> The "mgLevel" argument optionally specifies, for fields that exist for + !> The "mgLevel" argument optionally specifies, for fields that exist for !> multiple grid levels, the grid level from which fields should be written. - !> The "forceWriteNow" argument optionally specifies that all streams -- or - !> the stream specified by the "streamID" argument -- should be written by - !> the call regardless of whether any alarms associated with the stream(s) + !> The "forceWriteNow" argument optionally specifies that all streams -- or + !> the stream specified by the "streamID" argument -- should be written by + !> the call regardless of whether any alarms associated with the stream(s) !> are ringing. The "writeTime" argument optionally specifies a time stamp !> to be used for expanding a filename template, when it is not passed in, !> the current time of the streamManager's clock is used to expand filename @@ -3013,7 +3062,7 @@ subroutine MPAS_stream_mgr_block_write(manager, writeBlock, streamID, timeLevel, if ( .not. wroteStreams ) then STREAM_ERROR_WRITE('No output stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_block_write().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else nullify(stream_cursor) @@ -3054,7 +3103,7 @@ subroutine MPAS_stream_mgr_block_write(manager, writeBlock, streamID, timeLevel, call mpas_dmpar_finalize(debugContext % dminfo) deallocate(debugContext % dminfo) - deallocate(debugContext) + deallocate(debugContext) end if call mpas_threading_barrier() @@ -3090,8 +3139,8 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite integer, intent(out) :: ierr type (MPAS_stream_list_type), pointer :: alarm_cursor - type (MPAS_Time_type) :: ref_time - type (MPAS_TimeInterval_type) :: temp_interval + type (MPAS_Time_type) :: ref_time, start_time, current_time, alarmTime_local + type (MPAS_TimeInterval_type) :: temp_interval, time_diff, alarmInterval_local type (MPAS_TimeInterval_type) :: filename_interval character (len=StrKIND) :: now_string, time_string character (len=StrKIND) :: temp_filename, actualWhen @@ -3100,6 +3149,9 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite logical :: clobberRecords, clobberFiles, truncateFiles integer :: maxRecords, tempRecord integer :: local_ierr, threadNum + integer (kind=I8KIND) :: seconds_diff + real (kind=RKIND) :: forecast_hour, next_start_hour + real (kind=RKIND) :: interval_minutes, next_interval_minutes threadNum = mpas_threading_get_thread_num() @@ -3141,6 +3193,48 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite return end if + ! + ! If stream uses output_timelevels, check if current forecast hour is within defined range + ! This prevents output when the alarm rings but we're past the timelevels range + ! + if (stream % timelevel_spec % is_parsed) then + ! Calculate current forecast hour + start_time = mpas_get_clock_time(manager % streamClock, MPAS_START_TIME, ierr=local_ierr) + current_time = mpas_get_clock_time(manager % streamClock, MPAS_NOW, ierr=local_ierr) + time_diff = current_time - start_time + call mpas_get_timeInterval(time_diff, S_i8=seconds_diff, ierr=local_ierr) + forecast_hour = real(seconds_diff, RKIND) / 3600.0_RKIND + + ! Check if forecast hour is within any defined timelevel range + call get_output_interval_from_timelevels(stream % timelevel_spec, forecast_hour, interval_minutes, local_ierr) + if (local_ierr /= MPAS_STREAM_MGR_NOERR) then + ! Outside defined timelevels range - check if there's a future range to schedule + call get_next_timelevel_start(stream % timelevel_spec, forecast_hour, next_start_hour, next_interval_minutes, local_ierr) + + alarm_cursor => stream % alarmList_out % head + do while (associated(alarm_cursor)) + if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then + call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + call mpas_remove_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + + ! If there's a future range, schedule alarm for when it starts + if (next_start_hour > 0.0_RKIND) then + STREAM_DEBUG_WRITE('-- Stream '//trim(stream % name)//' rescheduling alarm for next timelevel range.') + call mpas_set_timeInterval(time_diff, dt=next_start_hour * 3600.0_RKIND, ierr=local_ierr) + alarmTime_local = start_time + time_diff + call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=local_ierr) + call mpas_add_clock_alarm(manager % streamClock, alarm_cursor % name, alarmTime_local, & + alarmTimeInterval=alarmInterval_local, ierr=local_ierr) + else + STREAM_DEBUG_WRITE('-- Stream '//trim(stream % name)//' output complete: past all output_timelevels ranges.') + end if + end if + alarm_cursor => alarm_cursor % next + end do + return + end if + end if + ! ! Work out file clobbering options ! @@ -3169,7 +3263,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite ! stream, in which case we create the stream from scratch ! if (.not. stream % valid) then - if ( stream % filename_interval /= 'none' ) then + if ( stream % filename_interval == 'output' ) then + ! Use actual write time for filename (each output gets unique file) + call mpas_get_time(writeTime, dateTimeString=time_string) + call mpas_expand_string(time_string, blockID, stream % filename_template, stream % filename) + else if ( stream % filename_interval /= 'none' ) then call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) call mpas_build_stream_filename(stream % referenceTime, writeTime, filename_interval, stream % filename_template, blockID, stream % filename, ierr=local_ierr) else @@ -3183,7 +3281,7 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite ! Based on clobber_mode, determine if it matters if the file exists or not. if ( stream % clobber_mode == MPAS_STREAM_CLOBBER_OVERWRITE .or. stream % clobber_mode == MPAS_STREAM_CLOBBER_APPEND ) then STREAM_DEBUG_WRITE(' -- Cobber mode is overwrite or append...') - + ! Check if the file exists if (manager % ioContext % dminfo % my_proc_id == IO_NODE) then inquire(file=trim(stream % filename), exist=recordSeek) @@ -3220,12 +3318,12 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite if ( recordSeek ) then STREAM_DEBUG_WRITE(' -- File exists on disk: ' // trim(stream % filename)) call mpas_get_time(writeTime, dateTimeString=now_string) - + ! Look for exact record (in the case of overwriting) ! This also gets the number of records in the file. stream % nRecords = MPAS_seekStream(stream % stream, now_string, MPAS_STREAM_EXACT_TIME, actualWhen, maxRecords, local_ierr) STREAM_DEBUG_WRITE(' -- Seeked record is: $i with current records equal to $i and an error of $i' COMMA intArgs=(/stream % nRecords COMMA maxRecords COMMA local_ierr/)) - + if ( stream % nRecords == 0 ) then ! If we didn't find an exact time, set record to point to the end of the file. ! This might result in non-monotonic timestamps in the output file. @@ -3244,7 +3342,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite stream % valid = .true. else - if ( stream % filename_interval /= 'none' ) then + if ( stream % filename_interval == 'output' ) then + ! Use actual write time for filename (each output gets unique file) + call mpas_get_time(writeTime, dateTimeString=time_string) + call mpas_expand_string(time_string, blockID, stream % filename_template, temp_filename) + else if ( stream % filename_interval /= 'none' ) then call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) call mpas_build_stream_filename(stream % referenceTime, writeTime, filename_interval, stream % filename_template, blockID, temp_filename, ierr=local_ierr) else @@ -3269,7 +3371,7 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite ! Based on clobber_mode, determine if it matters if the file exists or not. if ( stream % clobber_mode == MPAS_STREAM_CLOBBER_OVERWRITE .or. stream % clobber_mode == MPAS_STREAM_CLOBBER_APPEND ) then STREAM_DEBUG_WRITE(' -- Cobber mode is overwrite or append...') - + ! Check if the file exists if (manager % ioContext % dminfo % my_proc_id == IO_NODE) then inquire(file=trim(stream % filename), exist=recordSeek) @@ -3308,12 +3410,12 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite if ( recordSeek ) then STREAM_DEBUG_WRITE(' -- File exists on disk: ' // trim(stream % filename)) call mpas_get_time(writeTime, dateTimeString=now_string) - + ! Look for exact record (in the case of overwriting) ! This also gets the number of records in the file. stream % nRecords = MPAS_seekStream(stream % stream, now_string, MPAS_STREAM_EXACT_TIME, actualWhen, maxRecords, local_ierr) STREAM_DEBUG_WRITE(' -- Seeked record is: $i with current records equal to $i and an error of $i' COMMA intArgs=(/stream % nRecords COMMA maxRecords COMMA local_ierr/)) - + if ( stream % nRecords == 0 ) then ! If we didn't find an exact time, set record to point to the end of the file. ! This might result in non-monotonic timestamps in the output file. @@ -3333,12 +3435,12 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite stream % nRecords = stream % nRecords + 1 if ( stream % clobber_mode == MPAS_STREAM_CLOBBER_OVERWRITE .or. stream % clobber_mode == MPAS_STREAM_CLOBBER_APPEND ) then call mpas_get_time(writeTime, dateTimeString=now_string) - + ! Look for exact record (in the case of overwriting) ! This also gets the number of records in the file. tempRecord = MPAS_seekStream(stream % stream, now_string, MPAS_STREAM_EXACT_TIME, actualWhen, maxRecords, local_ierr) STREAM_DEBUG_WRITE(' -- Seeked record is: $i with current records equal to $i and an error of $i' COMMA intArgs=(/tempRecord COMMA maxRecords COMMA local_ierr/)) - + if ( tempRecord /= 0 .and. stream % nRecords < maxRecords ) then ! If we found an exact result ! This might result in non-monotonic timestamps in the output file. @@ -3378,9 +3480,9 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite call prewrite_reindex(manager % allFields, manager % allPackages, stream % field_pool, stream % field_pkg_pool) end if - ! + ! ! Write the stream - ! + ! STREAM_DEBUG_WRITE(' -- Writing stream ' // trim(stream % name)) call MPAS_writeStream(stream % stream, stream % nRecords, ierr=local_ierr) @@ -3417,6 +3519,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite if ( swapRecords ) then stream % nRecords = tempRecord end if + + ! Write done marker file if enabled for this stream + if (stream % output_done_marker == 1) then + call write_done_marker(stream % filename, manager % ioContext % dminfo, stream % blockWrite) + end if end if end subroutine write_stream !}}} @@ -3435,19 +3542,19 @@ end subroutine write_stream !}}} !> is only read if any of its alarms are ringing. !> The "timeLevel" argument optionally specifies, for fields with multiple !> time levels, the time level into which fields should be read. - !> The "mgLevel" argument optionally specifies, for fields that exist for + !> The "mgLevel" argument optionally specifies, for fields that exist for !> multiple grid levels, the grid level into which fields should be read. !> The "when" argument optionally specifies the timestamp from which fields !> are to be read. !> The "whence" argument optionally specifies the method for determining !> the timestamp to read from in case an exact match is not found for the !> read timestamp, which is the current time unless the optional "when" - !> argument is given; possible values are MPAS_STREAM_EXACT_TIME, - !> MPAS_STREAM_NEAREST, MPAS_STREAM_LATEST_BEFORE, - !> MPAS_STREAM_LATEST_STRICTLY_BEFORE, MPAS_STREAM_EARLIEST_AFTER, or + !> argument is given; possible values are MPAS_STREAM_EXACT_TIME, + !> MPAS_STREAM_NEAREST, MPAS_STREAM_LATEST_BEFORE, + !> MPAS_STREAM_LATEST_STRICTLY_BEFORE, MPAS_STREAM_EARLIEST_AFTER, or !> MPAS_STREAM_EARLIEST_STRICTLY_AFTER. - !> The optional output argument "actualWhen" returns the actual time read - !> from a stream in case an exact match for the "when" time is not found, + !> The optional output argument "actualWhen" returns the actual time read + !> from a stream in case an exact match for the "when" time is not found, !> and a nearby time is selected using the "whence" argument. ! !----------------------------------------------------------------------- @@ -3473,7 +3580,7 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow, integer :: local_whence integer :: local_ierr integer :: temp_ierr - type (MPAS_Time_type) :: now_time + type (MPAS_Time_type) :: now_time integer :: threadNum logical :: readStreams @@ -3529,7 +3636,7 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow, nullify(stream_cursor) do while (MPAS_stream_list_query(manager % streams, streamID, stream_cursor, ierr=ierr)) STREAM_DEBUG_WRITE('-- Handling read of stream '//trim(stream_cursor % name)) - + ! Verify that the stream is an input stream if (stream_cursor % direction == MPAS_STREAM_INPUT .or. stream_cursor % direction == MPAS_STREAM_INPUT_OUTPUT) then readStreams = .true. @@ -3541,18 +3648,18 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow, if ( .not. readStreams ) then STREAM_ERROR_WRITE('No input stream matching '//trim(streamID)//' exists in call to MPAS_stream_mgr_read().') if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR - return + return end if else nullify(stream_cursor) stream_cursor => manager % streams % head do while (associated(stream_cursor)) STREAM_DEBUG_WRITE('-- Handling read of stream '//trim(stream_cursor % name)) - + ! Verify that the stream is an input stream if (stream_cursor % direction == MPAS_STREAM_INPUT .or. & stream_cursor % direction == MPAS_STREAM_INPUT_OUTPUT) then - + ! ! What should be the meaning of actualWhen if we read multiple streams in this call? ! @@ -3562,7 +3669,7 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow, local_ierr = MPAS_STREAM_MGR_ERROR end if end if - + stream_cursor => stream_cursor % next end do end if @@ -3675,10 +3782,14 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, ! ! First we need to build the filename for the current read time. ! - if ( stream % filename_interval /= 'none' ) then + if ( stream % filename_interval == 'output' ) then + ! Use actual read time for filename + call mpas_expand_string(when, blockID_local, stream % filename_template, temp_filename) + else if ( stream % filename_interval /= 'none' ) then call mpas_set_time(now_time, dateTimeString=when, ierr=local_ierr) + call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) - + call mpas_build_stream_filename(stream % referenceTime, now_time, filename_interval, stream % filename_template, blockID_local, temp_filename, ierr=local_ierr) else call mpas_expand_string(when, blockID_local, stream % filename_template, temp_filename) @@ -3774,7 +3885,9 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, end if retestFile = .false. - if ( trim(stream % filename_interval) /= 'none' .and. whence /= MPAS_STREAM_EXACT_TIME ) then + if ( trim(stream % filename_interval) /= 'none' .and. & + trim(stream % filename_interval) /= 'output' .and. & + whence /= MPAS_STREAM_EXACT_TIME ) then currentTime = mpas_get_clock_time(manager % streamClock, MPAS_NOW, ierr=local_ierr) call mpas_set_timeInterval(filenameInterval, timeString=stream % filename_interval, ierr=local_ierr) @@ -3846,9 +3959,13 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, STREAM_DEBUG_WRITE(' --- Retesting file... ') call mpas_get_time(filenameTime, dateTimeString=test_when) - call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) - - call mpas_build_stream_filename(stream % referenceTime, filenameTime, filename_interval, stream % filename_template, blockID_local, test_filename, ierr=local_ierr) + if ( stream % filename_interval == 'output' ) then + ! Use actual time for filename + call mpas_expand_string(test_when, blockID_local, stream % filename_template, test_filename) + else + call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) + call mpas_build_stream_filename(stream % referenceTime, filenameTime, filename_interval, stream % filename_template, blockID_local, test_filename, ierr=local_ierr) + end if STREAM_DEBUG_WRITE(' --- Retesting filename is ' // trim(test_filename)) @@ -3962,9 +4079,9 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, stream % timeLevel = timeLevel end if - ! + ! ! Read the stream - ! + ! call MPAS_readStream(stream % stream, stream % nRecords, ierr=local_ierr) if (local_ierr /= MPAS_STREAM_NOERR) then ierr = MPAS_STREAM_MGR_ERROR @@ -4000,13 +4117,13 @@ end subroutine read_stream !}}} !----------------------------------------------------------------------- ! routine MPAS_stream_mesg ! - !> \brief Write an error message (if the level requires it) to + !> \brief Write an error message (if the level requires it) to !> \author Michael Duda, Doug Jacobsen !> \date 07/16/2014 - !> \details Using the input error level, + !> \details Using the input error level, !----------------------------------------------------------------------- subroutine MPAS_stream_mesg(level, mesg)!{{{ - + use mpas_dmpar implicit none @@ -4079,7 +4196,11 @@ subroutine mpas_get_stream_filename(manager, streamID, when, blockID, filename, ! ! First we need to build the filename for the current read time. ! - if ( streamCursor % filename_interval /= 'none' ) then + if ( streamCursor % filename_interval == 'output' ) then + ! Use actual time for filename + call mpas_get_time(now_time, dateTimeString=when_string, ierr=err_local) + call mpas_expand_string(when_string, blockID_local, streamCursor % filename_template, filename) + else if ( streamCursor % filename_interval /= 'none' ) then call mpas_set_timeInterval(filename_interval, timeString=streamCursor % filename_interval) call mpas_build_stream_filename(streamCursor % referenceTime, now_time, filename_interval, streamCursor % filename_template, blockID_local, filename, ierr=err_local) else @@ -4103,7 +4224,7 @@ end subroutine mpas_get_stream_filename !}}} !> \brief Construct the filename that contains a specific time in a stream !> \author Michael Duda, Doug Jacobsen !> \date 21 August 2014 - !> \details + !> \details !> Given a filename template and the information necessary to determine the time !> in the stream that matches a time available in any of the files associated with !> the stream, returns a specific filename that should contain that time. @@ -4114,7 +4235,7 @@ end subroutine mpas_get_stream_filename !}}} !> !> This is a low level subroutine to complement the !> mpas_get_stream_Filename routine - !> + !> !> Return error codes: !> 0 no error !----------------------------------------------------------------------- @@ -4161,7 +4282,7 @@ subroutine mpas_build_stream_filename(ref_time, when, filename_interval, filenam call mpas_get_timeInterval(intv, timeString=temp_string) STREAM_DEBUG_WRITE(' ** intv is: ' // trim(temp_string)) - call mpas_interval_division(ref_time, intv, filename_interval, nrecs, rem) + call mpas_interval_division(ref_time, intv, filename_interval, nrecs, rem) ! STREAM_DEBUG_WRITE(' ** Divisions are: $i' COMMA intArgs=(/nrecs/)) @@ -4192,7 +4313,7 @@ end subroutine mpas_build_stream_filename !}}} !> \brief This is a utility routine to build a stream type from a pool representing a stream. !> \author Michael Duda, Doug Jacobsen !> \date 07/23/2014 - !> \details + !> \details !> This routine will take as input a pool representing a stream. !> It will then generate a stream type based on this pool, and return that. !----------------------------------------------------------------------- @@ -4302,7 +4423,7 @@ end subroutine gen_random ! ! Write output_interval to stream ! - IF (stream %filename_interval(1:4) /= "none") THEN + IF (stream %filename_interval(1:4) /= "none" .and. stream %filename_interval /= "output") THEN call mpas_set_timeInterval(filename_interval, timeString=stream %filename_interval) call mpas_get_timeInterval(filename_interval,M=output_interval) call mpas_writeStreamAtt(stream%stream, 'output_interval',output_interval,syncVal=.true., ierr=local_ierr) @@ -4411,8 +4532,8 @@ end subroutine build_stream !}}} !> \brief Updates the time level for fields in a stream !> \author Michael Duda, Doug Jacobsen !> \date 07/23/2014 - !> \details - !> For an existing stream, updates the time levels for all fields in + !> \details + !> For an existing stream, updates the time levels for all fields in !> the stream so that subsequent reads/writes of the stream will read !> from / write to the specified time level. !----------------------------------------------------------------------- @@ -4527,7 +4648,7 @@ end subroutine update_stream !}}} !> \brief Checks whether a stream has any active packages (or none at all) !> \author Michael Duda !> \date 23 September 2014 - !> \details + !> \details !> This function determines whether a stream has any active packages !> associated with it. If the stream has at least one active package, !> or no packages at all, the function returns true; else, if all packages @@ -4576,32 +4697,32 @@ end function stream_active_pkg_check !}}} !> \brief Parses a semi-colon-separated list of package names, indicating whether any are active !> \author Michael Duda !> \date 19 March 2015 - !> \details - !> This function determines whether any of the named strings in + !> \details + !> This function determines whether any of the named strings in !> the semi-colon-separated list provided in the 'packages' argument are - !> active. + !> active. !> If any of the packages does not exist in the package pool, the optional - !> argument ierr is set to a non-zero value; otherwise, if all packages exist, + !> argument ierr is set to a non-zero value; otherwise, if all packages exist, !> ierr will be set to zero upon return. ! !----------------------------------------------------------------------- logical function parse_package_list(package_pool, packages, ierr) result(active) - + implicit none - + type (mpas_pool_type), intent(in) :: package_pool character (len=*), intent(in) :: packages integer, intent(out), optional :: ierr - + integer :: i, j, slen integer :: err_level logical, pointer :: pkg_val - - + + if (present(ierr)) ierr = 0 - + slen = len_trim(packages) - + ! ! No packages @@ -4610,7 +4731,7 @@ logical function parse_package_list(package_pool, packages, ierr) result(active) active = .true. return end if - + active = .false. err_level = mpas_pool_get_error_level() @@ -4842,8 +4963,8 @@ end subroutine exch_all_halos !}}} !> \brief Determines whether a dimension represents a decomposed dimension or not !> \author Michael Duda !> \date 24 September 2014 - !> \details - !> This function determines whether the name of the input argument is + !> \details + !> This function determines whether the name of the input argument is !> a decompsed dimension or not. Currently in MPAS, the only decomposed !> dimensions are: !> nCells @@ -4878,8 +4999,8 @@ end function is_decomposed_dim !}}} !> \brief Reindex connectivity fields from local to global index space. !> \author Doug Jacobsen, Michael Duda !> \date 24 September 2014 - !> \details - !> For any connectivity fields contained in the stream to be written, + !> \details + !> For any connectivity fields contained in the stream to be written, !> whose fields include those in the streamFields pool, save the locally !> indexed fields in module variables *_save, and allocate new arrays for !> the fields, which are set to contain global indices. @@ -5215,12 +5336,12 @@ end subroutine prewrite_reindex !}}} !> \brief Reindex connectivity fields from global to local index space. !> \author Doug Jacobsen, Michael Duda !> \date 24 September 2014 - !> \details - !> For any connectivity fields contained in the stream to be written, + !> \details + !> For any connectivity fields contained in the stream to be written, !> whose fields include those in the streamFields pool, restore the locally !> indexed fields from module variables *_save. !> This routine should be called immediately after a write of a stream. - !> + !> !> NB: Even if the write of a stream fails, it is important to stil call !> this routine to reset the connectivity fields to contain local indices. !> @@ -5263,35 +5384,35 @@ subroutine postwrite_reindex(allFields, streamFields) !{{{ if (associated(cellsOnCell_save)) then cellsOnCell_ptr => cellsOnCell_save call mpas_pool_get_field(allFields, 'cellsOnCell', cellsOnCell) - end if + end if if (associated(edgesOnCell_save)) then edgesOnCell_ptr => edgesOnCell_save call mpas_pool_get_field(allFields, 'edgesOnCell', edgesOnCell) - end if + end if if (associated(verticesOnCell_save)) then verticesOnCell_ptr => verticesOnCell_save call mpas_pool_get_field(allFields, 'verticesOnCell', verticesOnCell) - end if + end if if (associated(cellsOnEdge_save)) then cellsOnEdge_ptr => cellsOnEdge_save call mpas_pool_get_field(allFields, 'cellsOnEdge', cellsOnEdge) - end if + end if if (associated(verticesOnEdge_save)) then verticesOnEdge_ptr => verticesOnEdge_save call mpas_pool_get_field(allFields, 'verticesOnEdge', verticesOnEdge) - end if + end if if (associated(edgesOnEdge_save)) then edgesOnEdge_ptr => edgesOnEdge_save call mpas_pool_get_field(allFields, 'edgesOnEdge', edgesOnEdge) - end if + end if if (associated(cellsOnVertex_save)) then cellsOnVertex_ptr => cellsOnVertex_save call mpas_pool_get_field(allFields, 'cellsOnVertex', cellsOnVertex) - end if + end if if (associated(edgesOnVertex_save)) then edgesOnVertex_ptr => edgesOnVertex_save call mpas_pool_get_field(allFields, 'edgesOnVertex', edgesOnVertex) - end if + end if ! ! Reset indices for connectivity arrays from global to local index space @@ -5402,8 +5523,8 @@ end subroutine postwrite_reindex !}}} !> \brief Reindex connectivity fields from global to local index space. !> \author Doug Jacobsen, Michael Duda !> \date 24 September 2014 - !> \details - !> For any connectivity fields contained in the stream that was read, + !> \details + !> For any connectivity fields contained in the stream that was read, !> whose fields include those in the streamFields pool, convert the !> globally indexed connectivity fields in the stream to local index space. !> This routine should be called immediately after a read of a stream. @@ -5656,13 +5777,13 @@ end subroutine postread_reindex !}}} !> \author Doug Jacobsen, Michael Duda !> \date 03/03/2015 !> \details - !> If the optional 'streamID' argument is provided, this routine resets - !> the iterator within a stream manager so that streams may subsequently - !> be iterated over using the MPAS_stream_mgr_get_next_stream function. + !> If the optional 'streamID' argument is provided, this routine resets + !> the iterator within a stream manager so that streams may subsequently + !> be iterated over using the MPAS_stream_mgr_get_next_stream function. !> !> If an optional stream name is provided via the 'streamID' argument, this - !> routine will reset the iterator for fields within the specified stream, - !> which may subsequently iterated over using the + !> routine will reset the iterator for fields within the specified stream, + !> which may subsequently iterated over using the !> MPAS_stream_mgr_get_next_field() routine. !> !> NOTE: This routine does not support regular expressions for StreamID @@ -5827,7 +5948,7 @@ end function MPAS_stream_mgr_get_next_stream !}}} !> the stream manager. !> !> This function returns .TRUE. if the stream contains another field, - !> whether active or not, in which case the output argument fieldName + !> whether active or not, in which case the output argument fieldName !> provides the name of this field, and .FALSE. otherwise. If a field name !> is returned, the optional logical argument isActive may be used to !> determine whether the field is currently active in the stream. @@ -5883,7 +6004,7 @@ logical function MPAS_stream_mgr_get_next_field(manager, streamID, fieldName, is else isActive = .true. end if - + call mpas_pool_set_error_level(err_level) end if @@ -5919,7 +6040,571 @@ logical function MPAS_stream_mgr_stream_exists(manager, streamID) result(validSt return end function MPAS_stream_mgr_stream_exists!}}} - + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! parse_all_timelevels + ! + !> \brief Parse the entire output_timelevels string into a timelevel_spec structure + !> \author Guoqing Ge + !> \date March 2026 + !> \details + !> Parses the complete output_timelevels string (e.g., "0-3-15m 4-72 75-168-3") + !> into a pre-parsed structure for efficient runtime lookup. Uses mpas_split_string + !> to tokenize the string by spaces. Reports parse errors via mpas_log_write. + ! + !----------------------------------------------------------------------- + subroutine parse_all_timelevels(timelevels_str, timelevel_spec, stream_name, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: timelevels_str + type(MPAS_timelevel_spec_type), intent(out) :: timelevel_spec + character(len=*), intent(in) :: stream_name + integer, intent(out) :: ierr + + character(len=StrKIND), pointer, dimension(:) :: segments + character(len=StrKIND) :: segment + integer :: i, n_segments, local_ierr + real(kind=RKIND) :: start_hour, end_hour, interval_mins + + ierr = 0 + timelevel_spec % n_segments = 0 + timelevel_spec % is_parsed = .false. + + ! Guard against empty string + if (len_trim(timelevels_str) == 0) then + call mpas_log_write('Stream '''//trim(stream_name)//''': output_timelevels string is empty', MPAS_LOG_ERR) + ierr = 1 + return + end if + + ! Split the string by spaces (trim leading/trailing whitespaces first) + nullify(segments) + call mpas_split_string(trim(adjustl(timelevels_str)), ' ', segments) + n_segments = size(segments) + + ! Check for too many segments + if (n_segments > MAX_TIMELEVEL_SEGMENTS) then + call mpas_log_write('Stream '''//trim(stream_name)//''': too many timelevel segments (max $i)', & + MPAS_LOG_ERR, intArgs=(/MAX_TIMELEVEL_SEGMENTS/)) + deallocate(segments) + ierr = 1 + return + end if + + ! Parse each segment + do i = 1, n_segments + segment = trim(adjustl(segments(i))) + + ! Skip empty segments (from consecutive spaces) + if (len_trim(segment) == 0) cycle + + call parse_output_timelevel_spec(segment, start_hour, end_hour, interval_mins, local_ierr) + + if (local_ierr /= 0) then + call mpas_log_write('Stream '''//trim(stream_name)//''': Invalid timelevel format. ' // & + 'Expected ''start-end-interval'' but got '''//trim(segment)//'''', MPAS_LOG_ERR) + deallocate(segments) + ierr = 1 + return + end if + + ! Store parsed values + timelevel_spec % n_segments = timelevel_spec % n_segments + 1 + timelevel_spec % start_hour(timelevel_spec % n_segments) = start_hour + timelevel_spec % end_hour(timelevel_spec % n_segments) = end_hour + timelevel_spec % interval_minutes(timelevel_spec % n_segments) = interval_mins + end do + + deallocate(segments) + + ! Mark as successfully parsed if we have at least one segment + if (timelevel_spec % n_segments > 0) then + timelevel_spec % is_parsed = .true. + else + call mpas_log_write('Stream '''//trim(stream_name)//''': no valid timelevel segments found', MPAS_LOG_ERR) + ierr = 1 + end if + + end subroutine parse_all_timelevels!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! parse_time_string + ! + !> \brief Parse a time string into total minutes ( allow 0.5 minutes, i.e. 30 seconds) + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Parses time strings like "1h30m", "45m", "6", "90s", "1d" into total minutes + !> Supported units: d/D (days), h (hours), m (minutes), s (seconds) + !> Plain integers without units are interpreted as hours + ! + !----------------------------------------------------------------------- + subroutine parse_time_string(time_str, total_minutes, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: time_str + real(kind=RKIND), intent(out) :: total_minutes + integer, intent(out) :: ierr + + character(len=StrKIND) :: str_local + integer :: i, len_str, num_start, read_err + real(kind=RKIND) :: value + character :: ch + logical :: found_unit + + ierr = 0 + total_minutes = 0.0_RKIND + str_local = trim(adjustl(time_str)) + len_str = len_trim(str_local) + + if (len_str == 0) then + ierr = 1 + return + end if + + ! Check if last character is a digit (plain integer = hours) + ch = str_local(len_str:len_str) + if (ch >= '0' .and. ch <= '9') then + read(str_local, *, iostat=read_err) value + if (read_err /= 0) then + ierr = 1 + return + end if + total_minutes = value * 60.0_RKIND + return + end if + + ! Parse duration with units (e.g., "1h30m", "45m", "90s", "1d" or "1D") + num_start = 1 + i = 1 + do while (i <= len_str) + ch = str_local(i:i) + ! Check for unit characters: d/D (days), h (hours), m (minutes), s (seconds) + if (ch == 'h' .or. ch == 'm' .or. ch == 's' .or. ch == 'd' .or. ch == 'D') then + + if (i == num_start) then + ierr = 1 ! No number before unit + return + end if + + read(str_local(num_start:i-1), *, iostat=read_err) value + if (read_err /= 0) then + ierr = 1 + return + end if + + select case (ch) + case ('h') + total_minutes = total_minutes + value * 60.0_RKIND + case ('m') + total_minutes = total_minutes + value + case ('s') + total_minutes = total_minutes + value / 60.0_RKIND + case ('d', 'D') + total_minutes = total_minutes + value * 24.0_RKIND * 60.0_RKIND + end select + + num_start = i + 1 + end if + i = i + 1 + end do + + ! Check for trailing digits without unit (error) + if (num_start <= len_str) then + ierr = 1 + return + end if + + end subroutine parse_time_string!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! parse_output_timelevel_spec + ! + !> \brief Parse one time level specification segment + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Parses a single segment using format: start, start-stop, or start-stop-step + !> Time strings can be integers (hours) or duration format (e.g., "1h30m", "45m") + !> Examples: "6", "0-3", "0-1h-15m", "1h30m-2h-15m" + ! + !----------------------------------------------------------------------- + subroutine parse_output_timelevel_spec(spec, start_hour, end_hour, interval_minutes, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: spec + real(kind=RKIND), intent(out) :: start_hour, end_hour + real(kind=RKIND), intent(out) :: interval_minutes + integer, intent(out) :: ierr + + character(len=StrKIND), pointer, dimension(:) :: parts + integer :: i, n_parts, local_ierr + real(kind=RKIND) :: start_minutes, end_minutes, step_minutes + + ierr = 0 + + ! Guard against empty string + if (len_trim(spec) == 0) then + ierr = 1 + return + end if + + ! Split by dash delimiter + nullify(parts) + call mpas_split_string(trim(adjustl(spec)), '-', parts) + n_parts = size(parts) + + ! Validate parts array - check for empty parts (from consecutive dashes or leading/trailing dashes) + do i = 1, n_parts + if (len_trim(parts(i)) == 0) then + ierr = 1 + deallocate(parts) + return + end if + end do + + ! Parse based on number of parts + if (n_parts == 1) then + ! Format: single time string (output at that time only) + call parse_time_string(parts(1), start_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + deallocate(parts) + return + end if + start_hour = start_minutes / 60.0_RKIND + end_hour = start_hour + interval_minutes = 60.0_RKIND ! Default, but won't matter for single time + + else if (n_parts == 2) then + ! Format: start-stop (interval defaults to 1 hour) + call parse_time_string(parts(1), start_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + deallocate(parts) + return + end if + + call parse_time_string(parts(2), end_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + deallocate(parts) + return + end if + + start_hour = start_minutes / 60.0_RKIND + end_hour = end_minutes / 60.0_RKIND + interval_minutes = 60.0_RKIND + + else if (n_parts == 3) then + ! Format: start-stop-step + call parse_time_string(parts(1), start_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + deallocate(parts) + return + end if + + call parse_time_string(parts(2), end_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + deallocate(parts) + return + end if + + call parse_time_string(parts(3), step_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + deallocate(parts) + return + end if + + start_hour = start_minutes / 60.0_RKIND + end_hour = end_minutes / 60.0_RKIND + interval_minutes = step_minutes + + else + ! Invalid number of parts (0 or > 3 dashes) + ierr = 1 + deallocate(parts) + return + end if + + deallocate(parts) + + ! Check for setting mistake "start-step-stop". The correct one should be "start-stop-step" + if ((n_parts == 3) .and. (interval_minutes > end_hour * 60.0_RKIND + 1.0e-4_RKIND)) then + call mpas_log_write('ERROR: output_timelevels segment '''//trim(spec)//''': ' // & + 'expected format is start-stop-step, but appears to be start-step-stop', MPAS_LOG_ERR) + end if + + end subroutine parse_output_timelevel_spec!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! get_output_interval_from_timelevels + ! + !> \brief Calculate output interval based on current forecast hour and pre-parsed timelevels + !> \author Guoqing Ge + !> \date March 2026 + !> \details + !> Given a pre-parsed timelevel_spec and current forecast hour, determines the + !> appropriate output interval in minutes. + ! + !----------------------------------------------------------------------- + subroutine get_output_interval_from_timelevels(timelevel_spec, forecast_hour, interval_minutes, ierr, next_output_hour)!{{{ + + implicit none + + type(MPAS_timelevel_spec_type), intent(in) :: timelevel_spec + real(kind=RKIND), intent(in) :: forecast_hour + real(kind=RKIND), intent(out) :: interval_minutes + integer, intent(out) :: ierr + real(kind=RKIND), intent(out), optional :: next_output_hour + + integer :: i + real(kind=RKIND) :: start_hour, end_hour, seg_interval + real(kind=RKIND) :: next_hour + logical :: found, is_single_time + + ierr = 0 + interval_minutes = 60.0_RKIND ! Default to 1 hour + if (present(next_output_hour)) next_output_hour = -1.0_RKIND + found = .false. + is_single_time = .false. + next_hour = huge(1.0_RKIND) + + ! Guard against unparsed spec + if (.not. timelevel_spec % is_parsed .or. timelevel_spec % n_segments == 0) then + ierr = 1 + return + end if + + ! First pass: find if current forecast hour matches any segment + do i = 1, timelevel_spec % n_segments + start_hour = timelevel_spec % start_hour(i) + end_hour = timelevel_spec % end_hour(i) + seg_interval = timelevel_spec % interval_minutes(i) + + ! Check if current forecast hour falls in this range + if (forecast_hour >= start_hour .and. forecast_hour <= end_hour) then + found = .true. + ! Check if this is a single time point (not a range) + if (abs(start_hour - end_hour) < 1.0e-6_RKIND) then + is_single_time = .true. + else + ! It's a range - use the segment's interval + interval_minutes = seg_interval + return + end if + end if + end do + + ! If we matched a single time, find the next output time + if (found .and. is_single_time) then + do i = 1, timelevel_spec % n_segments + start_hour = timelevel_spec % start_hour(i) + end_hour = timelevel_spec % end_hour(i) + + ! For single times, check if > current + if (abs(start_hour - end_hour) < 1.0e-6_RKIND) then + if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < next_hour) then + next_hour = start_hour + end if + else + ! For ranges, check if start is after current + if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < next_hour) then + next_hour = start_hour + end if + end if + end do + + ! Calculate interval to next time + if (next_hour < huge(1.0_RKIND)) then + interval_minutes = (next_hour - forecast_hour) * 60.0_RKIND + if (present(next_output_hour)) next_output_hour = next_hour + else + ! No more times after this - interval_minutes = 0 signals last output + interval_minutes = 0.0_RKIND + if (present(next_output_hour)) next_output_hour = -1.0_RKIND + end if + return + end if + + ! If no matching range found, signal that no more output is needed + if (.not. found) then + interval_minutes = 0.0_RKIND + ierr = MPAS_STREAM_MGR_ERROR ! Signal no more output + end if + + end subroutine get_output_interval_from_timelevels!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! get_next_timelevel_start + ! + !> \brief Find the start hour of the next timelevel range after current forecast hour + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Given a time levels string and current forecast hour, finds the start hour + !> of the next timelevel range (if any). Returns -1 if no future ranges exist. + ! + !----------------------------------------------------------------------- + subroutine get_next_timelevel_start(timelevel_spec, forecast_hour, next_start_hour, next_interval_minutes, ierr)!{{{ + + implicit none + + type(MPAS_timelevel_spec_type), intent(in) :: timelevel_spec + real(kind=RKIND), intent(in) :: forecast_hour + real(kind=RKIND), intent(out) :: next_start_hour + real(kind=RKIND), intent(out) :: next_interval_minutes + integer, intent(out) :: ierr + + integer :: i + real(kind=RKIND) :: start_hour, best_start, best_interval + logical :: found + + ierr = 0 + next_start_hour = -1.0_RKIND + next_interval_minutes = 60.0_RKIND + best_start = huge(1.0_RKIND) + best_interval = 60.0_RKIND + found = .false. + + ! Guard against unparsed spec + if (.not. timelevel_spec % is_parsed .or. timelevel_spec % n_segments == 0) then + ierr = 1 + return + end if + + ! Find the segment with smallest start_hour > forecast_hour + do i = 1, timelevel_spec % n_segments + start_hour = timelevel_spec % start_hour(i) + + ! Check if this range starts after current forecast hour + if (start_hour > forecast_hour .and. start_hour < best_start) then + best_start = start_hour + best_interval = timelevel_spec % interval_minutes(i) + found = .true. + end if + end do + + if (found) then + next_start_hour = best_start + next_interval_minutes = best_interval + else + ierr = MPAS_STREAM_MGR_ERROR ! No future ranges + end if + + end subroutine get_next_timelevel_start!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! update_variable_output_alarm + ! + !> \brief Update variable output alarm with new interval based on forecast hour + !> \author Guoqing Ge + !> \date March 2026 + !> \details + !> This routine updates a variable output alarm after it has rung by: + !> 1. Checking if the stream uses output_timelevels (via timelevel_spec) + !> 2. Calculating current forecast hour (current time - start time) + !> 3. Getting the next interval from get_output_interval_from_timelevels + !> 4. Removing the old alarm and creating a new one with the updated interval + ! + !----------------------------------------------------------------------- + subroutine update_variable_output_alarm(manager, stream, alarm_name, ierr)!{{{ + + implicit none + + type (MPAS_streamManager_type), intent(inout) :: manager + type (MPAS_stream_list_type), pointer :: stream + character(len=*), intent(in) :: alarm_name + integer, intent(out), optional :: ierr + + type (MPAS_Time_type) :: start_time, current_time, alarmTime_local + type (MPAS_TimeInterval_type) :: time_diff, alarmInterval_local + real (kind=RKIND) :: forecast_hour, next_start_hour, next_output_hour + real (kind=RKIND) :: interval_minutes, next_interval_minutes + integer :: local_ierr, ierr_tmp + integer (kind=I8KIND) :: seconds_diff + + local_ierr = MPAS_STREAM_MGR_NOERR + + ! Check if stream uses output_timelevels (via pre-parsed timelevel_spec) + if (.not. stream % timelevel_spec % is_parsed) then + ! Not a variable output stream, nothing to do + if (present(ierr)) ierr = MPAS_STREAM_MGR_NOERR + return + end if + + ! Calculate current forecast hour + start_time = mpas_get_clock_time(manager % streamClock, MPAS_START_TIME, ierr=ierr_tmp) + current_time = mpas_get_clock_time(manager % streamClock, MPAS_NOW, ierr=ierr_tmp) + time_diff = current_time - start_time + call mpas_get_timeInterval(time_diff, S_i8=seconds_diff, ierr=ierr_tmp) + forecast_hour = real(seconds_diff, RKIND) / 3600.0_RKIND + + ! Get next interval using pre-parsed timelevel_spec + ! Also get the absolute next output hour to avoid floating-point drift + call get_output_interval_from_timelevels(stream % timelevel_spec, forecast_hour, interval_minutes, ierr_tmp, next_output_hour) + if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. interval_minutes <= 0.0_RKIND) then + ! Current hour not in any range - check for future range + call get_next_timelevel_start(stream % timelevel_spec, forecast_hour, next_start_hour, next_interval_minutes, ierr_tmp) + call mpas_remove_clock_alarm(manager % streamClock, alarm_name, ierr=ierr_tmp) + + if (next_start_hour > 0.0_RKIND) then + ! Schedule recurring alarm for when the next range starts + call mpas_set_timeInterval(time_diff, dt=next_start_hour * 3600.0_RKIND, ierr=ierr_tmp) + alarmTime_local = start_time + time_diff + call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + call mpas_add_clock_alarm(manager % streamClock, alarm_name, alarmTime_local, & + alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp) + end if + if (present(ierr)) ierr = MPAS_STREAM_MGR_NOERR + return + end if + + ! Convert minutes to time interval (using dt in seconds for sub-minute precision) + call mpas_set_timeInterval(alarmInterval_local, dt=interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + local_ierr = ior(local_ierr, ierr_tmp) + + ! Remove old alarm + call mpas_remove_clock_alarm(manager % streamClock, alarm_name, ierr=ierr_tmp) + local_ierr = ior(local_ierr, ierr_tmp) + + ! Add new alarm with updated interval + ! Use ABSOLUTE time from start_time to avoid floating-point drift + ! This ensures discrete times like "0m 15m 1h3m 2 3" hit exactly + if (next_output_hour > 0.0_RKIND) then + ! Compute alarm time absolutely: start_time + next_output_hour + call mpas_set_timeInterval(time_diff, dt=next_output_hour * 3600.0_RKIND, ierr=ierr_tmp) + alarmTime_local = start_time + time_diff + else + ! Fallback to relative calculation (for ranges) + alarmTime_local = current_time + alarmInterval_local + end if + call mpas_add_clock_alarm(manager % streamClock, alarm_name, alarmTime_local, & + alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp) + local_ierr = ior(local_ierr, ierr_tmp) + + if (present(ierr)) ierr = local_ierr + + end subroutine update_variable_output_alarm!}}} + end module mpas_stream_manager @@ -6019,7 +6704,7 @@ subroutine stream_mgr_create_stream_c(manager_c, streamID_c, direction_c, filena end if STREAM_DEBUG_WRITE('Creating stream from c...') - + ! ! For immutable streams, the stream should have already been defined at this point, and ! all we need to do is update the stream's filename template; @@ -6341,3 +7026,200 @@ subroutine stream_mgr_add_pkg_c(manager_c, streamID_c, package_c, ierr_c) bind(c end if end subroutine stream_mgr_add_pkg_c !}}} + + +subroutine stream_mgr_set_property_c(manager_c, streamID_c, propertyName_c, propertyValue_c, ierr_c) bind(c) !{{{ + + use mpas_c_interfacing, only : mpas_c_to_f_string + use iso_c_binding, only : c_char, c_int, c_ptr, c_f_pointer + use mpas_derived_types, only : MPAS_streamManager_type, MPAS_STREAM_MGR_NOERR, & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, MPAS_STREAM_PROPERTY_DONE_MARKER + use mpas_stream_manager, only : MPAS_stream_mgr_set_property + use mpas_kind_types, only : StrKIND + + implicit none + + type (c_ptr) :: manager_c + character(kind=c_char) :: streamID_c(*) + character(kind=c_char) :: propertyName_c(*) + character(kind=c_char) :: propertyValue_c(*) + integer(kind=c_int) :: ierr_c + + type (MPAS_streamManager_type), pointer :: manager + character(len=StrKIND) :: streamID, propertyName, propertyValue + integer :: ierr + + + ierr = 0 + + call c_f_pointer(manager_c, manager) + call mpas_c_to_f_string(streamID_c, streamID) + call mpas_c_to_f_string(propertyName_c, propertyName) + call mpas_c_to_f_string(propertyValue_c, propertyValue) + + ! Map property name string to constant + if (trim(propertyName) == 'output_timelevels') then + call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, propertyValue, ierr=ierr) + else if (trim(propertyName) == 'output_done_marker') then + call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_DONE_MARKER, 1, ierr=ierr) + end if + + if (ierr == MPAS_STREAM_MGR_NOERR) then + ierr_c = 0 + else + ierr_c = 1 + end if + +end subroutine stream_mgr_set_property_c !}}} + + +subroutine stream_mgr_add_variable_output_alarm_c(manager_c, streamID_c, ierr_c) bind(c) !{{{ + + use mpas_c_interfacing, only : mpas_c_to_f_string + use iso_c_binding, only : c_char, c_int, c_ptr, c_f_pointer + use mpas_derived_types, only : MPAS_streamManager_type, MPAS_Clock_type, MPAS_Time_type, MPAS_TimeInterval_type, & + MPAS_STREAM_MGR_NOERR, MPAS_STREAM_OUTPUT, MPAS_START_TIME, & + MPAS_STREAM_PROPERTY_REF_TIME, MPAS_stream_list_type + use mpas_stream_manager, only : MPAS_stream_mgr_get_clock, MPAS_stream_mgr_add_alarm, & + MPAS_stream_mgr_get_property, get_output_interval_from_timelevels, & + get_next_timelevel_start + use mpas_stream_list, only : MPAS_stream_list_query + use mpas_kind_types, only : StrKIND, RKIND + use mpas_timekeeping, only : mpas_add_clock_alarm, mpas_get_clock_time, mpas_set_time, mpas_set_timeInterval, & + mpas_get_timeInterval, operator(-), operator(+) + + implicit none + + type (c_ptr) :: manager_c + character(kind=c_char) :: streamID_c(*) + integer(kind=c_int) :: ierr_c + + type (MPAS_streamManager_type), pointer :: manager + type (MPAS_stream_list_type), pointer :: stream + type (MPAS_Clock_type), pointer :: clock + character(len=StrKIND) :: streamID, alarmID, ref_time_str + type (MPAS_Time_type) :: alarmTime_local, ref_time, current_time, start_time + type (MPAS_TimeInterval_type) :: alarmInterval_local, time_since_ref, time_offset + integer :: ierr, ierr_tmp, err_local + real(kind=RKIND) :: forecast_hour, next_start_hour + real(kind=RKIND) :: interval_minutes, next_interval_minutes + + + ierr = 0 + ierr_tmp = 0 + + call c_f_pointer(manager_c, manager) + call mpas_c_to_f_string(streamID_c, streamID) + write(alarmID, '(a)') trim(streamID)//'_output' + + ! Look up the stream to get its pre-parsed timelevel_spec + nullify(stream) + if (.not. MPAS_stream_list_query(manager % streams, streamID, stream, ierr=err_local)) then + ierr_c = 1 + return + end if + + ! Check if timelevel_spec is parsed + if (.not. stream % timelevel_spec % is_parsed) then + ierr_c = 1 + return + end if + + call MPAS_stream_mgr_get_clock(manager, clock) + + ! Get reference time for the stream + call MPAS_stream_mgr_get_property(manager, streamID, MPAS_STREAM_PROPERTY_REF_TIME, ref_time_str, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + call mpas_set_time(ref_time, dateTimeString=ref_time_str, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + ! For initial alarm, get start time + start_time = mpas_get_clock_time(clock, MPAS_START_TIME, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + ! Get interval for first output (at forecast hour 0) using pre-parsed timelevel_spec + forecast_hour = 0.0_RKIND + call get_output_interval_from_timelevels(stream % timelevel_spec, forecast_hour, interval_minutes, ierr_tmp) + + if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. interval_minutes <= 0.0_RKIND) then + ! Hour 0 not in any range - find the first range's start time + call get_next_timelevel_start(stream % timelevel_spec, forecast_hour, next_start_hour, next_interval_minutes, ierr_tmp) + if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. next_start_hour < 0.0_RKIND) then + ! No valid ranges at all + ierr_c = 1 + return + end if + ! Schedule alarm for when the first range starts + call mpas_set_timeInterval(time_offset, dt=next_start_hour * 3600.0_RKIND, ierr=ierr_tmp) + alarmTime_local = start_time + time_offset + call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + else + ! Hour 0 is in a valid range - start alarm at start time + alarmTime_local = start_time + call mpas_set_timeInterval(alarmInterval_local, dt=interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + end if + ierr = ior(ierr, ierr_tmp) + + ! Add the alarm + call mpas_add_clock_alarm(clock, alarmID, alarmTime_local, alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + ! Register alarm with stream manager + call MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, MPAS_STREAM_OUTPUT, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + if (ierr == MPAS_STREAM_MGR_NOERR) then + ierr_c = 0 + else + ierr_c = 1 + end if + +end subroutine stream_mgr_add_variable_output_alarm_c !}}} + + +!||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| +! +! subroutine write_done_marker +! +!> \brief Write an empty marker file to indicate output file completion +!> \author Guoqing Ge +!> \date March 2026 +!> \details +!> Creates an empty file with .done extension after output file is closed. +!> Only called if output_done_marker property is true for the stream. +!> For normal parallel I/O (blockWrite=false), only rank 0 (IO_NODE) writes +!> the marker since all ranks write to the same file. +!> For blockWrite mode, each rank writes its own file, so each rank creates +!> its own marker file. +! +!----------------------------------------------------------------------- +subroutine write_done_marker(filename, dminfo, blockWrite)!{{{ + + use mpas_derived_types, only : dm_info + use mpas_dmpar, only : IO_NODE + + implicit none + + character(len=*), intent(in) :: filename + type (dm_info), intent(in) :: dminfo + logical, intent(in) :: blockWrite + + character(len=1024) :: marker_filename + character(len=8) :: date_str + character(len=10) :: time_str + integer :: unit_num + + ! For blockWrite mode, each rank writes its own file so each creates marker + ! For normal parallel I/O, all ranks write same file so only rank 0 creates marker + if (blockWrite .or. dminfo % my_proc_id == IO_NODE) then + marker_filename = trim(filename) // '.done' + unit_num = 99 + call date_and_time(date=date_str, time=time_str) + open(unit=unit_num, file=trim(marker_filename), status='replace', action='write') + ! Write timestamp: YYYY-MM-DD HH:MM:SS + write(unit_num, '(A)') date_str(1:4)//'-'//date_str(5:6)//'-'//date_str(7:8)//' '// & + time_str(1:2)//':'//time_str(3:4)//':'//time_str(5:6) + close(unit_num) + end if + +end subroutine write_done_marker!}}} diff --git a/src/framework/mpas_stream_manager_types.inc b/src/framework/mpas_stream_manager_types.inc index 1527c5411a..10d0a7fdbb 100644 --- a/src/framework/mpas_stream_manager_types.inc +++ b/src/framework/mpas_stream_manager_types.inc @@ -21,7 +21,9 @@ MPAS_STREAM_PROPERTY_FILENAME_INTV = 11, & MPAS_STREAM_PROPERTY_CLOBBER = 12, & MPAS_STREAM_PROPERTY_IOTYPE = 13, & - MPAS_STREAM_PROPERTY_GATTR_UPDATE = 14 + MPAS_STREAM_PROPERTY_GATTR_UPDATE = 14, & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS = 15, & + MPAS_STREAM_PROPERTY_DONE_MARKER = 16 integer, public, parameter :: MPAS_STREAM_CLOBBER_NEVER = 100, & MPAS_STREAM_CLOBBER_APPEND = 101, & diff --git a/src/framework/xml_stream_parser.c b/src/framework/xml_stream_parser.c index 86a3249e06..f4e649f109 100644 --- a/src/framework/xml_stream_parser.c +++ b/src/framework/xml_stream_parser.c @@ -31,6 +31,8 @@ void stream_mgr_add_immutable_stream_fields_c(void *, const char *, const char * void stream_mgr_add_pool_c(void *, const char *, const char *, const char *, int *); void stream_mgr_add_alarm_c(void *, const char *, const char *, const char *, const char *, int *); void stream_mgr_add_pkg_c(void *, const char *, const char *, int *); +void stream_mgr_set_property_c(void *, const char *, const char *, const char *, int *); +void stream_mgr_add_variable_output_alarm_c(void *, const char *, int *); /* @@ -347,7 +349,7 @@ int par_read(char *fname, int *mpi_comm, char **xml_buf, size_t *bufsize) *********************************************************************************/ int attribute_check(ezxml_t stream) { - const char *s_name, *s_type, *s_filename, *s_filename_intv, *s_input, *s_output, *s_ref_time; + const char *s_name, *s_type, *s_filename, *s_filename_intv, *s_input, *s_output, *s_ref_time, *s_output_timelevels; char msgbuf[MSGSIZE]; int i, len, nextchar; @@ -358,6 +360,7 @@ int attribute_check(ezxml_t stream) s_input = ezxml_attr(stream, "input_interval"); s_output = ezxml_attr(stream, "output_interval"); s_ref_time = ezxml_attr(stream, "reference_time"); + s_output_timelevels = ezxml_attr(stream, "output_timelevels"); /* @@ -380,15 +383,20 @@ int attribute_check(ezxml_t stream) /* - * Check that input streams have an input interval, output streams have an output interval + * Check that input streams have an input interval, output streams have an output interval or output_timelevels */ if (strstr(s_type, "input") != NULL && s_input == NULL) { snprintf(msgbuf, MSGSIZE, "stream \"%s\" is an input stream and must have the \"input_interval\" attribute.", s_name); fmt_err(msgbuf); return 1; } - if (strstr(s_type, "output") != NULL && s_output == NULL) { - snprintf(msgbuf, MSGSIZE, "stream \"%s\" is an output stream and must have the \"output_interval\" attribute.", s_name); + if (strstr(s_type, "output") != NULL && s_output == NULL && s_output_timelevels == NULL) { + snprintf(msgbuf, MSGSIZE, "stream \"%s\" is an output stream and must have either the \"output_interval\" or \"output_timelevels\" attribute.", s_name); + fmt_err(msgbuf); + return 1; + } + if (s_output != NULL && s_output_timelevels != NULL) { + snprintf(msgbuf, MSGSIZE, "stream \"%s\" cannot have both \"output_interval\" and \"output_timelevels\" attributes.", s_name); fmt_err(msgbuf); return 1; } @@ -1103,6 +1111,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) /* First, handle changes to immutable stream filename templates, intervals, etc. */ immutable = 1; for (stream_xml = ezxml_child(streams, "immutable_stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) { + const char *output_done_marker; streamID = ezxml_attr(stream_xml, "name"); direction = ezxml_attr(stream_xml, "type"); filename_template = ezxml_attr(stream_xml, "filename_template"); @@ -1118,6 +1127,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) clobber = ezxml_attr(stream_xml, "clobber_mode"); gattr_update = ezxml_attr(stream_xml, "gattr_update"); iotype = ezxml_attr(stream_xml, "io_type"); + output_done_marker = ezxml_attr(stream_xml, "output_done_marker"); /* Extract the input interval, if it refer to other streams */ if ( interval_in ) { @@ -1366,6 +1376,17 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) return; } + /* If output_done_marker is specified, set it as a property */ + if (output_done_marker != NULL && strstr(output_done_marker, "yes") != NULL) { + stream_mgr_set_property_c(manager, streamID, "output_done_marker", "1", &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output done marker:", "yes"); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } + /* Possibly add an input alarm for this stream */ if (itype == 3 || itype == 1) { stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err); @@ -1433,6 +1454,8 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) /* Next, handle modifications to mutable streams as well as new stream definitions */ immutable = 0; for (stream_xml = ezxml_child(streams, "stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) { + const char *output_timelevels; + const char *output_done_marker; streamID = ezxml_attr(stream_xml, "name"); direction = ezxml_attr(stream_xml, "type"); filename_template = ezxml_attr(stream_xml, "filename_template"); @@ -1441,6 +1464,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) interval_in2 = ezxml_attr(stream_xml, "input_interval"); interval_out = ezxml_attr(stream_xml, "output_interval"); interval_out2 = ezxml_attr(stream_xml, "output_interval"); + output_timelevels = ezxml_attr(stream_xml, "output_timelevels"); reference_time = ezxml_attr(stream_xml, "reference_time"); record_interval = ezxml_attr(stream_xml, "record_interval"); precision = ezxml_attr(stream_xml, "precision"); @@ -1448,6 +1472,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) clobber = ezxml_attr(stream_xml, "clobber_mode"); gattr_update = ezxml_attr(stream_xml, "gattr_update"); iotype = ezxml_attr(stream_xml, "io_type"); + output_done_marker = ezxml_attr(stream_xml, "output_done_marker"); /* Extract the input interval, if it refer to other streams */ if ( interval_in ) { @@ -1481,21 +1506,27 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) if ( strstr(direction, "input") != NULL && strstr(direction, "output") != NULL ) { /* If input interval is an interval (i.e. not initial_only/final_only or none) set filename_interval to the interval. */ - if ( strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ + if ( interval_in && strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ filename_interval = interval_in2; /* If output interval is an interval (i.e. not initial_only/final_only or none) set filename_interval to the interval. */ - } else if ( strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ + } else if ( interval_out && strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ filename_interval = interval_out2; + /* If output_timelevels is set, use 'output' to get unique filename for each write */ + } else if ( output_timelevels != NULL ) { + filename_interval = "output"; } /* Check for an input stream. */ } else if ( strstr(direction, "input") != NULL ) { - if ( strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ + if ( interval_in && strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ filename_interval = interval_in2; } /* Check for an output stream. */ } else if ( strstr(direction, "output") != NULL ) { - if ( strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ + if ( interval_out && strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ filename_interval = interval_out2; + /* If output_timelevels is set, use 'output' to get unique filename for each write */ + } else if ( output_timelevels != NULL ) { + filename_interval = "output"; } } } else { @@ -1507,13 +1538,13 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) * to force it's value to be none as well. */ if ( strstr(filename_interval, "input_interval") != NULL ) { - if ( strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ) { + if ( interval_in && strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ) { filename_interval = interval_in2; } else { filename_interval = NULL; } } else if ( strstr(filename_interval, "output_interval") != NULL ) { - if ( strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ) { + if ( interval_out && strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ) { filename_interval = interval_out2; } else { filename_interval = NULL; @@ -1696,6 +1727,28 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) return; } + /* If output_timelevels is specified, set it as a property */ + if (output_timelevels != NULL) { + stream_mgr_set_property_c(manager, streamID, "output_timelevels", output_timelevels, &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output timelevels:", output_timelevels); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } + + /* If output_done_marker is specified, set it as a property */ + if (output_done_marker != NULL && strstr(output_done_marker, "yes") != NULL) { + stream_mgr_set_property_c(manager, streamID, "output_done_marker", "1", &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output done marker:", "yes"); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } + /* Possibly add an input alarm for this stream */ if (itype == 3 || itype == 1) { stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err); @@ -1714,16 +1767,28 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) /* Possibly add an output alarm for this stream */ if (itype == 3 || itype == 2) { - stream_mgr_add_alarm_c(manager, streamID, "output", "start", interval_out2, &err); - if (err != 0) { - *status = 1; - return; - } - if ( strcmp(interval_out, interval_out2) != 0 ) { - snprintf(msgbuf, MSGSIZE, " %-20s%s (%s)", "output alarm:", interval_out, interval_out2); - mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + /* If output_timelevels is specified, use variable alarm; otherwise use fixed interval */ + if (output_timelevels == NULL) { + stream_mgr_add_alarm_c(manager, streamID, "output", "start", interval_out2, &err); + if (err != 0) { + *status = 1; + return; + } + if ( strcmp(interval_out, interval_out2) != 0 ) { + snprintf(msgbuf, MSGSIZE, " %-20s%s (%s)", "output alarm:", interval_out, interval_out2); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } else { + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output alarm:", interval_out); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } } else { - snprintf(msgbuf, MSGSIZE, " %-20s%s", "output alarm:", interval_out); + /* Use variable output alarm based on timelevels */ + stream_mgr_add_variable_output_alarm_c(manager, streamID, &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output alarm:", "variable (from output_timelevels)"); mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); } }