Permalink
Browse files

Add a thread safe COWMap to allow safe publication of local sites in …

…RealVoltDB

This is to fix a bug found by Steve where MailboxTracker updates before the local site map is fully constructed and gets a concurrent modification exception. Also republish the site tracker to each execution site after the execution sites have all been created to ensure there are no lost updates for added sites.
  • Loading branch information...
1 parent 0d2603f commit 0c9a39811a94494ba0a0e641be39e092c36cc296 @aweisberg aweisberg committed May 4, 2012
View
Binary file not shown.
@@ -0,0 +1,117 @@
+/* This file is part of VoltDB.
+ * Copyright (C) 2008-2012 VoltDB Inc.
+ *
+ * VoltDB is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * VoltDB is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.voltcore.utils;
+
+import java.util.Map;
+
+import com.google.common.collect.ForwardingMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Key set, value set, and entry set are all immutable as are their iterators.
+ * Otherwise behaves as you would expect.
+ */
+public class COWMap<K, V> extends ForwardingMap<K, V> implements Map<K, V> {
+ private final AtomicReference<ImmutableMap<K, V>> m_map;
+
+ public COWMap() {
+ m_map = new AtomicReference<ImmutableMap<K, V>>(new Builder<K, V>().build());
+ }
+
+ public COWMap(Map<K, V> map) {
+ if (map == null) {
+ throw new IllegalArgumentException("Wrapped map cannot be null");
+ }
+ m_map = new AtomicReference<ImmutableMap<K, V>>(new Builder<K, V>().putAll(map).build());
+ }
+
+ @Override
+ public V put(K key, V value) {
+ while (true) {
+ ImmutableMap<K, V> original = m_map.get();
+ Builder<K, V> builder = new Builder<K, V>();
+ V oldValue = null;
+ boolean replaced = false;
+ for (Map.Entry<K, V> entry : original.entrySet()) {
+ if (entry.getKey().equals(key)) {
+ oldValue = entry.getValue();
+ builder.put(key, value);
+ replaced = true;
+ } else {
+ builder.put(entry);
+ }
+ }
+ if (!replaced) {
+ builder.put(key, value);
+ }
+ ImmutableMap<K, V> copy = builder.build();
+ if (m_map.compareAndSet(original, copy)) {
+ return oldValue;
+ }
+ }
+ }
+
+ @Override
+ public V remove(Object key) {
+ while (true) {
+ ImmutableMap<K, V> original = m_map.get();
+ Builder<K, V> builder = new Builder<K, V>();
+ V oldValue = null;
+ for (Map.Entry<K, V> entry : original.entrySet()) {
+ if (entry.getKey().equals(key)) {
+ oldValue = entry.getValue();
+ } else {
+ builder.put(entry);
+ }
+ }
+ ImmutableMap<K, V> copy = builder.build();
+ if (m_map.compareAndSet(original,copy)) {
+ return oldValue;
+ }
+ }
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ while (true) {
+ ImmutableMap<K, V> original = m_map.get();
+ Builder<K, V> builder = new Builder<K, V>();
+ for (Map.Entry<K, V> entry : original.entrySet()) {
+ if (!m.containsKey(entry.getKey())) {
+ builder.put(entry);
+ }
+ }
+ builder.putAll(m);
+ ImmutableMap<K, V> copy = builder.build();
+ if (m_map.compareAndSet(original, copy)) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ m_map.set(new Builder<K, V>().build());
+ }
+
+ @Override
+ protected Map<K, V> delegate() {
+ return m_map.get();
+ }
+}
@@ -44,6 +44,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
@@ -94,6 +95,7 @@
import org.voltdb.fault.VoltFault.FaultType;
import org.voltdb.licensetool.LicenseApi;
import org.voltdb.messaging.VoltDbMessageFactory;
+import org.voltcore.utils.COWMap;
import org.voltdb.utils.CatalogUtil;
import org.voltdb.utils.HTTPAdminListener;
import org.voltdb.utils.LogKeys;
@@ -281,7 +283,7 @@ public void initialize(VoltDB.Configuration config) {
m_replicationActive = false;
// set up site structure
- m_localSites = new HashMap<Long, ExecutionSite>();
+ m_localSites = new COWMap<Long, ExecutionSite>();
m_siteThreads = new HashMap<Long, Thread>();
m_runners = new ArrayList<ExecutionSiteRunner>();
@@ -537,6 +539,32 @@ public synchronized Thread newThread(Runnable r) {
m_localSites.put(runner.m_siteId, runner.m_siteObj);
}
+ /*
+ * At this point all of the execution sites have been published to m_localSites
+ * It is possible that while they were being created the mailbox tracker found additional
+ * sites, but was unable to deliver the notification to some or all of the execution sites.
+ * Since notifying them of new sites is idempotent (version number check), let's do that here so there
+ * are no lost updates for additional sites. But... it must be done from the
+ * mailbox tracker thread or there is a race with failure detection and handling.
+ * Generally speaking it seems like retrieving a reference to a site tracker not via a message
+ * from the mailbox tracker thread that builds the site tracker is bug. If it isn't delivered to you by
+ * a site tracker then you lose sequential consistency.
+ */
+ try {
+ m_mailboxTracker.executeTask(new Runnable() {
+ @Override
+ public void run() {
+ for (ExecutionSite es : m_localSites.values()) {
+ es.notifySitesAdded(m_siteTracker);
+ }
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ VoltDB.crashLocalVoltDB(e.getMessage(), true, e);
+ } catch (ExecutionException e) {
+ VoltDB.crashLocalVoltDB(e.getMessage(), true, e);
+ }
+
// Create the client interface
int portOffset = 0;
// TODO: fix
@@ -47,11 +47,16 @@
import org.voltdb.VoltZK;
import org.voltdb.VoltZK.MailboxType;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
public class MailboxTracker {
private final ZooKeeper m_zk;
private final MailboxUpdateHandler m_handler;
- private final ExecutorService m_es =
- Executors.newSingleThreadExecutor(CoreUtils.getThreadFactory("Mailbox tracker", 1024 * 128));
+ private final ListeningExecutorService m_es =
+ MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadExecutor(CoreUtils.getThreadFactory("Mailbox tracker", 1024 * 128)));
private byte m_lastChecksum[] = null;
@@ -213,4 +218,12 @@ private void readContents(JSONObject obj, Map<MailboxType, List<MailboxNodeConte
}
}
}
+
+ /*
+ * Execute a task in the mailbox tracker thread. It is necessary to deliver a
+ * new version of the site tracker from this thread if you want to avoid lost updates.
+ */
+ public ListenableFuture<?> executeTask(Runnable r) {
+ return m_es.submit(r);
+ }
}

0 comments on commit 0c9a398

Please sign in to comment.