Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add no slave purge

  • Loading branch information...
commit 084b4d9fca69d3edaa2d49b6bbd29da5f56dca1e 1 parent fcfb19d
@fangjian601 authored
Showing with 63 additions and 8 deletions.
  1. +10 −1 controller.py
  2. +6 −2 model.py
  3. +20 −4 restful.py
  4. +27 −1 worker.py
View
11 controller.py
@@ -139,4 +139,13 @@ def update_binlog_window(self, replica_id, binlog_window):
if dbreplica is not None:
dbreplica.binlog_window = binlog_window
session.commit()
- session.close()
+ session.close()
+
+ def update_no_slave_purge(self, replica_id, no_slave_purge):
+ session = self.persistence.session()
+ query = session.query(DBReplica)
+ dbreplica = query.get(replica_id)
+ if dbreplica is not None:
+ dbreplica.no_slave_purge = no_slave_purge
+ session.commit()
+ session.close()
View
8 model.py
@@ -59,12 +59,16 @@ class DBReplica(ORMBase):
nullable=False)
binlog_window = sqlalchemy.Column(sqlalchemy.Integer,
nullable=False)
+ no_slave_purge = sqlalchemy.Column(sqlalchemy.Integer,
+ nullable=False)
def __init__(self, id, master, slaves,
check_period = 60,
- binlog_window = 0):
+ binlog_window = 0,
+ no_slave_purge = 1):
self.id = id
self.master = master
self.slaves = slaves
self.check_period = check_period
- self.binlog_window = binlog_window
+ self.binlog_window = binlog_window
+ self.no_slave_purge = no_slave_purge
View
24 restful.py
@@ -85,13 +85,14 @@ def _init_worker(self):
@cherrypy.expose
@Helper.restful
def add(self, replica_id, master, slaves,
- check_period=60, binlog_window=0):
+ check_period=60, binlog_window=0, no_slave_purge=1):
if self.workers.has_key(replica_id):
raise Exception("duplicate replication")
else:
dbreplica = DBReplica(replica_id, master, slaves,
int(check_period),
- int(binlog_window))
+ int(binlog_window),
+ int(no_slave_purge))
worker = ReplicaWorker(self.persistence, dbreplica)
worker.start()
self.workers[replica_id] = worker
@@ -164,7 +165,7 @@ def update_check_period(self, replica_id, check_period):
raise Exception("replication not exist")
else:
self.controller.update_check_period(replica_id,
- check_period)
+ int(check_period))
self.workers[replica_id].stop()
worker = ReplicaWorker(self.persistence,
self.controller.get(replica_id))
@@ -179,13 +180,28 @@ def update_binlog_window(self, replica_id, binlog_window):
raise Exception("replication not exist")
else:
self.controller.update_binlog_window(replica_id,
- binlog_window)
+ int(binlog_window))
self.workers[replica_id].stop()
worker = ReplicaWorker(self.persistence,
self.controller.get(replica_id))
worker.start()
self.workers[replica_id] = worker
return "ok"
+
+ @cherrypy.expose
+ @Helper.restful
+ def update_no_slave_purge(self, replica_id, no_slave_purge):
+ if not self.workers.has_key(replica_id):
+ raise Exception("replication not exist")
+ else:
+ self.controller.update_no_slave_purge(replica_id,
+ int(no_slave_purge))
+ self.workers[replica_id].stop()
+ worker = ReplicaWorker(self.persistence,
+ self.controller.get(replica_id))
+ worker.start()
+ self.workers[replica_id] = worker
+ return "ok"
@cherrypy.expose
@Helper.restful
View
28 worker.py
@@ -84,9 +84,34 @@ def _target_master_binlog(self, earliest_slave_binlog):
raise ReplicaWorkerException("slave binary log not" +
" in master binary logs")
+ def _do_no_slave_purge(self):
+ master_binlogs = self.master_handler.binlogs_sorted()
+ binlog_window = self.dbreplica.binlog_window
+ binlog_length = len(master_binlogs)
+ if binlog_window + 1 < binlog_length:
+ target_binlog = master_binlogs[binlog_length-binlog_window-1]
+ self.logger.info(("start no slave purging, " +
+ "earliest_master_binlog %s, " +
+ "target_master_binlog %s, " +
+ "latest_master_binlog %s") %
+ (master_binlogs[0][1],
+ target_binlog[1],
+ master_binlogs[binlog_length-1][1]))
+ self.master_handler.purge(target_binlog[1])
+ else:
+ self.logger.info(("skip no slave purging, " +
+ "earliest_master_binlog %s, " +
+ "latest_master_binlog %s, " +
+ "binlog window %s") %
+ (master_binlogs[0][1],
+ master_binlogs[binlog_length-1][1],
+ binlog_window))
+
def _do_purge(self):
- if len(self.slaves) <= 0:
+ if len(self.slaves) <= 0 and self.dbreplica.no_slave_purge == 0:
self.logger.info("skip purge, no slave")
+ elif len(self.slaves) <= 0 and self.dbreplica.no_slave_purge != 0:
+ self._do_no_slave_purge()
else:
slave_binlog = self._earliest_slave_binlog()
(skip,
@@ -115,6 +140,7 @@ def _do_purge(self):
earliest_master_binlog[1],
latest_master_binlog[1],
self.dbreplica.binlog_window))
+
def purge(self):
if not self.lock.acquire(False):
raise ReplicaWorkerException("another purge is running")
Please sign in to comment.
Something went wrong with that request. Please try again.