
SnapshotSave and Restore work in all configurations

+ For both partitioned and replicated tables
+ Separated the test suite into 4 tests
jarulraj committed Jan 17, 2014
1 parent 856993f commit 23cfb0217cfe243cb7e4052c960cc5952265ba97
@@ -37,21 +37,21 @@ log4j.logger.edu.brown.utils=INFO
 ## VoltDB Stuff
 log4j.logger.org.voltdb.VoltProcedure=INFO
-log4j.logger.org.voltdb.VoltSystemProcedure=TRACE
+log4j.logger.org.voltdb.VoltSystemProcedure=INFO
 log4j.logger.org.voltdb.client=INFO
 log4j.logger.org.voltdb.compiler=INFO
 log4j.logger.org.voltdb.planner=INFO
 log4j.logger.org.voltdb.sysprocs=INFO
-log4j.logger.org.voltdb.sysprocs.LoadMultipartitionTable=TRACE
+log4j.logger.org.voltdb.sysprocs.LoadMultipartitionTable=INFO
 log4j.logger.org.voltdb.sysprocs.SnapshotSave=TRACE
 log4j.logger.org.voltdb.SnapshotSaveAPI=TRACE
 log4j.logger.org.voltdb.SnapshotSiteProcessor=TRACE
 log4j.logger.org.voltdb.DefaultSnapshotDataTarget=TRACE
 log4j.logger.org.voltdb.sysprocs.SnapshotDelete=TRACE
 log4j.logger.org.voltdb.sysprocs.SnapshotRestore=TRACE
 log4j.logger.org.voltdb.sysprocs.SnapshotRestoreLocal=TRACE
-#log4j.logger.org.voltdb.sysprocs.saverestore.PartitionedTableSaveFileState=TRACE
-#log4j.logger.org.voltdb.sysprocs.saverestore.ReplicatedTableSaveFileState=TRACE
+log4j.logger.org.voltdb.sysprocs.saverestore.PartitionedTableSaveFileState=TRACE
+log4j.logger.org.voltdb.sysprocs.saverestore.ReplicatedTableSaveFileState=TRACE
 ## Research Projects
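
The toggles above dial individual snapshot loggers between TRACE and INFO; the Java hunks below likewise comment out several hot-path LOG.trace calls. As a minimal sketch (assuming a log4j 1.2-style Logger; the helper class is hypothetical), guarding the call site keeps the string concatenation from running when a logger is left at INFO:

import org.apache.log4j.Logger;

public class SnapshotTraceExample {
    // Same logger name that log4j.properties configures above.
    private static final Logger LOG = Logger.getLogger("org.voltdb.sysprocs.SnapshotSave");

    static void traceStage(int stage, int partitionId) {
        // The concatenation below only runs when the logger is set to TRACE.
        if (LOG.isTraceEnabled()) {
            LOG.trace("Stage " + stage + " : at partition : " + partitionId);
        }
    }
}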
@@ -1209,7 +1209,7 @@ bool VoltDBEngine::activateTableStream(const CatalogId tableId, TableStreamType
     // can not be re-activated for cow mode.
     if (m_snapshottingTables.find(tableId) != m_snapshottingTables.end()) {
         assert(false);
-        return false;
+        return true;
     }
     table->incrementRefcount();
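
With the flipped return value, a duplicate activation request now reports success: the assert still trips in a debug build, but a release build treats a table already in copy-on-write mode as activated instead of failing the snapshot. A hedged Java rendering of that idempotent-activation pattern (hypothetical names, not the engine's actual C++ API):

import java.util.HashSet;
import java.util.Set;

class StreamActivator {
    private final Set<Integer> snapshottingTables = new HashSet<Integer>();

    // Returns true on the first activation and on a duplicate request,
    // so a retrying caller does not observe a spurious failure.
    boolean activateTableStream(int tableId) {
        if (snapshottingTables.contains(tableId)) {
            assert false : "table " + tableId + " is already in COW mode";
            return true; // idempotent: already active counts as success
        }
        snapshottingTables.add(tableId);
        return true;
    }
}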
@@ -269,6 +269,10 @@ public void close() throws IOException, InterruptedException {
     public int getHeaderSize() {
         return 4;
     }
+
+    public String toString(){
+        return "File :"+m_file.getAbsolutePath();
+    }
 
     private Future<?> write(final BBContainer tupleData, final boolean prependLength) {
         if (m_writeFailed) {
@@ -64,54 +64,63 @@
 public VoltTable startSnapshotting(String file_path, String file_nonce, byte block,
                                    long startTime, SystemProcedureExecutionContext context, String hostname)
 {
-    LOG.trace("Creating snapshot target and handing to EEs");
+    //LOG.trace("Creating snapshot target and handing to EEs");
     final VoltTable result = SnapshotSave.constructNodeResultsTable();
-    // One site wins the race to create the snapshot targets, populating
-    // m_taskListsForSites for the other sites and creating an appropriate
-    // number of snapshot permits
-    if (SnapshotSiteProcessor.m_snapshotCreateSetupPermit.tryAcquire()) {
-        createSetup(file_path, file_nonce, startTime, context, hostname, result);
-    }
-    LOG.trace("Stage 0 : at partition : "+context.getPartitionExecutor().getPartitionId());
+    // Each partition does this to accumulate tasks for it
+    createSetup(file_path, file_nonce, startTime, context, hostname, result);
+    //LOG.trace("Stage 0 : at partition : "+context.getPartitionExecutor().getPartitionId());
     // All sites wait for a permit to start their individual snapshot tasks
     VoltTable error = acquireSnapshotPermit(context, hostname, result);
     if (error != null) {
         LOG.trace("Stage 0->1 : at partition :"+context.getPartitionExecutor().getPartitionId()+ " error :"+error);
         return error;
     }
-    LOG.trace("Stage 1 : at partition : "+context.getPartitionExecutor().getPartitionId());
+    Site site = context.getSite();
+    CatalogMap<Partition> partition_map = site.getPartitions();
+    Integer lowest_partition_id = Integer.MAX_VALUE, p_id;
+    for (Partition pt : partition_map) {
+        p_id = pt.getId();
+        lowest_partition_id = Math.min(p_id, lowest_partition_id);
+    }
+    assert (lowest_partition_id != Integer.MAX_VALUE);
+    int partition_id = context.getPartitionExecutor().getPartitionId();
+    LOG.trace("Stage 1 : at partition : "+partition_id);
     synchronized (SnapshotSiteProcessor.m_taskListsForSites) {
-        final Deque<SnapshotTableTask> m_taskList = SnapshotSiteProcessor.m_taskListsForSites.poll();
+        // CHANGE :: Fetch work for this partition
+        int index = partition_id - lowest_partition_id;
+        final Deque<SnapshotTableTask> m_taskList = SnapshotSiteProcessor.m_taskListsForSites.get(index);
         if (m_taskList == null) {
             LOG.trace("tasklist null");
             return result;
         } else {
             if (SnapshotSiteProcessor.m_taskListsForSites.isEmpty()) {
-                assert(SnapshotSiteProcessor.m_snapshotCreateSetupPermit.availablePermits() == 1);
+                //assert(SnapshotSiteProcessor.m_snapshotCreateSetupPermit.availablePermits() == 1);
                 assert(SnapshotSiteProcessor.m_snapshotPermits.availablePermits() == 0);
             }
             LOG.trace("m_taskListsforSites not empty ");
-            LOG.trace("ExecutionSitesCurrentlySnapshotting :"+SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get());
+            // CHANGE :: Don't use
+            LOG.trace("ExecutionSitesCurrentlySnapshotting :"+SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get());
             assert(SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get() > 0);
             context.getPartitionExecutor().initiateSnapshots(m_taskList);
         }
     }
-    LOG.trace("Stage 2 : at partition : "+context.getPartitionExecutor().getPartitionId());
+    //LOG.trace("Stage 2 : at partition : "+context.getPartitionExecutor().getPartitionId());
     if (block != 0) {
         Collection<Exception> failures = null;
         String status = "SUCCESS";
         String err = "";
         try {
             failures = context.getPartitionExecutor().completeSnapshotWork();
         } catch (InterruptedException e) {
             status = "FAILURE";
             err = e.toString();
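
The reshaped flow above is the heart of the commit: every partition runs createSetup itself, blocks on a snapshot permit, and then claims the task deque at its own offset instead of polling a shared queue. The offset is the partition id minus the lowest partition id on the local site. A self-contained sketch of that indexing (hypothetical Runnable tasks and an int[] standing in for the catalog's partition map):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TaskListIndexing {
    // index = partition_id - lowest local partition id, assuming contiguous ids.
    static Deque<Runnable> taskListFor(int partitionId, int[] localPartitionIds,
                                       List<Deque<Runnable>> taskListsForSites) {
        int lowest = Integer.MAX_VALUE;
        for (int pid : localPartitionIds) {
            lowest = Math.min(pid, lowest);
        }
        assert lowest != Integer.MAX_VALUE;
        return taskListsForSites.get(partitionId - lowest);
    }

    public static void main(String[] args) {
        // Two local partitions with ids 4 and 5: partition 5 maps to slot 1.
        List<Deque<Runnable>> lists = new ArrayList<Deque<Runnable>>();
        lists.add(new ArrayDeque<Runnable>());
        lists.add(new ArrayDeque<Runnable>());
        System.out.println(taskListFor(5, new int[]{4, 5}, lists) == lists.get(1));
    }
}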
@@ -152,25 +161,12 @@ public VoltTable startSnapshotting(String file_path, String file_nonce, byte blo
 private void createSetup(String file_path, String file_nonce,
         long startTime, SystemProcedureExecutionContext context,
         String hostname, final VoltTable result) {
     {
-        // CHANGE
-        final int numLocalSites = CatalogUtil.getNumberOfSites(context.getHost());
-        Host catalog_host = context.getHost();
-        Site catalog_site = CollectionUtil.first(CatalogUtil.getSitesForHost(catalog_host));
-        Integer lowest_site_id = catalog_site.getId();
-        CatalogMap<Partition> partition_map = catalog_site.getPartitions();
-        Integer lowest_partition_id = Integer.MAX_VALUE, p_id;
-        {
+        Site site = context.getSite();
+        int numLocalPartitions = site.getPartitions().size();
+        LOG.trace("createSetup at : partition "+context.getPartitionExecutor().getPartitionId());
-        for(Partition pt : partition_map){
-            p_id = pt.getId();
-            lowest_partition_id = Math.min(p_id, lowest_partition_id);
-        }
-        assert(lowest_partition_id != Integer.MAX_VALUE);
-        LOG.info("Local Sites :"+numLocalSites);
         /*
          * Used to close targets on failure
          */
@@ -181,22 +177,21 @@ private void createSetup(String file_path, String file_nonce,
     final ArrayList<SnapshotTableTask> replicatedSnapshotTasks =
             new ArrayList<SnapshotTableTask>();
-    LOG.trace("ExecutionSitesCurrentlySnapshotting check : " + SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get());
+    LOG.trace("ExecutionSitesCurrentlySnapshotting initial check : " + SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get());
     assert(SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get() == -1);
     final List<Table> tables = SnapshotUtil.getTablesToSave(context.getDatabase());
-    // CHANGE :: Do it only at partition with lowest id on site with lowest id
-    if (context.getPartitionExecutor().getSiteId() == lowest_site_id && context.getPartitionExecutor().getPartitionId() == lowest_partition_id) {
-        SnapshotUtil.recordSnapshotTableList(
-                startTime,
-                file_path,
-                file_nonce,
-                tables);
-    }
+    synchronized (SnapshotSiteProcessor.m_digestWritten) {
+        if (SnapshotSiteProcessor.m_digestWritten.get() == false) {
+            SnapshotSiteProcessor.m_digestWritten.set(true);
+            SnapshotUtil.recordSnapshotTableList(startTime, file_path, file_nonce, tables);
+            LOG.trace("Digest written at partition " + context.getPartitionExecutor().getPartitionId());
+        }
+    }
     final AtomicInteger numTables = new AtomicInteger(tables.size());
-    LOG.info("NumTables Initial : "+numTables);
+    //LOG.info("NumTables Initial : "+numTables);
     final SnapshotRegistry.Snapshot snapshotRecord =
             SnapshotRegistry.startSnapshot(
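
The synchronized m_digestWritten block above replaces the old "lowest site and lowest partition writes the digest" test: whichever partition enters the lock first flips the flag and records the snapshot table list, so the digest is written exactly once no matter how many partitions race through createSetup. A minimal sketch of that check-then-set guard (the AtomicBoolean and Runnable are stand-ins, not the actual SnapshotSiteProcessor fields):

import java.util.concurrent.atomic.AtomicBoolean;

class DigestWriteGuard {
    private static final AtomicBoolean digestWritten = new AtomicBoolean(false);

    static void writeDigestOnce(Runnable writeDigest) {
        // The synchronized block makes the read-check-write atomic; with an
        // AtomicBoolean, compareAndSet(false, true) would be the lock-free form.
        synchronized (digestWritten) {
            if (!digestWritten.get()) {
                digestWritten.set(true);
                writeDigest.run(); // e.g. SnapshotUtil.recordSnapshotTableList(...)
            }
        }
    }
}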
@@ -207,6 +202,7 @@ private void createSetup(String file_path, String file_nonce,
                 file_path,
                 file_nonce,
                 tables.toArray(new Table[0]));
     for (final Table table : SnapshotUtil.getTablesToSave(context.getDatabase()))
     {
         String canSnapshot = "SUCCESS";
@@ -225,7 +221,7 @@ private void createSetup(String file_path, String file_nonce,
                 saveFilePath,
                 table,
                 context.getSite().getHost(),
-                CatalogUtil.getNumberOfPartitions(context.getCluster()),
+                numLocalPartitions,
                 startTime);
         targets.add(sdt);
         final SnapshotDataTarget sdtFinal = sdt;
@@ -262,7 +258,7 @@ public void run() {
         final SnapshotTableTask task =
                 new SnapshotTableTask(
                         table.getRelativeIndex(),
-                        sdt,
+                        sdt,
                         table.getIsreplicated(),
                         table.getTypeName());
@@ -306,11 +302,14 @@ public void run() {
     synchronized (SnapshotSiteProcessor.m_taskListsForSites) {
         if (!partitionedSnapshotTasks.isEmpty() || !replicatedSnapshotTasks.isEmpty()) {
-            SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.set(2);
-            // Each site runs on a separate JVM - so can't synchronize using this
-            //CatalogUtil.getNumberOfSites(context.getHost()));
-            LOG.trace("ExecutionSitesCurrentlySnapshotting set :"+SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get());
-            for (int ii = 0; ii < numLocalSites; ii++) {
+            // Used to sync across all partitions on all sites - set only once
+            if(SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get() == -1){
+                SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.set(numLocalPartitions);
+                LOG.trace("ExecutionSitesCurrentlySnapshotting set :" + SnapshotSiteProcessor.ExecutionSitesCurrentlySnapshotting.get());
+            }
+            for (int ii = 0; ii < numLocalPartitions; ii++) {
                 SnapshotSiteProcessor.m_taskListsForSites.add(new ArrayDeque<SnapshotTableTask>());
             }
         } else {
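
ExecutionSitesCurrentlySnapshotting starts at -1 and is set to the local partition count at most once; since each site runs in its own JVM, the counter only coordinates the partitions inside one JVM, and each of those partitions gets its own empty task deque. A sketch of that set-once initialization (hypothetical names; compareAndSet folds the get() == -1 check and the set into one atomic step):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SnapshotCounterSetup {
    // -1 is the "no snapshot in progress" sentinel, as in the diff above.
    static final AtomicInteger sitesCurrentlySnapshotting = new AtomicInteger(-1);
    static final List<Deque<Runnable>> taskListsForSites = new ArrayList<Deque<Runnable>>();

    static void initOnce(int numLocalPartitions) {
        // Only the first caller moves the sentinel and creates the task deques.
        if (sitesCurrentlySnapshotting.compareAndSet(-1, numLocalPartitions)) {
            for (int ii = 0; ii < numLocalPartitions; ii++) {
                taskListsForSites.add(new ArrayDeque<Runnable>());
            }
        }
    }
}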
@@ -321,9 +320,20 @@ public void run() {
      * Distribute the writing of replicated tables to exactly one partition.
      */
+    // CHANGE :: Assign replicated table work to single partition with lowest id
+    CatalogMap<Partition> partition_map = site.getPartitions();
+    Integer lowest_partition_id = Integer.MAX_VALUE, p_id;
+    for (Partition pt : partition_map) {
+        p_id = pt.getId();
+        lowest_partition_id = Math.min(p_id, lowest_partition_id);
+    }
+    assert (lowest_partition_id != Integer.MAX_VALUE);
+    int partition_id = context.getPartitionExecutor().getPartitionId();
+    int index = partition_id - lowest_partition_id;
+    // CHANGE :: Each partition gets a task
     for (SnapshotTableTask t : partitionedSnapshotTasks) {
-        SnapshotSiteProcessor.m_taskListsForSites.get(lowest_site_id).offer(t);
+        SnapshotSiteProcessor.m_taskListsForSites.get(index).offer(t);
     }
     //for (int ii = 0; ii < numLocalSites && !partitionedSnapshotTasks.isEmpty(); ii++) {
@@ -333,7 +343,7 @@ public void run() {
     //int siteIndex = 0;
     for (SnapshotTableTask t : replicatedSnapshotTasks) {
         //SnapshotSiteProcessor.m_taskListsForSites.get(siteIndex++ % numLocalSites).offer(t);
-        SnapshotSiteProcessor.m_taskListsForSites.get(lowest_site_id).offer(t);
+        SnapshotSiteProcessor.m_taskListsForSites.get(index).offer(t);
     }
 }
@@ -365,7 +375,8 @@ public void run() {
"RESULTED IN Exception: \n" + sw.toString());
LOG.error(result);
} finally {
SnapshotSiteProcessor.m_snapshotPermits.release(numLocalSites);
SnapshotSiteProcessor.m_snapshotPermits.release(numLocalPartitions);
LOG.trace("Released "+ numLocalPartitions + " snapshot permits at partition "+ context.getPartitionExecutor().getPartitionId());
}
}
}
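
The finally block now releases one permit per local partition rather than per site, matching the acquire side where every partition blocks on m_snapshotPermits before initiating its tasks; releasing in finally guarantees no partition waits forever even when setup throws. A minimal semaphore sketch of that handshake (hypothetical class and method names):

import java.util.concurrent.Semaphore;

class SnapshotPermits {
    static final Semaphore snapshotPermits = new Semaphore(0);

    // Setup side: wake every local partition, even on failure,
    // so none of them blocks forever waiting for a permit.
    static void finishSetup(int numLocalPartitions) {
        snapshotPermits.release(numLocalPartitions);
    }

    // Partition side: each partition takes exactly one permit
    // before starting its snapshot tasks.
    static void awaitPermit() throws InterruptedException {
        snapshotPermits.acquire();
    }
}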
@@ -389,12 +400,14 @@ private VoltTable acquireSnapshotPermit(SystemProcedureExecutionContext context,
  * to release the setup permit to ensure that a thread
  * doesn't come late and think it is supposed to do the setup work
  */
+/*
 synchronized (SnapshotSiteProcessor.m_snapshotPermits) {
     if (SnapshotSiteProcessor.m_snapshotPermits.availablePermits() == 0 &&
             SnapshotSiteProcessor.m_snapshotCreateSetupPermit.availablePermits() == 0) {
         SnapshotSiteProcessor.m_snapshotCreateSetupPermit.release();
     }
 }
+*/
 }
 return null;
 }