Cannot load "My Subscriptions" #529

Closed
benyamin-codez opened this issue Oct 30, 2023 · 31 comments
Closed

Cannot load "My Subscriptions" #529

benyamin-codez opened this issue Oct 30, 2023 · 31 comments
Labels: bug (Something isn't working)

@benyamin-codez

Context

Please provide any relevant information about your setup

  • Add-on Version: 7.0.2.2.matrix.1
  • Kodi Version: 19.5
  • Kodi GUI Language: English
  • Operating System: Linux 5.10.78-7-osmc armv7l GNU/Linux (Raspberry Pi 3B+)
  • Operating System Language: English

Expected Behavior

When selecting "My Subscriptions" the subscription feed loads and is displayed.


Current Behavior

When selecting "My Subscriptions", the subscription feed loads continually (never stops) and an error message is displayed stating that a new thread cannot be started.


Steps to Reproduce

Please provide detailed steps for reproducing the issue.

  1. Select "My Subscriptions" on resource-poor platforms and wait...

Log

If you really want it, let me know and I will consider dropping one, but know that I don't really have the time to perform the necessary sanitization of GPS and other identifying data from it...


Additional Information

Platform is a Raspberry Pi Model 3B+.

I suspected a possible race condition on resource-constrained hardware.

I made the following modification to resolve the problem:

--- ./plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ ./plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -14,6 +14,7 @@
 import threading
 import traceback
 import xml.etree.ElementTree as ET
+import time
 
 import requests
 
@@ -847,7 +848,19 @@
                               responses)
                     )
                     threads.append(thread)
-                    thread.start()
+                    threadcount = len(threads)
+                    for _ in range(5):
+                        try:
+                            thread.start()
+                        except Exception as e:
+                            err = e
+                            _context.log_debug('Failed to start thread ' + str(threadcount) + ' for channel_id ' + channel_id)
+                            time.sleep(0.3)
+                            continue
+                        else:
+                            break
+                    else:
+                        raise err
 
                 for thread in threads:
                     thread.join(30)

Following this change, the log shows many threads starting on the second try, and none needing more than three tries.


Hope that helps...!

Ben

benyamin-codez added the bug label on Oct 30, 2023
@MoojMidge (Collaborator)

Yeah, in hindsight this was not the best solution to the previous problem.

I'm in the process of changing all requests to use a single connection pool, with a single session per thread reusing the same connection pool, and with the threads managed in a FIFO fixed-length queue. That should fix this problem.

@benyamin-codez (Author) commented Oct 31, 2023

I looked at queue as an alternative too. It would certainly be more elegant.

I didn't go with it because at the time I wasn't sure it would have solved the problem. In my environment the root cause could have been any number of things.

I also found that thread failures are sporadic, with decreasing frequency on each script run. In fact, the original issue can also be overcome by hitting Esc and manually retrying somewhere between 3 and 10 times; the number of retries required for success decreases. Similarly, with the workaround in place, the log shows a reduction in errors with every run, until eventually it shows no threading errors at all. After some time this resets and the failure frequency returns to what it was on the first run. I considered that maybe some garbage collection was occurring or a connection was timing out.

In any case, I think queue will probably fix this for me too.

@MoojMidge (Collaborator)

I also found that thread failures are sporadic, with decreasing frequency on each script run. In fact, the original issue can also be overcome by hitting Esc and manually retrying somewhere between 3 and 10 times; the number of retries required for success decreases. Similarly, with the workaround in place, the log shows a reduction in errors with every run, until eventually it shows no threading errors at all. After some time this resets and the failure frequency returns to what it was on the first run. I considered that maybe some garbage collection was occurring or a connection was timing out.

Hmm, the queue by itself may not be enough. I will probably need to catch the RuntimeError as you have done, and implement some back-off functionality for filling it.

Can you add the following before the time.sleep call in your workaround? The current code has no bounds on the number of Threads that are spawned, which is obviously problematic for large subscription lists or resource-constrained devices, so it would be interesting to quantify the issue.

_context.log_debug(f'{threading.active_count() = }')
_context.log_debug(f'{threading.enumerate() = }')

@benyamin-codez (Author)

Yeah, I have approx 215 subs in my feed - not sure if that would be considered "large" though...
I guess "large" is a bit like "soon" and somewhat subjective. 8^d

Just to be clear, I did check system process limits with ulimit -u and also ps -fLu osmc | wc -l during the event, and didn't really see any issues. Without debugging Python's threading.py it would be hard to discern why the thread actually wasn't starting. I did go down this path, but there was really too much noise to determine what might be happening.

I dropped to the _context.log_error level and added the enum and count you requested.
Please see the attached log snip. I hope it is of some value to you.
20231102_youtube_threads.log

I still think it is a race condition, but couldn't say where or why...

@MoojMidge (Collaborator)

Thanks for that.

Seems like there are two inter-related problems.

A large number of threads are spawned all at once, possibly hitting the per-process thread limit, which is based on how much system memory you have and the max thread stack size. Unfortunately I am not sure of the specifics of how this is determined, but Python now defaults to limiting the max threads it opens in a thread pool to your CPU core count + 4, up to a max of 32 (i.e. a range of 5 to 32), so I will probably do the same.
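
For reference, a minimal sketch of how CPython 3.8+ computes that default (note that _max_workers is an internal attribute, inspected here purely for illustration):

import os
from concurrent.futures import ThreadPoolExecutor

# CPython 3.8+ default: min(32, os.cpu_count() + 4) workers, giving at
# least 5 on a single-core machine and capping out at 32.
with ThreadPoolExecutor() as executor:
    print(executor._max_workers)
print(min(32, (os.cpu_count() or 1) + 4))  # the same formula, computed directly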

The second problem is that some threads are closing in a timely manner, but others are not. The flood of connections may be causing your network to choke, or YouTube may be throttling or rate limiting responses. This will likely not be an issue when using a more reasonable number of threads.

@benyamin-codez (Author)

The second problem is that some threads are closing in a timely manner, but others are not. The flood of connections may be causing your network to choke, or YouTube may be throttling or rate limiting responses. This will likely not be an issue when using a more reasonable number of threads.

It might be worth mentioning that the list does usually load within 30 seconds, despite the use of thread.join(30), suggesting that none of the threads are timing out. Could it be a race between connection setup for each thread? I do very occasionally (very rarely) see 404s from YT on a couple of random channel_ids, which might be an indication I am hitting some rate limits, so perhaps YT is throttling responses as well.

@MoojMidge (Collaborator)

The join method, with a specified timeout, doesn't really do anything special if the thread has not completed within the timeout period. It simply blocks the main thread, giving the child thread the opportunity to complete; if the child doesn't end, or ends early, the main thread will still continue as normal.
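
A minimal illustration of that behaviour, using a dummy worker that outlives the timeout:

import threading
import time

def worker():
    time.sleep(60)  # simulates a request that outlives the join timeout

t = threading.Thread(target=worker, daemon=True)
t.start()
t.join(5)            # blocks the caller for at most 5 seconds
print(t.is_alive())  # True: the thread keeps running; join() merely stopped waiting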

The original problem was that the large number of simultaneous requests was causing a large number of connections to be created, which in turn caused the SSL handshake of the connections to fail, without being caught.

The original PR fixed the problem with multiple simultaneous connections by using a single session and connection pool, and also limiting the number of concurrent connections and reusing them when the requests were complete. However it didn't check if the requests were timing out or otherwise failing, and the original code would block indefinitely waiting for the threads to complete: https://github.com/anxdpanic/plugin.video.youtube/pull/478/files#diff-5ecde54fa534f31c07c435544e8497cb462674901a9d688be492bfbbde50b579R824-R852

My idea at the time was to add timeouts to the requests, and the 30s thread.join timeout value is based on the request timeout period. That's why you don't see the threads hanging even though join doesn't do much: the requests themselves have terminated, in a specific effort to make sure that Kodi wouldn't hang.

I just didn't think this through enough to realise the issue with creating the threads in the first place, as I don't really use this particular functionality.

@benyamin-codez (Author)

Thanks for the explanation @MoojMidge.

I see there have been some further improvements in that section of code since the PR you referenced (#478), such as the timeout tuple on the session.get, the subsequent error handling, and the join timeout catch-all we discussed above.

Looking a little deeper, I see the HTTPAdapter (currently at line 828 in master), i.e. adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True), caps the requests from child threads at 5 pooled connections per session (via pool_maxsize=5), and the adapter blocks further requests from the thread pool until a connection is available (via pool_block=True). I'm fairly certain the use of pool_block=True is the root cause of the problem.

Normally pool_maxsize would be equal to the number of child threads spawned so that new sessions are not created. The trouble here is that we cannot foresee how many threads will be created. Despite much documentation to the contrary, when pool_block=False and the pool_maxsize is reached, a new session is created, apparently using the same TCP connection. As these sessions close, they are consolidated into the one pool. When pool_block=True these new sessions cannot be started, leading to the exception being raised.

It's probably worth mentioning that the HTTPAdapter default of pool_connections=10 will only ever use 1 pool when matching www.youtube.com. There might be some value in specifying pool_connections=1 when instantiating the adapter.

None of the following produce any errors:

adapter = requests.adapters.HTTPAdapter(pool_maxsize=300, pool_block=True)
adapter = requests.adapters.HTTPAdapter(pool_maxsize=5)
adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=5)
adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=30)

I'd like to confirm what is happening in urllib3 but I'm having some trouble enabling debug logging for same. Any hints?

@MoojMidge (Collaborator) commented Nov 6, 2023

Not really something I know too much about, but I'll try to respond based on my current understanding of the issue. Note that I may refer interchangeably to parts of the urllib3 and requests APIs (predominantly requests, because it is what is used directly in this plugin), but hopefully the APIs and terminology are sufficiently similar to follow either way.

I'm fairly certain the use of pool_block=True is the root cause of the problem.

Yes and no. Yes - it can prevent the remaining threads, with the pending requests, from ending, while waiting for the requests currently in the pool to complete. No - I don't think it is the root cause of the issue. See below.

Normally pool_maxsize would be equal to the number of child threads spawned so that new sessions are not created. The trouble here is that we cannot foresee how many threads will be created. Despite much documentation to the contrary, when pool_block=False and the pool_maxsize is reached, a new session is created, apparently using the same TCP connection. As these sessions close, they are consolidated into the one pool. When pool_block=True these new sessions cannot be started, leading to the exception being raised.

I don't think this is correct. There is only one Session instance being created. The ConnectionPool instances are not responsible for creating the Session instances; it is the other way around, with the Session creating adapters as required, which in turn create a ConnectionPool instance as required, depending on the number of different hosts being connected to, up to a default maximum of 10 based on the pool_connections=10 parameter.

By default, each individual ConnectionPool created for a Session will only reuse one HTTPConnection after the request is completed, for use by subsequent requests, based on the default pool_maxsize=1 parameter.

If pool_block=False then there is no limit on the number of connections that can be made to a specific host; however, only the number specified by pool_maxsize will be returned to the ConnectionPool for reuse.

If pool_block=True then the creation of additional HTTPConnection instances will be blocked, to limit the total number in the ConnectionPool to no more than pool_maxsize. If this is not used, then the original problem can re-occur, because the number of connections is not limited; however, only pool_maxsize connections will be reused, with additional HTTPConnection instances created for the additional threads.

Because the additional connections are prevented from being created, the threads stay running for longer. However, all four example adapter instances that didn't produce errors for you are essentially doing the same thing: creating new connections for all 215 subscriptions you have, all at once, which is what caused the original problem for other people.

It is just a trade-off between managing network resources and CPU/memory resources.
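
As a rough sketch of the two modes (the sizes here are illustrative, not the plugin's values):

import requests

session = requests.Session()

# pool_block=True: at most 2 sockets to any one host; a third concurrent
# request waits for a pooled connection to be freed.
blocking = requests.adapters.HTTPAdapter(pool_maxsize=2, pool_block=True)

# pool_block=False: extra connections are created on demand, but only 2
# are kept in the pool for reuse once their requests complete.
non_blocking = requests.adapters.HTTPAdapter(pool_maxsize=2, pool_block=False)

session.mount('https://', blocking)  # or non_blocking, per the trade-off above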

It's probably worth mentioning that the HTTPAdapter default of pool_connections=10 will only ever use 1 pool when matching www.youtube.com. There might be some value in specifying pool_connections=1 when instantiating the adapter.

That's right, but it shouldn't create new pools unless required. However, from what I can tell requests.Session is not thread-safe, and while that may not make a difference when only connecting to a single host, I think what should be done more generally is to use a common HTTPAdapter across multiple Session instances, which will create a thread-safe ConnectionPool instance per host, as required by the requests made using each Session.

This is what I am intending to do, because while the issue with fetching subscriptions involves connections to only a single host, there are multiple other requests made to different Google/YouTube hosts at various times in this plugin, and they will all benefit from a unified request mechanism.

In this way connections can be reused throughout the plugin, which should be faster, reduce memory and socket usage, and prevent Kodi from hanging when there are network issues. For this, the default pool_connections=10 seems appropriate, along with pool_maxsize=10 and pool_block=True.
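
A minimal sketch of that shape, assuming a module-level adapter shared by per-thread sessions (all names here are illustrative):

import threading
import requests

# One adapter, and hence one set of per-host connection pools, shared app-wide.
_shared_adapter = requests.adapters.HTTPAdapter(
    pool_connections=10, pool_maxsize=10, pool_block=True,
)
_local = threading.local()

def get_session():
    # One Session per thread, all mounting the same adapter, so every
    # thread draws connections from the same pools.
    if not hasattr(_local, 'session'):
        session = requests.Session()
        session.mount('https://', _shared_adapter)
        _local.session = session
    return _local.session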

This is currently a WIP; the common module is done and works pretty well across the plugin. The next step is to finish the thread management.

I'd like to confirm what is happening in urllib3 but I'm having some trouble enabling debug logging for same. Any hints?

Adding the following to youtube.py (or anywhere else really) should do the trick.

import logging

logging.basicConfig()
logging.getLogger("urllib3").setLevel(logging.DEBUG)

@benyamin-codez (Author)

My understanding is that requests now requires urllib3. If you look at script.module.requests/lib/requests/__init__.py you will see it imports urllib3, and script.module.requests/lib/requests/adapters.py uses a urllib3 PoolManager when instantiating the HTTPAdapter.

Unfortunately, I found your suggestion above for urllib3 logging clobbered the Kodi log, so I added requests.urllib3.add_stderr_logger() near the top of youtube.py instead. The function is defined in script.module.urllib3/lib/__init__.py with a default of level=logging.DEBUG. This made it pretty easy to work out what was happening.

You are correct that a new Session is not instantiated; rather, a new HTTPConnection is created until pool_maxsize is reached. While requests.Session() creates default adapters when initialized, in our case the session object is mounted with our custom HTTPAdapter.

When pool_block=False, each HTTPConnection created above pool_maxsize will return to the ConnectionPool for reuse when a slot is freed up following the closure of an HTTPConnection already in the pool. In this way any "extra" HTTPConnection can be returned for use in the pool. I erroneously conflated "session" with HTTPConnection, which is clearly a TCP connection.

Just to be clear, in my testing with pool_block=False, pool_maxsize=5 resulted in 30 connections and pool_maxsize=20 resulted in 36 connections. So the use of the HTTPAdapter is certainly an improvement in any case.

FYI, release 1.26.16 of urllib3 made PoolManager thread-safe. Also, the default for pool_maxsize is 10. I note the comments from @cas-- as to why he chose pool_maxsize=5 in #280 / #478... However, in my case the default seems to be a bit of a sweet spot instead.

I also think that consolidating the request mechanics into a unified function is a great idea. Given the defaults, requests.adapters.HTTPAdapter(pool_block=True) should be sufficient for instantiating the HTTPAdapter, although you may want to consider using urllib3.util.Retry for max_retries, e.g. max_retries=urllib3.util.Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]). Are you intending to modify _request() in youtube.py?

I note the max_retries parameter above cannot resolve the thread.start() issue, as the thread must start before the max_retries functionality can have any effect.

I think queue could work, but won't be as efficient. It is probably overkill, as the child threads don't need to share data. It would seem a thread.start() retry mechanism is still required in any case...
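
For comparison, a minimal sketch of the queue-based shape (the worker count and helper name are illustrative):

import queue
import threading

def run_bounded(tasks, worker, num_workers=5):
    # A FIFO queue of pending tasks drained by a fixed set of worker
    # threads, bounding concurrency without one thread per task.
    pending = queue.Queue()
    for task in tasks:
        pending.put(task)

    def loop():
        while True:
            try:
                task = pending.get_nowait()
            except queue.Empty:
                return
            try:
                worker(task)
            finally:
                pending.task_done()

    threads = [threading.Thread(target=loop) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    pending.join()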

@benyamin-codez (Author) commented Nov 6, 2023

FYI, "final" diff:

--- ./plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ ./plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -17,6 +17,8 @@
 
 import requests
 
+from time import sleep
+from urllib3.util import Retry
 from .login_client import LoginClient
 from ..youtube_exceptions import YouTubeException
 from ..helper.video_info import VideoInfo
@@ -26,6 +28,7 @@
 
 _context = Context(plugin_id='plugin.video.youtube')
 
+#requests.urllib3.add_stderr_logger()
 
 class YouTube(LoginClient):
     def __init__(self, config=None, language='en-US', region='US', items_per_page=50, access_token='', access_token_tv=''):
@@ -825,7 +828,15 @@
                 session = requests.Session()
                 session.headers = headers
                 session.verify = self._verify
-                adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+                retries = Retry(
+                    total=3,
+                    backoff_factor=0.1,
+                    status_forcelist=[500, 502, 503, 504]
+                )
+                adapter = requests.adapters.HTTPAdapter(
+                    pool_block=True,
+                    max_retries=retries
+                )
                 session.mount("https://", adapter)
                 responses = []
 
@@ -847,7 +858,21 @@
                               responses)
                     )
                     threads.append(thread)
-                    thread.start()
+                    for _ in range(5):
+                        try:
+                            thread.start()
+                        except Exception as e:
+                            err = e
+                            _context.log_error('Failed to start thread ' + str(len(threads)) + ' for channel_id ' + channel_id)
+                            #_context.log_debug(f'{threading.active_count() = }')
+                            #_context.log_debug(f'{threading.enumerate() = }')
+                            sleep(0.15)
+                            continue
+                        else:
+                            _context.log_debug('Success starting thread ' + str(len(threads)) + ' for channel_id ' + channel_id)
+                            break
+                    else:
+                        raise err
 
                 for thread in threads:
                     thread.join(30)

EDIT: Removed threadcount... Note extra debugging for urllib3 and threading is commented out.
EDIT: Cleaned up time.sleep. Sleeping for 0.15s is sufficient, i.e. no discernible difference with 0.3s but must be > 0.1s.

@benyamin-codez (Author)

A large number of threads are spawned all at once, possibly hitting the per-process thread limit, which is based on how much system memory you have and the max thread stack size. Unfortunately I am not sure of the specifics of how this is determined, but Python now defaults to limiting the max threads it opens in a thread pool to your CPU core count + 4, up to a max of 32 (i.e. a range of 5 to 32), so I will probably do the same.

@MoojMidge, I've been playing with concurrent.futures.ThreadPoolExecutor() and think it is a bit more reliable for a range of potential problems. For the OP's problem, i.e. a new thread cannot be started, it appears the max_workers argument must be manually set very high to reliably reproduce it, and it appears to never occur when max_workers is less than or equal to the HTTPAdapter argument pool_maxsize. From what I've seen it does a pretty good job of not stuffing the HTTPAdapter with too many threads at once, and as such avoids the problem.
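
A minimal sketch of pairing the two limits (the values are illustrative):

import concurrent.futures
import requests

POOL_MAXSIZE = 10

adapter = requests.adapters.HTTPAdapter(pool_maxsize=POOL_MAXSIZE, pool_block=True)
session = requests.Session()
session.mount('https://', adapter)

# With max_workers no greater than pool_maxsize, every worker thread can
# always obtain a pooled connection, so none of them block on the adapter.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=POOL_MAXSIZE)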

I'm just doing some more testing and will trim my edits before posting here...

@benyamin-codez (Author)

OK, so using concurrent.futures.ThreadPoolExecutor() I wasn't able to reproduce the problem using the default value of the concurrent.futures.ThreadPoolExecutor._max_workers property, which is 8 on a Raspberry Pi 3 B+, nor with the maximum default of 32 for max_workers (which I manually specified in order to test). I could reproduce the problem with some frequency when using a max_workers value between 300 and 500. At 1500 the problem was reliably reproducible.

This is all predicated on the HTTPAdapter using the default pool_maxsize of 10.

Seeing as the HTTPAdapter I propose includes the urllib3 retry mechanism, in the end I didn't see a need to include retry mechanics for the actual fetch. The same applies to thread creation, as concurrent.futures.ThreadPoolExecutor() appears to be very reliable. As such, I think the following is sufficient:

--- CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -12,11 +12,13 @@
 import json
 import re
 import threading
+import concurrent.futures
 import traceback
 import xml.etree.ElementTree as ET
 
 import requests
 
+from urllib3.util import Retry
 from .login_client import LoginClient
 from ..youtube_exceptions import YouTubeException
 from ..helper.video_info import VideoInfo
@@ -825,11 +827,14 @@
                 session = requests.Session()
                 session.headers = headers
                 session.verify = self._verify
-                adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+                adapter = requests.adapters.HTTPAdapter(pool_block=True, max_retries=retries)
                 session.mount("https://", adapter)
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     try:
                         _response = session.get(_url, timeout=(3.05, 27))
                         _response.raise_for_status()
@@ -839,18 +844,35 @@
                         return
                     _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+                
+                tp_status = tpool_handler()
+                if not tp_status == True:
+                    _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
+                
                 session.close()
 
                 for response in responses:

Here it is again with some error and minor debug logging:

--- CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -12,11 +12,13 @@
 import json
 import re
 import threading
+import concurrent.futures
 import traceback
 import xml.etree.ElementTree as ET
 
 import requests
 
+from urllib3.util import Retry
 from .login_client import LoginClient
 from ..youtube_exceptions import YouTubeException
 from ..helper.video_info import VideoInfo
@@ -825,11 +827,14 @@
                 session = requests.Session()
                 session.headers = headers
                 session.verify = self._verify
-                adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+                adapter = requests.adapters.HTTPAdapter(pool_block=True, max_retries=retries)
                 session.mount("https://", adapter)
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     try:
                         _response = session.get(_url, timeout=(3.05, 27))
                         _response.raise_for_status()
@@ -839,18 +844,45 @@
                         return
                     _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                _context.log_debug(f'Channel XML Fetch Active Threads : {len(futures)} | Done : {len(done)} | Not Done : {len(not_done)}')
+                                for tsk in done:
+                                    task = futures[tsk]
+                                    if tsk.exception():
+                                        _context.log_error(f'FATAL : Failed to fetch xml data for channel_id {task[0]}')
+                                        raise tsk.exception()
+                                    else:
+                                        _context.log_debug(f'SUCCESS : Fetched xml data for channel_id {task[0]}')
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+                
+                tp_status = tpool_handler()
+                if not tp_status == True:
+                    _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
+                else:
+                    _context.log_debug(f'SUCCESS : Exited Channel XML Fetch ThreadPool cleanly | Status : {tp_status}')
+                
                 session.close()
 
                 for response in responses:

This one has full error and debug logging (but without timekeeping and statistical logging), and includes urllib3 debug logging and a retry capability for thread pool start and XML fetch operations (the latter being largely redundant when using the included HTTPAdapter with urllib3 retry mechanics):

--- CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -12,11 +12,13 @@
 import json
 import re
 import threading
+import concurrent.futures
 import traceback
 import xml.etree.ElementTree as ET
 
 import requests
 
+from urllib3.util import Retry
 from .login_client import LoginClient
 from ..youtube_exceptions import YouTubeException
 from ..helper.video_info import VideoInfo
@@ -24,8 +26,16 @@
 from ...kodion.utils import datetime_parser
 from ...kodion.utils import to_unicode
 
+# Maximum times to attempt to fetch channel data.
+# Session HTTPAdapter already also tries 3 times.
+# This value acts as a multiplier of session retries.
+MAX_FETCH_XML_RETRIES = 1
+MAX_THREADSTART_RETRIES = 5
+
+
 _context = Context(plugin_id='plugin.video.youtube')
 
+requests.urllib3.add_stderr_logger()
 
 class YouTube(LoginClient):
     def __init__(self, config=None, language='en-US', region='US', items_per_page=50, access_token='', access_token_tv=''):
@@ -825,11 +835,13 @@
                 session = requests.Session()
                 session.headers = headers
                 session.verify = self._verify
-                adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+                adapter = requests.adapters.HTTPAdapter(pool_block=True, max_retries=retries)
                 session.mount("https://", adapter)
-                responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[2]}'
+                    _responses = _e_args[3]
                     try:
                         _response = session.get(_url, timeout=(3.05, 27))
                         _response.raise_for_status()
@@ -837,20 +849,76 @@
                         _context.log_debug('Response: {0}'.format(error.response and error.response.text))
                         _context.log_error('Failed |%s|' % traceback.print_exc())
                         return
+                    except Exception as e:
+                        _context.log_error(f'Task # {_e_args[0]} failed to perform a clean Channel XML Fetch for channel_id {_e_args[2]}.')
+                        raise e
                     _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler(_attempt):
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        task_idx = 0
+                        retry_ctr = 0
+                        e_args = []
+                        fetch_args = []
+                        _context.log_debug(f'Channel XML Fetch ThreadPool has {fetch_tpool._max_workers} workers...')
+                        
+                        for channel_id in sub_channel_ids:
+                            e_args = [task_idx, retry_ctr, channel_id, responses]
+                            fetch_args.append(e_args)
+                            task_idx +=1
+                        
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            _context.log_error(f'Failed to start Channel XML Fetch ThreadPool executor or component on attempt # {(_attempt + 1)} of {MAX_THREADSTART_RETRIES}. ')
+                            try:
+                                futures
+                            except NameError:
+                                _context.log_error(f'No Threads to cancel. Retrying...')
+                            else:
+                                _context.log_error(f'Threads to be cancelled before retrying : {len(futures)}')
+                            finally:
+                                fetch_tpool.shutdown(cancel_futures=True)
+                                return e
+                        else:
+                            _context.log_debug('SUCCESS : All jobs have been submitted to the Channel XML Fetch ThreadPool.')
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                _context.log_debug(f'Channel XML Fetch Active Threads : {len(futures)} | Done : {len(done)} | Not Done : {len(not_done)}')
+                                for tsk in done:
+                                    if tsk.exception():
+                                        task = futures[tsk]
+                                        if task[1] < MAX_FETCH_XML_RETRIES:
+                                            task[1] += 1
+                                            _context.log_error(f'Failed to fetch xml for channel_id {task[2]}. Attempting retry # {task[1]} of {MAX_FETCH_XML_RETRIES}...')
+                                            pend_futures[fetch_tpool.submit(fetch_xml, task)] = task
+                                        else:
+                                            _context.log_error(f'FATAL : Failed to fetch xml data for channel_id {task[2]}')
+                                            raise tsk.exception()
+                                    else:
+                                        task = futures[tsk]
+                                        _context.log_debug(f'SUCCESS : Fetched xml data for channel_id {task[2]}')
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+                        
+                for attempt in range(MAX_THREADSTART_RETRIES):
+                    responses = []
+                    tp_status = tpool_handler(attempt)
+                    if not tp_status == True:
+                        _context.log_debug(f'Channel XML Fetch ThreadPool failed. Retrying now... | Status : {tp_status}')
+                        continue
+                    else:
+                        _context.log_debug(f'SUCCESS : Exited Channel XML Fetch ThreadPool cleanly | Status : {tp_status}')
+                        break
+                else:
+                    if not tp_status == True:
+                        _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                        raise tp_status
+                
                 session.close()
 
                 for response in responses:

I hope some of it is of use... 8^d

@MoojMidge (Collaborator)

I hope some of it is of use... 8^d

It definitely is. Thanks for the investigation; it will take me a while to look through this properly, as I'm a bit busy with other things at the moment.

@benyamin-codez (Author)

I should probably point out the limitations.

Importantly, I used the submit() method rather than map(), so results are not returned in order, but as they finish. The reason for using submit() was originally to avoid spinning the main thread, but also to make use of Future objects, and more specifically add_done_callback(future) in conjunction with a locked condition.wait() and corresponding notify().

This didn't really work - I suspect because these particular threads are so short-lived. Often the callback would execute and the Future object would be torn down, which would raise an undefined exception. This could actually be an upstream bug, as really the Future shouldn't be torn down until after the callback returns.

Anyway, in the end I used the concurrent.futures.wait() module function and let the main thread spin in a while loop. IMHO, the use of futures is superior to iterating over results using map(), so for this reason I continued to use submit().
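
For what it's worth, a minimal equivalent of that wait loop using concurrent.futures.as_completed (the names are illustrative):

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_all(tasks, fetch):
    # as_completed() yields each future as it finishes; result()
    # re-raises any exception the task raised.
    with ThreadPoolExecutor() as pool:
        futures = {pool.submit(fetch, task): task for task in tasks}
        for future in as_completed(futures):
            future.result()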

The submit() method really requires all the function arguments in an array, as it only accepts one argument before kwargs. One could use kwargs here, but you will see I used a dictionary comprehension to populate the futures variable with a collection of Future objects. These objects only exist until a short time after they reach the "done" status, so we are dependent on the results being in the responses array (which outlives futures).

When an ordered response is required, a sort operation can be performed using an index (e.g. task_idx in the last diff), although a new parent responses array would first need to incorporate said index to allow for such re-ordering once all futures are completed, e.g. something like responses = sorted(indexed_responses, key=lambda x: x[0]).
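
For illustration, a hypothetical re-ordering assuming the workers appended (task_idx, response) pairs:

# indexed_responses was filled with (task_idx, response) tuples by the
# workers; sorting by index restores submission order.
indexed_responses = [(2, 'r2'), (0, 'r0'), (1, 'r1')]
responses = [resp for _, resp in sorted(indexed_responses, key=lambda pair: pair[0])]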

Creating the thread pool using with takes care of the pool shutdown (which otherwise would have to be done manually) and should free up resources created within the context once it does shut down.

Hope that helps explain some of the choices made... 8^d

@benyamin-codez (Author)

Update for v7.0.3.2:

--- plugin.video.youtube-7.0.3.2_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ ThreadPool_XML_Fetch_7.0.3.2_release/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1507,23 +1508,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if not tp_status == True:
+                    _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 for response in responses:
                     if response:

@benyamin-codez (Author) commented Mar 25, 2024

Update for 7.0.4:

--- plugin.video.youtube-7.0.4.nexus.1/resources/lib/youtube_plugin/youtube/client/youtube.py   2024-03-22 09:53:34.000000000 +1100
+++ plugin.video.youtube-7.0.4.nexus.1_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py      2024-03-25 18:20:28.983669273 +1100
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1515,23 +1516,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if not tp_status == True:
+                    self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 for response in responses:
                     if response:

@benyamin-codez (Author)

Update for v7.0.5+beta.1:

--- plugin.video.youtube-7.0.5+beta.1.matrix.1/resources/lib/youtube_plugin/youtube/client/youtube.py   2024-03-28 16:41:56.000000000 +1100
+++ plugin.video.youtube-7.0.5+beta.1.matrix.1_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py      2024-03-29 19:01:06.845441805 +1100
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1524,23 +1525,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if not tp_status == True:
+                    self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 for response in responses:
                     if response:

@benyamin-codez (Author) commented Apr 3, 2024

Update for 7.0.5+beta.4 (includes edit for #679):

--- plugin.video.youtube-7.0.5+beta.4/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.5+beta.4_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1524,23 +1525,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while len(futures) > 0:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if not tp_status == True:
+                    self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 do_encode = not current_system_version.compatible(19, 0)
 
--- plugin.video.youtube-7.0.5+beta.4/resources/lib/youtube_plugin/youtube/helper/video_info.py
+++ plugin.video.youtube-7.0.5+beta.4_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/helper/video_info.py
@@ -596,7 +596,7 @@
         48: '48000/1000',  # 48.00 fps
         50: '50000/1000',  # 50.00 fps
         60: '60000/1000',  # 60.00 fps
-    },
+    }
     FRACTIONAL_FPS_SCALE = {
         0: '{0}000/1000',  # --.00 fps
         24: '24000/1001',  # 23.976 fps

@benyamin-codez (Author)

Update for v7.0.5:

--- plugin.video.youtube-7.0.5_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.5_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1524,23 +1525,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while futures:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if tp_status is not True:
+                    self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 do_encode = not current_system_version.compatible(19, 0)

@benyamin-codez
Author

Update for v7.0.6+beta.1:

--- plugin.video.youtube-7.0.6+beta.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.6+beta.1_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1525,23 +1526,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while futures:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if tp_status is not True:
+                    self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 do_encode = not current_system_version.compatible(19, 0)

@benyamin-codez
Author

Update for v7.0.6.3:

--- plugin.video.youtube-7.0.6.3_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.6.3_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
 from __future__ import absolute_import, division, unicode_literals
 
 import threading
+import concurrent.futures
 import xml.etree.ElementTree as ET
 from copy import deepcopy
 from itertools import chain, islice
@@ -1525,23 +1526,41 @@
 
                 responses = []
 
-                def fetch_xml(_url, _responses):
+                def fetch_xml(_e_args):
+                    _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+                    _responses = _e_args[1]
                     _response = self.request(_url, headers=headers)
                     if _response:
                         _responses.append(_response)
 
-                threads = []
-                for channel_id in sub_channel_ids:
-                    thread = threading.Thread(
-                        target=fetch_xml,
-                        args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
-                              responses)
-                    )
-                    threads.append(thread)
-                    thread.start()
-
-                for thread in threads:
-                    thread.join(30)
+                def tpool_handler():
+                    with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+                        e_args = []
+                        fetch_args = []
+                        for channel_id in sub_channel_ids:
+                            e_args = [channel_id, responses]
+                            fetch_args.append(e_args)
+                        try:
+                            futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+                        except Exception as e:
+                            fetch_tpool.shutdown(cancel_futures=True)
+                            return e
+                        else:
+                            while futures:
+                                pend_futures = {}
+                                done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+                                for tsk in done:
+                                    if tsk.exception(): raise tsk.exception()
+                                for tsk in not_done:
+                                    task = futures[tsk]
+                                    pend_futures[tsk] = task
+                                futures = pend_futures
+                            return True
+
+                tp_status = tpool_handler()
+                if tp_status is not True:
+                    self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+                    raise tp_status
 
                 do_encode = not current_system_version.compatible(19, 0)

MoojMidge added a commit to MoojMidge/plugin.video.youtube that referenced this issue May 13, 2024
MoojMidge added a commit to MoojMidge/plugin.video.youtube that referenced this issue May 13, 2024
@MoojMidge
Collaborator

MoojMidge commented May 13, 2024

@benyamin-codez - I have had to make some more changes to My Subscriptions that I hadn't planned on making until more substantial work had been done on moving other, unrelated functionality to the v1 API, so that parallel requests can be handled transparently by the plugin's requests module.

Long story short - I have implemented a more naive version of the thread pool fix you are using, as a stepping stone to that end goal. The main reason I didn't use your patch is that I am ostensibly trying to maintain basic functionality for Kodi 18 (which uses Python 2.7), where concurrent.futures doesn't exist.

The git history is here:
https://github.com/MoojMidge/plugin.video.youtube/commits/master/resources/lib/youtube_plugin/youtube/client/youtube.py

Main threading related changes are:
MoojMidge@76b2954#diff-5ecde54fa534f31c07c435544e8497cb462674901a9d688be492bfbbde50b579R1560-R1615

Before I merge this, can you test? https://github.com/MoojMidge/plugin.video.youtube/releases/tag/v7.0.7%2Bbeta.2

If you want, you can also PR your patch and I can edit it so the forthcoming changes will merge cleanly.
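
For anyone curious what the Kodi 18 constraint implies: without concurrent.futures on Python 2.7, a fixed-size pool has to be assembled from threading and Queue directly, which is roughly the shape a naive thread pool takes. A hedged sketch of that pattern (written to run on both Python 2.7 and 3, and not taken from the plugin itself):

import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7


def run_pool(worker, tasks, num_threads=5, join_timeout=30):
    # Queue all the work up front; workers pull tasks until the queue is empty
    task_queue = queue.Queue()
    for task in tasks:
        task_queue.put(task)

    def drain():
        while True:
            try:
                task = task_queue.get_nowait()
            except queue.Empty:
                break
            try:
                worker(task)
            finally:
                task_queue.task_done()

    threads = [threading.Thread(target=drain) for _ in range(num_threads)]
    for thread in threads:
        thread.daemon = True
        thread.start()
    for thread in threads:
        thread.join(join_timeout)

The key property is that only num_threads threads ever exist, so thread creation can no longer fail partway through a long subscription list the way hundreds of per-channel threading.Thread instances could on a Pi.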

@benyamin-codez
Author

@MoojMidge, many thanks for that. I've checked on two RPi 3B+ devices, and your changes do resolve the problem in the OP.

Given your backwards-compatibility constraints, I thought your solution was quite elegant. I should mention that it is noticeably slower on initial cache population, and seems marginally slower otherwise. I'm not sure if this is due to your threading implementation or perhaps another change...?

Using concurrent.futures might result in faster execution - I couldn't say. If I did a PR for my patch, would you then use concurrent.futures in the non-Leia builds? If you want me to raise the PR, which repository do you want it in?

MoojMidge added a commit to MoojMidge/plugin.video.youtube that referenced this issue May 14, 2024
@MoojMidge
Collaborator

MoojMidge commented May 14, 2024

I should mention that it is noticeably slower on initial cache population, and seems marginally slower otherwise. I'm not sure if this is due to your threading implementation or perhaps another change...?

How are you measuring this? I think the issue is that I changed what was being done in each thread: rather than just making the request, each thread was also processing the feed XML, i.e. doing a bunch of CPU-bound rather than I/O-bound operations. There was also an unnecessary lock for writing to the output, and an excessive number of attempts to acquire the input read/modify lock, and apparently those are fairly expensive operations.

Can you see if MoojMidge@513a9f4 restores the speed?
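
To make the I/O-bound versus CPU-bound point concrete: CPython threads take turns holding the GIL, but the GIL is released while a thread blocks on a network read, so threads that only fetch scale well, whereas threads that also parse XML contend with each other for the interpreter. A sketch of the split, reusing the hypothetical fetch_all helper from the earlier sketch:

import xml.etree.ElementTree as ET

channel_ids = ['UC...', 'UC...']  # hypothetical channel IDs

# Workers do I/O only: the GIL is released while each request blocks
raw_feeds = fetch_all(channel_ids)

# CPU-bound parsing happens serially on the main thread, outside the pool
feeds = [ET.fromstring(raw) for raw in raw_feeds]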

If I did a PR for my patch, would you then use concurrent.futures in the non-Leia builds? If you want me to raise the PR, which repository do you want it in?

On master. Won't be maintaining separate branches for older versions of Kodi, but there are lightweight compatibility shims for Leia. Will have to see what the easiest way to implement it will be, along with making some changes to the data structure used for argument passing.

@benyamin-codez
Author

How are you measuring this? I think the issue is that I changed what was being done in each thread: rather than just making the request, each thread was also processing the feed XML. There was also an unnecessary lock for writing to the output, and an excessive number of attempts to acquire the input read/modify lock, and apparently those are fairly expensive operations.

Can you see if MoojMidge@513a9f4 restores the speed?

Yes, that was a significant improvement.

This round I obtained some rough run times using debug log timestamps, with cached = [] forced. Your solution ran for approx. 20s on the initial run and 9s on subsequent runs, whereas 7.0.7+beta.1 with my patch ran for approx. 18s on the initial run and 9s on subsequent runs. This is for approx. 215 channels plus 3 bookmarked channels.

I think the main difference is that your solution gives the whole job to each of the max_threads workers, with every worker looping over the shared input, whereas mine only ever gives each worker a single task at a time. So your solution is quite efficient, given you still need to do the locking around your args.pop().

On run times, I should point out that the fetch has run a bit slower since I reverted to using your HTTPAdapter with pool_maxsize=5 rather than the default value of 10.

Given the above, do you still want me to submit the PR...? 2 seconds isn't much, but it is a little noticeable. I don't mind submitting the PR if you think you'll make use of it.
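
On pool_maxsize, for readers unfamiliar with it: requests keeps per-host connection pools inside the HTTPAdapter mounted on a Session, and pool_maxsize caps how many connections to a single host are kept alive for reuse. With the default block=False, a burst of threads larger than the pool still runs, but the excess connections are opened fresh and then discarded rather than reused, costing extra TCP/TLS handshakes. A minimal sketch with illustrative numbers:

import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# One host pool (www.youtube.com), keeping up to 10 reusable connections
adapter = HTTPAdapter(pool_connections=1, pool_maxsize=10)
session.mount('https://', adapter)

response = session.get(
    'https://www.youtube.com/feeds/videos.xml?channel_id=UC...',  # placeholder ID
    timeout=30,
)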

MoojMidge added a commit to MoojMidge/plugin.video.youtube that referenced this issue May 23, 2024
@MoojMidge
Collaborator

Can you see how the latest beta works? https://github.com/anxdpanic/plugin.video.youtube/releases/tag/v7.0.7%2Bbeta.2
If the speed is comparable, then we can just run with this from now on; otherwise I can look into it further based on the patch you have been using.

@benyamin-codez
Author

@MoojMidge, I wasn't able to get v7.0.7+beta.2 working, per #769. 8^d

It looks like your solution has matured quite a bit since I last had a peek. I look forward to giving it a go.

@MoojMidge
Collaborator

The intent is to make this a more generic helper method that can be used in other parts of the plugin, so it still needs some more work, but let's see how it performs in this initial application.
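
A rough idea of the shape such a generic helper could take is sketched below; the name run_threaded and its signature are invented here for illustration and are not the plugin's actual API:

import concurrent.futures


def run_threaded(func, payloads, max_workers=5, timeout=None):
    # Hypothetical generic helper: fan func out over payloads in a pool,
    # gather completed results, and propagate the first worker error
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, payload) for payload in payloads]
        done, not_done = concurrent.futures.wait(futures, timeout=timeout)
        for future in not_done:
            # Pending tasks can be cancelled; tasks already running are
            # still waited on when the pool shuts down
            future.cancel()
        for future in done:
            results.append(future.result())  # re-raises worker exceptions
    return results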

@benyamin-codez
Author

benyamin-codez commented May 24, 2024

The intent is to make this a more generic helper method that can be used in other parts of the plugin, so it still needs some more work, but let's see how it performs in this initial application.

Well, it is certainly very quick! At 10 to 11 seconds a run with cached = [] forced, it is noticeably faster. Very impressive. When you migrate this into a helper method, it will be interesting to see what other gains can be had. Thank you for resolving this, @MoojMidge. I am well pleased with the outcome and happy for you to close the issue.

@MoojMidge
Collaborator

It was a bit faster for me on a fast multi-core Windows laptop, but it's good to hear that the same or better speed gains can be observed on more limited devices, where it matters more.

@MoojMidge MoojMidge mentioned this issue May 24, 2024