Issue 5380 - Separate cleanAllRUV code into new file #5381

Merged
merged 1 commit on Jul 25, 2022

1 change: 1 addition & 0 deletions Makefile.am
@@ -1674,6 +1674,7 @@ libreplication_plugin_la_SOURCES = ldap/servers/plugins/replication/cl5_api.c \
ldap/servers/plugins/replication/repl_globals.c \
ldap/servers/plugins/replication/repl_opext.c \
ldap/servers/plugins/replication/repl_session_plugin.c \
ldap/servers/plugins/replication/repl_cleanallruv.c \
ldap/servers/plugins/replication/repl5_agmt.c \
ldap/servers/plugins/replication/repl5_agmtlist.c \
ldap/servers/plugins/replication/repl5_backoff.c \
41 changes: 29 additions & 12 deletions dirsrvtests/tests/suites/replication/cleanallruv_test.py
@@ -70,14 +70,25 @@ def remove_supplier4_agmts(msg, topology_m4):

log.info('%s: remove all the agreements to supplier 4...' % msg)
repl = ReplicationManager(DEFAULT_SUFFIX)
# This will delete m4 frm the topo *and* remove all incoming agreements
# This will delete m4 from the topo *and* remove all incoming agreements
# to m4.
repl.remove_supplier(topology_m4.ms["supplier4"],
[topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]])

def remove_some_supplier4_agmts(msg, topology_m4):
"""Remove all the repl agmts to supplier4 except from supplier3. Used by
the force tests."""

log.info('%s: remove the agreements to supplier 4...' % msg)
repl = ReplicationManager(DEFAULT_SUFFIX)
# This will delete m4 from the topo *and* remove all incoming agreements
# to m4.
repl.remove_supplier(topology_m4.ms["supplier4"],
[topology_m4.ms["supplier1"], topology_m4.ms["supplier2"]])


def check_ruvs(msg, topology_m4, m4rid):
"""Check suppliers 1- 3 for supplier 4's rid."""
"""Check suppliers 1-3 for supplier 4's rid."""
for inst in (topology_m4.ms["supplier1"], topology_m4.ms["supplier2"], topology_m4.ms["supplier3"]):
clean = False
replicas = Replicas(inst)
@@ -172,7 +183,7 @@ def fin():
})
cruv_task.wait()
except ldap.UNWILLING_TO_PERFORM:
# In some casse we already cleaned rid4, so if we fail, it's okay
# In some cases we already cleaned rid4, so if we fail, it's okay
pass
restore_supplier4(topology_m4)
# Make sure everything works.
@@ -296,7 +307,6 @@ def test_clean_restart(topology_m4, m4rid):
log.info('test_clean_restart PASSED, restoring supplier 4...')


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_clean_force(topology_m4, m4rid):
"""Check that multiple tasks with a 'force' option work properly

@@ -326,15 +336,19 @@ def test_clean_force(topology_m4, m4rid):
topology_m4.ms["supplier3"].stop()

# Add a bunch of updates to supplier 4
m4_add_users = AddUsers(topology_m4.ms["supplier4"], 1500)
m4_add_users = AddUsers(topology_m4.ms["supplier4"], 10)
m4_add_users.start()
m4_add_users.join()

# Remove the agreements from the other suppliers that point to supplier 4
remove_some_supplier4_agmts("test_clean_force", topology_m4)

# Start supplier 3, it should be out of sync with the other replicas...
topology_m4.ms["supplier3"].start()

# Remove the agreements from the other suppliers that point to supplier 4
remove_supplier4_agmts("test_clean_force", topology_m4)
# Remove the agreement to replica 4
replica = Replicas(topology_m4.ms["supplier3"]).get(DEFAULT_SUFFIX)
replica.get_agreements().get("004").delete()

# Run the task, use "force" because supplier 3 is not in sync with the other replicas
# in regards to the replica 4 RUV
@@ -648,7 +662,6 @@ def test_stress_clean(topology_m4, m4rid):
ldbm_config.set('nsslapd-readonly', 'off')


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_multiple_tasks_with_force(topology_m4, m4rid):
"""Check that multiple tasks with a 'force' option work properly

@@ -680,16 +693,20 @@ def test_multiple_tasks_with_force(topology_m4, m4rid):
topology_m4.ms["supplier3"].stop()

# Add a bunch of updates to supplier 4
m4_add_users = AddUsers(topology_m4.ms["supplier4"], 1500)
m4_add_users = AddUsers(topology_m4.ms["supplier4"], 10)
m4_add_users.start()
m4_add_users.join()

# Disable supplier 4
# Remove the agreements from the other suppliers that point to supplier 4
remove_some_supplier4_agmts("test_multiple_tasks_with_force", topology_m4)

# Start supplier 3, it should be out of sync with the other replicas...
topology_m4.ms["supplier3"].start()

# Disable supplier 4
# Remove the agreements from the other suppliers that point to supplier 4
remove_supplier4_agmts("test_multiple_tasks_with_force", topology_m4)
# Remove the agreement to replica 4
replica = Replicas(topology_m4.ms["supplier3"]).get(DEFAULT_SUFFIX)
replica.get_agreements().get("004").delete()

# Run the task, use "force" because supplier 3 is not in sync with the other replicas
# in regards to the replica 4 RUV
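The task invocation that these "use force" comments refer to falls outside the hunks shown above. For orientation, a minimal sketch (not part of this commit) of how a force-cleaning CLEANALLRUV task is typically launched through lib389 in this suite; the helper name run_force_cleanallruv is hypothetical, and the property names follow the cn=cleanallruv task entry attributes.

from lib389.tasks import CleanAllRUVTask
from lib389._constants import DEFAULT_SUFFIX

def run_force_cleanallruv(supplier, rid):
    # Create a CLEANALLRUV task for the given rid with force-cleaning enabled,
    # then block until the task entry reports completion.
    cruv_task = CleanAllRUVTask(supplier)
    cruv_task.create(properties={
        'replica-base-dn': DEFAULT_SUFFIX,
        'replica-id': rid,
        'replica-force-cleaning': 'yes',
    })
    cruv_task.wait()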
53 changes: 31 additions & 22 deletions ldap/servers/plugins/replication/repl5.h
@@ -76,6 +76,10 @@
#define ABORT_SESSION 1
#define SESSION_ABORTED 2

#define CLEANRUV "CLEANRUV"
#define CLEANRUVLEN 8
#define CLEANALLRUV "CLEANALLRUV"
#define CLEANALLRUVLEN 11
#define CLEANRUV_ACCEPTED "accepted"
#define CLEANRUV_REJECTED "rejected"
#define CLEANRUV_FINISHED "finished"
@@ -834,28 +838,6 @@ typedef struct _csngen_test_data
int replica_config_init(void);
void replica_config_destroy(void);
int get_replica_type(Replica *r);
int replica_execute_cleanruv_task_ext(Replica *r, ReplicaId rid);
void add_cleaned_rid(cleanruv_data *clean_data);
int is_cleaned_rid(ReplicaId rid);
int32_t check_and_set_cleanruv_task_count(ReplicaId rid);
int32_t check_and_set_abort_cleanruv_task_count(void);
int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg);
void replica_cleanallruv_thread_ext(void *arg);
void stop_ruv_cleaning(void);
int task_aborted(void);
void replica_abort_task_thread(void *arg);
void remove_cleaned_rid(ReplicaId rid);
int process_repl_agmts(Replica *replica, int *agmt_info, char *oid, Slapi_Task *task, struct berval *payload, int op);
int decode_cleanruv_payload(struct berval *extop_value, char **payload);
struct berval *create_cleanruv_payload(char *value);
void ruv_get_cleaned_rids(RUV *ruv, ReplicaId *rids);
void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, PRBool original_task);
int is_task_aborted(ReplicaId rid);
void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root, char *certify_all, PRBool original_task, int skip);
int is_pre_cleaned_rid(ReplicaId rid);
void set_cleaned_rid(ReplicaId rid);
void cleanruv_log(Slapi_Task *task, int rid, char *task_type, int sev_level, char *fmt, ...);
char *replica_cleanallruv_get_local_maxcsn(ReplicaId rid, char *base_dn);

/* replutil.c */
LDAPControl *create_managedsait_control(void);
@@ -914,4 +896,31 @@ int repl_session_plugin_call_recv_acquire_cb(const char *repl_area, int is_total
int repl_session_plugin_call_reply_acquire_cb(const char *repl_area, int is_total, char **data_guid, struct berval **data);
void repl_session_plugin_call_destroy_agmt_cb(const Repl_Agmt *ra);

/* repl_cleanallruv.c */
int32_t cleanallruv_init(void);
int replica_execute_cleanruv_task_ext(Replica *r, ReplicaId rid);
void add_cleaned_rid(cleanruv_data *clean_data);
int is_cleaned_rid(ReplicaId rid);
int32_t check_and_set_cleanruv_task_count(ReplicaId rid);
int32_t check_and_set_abort_cleanruv_task_count(void);
int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg);
void replica_cleanallruv_thread_ext(void *arg);
void stop_ruv_cleaning(void);
int task_aborted(void);
void replica_abort_task_thread(void *arg);
void remove_cleaned_rid(ReplicaId rid);
int process_repl_agmts(Replica *replica, int *agmt_info, char *oid, Slapi_Task *task, struct berval *payload, int op);
int decode_cleanruv_payload(struct berval *extop_value, char **payload);
struct berval *create_cleanruv_payload(char *value);
void ruv_get_cleaned_rids(RUV *ruv, ReplicaId *rids);
void add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, PRBool original_task);
int is_task_aborted(ReplicaId rid);
void delete_aborted_rid(Replica *replica, ReplicaId rid, char *repl_root, char *certify_all, PRBool original_task, int skip);
int is_pre_cleaned_rid(ReplicaId rid);
void set_cleaned_rid(ReplicaId rid);
void cleanruv_log(Slapi_Task *task, int rid, char *task_type, int sev_level, char *fmt, ...);
char *replica_cleanallruv_get_local_maxcsn(ReplicaId rid, char *base_dn);
int replica_execute_cleanruv_task(Replica *r, ReplicaId rid, char *returntext);
int replica_execute_cleanall_ruv_task(Replica *r, ReplicaId rid, Slapi_Task *task, const char *force_cleaning, PRBool original_task, char *returntext);

#endif /* _REPL5_H_ */
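The prototypes gathered above under the new repl_cleanallruv.c section keep their existing signatures; only their declaration site moves. As a hedged illustration (not part of this commit) of how a caller uses the relocated API together with the CLEANRUV/CLEANALLRUV macros, the sketch below parses a task value of the form "CLEANALLRUV<rid>" or "CLEANRUV<rid>" (as accepted via the replica's nsds5task attribute) and dispatches to the matching entry point. The helper name dispatch_cleanruv_task is hypothetical, and the force and original-task arguments are placeholder values.

/* Minimal sketch, assuming the macros mark nsds5task values; the real
 * dispatch lives in the replication plugin's config handling. */
#include <stdlib.h>  /* atoi */
#include <strings.h> /* strncasecmp */
#include "repl5.h"

static int
dispatch_cleanruv_task(Replica *r, const char *task_value, Slapi_Task *task, char *returntext)
{
    if (strncasecmp(task_value, CLEANALLRUV, CLEANALLRUVLEN) == 0) {
        /* Clean this rid across the whole replication topology. */
        ReplicaId rid = (ReplicaId)atoi(task_value + CLEANALLRUVLEN);
        return replica_execute_cleanall_ruv_task(r, rid, task,
                                                 "no" /* force */, PR_TRUE /* original task */,
                                                 returntext);
    } else if (strncasecmp(task_value, CLEANRUV, CLEANRUVLEN) == 0) {
        /* Clean this rid on the local replica only. */
        ReplicaId rid = (ReplicaId)atoi(task_value + CLEANRUVLEN);
        return replica_execute_cleanruv_task(r, rid, returntext);
    }
    return -1; /* not a cleanruv task value */
}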