
feat(replication): Implement queues and replication controller #22

Merged · 1 commit · Nov 12, 2019

Conversation

@adrienverge (Owner) commented Oct 29, 2019

Try to improve performance by tuning how many replications are run
concurrently.

This commit implements:

  1. A queue of "work to do", onto which "databases to process" are pushed
    one at a time.

  2. A ReplicationControl object that is informed of past successes and
    errors, and dynamically adapts the number of concurrent replications
    and timeout values.

    The number of concurrent replications starts at 4 and keeps increasing
    (up to a maximum of 128) until an error is detected. After that, it
    should stabilize around the last known error-free value (see the
    sketch just after this list).
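
For reference, the adaptation heuristic boils down to something like the
following simplified, single-process sketch (the real class shares these
lists between worker processes, and the exact method signatures differ):

    # Simplified sketch of the adaptation heuristic (illustrative only).
    import time

    class ReplicationControl:
        def __init__(self):
            self.last_successes = []   # (timestamp, number of running replications)
            self.last_errors = []

        def report_success(self, running):
            self.last_successes.append((int(time.time()), running))

        def report_error(self, running):
            self.last_errors.append((int(time.time()), running))

        def ideal_number_of_replications(self):
            # Start with 4 concurrent replications...
            ideal = 4
            # ... then use 2 × the best successful value
            if self.last_successes:
                ideal = 2 * max(n for _, n in self.last_successes)
            # ... but if there were errors, take 0.9 × the lowest failing value
            if self.last_errors:
                ideal = int(0.9 * min(n for _, n in self.last_errors))
            # ... and stay within [1, 128]
            return max(1, min(128, ideal))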

I've tried other implementations, for example dynamically launching
one-task pools on the fly for each database to process
(multiprocessing.Pool(processes=1).apply_async(...)), or not using an
output queue (out_queue), but performance was best with the current
implementation.
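
To give an idea of the overall shape of the queue-based approach, here is an
illustrative sketch (not the exact code in this commit; replicate_one_database
stands in for the real replication call, and the real code also resizes the
worker pool on the fly via ReplicationControl):

    # Illustrative sketch of the in_queue / out_queue worker pattern.
    import multiprocessing

    def replicate_one_database(db):
        pass   # stand-in for the real replication of one database

    def worker(in_queue, out_queue):
        # Each worker pulls databases one at a time until it gets a sentinel.
        while True:
            db = in_queue.get()
            if db is None:
                break
            try:
                replicate_one_database(db)
                out_queue.put((db, None))
            except Exception as exc:
                out_queue.put((db, exc))

    def backup(databases, num_workers=4):
        in_queue = multiprocessing.Queue()
        out_queue = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=worker,
                                           args=(in_queue, out_queue))
                   for _ in range(num_workers)]
        for p in workers:
            p.start()
        for db in databases:       # the real code feeds the queue progressively
            in_queue.put(db)       # and adjusts the worker count dynamically
        for _ in workers:          # one sentinel per worker
            in_queue.put(None)
        for _ in databases:        # collect one result per database; in the real
            db, error = out_queue.get()   # code this feeds ReplicationControl
        for p in workers:
            p.join()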

Performance test on my laptop: doing a backup (time coucharchive -v create --from $URL -o /dev/null) of a CouchDB server with 1000 small
databases with one document each:

  • master:
    50s total (9,37s user 2,71s system 24% cpu)

  • this commit (modified, max 64 workers):
    60s total (8,52s user 2,16s system 17% cpu)

  • this commit (max 128 workers):
    35s total (7,03s user 2,23s system 26% cpu)

  • this commit (modified, max 256 workers):
    24s total (9,85s user 4,03s system 56% cpu)


Previous version:

The ReplicationControl object will be informed of past successes and
errors, and will dynamically adapt the number of concurrent replications
and timeout values.

The number of concurrent replications will start at 4, then will
continuously increase (with a maximum of 128), until an error is
detected. After that, it should stabilize around the last known value
without any error.

@H--o-l (Collaborator) commented Oct 30, 2019

Hey @adrienverge!

I'm playing with your new implementation right now, and I think there may be something wrong.
Without changing anything else in my configuration, I previously had the following CPU profile (when creating an archive):
[screenshot: before]
Now I get something that takes much longer and looks like this:
[screenshot: after]
In both screenshots the blue machine is the source and the yellow one is the target.

I tried capping the parallel workers at 64 like before, but I get exactly the same strange CPU profile.

Last thing: from the logging I added, ideal_number_of_workers always stays at the maximum value, and ideal_sleep_value always stays at zero.

I will try to find what could be going wrong!

    logging.debug('Currently running %d replication workers'
                  % len(workers))

    time.sleep(1)
@H--o-l (Collaborator) commented on the hunk above, Oct 30, 2019:

For info, I did a test where I lowered this value to 0.01, but it did not change the outcome:
[screenshot]
(source in yellow, target in blue)

@adrienverge (Owner, Author)

Thanks a lot @H--o-l for the feedback!

Would it be possible for you to run another test, but with the number of workers capped at 64?

@@ -273,8 +279,8 @@ class ReplicationControl(object):
             min_number_with_error = min(e[1] for e in self.last_errors)
             ideal_number = int(0.9 * min_number_with_error)
 
-        # ... and stay within [1, 128]
-        ideal_number = max(1, min(128, ideal_number))
+        # ... and stay within [1, 64]
+        ideal_number = max(1, min(64, ideal_number))
 
         return ideal_number
 

@H--o-l (Collaborator) commented Oct 31, 2019

Hey!
Yes, sorry, I think I wasn't clear: that was the second test I did yesterday, and it did not work; I got the exact same result.
Any other ideas?

@adrienverge (Owner, Author)

Could you reproduce the same result at least twice, i.e. confirm that "this PR + 64 workers max" behaves differently from master?

I'm surprised, because "this PR + 64 workers max" should be equivalent to master. Unless there's some sort of overhead added somewhere, maybe due to multiprocessing.Pool(processes=1)?
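
For illustration, the pattern I have in mind is roughly the following hypothetical sketch (not the actual previous code; launch_one and collect are made-up helpers): every database gets its own freshly created one-process pool, so each task pays for a fork plus the pool's IPC setup and teardown.

    # Hypothetical sketch of the suspected per-task-pool pattern.
    import multiprocessing

    def replicate_one_database(db):
        pass   # stand-in for the real replication

    def launch_one(db):
        pool = multiprocessing.Pool(processes=1)                   # fork + IPC setup, per task
        result = pool.apply_async(replicate_one_database, (db,))
        return pool, result   # caller keeps the pool alive and collects later

    def collect(pool, result):
        result.get()
        pool.close()
        pool.join()           # per-task teardown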

@H--o-l (Collaborator) commented Nov 5, 2019

Hey!
Thanks for the feedback, and I agree it's strange.
I already did the test under these conditions twice; you can see it in the graph below.
Besides the CPU graph, I checked the script output after an hour each time and confirmed that it runs at least twice as slow.

[screenshot: CPU graph]

But I'm OK to do it a third time if needed!
I also think we should start trying to reproduce the issue locally and look for clues.
(That's what I intend to do.)

@adrienverge (Owner, Author)

Thanks. Do I understand correctly that:

  • on the left, the blue line is the replicator machine?
  • in the middle, the red line is the replicator machine?
  • on the right, ??

I would like to know, for both "master" and "this PR + 64", which machine is the limiting one, i.e. where the CPU bottleneck is!

@H--o-l (Collaborator) commented Nov 5, 2019

OK, no problem.
In fact, the colors don't change; it's one continuous graph.
It's only cut where machines were stopped.
But I should have given you the legend:

  • blue: source cluster machine, the one used by coucharchive
  • green: second machine of the source cluster
  • yellow: third machine of the source cluster
  • red: target machine, where coucharchive runs and the backup is created

The graph displays ~48 hours (don't trust the "1w" in the upper left, I zoomed in).

For "master", the CPU bottleneck was on the blue machine, i.e. the source.
For "PR + 64", the CPU bottleneck is on the red machine, i.e. the target.

Don't hesitate to ask for more details, I will be happy to share!

@adrienverge (Owner, Author)

Thanks for the details!

I did the following test:

  • Launch an empty CouchDB server, and create 1000 databases with one small document in each, using the following script:

    #!/bin/bash
    URL=http://root:peBf9iyTqx@localhost:48469
    for i in $(seq 1000); do
      uuid=$(uuidgen)
      curl -X PUT $URL/useless_$uuid
      curl -X PUT $URL/useless_$uuid/document -d '{"type": "document"}'
    done
  • Try coucharchive create on this server, and compare "master" with this PR+64.
    I measured the total time taken, and CPU usage, using:

    time ./coucharchive create --from $URL -o /dev/null
    

I ran the command 3 times for each, interleaved (once master, once PR, once master, etc.):

  • With coucharchive "master":

    ./coucharchive-master create --from $URL -o /dev/null  9,17s user 2,64s system 24% cpu 48,148 total
    ./coucharchive-master create --from $URL -o /dev/null  8,92s user 2,67s system 24% cpu 48,019 total
    ./coucharchive-master create --from $URL -o /dev/null  8,72s user 2,52s system 23% cpu 47,370 total
    

    → total time: 48 seconds on average
    → total CPU time: 12 s (most of the time, coucharchive is waiting for network, IO, etc.)

  • With coucharchive "this PR + 64 workers":

    ./coucharchive create --from $URL -o /dev/null  84,57s user 41,54s system 107% cpu 1:57,23 total
    ./coucharchive create --from $URL -o /dev/null  77,27s user 37,70s system 101% cpu 1:53,38 total
    ./coucharchive create --from $URL -o /dev/null  80,80s user 38,89s system 101% cpu 1:58,19 total
    

    → total time: 2 minutes on average
    → total CPU time: 120 s (most of the time, coucharchive is using CPU, i.e. running Python code!)

So I'll investigate further to see where the overhead comes from. The high system CPU time suggests that forking and communicating with pool processes (hence making system calls) is the issue.
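
(For reference, the "total CPU time" figures are simply user + system as reported by time: for master, roughly 9 s + 2.6 s ≈ 12 s of CPU over a 48 s run, which matches the ~24% CPU reported; for this PR, roughly 80 s + 39 s ≈ 120 s of CPU over a ~118 s run, which matches the ~100% CPU reported.)
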
Thanks a lot @H--o-l for testing and reporting the problem!

@adrienverge changed the title from "feat(replication): Implement a replication controller" to "feat(replication): Implement queues and replication controller" on Nov 5, 2019
@adrienverge (Owner, Author)

@H--o-l I've tested multiple implementations, using different types of multiprocessing.Pool work handling, and also with queues.

Using queues, I found an implementation that does not induce too much overhead (see user and system CPU usage in the tests below), and lets the replication controller do its job. I've reworked the PR/commit message and pushed new code.

Performance test on my laptop: I see a roughly 4× speed increase compared to the last PR version. In detail: doing a backup (time coucharchive -v create --from $URL -o /dev/null) of a CouchDB server with 1000 small databases with one document each:

  • master:
    50s total (9,37s user 2,71s system 24% cpu)

  • this commit (modified, max 64 workers):
    60s total (8,52s user 2,16s system 17% cpu)

  • this commit (max 128 workers):
    35s total (7,03s user 2,23s system 26% cpu)

  • this commit (modified, max 256 workers):
    24s total (9,85s user 4,03s system 56% cpu)

I'd be very happy to see what your "real-life" machines say about that 🙂

PS: I've saved the previous implementation; it's visible on the branch feat/replication-control-bak-with-apply_async.

@H--o-l (Collaborator) commented Nov 6, 2019

I was able to do a backup in 2h20m versus 2h40m before in my test configuration :-)
The improvement may be within the error margin, but at least it's not slower than before.
Congratulations!
I will look a bit at the code and come back to you ;-)
(PS: I used the PR version without modification.)

Review thread on coucharchive (outdated diff):
    def report_error(self):
        self.last_errors.append((int(time.time()),
                                 self.running_replications.value))
        self.last_errors = self._recent_events_only(self.last_errors)
@H--o-l (Collaborator):

Hey!
I think there is a small issue here: the self.last_errors list is cleaned up only when new errors are reported, i.e. at line 276, so values in the list can be really old.
May I suggest calling _recent_events_only just before using the list, i.e. at line 275? (Same for the self.last_successes list, but successes are reported much more often than failures.)

@adrienverge (Owner, Author):

Genius!

Your proposal sounds ideal, can you fix it and push to the branch?

@H--o-l (Collaborator) commented Nov 7, 2019

Hey!

So I tried to do it the simplest possible way:

--- a/coucharchive
+++ b/coucharchive
@@ -268,11 +268,13 @@ class ReplicationControl(object):
         ideal_number = 4
 
         # ... then use 2 × the last successful value
+        self.last_successes = self._recent_events_only(self.last_successes)
         if self.last_successes:
             best_successful_number = max(e[1] for e in self.last_successes)
             ideal_number = 2 * best_successful_number
 
         # ... but if there were errors, take 0.9 × the lowest error value
+        self.last_errors = self._recent_events_only(self.last_errors)
         if self.last_errors:
             min_number_with_error = min(e[1] for e in self.last_errors)
             ideal_number = int(0.9 * min_number_with_error)
@@ -286,6 +288,7 @@ class ReplicationControl(object):
         # If there was 1 error within last 5 minutes, pause for 5 seconds,
         # if there were 2 errors within last 5 minutes, pause for 10 seconds,
         # if there were 3 errors within last 5 minutes, pause for 15 seconds...
+        self.last_errors = self._recent_events_only(self.last_errors)
         wait = 5 * len(self.last_errors)
         return min(60, wait)  # stay within [5 s, 60 s]

But when I do that, the following error is raised in the report_success() function, which I did not change:

  File "./coucharchive", line 507, in replicate_one_database
    control.report_success()
  File "./coucharchive", line 254, in report_success
    self.running_replications.value))
  File "<string>", line 2, in append
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 815, in _callmethod
    self._connect()
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 802, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
INFO: chatterton_common_billables_ru: done
Process ForkPoolWorker-10:
Traceback (most recent call last):
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 811, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

Here is the only popular discussion I found on the Internet, but our case is clearly more subtle:
https://stackoverflow.com/questions/25455462/share-list-between-process-in-python-server

So, one solution I found is to read the self.last_successes and self.last_errors lists inside the ideal_*() functions, but not write to them.
It's not ideal, but finding another solution looks to me like a long task.
Here is the diff, pushed in a FIXUP commit:

 --- a/coucharchive
+++ b/coucharchive
@@ -268,13 +268,15 @@ class ReplicationControl(object):
         ideal_number = 4
 
         # ... then use 2 × the last successful value
-        if self.last_successes:
-            best_successful_number = max(e[1] for e in self.last_successes)
+        last_successes = self._recent_events_only(self.last_successes)
+        if last_successes:
+            best_successful_number = max(e[1] for e in last_successes)
             ideal_number = 2 * best_successful_number
 
         # ... but if there were errors, take 0.9 × the lowest error value
-        if self.last_errors:
-            min_number_with_error = min(e[1] for e in self.last_errors)
+        last_errors = self._recent_events_only(self.last_errors)
+        if last_errors:
+            min_number_with_error = min(e[1] for e in last_errors)
             ideal_number = int(0.9 * min_number_with_error)
 
         # ... and stay within [1, MAX_NUMBER_OF_WORKERS]
@@ -286,7 +288,8 @@ class ReplicationControl(object):
         # If there was 1 error within last 5 minutes, pause for 5 seconds,
         # if there were 2 errors within last 5 minutes, pause for 10 seconds,
         # if there were 3 errors within last 5 minutes, pause for 15 seconds...
-        wait = 5 * len(self.last_errors)
+        last_errors = self._recent_events_only(self.last_errors)
+        wait = 5 * len(last_errors)
         return min(60, wait)  # stay within [5 s, 60 s]

What do you think?
For info, I've launched a test on my testing servers to see what's what.

@adrienverge (Owner, Author) commented Nov 7, 2019

But when I do that, the following error is raised in the report_success() function, which I did not change:

Hehe, there was a trick here! We cannot simply replace the list with another one: it's not a plain [], it's a multiprocessing.Manager().list(). If we do last_successes = ..., it won't be a multiprocessing.Manager().list() anymore, so it breaks when accessed concurrently.

In order to replace its values without destroying/recreating it, one can do last_successes[:] = ....
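
As a minimal standalone illustration (not coucharchive code), using only the standard multiprocessing module:

    import multiprocessing

    manager = multiprocessing.Manager()
    shared = manager.list([(1, 4), (2, 8)])   # proxy to a list living in the manager process

    # Wrong: rebinding the name turns `shared` into a plain local list, so
    # other processes holding the old proxy no longer see the same data.
    # shared = [e for e in shared if e[0] > 1]

    # Right: slice assignment replaces the contents in place, keeping the
    # same shared proxy object.
    shared[:] = [e for e in shared if e[0] > 1]
    print(list(shared))   # [(2, 8)]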

I replaced your fixup commit with the following:

--- a/coucharchive
+++ b/coucharchive
@@ -252,18 +252,21 @@ class ReplicationControl(object):
     def report_success(self):
         self.last_successes.append((int(time.time()),
                                     self.running_replications.value))
-        self.last_successes = self._recent_events_only(self.last_successes)
 
     def report_error(self):
         self.last_errors.append((int(time.time()),
                                  self.running_replications.value))
-        self.last_errors = self._recent_events_only(self.last_errors)
 
-    def _recent_events_only(self, events):
+    def _drop_old_events(self):
         five_min_ago = time.time() - 5 * 60
-        return [e for e in events if e[0] > five_min_ago]
+        self.last_successes[:] = [e for e in self.last_successes
+                                  if e[0] > five_min_ago]
+        self.last_errors[:] = [e for e in self.last_errors
+                               if e[0] > five_min_ago]
 
     def ideal_number_of_replications(self):
+        self._drop_old_events()
+
         # Start with 4 concurrent replications...
         ideal_number = 4
 
@@ -283,6 +286,8 @@ class ReplicationControl(object):
         return ideal_number
 
     def ideal_sleep_value(self):
+        self._drop_old_events()
+
         # If there was 1 error within last 5 minutes, pause for 5 seconds,
         # if there were 2 errors within last 5 minutes, pause for 10 seconds,
         # if there were 3 errors within last 5 minutes, pause for 15 seconds...

For info, I've launched a test on my testing servers to see what's what.

Thanks! Can't wait to see the results.
