Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gevent breaking threading #167

Open
Shiny380 opened this issue Dec 31, 2020 · 12 comments
Open

gevent breaking threading #167

Shiny380 opened this issue Dec 31, 2020 · 12 comments
Assignees

Comments

@Shiny380
Copy link
Collaborator

Shiny380 commented Dec 31, 2020

gevent monkey patching seems to be breaking threads.
This can be seen in mqtt_client_base.py https://github.com/NubeIO/rubix-point-server/blob/master/src/services/mqtt_client/mqtt_client_base.py#L32

self._client.connect(self.config.host, self.config.port, self.config.keepalive)

This line somehow splits the thread into two separate threads. One thread the normal FlaskThread and one gevent.threading._DummyThread. Seems to be linked to accessing threading.currentThread() or similar.

This is causing a constant reconnection issue for mqtt and probably other issues. It stops happening if monkey.patch_all() (https://github.com/NubeIO/rubix-point-server/blob/master/src/server.py#L14) is removed.


Debug code from mqtt_client_base.py:

print('mqtt client connecting...')
self._client.connect(self.config.host, self.config.port, self.config.keepalive)
print('    mqtt client connected')
print('    ', threading.currentThread())
print('    ', type(threading.currentThread()))

Output:

mqtt starting
mqtt client connecting...
    mqtt client connected
     <FlaskThread(Thread-1, started daemon 140669658012112)>
     <class 'src.background.FlaskThread'>
        mqtt client looping
    mqtt client connected
     <_DummyThread(Dummy-2, started daemon 140669658012112)>
     <class 'gevent.threading._DummyThread'>

Prevent threading being patched with curious_george.patch_all(thread=False) :

/home/dan/rubix-point-server/venv/lib/python3.8/site-packages/gunicorn/workers/ggevent.py:53: MonkeyPatchWarning: Patching more than once will result in the union of all True parameters being patched
  monkey.patch_all()

But then the event dispatching across threads breaks in a weird way where it appears to function correctly but the event never gets added to the event queue. The event queue object is the exact same object (address) across the different threads but the loop in the target thread never seems to receive the event.
seen in this line https://github.com/NubeIO/rubix-point-server/blob/master/src/services/event_service_base.py#L70 where it functions correctly on the correct object address but just disappears.
can be seen by calling this function https://github.com/NubeIO/rubix-point-server/blob/master/src/source_drivers/modbus/resources/point/point_singular.py#L88 from a Flask http request
/api/modbus/poll/point

{
    "network_type": "RTU",
    "network_rtu_port": "/dev/ttyUSB0", 

    "device_address": 1,

    "point_register": 1,
    "point_register_length": 2,
    "point_function_code": "READ_INPUT_REGISTERS",
    "point_data_type": "FLOAT",
    "point_write_value": 0
}

Some links:

@RaiBnod
Copy link
Member

RaiBnod commented Jan 6, 2021

As per @Shiny380, @zero88 has started looking on it. And I was also doing some inspections and changes on the project https://github.com/NubeIO/lora-raw same issue.

Till now my inspection is:

Working both of us on same task won't be fruitful so I will let @zero88 to work on. Or, if you wanna continue this inspection please let me know.

P.S: This task is on priority

@RaiBnod
Copy link
Member

RaiBnod commented Jan 6, 2021

P.S. check this one: https://github.com/NubeIO/lora-raw/pull/29

@zero88
Copy link
Contributor

zero88 commented Jan 6, 2021

It is my result after make simple test on rubix-bacnet-server without any change of current structure code.
As you see, MQTT client still connect and receive message successfully, however mqtt client keep restart constantly.

1

So, first assumption for causing a constant re-connection issue for mqtt is address correctly.
However, the cause that @Shiny380 for dummy thread is not correct. Flask thread and Dummy Thread still point to same id 140669658012112 and not growing.
And more importance, it is not blocker issue, then I think you're still able to develop project

After investigate more, I realize: paho-mqtt-client manage event-loop itself, and it will overlap and conflict with current gevent event loop as main event loop in our python project.
Then I think make an adapter to paho-mqtt with gevent like this docs and example

Furthermore, I've not yet looked your code in detail about Event Dispatcher or sth similar that, but might you are trying to implement eventbus system with multiple thread programming. I think you @Shiny380 should learn and use gevent.events instead of doing it by yourself. Also make whole application is seamless

For PR from @RaiBnod , I don't expect it. It makes custom thread out of server context. If server receive signal to stop/restart, it might stop gunicorn master thread but not for custom thread. And not yet test, but if run in multiple workers, it will call setup db and thread more than one time for each worker, it doesn't share thread context between each workers.

@RaiBnod
Copy link
Member

RaiBnod commented Jan 6, 2021

@zero88 it doesn't make custom thread out of server context, you can see we are wrapping things inside application context. If it was out of that that context, it shows failure on run-time at the db access.

Just go with the simple inspection through my PR: NubeIO/lora-raw#29 and test it, it works perfectly fine without any issue like yours in above. If you get any issue let me know and if anything better could you do that's also be appreciated.

My inspection is, as I had talked on above:

If we make pre-load=True and do a start, it will give us that flexibility to run code after that application gets ready. So you are doing those db.create_all() for creating table, and Threading related work on there. But on the other end, it's starts two processes when you do time.sleep(<seconds>) inside Threads. See the example below:

ezgif com-gif-maker (1)

After my change:

ezgif com-gif-maker (2)

And lastly, those consecutive two calls on that flow is causing issue. One gets the exact context value, and one gets None.

@zero88
Copy link
Contributor

zero88 commented Jan 7, 2021

Do you see a problem? @RaiBnod
If more than one workers, createDB and Thread is invoked more than one time

import os
os.environ['GEVENT_SUPPORT'] = 'True'
runfile('/data/projects/nubeio/rubix-py/rubix-lora/run.py', args=['-s', 'config.json', '--workers', '2'], wdir='/data/projects/nubeio/rubix-py/rubix-lora')

image

@zero88
Copy link
Contributor

zero88 commented Jan 7, 2021

After gevent.monkey.patch_all(), it is already made whole python thread concept to coroutine aka eventloop.
Then time.sleep() is become to gevent.sleep(), when it is invoked, it will fork worker thread to sleep, instead of main current thread. Then it will create dummy thread
I don't see any wrong with time.sleep() in here.

{'_gevent_saved_patch_all_module_settings': {'socket': True, 'dns': True, 'time': True, 'select': True, 'thread': True, 'os': True, 'ssl': True, 'subprocess': True, 'sys': False, 'aggressive': True, 'Event': True, 'builtins': True, 'signal': True, 'queue': True, 'contextvars': True}, 'os': {'fork': <built-in function fork>, 'forkpty': <built-in function forkpty>, 'waitpid': <built-in function waitpid>, 'posix_spawn': <built-in function posix_spawn>, 'posix_spawnp': <built-in function posix_spawnp>}, '_thread': {'allocate_lock': <built-in function allocate_lock>, 'get_ident': <built-in function get_ident>, 'exit': <built-in function exit>, 'LockType': <class '_thread.lock'>, 'stack_size': <built-in function stack_size>, 'start_new_thread': <built-in function start_new_thread>, '_local': <class '_thread._local'>}, 'threading': {'local': <class '_thread._local'>, '_start_new_thread': <built-in function start_new_thread>, '_allocate_lock': <built-in function allocate_lock>, 'Lock': <built-in function allocate_lock>, '_DummyThread': <class 'threading._DummyThread'>, 'Thread': <class 'threading.Thread'>, 'Timer': <class 'threading.Timer'>, '_set_sentinel': <built-in function _set_sentinel>, 'get_ident': <built-in function get_ident>, '_CRLock': <class '_thread.RLock'>, 'Event': <class 'threading.Event'>, '_shutdown': <function _shutdown at 0x7fb68ead9b80>}, 'logging': {'_lock': <unlocked _thread.RLock object owner=0 count=0 at 0x7fb68e8e7720>}, '_threading_local': {'local': <class '_threading_local.local'>}, 'time': {'sleep': <built-in function sleep>}, 'socket': {'create_connection': <function create_connection at 0x7fb68ea7c1f0>, 'socket': <class 'socket.socket'>, 'SocketType': <class '_socket.socket'>, 'fromfd': <function fromfd at 0x7fb68ea5d3a0>, 'socketpair': <function socketpair at 0x7fb68ea77af0>, 'getaddrinfo': <function getaddrinfo at 0x7fb68ea7c3a0>, 'gethostbyname': <built-in function gethostbyname>, 'gethostbyname_ex': <built-in function gethostbyname_ex>, 'gethostbyaddr': <built-in function gethostbyaddr>, 'getnameinfo': <built-in function getnameinfo>, 'getfqdn': <function getfqdn at 0x7fb68ea77b80>}, 'select': {'select': <built-in function select>, 'poll': <built-in function poll>, 'epoll': <class 'select.epoll'>}, 'selectors': {'DefaultSelector': <class 'selectors.EpollSelector'>, 'EpollSelector': <class 'selectors.EpollSelector'>}, 'ssl': {'SSLContext': <class 'ssl.SSLContext'>, 'SSLSocket': <class 'ssl.SSLSocket'>, 'wrap_socket': <function wrap_socket at 0x7fb68e445a60>, 'get_server_certificate': <function get_server_certificate at 0x7fb68e447430>}, 'subprocess': {'Popen': <class 'subprocess.Popen'>, 'call': <function call at 0x7fb68e8c9310>, 'check_call': <function check_call at 0x7fb68e8c93a0>, 'check_output': <function check_output at 0x7fb68e8c9430>, '_posixsubprocess': <module '_posixsubprocess' from '/usr/lib64/python3.8/lib-dynload/_posixsubprocess.cpython-38-x86_64-linux-gnu.so'>, 'run': <function run at 0x7fb68e8c94c0>, 'CompletedProcess': <class 'subprocess.CompletedProcess'>, '_use_posix_spawn': <function _use_posix_spawn at 0x7fb68e8c98b0>, '_USE_POSIX_SPAWN': True}, 'signal': {'signal': <function signal at 0x7fb68e966940>, 'getsignal': <function getsignal at 0x7fb68e9669d0>}, 'queue': {'SimpleQueue': <class '_queue.SimpleQueue'>}}

@zero88
Copy link
Contributor

zero88 commented Jan 7, 2021

If you want to manage your threading by yourself, just not use gevent anymore.
use worker_class: sync

@zero88
Copy link
Contributor

zero88 commented Jan 7, 2021

moreover, your fix is just tricky workaround to avoid starting thread in worker initialization time, but I'm not sure it works if create new thread in runtime, and time.sleep() should be forbid??
And when I say custom thread out of server context:

  • server context is gunicorn
  • application context is flask
    Then spawn custom thread that is not managed by gunicorn is bad practice, it will make thread leak issue when threading is not well managed.

@zero88
Copy link
Contributor

zero88 commented Jan 7, 2021

If all of you want to leverage the power of gevent to make asynchronous application, I invite you to learn from here before starting implement more complexity multiple threading.
Otherwise:
If you want to manage your threading by yourself, just don't use gevent anymore.
use worker_class: sync

@RaiBnod
Copy link
Member

RaiBnod commented Jan 7, 2021

Even though we use woker_class: sync it shows same issue on multiple workers.

So let come into the conclusion:

Issue: It has issue when we have multiple workers.

When we do multiple workers turned on our system, all our executable codes gets forked into multiples.

In our case, Background tasks are also gets forked into workers numbers. And on our background task we are establishing a connection with MQTT. Since we have multiple workers: it will try to establish multiple connections on the same program. And it shows the issue as:

1609991993: Client lora-raw-mqtt already connected, closing old connection.
1609991993: New client connected from 172.17.0.1 as lora-raw-mqtt (p2, c1, k60).
1609991994: New connection from 172.17.0.1 on port 1883.
1609991994: Client lora-raw-mqtt already connected, closing old connection.
1609991994: New client connected from 172.17.0.1 as lora-raw-mqtt (p2, c1, k60).
1609991995: New connection from 172.17.0.1 on port 1883.
1609991995: Client lora-raw-mqtt already connected, closing old connection.

So, I was running with only one worker. Believing that, we could do some sort solution later on for establishing MQTT connection on from same run.

Do you have any solution for handling this? If you have please do it, otherwise we will proceed with 1 worker atm and later we will dig out on it bit.

@zero88
Copy link
Contributor

zero88 commented Jan 7, 2021

Sorry, you're trying your solution with sync @RaiBnod

Just use latest code, with my tweak some configuration and follow my setup in idea in here

It is my result: NubeIO/rubix-bacnet-server#75

image

Note
Btw, if mqtt-broker kick out mqtt-client in simple case, it is due to same mqtt-client name. So please lookup into your code, and ensure only one mqtt client is initialized, and take care with your threading issue

Conclusion @RaiBnod @Shiny380

As I said above, repeat myself:

If all of you want to leverage the power of gevent to make asynchronous application, I invite you to learn from here before starting implement more complexity multiple threading.
Otherwise:
If you want to manage your threading by yourself, just don't use gevent anymore.
use worker_class: sync

In case of still want to use gevent, must make adapter to paho.mqtt as repeat myself:

After investigate more, I realize: paho-mqtt-client manage event-loop itself, and it will overlap and conflict with current gevent event loop as main event loop in our python project.
Then I think make an adapter to paho-mqtt with gevent like this docs and example

So, to simple for your development, just make changes as I did in NubeIO/rubix-bacnet-server#75

@zero88 zero88 assigned RaiBnod and Shiny380 and unassigned zero88 Jan 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants