diff --git a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs
index 5d34686..2708c4f 100644
--- a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs
+++ b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs
@@ -73,15 +73,10 @@ TaskScheduler taskScheduler
 {
     // loop through the example records and queue them up so long
     // as we have more records and backlog is not too large
-    while (pendingFutures.Count < MaximumBacklog)
+    for (string redo = engine.GetRedoRecord();
+         redo != null;
+         redo = engine.GetRedoRecord())
     {
-
-        // get the next redo record
-        string redo = engine.GetRedoRecord();
-
-        // check if no redo records are available
-        if (redo == null) break;
-
         Task task = factory.StartNew(() => {
             engine.ProcessRedoRecord(redo, SzNoFlags);
@@ -92,28 +87,32 @@
         // add the future to the pending future list
         pendingFutures.Add((task, redo));
-    }
-
-    do
-    {
-        // handle any pending futures WITHOUT blocking to reduce the backlog
-        HandlePendingFutures(pendingFutures, false);
-        // if we still have exceeded the backlog size then pause
-        // briefly before trying again
-        if (pendingFutures.Count >= MaximumBacklog)
+        // handle the pending futures as long as the maximum backlog is exceeded
+        for (int loop = 0;
+             pendingFutures.Count >= MaximumBacklog;
+             loop++)
         {
-            try
+            // check if this is NOT our first iteration through the loop
+            if (loop > 0)
             {
-                Thread.Sleep(HandlePauseTimeout);
-
-            }
-            catch (ThreadInterruptedException)
-            {
-                // do nothing
+                // if we still have exceeded the backlog size after the first
+                // loop iteration then pause briefly before trying again
+                try
+                {
+                    Thread.Sleep(HandlePauseTimeout);
+
+                }
+                catch (ThreadInterruptedException)
+                {
+                    // do nothing
+                }
             }
+
+            // handle any pending futures WITHOUT blocking to reduce the backlog
+            HandlePendingFutures(pendingFutures, false);
         }
-    } while (pendingFutures.Count >= MaximumBacklog);
+    }
     // check if there are no redo records right now
     // NOTE: we do NOT want to call countRedoRecords() in a loop that
diff --git a/java/snippets/deleting/DeleteViaFutures.java b/java/snippets/deleting/DeleteViaFutures.java
index f8a9706..c357dc1 100644
--- a/java/snippets/deleting/DeleteViaFutures.java
+++ b/java/snippets/deleting/DeleteViaFutures.java
@@ -13,276 +13,276 @@
  * Provides a simple example of deleting records from the Senzing repository.
  */
 public class DeleteViaFutures {
-  public static void main(String[] args) {
-    // get the senzing repository settings
-    String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON");
-    if (settings == null) {
-      System.err.println("Unable to get settings.");
-      throw new IllegalArgumentException("Unable to get settings");
-    }
-
-    // create a descriptive instance name (can be anything)
-    String instanceName = DeleteViaFutures.class.getSimpleName();
-
-    // initialize the Senzing environment
-    SzEnvironment env = SzCoreEnvironment.newBuilder()
-        .settings(settings)
-        .instanceName(instanceName)
-        .verboseLogging(false)
-        .build();
-
-    String filePath = (args.length > 0) ?
args[0] : DEFAULT_FILE_PATH; - - // create the thread pool and executor service - ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - - // keep track of pending futures and don't backlog too many for memory's sake - Map, Record> pendingFutures = new IdentityHashMap<>(); - - try (FileInputStream fis = new FileInputStream(filePath); - InputStreamReader isr = new InputStreamReader(fis, UTF_8); - BufferedReader br = new BufferedReader(isr)) { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - int lineNumber = 0; - boolean eof = false; - - while (!eof) { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.size() < MAXIMUM_BACKLOG) { - // read the next line - String line = br.readLine(); - lineNumber++; - - // check for EOF - if (line == null) { - eof = true; - break; - } - - // trim the line - line = line.trim(); - - // skip any blank lines - if (line.length() == 0) { - continue; - } - - // skip any commented lines - if (line.startsWith("#")) { - continue; - } - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try { - // parse the line as a JSON object - JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); - - // extract the data source code and record ID - String dataSourceCode = recordJson.getString(DATA_SOURCE, null); - String recordId = recordJson.getString(RECORD_ID, null); - SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); - - Future future = executor.submit(() -> { - // call the deleteRecord() function with no flags - engine.deleteRecord(recordKey, SZ_NO_FLAGS); - - return null; - }); - - // add the futures to the pending future list - pendingFutures.put(future, record); - - } catch (JsonException e) { - logFailedRecord(ERROR, e, lineNumber, line); - errorCount++; // increment the error count - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - do { - // handle any pending futures WITHOUT blocking to reduce the backlog - handlePendingFutures(pendingFutures, false); + // create a descriptive instance name (can be anything) + String instanceName = DeleteViaFutures.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + String filePath = (args.length > 0) ? 
args[0] : DEFAULT_FILE_PATH; + + // create the thread pool and executor service + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + + // keep track of pending futures and don't backlog too many for memory's sake + Map, Record> pendingFutures = new IdentityHashMap<>(); + + try (FileInputStream fis = new FileInputStream(filePath); + InputStreamReader isr = new InputStreamReader(fis, UTF_8); + BufferedReader br = new BufferedReader(isr)) { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + int lineNumber = 0; + boolean eof = false; + + while (!eof) { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.size() < MAXIMUM_BACKLOG) { + // read the next line + String line = br.readLine(); + lineNumber++; + + // check for EOF + if (line == null) { + eof = true; + break; + } + + // trim the line + line = line.trim(); + + // skip any blank lines + if (line.length() == 0) { + continue; + } + + // skip any commented lines + if (line.startsWith("#")) { + continue; + } + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try { + // parse the line as a JSON object + JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); + + // extract the data source code and record ID + String dataSourceCode = recordJson.getString(DATA_SOURCE, null); + String recordId = recordJson.getString(RECORD_ID, null); + SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); + + Future future = executor.submit(() -> { + // call the deleteRecord() function with no flags + engine.deleteRecord(recordKey, SZ_NO_FLAGS); + + return null; + }); + + // add the futures to the pending future list + pendingFutures.put(future, record); + + } catch (JsonException e) { + logFailedRecord(ERROR, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do { + // handle any pending futures WITHOUT blocking to reduce the backlog + handlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.size() >= MAXIMUM_BACKLOG) { + try { + Thread.sleep(PAUSE_TIMEOUT); + + } catch (InterruptedException ignore) { + // do nothing + } + } + } while (pendingFutures.size() >= MAXIMUM_BACKLOG); + } - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.size() >= MAXIMUM_BACKLOG) { - try { - Thread.sleep(PAUSE_TIMEOUT); + // shutdown the executor service + executor.shutdown(); + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + handlePendingFutures(pendingFutures, true); - } catch (InterruptedException ignore) { - // do nothing + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - } - } while (pendingFutures.size() >= MAXIMUM_BACKLOG); - } - - // shutdown the executor service - executor.shutdown(); - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - handlePendingFutures(pendingFutures, true); - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e instanceof 
RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // check if executor service is shutdown - if (!executor.isShutdown()) { - executor.shutdown(); - } - - // IMPORTANT: make sure to destroy the environment - env.destroy(); - - System.out.println(); - System.out.println("Successful delete operations : " + successCount); - System.out.println("Failed delete operations : " + errorCount); - - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println(retryCount + " deletions to be retried in " + retryFile); - } - System.out.flush(); + throw new RuntimeException(e); - } + } finally { + // check if executor service is shutdown + if (!executor.isShutdown()) { + executor.shutdown(); + } + + // IMPORTANT: make sure to destroy the environment + env.destroy(); + + System.out.println(); + System.out.println("Successful delete operations : " + successCount); + System.out.println("Failed delete operations : " + errorCount); + + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println(retryCount + " deletions to be retried in " + retryFile); + } + System.out.flush(); - } - - private static void handlePendingFutures(Map, Record> pendingFutures, boolean blocking) - throws Exception { - // check for completed futures - Iterator, Record>> iter = pendingFutures.entrySet().iterator(); - - // loop through the pending futures - while (iter.hasNext()) { - // get the next pending future - Map.Entry, Record> entry = iter.next(); - Future future = entry.getKey(); - Record record = entry.getValue(); - - // if not blocking and this one is not done then continue - if (!blocking && !future.isDone()) { - continue; - } - - // remove the pending future from the map - iter.remove(); - - try { - try { - // get the value to see if there was an exception - future.get(); - - // if we get here then increment the success count - successCount++; - - } catch (InterruptedException e) { - // this could only happen if blocking is true, just - // rethrow as retryable and log the interruption - throw e; - - } catch (ExecutionException e) { - // if execution failed with an exception then rethrow - Throwable cause = e.getCause(); - if ((cause == null) || !(cause instanceof Exception)) { - // rethrow the execution exception - throw e; - } - // cast to an Exception and rethrow - throw ((Exception) cause); } - } catch (SzBadInputException e) { - logFailedRecord(ERROR, e, record.lineNumber, record.line); - errorCount++; // increment the error count - - } catch (SzRetryableException | InterruptedException | CancellationException e) { - // handle thread interruption and cancellation as retries - logFailedRecord(WARNING, e, record.lineNumber, record.line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + + private static void handlePendingFutures(Map, Record> pendingFutures, boolean blocking) + throws Exception { + // check for completed futures + Iterator, Record>> iter = pendingFutures.entrySet().iterator(); + + // loop through the pending futures + while (iter.hasNext()) { + // get the next pending future + 
Map.Entry, Record> entry = iter.next(); + Future future = entry.getKey(); + Record record = entry.getValue(); + + // if not blocking and this one is not done then continue + if (!blocking && !future.isDone()) { + continue; + } + + // remove the pending future from the map + iter.remove(); + + try { + try { + // get the value to see if there was an exception + future.get(); + + // if we get here then increment the success count + successCount++; + + } catch (InterruptedException e) { + // this could only happen if blocking is true, just + // rethrow as retryable and log the interruption + throw e; + + } catch (ExecutionException e) { + // if execution failed with an exception then rethrow + Throwable cause = e.getCause(); + if ((cause == null) || !(cause instanceof Exception)) { + // rethrow the execution exception + throw e; + } + // cast to an Exception and rethrow + throw ((Exception) cause); + } + + } catch (SzBadInputException e) { + logFailedRecord(ERROR, e, record.lineNumber, record.line); + errorCount++; // increment the error count + + } catch (SzRetryableException | InterruptedException | CancellationException e) { + // handle thread interruption and cancellation as retries + logFailedRecord(WARNING, e, record.lineNumber, record.line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(record.line); + + } catch (Exception e) { + // catch any other exception (incl. SzException) here + logFailedRecord(CRITICAL, e, record.lineNumber, record.line); + errorCount++; + throw e; // rethrow since exception is critical + } } - retryWriter.println(record.line); - - } catch (Exception e) { - // catch any other exception (incl. SzException) here - logFailedRecord(CRITICAL, e, record.lineNumber, record.line); - errorCount++; - throw e; // rethrow since exception is critical - } } - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param lineNumber The line number of the failed record in the JSON input - * file. - * @param recordJson The JSON text for the failed record. 
- */ - private static void logFailedRecord(String errorType, - Exception exception, - int lineNumber, - String recordJson) { - System.err.println(); - System.err.println( - "** " + errorType + " ** FAILED TO DELETE RECORD AT LINE " + lineNumber + ": "); - System.err.println(recordJson); - System.err.println(exception); - System.err.flush(); - } - - private static final String DEFAULT_FILE_PATH = "../resources/data/del-500.jsonl"; - - private static final String UTF_8 = "UTF-8"; - - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; - - private static final int THREAD_COUNT = 8; - - private static final int BACKLOG_FACTOR = 10; - - private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; - - private static final long PAUSE_TIMEOUT = 100L; - - private static final String DATA_SOURCE = "DATA_SOURCE"; - private static final String RECORD_ID = "RECORD_ID"; - - private static final String ERROR = "ERROR"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; - - public record Record(int lineNumber, String line) { - } - - private static int errorCount = 0; - private static int successCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param lineNumber The line number of the failed record in the JSON input + * file. + * @param recordJson The JSON text for the failed record. + */ + private static void logFailedRecord(String errorType, + Exception exception, + int lineNumber, + String recordJson) { + System.err.println(); + System.err.println( + "** " + errorType + " ** FAILED TO DELETE RECORD AT LINE " + lineNumber + ": "); + System.err.println(recordJson); + System.err.println(exception); + System.err.flush(); + } + + private static final String DEFAULT_FILE_PATH = "../resources/data/del-500.jsonl"; + + private static final String UTF_8 = "UTF-8"; + + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; + + private static final int THREAD_COUNT = 8; + + private static final int BACKLOG_FACTOR = 10; + + private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; + + private static final long PAUSE_TIMEOUT = 100L; + + private static final String DATA_SOURCE = "DATA_SOURCE"; + private static final String RECORD_ID = "RECORD_ID"; + + private static final String ERROR = "ERROR"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; + + public record Record(int lineNumber, String line) { + } + + private static int errorCount = 0; + private static int successCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; } diff --git a/java/snippets/deleting/DeleteWithInfoViaFutures.java b/java/snippets/deleting/DeleteWithInfoViaFutures.java index 45c385b..e0f88a2 100644 --- a/java/snippets/deleting/DeleteWithInfoViaFutures.java +++ b/java/snippets/deleting/DeleteWithInfoViaFutures.java @@ -13,318 +13,318 @@ * Provides a simple example of deleting records from the Senzing repository. 
*/ public class DeleteWithInfoViaFutures { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = DeleteWithInfoViaFutures.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - String filePath = (args.length > 0) ? args[0] : DEFAULT_FILE_PATH; - - // create the thread pool and executor service - ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - - // keep track of pending futures and don't backlog too many for memory's sake - Map, Record> pendingFutures = new IdentityHashMap<>(); - - try (FileInputStream fis = new FileInputStream(filePath); - InputStreamReader isr = new InputStreamReader(fis, UTF_8); - BufferedReader br = new BufferedReader(isr)) { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - int lineNumber = 0; - boolean eof = false; - - while (!eof) { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.size() < MAXIMUM_BACKLOG) { - // read the next line - String line = br.readLine(); - lineNumber++; - - // check for EOF - if (line == null) { - eof = true; - break; - } - - // trim the line - line = line.trim(); - - // skip any blank lines - if (line.length() == 0) { - continue; - } - - // skip any commented lines - if (line.startsWith("#")) { - continue; - } - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try { - // parse the line as a JSON object - JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); - - // extract the data source code and record ID - String dataSourceCode = recordJson.getString(DATA_SOURCE, null); - String recordId = recordJson.getString(RECORD_ID, null); - SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); - - Future future = executor.submit(() -> { - // call the deleteRecord() function with info flags - return engine.deleteRecord(recordKey, SZ_WITH_INFO_FLAGS); - }); - - // add the futures to the pending future list - pendingFutures.put(future, record); - - } catch (JsonException e) { - logFailedRecord(ERROR, e, lineNumber, line); - errorCount++; // increment the error count - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - do { - // handle any pending futures WITHOUT blocking to reduce the backlog - handlePendingFutures(engine, pendingFutures, false); + // create a descriptive instance name (can be anything) + String instanceName = DeleteWithInfoViaFutures.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + String filePath = (args.length > 0) ? 
args[0] : DEFAULT_FILE_PATH; + + // create the thread pool and executor service + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + + // keep track of pending futures and don't backlog too many for memory's sake + Map, Record> pendingFutures = new IdentityHashMap<>(); + + try (FileInputStream fis = new FileInputStream(filePath); + InputStreamReader isr = new InputStreamReader(fis, UTF_8); + BufferedReader br = new BufferedReader(isr)) { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + int lineNumber = 0; + boolean eof = false; + + while (!eof) { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.size() < MAXIMUM_BACKLOG) { + // read the next line + String line = br.readLine(); + lineNumber++; + + // check for EOF + if (line == null) { + eof = true; + break; + } + + // trim the line + line = line.trim(); + + // skip any blank lines + if (line.length() == 0) { + continue; + } + + // skip any commented lines + if (line.startsWith("#")) { + continue; + } + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try { + // parse the line as a JSON object + JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); + + // extract the data source code and record ID + String dataSourceCode = recordJson.getString(DATA_SOURCE, null); + String recordId = recordJson.getString(RECORD_ID, null); + SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); + + Future future = executor.submit(() -> { + // call the deleteRecord() function with info flags + return engine.deleteRecord(recordKey, SZ_WITH_INFO_FLAGS); + }); + + // add the futures to the pending future list + pendingFutures.put(future, record); + + } catch (JsonException e) { + logFailedRecord(ERROR, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do { + // handle any pending futures WITHOUT blocking to reduce the backlog + handlePendingFutures(engine, pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.size() >= MAXIMUM_BACKLOG) { + try { + Thread.sleep(PAUSE_TIMEOUT); + + } catch (InterruptedException ignore) { + // do nothing + } + } + } while (pendingFutures.size() >= MAXIMUM_BACKLOG); + } - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.size() >= MAXIMUM_BACKLOG) { - try { - Thread.sleep(PAUSE_TIMEOUT); + // shutdown the executor service + executor.shutdown(); + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + handlePendingFutures(engine, pendingFutures, true); - } catch (InterruptedException ignore) { - // do nothing + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - } - } while (pendingFutures.size() >= MAXIMUM_BACKLOG); - } - - // shutdown the executor service - executor.shutdown(); - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - handlePendingFutures(engine, pendingFutures, true); - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e 
instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // check if executor service is shutdown - if (!executor.isShutdown()) { - executor.shutdown(); - } - - // IMPORTANT: make sure to destroy the environment - env.destroy(); - - System.out.println(); - System.out.println("Successful delete operations : " + successCount); - System.out.println("Total entities deleted : " + entityIdSet.size()); - System.out.println("Failed delete operations : " + errorCount); - - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println(retryCount + " deletions to be retried in " + retryFile); - } - System.out.flush(); + throw new RuntimeException(e); - } + } finally { + // check if executor service is shutdown + if (!executor.isShutdown()) { + executor.shutdown(); + } + + // IMPORTANT: make sure to destroy the environment + env.destroy(); + + System.out.println(); + System.out.println("Successful delete operations : " + successCount); + System.out.println("Total entities deleted : " + entityIdSet.size()); + System.out.println("Failed delete operations : " + errorCount); + + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println(retryCount + " deletions to be retried in " + retryFile); + } + System.out.flush(); - } - - private static void handlePendingFutures(SzEngine engine, - Map, Record> pendingFutures, - boolean blocking) - throws Exception { - // check for completed futures - Iterator, Record>> iter = pendingFutures.entrySet().iterator(); - - // loop through the pending futures - while (iter.hasNext()) { - // get the next pending future - Map.Entry, Record> entry = iter.next(); - Future future = entry.getKey(); - Record record = entry.getValue(); - - // if not blocking and this one is not done then continue - if (!blocking && !future.isDone()) { - continue; - } - - // remove the pending future from the map - iter.remove(); - - try { - try { - // get the value to see if there was an exception - String info = future.get(); - - // if we get here then increment the success count - successCount++; - - // process the info - processInfo(engine, info); - - } catch (InterruptedException e) { - // this could only happen if blocking is true, just - // rethrow as retryable and log the interruption - throw e; - - } catch (ExecutionException e) { - // if execution failed with an exception then rethrow - Throwable cause = e.getCause(); - if ((cause == null) || !(cause instanceof Exception)) { - // rethrow the execution exception - throw e; - } - // cast to an Exception and rethrow - throw ((Exception) cause); } - } catch (SzBadInputException e) { - logFailedRecord(ERROR, e, record.lineNumber, record.line); - errorCount++; // increment the error count - - } catch (SzRetryableException | InterruptedException | CancellationException e) { - // handle thread interruption and cancellation as retries - logFailedRecord(WARNING, e, record.lineNumber, record.line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + + private static void handlePendingFutures(SzEngine engine, + Map, 
Record> pendingFutures, + boolean blocking) + throws Exception { + // check for completed futures + Iterator, Record>> iter = pendingFutures.entrySet().iterator(); + + // loop through the pending futures + while (iter.hasNext()) { + // get the next pending future + Map.Entry, Record> entry = iter.next(); + Future future = entry.getKey(); + Record record = entry.getValue(); + + // if not blocking and this one is not done then continue + if (!blocking && !future.isDone()) { + continue; + } + + // remove the pending future from the map + iter.remove(); + + try { + try { + // get the value to see if there was an exception + String info = future.get(); + + // if we get here then increment the success count + successCount++; + + // process the info + processInfo(engine, info); + + } catch (InterruptedException e) { + // this could only happen if blocking is true, just + // rethrow as retryable and log the interruption + throw e; + + } catch (ExecutionException e) { + // if execution failed with an exception then rethrow + Throwable cause = e.getCause(); + if ((cause == null) || !(cause instanceof Exception)) { + // rethrow the execution exception + throw e; + } + // cast to an Exception and rethrow + throw ((Exception) cause); + } + + } catch (SzBadInputException e) { + logFailedRecord(ERROR, e, record.lineNumber, record.line); + errorCount++; // increment the error count + + } catch (SzRetryableException | InterruptedException | CancellationException e) { + // handle thread interruption and cancellation as retries + logFailedRecord(WARNING, e, record.lineNumber, record.line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(record.line); + + } catch (Exception e) { + // catch any other exception (incl. SzException) here + logFailedRecord(CRITICAL, e, record.lineNumber, record.line); + errorCount++; + throw e; // rethrow since exception is critical + } } - retryWriter.println(record.line); - - } catch (Exception e) { - // catch any other exception (incl. SzException) here - logFailedRecord(CRITICAL, e, record.lineNumber, record.line); - errorCount++; - throw e; // rethrow since exception is critical - } } - } - - /** - * Example method for parsing and handling the INFO message (formatted - * as JSON). This example implementation simply tracks all entity ID's - * that appear as "AFFECTED_ENTITIES" to count the number - * of entities deleted for the records -- essentially a contrived - * data mart. - * - * @param engine the {@link SzEngine} to use. - * @param info The info message. - */ - private static void processInfo(SzEngine engine, String info) { - JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject(); - if (!jsonObject.containsKey(AFFECTED_ENTITIES)) { - return; + + /** + * Example method for parsing and handling the INFO message (formatted + * as JSON). This example implementation simply tracks all entity ID's + * that appear as "AFFECTED_ENTITIES" to count the number + * of entities deleted for the records -- essentially a contrived + * data mart. + * + * @param engine the {@link SzEngine} to use. + * @param info The info message. 
+ */ + private static void processInfo(SzEngine engine, String info) { + JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject(); + if (!jsonObject.containsKey(AFFECTED_ENTITIES)) { + return; + } + JsonArray affectedArr = jsonObject.getJsonArray(AFFECTED_ENTITIES); + for (JsonObject affected : affectedArr.getValuesAs(JsonObject.class)) { + JsonNumber number = affected.getJsonNumber(ENTITY_ID); + long entityId = number.longValue(); + + try { + engine.getEntity(entityId, null); + entityIdSet.remove(entityId); + } catch (SzNotFoundException e) { + entityIdSet.add(entityId); + } catch (SzException e) { + // simply log the exception, do not rethrow + System.err.println(); + System.err.println("**** FAILED TO RETRIEVE ENTITY: " + entityId); + System.err.println(e.toString()); + System.err.flush(); + } + } } - JsonArray affectedArr = jsonObject.getJsonArray(AFFECTED_ENTITIES); - for (JsonObject affected : affectedArr.getValuesAs(JsonObject.class)) { - JsonNumber number = affected.getJsonNumber(ENTITY_ID); - long entityId = number.longValue(); - - try { - engine.getEntity(entityId, null); - entityIdSet.remove(entityId); - } catch (SzNotFoundException e) { - entityIdSet.add(entityId); - } catch (SzException e) { - // simply log the exception, do not rethrow + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param lineNumber The line number of the failed record in the JSON input + * file. + * @param recordJson The JSON text for the failed record. + */ + private static void logFailedRecord(String errorType, + Exception exception, + int lineNumber, + String recordJson) { System.err.println(); - System.err.println("**** FAILED TO RETRIEVE ENTITY: " + entityId); - System.err.println(e.toString()); + System.err.println( + "** " + errorType + " ** FAILED TO DELETE RECORD AT LINE " + lineNumber + ": "); + System.err.println(recordJson); + System.err.println(exception); System.err.flush(); - } } - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param lineNumber The line number of the failed record in the JSON input - * file. - * @param recordJson The JSON text for the failed record. 
- */ - private static void logFailedRecord(String errorType, - Exception exception, - int lineNumber, - String recordJson) { - System.err.println(); - System.err.println( - "** " + errorType + " ** FAILED TO DELETE RECORD AT LINE " + lineNumber + ": "); - System.err.println(recordJson); - System.err.println(exception); - System.err.flush(); - } - - private static final String DEFAULT_FILE_PATH = "../resources/data/del-500.jsonl"; - - private static final String UTF_8 = "UTF-8"; - - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; - - private static final int THREAD_COUNT = 8; - - private static final int BACKLOG_FACTOR = 10; - - private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; - - private static final long PAUSE_TIMEOUT = 100L; - - private static final String DATA_SOURCE = "DATA_SOURCE"; - private static final String RECORD_ID = "RECORD_ID"; - private static final String AFFECTED_ENTITIES = "AFFECTED_ENTITIES"; - private static final String ENTITY_ID = "ENTITY_ID"; - - private static final String ERROR = "ERROR"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; - - public record Record(int lineNumber, String line) { - } - - private static int errorCount = 0; - private static int successCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; - private static Set entityIdSet = new HashSet<>(); + + private static final String DEFAULT_FILE_PATH = "../resources/data/del-500.jsonl"; + + private static final String UTF_8 = "UTF-8"; + + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; + + private static final int THREAD_COUNT = 8; + + private static final int BACKLOG_FACTOR = 10; + + private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; + + private static final long PAUSE_TIMEOUT = 100L; + + private static final String DATA_SOURCE = "DATA_SOURCE"; + private static final String RECORD_ID = "RECORD_ID"; + private static final String AFFECTED_ENTITIES = "AFFECTED_ENTITIES"; + private static final String ENTITY_ID = "ENTITY_ID"; + + private static final String ERROR = "ERROR"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; + + public record Record(int lineNumber, String line) { + } + + private static int errorCount = 0; + private static int successCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; + private static Set entityIdSet = new HashSet<>(); } diff --git a/java/snippets/loading/LoadRecords.java b/java/snippets/loading/LoadRecords.java index 7cfef80..f08e6e8 100644 --- a/java/snippets/loading/LoadRecords.java +++ b/java/snippets/loading/LoadRecords.java @@ -11,161 +11,161 @@ * Provides a simple example of adding records to the Senzing repository. 
*/ public class LoadRecords { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = LoadRecords.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - // loop through the example records and add them to the repository - for (Map.Entry entry : getRecords().entrySet()) { - SzRecordKey recordKey = entry.getKey(); - String recordDefinition = entry.getValue(); - - // call the addRecord() function with no flags - engine.addRecord(recordKey, recordDefinition, SZ_NO_FLAGS); - - System.out.println("Record " + recordKey.recordId() + " added"); - System.out.flush(); - } - - } catch (SzException e) { - // handle any exception that may have occurred - System.err.println("Senzing Error Message : " + e.getMessage()); - System.err.println("Senzing Error Code : " + e.getErrorCode()); - e.printStackTrace(); - throw new RuntimeException(e); - - } catch (Exception e) { - e.printStackTrace(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - } - - } - - /** - * This is a support method for providing example records to add. - * - * @return A {@link Map} of {@link SzRecordKey} keys to {@link String} - * JSON text values describing the records to be added. 
- */ - public static Map getRecords() { - Map records = new LinkedHashMap<>(); - records.put( - SzRecordKey.of("TEST", "1001"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "1001", - "RECORD_TYPE": "PERSON", - "PRIMARY_NAME_FIRST": "Robert", - "PRIMARY_NAME_LAST": "Smith", - "DATE_OF_BIRTH": "12/11/1978", - "ADDR_TYPE": "MAILING", - "ADDR_FULL": "123 Main Street, Las Vegas, NV 89132", - "PHONE_TYPE": "HOME", - "PHONE_NUMBER": "702-919-1300", - "EMAIL_ADDRESS": "bsmith@work.com" - } - """); - - records.put( - SzRecordKey.of("TEST", "1002"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "1002", - "RECORD_TYPE": "PERSON", - "PRIMARY_NAME_FIRST": "Bob", - "PRIMARY_NAME_LAST": "Smith", - "PRIMARY_NAME_GENERATION": "II", - "DATE_OF_BIRTH": "11/12/1978", - "ADDR_TYPE": "HOME", - "ADDR_LINE1": "1515 Adela Lane", - "ADDR_CITY": "Las Vegas", - "ADDR_STATE": "NV", - "ADDR_POSTAL_CODE": "89111", - "PHONE_TYPE": "MOBILE", - "PHONE_NUMBER": "702-919-1300" - } - """); - - records.put( - SzRecordKey.of("TEST", "1003"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "1003", - "RECORD_TYPE": "PERSON", - "PRIMARY_NAME_FIRST": "Bob", - "PRIMARY_NAME_LAST": "Smith", - "PRIMARY_NAME_MIDDLE": "J", - "DATE_OF_BIRTH": "12/11/1978", - "EMAIL_ADDRESS": "bsmith@work.com" + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); + } + + // create a descriptive instance name (can be anything) + String instanceName = LoadRecords.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + try { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + // loop through the example records and add them to the repository + for (Map.Entry entry : getRecords().entrySet()) { + SzRecordKey recordKey = entry.getKey(); + String recordDefinition = entry.getValue(); + + // call the addRecord() function with no flags + engine.addRecord(recordKey, recordDefinition, SZ_NO_FLAGS); + + System.out.println("Record " + recordKey.recordId() + " added"); + System.out.flush(); } - """); - - records.put( - SzRecordKey.of("TEST", "1004"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "1004", - "RECORD_TYPE": "PERSON", - "PRIMARY_NAME_FIRST": "B", - "PRIMARY_NAME_LAST": "Smith", - "ADDR_TYPE": "HOME", - "ADDR_LINE1": "1515 Adela Ln", - "ADDR_CITY": "Las Vegas", - "ADDR_STATE": "NV", - "ADDR_POSTAL_CODE": "89132", - "EMAIL_ADDRESS": "bsmith@work.com" - } - """); - - records.put( - SzRecordKey.of("TEST", "1005"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "1005", - "RECORD_TYPE": "PERSON", - "PRIMARY_NAME_FIRST": "Rob", - "PRIMARY_NAME_MIDDLE": "E", - "PRIMARY_NAME_LAST": "Smith", - "DRIVERS_LICENSE_NUMBER": "112233", - "DRIVERS_LICENSE_STATE": "NV", - "ADDR_TYPE": "MAILING", - "ADDR_LINE1": "123 E Main St", - "ADDR_CITY": "Henderson", - "ADDR_STATE": "NV", - "ADDR_POSTAL_CODE": "89132" + + } catch (SzException e) { + // handle any exception that may have occurred + System.err.println("Senzing Error Message : " + e.getMessage()); + System.err.println("Senzing Error Code : " + e.getErrorCode()); + e.printStackTrace(); + throw new RuntimeException(e); + + } catch (Exception e) { + e.printStackTrace(); + if (e instanceof 
RuntimeException) { + throw ((RuntimeException) e); } - """); + throw new RuntimeException(e); + + } finally { + // IMPORTANT: make sure to destroy the environment + env.destroy(); + } - return records; - } + } + + /** + * This is a support method for providing example records to add. + * + * @return A {@link Map} of {@link SzRecordKey} keys to {@link String} + * JSON text values describing the records to be added. + */ + public static Map getRecords() { + Map records = new LinkedHashMap<>(); + records.put( + SzRecordKey.of("TEST", "1001"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1001", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_FIRST": "Robert", + "PRIMARY_NAME_LAST": "Smith", + "DATE_OF_BIRTH": "12/11/1978", + "ADDR_TYPE": "MAILING", + "ADDR_FULL": "123 Main Street, Las Vegas, NV 89132", + "PHONE_TYPE": "HOME", + "PHONE_NUMBER": "702-919-1300", + "EMAIL_ADDRESS": "bsmith@work.com" + } + """); + + records.put( + SzRecordKey.of("TEST", "1002"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1002", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_FIRST": "Bob", + "PRIMARY_NAME_LAST": "Smith", + "PRIMARY_NAME_GENERATION": "II", + "DATE_OF_BIRTH": "11/12/1978", + "ADDR_TYPE": "HOME", + "ADDR_LINE1": "1515 Adela Lane", + "ADDR_CITY": "Las Vegas", + "ADDR_STATE": "NV", + "ADDR_POSTAL_CODE": "89111", + "PHONE_TYPE": "MOBILE", + "PHONE_NUMBER": "702-919-1300" + } + """); + + records.put( + SzRecordKey.of("TEST", "1003"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1003", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_FIRST": "Bob", + "PRIMARY_NAME_LAST": "Smith", + "PRIMARY_NAME_MIDDLE": "J", + "DATE_OF_BIRTH": "12/11/1978", + "EMAIL_ADDRESS": "bsmith@work.com" + } + """); + + records.put( + SzRecordKey.of("TEST", "1004"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1004", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_FIRST": "B", + "PRIMARY_NAME_LAST": "Smith", + "ADDR_TYPE": "HOME", + "ADDR_LINE1": "1515 Adela Ln", + "ADDR_CITY": "Las Vegas", + "ADDR_STATE": "NV", + "ADDR_POSTAL_CODE": "89132", + "EMAIL_ADDRESS": "bsmith@work.com" + } + """); + + records.put( + SzRecordKey.of("TEST", "1005"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1005", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_FIRST": "Rob", + "PRIMARY_NAME_MIDDLE": "E", + "PRIMARY_NAME_LAST": "Smith", + "DRIVERS_LICENSE_NUMBER": "112233", + "DRIVERS_LICENSE_STATE": "NV", + "ADDR_TYPE": "MAILING", + "ADDR_LINE1": "123 E Main St", + "ADDR_CITY": "Henderson", + "ADDR_STATE": "NV", + "ADDR_POSTAL_CODE": "89132" + } + """); + + return records; + } } diff --git a/java/snippets/loading/LoadViaFutures.java b/java/snippets/loading/LoadViaFutures.java index 96c39ff..dbfa35f 100644 --- a/java/snippets/loading/LoadViaFutures.java +++ b/java/snippets/loading/LoadViaFutures.java @@ -13,277 +13,277 @@ * Provides a simple example of adding records to the Senzing repository. 
*/ public class LoadViaFutures { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = LoadViaFutures.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - String filePath = (args.length > 0) ? args[0] : DEFAULT_FILE_PATH; - - // create the thread pool and executor service - ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - - // keep track of pending futures and don't backlog too many for memory's sake - Map, Record> pendingFutures = new IdentityHashMap<>(); - - try (FileInputStream fis = new FileInputStream(filePath); - InputStreamReader isr = new InputStreamReader(fis, UTF_8); - BufferedReader br = new BufferedReader(isr)) { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - int lineNumber = 0; - boolean eof = false; - - while (!eof) { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.size() < MAXIMUM_BACKLOG) { - // read the next line - String line = br.readLine(); - lineNumber++; - - // check for EOF - if (line == null) { - eof = true; - break; - } - - // trim the line - line = line.trim(); - - // skip any blank lines - if (line.length() == 0) { - continue; - } - - // skip any commented lines - if (line.startsWith("#")) { - continue; - } - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try { - // parse the line as a JSON object - JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); - - // extract the data source code and record ID - String dataSourceCode = recordJson.getString(DATA_SOURCE, null); - String recordId = recordJson.getString(RECORD_ID, null); - SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); - - Future future = executor.submit(() -> { - // call the addRecord() function with no flags - engine.addRecord(recordKey, record.line, SZ_NO_FLAGS); - - // return null since we have no "info" to return - return null; - }); - - // add the future to the pending future list - pendingFutures.put(future, record); - - } catch (JsonException e) { - logFailedRecord(ERROR, e, lineNumber, line); - errorCount++; // increment the error count - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - do { - // handle any pending futures WITHOUT blocking to reduce the backlog - handlePendingFutures(pendingFutures, false); + // create a descriptive instance name (can be anything) + String instanceName = LoadViaFutures.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + String filePath = (args.length > 0) ? 
args[0] : DEFAULT_FILE_PATH; + + // create the thread pool and executor service + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + + // keep track of pending futures and don't backlog too many for memory's sake + Map, Record> pendingFutures = new IdentityHashMap<>(); + + try (FileInputStream fis = new FileInputStream(filePath); + InputStreamReader isr = new InputStreamReader(fis, UTF_8); + BufferedReader br = new BufferedReader(isr)) { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + int lineNumber = 0; + boolean eof = false; + + while (!eof) { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.size() < MAXIMUM_BACKLOG) { + // read the next line + String line = br.readLine(); + lineNumber++; + + // check for EOF + if (line == null) { + eof = true; + break; + } + + // trim the line + line = line.trim(); + + // skip any blank lines + if (line.length() == 0) { + continue; + } + + // skip any commented lines + if (line.startsWith("#")) { + continue; + } + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try { + // parse the line as a JSON object + JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); + + // extract the data source code and record ID + String dataSourceCode = recordJson.getString(DATA_SOURCE, null); + String recordId = recordJson.getString(RECORD_ID, null); + SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); + + Future future = executor.submit(() -> { + // call the addRecord() function with no flags + engine.addRecord(recordKey, record.line, SZ_NO_FLAGS); + + // return null since we have no "info" to return + return null; + }); + + // add the future to the pending future list + pendingFutures.put(future, record); + + } catch (JsonException e) { + logFailedRecord(ERROR, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do { + // handle any pending futures WITHOUT blocking to reduce the backlog + handlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.size() >= MAXIMUM_BACKLOG) { + try { + Thread.sleep(PAUSE_TIMEOUT); + + } catch (InterruptedException ignore) { + // do nothing + } + } + } while (pendingFutures.size() >= MAXIMUM_BACKLOG); + } - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.size() >= MAXIMUM_BACKLOG) { - try { - Thread.sleep(PAUSE_TIMEOUT); + // shutdown the executor service + executor.shutdown(); + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + handlePendingFutures(pendingFutures, true); - } catch (InterruptedException ignore) { - // do nothing + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - } - } while (pendingFutures.size() >= MAXIMUM_BACKLOG); - } - - // shutdown the executor service - executor.shutdown(); - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - handlePendingFutures(pendingFutures, true); - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error 
***"); - System.err.flush(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // check if executor service is shutdown - if (!executor.isShutdown()) { - executor.shutdown(); - } - - // IMPORTANT: make sure to destroy the environment - env.destroy(); - - System.out.println(); - System.out.println("Records successfully added : " + successCount); - System.out.println("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println(retryCount + " records to be retried in " + retryFile); - } - System.out.flush(); + throw new RuntimeException(e); - } + } finally { + // check if executor service is shutdown + if (!executor.isShutdown()) { + executor.shutdown(); + } + + // IMPORTANT: make sure to destroy the environment + env.destroy(); + + System.out.println(); + System.out.println("Records successfully added : " + successCount); + System.out.println("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println(retryCount + " records to be retried in " + retryFile); + } + System.out.flush(); - } - - private static void handlePendingFutures(Map, Record> pendingFutures, boolean blocking) - throws Exception { - // check for completed futures - Iterator, Record>> iter = pendingFutures.entrySet().iterator(); - - // loop through the pending futures - while (iter.hasNext()) { - // get the next pending future - Map.Entry, Record> entry = iter.next(); - Future future = entry.getKey(); - Record record = entry.getValue(); - - // if not blocking and this one is not done then continue - if (!blocking && !future.isDone()) { - continue; - } - - // remove the pending future from the map - iter.remove(); - - try { - try { - // get the value to see if there was an exception - future.get(); - - // if we get here then increment the success count - successCount++; - - } catch (InterruptedException e) { - // this could only happen if blocking is true, just - // rethrow as retryable and log the interruption - throw e; - - } catch (ExecutionException e) { - // if execution failed with an exception then rethrow - Throwable cause = e.getCause(); - if ((cause == null) || !(cause instanceof Exception)) { - // rethrow the execution exception - throw e; - } - // cast to an Exception and rethrow - throw ((Exception) cause); } - } catch (SzBadInputException e) { - logFailedRecord(ERROR, e, record.lineNumber, record.line); - errorCount++; // increment the error count - - } catch (SzRetryableException | InterruptedException | CancellationException e) { - // handle thread interruption and cancellation as retries - logFailedRecord(WARNING, e, record.lineNumber, record.line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + + private static void handlePendingFutures(Map, Record> pendingFutures, boolean blocking) + throws Exception { + // check for completed futures + Iterator, Record>> iter = pendingFutures.entrySet().iterator(); + + // loop through the pending futures + while 
(iter.hasNext()) { + // get the next pending future + Map.Entry, Record> entry = iter.next(); + Future future = entry.getKey(); + Record record = entry.getValue(); + + // if not blocking and this one is not done then continue + if (!blocking && !future.isDone()) { + continue; + } + + // remove the pending future from the map + iter.remove(); + + try { + try { + // get the value to see if there was an exception + future.get(); + + // if we get here then increment the success count + successCount++; + + } catch (InterruptedException e) { + // this could only happen if blocking is true, just + // rethrow as retryable and log the interruption + throw e; + + } catch (ExecutionException e) { + // if execution failed with an exception then rethrow + Throwable cause = e.getCause(); + if ((cause == null) || !(cause instanceof Exception)) { + // rethrow the execution exception + throw e; + } + // cast to an Exception and rethrow + throw ((Exception) cause); + } + + } catch (SzBadInputException e) { + logFailedRecord(ERROR, e, record.lineNumber, record.line); + errorCount++; // increment the error count + + } catch (SzRetryableException | InterruptedException | CancellationException e) { + // handle thread interruption and cancellation as retries + logFailedRecord(WARNING, e, record.lineNumber, record.line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(record.line); + + } catch (Exception e) { + // catch any other exception (incl. SzException) here + logFailedRecord(CRITICAL, e, record.lineNumber, record.line); + errorCount++; + throw e; // rethrow since exception is critical + } } - retryWriter.println(record.line); - - } catch (Exception e) { - // catch any other exception (incl. SzException) here - logFailedRecord(CRITICAL, e, record.lineNumber, record.line); - errorCount++; - throw e; // rethrow since exception is critical - } } - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param lineNumber The line number of the failed record in the JSON input - * file. - * @param recordJson The JSON text for the failed record. 
- */ - private static void logFailedRecord(String errorType, - Exception exception, - int lineNumber, - String recordJson) { - System.err.println(); - System.err.println( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + lineNumber + ": "); - System.err.println(recordJson); - System.err.println(exception); - System.err.flush(); - } - - private static final String DEFAULT_FILE_PATH = "../resources/data/load-500.jsonl"; - - private static final String UTF_8 = "UTF-8"; - - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; - - private static final int THREAD_COUNT = 8; - - private static final int BACKLOG_FACTOR = 10; - - private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; - - private static final long PAUSE_TIMEOUT = 100L; - - private static final String DATA_SOURCE = "DATA_SOURCE"; - private static final String RECORD_ID = "RECORD_ID"; - - private static final String ERROR = "ERROR"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; - - public record Record(int lineNumber, String line) { - } - - private static int errorCount = 0; - private static int successCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param lineNumber The line number of the failed record in the JSON input + * file. + * @param recordJson The JSON text for the failed record. + */ + private static void logFailedRecord(String errorType, + Exception exception, + int lineNumber, + String recordJson) { + System.err.println(); + System.err.println( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + lineNumber + ": "); + System.err.println(recordJson); + System.err.println(exception); + System.err.flush(); + } + + private static final String DEFAULT_FILE_PATH = "../resources/data/load-500.jsonl"; + + private static final String UTF_8 = "UTF-8"; + + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; + + private static final int THREAD_COUNT = 8; + + private static final int BACKLOG_FACTOR = 10; + + private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; + + private static final long PAUSE_TIMEOUT = 100L; + + private static final String DATA_SOURCE = "DATA_SOURCE"; + private static final String RECORD_ID = "RECORD_ID"; + + private static final String ERROR = "ERROR"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; + + public record Record(int lineNumber, String line) { + } + + private static int errorCount = 0; + private static int successCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; } diff --git a/java/snippets/loading/LoadWithInfoViaFutures.java b/java/snippets/loading/LoadWithInfoViaFutures.java index 3151b69..986f651 100644 --- a/java/snippets/loading/LoadWithInfoViaFutures.java +++ b/java/snippets/loading/LoadWithInfoViaFutures.java @@ -13,318 +13,318 @@ * Provides a simple example of adding records to the Senzing repository. 
*/ public class LoadWithInfoViaFutures { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = LoadWithInfoViaFutures.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - String filePath = (args.length > 0) ? args[0] : DEFAULT_FILE_PATH; - - // create the thread pool and executor service - ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - - // keep track of pending futures and don't backlog too many for memory's sake - Map, Record> pendingFutures = new IdentityHashMap<>(); - - try (FileInputStream fis = new FileInputStream(filePath); - InputStreamReader isr = new InputStreamReader(fis, UTF_8); - BufferedReader br = new BufferedReader(isr)) { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - int lineNumber = 0; - boolean eof = false; - - while (!eof) { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.size() < MAXIMUM_BACKLOG) { - // read the next line - String line = br.readLine(); - lineNumber++; - - // check for EOF - if (line == null) { - eof = true; - break; - } - - // trim the line - line = line.trim(); - - // skip any blank lines - if (line.length() == 0) { - continue; - } - - // skip any commented lines - if (line.startsWith("#")) { - continue; - } - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try { - // parse the line as a JSON object - JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); - - // extract the data source code and record ID - String dataSourceCode = recordJson.getString(DATA_SOURCE, null); - String recordId = recordJson.getString(RECORD_ID, null); - SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); - - Future future = executor.submit(() -> { - // call the addRecord() function with info flags - return engine.addRecord(recordKey, record.line, SZ_WITH_INFO_FLAGS); - }); - - // add the futures to the pending future list - pendingFutures.put(future, record); - - } catch (JsonException e) { - logFailedRecord(ERROR, e, lineNumber, line); - errorCount++; // increment the error count - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - do { - // handle any pending futures WITHOUT blocking to reduce the backlog - handlePendingFutures(engine, pendingFutures, false); + // create a descriptive instance name (can be anything) + String instanceName = LoadWithInfoViaFutures.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + String filePath = (args.length > 0) ? 
args[0] : DEFAULT_FILE_PATH; + + // create the thread pool and executor service + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + + // keep track of pending futures and don't backlog too many for memory's sake + Map, Record> pendingFutures = new IdentityHashMap<>(); + + try (FileInputStream fis = new FileInputStream(filePath); + InputStreamReader isr = new InputStreamReader(fis, UTF_8); + BufferedReader br = new BufferedReader(isr)) { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + int lineNumber = 0; + boolean eof = false; + + while (!eof) { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.size() < MAXIMUM_BACKLOG) { + // read the next line + String line = br.readLine(); + lineNumber++; + + // check for EOF + if (line == null) { + eof = true; + break; + } + + // trim the line + line = line.trim(); + + // skip any blank lines + if (line.length() == 0) { + continue; + } + + // skip any commented lines + if (line.startsWith("#")) { + continue; + } + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try { + // parse the line as a JSON object + JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); + + // extract the data source code and record ID + String dataSourceCode = recordJson.getString(DATA_SOURCE, null); + String recordId = recordJson.getString(RECORD_ID, null); + SzRecordKey recordKey = SzRecordKey.of(dataSourceCode, recordId); + + Future future = executor.submit(() -> { + // call the addRecord() function with info flags + return engine.addRecord(recordKey, record.line, SZ_WITH_INFO_FLAGS); + }); + + // add the futures to the pending future list + pendingFutures.put(future, record); + + } catch (JsonException e) { + logFailedRecord(ERROR, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do { + // handle any pending futures WITHOUT blocking to reduce the backlog + handlePendingFutures(engine, pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.size() >= MAXIMUM_BACKLOG) { + try { + Thread.sleep(PAUSE_TIMEOUT); + + } catch (InterruptedException ignore) { + // do nothing + } + } + } while (pendingFutures.size() >= MAXIMUM_BACKLOG); + } - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.size() >= MAXIMUM_BACKLOG) { - try { - Thread.sleep(PAUSE_TIMEOUT); + // shutdown the executor service + executor.shutdown(); + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + handlePendingFutures(engine, pendingFutures, true); - } catch (InterruptedException ignore) { - // do nothing + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - } - } while (pendingFutures.size() >= MAXIMUM_BACKLOG); - } - - // shutdown the executor service - executor.shutdown(); - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - handlePendingFutures(engine, pendingFutures, true); - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); 
- if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // check if executor service is shutdown - if (!executor.isShutdown()) { - executor.shutdown(); - } - - // IMPORTANT: make sure to destroy the environment - env.destroy(); - - System.out.println(); - System.out.println("Records successfully added : " + successCount); - System.out.println("Total entities created : " + entityIdSet.size()); - System.out.println("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println(retryCount + " records to be retried in " + retryFile); - } - System.out.flush(); + throw new RuntimeException(e); - } + } finally { + // check if executor service is shutdown + if (!executor.isShutdown()) { + executor.shutdown(); + } + + // IMPORTANT: make sure to destroy the environment + env.destroy(); + + System.out.println(); + System.out.println("Records successfully added : " + successCount); + System.out.println("Total entities created : " + entityIdSet.size()); + System.out.println("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println(retryCount + " records to be retried in " + retryFile); + } + System.out.flush(); - } - - private static void handlePendingFutures(SzEngine engine, - Map, Record> pendingFutures, - boolean blocking) - throws Exception { - // check for completed futures - Iterator, Record>> iter = pendingFutures.entrySet().iterator(); - - // loop through the pending futures - while (iter.hasNext()) { - // get the next pending future - Map.Entry, Record> entry = iter.next(); - Future future = entry.getKey(); - Record record = entry.getValue(); - - // if not blocking and this one is not done then continue - if (!blocking && !future.isDone()) { - continue; - } - - // remove the pending future from the map - iter.remove(); - - try { - try { - // get the value to see if there was an exception - String info = future.get(); - - // if we get here then increment the success count - successCount++; - - // process the info - processInfo(engine, info); - - } catch (InterruptedException e) { - // this could only happen if blocking is true, just - // rethrow as retryable and log the interruption - throw e; - - } catch (ExecutionException e) { - // if execution failed with an exception then rethrow - Throwable cause = e.getCause(); - if ((cause == null) || !(cause instanceof Exception)) { - // rethrow the execution exception - throw e; - } - // cast to an Exception and rethrow - throw ((Exception) cause); } - } catch (SzBadInputException e) { - logFailedRecord(ERROR, e, record.lineNumber, record.line); - errorCount++; // increment the error count - - } catch (SzRetryableException | InterruptedException | CancellationException e) { - // handle thread interruption and cancellation as retries - logFailedRecord(WARNING, e, record.lineNumber, record.line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + + private static void handlePendingFutures(SzEngine engine, + Map, 
Record> pendingFutures, + boolean blocking) + throws Exception { + // check for completed futures + Iterator, Record>> iter = pendingFutures.entrySet().iterator(); + + // loop through the pending futures + while (iter.hasNext()) { + // get the next pending future + Map.Entry, Record> entry = iter.next(); + Future future = entry.getKey(); + Record record = entry.getValue(); + + // if not blocking and this one is not done then continue + if (!blocking && !future.isDone()) { + continue; + } + + // remove the pending future from the map + iter.remove(); + + try { + try { + // get the value to see if there was an exception + String info = future.get(); + + // if we get here then increment the success count + successCount++; + + // process the info + processInfo(engine, info); + + } catch (InterruptedException e) { + // this could only happen if blocking is true, just + // rethrow as retryable and log the interruption + throw e; + + } catch (ExecutionException e) { + // if execution failed with an exception then rethrow + Throwable cause = e.getCause(); + if ((cause == null) || !(cause instanceof Exception)) { + // rethrow the execution exception + throw e; + } + // cast to an Exception and rethrow + throw ((Exception) cause); + } + + } catch (SzBadInputException e) { + logFailedRecord(ERROR, e, record.lineNumber, record.line); + errorCount++; // increment the error count + + } catch (SzRetryableException | InterruptedException | CancellationException e) { + // handle thread interruption and cancellation as retries + logFailedRecord(WARNING, e, record.lineNumber, record.line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(record.line); + + } catch (Exception e) { + // catch any other exception (incl. SzException) here + logFailedRecord(CRITICAL, e, record.lineNumber, record.line); + errorCount++; + throw e; // rethrow since exception is critical + } } - retryWriter.println(record.line); - - } catch (Exception e) { - // catch any other exception (incl. SzException) here - logFailedRecord(CRITICAL, e, record.lineNumber, record.line); - errorCount++; - throw e; // rethrow since exception is critical - } } - } - - /** - * Example method for parsing and handling the INFO message (formatted - * as JSON). This example implementation simply tracks all entity ID's - * that appear as "AFFECTED_ENTITIES" to count the number - * of entities created for the records -- essentially a contrived - * data mart. - * - * @param engine The {@link SzEngine} to use. - * @param info The info message. - */ - private static void processInfo(SzEngine engine, String info) { - JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject(); - if (!jsonObject.containsKey(AFFECTED_ENTITIES)) { - return; + + /** + * Example method for parsing and handling the INFO message (formatted + * as JSON). This example implementation simply tracks all entity ID's + * that appear as "AFFECTED_ENTITIES" to count the number + * of entities created for the records -- essentially a contrived + * data mart. + * + * @param engine The {@link SzEngine} to use. + * @param info The info message. 
+ */ + private static void processInfo(SzEngine engine, String info) { + JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject(); + if (!jsonObject.containsKey(AFFECTED_ENTITIES)) { + return; + } + JsonArray affectedArr = jsonObject.getJsonArray(AFFECTED_ENTITIES); + for (JsonObject affected : affectedArr.getValuesAs(JsonObject.class)) { + JsonNumber number = affected.getJsonNumber(ENTITY_ID); + long entityId = number.longValue(); + + try { + engine.getEntity(entityId, null); + entityIdSet.add(entityId); + } catch (SzNotFoundException e) { + entityIdSet.remove(entityId); + } catch (SzException e) { + // simply log the exception, do not rethrow + System.err.println(); + System.err.println("**** FAILED TO RETRIEVE ENTITY: " + entityId); + System.err.println(e.toString()); + System.err.flush(); + } + } } - JsonArray affectedArr = jsonObject.getJsonArray(AFFECTED_ENTITIES); - for (JsonObject affected : affectedArr.getValuesAs(JsonObject.class)) { - JsonNumber number = affected.getJsonNumber(ENTITY_ID); - long entityId = number.longValue(); - - try { - engine.getEntity(entityId, null); - entityIdSet.add(entityId); - } catch (SzNotFoundException e) { - entityIdSet.remove(entityId); - } catch (SzException e) { - // simply log the exception, do not rethrow + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param lineNumber The line number of the failed record in the JSON input + * file. + * @param recordJson The JSON text for the failed record. + */ + private static void logFailedRecord(String errorType, + Exception exception, + int lineNumber, + String recordJson) { System.err.println(); - System.err.println("**** FAILED TO RETRIEVE ENTITY: " + entityId); - System.err.println(e.toString()); + System.err.println( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + lineNumber + ": "); + System.err.println(recordJson); + System.err.println(exception); System.err.flush(); - } } - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param lineNumber The line number of the failed record in the JSON input - * file. - * @param recordJson The JSON text for the failed record. 
- */ - private static void logFailedRecord(String errorType, - Exception exception, - int lineNumber, - String recordJson) { - System.err.println(); - System.err.println( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + lineNumber + ": "); - System.err.println(recordJson); - System.err.println(exception); - System.err.flush(); - } - - private static final String DEFAULT_FILE_PATH = "../resources/data/load-500.jsonl"; - - private static final String UTF_8 = "UTF-8"; - - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; - - private static final int THREAD_COUNT = 8; - - private static final int BACKLOG_FACTOR = 10; - - private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; - - private static final long PAUSE_TIMEOUT = 100L; - - private static final String DATA_SOURCE = "DATA_SOURCE"; - private static final String RECORD_ID = "RECORD_ID"; - private static final String AFFECTED_ENTITIES = "AFFECTED_ENTITIES"; - private static final String ENTITY_ID = "ENTITY_ID"; - - private static final String ERROR = "ERROR"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; - - public record Record(int lineNumber, String line) { - } - - private static int errorCount = 0; - private static int successCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; - private static Set entityIdSet = new HashSet<>(); + + private static final String DEFAULT_FILE_PATH = "../resources/data/load-500.jsonl"; + + private static final String UTF_8 = "UTF-8"; + + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; + + private static final int THREAD_COUNT = 8; + + private static final int BACKLOG_FACTOR = 10; + + private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; + + private static final long PAUSE_TIMEOUT = 100L; + + private static final String DATA_SOURCE = "DATA_SOURCE"; + private static final String RECORD_ID = "RECORD_ID"; + private static final String AFFECTED_ENTITIES = "AFFECTED_ENTITIES"; + private static final String ENTITY_ID = "ENTITY_ID"; + + private static final String ERROR = "ERROR"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; + + public record Record(int lineNumber, String line) { + } + + private static int errorCount = 0; + private static int successCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; + private static Set entityIdSet = new HashSet<>(); } diff --git a/java/snippets/loading/LoadWithStatsViaLoop.java b/java/snippets/loading/LoadWithStatsViaLoop.java index efd71ec..ac94faf 100644 --- a/java/snippets/loading/LoadWithStatsViaLoop.java +++ b/java/snippets/loading/LoadWithStatsViaLoop.java @@ -11,178 +11,178 @@ * Provides a simple example of adding records to the Senzing repository. 
*/ public class LoadWithStatsViaLoop { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = LoadWithStatsViaLoop.class.getSimpleName(); + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); + } - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); + // create a descriptive instance name (can be anything) + String instanceName = LoadWithStatsViaLoop.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + String filePath = (args.length > 0) ? args[0] : DEFAULT_FILE_PATH; + + try (FileInputStream fis = new FileInputStream(filePath); + InputStreamReader isr = new InputStreamReader(fis, UTF_8); + BufferedReader br = new BufferedReader(isr)) { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + int lineNumber = 0; + // loop through the example records and add them to the repository + for (String line = br.readLine(); line != null; line = br.readLine()) { + // increment the line number + lineNumber++; + + // trim the line + line = line.trim(); + + // skip any blank lines + if (line.length() == 0) { + continue; + } + + // skip any commented lines + if (line.startsWith("#")) { + continue; + } + + try { + // parse the line as a JSON object + JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); + + // extract the data source code and record ID + String dataSourceCode = recordJson.getString(DATA_SOURCE, null); + String recordId = recordJson.getString(RECORD_ID, null); + + // call the addRecord() function with no flags + engine.addRecord( + SzRecordKey.of(dataSourceCode, recordId), line, SZ_NO_FLAGS); + + successCount++; + + // check if it is time obtain stats + if ((successCount % STATS_INTERVAL) == 0) { + try { + String stats = engine.getStats(); + if (stats.length() > STATS_TRUNCATE) { + stats = stats.substring(0, STATS_TRUNCATE) + " ..."; + } + System.out.println("* STATS: " + stats); + + } catch (SzException e) { + // trap the stats exception so it is not misinterpreted + // as an exception from engine.addRecord() + System.err.println("**** FAILED TO OBTAIN STATS: " + e); + } + } + + } catch (JsonException | SzBadInputException e) { + logFailedRecord(ERROR, e, lineNumber, line); + errorCount++; // increment the error count + + } catch (SzRetryableException e) { + logFailedRecord(WARNING, e, lineNumber, line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(line); + + } catch (Exception e) { + // 
catch any other exception (incl. SzException) here + logFailedRecord(CRITICAL, e, lineNumber, line); + errorCount++; + throw e; // rethrow since exception is critical + } + } - String filePath = (args.length > 0) ? args[0] : DEFAULT_FILE_PATH; + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); + } + throw new RuntimeException(e); - try (FileInputStream fis = new FileInputStream(filePath); - InputStreamReader isr = new InputStreamReader(fis, UTF_8); - BufferedReader br = new BufferedReader(isr)) { - // get the engine from the environment - SzEngine engine = env.getEngine(); + } finally { + // IMPORTANT: make sure to destroy the environment + env.destroy(); - int lineNumber = 0; - // loop through the example records and add them to the repository - for (String line = br.readLine(); line != null; line = br.readLine()) { - // increment the line number - lineNumber++; + System.out.println(); + System.out.println("Records successfully added : " + successCount); + System.out.println("Records failed with errors : " + errorCount); - // trim the line - line = line.trim(); + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println(retryCount + " records to be retried in " + retryFile); + } + System.out.flush(); - // skip any blank lines - if (line.length() == 0) { - continue; } - // skip any commented lines - if (line.startsWith("#")) { - continue; - } + } - try { - // parse the line as a JSON object - JsonObject recordJson = Json.createReader(new StringReader(line)).readObject(); - - // extract the data source code and record ID - String dataSourceCode = recordJson.getString(DATA_SOURCE, null); - String recordId = recordJson.getString(RECORD_ID, null); - - // call the addRecord() function with no flags - engine.addRecord( - SzRecordKey.of(dataSourceCode, recordId), line, SZ_NO_FLAGS); - - successCount++; - - // check if it is time obtain stats - if ((successCount % STATS_INTERVAL) == 0) { - try { - String stats = engine.getStats(); - if (stats.length() > STATS_TRUNCATE) { - stats = stats.substring(0, STATS_TRUNCATE) + " ..."; - } - System.out.println("* STATS: " + stats); - - } catch (SzException e) { - // trap the stats exception so it is not misinterpreted - // as an exception from engine.addRecord() - System.err.println("**** FAILED TO OBTAIN STATS: " + e); - } - } + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param lineNumber The line number of the failed record in the JSON input + * file. + * @param recordJson The JSON text for the failed record. 
+ */ + private static void logFailedRecord(String errorType, + Exception exception, + int lineNumber, + String recordJson) { + System.err.println(); + System.err.println( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + lineNumber + ": "); + System.err.println(recordJson); + System.err.println(exception); + System.err.flush(); + } - } catch (JsonException | SzBadInputException e) { - logFailedRecord(ERROR, e, lineNumber, line); - errorCount++; // increment the error count + private static final String DEFAULT_FILE_PATH = "../resources/data/load-500.jsonl"; - } catch (SzRetryableException e) { - logFailedRecord(WARNING, e, lineNumber, line); - errorCount++; // increment the error count - retryCount++; // increment the retry count + private static final String UTF_8 = "UTF-8"; - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); - } - retryWriter.println(line); + private static final int STATS_INTERVAL = 100; + private static final int STATS_TRUNCATE = 70; - } catch (Exception e) { - // catch any other exception (incl. SzException) here - logFailedRecord(CRITICAL, e, lineNumber, line); - errorCount++; - throw e; // rethrow since exception is critical - } - } - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - - System.out.println(); - System.out.println("Records successfully added : " + successCount); - System.out.println("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println(retryCount + " records to be retried in " + retryFile); - } - System.out.flush(); + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; - } + private static final String DATA_SOURCE = "DATA_SOURCE"; + private static final String RECORD_ID = "RECORD_ID"; + + private static final String ERROR = "ERROR"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param lineNumber The line number of the failed record in the JSON input - * file. - * @param recordJson The JSON text for the failed record. 
- */ - private static void logFailedRecord(String errorType, - Exception exception, - int lineNumber, - String recordJson) { - System.err.println(); - System.err.println( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + lineNumber + ": "); - System.err.println(recordJson); - System.err.println(exception); - System.err.flush(); - } - - private static final String DEFAULT_FILE_PATH = "../resources/data/load-500.jsonl"; - - private static final String UTF_8 = "UTF-8"; - - private static final int STATS_INTERVAL = 100; - private static final int STATS_TRUNCATE = 70; - - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; - - private static final String DATA_SOURCE = "DATA_SOURCE"; - private static final String RECORD_ID = "RECORD_ID"; - - private static final String ERROR = "ERROR"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; - - private static int errorCount = 0; - private static int successCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; + private static int errorCount = 0; + private static int successCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; } diff --git a/java/snippets/redo/RedoContinuous.java b/java/snippets/redo/RedoContinuous.java index 69fa7a6..4ef6c9c 100644 --- a/java/snippets/redo/RedoContinuous.java +++ b/java/snippets/redo/RedoContinuous.java @@ -12,162 +12,162 @@ * the pending redo records in the Senzing repository. */ public class RedoContinuous { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = RedoContinuous.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - // make sure we cleanup if exiting by CTRL-C or due to an exception - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - outputRedoStatistics(); - })); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - while (true) { - // get the next redo record - String redo = engine.getRedoRecord(); - - // check if no redo records are available - if (redo == null) { - outputRedoStatistics(); - System.out.println(); - System.out.println( - "No redo records to process. 
Pausing for " - + REDO_PAUSE_DESCRIPTION + "...."); - System.out.println("Press CTRL-C to exit."); - try { - Thread.sleep(REDO_PAUSE_TIMEOUT); - } catch (InterruptedException ignore) { - // ignore the exception - } - continue; + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - try { - // process the redo record - engine.processRedoRecord(redo, SZ_NO_FLAGS); + // create a descriptive instance name (can be anything) + String instanceName = RedoContinuous.class.getSimpleName(); - // increment the redone count - redoneCount++; + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); - } catch (SzRetryableException e) { - logFailedRedo(WARNING, e, redo); - errorCount++; - retryCount++; - trackRetryRecord(redo); + // make sure we cleanup if exiting by CTRL-C or due to an exception + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // IMPORTANT: make sure to destroy the environment + env.destroy(); + outputRedoStatistics(); + })); + + try { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + while (true) { + // get the next redo record + String redo = engine.getRedoRecord(); + + // check if no redo records are available + if (redo == null) { + outputRedoStatistics(); + System.out.println(); + System.out.println( + "No redo records to process. Pausing for " + + REDO_PAUSE_DESCRIPTION + "...."); + System.out.println("Press CTRL-C to exit."); + try { + Thread.sleep(REDO_PAUSE_TIMEOUT); + } catch (InterruptedException ignore) { + // ignore the exception + } + continue; + } + + try { + // process the redo record + engine.processRedoRecord(redo, SZ_NO_FLAGS); + + // increment the redone count + redoneCount++; + + } catch (SzRetryableException e) { + logFailedRedo(WARNING, e, redo); + errorCount++; + retryCount++; + trackRetryRecord(redo); + + } catch (Exception e) { + logFailedRedo(CRITICAL, e, redo); + errorCount++; + throw e; + } + } } catch (Exception e) { - logFailedRedo(CRITICAL, e, redo); - errorCount++; - throw e; + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); + } + throw new RuntimeException(e); + + } finally { + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } - } - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception - } - } + } - private static void outputRedoStatistics() { - System.out.println(); - System.out.println("Redos successfully processed : " + redoneCount); - System.out.println("Total failed records/redos : " + 
errorCount); + private static void outputRedoStatistics() { + System.out.println(); + System.out.println("Redos successfully processed : " + redoneCount); + System.out.println("Total failed records/redos : " + errorCount); - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println( + retryCount + " records/redos to be retried in " + retryFile); + } + System.out.flush(); } - if (retryCount > 0) { - System.out.println( - retryCount + " records/redos to be retried in " + retryFile); + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param redoRecord The JSON text for the redo record. + */ + private static void logFailedRedo(String errorType, + Exception exception, + String redoRecord) { + System.err.println(); + System.err.println("** " + errorType + " ** FAILED TO PROCESS REDO: "); + System.err.println(redoRecord); + System.err.println(exception); + System.err.flush(); } - System.out.flush(); - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param redoRecord The JSON text for the redo record. - */ - private static void logFailedRedo(String errorType, - Exception exception, - String redoRecord) { - System.err.println(); - System.err.println("** " + errorType + " ** FAILED TO PROCESS REDO: "); - System.err.println(redoRecord); - System.err.println(exception); - System.err.flush(); - } - - /** - * Tracks the specified JSON record definition to be retried in a - * retry file. - * - * @param recordJson The JSON text defining the record to be retried. - * - * @throws IOException If a failure occurs in writing the record to the - * retry file. - */ - private static void trackRetryRecord(String recordJson) - throws IOException { - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + + /** + * Tracks the specified JSON record definition to be retried in a + * retry file. + * + * @param recordJson The JSON text defining the record to be retried. + * + * @throws IOException If a failure occurs in writing the record to the + * retry file. 
+ */ + private static void trackRetryRecord(String recordJson) + throws IOException { + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(recordJson); } - retryWriter.println(recordJson); - } - private static final String UTF_8 = "UTF-8"; + private static final String UTF_8 = "UTF-8"; - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; - private static final long REDO_PAUSE_TIMEOUT = 30000L; + private static final long REDO_PAUSE_TIMEOUT = 30000L; - private static final String REDO_PAUSE_DESCRIPTION = "30 seconds"; + private static final String REDO_PAUSE_DESCRIPTION = "30 seconds"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; - private static int errorCount = 0; - private static int redoneCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; + private static int errorCount = 0; + private static int redoneCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; } diff --git a/java/snippets/redo/RedoContinuousViaFutures.java b/java/snippets/redo/RedoContinuousViaFutures.java index e897d1a..83fdf0a 100644 --- a/java/snippets/redo/RedoContinuousViaFutures.java +++ b/java/snippets/redo/RedoContinuousViaFutures.java @@ -15,271 +15,269 @@ * futures. 
*/ public class RedoContinuousViaFutures { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = RedoContinuousViaFutures.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - // create the thread pool and executor service - ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - - // keep track of pending futures and don't backlog too many for memory's sake - Map, String> pendingFutures = new IdentityHashMap<>(); - - // make sure we cleanup if exiting by CTRL-C or due to an exception - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // shutdown the executor service - if (!executor.isShutdown()) { - executor.shutdown(); - } - - try { - handlePendingFutures(pendingFutures, true); - } catch (Exception e) { - e.printStackTrace(); - } - - // IMPORTANT: make sure to destroy the environment - env.destroy(); - outputRedoStatistics(); - })); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - while (true) { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.size() < MAXIMUM_BACKLOG) { - - // get the next redo record - String redo = engine.getRedoRecord(); - - // check if no redo records are available - if (redo == null) { - break; - } - - Future future = executor.submit(() -> { - // process the redo record - engine.processRedoRecord(redo, SZ_NO_FLAGS); - - // return null since we have no "info" to return - return null; - }); - - // add the future to the pending future list - pendingFutures.put(future, redo); + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - do { - // handle any pending futures WITHOUT blocking to reduce the backlog - handlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.size() >= MAXIMUM_BACKLOG) { - try { - Thread.sleep(HANDLE_PAUSE_TIMEOUT); + // create a descriptive instance name (can be anything) + String instanceName = RedoContinuousViaFutures.class.getSimpleName(); - } catch (InterruptedException ignore) { - // do nothing - } - } - } while (pendingFutures.size() >= MAXIMUM_BACKLOG); - - // check if there are no redo records right now - // NOTE: we do NOT want to call countRedoRecords() in a loop that - // is processing redo records, we call it here AFTER we believe - // have processed all pending redos to confirm still zero - if (engine.countRedoRecords() == 0) { - outputRedoStatistics(); - System.out.println(); - System.out.println( - "No redo records to process. 
Pausing for " - + REDO_PAUSE_DESCRIPTION + "...."); - System.out.println("Press CTRL-C to exit."); - try { - Thread.sleep(REDO_PAUSE_TIMEOUT); - } catch (InterruptedException ignore) { - // ignore the exception - } - continue; - } - } - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception - } + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); - } + // create the thread pool and executor service + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - private static void handlePendingFutures(Map, String> pendingFutures, - boolean blocking) - throws Exception { - // check for completed futures - Iterator, String>> iter = pendingFutures.entrySet().iterator(); + // keep track of pending futures and don't backlog too many for memory's sake + Map, String> pendingFutures = new IdentityHashMap<>(); - // loop through the pending futures - while (iter.hasNext()) { - // get the next pending future - Map.Entry, String> entry = iter.next(); - Future future = entry.getKey(); - String redoRecord = entry.getValue(); + // make sure we cleanup if exiting by CTRL-C or due to an exception + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // shutdown the executor service + if (!executor.isShutdown()) { + executor.shutdown(); + } - // if not blocking and this one is not done then continue - if (!blocking && !future.isDone()) { - continue; - } + try { + handlePendingFutures(pendingFutures, true); + } catch (Exception e) { + e.printStackTrace(); + } - // remove the pending future from the map - iter.remove(); + // IMPORTANT: make sure to destroy the environment + env.destroy(); + outputRedoStatistics(); + })); - try { try { - // get the value to see if there was an exception - future.get(); - - // if we get here then increment the success count - redoneCount++; - - } catch (InterruptedException e) { - // this could only happen if blocking is true, just - // rethrow as retryable and log the interruption - throw e; - - } catch (ExecutionException e) { - // if execution failed with an exception then rethrow - Throwable cause = e.getCause(); - if ((cause == null) || !(cause instanceof Exception)) { - // rethrow the execution exception - throw e; - } - // cast to an Exception and rethrow - throw ((Exception) cause); + // get the engine from the environment + SzEngine engine = env.getEngine(); + + while (true) { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + for (String redo = engine.getRedoRecord(); + redo != null; + redo = engine.getRedoRecord()) + { + final String currentRedo = redo; // need final reference for lambda + + Future future = executor.submit(() -> { + // process the redo record + return engine.processRedoRecord(currentRedo, SZ_NO_FLAGS); + }); + + // add the future to the pending future list + pendingFutures.put(future, redo); + + // handle the pending futures as log as maximum backlog exceeded + for (int loop = 0; + 
pendingFutures.size() >= MAXIMUM_BACKLOG; + loop++) + { + // check if this is NOT our first iteration through the loop + if (loop > 0) { + // if we still have exceeded the backlog size after the first + // loop iteration then pause briefly before trying again + try { + Thread.sleep(HANDLE_PAUSE_TIMEOUT); + + } catch (InterruptedException ignore) { + // do nothing + } + } + + // handle any pending futures WITHOUT blocking to reduce the backlog + handlePendingFutures(pendingFutures, false); + } + } + + // check if there are no redo records right now + // NOTE: we do NOT want to call countRedoRecords() in a loop that + // is processing redo records, we call it here AFTER we believe + // have processed all pending redos to confirm still zero + if (engine.countRedoRecords() == 0) { + outputRedoStatistics(); + System.out.println(); + System.out.println( + "No redo records to process. Pausing for " + + REDO_PAUSE_DESCRIPTION + "...."); + System.out.println("Press CTRL-C to exit."); + try { + Thread.sleep(REDO_PAUSE_TIMEOUT); + } catch (InterruptedException ignore) { + // ignore the exception + } + continue; + } + } + + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); + } + throw new RuntimeException(e); + + } finally { + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } - } catch (SzRetryableException | InterruptedException | CancellationException e) { - // handle thread interruption and cancellation as retries - logFailedRedo(WARNING, e, redoRecord); - errorCount++; // increment the error count - retryCount++; // increment the retry count + } - // track the retry record so it can be retried later - trackRetryRecord(redoRecord); - - } catch (Exception e) { - // catch any other exception (incl. 
SzException) here - logFailedRedo(CRITICAL, e, redoRecord); - errorCount++; - throw e; // rethrow since exception is critical - } + private static void handlePendingFutures(Map, String> pendingFutures, + boolean blocking) + throws Exception { + // check for completed futures + Iterator, String>> iter = pendingFutures.entrySet().iterator(); + + // loop through the pending futures + while (iter.hasNext()) { + // get the next pending future + Map.Entry, String> entry = iter.next(); + Future future = entry.getKey(); + String redoRecord = entry.getValue(); + + // if not blocking and this one is not done then continue + if (!blocking && !future.isDone()) { + continue; + } + + // remove the pending future from the map + iter.remove(); + + try { + try { + // get the value to see if there was an exception + future.get(); + + // if we get here then increment the success count + redoneCount++; + + } catch (InterruptedException e) { + // this could only happen if blocking is true, just + // rethrow as retryable and log the interruption + throw e; + + } catch (ExecutionException e) { + // if execution failed with an exception then rethrow + Throwable cause = e.getCause(); + if ((cause == null) || !(cause instanceof Exception)) { + // rethrow the execution exception + throw e; + } + // cast to an Exception and rethrow + throw ((Exception) cause); + } + + } catch (SzRetryableException | InterruptedException | CancellationException e) { + // handle thread interruption and cancellation as retries + logFailedRedo(WARNING, e, redoRecord); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + trackRetryRecord(redoRecord); + + } catch (Exception e) { + // catch any other exception (incl. SzException) here + logFailedRedo(CRITICAL, e, redoRecord); + errorCount++; + throw e; // rethrow since exception is critical + } + } } - } - private static void outputRedoStatistics() { - System.out.println(); - System.out.println("Redos successfully processed : " + redoneCount); - System.out.println("Total failed records/redos : " + errorCount); + private static void outputRedoStatistics() { + System.out.println(); + System.out.println("Redos successfully processed : " + redoneCount); + System.out.println("Total failed records/redos : " + errorCount); - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println( + retryCount + " records/redos to be retried in " + retryFile); + } + System.out.flush(); } - if (retryCount > 0) { - System.out.println( - retryCount + " records/redos to be retried in " + retryFile); + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param redoRecord The JSON text for the redo record. + */ + private static void logFailedRedo(String errorType, + Exception exception, + String redoRecord) { + System.err.println(); + System.err.println("** " + errorType + " ** FAILED TO PROCESS REDO: "); + System.err.println(redoRecord); + System.err.println(exception); + System.err.flush(); } - System.out.flush(); - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param redoRecord The JSON text for the redo record. 
- */ - private static void logFailedRedo(String errorType, - Exception exception, - String redoRecord) { - System.err.println(); - System.err.println("** " + errorType + " ** FAILED TO PROCESS REDO: "); - System.err.println(redoRecord); - System.err.println(exception); - System.err.flush(); - } - - /** - * Tracks the specified JSON record definition to be retried in a - * retry file. - * - * @param recordJson The JSON text defining the record to be retried. - * - * @throws IOException If a failure occurs in writing the record to the - * retry file. - */ - private static void trackRetryRecord(String recordJson) - throws IOException { - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + + /** + * Tracks the specified JSON record definition to be retried in a + * retry file. + * + * @param recordJson The JSON text defining the record to be retried. + * + * @throws IOException If a failure occurs in writing the record to the + * retry file. + */ + private static void trackRetryRecord(String recordJson) + throws IOException { + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(recordJson); } - retryWriter.println(recordJson); - } - private static final String UTF_8 = "UTF-8"; + private static final String UTF_8 = "UTF-8"; - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; - private static final int THREAD_COUNT = 8; + private static final int THREAD_COUNT = 8; - private static final int BACKLOG_FACTOR = 10; + private static final int BACKLOG_FACTOR = 10; - private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; + private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; - private static final long HANDLE_PAUSE_TIMEOUT = 100L; + private static final long HANDLE_PAUSE_TIMEOUT = 100L; - private static final long REDO_PAUSE_TIMEOUT = 30000L; + private static final long REDO_PAUSE_TIMEOUT = 30000L; - private static final String REDO_PAUSE_DESCRIPTION = "30 seconds"; + private static final String REDO_PAUSE_DESCRIPTION = "30 seconds"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; - private static int errorCount = 0; - private static int redoneCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; + private static int errorCount = 0; + private static int redoneCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; } diff --git a/java/snippets/redo/RedoWithInfoContinuous.java b/java/snippets/redo/RedoWithInfoContinuous.java index cd6b7b6..e0d4948 100644 --- a/java/snippets/redo/RedoWithInfoContinuous.java +++ b/java/snippets/redo/RedoWithInfoContinuous.java @@ -20,205 +20,205 @@ * the INFO messages returned from processing those redo records. 
*/ public class RedoWithInfoContinuous { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = RedoWithInfoContinuous.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - // make sure we cleanup if exiting by CTRL-C or due to an exception - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - outputRedoStatistics(); - })); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - while (true) { - // get the next redo record - String redo = engine.getRedoRecord(); - - // check if no redo records are available - if (redo == null) { - outputRedoStatistics(); - System.out.println(); - System.out.println( - "No redo records to process. Pausing for " - + REDO_PAUSE_DESCRIPTION + "...."); - System.out.println("Press CTRL-C to exit."); - try { - Thread.sleep(REDO_PAUSE_TIMEOUT); - } catch (InterruptedException ignore) { - // ignore the exception - } - continue; + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - try { - // process the redo record - String info = engine.processRedoRecord(redo, SZ_WITH_INFO_FLAGS); + // create a descriptive instance name (can be anything) + String instanceName = RedoWithInfoContinuous.class.getSimpleName(); - // increment the redone count - redoneCount++; + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); - // process the info - processInfo(engine, info); + // make sure we cleanup if exiting by CTRL-C or due to an exception + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // IMPORTANT: make sure to destroy the environment + env.destroy(); + outputRedoStatistics(); + })); - } catch (SzRetryableException e) { - logFailedRedo(WARNING, e, redo); - errorCount++; - retryCount++; - trackRetryRecord(redo); + try { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + while (true) { + // get the next redo record + String redo = engine.getRedoRecord(); + + // check if no redo records are available + if (redo == null) { + outputRedoStatistics(); + System.out.println(); + System.out.println( + "No redo records to process. 
Pausing for " + + REDO_PAUSE_DESCRIPTION + "...."); + System.out.println("Press CTRL-C to exit."); + try { + Thread.sleep(REDO_PAUSE_TIMEOUT); + } catch (InterruptedException ignore) { + // ignore the exception + } + continue; + } + + try { + // process the redo record + String info = engine.processRedoRecord(redo, SZ_WITH_INFO_FLAGS); + + // increment the redone count + redoneCount++; + + // process the info + processInfo(engine, info); + + } catch (SzRetryableException e) { + logFailedRedo(WARNING, e, redo); + errorCount++; + retryCount++; + trackRetryRecord(redo); + + } catch (Exception e) { + logFailedRedo(CRITICAL, e, redo); + errorCount++; + throw e; + } + } } catch (Exception e) { - logFailedRedo(CRITICAL, e, redo); - errorCount++; - throw e; + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); + } + throw new RuntimeException(e); + + } finally { + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } - } - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception - } - } + } - private static void outputRedoStatistics() { - System.out.println(); - System.out.println("Redos successfully processed : " + redoneCount); - System.out.println("Total entities affected : " + entityIdSet.size()); - System.out.println("Total failed records/redos : " + errorCount); + private static void outputRedoStatistics() { + System.out.println(); + System.out.println("Redos successfully processed : " + redoneCount); + System.out.println("Total entities affected : " + entityIdSet.size()); + System.out.println("Total failed records/redos : " + errorCount); - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println( - retryCount + " records/redos to be retried in " + retryFile); - } - System.out.flush(); - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param redoRecord The JSON text for the redo record. - */ - private static void logFailedRedo(String errorType, - Exception exception, - String redoRecord) { - System.err.println(); - System.err.println("** " + errorType + " ** FAILED TO PROCESS REDO: "); - System.err.println(redoRecord); - System.err.println(exception); - System.err.flush(); - } - - /** - * Tracks the specified JSON record definition to be retried in a - * retry file. - * - * @param recordJson The JSON text defining the record to be retried. - * - * @throws IOException If a failure occurs in writing the record to the - * retry file. 
- */ - private static void trackRetryRecord(String recordJson) - throws IOException { - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); - } - retryWriter.println(recordJson); - } - - /** - * Example method for parsing and handling the INFO message (formatted - * as JSON). This example implementation simply tracks all entity ID's - * that appear as "AFFECTED_ENTITIES" to count the number - * of entities created for the records -- essentially a contrived - * data mart. - * - * @param engine The {@link SzEngine} to use. - * @param info The info message. - */ - private static void processInfo(SzEngine engine, String info) { - JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject(); - if (!jsonObject.containsKey(AFFECTED_ENTITIES)) { - return; + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println( + retryCount + " records/redos to be retried in " + retryFile); + } + System.out.flush(); } - JsonArray affectedArr = jsonObject.getJsonArray(AFFECTED_ENTITIES); - for (JsonObject affected : affectedArr.getValuesAs(JsonObject.class)) { - JsonNumber number = affected.getJsonNumber(ENTITY_ID); - long entityId = number.longValue(); - - try { - engine.getEntity(entityId, null); - entityIdSet.add(entityId); - } catch (SzNotFoundException e) { - entityIdSet.remove(entityId); - } catch (SzException e) { - // simply log the exception, do not rethrow + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param redoRecord The JSON text for the redo record. + */ + private static void logFailedRedo(String errorType, + Exception exception, + String redoRecord) { System.err.println(); - System.err.println("**** FAILED TO RETRIEVE ENTITY: " + entityId); - System.err.println(e.toString()); + System.err.println("** " + errorType + " ** FAILED TO PROCESS REDO: "); + System.err.println(redoRecord); + System.err.println(exception); System.err.flush(); - } } - } - private static final String UTF_8 = "UTF-8"; + /** + * Tracks the specified JSON record definition to be retried in a + * retry file. + * + * @param recordJson The JSON text defining the record to be retried. + * + * @throws IOException If a failure occurs in writing the record to the + * retry file. + */ + private static void trackRetryRecord(String recordJson) + throws IOException { + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(recordJson); + } + + /** + * Example method for parsing and handling the INFO message (formatted + * as JSON). This example implementation simply tracks all entity ID's + * that appear as "AFFECTED_ENTITIES" to count the number + * of entities created for the records -- essentially a contrived + * data mart. + * + * @param engine The {@link SzEngine} to use. + * @param info The info message. 
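The processInfo() handling in this snippet boils down to pulling ENTITY_ID values out of the AFFECTED_ENTITIES array of an INFO message. A standalone sketch of just that parsing step follows; the class name and the hard-coded INFO payload are illustrative (real messages come back from the with-info engine calls), and it assumes the same javax.json (JSON-P) API these snippets already use, with an implementation on the classpath.

import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;

public class ProcessInfoSketch {
    public static void main(String[] args) {
        // illustrative INFO payload; only the fields read by the snippet are shown
        String info = "{\"AFFECTED_ENTITIES\":[{\"ENTITY_ID\":1},{\"ENTITY_ID\":42}]}";

        Set<Long> entityIdSet = new HashSet<>();
        JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject();
        if (jsonObject.containsKey("AFFECTED_ENTITIES")) {
            JsonArray affected = jsonObject.getJsonArray("AFFECTED_ENTITIES");
            for (JsonObject obj : affected.getValuesAs(JsonObject.class)) {
                // track every entity ID reported as affected by the operation
                entityIdSet.add(obj.getJsonNumber("ENTITY_ID").longValue());
            }
        }
        System.out.println("Affected entities: " + entityIdSet);
    }
}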
+ */ + private static void processInfo(SzEngine engine, String info) { + JsonObject jsonObject = Json.createReader(new StringReader(info)).readObject(); + if (!jsonObject.containsKey(AFFECTED_ENTITIES)) { + return; + } + JsonArray affectedArr = jsonObject.getJsonArray(AFFECTED_ENTITIES); + for (JsonObject affected : affectedArr.getValuesAs(JsonObject.class)) { + JsonNumber number = affected.getJsonNumber(ENTITY_ID); + long entityId = number.longValue(); + + try { + engine.getEntity(entityId, null); + entityIdSet.add(entityId); + } catch (SzNotFoundException e) { + entityIdSet.remove(entityId); + } catch (SzException e) { + // simply log the exception, do not rethrow + System.err.println(); + System.err.println("**** FAILED TO RETRIEVE ENTITY: " + entityId); + System.err.println(e.toString()); + System.err.flush(); + } + } + } + + private static final String UTF_8 = "UTF-8"; - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; - private static final long REDO_PAUSE_TIMEOUT = 30000L; + private static final long REDO_PAUSE_TIMEOUT = 30000L; - private static final String REDO_PAUSE_DESCRIPTION = "30 seconds"; + private static final String REDO_PAUSE_DESCRIPTION = "30 seconds"; - private static final String AFFECTED_ENTITIES = "AFFECTED_ENTITIES"; - private static final String ENTITY_ID = "ENTITY_ID"; + private static final String AFFECTED_ENTITIES = "AFFECTED_ENTITIES"; + private static final String ENTITY_ID = "ENTITY_ID"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; - private static int errorCount = 0; - private static int redoneCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; - private static Set entityIdSet = new HashSet<>(); + private static int errorCount = 0; + private static int redoneCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; + private static Set entityIdSet = new HashSet<>(); } diff --git a/java/snippets/searching/SearchRecords.java b/java/snippets/searching/SearchRecords.java index 2c29828..468056d 100644 --- a/java/snippets/searching/SearchRecords.java +++ b/java/snippets/searching/SearchRecords.java @@ -14,110 +14,110 @@ * repository. 
*/ public class SearchRecords { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = SearchRecords.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - // loop through the example records and add them to the repository - for (String criteria : getSearchCriteria()) { - // call the searchByAttributes() function with default flags - String result = engine.searchByAttributes( - criteria, SZ_SEARCH_BY_ATTRIBUTES_DEFAULT_FLAGS); - - JsonObject jsonObj = Json.createReader( - new StringReader(result)).readObject(); - - System.out.println(); - JsonArray jsonArr = jsonObj.getJsonArray("RESOLVED_ENTITIES"); - if (jsonArr.size() == 0) { - System.out.println("No results for criteria: " + criteria); - } else { - System.out.println("Results for criteria: " + criteria); - for (JsonObject obj : jsonArr.getValuesAs(JsonObject.class)) { - obj = obj.getJsonObject("ENTITY"); - obj = obj.getJsonObject("RESOLVED_ENTITY"); - long entityId = obj.getJsonNumber("ENTITY_ID").longValue(); - String name = obj.getString("ENTITY_NAME", null); - System.out.println(entityId + ": " + name); - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - System.out.flush(); - } - - } catch (SzException e) { - // handle any exception that may have occurred - System.err.println("Senzing Error Message : " + e.getMessage()); - System.err.println("Senzing Error Code : " + e.getErrorCode()); - e.printStackTrace(); - throw new RuntimeException(e); - - } catch (Exception e) { - e.printStackTrace(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - } - } - - /** - * This is a support method for providing a list of criteria to search on. - * - * @return A {@link List} {@link String} JSON text values describing the - * sets of criteria with which to search. 
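The result handling above walks RESOLVED_ENTITIES, then ENTITY and RESOLVED_ENTITY, to reach the entity ID and name. The sketch below isolates that traversal so it can be run against a canned result; the sample JSON is illustrative and assumes the same response shape the snippet parses.

import java.io.StringReader;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;

public class SearchResultParseSketch {
    public static void main(String[] args) {
        // illustrative result shaped like the searchByAttributes() output parsed above
        String result = "{\"RESOLVED_ENTITIES\":[{\"ENTITY\":{\"RESOLVED_ENTITY\":"
                + "{\"ENTITY_ID\":100001,\"ENTITY_NAME\":\"Robert Smith\"}}}]}";

        JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject();
        JsonArray jsonArr = jsonObj.getJsonArray("RESOLVED_ENTITIES");
        if (jsonArr.isEmpty()) {
            System.out.println("No results for criteria");
            return;
        }
        for (JsonObject obj : jsonArr.getValuesAs(JsonObject.class)) {
            JsonObject entity = obj.getJsonObject("ENTITY").getJsonObject("RESOLVED_ENTITY");
            long entityId = entity.getJsonNumber("ENTITY_ID").longValue();
            String name = entity.getString("ENTITY_NAME", null);   // name may be absent
            System.out.println(entityId + ": " + name);
        }
    }
}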
- */ - public static List getSearchCriteria() { - List records = new LinkedList<>(); - records.add( - """ - { - "NAME_FULL": "Susan Moony", - "DATE_OF_BIRTH": "15/6/1998", - "SSN_NUMBER": "521212123" + // create a descriptive instance name (can be anything) + String instanceName = SearchRecords.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + try { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + // loop through the example records and add them to the repository + for (String criteria : getSearchCriteria()) { + // call the searchByAttributes() function with default flags + String result = engine.searchByAttributes( + criteria, SZ_SEARCH_BY_ATTRIBUTES_DEFAULT_FLAGS); + + JsonObject jsonObj = Json.createReader( + new StringReader(result)).readObject(); + + System.out.println(); + JsonArray jsonArr = jsonObj.getJsonArray("RESOLVED_ENTITIES"); + if (jsonArr.size() == 0) { + System.out.println("No results for criteria: " + criteria); + } else { + System.out.println("Results for criteria: " + criteria); + for (JsonObject obj : jsonArr.getValuesAs(JsonObject.class)) { + obj = obj.getJsonObject("ENTITY"); + obj = obj.getJsonObject("RESOLVED_ENTITY"); + long entityId = obj.getJsonNumber("ENTITY_ID").longValue(); + String name = obj.getString("ENTITY_NAME", null); + System.out.println(entityId + ": " + name); + } + } + System.out.flush(); } - """); - - records.add( - """ - { - "NAME_FIRST": "Robert", - "NAME_LAST": "Smith", - "ADDR_FULL": "123 Main Street Las Vegas NV 89132" - } - """); - - records.add( - """ - { - "NAME_FIRST": "Makio", - "NAME_LAST": "Yamanaka", - "ADDR_FULL": "787 Rotary Drive Rotorville FL 78720" + + } catch (SzException e) { + // handle any exception that may have occurred + System.err.println("Senzing Error Message : " + e.getMessage()); + System.err.println("Senzing Error Code : " + e.getErrorCode()); + e.printStackTrace(); + throw new RuntimeException(e); + + } catch (Exception e) { + e.printStackTrace(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - """); + throw new RuntimeException(e); - return records; - } + } finally { + // IMPORTANT: make sure to destroy the environment + env.destroy(); + } + + } + + /** + * This is a support method for providing a list of criteria to search on. + * + * @return A {@link List} {@link String} JSON text values describing the + * sets of criteria with which to search. + */ + public static List getSearchCriteria() { + List records = new LinkedList<>(); + records.add( + """ + { + "NAME_FULL": "Susan Moony", + "DATE_OF_BIRTH": "15/6/1998", + "SSN_NUMBER": "521212123" + } + """); + + records.add( + """ + { + "NAME_FIRST": "Robert", + "NAME_LAST": "Smith", + "ADDR_FULL": "123 Main Street Las Vegas NV 89132" + } + """); + + records.add( + """ + { + "NAME_FIRST": "Makio", + "NAME_LAST": "Yamanaka", + "ADDR_FULL": "787 Rotary Drive Rotorville FL 78720" + } + """); + + return records; + } } diff --git a/java/snippets/searching/SearchViaFutures.java b/java/snippets/searching/SearchViaFutures.java index ff2d25e..0455b39 100644 --- a/java/snippets/searching/SearchViaFutures.java +++ b/java/snippets/searching/SearchViaFutures.java @@ -14,283 +14,283 @@ * using futures. 
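SearchViaFutures below streams search criteria from a JSONL file, skipping blank and commented lines before submitting each one. A minimal sketch of just that reading loop follows; the class name and the consumer callback are hypothetical, and the default path simply mirrors the snippet's DEFAULT_FILE_PATH.

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class JsonlReaderSketch {
    public static void main(String[] args) throws IOException {
        String filePath = (args.length > 0) ? args[0] : "../resources/data/search-5K.jsonl";
        readJsonLines(filePath, (lineNumber, line) ->
                System.out.println(lineNumber + ": " + line));
    }

    static void readJsonLines(String filePath, BiConsumer<Integer, String> consumer)
            throws IOException {
        try (FileInputStream fis = new FileInputStream(filePath);
             InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8);
             BufferedReader br = new BufferedReader(isr)) {
            int lineNumber = 0;
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                lineNumber++;
                line = line.trim();
                if (line.isEmpty() || line.startsWith("#")) {
                    continue;   // skip blank lines and comment lines
                }
                consumer.accept(lineNumber, line);
            }
        }
    }
}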
*/ public class SearchViaFutures { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = SearchViaFutures.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - String filePath = (args.length > 0) ? args[0] : DEFAULT_FILE_PATH; - - // create the thread pool and executor service - ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); - - // keep track of pending futures and don't backlog too many for memory's sake - Map, Criteria> pendingFutures = new IdentityHashMap<>(); - - try (FileInputStream fis = new FileInputStream(filePath); - InputStreamReader isr = new InputStreamReader(fis, UTF_8); - BufferedReader br = new BufferedReader(isr)) { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - int lineNumber = 0; - boolean eof = false; - - while (!eof) { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.size() < MAXIMUM_BACKLOG) { - // read the next line - String line = br.readLine(); - lineNumber++; - - // check for EOF - if (line == null) { - eof = true; - break; - } - - // trim the line - line = line.trim(); - - // skip any blank lines - if (line.length() == 0) { - continue; - } - - // skip any commented lines - if (line.startsWith("#")) { - continue; - } - - // construct the Record instance - Criteria criteria = new Criteria(lineNumber, line); - - try { - Future future = executor.submit(() -> { - // call the searchByAttributes() function with default flags - return engine.searchByAttributes( - criteria.line, SZ_SEARCH_BY_ATTRIBUTES_DEFAULT_FLAGS); - }); - - // add the future to the pending future list - pendingFutures.put(future, criteria); - - } catch (JsonException e) { - logFailedSearch(ERROR, e, lineNumber, line); - errorCount++; // increment the error count - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); } - do { - // handle any pending futures WITHOUT blocking to reduce the backlog - handlePendingFutures(pendingFutures, false); + // create a descriptive instance name (can be anything) + String instanceName = SearchViaFutures.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + String filePath = (args.length > 0) ? 
args[0] : DEFAULT_FILE_PATH; + + // create the thread pool and executor service + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + + // keep track of pending futures and don't backlog too many for memory's sake + Map, Criteria> pendingFutures = new IdentityHashMap<>(); + + try (FileInputStream fis = new FileInputStream(filePath); + InputStreamReader isr = new InputStreamReader(fis, UTF_8); + BufferedReader br = new BufferedReader(isr)) { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + int lineNumber = 0; + boolean eof = false; + + while (!eof) { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.size() < MAXIMUM_BACKLOG) { + // read the next line + String line = br.readLine(); + lineNumber++; + + // check for EOF + if (line == null) { + eof = true; + break; + } + + // trim the line + line = line.trim(); + + // skip any blank lines + if (line.length() == 0) { + continue; + } + + // skip any commented lines + if (line.startsWith("#")) { + continue; + } + + // construct the Record instance + Criteria criteria = new Criteria(lineNumber, line); + + try { + Future future = executor.submit(() -> { + // call the searchByAttributes() function with default flags + return engine.searchByAttributes( + criteria.line, SZ_SEARCH_BY_ATTRIBUTES_DEFAULT_FLAGS); + }); + + // add the future to the pending future list + pendingFutures.put(future, criteria); + + } catch (JsonException e) { + logFailedSearch(ERROR, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do { + // handle any pending futures WITHOUT blocking to reduce the backlog + handlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.size() >= MAXIMUM_BACKLOG) { + try { + Thread.sleep(PAUSE_TIMEOUT); + + } catch (InterruptedException ignore) { + // do nothing + } + } + } while (pendingFutures.size() >= MAXIMUM_BACKLOG); + } - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.size() >= MAXIMUM_BACKLOG) { - try { - Thread.sleep(PAUSE_TIMEOUT); + // shutdown the executor service + executor.shutdown(); - } catch (InterruptedException ignore) { - // do nothing + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + handlePendingFutures(pendingFutures, true); + + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + System.err.flush(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - } - } while (pendingFutures.size() >= MAXIMUM_BACKLOG); - } - - // shutdown the executor service - executor.shutdown(); - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - handlePendingFutures(pendingFutures, true); - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - System.err.flush(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // check if executor service is shutdown - if (!executor.isShutdown()) { - executor.shutdown(); - } - - // IMPORTANT: make sure to destroy the environment - env.destroy(); - - System.out.println(); - System.out.println( 
- "Searches successfully completed : " + successCount); - System.out.println( - "Total entities found via searches : " + foundEntities.size()); - System.out.println( - "Searches failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) { - retryWriter.flush(); - retryWriter.close(); - } - if (retryCount > 0) { - System.out.println(retryCount + " searches to be retried in " + retryFile); - } - System.out.flush(); + throw new RuntimeException(e); - } + } finally { + // check if executor service is shutdown + if (!executor.isShutdown()) { + executor.shutdown(); + } + + // IMPORTANT: make sure to destroy the environment + env.destroy(); + + System.out.println(); + System.out.println( + "Searches successfully completed : " + successCount); + System.out.println( + "Total entities found via searches : " + foundEntities.size()); + System.out.println( + "Searches failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) { + retryWriter.flush(); + retryWriter.close(); + } + if (retryCount > 0) { + System.out.println(retryCount + " searches to be retried in " + retryFile); + } + System.out.flush(); - } - - private static void handlePendingFutures(Map, Criteria> pendingFutures, - boolean blocking) - throws Exception { - // check for completed futures - Iterator, Criteria>> iter = pendingFutures.entrySet().iterator(); - - // loop through the pending futures - while (iter.hasNext()) { - // get the next pending future - Map.Entry, Criteria> entry = iter.next(); - Future future = entry.getKey(); - Criteria criteria = entry.getValue(); - - // if not blocking and this one is not done then continue - if (!blocking && !future.isDone()) { - continue; - } - - // remove the pending future from the map - iter.remove(); - - try { - try { - // get the value and check for an exception - String results = future.get(); - - // if we get here then increment the success count - successCount++; - - // parse the results - JsonObject jsonObj = Json.createReader( - new StringReader(results)).readObject(); - - JsonArray jsonArr = jsonObj.getJsonArray("RESOLVED_ENTITIES"); - for (JsonObject obj : jsonArr.getValuesAs(JsonObject.class)) { - obj = obj.getJsonObject("ENTITY"); - obj = obj.getJsonObject("RESOLVED_ENTITY"); - long entityId = obj.getJsonNumber("ENTITY_ID").longValue(); - foundEntities.add(entityId); - } - - } catch (InterruptedException e) { - // this could only happen if blocking is true, just - // rethrow as retryable and log the interruption - throw e; - - } catch (ExecutionException e) { - // if execution failed with an exception then rethrow - Throwable cause = e.getCause(); - if ((cause == null) || !(cause instanceof Exception)) { - // rethrow the execution exception - throw e; - } - // cast to an Exception and rethrow - throw ((Exception) cause); } - } catch (SzBadInputException e) { - logFailedSearch(ERROR, e, criteria.lineNumber, criteria.line); - errorCount++; // increment the error count - - } catch (SzRetryableException | InterruptedException | CancellationException e) { - // handle thread interruption and cancellation as retries - logFailedSearch(WARNING, e, criteria.lineNumber, criteria.line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) { - retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); - retryWriter = new PrintWriter( - new OutputStreamWriter(new FileOutputStream(retryFile), 
UTF_8)); + } + + private static void handlePendingFutures(Map, Criteria> pendingFutures, + boolean blocking) + throws Exception { + // check for completed futures + Iterator, Criteria>> iter = pendingFutures.entrySet().iterator(); + + // loop through the pending futures + while (iter.hasNext()) { + // get the next pending future + Map.Entry, Criteria> entry = iter.next(); + Future future = entry.getKey(); + Criteria criteria = entry.getValue(); + + // if not blocking and this one is not done then continue + if (!blocking && !future.isDone()) { + continue; + } + + // remove the pending future from the map + iter.remove(); + + try { + try { + // get the value and check for an exception + String results = future.get(); + + // if we get here then increment the success count + successCount++; + + // parse the results + JsonObject jsonObj = Json.createReader( + new StringReader(results)).readObject(); + + JsonArray jsonArr = jsonObj.getJsonArray("RESOLVED_ENTITIES"); + for (JsonObject obj : jsonArr.getValuesAs(JsonObject.class)) { + obj = obj.getJsonObject("ENTITY"); + obj = obj.getJsonObject("RESOLVED_ENTITY"); + long entityId = obj.getJsonNumber("ENTITY_ID").longValue(); + foundEntities.add(entityId); + } + + } catch (InterruptedException e) { + // this could only happen if blocking is true, just + // rethrow as retryable and log the interruption + throw e; + + } catch (ExecutionException e) { + // if execution failed with an exception then rethrow + Throwable cause = e.getCause(); + if ((cause == null) || !(cause instanceof Exception)) { + // rethrow the execution exception + throw e; + } + // cast to an Exception and rethrow + throw ((Exception) cause); + } + + } catch (SzBadInputException e) { + logFailedSearch(ERROR, e, criteria.lineNumber, criteria.line); + errorCount++; // increment the error count + + } catch (SzRetryableException | InterruptedException | CancellationException e) { + // handle thread interruption and cancellation as retries + logFailedSearch(WARNING, e, criteria.lineNumber, criteria.line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) { + retryFile = File.createTempFile(RETRY_PREFIX, RETRY_SUFFIX); + retryWriter = new PrintWriter( + new OutputStreamWriter(new FileOutputStream(retryFile), UTF_8)); + } + retryWriter.println(criteria.line); + + } catch (Exception e) { + // catch any other exception (incl. SzException) here + logFailedSearch(CRITICAL, e, criteria.lineNumber, criteria.line); + errorCount++; + throw e; // rethrow since exception is critical + } } - retryWriter.println(criteria.line); - - } catch (Exception e) { - // catch any other exception (incl. SzException) here - logFailedSearch(CRITICAL, e, criteria.lineNumber, criteria.line); - errorCount++; - throw e; // rethrow since exception is critical - } } - } - - /** - * Example method for logging failed records. - * - * @param errorType The error type description. - * @param exception The exception itself. - * @param lineNumber The line number of the failed record in the JSON input - * file. - * @param criteriaJson The JSON text for the failed search criteria. 
- */ - private static void logFailedSearch(String errorType, - Exception exception, - int lineNumber, - String criteriaJson) { - System.err.println(); - System.err.println( - "** " + errorType + " ** FAILED TO SEARCH CRITERIA AT LINE " + lineNumber + ": "); - System.err.println(criteriaJson); - System.err.println(exception); - System.err.flush(); - } - - private static final String DEFAULT_FILE_PATH = "../resources/data/search-5K.jsonl"; - - private static final String UTF_8 = "UTF-8"; - - private static final String RETRY_PREFIX = "retry-"; - private static final String RETRY_SUFFIX = ".jsonl"; - - private static final int THREAD_COUNT = 8; - - private static final int BACKLOG_FACTOR = 10; - - private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; - - private static final long PAUSE_TIMEOUT = 100L; - - private static final String ERROR = "ERROR"; - private static final String WARNING = "WARNING"; - private static final String CRITICAL = "CRITICAL"; - - public record Criteria(int lineNumber, String line) { - } - - private static int errorCount = 0; - private static int successCount = 0; - private static int retryCount = 0; - private static File retryFile = null; - private static PrintWriter retryWriter = null; - - private static Set foundEntities = new HashSet<>(); + + /** + * Example method for logging failed records. + * + * @param errorType The error type description. + * @param exception The exception itself. + * @param lineNumber The line number of the failed record in the JSON input + * file. + * @param criteriaJson The JSON text for the failed search criteria. + */ + private static void logFailedSearch(String errorType, + Exception exception, + int lineNumber, + String criteriaJson) { + System.err.println(); + System.err.println( + "** " + errorType + " ** FAILED TO SEARCH CRITERIA AT LINE " + lineNumber + ": "); + System.err.println(criteriaJson); + System.err.println(exception); + System.err.flush(); + } + + private static final String DEFAULT_FILE_PATH = "../resources/data/search-5K.jsonl"; + + private static final String UTF_8 = "UTF-8"; + + private static final String RETRY_PREFIX = "retry-"; + private static final String RETRY_SUFFIX = ".jsonl"; + + private static final int THREAD_COUNT = 8; + + private static final int BACKLOG_FACTOR = 10; + + private static final int MAXIMUM_BACKLOG = THREAD_COUNT * BACKLOG_FACTOR; + + private static final long PAUSE_TIMEOUT = 100L; + + private static final String ERROR = "ERROR"; + private static final String WARNING = "WARNING"; + private static final String CRITICAL = "CRITICAL"; + + public record Criteria(int lineNumber, String line) { + } + + private static int errorCount = 0; + private static int successCount = 0; + private static int retryCount = 0; + private static File retryFile = null; + private static PrintWriter retryWriter = null; + + private static Set foundEntities = new HashSet<>(); } diff --git a/java/snippets/stewardship/ForceResolve.java b/java/snippets/stewardship/ForceResolve.java index bb54c5e..4b2d856 100644 --- a/java/snippets/stewardship/ForceResolve.java +++ b/java/snippets/stewardship/ForceResolve.java @@ -14,161 +14,161 @@ * otherwise will not resolve to one another. 
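ForceResolve below forces two records onto the same entity by copying each record's JSON_DATA and adding TRUSTED_ID_NUMBER and TRUSTED_ID_TYPE before re-adding it. The builder step in isolation looks like the sketch that follows; the sample record JSON is illustrative and the trusted-ID values mirror the ones used in the snippet.

import java.io.StringReader;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;

public class TrustedIdSketch {
    public static void main(String[] args) {
        // illustrative JSON_DATA for an existing record (normally taken from getRecord())
        String jsonData = "{\"DATA_SOURCE\":\"TEST\",\"RECORD_ID\":\"1\","
                + "\"PRIMARY_NAME_FULL\":\"Patrick Smith\"}";

        JsonObject obj = Json.createReader(new StringReader(jsonData)).readObject();

        // copy the record definition and add the trusted-ID fields that force resolution
        JsonObjectBuilder job = Json.createObjectBuilder(obj);
        job.add("TRUSTED_ID_NUMBER", "TEST_R1-TEST_R3");
        job.add("TRUSTED_ID_TYPE", "FORCE_RESOLVE");

        // the rebuilt definition is what gets passed back to addRecord()
        System.out.println(job.build().toString());
    }
}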
*/ public class ForceResolve { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); + } + + // create a descriptive instance name (can be anything) + String instanceName = ForceResolve.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + try { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + Map recordMap = getRecords(); + // loop through the example records and add them to the repository + for (Map.Entry entry : recordMap.entrySet()) { + SzRecordKey recordKey = entry.getKey(); + String recordDefinition = entry.getValue(); + + // call the addRecord() function with no flags + engine.addRecord(recordKey, recordDefinition, SZ_NO_FLAGS); + + System.out.println("Record " + recordKey.recordId() + " added"); + System.out.flush(); + } - // create a descriptive instance name (can be anything) - String instanceName = ForceResolve.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - Map recordMap = getRecords(); - // loop through the example records and add them to the repository - for (Map.Entry entry : recordMap.entrySet()) { - SzRecordKey recordKey = entry.getKey(); - String recordDefinition = entry.getValue(); - - // call the addRecord() function with no flags - engine.addRecord(recordKey, recordDefinition, SZ_NO_FLAGS); - - System.out.println("Record " + recordKey.recordId() + " added"); - System.out.flush(); - } - - System.out.println(); - for (SzRecordKey recordKey : recordMap.keySet()) { - String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); - JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); - long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") - .getJsonNumber("ENTITY_ID").longValue(); - System.out.println( - "Record " + recordKey + " originally resolves to entity " + entityId); - } - System.out.println(); - System.out.println("Updating records with TRUSTED_ID to force resolve..."); - SzRecordKey key1 = SzRecordKey.of(TEST, "1"); - SzRecordKey key3 = SzRecordKey.of(TEST, "3"); - - String record1 = engine.getRecord(key1, SZ_RECORD_DEFAULT_FLAGS); - String record3 = engine.getRecord(key3, SZ_RECORD_DEFAULT_FLAGS); - - JsonObject obj1 = Json.createReader(new StringReader(record1)).readObject(); - JsonObject obj3 = Json.createReader(new StringReader(record3)).readObject(); - - obj1 = obj1.getJsonObject("JSON_DATA"); - obj3 = obj3.getJsonObject("JSON_DATA"); - - JsonObjectBuilder job1 = Json.createObjectBuilder(obj1); - JsonObjectBuilder job3 = Json.createObjectBuilder(obj3); - - for (JsonObjectBuilder job : List.of(job1, job3)) { - job.add("TRUSTED_ID_NUMBER", "TEST_R1-TEST_R3"); 
- job.add("TRUSTED_ID_TYPE", "FORCE_RESOLVE"); - } - - record1 = job1.build().toString(); - record3 = job3.build().toString(); - - engine.addRecord(key1, record1, SZ_NO_FLAGS); - engine.addRecord(key3, record3, SZ_NO_FLAGS); - - System.out.println(); - for (SzRecordKey recordKey : recordMap.keySet()) { - String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); - JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); - long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") - .getJsonNumber("ENTITY_ID").longValue(); - System.out.println( - "Record " + recordKey + " now resolves to entity " + entityId); - } - System.out.println(); - - } catch (SzException e) { - // handle any exception that may have occurred - System.err.println("Senzing Error Message : " + e.getMessage()); - System.err.println("Senzing Error Code : " + e.getErrorCode()); - e.printStackTrace(); - throw new RuntimeException(e); - - } catch (Exception e) { - System.err.println(); - System.err.println("*** Terminated due to critical error ***"); - e.printStackTrace(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - } + System.out.println(); + for (SzRecordKey recordKey : recordMap.keySet()) { + String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); + JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); + long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") + .getJsonNumber("ENTITY_ID").longValue(); + System.out.println( + "Record " + recordKey + " originally resolves to entity " + entityId); + } + System.out.println(); + System.out.println("Updating records with TRUSTED_ID to force resolve..."); + SzRecordKey key1 = SzRecordKey.of(TEST, "1"); + SzRecordKey key3 = SzRecordKey.of(TEST, "3"); + + String record1 = engine.getRecord(key1, SZ_RECORD_DEFAULT_FLAGS); + String record3 = engine.getRecord(key3, SZ_RECORD_DEFAULT_FLAGS); + + JsonObject obj1 = Json.createReader(new StringReader(record1)).readObject(); + JsonObject obj3 = Json.createReader(new StringReader(record3)).readObject(); + + obj1 = obj1.getJsonObject("JSON_DATA"); + obj3 = obj3.getJsonObject("JSON_DATA"); + + JsonObjectBuilder job1 = Json.createObjectBuilder(obj1); + JsonObjectBuilder job3 = Json.createObjectBuilder(obj3); - } - - /** - * This is a support method for providing example records to add. - * - * @return A {@link Map} of {@link SzRecordKey} keys to {@link String} - * JSON text values describing the records to be added. 
- */ - public static Map getRecords() { - Map records = new LinkedHashMap<>(); - records.put( - SzRecordKey.of("TEST", "1"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "1", - "PRIMARY_NAME_FULL": "Patrick Smith", - "AKA_NAME_FULL": "Paddy Smith", - "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", - "PHONE_NUMBER": "787-767-2688", - "DATE_OF_BIRTH": "1/12/1990" + for (JsonObjectBuilder job : List.of(job1, job3)) { + job.add("TRUSTED_ID_NUMBER", "TEST_R1-TEST_R3"); + job.add("TRUSTED_ID_TYPE", "FORCE_RESOLVE"); } - """); - - records.put( - SzRecordKey.of("TEST", "2"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "2", - "PRIMARY_NAME_FULL": "Patricia Smith", - "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", - "PHONE_NUMBER": "787-767-2688", - "DATE_OF_BIRTH": "5/4/1994" + + record1 = job1.build().toString(); + record3 = job3.build().toString(); + + engine.addRecord(key1, record1, SZ_NO_FLAGS); + engine.addRecord(key3, record3, SZ_NO_FLAGS); + + System.out.println(); + for (SzRecordKey recordKey : recordMap.keySet()) { + String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); + JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); + long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") + .getJsonNumber("ENTITY_ID").longValue(); + System.out.println( + "Record " + recordKey + " now resolves to entity " + entityId); } - """); - - records.put( - SzRecordKey.of("TEST", "3"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "3", - "PRIMARY_NAME_FULL": "Pat Smith", - "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", - "PHONE_NUMBER": "787-767-2688" + System.out.println(); + + } catch (SzException e) { + // handle any exception that may have occurred + System.err.println("Senzing Error Message : " + e.getMessage()); + System.err.println("Senzing Error Code : " + e.getErrorCode()); + e.printStackTrace(); + throw new RuntimeException(e); + + } catch (Exception e) { + System.err.println(); + System.err.println("*** Terminated due to critical error ***"); + e.printStackTrace(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - """); + throw new RuntimeException(e); + + } finally { + // IMPORTANT: make sure to destroy the environment + env.destroy(); + } - return records; - } + } + + /** + * This is a support method for providing example records to add. + * + * @return A {@link Map} of {@link SzRecordKey} keys to {@link String} + * JSON text values describing the records to be added. 
+ */ + public static Map getRecords() { + Map records = new LinkedHashMap<>(); + records.put( + SzRecordKey.of("TEST", "1"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1", + "PRIMARY_NAME_FULL": "Patrick Smith", + "AKA_NAME_FULL": "Paddy Smith", + "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "787-767-2688", + "DATE_OF_BIRTH": "1/12/1990" + } + """); + + records.put( + SzRecordKey.of("TEST", "2"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "2", + "PRIMARY_NAME_FULL": "Patricia Smith", + "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "787-767-2688", + "DATE_OF_BIRTH": "5/4/1994" + } + """); + + records.put( + SzRecordKey.of("TEST", "3"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "3", + "PRIMARY_NAME_FULL": "Pat Smith", + "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "787-767-2688" + } + """); + + return records; + } - private static final String TEST = "TEST"; + private static final String TEST = "TEST"; } diff --git a/java/snippets/stewardship/ForceUnresolve.java b/java/snippets/stewardship/ForceUnresolve.java index df2b5ae..54551b3 100644 --- a/java/snippets/stewardship/ForceUnresolve.java +++ b/java/snippets/stewardship/ForceUnresolve.java @@ -14,159 +14,159 @@ * otherwise will not resolve to one another. */ public class ForceUnresolve { - public static void main(String[] args) { - // get the senzing repository settings - String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); - if (settings == null) { - System.err.println("Unable to get settings."); - throw new IllegalArgumentException("Unable to get settings"); - } - - // create a descriptive instance name (can be anything) - String instanceName = ForceUnresolve.class.getSimpleName(); - - // initialize the Senzing environment - SzEnvironment env = SzCoreEnvironment.newBuilder() - .settings(settings) - .instanceName(instanceName) - .verboseLogging(false) - .build(); - - try { - // get the engine from the environment - SzEngine engine = env.getEngine(); - - Map recordMap = getRecords(); - // loop through the example records and add them to the repository - for (Map.Entry entry : recordMap.entrySet()) { - SzRecordKey recordKey = entry.getKey(); - String recordDefinition = entry.getValue(); - - // call the addRecord() function with no flags - engine.addRecord(recordKey, recordDefinition, SZ_NO_FLAGS); - - System.out.println("Record " + recordKey.recordId() + " added"); - System.out.flush(); - } - - System.out.println(); - for (SzRecordKey recordKey : recordMap.keySet()) { - String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); - JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); - long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") - .getJsonNumber("ENTITY_ID").longValue(); - System.out.println( - "Record " + recordKey + " originally resolves to entity " + entityId); - } - System.out.println(); - System.out.println("Updating records with TRUSTED_ID to force unresolve..."); - SzRecordKey key4 = SzRecordKey.of(TEST, "4"); - SzRecordKey key6 = SzRecordKey.of(TEST, "6"); - - String record4 = engine.getRecord(key4, SZ_RECORD_DEFAULT_FLAGS); - String record6 = engine.getRecord(key6, SZ_RECORD_DEFAULT_FLAGS); - - JsonObject obj4 = Json.createReader(new StringReader(record4)).readObject(); - JsonObject obj6 = Json.createReader(new StringReader(record6)).readObject(); - - obj4 = obj4.getJsonObject("JSON_DATA"); - obj6 = obj6.getJsonObject("JSON_DATA"); - - JsonObjectBuilder job4 = 
Json.createObjectBuilder(obj4); - JsonObjectBuilder job6 = Json.createObjectBuilder(obj6); - - job4.add("TRUSTED_ID_NUMBER", "TEST_R4-TEST_R6"); - job4.add("TRUSTED_ID_TYPE", "FORCE_UNRESOLVE"); - - job6.add("TRUSTED_ID_NUMBER", "TEST_R6-TEST_R4"); - job6.add("TRUSTED_ID_TYPE", "FORCE_UNRESOLVE"); - - record4 = job4.build().toString(); - record6 = job6.build().toString(); - - engine.addRecord(key4, record4, SZ_NO_FLAGS); - engine.addRecord(key6, record6, SZ_NO_FLAGS); - - System.out.println(); - for (SzRecordKey recordKey : recordMap.keySet()) { - String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); - JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); - long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") - .getJsonNumber("ENTITY_ID").longValue(); - System.out.println( - "Record " + recordKey + " now resolves to entity " + entityId); - } - System.out.println(); - - } catch (SzException e) { - // handle any exception that may have occurred - System.err.println("Senzing Error Message : " + e.getMessage()); - System.err.println("Senzing Error Code : " + e.getErrorCode()); - e.printStackTrace(); - throw new RuntimeException(e); - - } catch (Exception e) { - e.printStackTrace(); - if (e instanceof RuntimeException) { - throw ((RuntimeException) e); - } - throw new RuntimeException(e); - - } finally { - // IMPORTANT: make sure to destroy the environment - env.destroy(); - } + public static void main(String[] args) { + // get the senzing repository settings + String settings = System.getenv("SENZING_ENGINE_CONFIGURATION_JSON"); + if (settings == null) { + System.err.println("Unable to get settings."); + throw new IllegalArgumentException("Unable to get settings"); + } + + // create a descriptive instance name (can be anything) + String instanceName = ForceUnresolve.class.getSimpleName(); + + // initialize the Senzing environment + SzEnvironment env = SzCoreEnvironment.newBuilder() + .settings(settings) + .instanceName(instanceName) + .verboseLogging(false) + .build(); + + try { + // get the engine from the environment + SzEngine engine = env.getEngine(); + + Map recordMap = getRecords(); + // loop through the example records and add them to the repository + for (Map.Entry entry : recordMap.entrySet()) { + SzRecordKey recordKey = entry.getKey(); + String recordDefinition = entry.getValue(); + + // call the addRecord() function with no flags + engine.addRecord(recordKey, recordDefinition, SZ_NO_FLAGS); + + System.out.println("Record " + recordKey.recordId() + " added"); + System.out.flush(); + } - } - - /** - * This is a support method for providing example records to add. - * - * @return A {@link Map} of {@link SzRecordKey} keys to {@link String} - * JSON text values describing the records to be added. 
- */ - public static Map getRecords() { - Map records = new LinkedHashMap<>(); - records.put( - SzRecordKey.of("TEST", "4"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "4", - "PRIMARY_NAME_FULL": "Elizabeth Jonas", - "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", - "SSN_NUMBER": "767-87-7678", - "DATE_OF_BIRTH": "1/12/1990" + System.out.println(); + for (SzRecordKey recordKey : recordMap.keySet()) { + String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); + JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); + long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") + .getJsonNumber("ENTITY_ID").longValue(); + System.out.println( + "Record " + recordKey + " originally resolves to entity " + entityId); } - """); - - records.put( - SzRecordKey.of("TEST", "5"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "5", - "PRIMARY_NAME_FULL": "Beth Jones", - "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", - "SSN_NUMBER": "767-87-7678", - "DATE_OF_BIRTH": "1/12/1990" + System.out.println(); + System.out.println("Updating records with TRUSTED_ID to force unresolve..."); + SzRecordKey key4 = SzRecordKey.of(TEST, "4"); + SzRecordKey key6 = SzRecordKey.of(TEST, "6"); + + String record4 = engine.getRecord(key4, SZ_RECORD_DEFAULT_FLAGS); + String record6 = engine.getRecord(key6, SZ_RECORD_DEFAULT_FLAGS); + + JsonObject obj4 = Json.createReader(new StringReader(record4)).readObject(); + JsonObject obj6 = Json.createReader(new StringReader(record6)).readObject(); + + obj4 = obj4.getJsonObject("JSON_DATA"); + obj6 = obj6.getJsonObject("JSON_DATA"); + + JsonObjectBuilder job4 = Json.createObjectBuilder(obj4); + JsonObjectBuilder job6 = Json.createObjectBuilder(obj6); + + job4.add("TRUSTED_ID_NUMBER", "TEST_R4-TEST_R6"); + job4.add("TRUSTED_ID_TYPE", "FORCE_UNRESOLVE"); + + job6.add("TRUSTED_ID_NUMBER", "TEST_R6-TEST_R4"); + job6.add("TRUSTED_ID_TYPE", "FORCE_UNRESOLVE"); + + record4 = job4.build().toString(); + record6 = job6.build().toString(); + + engine.addRecord(key4, record4, SZ_NO_FLAGS); + engine.addRecord(key6, record6, SZ_NO_FLAGS); + + System.out.println(); + for (SzRecordKey recordKey : recordMap.keySet()) { + String result = engine.getEntity(recordKey, SZ_ENTITY_BRIEF_DEFAULT_FLAGS); + JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject(); + long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY") + .getJsonNumber("ENTITY_ID").longValue(); + System.out.println( + "Record " + recordKey + " now resolves to entity " + entityId); } - """); - - records.put( - SzRecordKey.of("TEST", "6"), - """ - { - "DATA_SOURCE": "TEST", - "RECORD_ID": "6", - "PRIMARY_NAME_FULL": "Betsey Jones", - "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", - "PHONE_NUMBER": "202-787-7678" + System.out.println(); + + } catch (SzException e) { + // handle any exception that may have occurred + System.err.println("Senzing Error Message : " + e.getMessage()); + System.err.println("Senzing Error Code : " + e.getErrorCode()); + e.printStackTrace(); + throw new RuntimeException(e); + + } catch (Exception e) { + e.printStackTrace(); + if (e instanceof RuntimeException) { + throw ((RuntimeException) e); } - """); + throw new RuntimeException(e); - return records; - } + } finally { + // IMPORTANT: make sure to destroy the environment + env.destroy(); + } + + } + + /** + * This is a support method for providing example records to add. 
+ * + * @return A {@link Map} of {@link SzRecordKey} keys to {@link String} + * JSON text values describing the records to be added. + */ + public static Map getRecords() { + Map records = new LinkedHashMap<>(); + records.put( + SzRecordKey.of("TEST", "4"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "4", + "PRIMARY_NAME_FULL": "Elizabeth Jonas", + "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", + "SSN_NUMBER": "767-87-7678", + "DATE_OF_BIRTH": "1/12/1990" + } + """); + + records.put( + SzRecordKey.of("TEST", "5"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "5", + "PRIMARY_NAME_FULL": "Beth Jones", + "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", + "SSN_NUMBER": "767-87-7678", + "DATE_OF_BIRTH": "1/12/1990" + } + """); + + records.put( + SzRecordKey.of("TEST", "6"), + """ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "6", + "PRIMARY_NAME_FULL": "Betsey Jones", + "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "202-787-7678" + } + """); + + return records; + } - private static final String TEST = "TEST"; + private static final String TEST = "TEST"; }
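Both stewardship snippets confirm the effect of the TRUSTED_ID update by re-reading each record's entity and printing the entity ID before and after. The small sketch below isolates that check against a canned getEntity() result; the sample JSON is illustrative and shows only the field the snippets read.

import java.io.StringReader;
import javax.json.Json;
import javax.json.JsonObject;

public class EntityIdParseSketch {
    public static void main(String[] args) {
        // illustrative getEntity() output; only RESOLVED_ENTITY.ENTITY_ID is shown
        String result = "{\"RESOLVED_ENTITY\":{\"ENTITY_ID\":200003}}";

        JsonObject jsonObj = Json.createReader(new StringReader(result)).readObject();
        long entityId = jsonObj.getJsonObject("RESOLVED_ENTITY")
                .getJsonNumber("ENTITY_ID")
                .longValue();

        // comparing this ID before and after the TRUSTED_ID update shows whether the
        // records now share an entity (force resolve) or no longer do (force unresolve)
        System.out.println("Resolves to entity " + entityId);
    }
}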