Skip to content
Permalink
Browse files
Allow diff tool to run in-jvm multiple times, and resolve some confli…
…cting shading

closes #4
  • Loading branch information
JeetKunDoug authored and krummas committed Jan 14, 2020
1 parent 9c55826 commit b933540469dfccc48e2c35a93cd6807784feb16b
Showing 3 changed files with 31 additions and 8 deletions.
@@ -149,7 +149,10 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
throw new RuntimeException("Diff job failed", e);
} finally {
if (sc.isLocal())
{
Differ.shutdown();
JobMetadataDb.ProgressTracker.resetStatements();
}
}
}

@@ -307,14 +307,17 @@ public static void shutdown()
if (srcDiffCluster != null) {
srcDiffCluster.stop();
srcDiffCluster.close();
srcDiffCluster = null;
}
if (targetDiffCluster != null) {
targetDiffCluster.stop();
targetDiffCluster.close();
targetDiffCluster = null;
}
if (journalSession != null) {
journalSession.close();
journalSession.getCluster().close();
journalSession = null;
}
COMPARISON_EXECUTOR.shutdown();
}
@@ -129,6 +129,15 @@ public static void initializeStatements(Session session, String keyspace) {

}

public static void resetStatements()
{
updateStmt = null;
mismatchStmt = null;
errorSummaryStmt = null;
errorDetailStmt = null;
updateCompleteStmt = null;
}

/**
*
* @param table
@@ -400,15 +409,23 @@ public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {


public void markNotRunning(UUID jobId) {
logger.info("Marking job {} as not running", jobId);

ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
keyspace, Schema.RUNNING_JOBS),
jobId);
if (!rs.one().getBool("[applied]")) {
logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
"during initialization as there may be no entry for this job in the {} table",
try
{
logger.info("Marking job {} as not running", jobId);

ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
keyspace, Schema.RUNNING_JOBS),
jobId);
if (!rs.one().getBool("[applied]"))
{
logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
"during initialization as there may be no entry for this job in the {} table",
jobId, Schema.RUNNING_JOBS);
}
} catch (Exception e) {
// Because this is called from another exception handler, we don't want to lose the original exception
// just because we may not have been able to mark the job as not running. Just log here
logger.error("Could not mark job {} as not running.", e);
}
}
}

0 comments on commit b933540

Please sign in to comment.