The fairness among multiple sender sessions #66

Open

honglei opened this issue Jul 6, 2022 · 6 comments

@honglei
Contributor

honglei commented Jul 6, 2022

In the following code, I create two sessions in one process, but only session s1 keeps sending data:

import ipaddress
from random import randint
import datetime

import pynorm


def create_Session(instance: pynorm.Instance, destAddr: str, destPort: int,
                   localAddr: str, localPort: int = 0):
    # Use the integer form of the local IPv4 address as the NORM node id.
    session = instance.createSession(destAddr, destPort,
                                     localId=ipaddress.IPv4Address(localAddr)._ip)
    session.setTxPort(localPort, txBindAddr=localAddr)
    session.setTxOnly(txOnly=True)
    #session.setTxRate(256e10)
    session.setCongestionControl(ccEnable=True)  # let NORM congestion control set the rate
    session.startSender(randint(0, 1000), 100*1024*1024,
                        segmentSize=1400, blockSize=128, numParity=0)
    session.setGroupSize(4)
    return session


if __name__ == '__main__':
    instance = pynorm.Instance()
    instance.setDebugLevel(level=1)

    localAddr = "10.1.1.1"
    destPort = 6003
    data = b'xxxxxxxxxxxxxxx' * 1024 * 1024

    s1 = create_Session(instance, destAddr='224.1.2.3', destPort=destPort,
                        localAddr=localAddr, localPort=0)
    s1.name = 's1'
    s1.dataEnqueue(data=data)

    s2 = create_Session(instance, destAddr='224.1.2.4', destPort=destPort,
                        localAddr=localAddr, localPort=0)
    s2.name = 's2'
    s2.dataEnqueue(data=data)

    try:
        for event in instance:
            event: pynorm.event.Event
            e = str(event).strip("NORM_")  # drop the "NORM_" prefix for readability
            print(datetime.datetime.now().strftime("%H:%M:%S.%f"), e, event.session.name)
            # Re-enqueue another data object whenever the session reports progress.
            if e in ('TX_OBJECT_SENT', 'TX_OBJECT_PURGED', 'TX_QUEUE_EMPTY'):
                result = event.session.dataEnqueue(data=data)
    except KeyboardInterrupt:
        pass

Output:

19:02:46.367202 GRTT_UPDATED s1
19:02:46.367202 GRTT_UPDATED s2
19:02:49.571540 CC_ACTIVE s1
19:02:49.571540 TX_RATE_CHANGED s1
19:02:49.571540 CC_ACTIVE s2
19:02:49.571540 TX_RATE_CHANGED s2
19:02:50.711149 GRTT_UPDATED s1
19:02:51.007191 GRTT_UPDATED s1
19:02:51.208529 GRTT_UPDATED s1
19:02:51.409058 GRTT_UPDATED s1
19:02:51.578669 GRTT_UPDATED s1
19:02:51.750514 GRTT_UPDATED s1
19:02:51.889442 GRTT_UPDATED s1
19:02:52.012103 GRTT_UPDATED s1
19:02:52.058247 GRTT_UPDATED s2
19:02:52.121027 GRTT_UPDATED s1
19:02:52.236193 GRTT_UPDATED s1
19:02:52.313101 GRTT_UPDATED s1
19:02:52.331226 GRTT_UPDATED s2
19:02:52.394557 GRTT_UPDATED s1
19:02:52.513325 GRTT_UPDATED s1
19:02:52.513325 TX_OBJECT_SENT s1
19:02:52.516415 TX_QUEUE_EMPTY s1
19:02:52.552606 GRTT_UPDATED s2
19:02:52.563738 GRTT_UPDATED s1
19:02:52.612012 GRTT_UPDATED s1
19:02:52.681224 GRTT_UPDATED s1
19:02:52.912325 TX_OBJECT_SENT s1
19:02:53.004779 GRTT_UPDATED s1
19:02:53.022583 GRTT_UPDATED s1
19:02:53.063095 GRTT_UPDATED s1
19:02:53.154112 GRTT_UPDATED s1
19:02:53.185314 GRTT_UPDATED s1
19:02:53.211547 GRTT_UPDATED s1
19:02:53.345550 TX_OBJECT_SENT s1
19:02:53.347252 GRTT_UPDATED s1
19:02:53.349203 GRTT_UPDATED s1
19:02:53.352201 GRTT_UPDATED s1
19:02:53.358234 GRTT_UPDATED s1
19:02:53.368854 GRTT_UPDATED s1
19:02:53.389490 GRTT_UPDATED s1
19:02:53.621008 TX_OBJECT_SENT s1
19:02:53.640529 TX_QUEUE_EMPTY s1
19:02:53.885777 GRTT_UPDATED s1
19:02:53.916294 GRTT_UPDATED s2
19:02:53.975162 GRTT_UPDATED s2
19:02:53.987099 GRTT_UPDATED s1
19:02:53.998778 GRTT_UPDATED s1
19:02:54.009587 GRTT_UPDATED s1
19:02:54.027158 GRTT_UPDATED s1
19:02:54.054362 GRTT_UPDATED s1
19:02:54.168584 TX_OBJECT_SENT s1
19:02:54.193041 TX_QUEUE_EMPTY s1
19:02:54.305678 GRTT_UPDATED s1
19:02:54.346052 GRTT_UPDATED s1
19:02:54.495377 GRTT_UPDATED s1
19:02:54.562085 GRTT_UPDATED s1
19:02:54.625685 GRTT_UPDATED s1
19:02:54.755185 TX_OBJECT_SENT s1
19:02:54.769249 GRTT_UPDATED s1
19:02:54.866510 GRTT_UPDATED s1
19:02:54.939676 GRTT_UPDATED s1
19:02:55.045133 GRTT_UPDATED s1
19:02:55.045133 TX_OBJECT_SENT s1
19:02:55.046137 TX_QUEUE_EMPTY s1
19:02:55.046137 GRTT_UPDATED s1
19:02:55.046137 TX_OBJECT_PURGED s1
19:02:55.046137 GRTT_UPDATED s1
19:02:55.047138 GRTT_UPDATED s1
19:02:55.047138 GRTT_UPDATED s1
19:02:55.047138 GRTT_UPDATED s1
19:02:55.047138 TX_OBJECT_SENT s1
19:02:55.047138 GRTT_UPDATED s1
19:02:55.047138 GRTT_UPDATED s1
19:02:55.047138 TX_OBJECT_SENT s1
19:02:55.047138 TX_QUEUE_EMPTY s1
19:02:55.048410 TX_OBJECT_PURGED s1
19:02:55.048410 GRTT_UPDATED s1
19:02:55.048410 GRTT_UPDATED s1
19:02:55.048410 GRTT_UPDATED s1
19:02:55.048410 GRTT_UPDATED s1
19:02:55.048923 GRTT_UPDATED s1
19:02:55.048971 GRTT_UPDATED s1
19:02:55.048971 GRTT_UPDATED s2
19:02:55.048971 TX_OBJECT_SENT s1
19:02:55.048971 TX_QUEUE_EMPTY s1
19:02:55.048971 TX_OBJECT_PURGED s1
19:02:55.048971 TX_OBJECT_PURGED s1
19:02:55.048971 TX_OBJECT_PURGED s1
19:02:55.049935 TX_OBJECT_PURGED s1
19:02:55.049935 TX_OBJECT_PURGED s1
19:02:55.049935 TX_OBJECT_PURGED s1
19:02:55.208242 GRTT_UPDATED s1
19:02:55.272049 GRTT_UPDATED s1
19:02:55.423414 GRTT_UPDATED s1
19:02:55.618342 GRTT_UPDATED s1
19:02:55.641176 TX_QUEUE_EMPTY s1
19:02:56.150859 TX_OBJECT_SENT s1
19:02:56.315592 GRTT_UPDATED s1
19:02:56.326327 GRTT_UPDATED s1
19:02:56.342155 GRTT_UPDATED s1
19:02:56.391655 TX_OBJECT_SENT s1
19:02:56.831313 TX_OBJECT_PURGED s1
19:02:56.928489 GRTT_UPDATED s1
19:02:56.946144 GRTT_UPDATED s2
19:02:56.974019 GRTT_UPDATED s1
19:02:56.998517 GRTT_UPDATED s1
19:02:57.032450 GRTT_UPDATED s1
19:02:57.055062 GRTT_UPDATED s1
19:02:57.092916 TX_OBJECT_SENT s1
19:02:57.305245 GRTT_UPDATED s1
19:02:57.419702 GRTT_UPDATED s1
19:02:57.474402 TX_OBJECT_PURGED s1
19:02:57.566750 GRTT_UPDATED s1
19:02:57.580307 TX_OBJECT_SENT s1
19:02:57.766409 GRTT_UPDATED s1
19:02:57.786197 GRTT_UPDATED s2
19:02:57.814184 GRTT_UPDATED s2
19:02:57.825652 GRTT_UPDATED s1
19:02:57.841741 GRTT_UPDATED s1
19:02:57.928619 GRTT_UPDATED s1
19:02:57.959251 TX_OBJECT_PURGED s1
19:02:58.158236 GRTT_UPDATED s1
19:02:58.263278 GRTT_UPDATED s1
19:02:58.390227 TX_OBJECT_SENT s1
19:02:58.579250 GRTT_UPDATED s1
19:02:58.593876 GRTT_UPDATED s1
19:02:58.611716 GRTT_UPDATED s1
19:02:58.637049 GRTT_UPDATED s1
19:02:58.657264 GRTT_UPDATED s1
19:02:58.730318 TX_OBJECT_SENT s1
19:02:58.803523 TX_OBJECT_PURGED s1
19:02:58.917232 GRTT_UPDATED s1
19:02:58.938985 GRTT_UPDATED s1
19:02:58.950941 GRTT_UPDATED s1
19:02:58.960462 GRTT_UPDATED s1
19:02:58.971501 GRTT_UPDATED s1
19:02:58.994048 GRTT_UPDATED s1
19:02:59.055631 TX_OBJECT_SENT s1
19:02:59.153415 TX_OBJECT_PURGED s1
19:02:59.627289 GRTT_UPDATED s1
19:02:59.766426 GRTT_UPDATED s1
19:02:59.767565 GRTT_UPDATED s1
19:02:59.785161 GRTT_UPDATED s1
19:02:59.805698 GRTT_UPDATED s1
19:02:59.851661 GRTT_UPDATED s1
19:02:59.988465 GRTT_UPDATED s1
19:03:00.006055 GRTT_UPDATED s1
19:03:00.122846 TX_OBJECT_SENT s1
19:03:00.316230 GRTT_UPDATED s1
19:03:00.324909 GRTT_UPDATED s1
19:03:00.340978 TX_OBJECT_PURGED s1
19:03:00.414464 GRTT_UPDATED s1
19:03:00.442500 GRTT_UPDATED s1
19:03:00.487322 GRTT_UPDATED s1
19:03:00.519405 TX_OBJECT_SENT s2
19:03:00.647415 TX_QUEUE_EMPTY s2
19:03:00.702349 TX_OBJECT_PURGED s1
19:03:01.134269 GRTT_UPDATED s1
19:03:01.153242 TX_OBJECT_SENT s1
19:03:01.300629 GRTT_UPDATED s1
19:03:01.330173 GRTT_UPDATED s1
19:03:01.338429 GRTT_UPDATED s1
19:03:01.349977 GRTT_UPDATED s1
19:03:01.360303 GRTT_UPDATED s1
19:03:01.384839 GRTT_UPDATED s1
19:03:01.396490 GRTT_UPDATED s1
19:03:01.410553 GRTT_UPDATED s1
19:03:01.446692 TX_OBJECT_SENT s1
19:03:01.641648 TX_OBJECT_PURGED s1
19:03:01.885609 GRTT_UPDATED s1
19:03:01.996293 GRTT_UPDATED s1
19:03:02.001411 GRTT_UPDATED s1
19:03:02.005331 GRTT_UPDATED s1
19:03:02.022968 GRTT_UPDATED s1
19:03:02.044058 GRTT_UPDATED s1
19:03:02.064358 GRTT_UPDATED s1
19:03:02.080898 GRTT_UPDATED s1
19:03:02.109323 GRTT_UPDATED s1
19:03:02.192692 TX_OBJECT_SENT s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 GRTT_UPDATED s1
19:03:02.193687 TX_OBJECT_PURGED s1
19:03:02.194809 TX_OBJECT_SENT s1
@bebopagogo
Collaborator

One thing you may want to try is the following approach for enqueuing new data objects:

    if e == 'TX_QUEUE_EMPTY':
        result = event.session.dataEnqueue(data=data)

I.e., only use TX_QUEUE_EMPTY as the trigger for enqueuing data. TX_OBJECT_SENT is more of an informational notification for applications that want to know when the sender has made it through at least one pass of initial transmission of an object's data. And TX_OBJECT_PURGED is intended as a notification the application can use to know when it is safe to deallocate resources referenced by a NORM tx object (e.g., a chunk of memory in the case of NORM_OBJECT_DATA, or a file in the case of NORM_OBJECT_FILE).
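
For illustration, a minimal sketch of that division of labor, reusing instance and data from the first post; it assumes pynorm's dataEnqueue returns a hashable object wrapper and that Event exposes the purged object as event.object (mirroring the C API's NormEvent.object field), and next_payload() is a hypothetical helper:

    pending = {}  # buffers NORM may still reference, keyed by tx object

    for event in instance:
        e = str(event).strip("NORM_")
        if e == 'TX_QUEUE_EMPTY':
            buf = next_payload()                    # hypothetical: produce the next chunk
            obj = event.session.dataEnqueue(data=buf)
            pending[obj] = buf                      # keep the buffer alive until purged
        elif e == 'TX_OBJECT_PURGED':
            pending.pop(event.object, None)         # now safe to release this buffer
        elif e == 'TX_OBJECT_SENT':
            pass                                    # informational: initial pass completed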

This might help with the fairness issue, since I think the pattern you are using quickly ramps up activity on the first session 's1', and session 's2' tends not to get serviced as much as a result and may end up ramping up very slowly. What you may also want to do is have your code enqueue multiple data objects to both sessions at startup (sketched below). If you keep both sessions' transmission queues filled up, the underlying NORM code should give both sessions roughly round-robin service. It could also be a better approach in this case to use multiple NormInstances, one per session; then operating system scheduling would probably keep the load balanced among the sessions. This is not something I've looked at closely in the NORM code, and there could be issues under other conditions, beyond what you've identified here, where sessions are not evenly/fairly serviced. One thing that could perhaps be done is updating the NormEvent queue to make sure that events for different sessions are alternated, perhaps with a NormEvent queue per session and round-robin service of those queues.
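
A sketch of the pre-fill idea, again reusing instance, s1, s2, and data from the first post (NUM_PREFILL is an illustrative value, not something from this thread):

    NUM_PREFILL = 4  # illustrative depth for each session's tx queue

    for s in (s1, s2):
        for _ in range(NUM_PREFILL):
            s.dataEnqueue(data=data)

    # Afterwards, only top a queue back up when that session reports it has drained:
    for event in instance:
        if str(event).strip("NORM_") == 'TX_QUEUE_EMPTY':
            event.session.dataEnqueue(data=data)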

I really appreciate the degree of regression testing you have been conducting here, and I apologize that my schedule has not let me keep up with addressing all of your good suggestions more quickly.

@honglei
Contributor Author

honglei commented Jul 20, 2022

Yes, using only TX_QUEUE_EMPTY as the trigger for enqueuing data makes both sessions send data!

            if event.type == EventType.TX_QUEUE_EMPTY:
                result = event.session.dataEnqueue(data=data)

            elif event.type == EventType.TX_OBJECT_PURGED:
                # s1count / s2count are per-session counters initialized to 0 before the loop
                if event.session == s1:
                    s1count += 1
                elif event.session == s2:
                    s2count += 1
                print(f"s1: {s1count}, s2: {s2count}")

Environment: 1 Gbps LAN, 2 Win10 PCs; 1 sender with 2 sender sessions, 2 receivers each with one receiver session:
1. Without setting rateMax, objects sent: s1: 375, s2: 78
2. With rateMax == 300 Mbps, objects sent: s1: 310, s2: 302

     session.setTxRateBounds(rateMin=-1, rateMax=300_000_000)

3. With rateMax == 500 Mbps, objects sent: s1: 358, s2: 144

@honglei
Contributor Author

honglei commented Jul 20, 2022

With 3 sender sessions and rateMax == 200 Mbps, objects sent: s1: 500, s2: 450, s3: 403

@honglei
Contributor Author

honglei commented Jul 21, 2022

So many warnings:

Linux:

Proto Warn: NormSession::SendMessage() sendto(224.1.2.4/6003) 'blocked' warning: Resource temporarily unavailable

Win10:

Proto Warn: NormSession::SendMessage() sendto(224.1.2.4/6003) 'blocked' warning: A non-blocking socket operation could not be completed immediately

@bebopagogo
Collaborator

bebopagogo commented Jul 30, 2022

Those warnings are fairly benign, just indicating that the underlying transmit socket buffer is full and that the code will need to "try again" to send a packet. At 3 sessions times 300 Mbps each, you are getting close to the capacity of the 1 Gbps LAN. At higher data rates, larger socket buffer sizes can help when the user-space code is getting more "bursty" scheduling from your operating system.

The printout of the warnings could impact performance when the debug level is set high enough for them to be displayed. I can't recall off the top of my head at what debug level those warnings are output, but if it's happening at a debug level low enough to be useful for debugging other issues, it's possible their level could be raised.
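
For reference, NORM's C API provides NormSetTxSocketBuffer() for enlarging the transmit socket buffer; here is a sketch for the two sessions from the example above, assuming pynorm exposes it as session.setTxSocketBuffer (the wrapper method name and the 4 MB figure are assumptions, not from this thread):

    TX_SOCKET_BUFFER = 4 * 1024 * 1024  # 4 MB, illustrative

    for s in (s1, s2):
        s.setTxSocketBuffer(TX_SOCKET_BUFFER)  # assumed pynorm wrapper of NormSetTxSocketBuffer()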

@bebopagogo
Collaborator

Another workaround for this issue would be to use a separate NormInstance for each Session ... then fairness is really dictated by operating system scheduling of the threads (each NormInstance corresponds to an operating system thread). One improvement I've thought about is using system calls to balance sessions over instances/threads under the hood, so that users of the API don't have to concern themselves with that detail.
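
A sketch of the one-instance-per-session workaround, reusing the create_Session helper and the destPort, localAddr, and data values from the first post, with one Python thread draining each instance's event queue:

    import threading

    def run_session(destAddr):
        inst = pynorm.Instance()                      # its own NORM thread under the hood
        sess = create_Session(inst, destAddr=destAddr, destPort=destPort,
                              localAddr=localAddr, localPort=0)
        sess.dataEnqueue(data=data)
        for event in inst:
            if str(event).strip("NORM_") == 'TX_QUEUE_EMPTY':
                event.session.dataEnqueue(data=data)

    threads = [threading.Thread(target=run_session, args=(addr,))
               for addr in ('224.1.2.3', '224.1.2.4')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()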
