From 5928f679fd9de3ff2b01c845b4be0dec1001bca0 Mon Sep 17 00:00:00 2001 From: sunzhengfang Date: Fri, 29 Mar 2019 19:12:40 +0800 Subject: [PATCH] #1079 set alert info into a blockqueue --- dble_checkstyle_suppression.xml | 1 + .../java/com/actiontech/dble/DbleServer.java | 3 +- .../java/com/actiontech/dble/alarm/Alert.java | 19 +- .../dble/alarm/AlertBlockQueue.java | 965 ++++++++++++++++++ .../actiontech/dble/alarm/AlertSender.java | 70 ++ .../com/actiontech/dble/alarm/AlertTask.java | 43 + .../com/actiontech/dble/alarm/AlertUtil.java | 50 +- .../com/actiontech/dble/alarm/NoAlert.java | 12 +- .../dble/alarm/ToResolveContainer.java | 1 + .../com/actiontech/dble/alarm/UcoreAlert.java | 48 +- .../datasource/PhysicalDatasource.java | 26 +- .../backend/heartbeat/MySQLHeartbeat.java | 2 +- .../transaction/xa/XACommitNodesHandler.java | 5 +- .../xa/XARollbackNodesHandler.java | 2 +- .../dble/backend/mysql/xa/XAStateLog.java | 1 + .../recovery/impl/FileSystemRepository.java | 11 +- .../dble/cluster/bean/ClusterAlertBean.java | 55 +- .../handler/CreateDatabaseHandler.java | 5 +- .../dble/meta/ProxyMetaManager.java | 4 +- .../meta/table/AbstractTablesMetaHandler.java | 5 +- .../dble/meta/table/GetNodeTablesHandler.java | 6 +- .../meta/table/MultiTablesMetaHandler.java | 18 +- .../meta/table/TablesMetaCheckHandler.java | 6 +- .../table/old/AbstractTableMetaHandler.java | 24 +- .../old/SchemaDefaultNodeTablesHandler.java | 5 +- .../meta/table/old/TableMetaCheckHandler.java | 6 +- .../dble/server/status/AlertManager.java | 33 + .../dble/server/util/GlobalTableUtil.java | 12 +- 28 files changed, 1278 insertions(+), 160 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/alarm/AlertBlockQueue.java create mode 100644 src/main/java/com/actiontech/dble/alarm/AlertSender.java create mode 100644 src/main/java/com/actiontech/dble/alarm/AlertTask.java create mode 100644 src/main/java/com/actiontech/dble/server/status/AlertManager.java diff --git a/dble_checkstyle_suppression.xml b/dble_checkstyle_suppression.xml index b9b0f93f4b..1e3ad6a488 100644 --- a/dble_checkstyle_suppression.xml +++ b/dble_checkstyle_suppression.xml @@ -13,6 +13,7 @@ + diff --git a/src/main/java/com/actiontech/dble/DbleServer.java b/src/main/java/com/actiontech/dble/DbleServer.java index f612069b87..e0b41229b5 100644 --- a/src/main/java/com/actiontech/dble/DbleServer.java +++ b/src/main/java/com/actiontech/dble/DbleServer.java @@ -36,6 +36,7 @@ import com.actiontech.dble.route.RouteService; import com.actiontech.dble.route.sequence.handler.*; import com.actiontech.dble.server.ServerConnectionFactory; +import com.actiontech.dble.server.status.AlertManager; import com.actiontech.dble.server.status.OnlineLockStatus; import com.actiontech.dble.server.status.SlowQueryLog; import com.actiontech.dble.server.util.GlobalTableUtil; @@ -364,10 +365,10 @@ public void startup() throws IOException { if (system.getEnableSlowLog() == 1) { SlowQueryLog.getInstance().setEnableSlowLog(true); } - AlertUtil.initAlert(); if (system.getEnableAlert() == 1) { AlertUtil.switchAlert(true); } + AlertManager.getInstance().startAlert(); if (aio) { int processorCount = frontProcessorCount + backendProcessorCount; LOGGER.info("using aio network handler "); diff --git a/src/main/java/com/actiontech/dble/alarm/Alert.java b/src/main/java/com/actiontech/dble/alarm/Alert.java index d176b78fdd..3bcda59c95 100644 --- a/src/main/java/com/actiontech/dble/alarm/Alert.java +++ b/src/main/java/com/actiontech/dble/alarm/Alert.java @@ -5,14 +5,23 @@ package com.actiontech.dble.alarm; -import java.util.Map; +import com.actiontech.dble.cluster.bean.ClusterAlertBean; + public interface Alert { enum AlertLevel { NOTICE, WARN, CRITICAL } - void alertSelf(String code, AlertLevel level, String desc, Map labels); - void alert(String code, AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels); - boolean alertResolve(String code, AlertLevel level, String alertComponentType, String alertComponentId, Map labels); - boolean alertSelfResolve(String code, AlertLevel level, Map labels); + + enum AlertType { + ALERT, ALERT_RESOLVE, ALERT_SELF, ALERT_SELF_RESOLVE + } + + void alertSelf(ClusterAlertBean bean); + + void alert(ClusterAlertBean bean); + + boolean alertResolve(ClusterAlertBean bean); + + boolean alertSelfResolve(ClusterAlertBean bean); } diff --git a/src/main/java/com/actiontech/dble/alarm/AlertBlockQueue.java b/src/main/java/com/actiontech/dble/alarm/AlertBlockQueue.java new file mode 100644 index 0000000000..b8441302c7 --- /dev/null +++ b/src/main/java/com/actiontech/dble/alarm/AlertBlockQueue.java @@ -0,0 +1,965 @@ +package com.actiontech.dble.alarm; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; + +/** + * copy from jre LinkedBlockingQueue change the offer when the queue is full + * + * @param + */ +public class AlertBlockQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + private static final long serialVersionUID = -6903933977591709194L; + private static final Logger LOGGER = LoggerFactory.getLogger(AlertBlockQueue.class); + + /** + * Linked list node class + */ + static class Node { + E item; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head.next + * - null, meaning there is no successor (this is the last node) + */ + Node next; + + Node(E x) { + item = x; + } + } + + /** + * The capacity bound, or Integer.MAX_VALUE if none + */ + private final int capacity; + + /** + * Current number of elements + */ + private final AtomicInteger count = new AtomicInteger(); + + /** + * Head of linked list. + * Invariant: head.item == null + */ + transient Node head; + + /** + * Tail of linked list. + * Invariant: last.next == null + */ + private transient Node last; + + /** + * Lock held by take, poll, etc + */ + private final ReentrantLock takeLock = new ReentrantLock(); + + /** + * Wait queue for waiting takes + */ + private final Condition notEmpty = takeLock.newCondition(); + + /** + * Lock held by put, offer, etc + */ + private final ReentrantLock putLock = new ReentrantLock(); + + /** + * Wait queue for waiting puts + */ + private final Condition notFull = putLock.newCondition(); + + /** + * Signals a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void signalNotEmpty() { + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /** + * Signals a waiting put. Called only from take/poll. + */ + private void signalNotFull() { + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + notFull.signal(); + } finally { + putLock.unlock(); + } + } + + /** + * Links node at end of queue. + * + * @param node the node + */ + private void enqueue(Node node) { + // assert putLock.isHeldByCurrentThread(); + // assert last.next == null; + last = last.next = node; + } + + /** + * Removes a node from head of queue. + * + * @return the node + */ + private E dequeue() { + // assert takeLock.isHeldByCurrentThread(); + // assert head.item == null; + Node h = head; + Node first = h.next; + h.next = h; // help GC + head = first; + E x = first.item; + first.item = null; + return x; + } + + /** + * Locks to prevent both puts and takes. + */ + void fullyLock() { + putLock.lock(); + takeLock.lock(); + } + + /** + * Unlocks to allow both puts and takes. + */ + void fullyUnlock() { + takeLock.unlock(); + putLock.unlock(); + } + + + public AlertBlockQueue() { + this(Integer.MAX_VALUE); + } + + + public AlertBlockQueue(int capacity) { + if (capacity <= 0) throw new IllegalArgumentException(); + this.capacity = capacity; + last = head = new Node(null); + } + + /** + * Creates a {@code LinkedBlockingQueue} with a capacity of + * {@link Integer#MAX_VALUE}, initially containing the elements of the + * given collection, + * added in traversal order of the collection's iterator. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ + public AlertBlockQueue(Collection c) { + this(Integer.MAX_VALUE); + final ReentrantLock putLock = this.putLock; + putLock.lock(); // Never contended, but necessary for visibility + try { + int n = 0; + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (n == capacity) + throw new IllegalStateException("Queue full"); + enqueue(new Node(e)); + ++n; + } + count.set(n); + } finally { + putLock.unlock(); + } + } + + // this doc comment is overridden to remove the reference to collections + // greater in size than Integer.MAX_VALUE + + /** + * Returns the number of elements in this queue. + * + * @return the number of elements in this queue + */ + public int size() { + return count.get(); + } + + // this doc comment is a modified copy of the inherited doc comment, + // without the reference to unlimited queues. + + /** + * Returns the number of additional elements that this queue can ideally + * (in the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this queue + * less the current {@code size} of this queue. + *

+ *

Note that you cannot always tell if an attempt to insert + * an element will succeed by inspecting {@code remainingCapacity} + * because it may be the case that another thread is about to + * insert or remove an element. + */ + public int remainingCapacity() { + return capacity - count.get(); + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary for space to become available. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public void put(E e) throws InterruptedException { + if (e == null) throw new NullPointerException(); + // Note: convention in all put/take/etc is to preset local var + // holding count negative to indicate failure unless set. + int c = -1; + Node node = new Node(e); + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + /* + * Note that count is used in wait guard even though it is + * not protected by lock. This works because count can + * only decrease at this point (all other puts are shut + * out by lock), and we (or some other waiting put) are + * signalled if it ever changes from capacity. Similarly + * for all other uses of count in other wait guards. + */ + while (count.get() == capacity) { + notFull.await(); + } + enqueue(node); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary up to the specified wait time for space to become available. + * + * @return {@code true} if successful, or {@code false} if + * the specified waiting time elapses before space is available + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + + if (e == null) throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + int c = -1; + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + while (count.get() == capacity) { + if (nanos <= 0) { + break; + } + nanos = notFull.awaitNanos(nanos); + } + enqueue(new Node(e)); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return true; + } + + /** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning {@code true} upon success and {@code false} if this queue + * is full. + * When using a capacity-restricted queue, this method is generally + * preferable to method {@link BlockingQueue#add add}, which can fail to + * insert an element only by throwing an exception. + * + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e) { + if (e == null) throw new NullPointerException(); + final AtomicInteger count = this.count; + int c = -1; + Node node = new Node(e); + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + if (count.get() >= capacity) { + takeLock.lock(); + try { + if (count.get() > 0) { + E dropOne = dequeue(); + LOGGER.warn("Drop AlertTask:" + dropOne.toString()); + c = count.getAndDecrement(); + } + } finally { + takeLock.unlock(); + } + } + enqueue(node); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return c >= 0; + } + + public E take() throws InterruptedException { + E x; + int c = -1; + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + while (count.get() == 0) { + notEmpty.await(); + } + x = dequeue(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + E x = null; + int c = -1; + long nanos = unit.toNanos(timeout); + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + while (count.get() == 0) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + x = dequeue(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E poll() { + final AtomicInteger count = this.count; + if (count.get() == 0) + return null; + E x = null; + int c = -1; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + if (count.get() > 0) { + x = dequeue(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E peek() { + if (count.get() == 0) + return null; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + Node first = head.next; + if (first == null) + return null; + else + return first.item; + } finally { + takeLock.unlock(); + } + } + + /** + * Unlinks interior Node p with predecessor trail. + */ + void unlink(Node p, Node trail) { + // assert isFullyLocked(); + // p.next is not changed, to allow iterators that are + // traversing p to maintain their weak-consistency guarantee. + p.item = null; + trail.next = p.next; + if (last == p) + last = trail; + if (count.getAndDecrement() == capacity) + notFull.signal(); + } + + /** + * Removes a single instance of the specified element from this queue, + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such + * elements. + * Returns {@code true} if this queue contained the specified element + * (or equivalently, if this queue changed as a result of the call). + * + * @param o element to be removed from this queue, if present + * @return {@code true} if this queue changed as a result of the call + */ + public boolean remove(Object o) { + if (o == null) return false; + fullyLock(); + try { + for (Node trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { + if (o.equals(p.item)) { + unlink(p, trail); + return true; + } + } + return false; + } finally { + fullyUnlock(); + } + } + + /** + * Returns {@code true} if this queue contains the specified element. + * More formally, returns {@code true} if and only if this queue contains + * at least one element {@code e} such that {@code o.equals(e)}. + * + * @param o object to be checked for containment in this queue + * @return {@code true} if this queue contains the specified element + */ + public boolean contains(Object o) { + if (o == null) return false; + fullyLock(); + try { + for (Node p = head.next; p != null; p = p.next) + if (o.equals(p.item)) + return true; + return false; + } finally { + fullyUnlock(); + } + } + + /** + * Returns an array containing all of the elements in this queue, in + * proper sequence. + *

+ *

The returned array will be "safe" in that no references to it are + * maintained by this queue. (In other words, this method must allocate + * a new array). The caller is thus free to modify the returned array. + *

+ *

This method acts as bridge between array-based and collection-based + * APIs. + * + * @return an array containing all of the elements in this queue + */ + public Object[] toArray() { + fullyLock(); + try { + int size = count.get(); + Object[] a = new Object[size]; + int k = 0; + for (Node p = head.next; p != null; p = p.next) + a[k++] = p.item; + return a; + } finally { + fullyUnlock(); + } + } + + /** + * Returns an array containing all of the elements in this queue, in + * proper sequence; the runtime type of the returned array is that of + * the specified array. If the queue fits in the specified array, it + * is returned therein. Otherwise, a new array is allocated with the + * runtime type of the specified array and the size of this queue. + *

+ *

If this queue fits in the specified array with room to spare + * (i.e., the array has more elements than this queue), the element in + * the array immediately following the end of the queue is set to + * {@code null}. + *

+ *

Like the {@link #toArray()} method, this method acts as bridge between + * array-based and collection-based APIs. Further, this method allows + * precise control over the runtime type of the output array, and may, + * under certain circumstances, be used to save allocation costs. + *

+ *

Suppose {@code x} is a queue known to contain only strings. + * The following code can be used to dump the queue into a newly + * allocated array of {@code String}: + *

+ *

 {@code String[] y = x.toArray(new String[0]);}
+ * + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. + * + * @param a the array into which the elements of the queue are to + * be stored, if it is big enough; otherwise, a new array of the + * same runtime type is allocated for this purpose + * @return an array containing all of the elements in this queue + * @throws ArrayStoreException if the runtime type of the specified array + * is not a supertype of the runtime type of every element in + * this queue + * @throws NullPointerException if the specified array is null + */ + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + fullyLock(); + try { + int size = count.get(); + if (a.length < size) + a = (T[]) java.lang.reflect.Array.newInstance + (a.getClass().getComponentType(), size); + + int k = 0; + for (Node p = head.next; p != null; p = p.next) + a[k++] = (T) p.item; + if (a.length > k) + a[k] = null; + return a; + } finally { + fullyUnlock(); + } + } + + public String toString() { + fullyLock(); + try { + Node p = head.next; + if (p == null) + return "[]"; + + StringBuilder sb = new StringBuilder(); + sb.append('['); + for (; ; ) { + E e = p.item; + sb.append(e == this ? "(this Collection)" : e); + p = p.next; + if (p == null) + return sb.append(']').toString(); + sb.append(',').append(' '); + } + } finally { + fullyUnlock(); + } + } + + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public void clear() { + fullyLock(); + try { + for (Node p, h = head; (p = h.next) != null; h = p) { + h.next = h; + p.item = null; + } + head = last; + // assert head.item == null && head.next == null; + if (count.getAndSet(0) == capacity) + notFull.signal(); + } finally { + fullyUnlock(); + } + } + + /** + * @throws UnsupportedOperationException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + public int drainTo(Collection c) { + return drainTo(c, Integer.MAX_VALUE); + } + + /** + * @throws UnsupportedOperationException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + boolean signalNotFull = false; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + int n = Math.min(maxElements, count.get()); + // count.get provides visibility to first n Nodes + Node h = head; + int i = 0; + try { + while (i < n) { + Node p = h.next; + c.add(p.item); + p.item = null; + h.next = h; + h = p; + ++i; + } + return n; + } finally { + // Restore invariants even if c.add() threw + if (i > 0) { + // assert h.item == null; + head = h; + signalNotFull = (count.getAndAdd(-i) == capacity); + } + } + } finally { + takeLock.unlock(); + if (signalNotFull) + signalNotFull(); + } + } + + /** + * Returns an iterator over the elements in this queue in proper sequence. + * The elements will be returned in order from first (head) to last (tail). + *

+ *

The returned iterator is + * weakly consistent. + * + * @return an iterator over the elements in this queue in proper sequence + */ + public Iterator iterator() { + return new Itr(); + } + + private class Itr implements Iterator { + /* + * Basic weakly-consistent iterator. At all times hold the next + * item to hand out so that if hasNext() reports true, we will + * still have it to return even if lost race with a take etc. + */ + + private Node current; + private Node lastRet; + private E currentElement; + + Itr() { + fullyLock(); + try { + current = head.next; + if (current != null) + currentElement = current.item; + } finally { + fullyUnlock(); + } + } + + public boolean hasNext() { + return current != null; + } + + /** + * Returns the next live successor of p, or null if no such. + *

+ * Unlike other traversal methods, iterators need to handle both: + * - dequeued nodes (p.next == p) + * - (possibly multiple) interior removed nodes (p.item == null) + */ + private Node nextNode(Node p) { + for (; ; ) { + Node s = p.next; + if (s == p) + return head.next; + if (s == null || s.item != null) + return s; + p = s; + } + } + + public E next() { + fullyLock(); + try { + if (current == null) + throw new NoSuchElementException(); + E x = currentElement; + lastRet = current; + current = nextNode(current); + currentElement = (current == null) ? null : current.item; + return x; + } finally { + fullyUnlock(); + } + } + + public void remove() { + if (lastRet == null) + throw new IllegalStateException(); + fullyLock(); + try { + Node node = lastRet; + lastRet = null; + for (Node trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { + if (p == node) { + unlink(p, trail); + break; + } + } + } finally { + fullyUnlock(); + } + } + } + + /** + * A customized variant of Spliterators.IteratorSpliterator + */ + static final class LBQSpliterator implements Spliterator { + static final int MAX_BATCH = 1 << 25; // max batch array size; + final AlertBlockQueue queue; + Node current; // current node; null until initialized + int batch; // batch size for splits + boolean exhausted; // true when no more nodes + long est; // size estimate + + LBQSpliterator(AlertBlockQueue queue) { + this.queue = queue; + this.est = queue.size(); + } + + public long estimateSize() { + return est; + } + + public Spliterator trySplit() { + Node h; + final AlertBlockQueue q = this.queue; + int b = batch; + int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; + if (!exhausted && + ((h = current) != null || (h = q.head.next) != null) && + h.next != null) { + Object[] a = new Object[n]; + int i = 0; + Node p = current; + q.fullyLock(); + try { + if (p != null || (p = q.head.next) != null) { + do { + if ((a[i] = p.item) != null) + ++i; + } while ((p = p.next) != null && i < n); + } + } finally { + q.fullyUnlock(); + } + if ((current = p) == null) { + est = 0L; + exhausted = true; + } else if ((est -= i) < 0L) + est = 0L; + if (i > 0) { + batch = i; + return Spliterators.spliterator + (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | + Spliterator.CONCURRENT); + } + } + return null; + } + + public void forEachRemaining(Consumer action) { + if (action == null) throw new NullPointerException(); + final AlertBlockQueue q = this.queue; + if (!exhausted) { + exhausted = true; + Node p = current; + do { + E e = null; + q.fullyLock(); + try { + if (p == null) + p = q.head.next; + while (p != null) { + e = p.item; + p = p.next; + if (e != null) + break; + } + } finally { + q.fullyUnlock(); + } + if (e != null) + action.accept(e); + } while (p != null); + } + } + + public boolean tryAdvance(Consumer action) { + if (action == null) throw new NullPointerException(); + final AlertBlockQueue q = this.queue; + if (!exhausted) { + E e = null; + q.fullyLock(); + try { + if (current == null) + current = q.head.next; + while (current != null) { + e = current.item; + current = current.next; + if (e != null) + break; + } + } finally { + q.fullyUnlock(); + } + if (current == null) + exhausted = true; + if (e != null) { + action.accept(e); + return true; + } + } + return false; + } + + public int characteristics() { + return Spliterator.ORDERED | Spliterator.NONNULL | + Spliterator.CONCURRENT; + } + } + + /** + * Returns a {@link Spliterator} over the elements in this queue. + *

+ *

The returned spliterator is + * weakly consistent. + *

+ *

The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, + * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. + * + * @return a {@code Spliterator} over the elements in this queue + * @implNote The {@code Spliterator} implements {@code trySplit} to permit limited + * parallelism. + * @since 1.8 + */ + public Spliterator spliterator() { + return new LBQSpliterator(this); + } + + /** + * Saves this queue to a stream (that is, serializes it). + * + * @param s the stream + * @throws java.io.IOException if an I/O error occurs + * @serialData The capacity is emitted (int), followed by all of + * its elements (each an {@code Object}) in the proper order, + * followed by a null + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + + fullyLock(); + try { + // Write out any hidden stuff, plus capacity + s.defaultWriteObject(); + + // Write out all elements in the proper order. + for (Node p = head.next; p != null; p = p.next) + s.writeObject(p.item); + + // Use trailing null as sentinel + s.writeObject(null); + } finally { + fullyUnlock(); + } + } + + /** + * Reconstitutes this queue from a stream (that is, deserializes it). + * + * @param s the stream + * @throws ClassNotFoundException if the class of a serialized object + * could not be found + * @throws java.io.IOException if an I/O error occurs + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + // Read in capacity, and any hidden stuff + s.defaultReadObject(); + + count.set(0); + last = head = new Node(null); + + // Read in all elements and place in queue + for (; ; ) { + @SuppressWarnings("unchecked") + E item = (E) s.readObject(); + if (item == null) + break; + add(item); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/actiontech/dble/alarm/AlertSender.java b/src/main/java/com/actiontech/dble/alarm/AlertSender.java new file mode 100644 index 0000000000..a5eced1102 --- /dev/null +++ b/src/main/java/com/actiontech/dble/alarm/AlertSender.java @@ -0,0 +1,70 @@ +package com.actiontech.dble.alarm; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.cluster.ClusterController; +import com.actiontech.dble.cluster.ClusterGeneralConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; + +/** + * Created by szf on 2019/3/22. + */ +public class AlertSender implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(AlertSender.class); + + private final BlockingQueue alertQueue; + + private static final Alert DEFAULT_ALERT = new NoAlert(); + private static volatile Alert alert; + + public AlertSender(BlockingQueue alertQueue) { + this.alertQueue = alertQueue; + initAlert(); + } + + @Override + public void run() { + AlertTask alertTask; + while (true) { + try { + alertTask = alertQueue.take(); + + switch (alertTask.getAlertType()) { + case ALERT: + alert.alert(alertTask.getAlertBean()); + break; + case ALERT_SELF: + alert.alertSelf(alertTask.getAlertBean()); + break; + case ALERT_RESOLVE: + if (alert.alertResolve(alertTask.getAlertBean())) { + alertTask.alertCallBack(); + } + break; + case ALERT_SELF_RESOLVE: + if (alert.alertSelfResolve(alertTask.getAlertBean())) { + alertTask.alertCallBack(); + } + break; + default: + break; + } + } catch (Throwable e) { + LOGGER.error("get error when send queue", e); + } + } + } + + + public void initAlert() { + if (DbleServer.getInstance().isUseGeneralCluster() && + (ClusterController.CONFIG_MODE_UCORE.equals(ClusterGeneralConfig.getInstance().getClusterType()) || + ClusterController.CONFIG_MODE_USHARD.equals(ClusterGeneralConfig.getInstance().getClusterType()))) { + alert = UcoreAlert.getInstance(); + } else { + alert = DEFAULT_ALERT; + } + } +} diff --git a/src/main/java/com/actiontech/dble/alarm/AlertTask.java b/src/main/java/com/actiontech/dble/alarm/AlertTask.java new file mode 100644 index 0000000000..b13236a615 --- /dev/null +++ b/src/main/java/com/actiontech/dble/alarm/AlertTask.java @@ -0,0 +1,43 @@ +package com.actiontech.dble.alarm; + +import com.actiontech.dble.cluster.bean.ClusterAlertBean; + +import java.util.Set; + +/** + * Created by szf on 2019/3/29. + */ +public class AlertTask { + + private ClusterAlertBean alertBean; + private Alert.AlertType alertType; + private Set callbackSet; + private String callbackKey; + + + public AlertTask(Alert.AlertType alertType, Set callbackSet, String callbackKey, ClusterAlertBean alertBean) { + this.alertBean = alertBean; + this.alertType = alertType; + this.callbackSet = callbackSet; + this.callbackKey = callbackKey; + + } + + + public ClusterAlertBean getAlertBean() { + return alertBean; + } + + public Alert.AlertType getAlertType() { + return alertType; + } + + + public void alertCallBack() { + if (callbackSet != null && + callbackKey != null) { + callbackSet.remove(callbackKey); + } + } + +} diff --git a/src/main/java/com/actiontech/dble/alarm/AlertUtil.java b/src/main/java/com/actiontech/dble/alarm/AlertUtil.java index 4dfc0bda1d..3833e05a46 100644 --- a/src/main/java/com/actiontech/dble/alarm/AlertUtil.java +++ b/src/main/java/com/actiontech/dble/alarm/AlertUtil.java @@ -5,62 +5,62 @@ package com.actiontech.dble.alarm; -import com.actiontech.dble.DbleServer; -import com.actiontech.dble.cluster.ClusterController; -import com.actiontech.dble.cluster.ClusterGeneralConfig; +import com.actiontech.dble.cluster.bean.ClusterAlertBean; +import com.actiontech.dble.server.status.AlertManager; import java.util.HashMap; import java.util.Map; +import java.util.Set; public final class AlertUtil { private AlertUtil() { } - private static volatile Alert alert; - private static final Alert DEFAULT_ALERT = new NoAlert(); private static volatile boolean isEnable = false; - static { - alert = DEFAULT_ALERT; - } - public static void switchAlert(boolean enableAlert) { isEnable = enableAlert; } - public static void initAlert() { - if (DbleServer.getInstance().isUseGeneralCluster() && - (ClusterController.CONFIG_MODE_UCORE.equals(ClusterGeneralConfig.getInstance().getClusterType()) || - ClusterController.CONFIG_MODE_USHARD.equals(ClusterGeneralConfig.getInstance().getClusterType()))) { - alert = UcoreAlert.getInstance(); - } else { - alert = DEFAULT_ALERT; - } - } - public static boolean isEnable() { return isEnable; } public static void alertSelf(String code, Alert.AlertLevel level, String desc, Map labels) { if (isEnable) { - alert.alertSelf(code, level, desc, labels); + ClusterAlertBean bean = new ClusterAlertBean(); + bean.setCode(code).setLevel(level.toString()).setDesc(desc).setLabels(labels); + AlertTask task = new AlertTask(Alert.AlertType.ALERT_SELF, null, null, bean); + AlertManager.getInstance().getAlertQueue().offer(task); } } public static void alert(String code, Alert.AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels) { if (isEnable) { - alert.alert(code, level, desc, alertComponentType, alertComponentId, labels); + ClusterAlertBean bean = new ClusterAlertBean(); + bean.setCode(code).setLevel(level.toString()).setDesc(desc).setLabels(labels).setAlertComponentType(alertComponentType).setAlertComponentId(alertComponentId); + AlertTask task = new AlertTask(Alert.AlertType.ALERT, null, null, bean); + AlertManager.getInstance().getAlertQueue().offer(task); } } - public static boolean alertResolve(String code, Alert.AlertLevel level, String alertComponentType, String alertComponentId, Map labels) { - return isEnable ? alert.alertResolve(code, level, alertComponentType, alertComponentId, labels) : true; + public static void alertResolve(String code, Alert.AlertLevel level, String alertComponentType, String alertComponentId, Map labels, Set callbackSet, String callbackKey) { + if (isEnable) { + ClusterAlertBean bean = new ClusterAlertBean(); + bean.setCode(code).setLevel(level.toString()).setAlertComponentId(alertComponentId).setAlertComponentType(alertComponentType).setLabels(labels); + AlertTask task = new AlertTask(Alert.AlertType.ALERT_RESOLVE, callbackSet, callbackKey, bean); + AlertManager.getInstance().getAlertQueue().offer(task); + } } - public static boolean alertSelfResolve(String code, Alert.AlertLevel level, Map labels) { - return isEnable ? alert.alertSelfResolve(code, level, labels) : true; + public static void alertSelfResolve(String code, Alert.AlertLevel level, Map labels, Set callbackSet, String callbackKey) { + if (isEnable) { + ClusterAlertBean bean = new ClusterAlertBean(); + bean.setCode(code).setLevel(level.toString()).setLabels(labels); + AlertTask task = new AlertTask(Alert.AlertType.ALERT_SELF_RESOLVE, callbackSet, callbackKey, bean); + AlertManager.getInstance().getAlertQueue().offer(task); + } } public static Map genSingleLabel(String key, String value) { diff --git a/src/main/java/com/actiontech/dble/alarm/NoAlert.java b/src/main/java/com/actiontech/dble/alarm/NoAlert.java index 3b012bb622..18031d0024 100644 --- a/src/main/java/com/actiontech/dble/alarm/NoAlert.java +++ b/src/main/java/com/actiontech/dble/alarm/NoAlert.java @@ -5,28 +5,28 @@ package com.actiontech.dble.alarm; -import java.util.Map; +import com.actiontech.dble.cluster.bean.ClusterAlertBean; public class NoAlert implements Alert { + @Override - public void alertSelf(String code, AlertLevel level, String desc, Map labels) { + public void alertSelf(ClusterAlertBean bean) { } @Override - public void alert(String code, AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels) { + public void alert(ClusterAlertBean bean) { } @Override - public boolean alertResolve(String code, AlertLevel level, String alertComponentType, String alertComponentId, Map labels) { + public boolean alertResolve(ClusterAlertBean bean) { return true; } @Override - public boolean alertSelfResolve(String code, AlertLevel level, Map labels) { + public boolean alertSelfResolve(ClusterAlertBean bean) { return true; } - } diff --git a/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java b/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java index 40d42c5b0e..f3c9c7947b 100644 --- a/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java +++ b/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java @@ -20,4 +20,5 @@ private ToResolveContainer() { public static final Set DATA_NODE_LACK = Collections.newSetFromMap(new ConcurrentHashMap()); public static final Set CREATE_CONN_FAIL = Collections.newSetFromMap(new ConcurrentHashMap()); public static final Set REACH_MAX_CON = Collections.newSetFromMap(new ConcurrentHashMap()); + public static final Set XA_WRITE_CHECK_POINT_FAIL = Collections.newSetFromMap(new ConcurrentHashMap()); } diff --git a/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java b/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java index ccbc403c0b..2a269d60bf 100644 --- a/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java +++ b/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java @@ -10,7 +10,6 @@ import com.actiontech.dble.cluster.ClusterParamCfg; import com.actiontech.dble.cluster.bean.ClusterAlertBean; -import java.util.Map; public final class UcoreAlert implements Alert { private static final String SOURCE_COMPONENT_TYPE = "dble"; @@ -26,49 +25,32 @@ private UcoreAlert() { } @Override - public void alertSelf(String code, AlertLevel level, String desc, Map labels) { - alert(code, level, desc, SOURCE_COMPONENT_TYPE, ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), labels); + public void alertSelf(ClusterAlertBean alert) { + alert(alert.setAlertComponentType(SOURCE_COMPONENT_TYPE).setAlertComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID))); } @Override - public void alert(String code, AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels) { - ClusterAlertBean alert = new ClusterAlertBean(); - alert.setCode(code); - alert.setDesc(desc); - alert.setLevel(level.toString()); - alert.setSourceComponentType(SOURCE_COMPONENT_TYPE); - alert.setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)); - alert.setAlertComponentId(alertComponentId); - alert.setAlertComponentType(alertComponentType); - alert.setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID)); - alert.setTimestampUnix(System.currentTimeMillis() * 1000000); - if (labels != null) { - alert.setLabels(labels); - } + public void alert(ClusterAlertBean alert) { + alert.setSourceComponentType(SOURCE_COMPONENT_TYPE). + setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)). + setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID)). + setTimestampUnix(System.currentTimeMillis() * 1000000); ClusterHelper.alert(alert); } @Override - public boolean alertResolve(String code, AlertLevel level, String alertComponentType, String alertComponentId, Map labels) { - ClusterAlertBean alert = new ClusterAlertBean(); - alert.setCode(code); - alert.setDesc(""); - alert.setLevel(level.toString()); - alert.setSourceComponentType(SOURCE_COMPONENT_TYPE); - alert.setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)); - alert.setAlertComponentId(alertComponentId); - alert.setAlertComponentType(alertComponentType); - alert.setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID)); - alert.setResolveTimestampUnix(System.currentTimeMillis() * 1000000); - if (labels != null) { - alert.setLabels(labels); - } + public boolean alertResolve(ClusterAlertBean alert) { + alert.setDesc(""). + setSourceComponentType(SOURCE_COMPONENT_TYPE). + setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)). + setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID)). + setResolveTimestampUnix(System.currentTimeMillis() * 1000000); return ClusterHelper.alertResolve(alert); } @Override - public boolean alertSelfResolve(String code, AlertLevel level, Map labels) { - return alertResolve(code, level, SOURCE_COMPONENT_TYPE, ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), labels); + public boolean alertSelfResolve(ClusterAlertBean alert) { + return alertResolve(alert.setAlertComponentType(SOURCE_COMPONENT_TYPE).setAlertComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID))); } } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java index a8bea28f8d..3003176156 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java @@ -254,9 +254,8 @@ private void createByIdleLittle(int idleCons, int createCount) { } if (ToResolveContainer.CREATE_CONN_FAIL.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) { Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - if (AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels)) { - ToResolveContainer.CREATE_CONN_FAIL.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - } + AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), + labels, ToResolveContainer.CREATE_CONN_FAIL, this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); } } catch (IOException e) { String errMsg = "create connection err "; @@ -326,9 +325,8 @@ private void takeCon(BackendConnection conn, String schema) { if (ToResolveContainer.CREATE_CONN_FAIL.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) { Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - if (AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels)) { - ToResolveContainer.CREATE_CONN_FAIL.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - } + AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels, + ToResolveContainer.CREATE_CONN_FAIL, this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); } takeCon(conn, schema); conn.setAttachment(attachment); @@ -381,9 +379,9 @@ public void getConnection(String schema, boolean autocommit, final ResponseHandl } else { // create connection if (ToResolveContainer.REACH_MAX_CON.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) { Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - if (AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels)) { - ToResolveContainer.REACH_MAX_CON.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - } + AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels, + ToResolveContainer.REACH_MAX_CON, this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); + } LOGGER.info("no idle connection in pool,create new connection for " + this.name + " of schema " + schema); createNewConnection(handler, attachment, schema); @@ -404,9 +402,8 @@ public BackendConnection getConnection(String schema, boolean autocommit, final } else { // create connection if (ToResolveContainer.REACH_MAX_CON.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) { Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - if (AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels)) { - ToResolveContainer.REACH_MAX_CON.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - } + AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels, + ToResolveContainer.REACH_MAX_CON, this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); } LOGGER.info("no ilde connection in pool,create new connection for " + this.name + " of schema " + schema); try { @@ -415,9 +412,8 @@ public BackendConnection getConnection(String schema, boolean autocommit, final con = simpleHandler.getBackConn(); if (ToResolveContainer.CREATE_CONN_FAIL.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) { Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - if (AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels)) { - ToResolveContainer.CREATE_CONN_FAIL.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); - } + AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels, + ToResolveContainer.CREATE_CONN_FAIL, this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); } } catch (IOException e) { Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName()); diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index c9ef9f2a1f..277a93ef27 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -150,7 +150,7 @@ void setResult(int result) { private void setOk() { if (this.status != OK_STATUS) { Map labels = AlertUtil.genSingleLabel("data_host", this.source.getHostConfig().getName() + "-" + this.source.getConfig().getHostName()); - AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels); + AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels, null, null); } switch (status) { case TIMEOUT_STATUS: diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java index 896f0e08c7..130092c4dc 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java @@ -40,6 +40,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { private Lock lockForErrorHandle = new ReentrantLock(); private Condition sendFinished = lockForErrorHandle.newCondition(); private volatile boolean sendFinishedFlag = false; + public XACommitNodesHandler(NonBlockingSession session) { super(session); } @@ -82,6 +83,7 @@ public void commit() { } } + @Override public void clearResources() { tryCommitTimes = 0; @@ -394,7 +396,7 @@ private void cleanAndFeedback(boolean isSuccess) { session.cancelableStatusSet(NonBlockingSession.CANCEL_STATUS_INIT); byte[] toSend = sendData; session.clearResources(false); - AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID())); + AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID()), null, null); if (!session.closed()) { setResponseTime(isSuccess); session.getSource().write(toSend); @@ -441,7 +443,6 @@ public void debugCommitDelay() { } - private void waitUntilSendFinish() { this.lockForErrorHandle.lock(); try { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java index d2140f56b6..2bdbd1aa6d 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java @@ -408,7 +408,7 @@ private void cleanAndFeedback() { session.setXaState(TxState.TX_INITIALIZE_STATE); byte[] toSend = sendData; session.clearResources(false); - AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID())); + AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID()), null, null); if (!session.closed()) { setResponseTime(false); session.getSource().write(toSend); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java b/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java index ac1b1e9721..22e5a97fc9 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java @@ -41,6 +41,7 @@ private XAStateLog() { } } + public static final String XA_ALERT_FLAG = "XA_ALERT_FLAG"; private static final Repository IN_MEMORY_REPOSITORY = new InMemoryRepository(); private static ReentrantLock lock = new ReentrantLock(); private static AtomicBoolean hasLeader = new AtomicBoolean(false); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java b/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java index 608d733cad..58e4e410f3 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java @@ -9,6 +9,7 @@ import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; +import com.actiontech.dble.alarm.ToResolveContainer; import com.actiontech.dble.backend.mysql.xa.*; import com.actiontech.dble.backend.mysql.xa.recovery.DeserializationException; import com.actiontech.dble.backend.mysql.xa.recovery.Repository; @@ -24,6 +25,8 @@ import java.util.HashMap; import java.util.Map; +import static com.actiontech.dble.backend.mysql.xa.XAStateLog.XA_ALERT_FLAG; + /** * Created by zhangchao on 2016/10/13. */ @@ -202,15 +205,15 @@ public boolean writeCheckpoint(Collection checkpointContent } rwChannel.force(false); file.discardBackupVersion(); - if (XAStateLog.isWriteAlert()) { - boolean resolved = AlertUtil.alertSelfResolve(AlarmCode.XA_WRITE_CHECK_POINT_FAIL, Alert.AlertLevel.WARN, null); - XAStateLog.setWriteAlert(resolved); + if (ToResolveContainer.XA_WRITE_CHECK_POINT_FAIL.size() > 0) { + AlertUtil.alertSelfResolve(AlarmCode.XA_WRITE_CHECK_POINT_FAIL, Alert.AlertLevel.WARN, null, null, null); + ToResolveContainer.XA_WRITE_CHECK_POINT_FAIL.remove(XA_ALERT_FLAG); } return true; } catch (Exception e) { LOGGER.warn("Failed to write checkpoint", e); AlertUtil.alertSelf(AlarmCode.XA_WRITE_CHECK_POINT_FAIL, Alert.AlertLevel.WARN, "Failed to write checkpoint" + e.getMessage(), null); - XAStateLog.setWriteAlert(true); + ToResolveContainer.XA_WRITE_CHECK_POINT_FAIL.add(XA_ALERT_FLAG); return false; } } diff --git a/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java b/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java index ab9621648a..e94fe75bfe 100644 --- a/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java +++ b/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java @@ -21,8 +21,9 @@ public Map getLabels() { return labels; } - public void setLabels(Map labels) { - this.labels = labels; + public ClusterAlertBean setLabels(Map xlabels) { + this.labels = xlabels; + return this; } Map labels; @@ -31,79 +32,89 @@ public String getCode() { return code; } - public void setCode(String code) { - this.code = code; + public ClusterAlertBean setCode(String xcode) { + this.code = xcode; + return this; } public String getLevel() { return level; } - public void setLevel(String level) { - this.level = level; + public ClusterAlertBean setLevel(String xlevel) { + this.level = xlevel; + return this; } public String getDesc() { return desc; } - public void setDesc(String desc) { - this.desc = desc; + public ClusterAlertBean setDesc(String xdesc) { + this.desc = xdesc; + return this; } public String getSourceComponentType() { return sourceComponentType; } - public void setSourceComponentType(String sourceComponentType) { - this.sourceComponentType = sourceComponentType; + public ClusterAlertBean setSourceComponentType(String xsourceComponentType) { + this.sourceComponentType = xsourceComponentType; + return this; } public String getSourceComponentId() { return sourceComponentId; } - public void setSourceComponentId(String sourceComponentId) { - this.sourceComponentId = sourceComponentId; + public ClusterAlertBean setSourceComponentId(String xsourceComponentId) { + this.sourceComponentId = xsourceComponentId; + return this; } public String getAlertComponentType() { return alertComponentType; } - public void setAlertComponentType(String alertComponentType) { - this.alertComponentType = alertComponentType; + public ClusterAlertBean setAlertComponentType(String xalertComponentType) { + this.alertComponentType = xalertComponentType; + return this; } public String getAlertComponentId() { return alertComponentId; } - public void setAlertComponentId(String alertComponentId) { - this.alertComponentId = alertComponentId; + public ClusterAlertBean setAlertComponentId(String xalertComponentId) { + this.alertComponentId = xalertComponentId; + return this; } public String getServerId() { return serverId; } - public void setServerId(String serverId) { - this.serverId = serverId; + public ClusterAlertBean setServerId(String xserverId) { + this.serverId = xserverId; + return this; } public long getTimestampUnix() { return timestampUnix; } - public void setTimestampUnix(long timestampUnix) { - this.timestampUnix = timestampUnix; + public ClusterAlertBean setTimestampUnix(long xtimestampUnix) { + this.timestampUnix = xtimestampUnix; + return this; } public long getResolveTimestampUnix() { return resolveTimestampUnix; } - public void setResolveTimestampUnix(long resolveTimestampUnix) { - this.resolveTimestampUnix = resolveTimestampUnix; + public ClusterAlertBean setResolveTimestampUnix(long xresolveTimestampUnix) { + this.resolveTimestampUnix = xresolveTimestampUnix; + return this; } } diff --git a/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java b/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java index 6fe8e8effe..9e3866ce94 100644 --- a/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java +++ b/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java @@ -79,9 +79,8 @@ public void onResult(SQLQueryResult> result) { if (ToResolveContainer.DATA_NODE_LACK.contains(key)) { Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName()); labels.put("data_node", dataNode); - if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) { - ToResolveContainer.DATA_NODE_LACK.remove(key); - } + AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels, + ToResolveContainer.DATA_NODE_LACK, key); } } numberCount.decrementAndGet(); diff --git a/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java b/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java index 62a2cd34dc..5e8236158a 100644 --- a/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java +++ b/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java @@ -602,8 +602,8 @@ private boolean createTable(String schema, String table, String sql, boolean isS for (String dataNode : tbConfig.getDataNodes()) { showDataNode = dataNode; String tableId = "DataNode[" + dataNode + "]:Table[" + tableName + "]"; - if (ToResolveContainer.TABLE_LACK.contains(tableId) && AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_LACK.remove(tableId); + if (ToResolveContainer.TABLE_LACK.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), ToResolveContainer.TABLE_LACK, tableId); } } } diff --git a/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java b/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java index 4eb3902cb9..6c259cecc8 100644 --- a/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java @@ -123,9 +123,8 @@ public void onResult(SQLQueryResult>> result) { if (ds != null && ToResolveContainer.DATA_NODE_LACK.contains(key)) { Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName()); labels.put("data_node", dataNode); - if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) { - ToResolveContainer.DATA_NODE_LACK.remove(key); - } + AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels, + ToResolveContainer.DATA_NODE_LACK, key); } List> rows = result.getResult(); for (Map row : rows) { diff --git a/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java b/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java index 32a33fd8a7..cfe586c0ff 100644 --- a/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java @@ -35,6 +35,7 @@ public abstract class GetNodeTablesHandler { protected abstract void handleTables(String table); protected abstract void handleFinished(); + public void execute() { PhysicalDBNode dn = DbleServer.getInstance().getConfig().getDataNodes().get(dataNode); String mysqlShowTableCol = "Tables_in_" + dn.getDatabase(); @@ -85,9 +86,8 @@ public void onResult(SQLQueryResult>> result) { if (ds != null && ToResolveContainer.DATA_NODE_LACK.contains(key)) { Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName()); labels.put("data_node", dataNode); - if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) { - ToResolveContainer.DATA_NODE_LACK.remove(key); - } + AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels, + ToResolveContainer.DATA_NODE_LACK, key); } List> rows = result.getResult(); for (Map row : rows) { diff --git a/src/main/java/com/actiontech/dble/meta/table/MultiTablesMetaHandler.java b/src/main/java/com/actiontech/dble/meta/table/MultiTablesMetaHandler.java index 3e83cf3cfd..226d9300a8 100644 --- a/src/main/java/com/actiontech/dble/meta/table/MultiTablesMetaHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/MultiTablesMetaHandler.java @@ -152,21 +152,22 @@ void countDownShardTable() { String tableId = schema + "." + tableName; if (tableMetas.size() > 1) { consistentWarning(tableName, tableStruct); - } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.remove(tableId); + } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, tableId); } tableMetas.clear(); } else if (tableStruct.size() == 1) { String tableId = schema + "." + tableName; - if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.remove(tableId); + if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, tableId); } String tableDetailId = "DataNode[" + tableStruct.values().iterator().next() + "]:Table[" + tableName + "]"; - if (ToResolveContainer.TABLE_LACK.contains(tableId) && AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableDetailId))) { - ToResolveContainer.TABLE_LACK.remove(tableId); + if (ToResolveContainer.TABLE_LACK.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableDetailId), + ToResolveContainer.TABLE_LACK, tableId); } tableMeta = MetaHelper.initTableMeta(tableName, tableStruct.keySet().iterator().next(), version); } @@ -177,6 +178,7 @@ void countDownShardTable() { } countDown(); } + } private void countDown() { diff --git a/src/main/java/com/actiontech/dble/meta/table/TablesMetaCheckHandler.java b/src/main/java/com/actiontech/dble/meta/table/TablesMetaCheckHandler.java index b2ca0c91e8..fec7da891d 100644 --- a/src/main/java/com/actiontech/dble/meta/table/TablesMetaCheckHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/TablesMetaCheckHandler.java @@ -38,9 +38,9 @@ protected void handlerTable(String table, String dataNode, String sql) { LOGGER.warn(errorMsg); AlertUtil.alertSelf(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, errorMsg, AlertUtil.genSingleLabel("TABLE", tableId)); ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.add(tableId); - } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.remove(tableId); + } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY, tableId); } LOGGER.debug("checking table Table [" + tableMeta.getTableName() + "]"); } diff --git a/src/main/java/com/actiontech/dble/meta/table/old/AbstractTableMetaHandler.java b/src/main/java/com/actiontech/dble/meta/table/old/AbstractTableMetaHandler.java index 4b069bb35d..864c7e1d38 100644 --- a/src/main/java/com/actiontech/dble/meta/table/old/AbstractTableMetaHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/old/AbstractTableMetaHandler.java @@ -40,6 +40,7 @@ public abstract class AbstractTableMetaHandler { protected String schema; private Set selfNode; private ConcurrentMap> dataNodeTableStructureSQLMap; + public AbstractTableMetaHandler(String schema, TableConfig tbConfig, Set selfNode) { this(schema, tbConfig.getName(), tbConfig.getDataNodes(), selfNode); } @@ -109,17 +110,18 @@ public void onResult(SQLQueryResult> result) { } return; } else { - if (ToResolveContainer.TABLE_LACK.contains(tableId) && AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_LACK.remove(tableId); + if (ToResolveContainer.TABLE_LACK.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_LACK, tableId); } if (ds != null && ToResolveContainer.DATA_NODE_LACK.contains(key)) { Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName()); labels.put("data_node", dataNode); - if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) { - ToResolveContainer.DATA_NODE_LACK.remove(key); - } + AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels, + ToResolveContainer.DATA_NODE_LACK, key); } } + String currentSql = result.getResult().get(MYSQL_SHOW_CREATE_TABLE_COLS[1]); if (dataNodeTableStructureSQLMap.containsKey(currentSql)) { List dataNodeList = dataNodeTableStructureSQLMap.get(currentSql); @@ -150,16 +152,16 @@ private StructureMeta.TableMeta genTableMeta() { String tableId = schema + "." + tableName; if (tableMetas.size() > 1) { consistentWarning(); - } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.remove(tableId); + } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, tableId); } tableMetas.clear(); } else if (dataNodeTableStructureSQLMap.size() == 1) { String tableId = schema + "." + tableName; - if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.remove(tableId); + if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_NOT_CONSISTENT_IN_DATAHOSTS, tableId); } tableMeta = MetaHelper.initTableMeta(tableName, dataNodeTableStructureSQLMap.keySet().iterator().next(), version); } diff --git a/src/main/java/com/actiontech/dble/meta/table/old/SchemaDefaultNodeTablesHandler.java b/src/main/java/com/actiontech/dble/meta/table/old/SchemaDefaultNodeTablesHandler.java index afa1c62c47..9894b28c29 100644 --- a/src/main/java/com/actiontech/dble/meta/table/old/SchemaDefaultNodeTablesHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/old/SchemaDefaultNodeTablesHandler.java @@ -99,9 +99,8 @@ public void onResult(SQLQueryResult>> result) { if (ds != null && ToResolveContainer.DATA_NODE_LACK.contains(key)) { Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName()); labels.put("data_node", dataNode); - if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) { - ToResolveContainer.DATA_NODE_LACK.remove(key); - } + AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels, + ToResolveContainer.DATA_NODE_LACK, key); } List> rows = result.getResult(); for (Map row : rows) { diff --git a/src/main/java/com/actiontech/dble/meta/table/old/TableMetaCheckHandler.java b/src/main/java/com/actiontech/dble/meta/table/old/TableMetaCheckHandler.java index 10a6a7d818..9af26c704d 100644 --- a/src/main/java/com/actiontech/dble/meta/table/old/TableMetaCheckHandler.java +++ b/src/main/java/com/actiontech/dble/meta/table/old/TableMetaCheckHandler.java @@ -37,9 +37,9 @@ protected void handlerTable(StructureMeta.TableMeta tableMeta) { LOGGER.warn(errorMsg); AlertUtil.alertSelf(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, errorMsg, AlertUtil.genSingleLabel("TABLE", tableId)); ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.add(tableId); - } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.remove(tableId); + } else if (ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY, tableId); } LOGGER.debug("checking table Table [" + tableMeta.getTableName() + "]"); } diff --git a/src/main/java/com/actiontech/dble/server/status/AlertManager.java b/src/main/java/com/actiontech/dble/server/status/AlertManager.java new file mode 100644 index 0000000000..be253b610c --- /dev/null +++ b/src/main/java/com/actiontech/dble/server/status/AlertManager.java @@ -0,0 +1,33 @@ +package com.actiontech.dble.server.status; + +import com.actiontech.dble.alarm.AlertBlockQueue; +import com.actiontech.dble.alarm.AlertSender; +import com.actiontech.dble.alarm.AlertTask; +import com.actiontech.dble.util.ExecutorUtil; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; + +/** + * Created by szf on 2019/3/25. + */ +public class AlertManager { + private BlockingQueue alertQueue = new AlertBlockQueue<>(1024); + private static final AlertManager INSTANCE = new AlertManager(); + private ExecutorService alertSenderExecutor; + + public static AlertManager getInstance() { + return INSTANCE; + } + + public BlockingQueue getAlertQueue() { + return alertQueue; + } + + public void startAlert() { + alertSenderExecutor = ExecutorUtil.createCached("alertSenderExecutor", 1); + alertSenderExecutor.execute(new AlertSender(alertQueue)); + } + + +} diff --git a/src/main/java/com/actiontech/dble/server/util/GlobalTableUtil.java b/src/main/java/com/actiontech/dble/server/util/GlobalTableUtil.java index f29f9900a6..9d745af235 100644 --- a/src/main/java/com/actiontech/dble/server/util/GlobalTableUtil.java +++ b/src/main/java/com/actiontech/dble/server/util/GlobalTableUtil.java @@ -81,9 +81,9 @@ public static boolean isInnerColExist(SchemaUtil.SchemaInfo schemaInfo, Structur for (int i = 0; i < orgTbMeta.getColumnsList().size(); i++) { String column = orgTbMeta.getColumnsList().get(i).getName(); if (column.equalsIgnoreCase(GLOBAL_TABLE_CHECK_COLUMN)) { - if (ToResolveContainer.GLOBAL_TABLE_COLUMN_LOST.contains(tableId) && - AlertUtil.alertSelfResolve(AlarmCode.GLOBAL_TABLE_COLUMN_LOST, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) { - ToResolveContainer.GLOBAL_TABLE_COLUMN_LOST.remove(tableId); + if (ToResolveContainer.GLOBAL_TABLE_COLUMN_LOST.contains(tableId)) { + AlertUtil.alertSelfResolve(AlarmCode.GLOBAL_TABLE_COLUMN_LOST, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), + ToResolveContainer.GLOBAL_TABLE_COLUMN_LOST, tableId); } return true; } @@ -220,9 +220,9 @@ public static List>> finished(List