From c23737093fc1f1604762eb73323cad1d797bc837 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Mon, 20 Nov 2023 13:06:58 -0500 Subject: [PATCH] 0006113: Add process info to support snapshot --- .../jumpmind/symmetric/util/SnapshotUtil.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java index fb89a9ce84..b197af97fe 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java @@ -94,6 +94,7 @@ import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerHistory; @@ -387,7 +388,9 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis createThreadsFile(tmpDir.getPath(), false); createThreadsFile(tmpDir.getPath(), true); createThreadStatsFile(tmpDir.getPath()); + createProcessInfoFile(engine, tmpDir.getPath()); try { + log.info("Writing transactions file"); List transactions = targetPlatform.getTransactions(); if (!transactions.isEmpty()) { createTransactionsFile(engine, tmpDir.getPath(), transactions); @@ -810,6 +813,44 @@ public static File createThreadStatsFile(String parent) { return file; } + public static void createProcessInfoFile(ISymmetricEngine engine, String parent) { + try { + File file = new File(parent, "process-info.csv"); + File fileActive = new File(parent, "process-info-active.csv"); + List infos = engine.getStatisticManager().getProcessInfos(); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + try (OutputStream outputStream = new FileOutputStream(file); + OutputStream outputActiveStream = new FileOutputStream(fileActive); + CsvWriter csvWriter = new CsvWriter(outputStream, ',', Charset.defaultCharset()); + CsvWriter csvActiveWriter = new CsvWriter(outputActiveStream, ',', Charset.defaultCharset())) { + csvWriter.setEscapeMode(CsvWriter.ESCAPE_MODE_DOUBLED); + String[] heading = { "Thread Name", "Source Node", "Target Node", "Type", "Queue", "Current Channel ID", "Status", "Current Data Count", + "Total Data Count", "Total Batch Count", "Current Batch ID", "Current Batch Count", "Current Table Name", "Batch Start Time", "Load ID", + "Start Time", "End Time" }; + csvWriter.writeRecord(heading); + csvActiveWriter.writeRecord(heading); + for (ProcessInfo i : infos) { + Thread t = i.getThread(); + String[] row = { t == null ? null : t.getName(), i.getSourceNodeId(), i.getTargetNodeId(), i.getProcessType().toString(), + i.getQueue(), i.getCurrentChannelId(), i.getStatus().toString(), String.valueOf(i.getCurrentDataCount()), + String.valueOf(i.getTotalDataCount()), String.valueOf(i.getTotalBatchCount()), String.valueOf(i.getCurrentBatchId()), + String.valueOf(i.getCurrentBatchCount()), i.getCurrentTableName(), + i.getCurrentBatchStartTime() == null ? null : df.format(i.getCurrentBatchStartTime()), + String.valueOf(i.getCurrentLoadId()), i.getStartTime() == null ? null : df.format(i.getStartTime()), + i.getEndTime() == null ? null : df.format(i.getEndTime()) }; + csvWriter.writeRecord(row); + if (i.getEndTime() == null) { + csvActiveWriter.writeRecord(row); + } + } + csvWriter.flush(); + csvActiveWriter.flush(); + } + } catch (Exception e) { + log.warn("Failed to write process info", e); + } + } + private static File createTransactionsFile(ISymmetricEngine engine, String parent, List transactions) { Map transactionMap = new HashMap(); for (Transaction transaction : transactions) {