Zerocopy data reader loses data #4715

Open
1 task done
ussefdesouky opened this issue Apr 22, 2024 · 0 comments
Labels
triage Issue pending classification

Is there an already existing issue for this?

  • I have searched the existing issues

Expected behavior

I expect that the data published by the writer will be received and read by the reader.

Current behavior

The writer publishes the data, but the reader drops some of it.
Screenshot from 2024-04-23 00-36-46

Also, why does the reader's sample not have the same address as the writer's? They share only part of the address.
Screenshot from 2024-04-23 00-51-15

So the reader dropped nearly 90 messages. Yet if I start the subscriber 100 messages after the publisher, the subscriber can still get the first 100 messages from the history.
Also, the pool is not yet full, as it has 530 samples.
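
To put an exact number on the loss, the indices published by the writer can be compared with those the reader actually took. The following is only a sketch: it assumes each sample carries a monotonically increasing index field starting at 0 (not shown in this report), and `find_missing` is a hypothetical helper name, not part of Fast DDS.

```python
def find_missing(published_count, received_indices):
    """Return the sorted indices in [0, published_count) that the reader never saw."""
    seen = set(received_indices)
    return [i for i in range(published_count) if i not in seen]


# Made-up data for illustration: 10 samples published, the reader saw 7 of them.
missing = find_missing(10, [0, 1, 2, 5, 6, 8, 9])
print(missing)  # [3, 4, 7]
```

Logging the received indices on the subscriber side and running them through a check like this would show whether the drops are contiguous or scattered.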

Steps to reproduce

My writer profile is in the XML configuration file below; the reader profile is the same.

Fast DDS version/commit

2.11.2

Platform/Architecture

Other. Please specify in Additional context section.

Transport layer

Zero copy

Additional context

I am working in a Docker image with Ubuntu 22.04 installed.

XML configuration file

`<?xml version="1.0" encoding="UTF-8" ?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles" >
    <profiles>
        <participant profile_name="participant_profile">
            <domainId>0</domainId>
            <rtps>
                <name>sensor data pub</name>
                <participantID>99</participantID>


            </rtps>
        </participant>

        <data_writer profile_name="datawriter_profile">
            <topic>
                <historyQos>
                    <!-- KEEP_LAST: keep the most recent values of the instance and discard the older ones depending on the depth value. -->
                    <!-- KEEP_ALL: keep all the data until it's received by the reader. The depth has no effect. -->
                    <kind>KEEP_ALL</kind> 
                    <depth>530</depth>
                </historyQos>
                <!-- Determines the actual maximum queue size especially when the HISTORY QosPolicy is set to KEEP_ALL. -->
                <!-- To maintain the consistency within the ResourceLimitsQosPolicy, the values of the data members must follow the next conditions:
                   -> The value of max_samples must be higher or equal to the value of max_samples_per_instance.
                   -> The value established for the HistoryQosPolicy depth must be lower or equal to the value stated for max_samples_per_instance. -->
                <resourceLimitsQos>
                    <!-- Controls the maximum number of samples that the DataWriter or DataReader can manage across all the instances associated with it. In other words, it represents the maximum samples that the middleware can store for a DataReader or DataWriter. -->
                    <!-- If you want to be able to store max_samples_per_instance for every instance, then you should set
                        -> max_samples >= max_instances * max_samples_per_instance -->
                    <!-- If you want to save memory and do not expect the running application to ever reach max_instances instances, you may use a smaller value for max_samples. In any case, there is a lower limit for max_samples:
                        -> max_samples >= max_samples_per_instance -->
                    <max_samples>530</max_samples> <!-- uint32_t --> <!-- 0 means infinite -->
                    <!-- Controls the maximum number of instances that a DataWriter or DataReader can manage. -->
                    <max_instances>1</max_instances> <!-- uint32_t --> <!-- 0 means infinite -->
                    <!-- Controls the maximum number of samples within an instance that the DataWriter or DataReader can manage. -->
                    <!-- If the HISTORY QosPolicy’s kind is set to KEEP_LAST, then you should set:
                        -> max_samples_per_instance = HISTORY.depth -->
                    <max_samples_per_instance>530</max_samples_per_instance> <!-- uint32_t --> <!-- 0 means infinite -->
                    <!-- States the number of samples that will be allocated on initialization. -->
                    <allocated_samples>0</allocated_samples> <!-- uint32_t --> 
                    <!-- States the number of extra samples that will be allocated on the pool, so the maximum number of samples on the pool will be max_samples plus extra_samples. These extra samples act as a reservoir of samples even when the history is full. -->
                    <extra_samples>0</extra_samples> <!-- uint32_t --> 
                </resourceLimitsQos>
            </topic>
            <qos> <!-- dataWriterQosPoliciesType -->
                <!-- Defines the data-sharing delivery communication between a writer and a reader, disabling any extra copies. The writer writes the data to a shared memory file and then notifies the reader. -->
                <data_sharing>
                    <!-- OFF: The data-sharing delivery is disabled. No communication will be performed using data-sharing delivery functionality. -->
                    <!-- ON: The data-sharing delivery is manually enabled. An error will occur if the current topic is not compatible with data-sharing delivery. Communication with remote entities that share at least one data-sharing domain ID will be done using data-sharing delivery functionality. -->
                    <!-- AUTOMATIC: data-sharing delivery will be activated if the current topic is compatible with data-sharing, and deactivated if not. -->
                    <kind>ON</kind> <!-- ON --> <!-- OFF --> <!-- AUTOMATIC --> 
                    <!-- Directory used for the memory-mapped files. -->
                    <shared_dir>/repos/shared</shared_dir> <!-- string -->
                    <!-- Maximum number of Data-Sharing domain IDs in the local or remote endpoints. -->
                    <max_domains>10</max_domains> <!-- uint32_t --> <!-- 0 means unlimited -->
                    <!-- List of Data-Sharing domain IDs configured for the current endpoint. -->
                    <domain_ids> <!-- domainId -->
                        <domainId>0</domainId>
                        <!-- <domainId>11</domainId> -->
                    </domain_ids>
                </data_sharing>
                <!-- On the publishing side, the deadline defines the maximum period in which the application is expected to supply a new sample. -->
                <!-- On the subscribing side, it defines the maximum period in which new samples should be received. -->
                <deadline>
                    <!-- A DurationType is defined by at least one mandatory element of two possible ones: <sec> plus <nanosec> -->
                    <period>
                        <sec>1</sec> <!-- DURATION_INFINITY, DURATION_INFINITE_SEC -->
                        <nanosec>0</nanosec> <!-- DURATION_INFINITE_NSEC -->
                    </period>
                </deadline>
                <disable_heartbeat_piggyback>true</disable_heartbeat_piggyback>
                <!-- This additional QoS allows reducing network traffic when strict reliable communication is not required and bandwidth is limited. It consists in changing the default behavior by which positive acks are sent from readers to writers. Instead, only negative acks will be sent when a reader is missing a sample, but writers will keep data for an adjustable time before considering it as acknowledged. -->
                <disablePositiveAcks>
                    <enabled>false</enabled> <!-- bool --> <!-- default: false -->
                    <duration>
                        <sec>0</sec>
                        <nanosec>5000000</nanosec>
                    </duration>
                </disablePositiveAcks>
                <!-- The DurabilityQoSPolicy defines how the system will behave regarding those samples that existed on the Topic before the DataReader joins. -->
                <durability>
                    <!-- VOLATILE_DURABILITY_QOS: Past samples are ignored and a joining DataReader receives samples generated after the moment it matches. -->
                    <!-- TRANSIENT_LOCAL_DURABILITY_QOS: When a new DataReader joins, its History is filled with past samples. -->
                    <!-- TRANSIENT_DURABILITY_QOS: When a new DataReader joins, its History is filled with past samples, which are stored on persistent storage (see Persistence Service). -->
                    <!-- PERSISTENT_DURABILITY_QOS: (Not Implemented): All the samples are stored on a permanent storage, so that they can outlive a system session. -->
                    <kind>TRANSIENT_LOCAL</kind>
                </durability>
                <!-- QoS policy pending implementation -->
                <!-- specifies the maximum acceptable delay from the time the data is written until the data is inserted on the DataReader History and notified of the fact. That delay by default is set to 0 in order to optimize the internal operations -->
                <latencyBudget>
                    <duration>
                        <sec>0</sec>
                    </duration>
                </latencyBudget>
                <!-- Each data sample written by a DataWriter has an associated expiration time beyond which the data is removed from the DataWriter and DataReader history as well as from the transient and persistent information caches -->
                <lifespan>
                    <duration>
                        <sec>5</sec> <!--int32_t--> <!-- default: Infinite -->
                    </duration>
                </lifespan>
                <!-- QoS Policy controls the mechanism used by the service to ensure that a particular entity on the network is still alive. There are different settings that allow distinguishing between applications where data is updated periodically and applications where data is changed sporadically. It also allows customizing the application regarding the kind of failures that should be detected by the liveliness mechanism -->
                <liveliness>
                    <!-- AUTOMATIC_LIVELINESS_QOS: The service takes the responsibility for renewing the leases at the required rates, as long as the local process where the participant is running and the link connecting it to remote participants exists, the entities within the remote participant will be considered alive. This kind is suitable for applications that only need to detect whether a remote application is still running. -->
                    <!-- The two Manual modes require that the application on the publishing side asserts the liveliness periodically before the lease_duration timer expires. Publishing any new data value implicitly asserts the DataWriter’s liveliness, but it can be done explicitly by calling the assert_liveliness member function. -->
                    <!--MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: If one of the entities in the publishing side asserts its liveliness, the service deduces that all other entities within the same DomainParticipant are also alive. -->
                    <!--MANUAL_BY_TOPIC_LIVELINESS_QOS: This mode is more restrictive and requires that at least one instance within the DataWriter is asserted to consider that the DataWriter is alive.-->
                    <kind>MANUAL_BY_PARTICIPANT</kind> <!-- AUTOMATIC_LIVELINESS_QOS -->
                    <!-- Amount of time to wait since the last time the DataWriter asserts its liveliness to consider that it is no longer alive. Additionally, the lease_duration of the DataWriter must not be greater than the lease_duration of the DataReader. -->
                    <lease_duration>
                        <sec>1000000</sec> 
                        <nanosec>856000</nanosec>
                    </lease_duration>
                    <!-- Amount of time between consecutive liveliness messages sent by the DataWriter. This data member only takes effect if the kind is AUTOMATIC_LIVELINESS_QOS or MANUAL_BY_PARTICIPANT_LIVELINESS_QOS and needs to be lower than the lease_duration. -->
                    <announcement_period>
                        <sec>90000</sec> 
                        <nanosec>856000</nanosec> 
                    </announcement_period>
                </liveliness>
                <!-- QoS Policy specifies whether it is allowed for multiple DataWriters to update the same instance of data, and if so, how these modifications should be arbitrated. -->
                <ownership>
                    <!-- To maintain the compatibility between OwnershipQosPolicy in DataReaders and DataWriters, the DataWriter kind must be equal to the DataReader kind. -->
                    <!-- SHARED_OWNERSHIP_QOS: Multiple Data writers write on the same topic. -->
                    <!-- EXCLUSIVE_OWNERSHIP_QOS: Only one data writer owns the topic. If multiple writers try to own the topic, the one with the higher strength will own it. -->
                    <kind>EXCLUSIVE</kind>
                </ownership>
                <!-- This QoS Policy specifies the value of the strength used to arbitrate among multiple DataWriters that attempt to modify the same data instance. It is only applicable if the OwnershipQosPolicy kind is set to EXCLUSIVE_OWNERSHIP_QOS. If the strengths are equal, the one with the lower GUID will own the topic. -->
                <ownershipStrength>
                    <value>50</value>
                </ownershipStrength>
                <publishMode>
                    <kind>SYNCHRONOUS</kind>
                </publishMode>
                <!-- Indicates the level of reliability offered and requested by the service. -->
                <reliability>
                    <!-- BEST_EFFORT_RELIABILITY_QOS: It indicates that it is acceptable not to retransmit the missing samples, so the messages are sent without waiting for an arrival confirmation. Presumably new values for the samples are generated often enough that it is not necessary to re-send any sample. However, the data samples sent by the same DataWriter will be stored in the DataReader history in the same order they occur. In other words, even if the DataReader misses some data samples, an older value will never overwrite a newer value. -->
                    <!-- RELIABLE_RELIABILITY_QOS: It indicates that the service will attempt to deliver all samples of the DataWriter’s history expecting an arrival confirmation from the DataReader. The data samples sent by the same DataWriter cannot be made available to the DataReader if there are previous samples that have not been received yet. The service will retransmit the lost data samples in order to reconstruct a correct snapshot of the DataWriter history before it is accessible by the DataReader. This option may block the write operation, hence the max_blocking_time is set that will unblock it once the time expires. But if the max_blocking_time expires before the data is sent, the write operation will return an error. -->
                    <!-- Setting this QoS Policy to BEST_EFFORT_RELIABILITY_QOS affects the DurabilityQosPolicy, making the endpoints behave as VOLATILE_DURABILITY_QOS. -->
                    <kind>RELIABLE</kind>
                    <!-- Configures the maximum duration that the write operation can be blocked. -->
                    <max_blocking_time>
                        <sec>0</sec> <!-- default: 100ms -->
                        <nanosec>1000000</nanosec>
                    </max_blocking_time>
                </reliability>
            </qos>

            <!-- <propertiesPolicy>
                <properties> -->
                    <!-- Persistence GUID -->
                    <!-- <property>
                        <name>dds.persistence.guid</name>
                        <value>77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64</value>
                    </property>
                </properties>
            </propertiesPolicy> -->

            <times>
                <initialHeartbeatDelay>
                    <sec>0</sec> <!-- 12ms -->
                    <nanosec>12000000</nanosec>
                </initialHeartbeatDelay>
                <heartbeatPeriod>
                    <sec>3</sec> <!-- 3s -->
                    <nanosec>0</nanosec>
                </heartbeatPeriod>
                <nackResponseDelay>
                    <sec>0</sec> <!-- 5ms -->
                    <nanosec>5000000</nanosec>
                </nackResponseDelay>
                <nackSupressionDuration>
                    <sec>0</sec> <!-- 0.1 ms -->
                    <nanosec>100000</nanosec>
                </nackSupressionDuration>
            </times>

            <!-- Whether locators that don’t match with the announced locators should be kept. I think it's related to the TCP & UDP -->
            <ignore_non_matching_locators>true</ignore_non_matching_locators> <!-- bool --> <!-- default: false -->
            <!-- Indicates the way the memory is managed in terms of dealing with the CacheChanges. -->
            <!-- PREALLOCATED_MEMORY_MODE: This option sets the size to the maximum of each data type. It produces the largest memory footprint but the smallest allocation count. -->
            <!-- PREALLOCATED_WITH_REALLOC_MEMORY_MODE: This option sets the size to the default for each data type and requires reallocation when a bigger message arrives. It produces a lower memory footprint at the expense of increasing the allocation count. -->
            <!-- DYNAMIC_RESERVE_MEMORY_MODE: This option allocates the size dynamically at the time of message arrival. It produces the least memory footprint but the highest allocation count. -->
            <!-- DYNAMIC_REUSABLE_MEMORY_MODE: This option is similar to DYNAMIC_RESERVE_MEMORY_MODE, but the allocated memory is reused for future messages. Best for memory usage. -->
            <historyMemoryPolicy>PREALLOCATED_WITH_REALLOC</historyMemoryPolicy> <!-- default: PREALLOCATED -->


            <!-- Establishes the unique identifier used for StaticEndpointDiscovery. -->
            <userDefinedID>-1</userDefinedID> <!-- int16_t --> <!-- default: -1 -->
            <!-- The user can specify the identifier for the endpoint. -->
            <entityID>-1</entityID> <!-- int16_t --> <!-- default: -1 -->

            <matchedSubscribersAllocation>
                <!-- Number of elements for which space is initially allocated. -->
                <initial>0</initial> <!-- uint32_t--> <!-- default: 0 -->
                <!-- Maximum number of elements for which space will be allocated. -->
                <maximum>0</maximum> <!-- uint32_t--> <!-- default: 0 (Infinite) -->
                <!-- Number of new elements that will be allocated when more space is necessary. -->
                <increment>1</increment> <!-- uint32_t--> <!-- default: 1 -->
            </matchedSubscribersAllocation>
        </data_writer>
    </profiles>
</dds>`
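
As a sanity check on the profile above, the two ResourceLimits consistency conditions quoted in its comments can be verified with a short script. This is a minimal sketch using only the Python standard library: the embedded `PROFILE` string is a trimmed copy of the writer profile that keeps only the elements being checked, `check_limits` is a hypothetical helper name, and treating 0 as "infinite" follows the comments in the profile rather than anything verified against the Fast DDS source.

```python
import xml.etree.ElementTree as ET

NS = {"dds": "http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles"}

# Trimmed copy of the writer profile; only the elements being checked are kept.
PROFILE = """<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
  <profiles>
    <data_writer profile_name="datawriter_profile">
      <topic>
        <historyQos><kind>KEEP_ALL</kind><depth>530</depth></historyQos>
        <resourceLimitsQos>
          <max_samples>530</max_samples>
          <max_instances>1</max_instances>
          <max_samples_per_instance>530</max_samples_per_instance>
        </resourceLimitsQos>
      </topic>
    </data_writer>
  </profiles>
</dds>"""


def check_limits(xml_text):
    """Return the list of violated conditions (empty if the profile is consistent)."""
    root = ET.fromstring(xml_text)
    topic = root.find(".//dds:data_writer/dds:topic", NS)

    def value(path):
        return int(topic.findtext(path, namespaces=NS))

    depth = value("dds:historyQos/dds:depth")
    max_samples = value("dds:resourceLimitsQos/dds:max_samples")
    per_instance = value("dds:resourceLimitsQos/dds:max_samples_per_instance")

    problems = []
    # Assumption: 0 means "infinite", as the comments in the profile state.
    if max_samples != 0 and (per_instance == 0 or max_samples < per_instance):
        problems.append("max_samples < max_samples_per_instance")
    if per_instance != 0 and depth > per_instance:
        problems.append("depth > max_samples_per_instance")
    return problems


print(check_limits(PROFILE))  # []
```

The profile passes both checks, so these two conditions at least do not appear to explain the dropped samples.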

Relevant log output

No response

Network traffic capture

No response
