From 03616ea8651475f1097c73e5fe6a44a2fffc28f7 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Fri, 30 Sep 2022 17:21:04 -0400 Subject: [PATCH] 0005489: Backport support snapshot improvements --- .../jumpmind/symmetric/util/SnapshotUtil.java | 699 ++++++++++-------- .../symmetric/common/ParameterConstants.java | 3 + .../service/ITriggerRouterService.java | 2 + .../service/impl/TriggerRouterService.java | 6 + .../impl/TriggerRouterServiceSqlMap.java | 2 + .../util/LogSummaryAppenderUtils.java | 2 +- .../symmetric/util/SymmetricUtils.java | 17 +- .../resources/symmetric-default.properties | 21 + .../symmetric/io/stage/IStagingManager.java | 2 + .../symmetric/io/stage/StagingManager.java | 5 + 10 files changed, 442 insertions(+), 317 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java index f860cdd906..e234cc42b1 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java @@ -20,8 +20,7 @@ */ package org.jumpmind.symmetric.util; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -45,12 +44,12 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; -import java.util.Enumeration; import java.util.HashMap; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Properties; -import java.util.TreeSet; +import java.util.TimeZone; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -71,6 +70,7 @@ import org.jumpmind.extension.IProgressListener; import org.jumpmind.properties.DefaultParameterParser.ParameterMetaData; import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.common.Constants; import 
org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.SystemConstants; import org.jumpmind.symmetric.common.TableConstants; @@ -80,63 +80,71 @@ import org.jumpmind.symmetric.db.mysql.MySqlSymmetricDialect; import org.jumpmind.symmetric.io.data.DbExport; import org.jumpmind.symmetric.io.data.DbExport.Format; +import org.jumpmind.symmetric.io.data.transform.TransformPoint; +import org.jumpmind.symmetric.io.data.transform.TransformTable; import org.jumpmind.symmetric.job.IJob; import org.jumpmind.symmetric.job.IJobManager; +import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.DataGap; import org.jumpmind.symmetric.model.Lock; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeGroupLink; +import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerHistory; +import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.monitor.MonitorTypeBlock; import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; +import org.jumpmind.symmetric.service.ITransformService; import org.jumpmind.symmetric.service.ITriggerRouterService; +import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink; import org.jumpmind.symmetric.service.impl.UpdateService; import org.jumpmind.util.AppUtils; +import org.jumpmind.util.LogSummary; import org.jumpmind.util.ZipBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @IgnoreJRERequirement public class SnapshotUtil { - - protected static final Logger log = LoggerFactory.getLogger(SnapshotUtil.class); - + private static final Logger log = LoggerFactory.getLogger(SnapshotUtil.class); 
protected static final int THREAD_INDENT_SPACE = 50; + public static final String SNAPSHOT_DIR = "snapshots"; public static File getSnapshotDirectory(ISymmetricEngine engine) { - File snapshotsDir = new File(engine.getParameterService().getTempDirectory(), "snapshots"); + File snapshotsDir = new File(engine.getParameterService().getTempDirectory(), SNAPSHOT_DIR); snapshotsDir.mkdirs(); return snapshotsDir; } public static File createSnapshot(ISymmetricEngine engine, IProgressListener listener) { - if (listener != null) { listener.checkpoint(engine.getEngineName(), 0, 5); } - - String dirName = engine.getEngineName().replaceAll(" ", "-") + "-" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); - + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + String dirName = engine.getEngineName().replaceAll(" ", "-") + "-" + dateFormat.format(new Date()); IParameterService parameterService = engine.getParameterService(); File tmpDir = new File(parameterService.getTempDirectory(), dirName); tmpDir.mkdirs(); log.info("Creating snapshot file in " + tmpDir.getAbsolutePath()); - - try(FileWriter fwriter = new FileWriter(new File(tmpDir, "config-export.csv"))) { + log.info("Exporting configuration"); + try (FileWriter fwriter = new FileWriter(new File(tmpDir, "config-export.csv"))) { engine.getDataExtractorService().extractConfigurationStandalone(engine.getNodeService().findIdentity(), fwriter, TableConstants.SYM_NODE, TableConstants.SYM_NODE_SECURITY, TableConstants.SYM_NODE_IDENTITY, TableConstants.SYM_NODE_HOST, - TableConstants.SYM_NODE_CHANNEL_CTL,TableConstants.SYM_CONSOLE_ROLE, - TableConstants.SYM_CONSOLE_USER,TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE, + TableConstants.SYM_NODE_CHANNEL_CTL, TableConstants.SYM_CONSOLE_ROLE, + TableConstants.SYM_CONSOLE_USER, TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE, TableConstants.SYM_MONITOR_EVENT, TableConstants.SYM_CONSOLE_EVENT, 
TableConstants.SYM_CONSOLE_USER_HIST); } catch (Exception e) { log.warn("Failed to export symmetric configuration", e); } - File serviceConfFile = new File("conf/sym_service.conf"); try { if (serviceConfFile.exists()) { @@ -145,14 +153,12 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis } catch (Exception e) { log.warn("Failed to copy " + serviceConfFile.getName() + " to the snapshot directory", e); } - if (listener != null) { listener.checkpoint(engine.getEngineName(), 1, 5); } - + log.info("Writing table definitions"); IDatabasePlatform targetPlatform = engine.getSymmetricDialect().getTargetPlatform(); ISymmetricDialect targetDialect = engine.getTargetDialect(); - FileOutputStream fos = null; try { HashMap> catalogSchemas = new HashMap>(); ITriggerRouterService triggerRouterService = engine.getTriggerRouterService(); @@ -167,91 +173,60 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis } } } - - List catalogNames = targetPlatform.getDdlReader().getCatalogNames(); - List triggers = triggerRouterService.getTriggers(); - for (Trigger trigger : triggers) { - if (StringUtils.isBlank(trigger.getSourceCatalogName()) || catalogNames.contains(trigger.getSourceCatalogName())) { - Table table = targetPlatform.getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), false); - if (table != null) { - addTableToMap(catalogSchemas, new CatalogSchema(table.getCatalog(), table.getSchema()), table); - } - } - } - + addTablesThatLoadIncoming(engine, catalogSchemas); for (CatalogSchema catalogSchema : catalogSchemas.keySet()) { DbExport export = new DbExport(targetPlatform); boolean isDefaultCatalog = StringUtils.equalsIgnoreCase(catalogSchema.getCatalog(), targetPlatform.getDefaultCatalog()); boolean isDefaultSchema = StringUtils.equalsIgnoreCase(catalogSchema.getSchema(), targetPlatform.getDefaultSchema()); - - try { - if (isDefaultCatalog && isDefaultSchema) 
{ - fos = new FileOutputStream(new File(tmpDir, "table-definitions.xml")); - } else { - String extra = ""; - if (!isDefaultCatalog && catalogSchema.getCatalog() != null) { - extra += catalogSchema.getCatalog(); - export.setCatalog(catalogSchema.getCatalog()); - } - if (!isDefaultSchema && catalogSchema.getSchema() != null) { - if (!extra.equals("")) { - extra += "-"; - } - extra += catalogSchema.getSchema(); - export.setSchema(catalogSchema.getSchema()); + String filename = null; + if (isDefaultCatalog && isDefaultSchema) { + filename = "table-definitions.xml"; + } else { + String extra = ""; + if (!isDefaultCatalog && catalogSchema.getCatalog() != null) { + extra += catalogSchema.getCatalog(); + export.setCatalog(catalogSchema.getCatalog()); + } + if (!isDefaultSchema && catalogSchema.getSchema() != null) { + if (!extra.equals("")) { + extra += "-"; } - fos = new FileOutputStream(new File(tmpDir, "table-definitions-" + extra + ".xml")); + extra += catalogSchema.getSchema(); + export.setSchema(catalogSchema.getSchema()); } - + log.info("Writing table definitions for {}", extra); + filename = "table-definitions-" + extra + ".xml"; + } + try (FileOutputStream fos = new FileOutputStream(new File(tmpDir, filename))) { List tables = catalogSchemas.get(catalogSchema); export.setFormat(Format.XML); export.setNoData(true); export.exportTables(fos, tables.toArray(new Table[tables.size()])); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } - } } } } catch (Exception e) { log.warn("Failed to export table definitions", e); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } - } } - if (listener != null) { listener.checkpoint(engine.getEngineName(), 2, 5); } - + log.info("Writing runtime data"); String tablePrefix = engine.getTablePrefix(); - DbExport export = new DbExport(engine.getDatabasePlatform()); - export.setFormat(Format.CSV); + export.setFormat(Format.CSV_DQUOTE); export.setNoCreateInfo(true); - - 
extract(export, new File(tmpDir, "sym_identity.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_IDENTITY)); - + int maxBatches = parameterService.getInt(ParameterConstants.SNAPSHOT_MAX_BATCHES); + int maxNodeChannels = parameterService.getInt(ParameterConstants.SNAPSHOT_MAX_NODE_CHANNELS); + extract(export, new File(tmpDir, "sym_node_identity.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_IDENTITY)); extract(export, new File(tmpDir, "sym_node.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE)); - extract(export, new File(tmpDir, "sym_node_security.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_SECURITY)); - - extract(export, new File(tmpDir, "sym_node_host.csv"), + extract(export, new File(tmpDir, "sym_node_host.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST)); - - extract(export, new File(tmpDir, "sym_trigger_hist.csv"), + extract(export, new File(tmpDir, "sym_trigger_hist.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER_HIST)); - - extract(export, new File(tmpDir, "sym_node_channel_ctl.csv"), + extract(export, maxNodeChannels, "", new File(tmpDir, "sym_node_channel_ctl.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_CHANNEL_CTL)); - try { if (!parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) { engine.getNodeCommunicationService().persistToTableForSnapshot(); @@ -260,96 +235,90 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis } catch (Exception e) { log.warn("Unable to add SYM_NODE_COMMUNICATION to the snapshot.", e); } - extract(export, new File(tmpDir, "sym_lock.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_LOCK)); - - extract(export, new File(tmpDir, "sym_node_communication.csv"), + extract(export, maxNodeChannels, "", new File(tmpDir, "sym_node_communication.csv"), TableConstants.getTableName(tablePrefix, 
TableConstants.SYM_NODE_COMMUNICATION)); - - extract(export, 50000, "where status = 'OK' order by batch_id desc", new File(tmpDir, "sym_outgoing_batch_ok.csv"), + extract(export, maxBatches, "where status = 'OK' order by batch_id desc", new File(tmpDir, "sym_outgoing_batch_ok.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_OUTGOING_BATCH)); - - extract(export, 10000, "where status != 'OK' order by batch_id desc", new File(tmpDir, "sym_outgoing_batch_not_ok.csv"), + extract(export, maxBatches, "where status != 'OK' order by batch_id", new File(tmpDir, "sym_outgoing_batch_not_ok.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_OUTGOING_BATCH)); - - extract(export, 10000, "where status = 'OK' order by create_time desc", new File(tmpDir, "sym_incoming_batch_ok.csv"), + extract(export, maxBatches, "where status = 'OK' order by create_time desc", new File(tmpDir, "sym_incoming_batch_ok.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_INCOMING_BATCH)); - - extract(export, 10000, "where status != 'OK' order by create_time", new File(tmpDir, "sym_incoming_batch_not_ok.csv"), + extract(export, maxBatches, "where status != 'OK' order by create_time", new File(tmpDir, "sym_incoming_batch_not_ok.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_INCOMING_BATCH)); - - extract(export, 10000, "order by start_id, end_id desc", new File(tmpDir, "sym_data_gap.csv"), + extract(export, maxBatches, "order by start_id, end_id desc", new File(tmpDir, "sym_data_gap.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_DATA_GAP)); - + Map nodeSecurities = engine.getNodeService().findAllNodeSecurity(true); + Map channels = engine.getConfigurationService().getChannels(false); + String byChannelId = ""; + if (nodeSecurities != null && channels != null && nodeSecurities.size() * channels.size() < maxNodeChannels) { + byChannelId = "channel_id ,"; + } + extractQuery(engine.getSqlTemplate(), tmpDir + 
File.separator + "sym_outgoing_batch_summary.csv", + "select node_id, " + byChannelId + "status, count(*), sum(data_row_count), sum(byte_count), sum(error_flag), min(create_time), " + + "sum(router_millis), sum(extract_millis), sum(network_millis), sum(filter_millis), sum(load_millis), " + + "sum(fallback_insert_count), sum(fallback_update_count), sum(missing_delete_count), sum(skip_count), sum(ignore_count) " + + "from " + TableConstants.getTableName(tablePrefix, TableConstants.SYM_OUTGOING_BATCH) + + " group by node_id, " + byChannelId + "status"); + extractQuery(engine.getSqlTemplate(), tmpDir + File.separator + "sym_incoming_batch_summary.csv", + "select node_id, " + byChannelId + "status, count(*), sum(data_row_count), sum(byte_count), sum(error_flag), min(create_time), " + + "sum(router_millis), sum(extract_millis), sum(network_millis), sum(filter_millis), sum(load_millis), " + + "sum(fallback_insert_count), sum(fallback_update_count), sum(missing_delete_count), sum(skip_count), sum(ignore_count) " + + "from " + TableConstants.getTableName(tablePrefix, TableConstants.SYM_INCOMING_BATCH) + + " group by node_id, " + byChannelId + "status"); try { outputSymDataForBatchesInError(engine, tmpDir); } catch (Exception e) { log.warn("Failed to export data from batch in error", e); } - extract(export, new File(tmpDir, "sym_table_reload_request.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST)); - extract(export, new File(tmpDir, "sym_table_reload_status.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_STATUS)); - extract(export, 5000, "order by relative_dir, file_name", new File(tmpDir, "sym_file_snapshot.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT)); - export.setIgnoreMissingTables(true); extract(export, new File(tmpDir, "sym_console_event.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_CONSOLE_EVENT)); - extract(export, new File(tmpDir, 
"sym_monitor_event.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_MONITOR_EVENT)); - extract(export, new File(tmpDir, "sym_extract_request.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_EXTRACT_REQUEST)); - extract(export, new File(tmpDir, "sym_context.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_CONTEXT)); - - extract(export, new File(tmpDir, "sym_node_host_channel_stats.csv"), + extract(export, 10000, "order by start_time desc", new File(tmpDir, "sym_node_host_channel_stats.csv"), TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST_CHANNEL_STATS)); - - fos = null; + extract(export, new File(tmpDir, "sym_registration_request.csv"), + TableConstants.getTableName(tablePrefix, TableConstants.SYM_REGISTRATION_REQUEST)); try { - fos = new FileOutputStream(new File(tmpDir, "parameters.properties")); Properties effectiveParameters = engine.getParameterService().getAllParameters(); - SortedProperties parameters = new SortedProperties(); + Properties parameters = new Properties(); parameters.putAll(effectiveParameters); parameters.remove("db.password"); - parameters.store(fos, "parameters.properties"); - } catch (IOException e) { + writeProperties(parameters, tmpDir, "parameters.properties"); + } catch (Exception e) { log.warn("Failed to export parameter information", e); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } - } } - - fos = null; try { - fos = new FileOutputStream(new File(tmpDir, "parameters-changed.properties")); Properties defaultParameters = new Properties(); InputStream in = SnapshotUtil.class.getResourceAsStream("/symmetric-default.properties"); defaultParameters.load(in); - if(in != null) { + if (in != null) { try { in.close(); - } catch(IOException e) { } + } catch (IOException e) { + } } in = SnapshotUtil.class.getResourceAsStream("/symmetric-console-default.properties"); if (in != null) { defaultParameters.load(in); try { in.close(); - 
} catch(IOException e) { } + } catch (IOException e) { + } } Properties effectiveParameters = engine.getParameterService().getAllParameters(); - Properties changedParameters = new SortedProperties(); + Properties changedParameters = new Properties(); Map parameters = ParameterConstants.getParameterMetaData(); for (String key : parameters.keySet()) { String defaultValue = defaultParameters.getProperty((String) key); @@ -358,47 +327,52 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis changedParameters.put(key, currentValue == null ? "" : currentValue); } } - changedParameters.remove("db.password"); - changedParameters.store(fos, "parameters-changed.properties"); + for (String name : new String[] { "db.password", "target.db.password", "smtp.password", "redshift.bulk.load.s3.access.key", + "redshift.bulk.load.s3.secret.key", "opensearch.load.aws.access.key", "opensearch.load.aws.secret.key", "cloud.bulk.load.s3.access.key", + "cloud.bulk.load.s3.secret.key", "cloud.bulk.load.azure.sas.token", "registration.secret" }) { + changedParameters.remove(name); + } + writeProperties(changedParameters, tmpDir, "parameters-changed.properties"); } catch (Exception e) { log.warn("Failed to export parameters-changed information", e); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } - } } - - fos = null; try { - fos = new FileOutputStream(new File(tmpDir, "system.properties")); - SortedProperties props = new SortedProperties(); + Properties props = new Properties(); props.putAll(System.getProperties()); - props.store(fos, "system.properties"); + writeProperties(props, tmpDir, "system.properties"); } catch (Exception e) { - log.warn("Failed to export thread information", e); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } + log.warn("Failed to export system information", e); + } + File logSummaryFile = new File(tmpDir, "log-summary.csv"); + try (OutputStream outputStream = new 
FileOutputStream(logSummaryFile)) { + CsvWriter csvWriter = new CsvWriter(outputStream, ',', Charset.defaultCharset()); + csvWriter.setEscapeMode(CsvWriter.ESCAPE_MODE_DOUBLED); + csvWriter.setForceQualifier(true); + csvWriter.writeRecord(new String[] { "Level", "First Time", "Last Time", "Count", "Message", "Stack Trace" }); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + List logSummaries = LogSummaryAppenderUtils.getLogSummaryErrors(engine.getEngineName()); + logSummaries.addAll(LogSummaryAppenderUtils.getLogSummaryWarnings(engine.getEngineName())); + for (LogSummary s : logSummaries) { + csvWriter.writeRecord(new String[] { s.getLevel().name(), df.format(new Date(s.getFirstOccurranceTime())), df.format(new Date(s + .getMostRecentTime())), String.valueOf(s.getCount()), s.getMessage(), s.getStackTrace() }); } + csvWriter.flush(); + } catch (Exception e) { + log.warn("Failed to write log summaries"); } - if (targetDialect instanceof FirebirdSymmetricDialect) { + log.info("Writing Firebird info"); final String[] monTables = { "mon$database", "mon$attachments", "mon$transactions", "mon$statements", "mon$io_stats", "mon$record_stats", "mon$memory_usage", "mon$call_stack", "mon$context_variables" }; DbExport dbexport = new DbExport(targetPlatform); - dbexport.setFormat(Format.CSV); + dbexport.setFormat(Format.CSV_DQUOTE); dbexport.setNoCreateInfo(true); - for (String table : monTables) { extract(dbexport, new File(tmpDir, "firebird-" + table + ".csv"), table); } } - if (targetDialect instanceof MySqlSymmetricDialect) { + log.info("Writing MySQL info"); extractQuery(targetPlatform.getSqlTemplate(), tmpDir + File.separator + "mysql-processlist.csv", "show processlist"); extractQuery(targetPlatform.getSqlTemplate(), tmpDir + File.separator + "mysql-global-variables.csv", @@ -406,37 +380,29 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis extractQuery(targetPlatform.getSqlTemplate(), tmpDir + File.separator + 
"mysql-session-variables.csv", "show session variables"); } - if (listener != null) { listener.checkpoint(engine.getEngineName(), 3, 5); } - if (!engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) { - try { + try (FileOutputStream fos = new FileOutputStream(new File(tmpDir, "sym_data_gap_cache.csv"))) { List gaps = engine.getRouterService().getDataGaps(); SimpleDateFormat dformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - fos = new FileOutputStream(new File(tmpDir, "sym_data_gap_cache.csv")); - fos.write("start_id,end_id,create_time\n".getBytes()); + dformat.setTimeZone(TimeZone.getTimeZone("GMT")); + fos.write("start_id,end_id,create_time\n".getBytes(Charset.defaultCharset())); if (gaps != null) { for (DataGap gap : gaps) { - fos.write((gap.getStartId() + "," + gap.getEndId() + ",\"" + dformat.format(gap.getCreateTime()) + "\",\"" + "\"\n").getBytes()); + fos.write((gap.getStartId() + "," + gap.getEndId() + ",\"" + dformat.format(gap.getCreateTime()) + "\",\"" + "\"\n").getBytes(Charset + .defaultCharset())); } } } catch (Exception e) { log.warn("Failed to export data gap information", e); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } - } - } + } } - + log.info("Writing threads info"); createThreadsFile(tmpDir.getPath(), false); createThreadsFile(tmpDir.getPath(), true); createThreadStatsFile(tmpDir.getPath()); - try { List transactions = targetPlatform.getTransactions(); if (!transactions.isEmpty()) { @@ -445,115 +411,72 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis } catch (Exception e) { log.warn("Failed to create transactions file", e); } - writeRuntimeStats(engine, tmpDir); writeJobsStats(engine, tmpDir); - if ("true".equals(System.getProperty(SystemConstants.SYSPROP_STANDALONE_WEB))) { writeDirectoryListing(engine, tmpDir); } - - File logDir = null; - - String parameterizedLogDir = parameterService.getString("server.log.dir"); - if 
(isNotBlank(parameterizedLogDir)) { - logDir = new File(parameterizedLogDir); - } - - if (logDir != null && logDir.exists()) { - log.info("Using server.log.dir setting as the location of the log files"); - } else { + writeDirectoryStaging(engine, tmpDir); + File logDir = LogSummaryAppenderUtils.getLogDir(); + if (logDir == null || !logDir.exists()) { logDir = new File("logs"); } - - File matches = null; - if (!logDir.exists()) { - matches = LogSummaryAppenderUtils.getLogFile(); - if (matches != null) { - logDir = matches.getParentFile(); - } - } - if (!logDir.exists()) { logDir = new File("../logs"); } - if (!logDir.exists()) { logDir = new File("target"); } - if (logDir.exists()) { - log.info("Copying log files into snapshot file"); + log.info("Copying log files"); File[] files = logDir.listFiles(); if (files != null) { for (File file : files) { - String lowerCaseFileName = file.getName().toLowerCase(); - if ((lowerCaseFileName.contains(".log") && (lowerCaseFileName.contains("symmetric") || lowerCaseFileName.contains("wrapper"))) - || compareLogFileName(lowerCaseFileName, matches)) { + String name = file.getName().toLowerCase(); + if ((name.endsWith(".log") || name.endsWith(".log.1") || name.endsWith(".log.2") || name.endsWith(".log.3")) && (name.contains("symmetric") + || name.contains("wrapper"))) { try { FileUtils.copyFileToDirectory(file, tmpDir); - } catch (IOException e) { + } catch (Exception e) { log.warn("Failed to copy " + file.getName() + " to the snapshot directory", e); } } } } } - if (listener != null) { listener.checkpoint(engine.getEngineName(), 4, 5); } - File jarFile = null; try { String filename = tmpDir.getName() + ".zip"; if (parameterService.is(ParameterConstants.SNAPSHOT_FILE_INCLUDE_HOSTNAME)) filename = AppUtils.getHostName() + "_" + filename; - jarFile = new File(getSnapshotDirectory(engine), filename); + jarFile = new File(getSnapshotDirectory(engine), filename); ZipBuilder builder = new ZipBuilder(tmpDir, jarFile, new File[] { tmpDir 
}); builder.build(); FileUtils.deleteDirectory(tmpDir); } catch (Exception e) { throw new IoException("Failed to package snapshot files into archive", e); } - if (listener != null) { listener.checkpoint(engine.getEngineName(), 5, 5); } - log.info("Done creating snapshot file"); return jarFile; } - - private static boolean compareLogFileName(String fileName, File matches) { - boolean ret = false; - if (fileName != null && fileName.length() > 0 && matches != null) { - if (fileName.toLowerCase().contains(matches.getName().toLowerCase())) { - ret = true; - } - } - return ret; - } protected static void extract(DbExport export, File file, String... tables) { extract(export, Integer.MAX_VALUE, null, file, tables); } protected static void extract(DbExport export, int maxRows, String whereClause, File file, String... tables) { - FileOutputStream fos = null; - try { - fos = new FileOutputStream(file); + try (FileOutputStream fos = new FileOutputStream(file)) { export.setMaxRows(maxRows); export.setWhereClause(whereClause); export.exportTables(fos, tables); } catch (Exception e) { log.warn("Failed to export table definitions", e); - } finally { - if (fos != null) { - try { - fos.close(); - } catch(IOException e) { } - } } } @@ -562,6 +485,8 @@ protected static void extractQuery(ISqlTemplate sqlTemplate, String fileName, St try { List rows = sqlTemplate.query(sql); writer = new CsvWriter(fileName); + writer.setEscapeMode(CsvWriter.ESCAPE_MODE_DOUBLED); + writer.setForceQualifier(true); boolean isFirstRow = true; for (Row row : rows) { if (isFirstRow) { @@ -579,7 +504,7 @@ protected static void extractQuery(ISqlTemplate sqlTemplate, String fileName, St } catch (Exception e) { log.warn("Failed to run extract query " + sql, e); } finally { - if(writer != null) { + if (writer != null) { writer.close(); } } @@ -591,81 +516,85 @@ protected static void writeDirectoryListing(ISymmetricEngine engine, File tmpDir if (home.getName().equalsIgnoreCase("bin")) { home = home.getParentFile(); 
} - + log.info("Writing directory listing of {}", home); StringBuilder output = new StringBuilder(); - Comparator fileComparator = new Comparator() { - @Override - public int compare(File o1, File o2) { - return o1.getPath().compareToIgnoreCase(o2.getPath()); - } - }; - printDirectoryContents(home, output, fileComparator); + int maxFiles = engine.getParameterService().getInt(ParameterConstants.SNAPSHOT_MAX_FILES); + printDirectoryContents(home, output, new PrintDirConfig(maxFiles, engine.getStagingManager().getStagingDirectory().getParentFile())); FileUtils.write(new File(tmpDir, "directory-listing.txt"), output, Charset.defaultCharset(), false); } catch (Exception ex) { log.warn("Failed to output the directory listing", ex); } } - protected static void printDirectoryContents(File dir, StringBuilder output, Comparator fileComparator) throws IOException { + protected static void writeDirectoryStaging(ISymmetricEngine engine, File tmpDir) { + try { + log.info("Writing staging listing of {}", engine.getStagingManager().getStagingDirectory()); + StringBuilder output = new StringBuilder(); + int maxFiles = engine.getParameterService().getInt(ParameterConstants.SNAPSHOT_MAX_FILES); + printDirectoryContents(engine.getStagingManager().getStagingDirectory(), output, new PrintDirConfig(maxFiles)); + FileUtils.write(new File(tmpDir, "directory-staging.txt"), output, Charset.defaultCharset(), false); + } catch (Exception ex) { + log.warn("Failed to output the directory staging", ex); + } + } + + protected static void printDirectoryContents(File dir, StringBuilder output, PrintDirConfig config) throws IOException { + if (config.getFileCount() >= config.getMaxCount()) { + return; + } output.append("\n"); output.append(dir.getCanonicalPath()); output.append("\n"); - File[] files = dir.listFiles(); if (files != null) { - Arrays.parallelSort(files, fileComparator); + Arrays.parallelSort(files, config.getFileComparator()); for (File file : files) { output.append(" "); + 
output.append(file.isDirectory() ? "d" : "-"); output.append(file.canRead() ? "r" : "-"); output.append(file.canWrite() ? "w" : "-"); output.append(file.canExecute() ? "x" : "-"); output.append(StringUtils.leftPad(file.length() + "", 11)); output.append(" "); - output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(file.lastModified()))); + output.append(config.getDateFormat().format(new Date(file.lastModified()))); output.append(" "); output.append(file.getName()); output.append("\n"); + if (config.incrementFileCount() >= config.getMaxCount()) { + output.append("\n*** MAX LIMIT OF " + config.getMaxCount() + " FILES ***\n"); + return; + } } - for (File file : files) { - if (file.isDirectory()) { - printDirectoryContents(file, output, fileComparator); + if (file.isDirectory() && (config.getExcludeDir() == null || (!config.getExcludeDir().equals(dir) + && !file.getName().equalsIgnoreCase("tmp")))) { + printDirectoryContents(file, output, config); } } } - } protected static void writeRuntimeStats(ISymmetricEngine engine, File tmpDir) { - FileOutputStream fos = null; + log.info("Writing runtime stats"); try { - fos = new FileOutputStream(new File(tmpDir, "runtime-stats.properties")); - Properties runtimeProperties = new Properties() { - private static final long serialVersionUID = 1L; - - public synchronized Enumeration keys() { - return Collections.enumeration(new TreeSet(super.keySet())); - } - }; - + Properties runtimeProperties = new Properties(); DataSource dataSource = engine.getDatabasePlatform().getDataSource(); if (dataSource instanceof BasicDataSource) { + @SuppressWarnings("resource") BasicDataSource dbcp = (BasicDataSource) dataSource; runtimeProperties.setProperty("connections.idle", String.valueOf(dbcp.getNumIdle())); runtimeProperties.setProperty("connections.used", String.valueOf(dbcp.getNumActive())); runtimeProperties.setProperty("connections.max", String.valueOf(dbcp.getMaxTotal())); } - Runtime rt = Runtime.getRuntime(); 
DecimalFormat df = new DecimalFormat("#,###"); - runtimeProperties.setProperty("memory.free", df.format(rt.freeMemory())); - runtimeProperties.setProperty("memory.used", df.format(rt.totalMemory() - rt.freeMemory())); - runtimeProperties.setProperty("memory.max", df.format(rt.maxMemory())); - + runtimeProperties.setProperty("memory.jvm.free", df.format(rt.freeMemory())); + runtimeProperties.setProperty("memory.jvm.used", df.format(rt.totalMemory() - rt.freeMemory())); + runtimeProperties.setProperty("memory.jvm.max", df.format(rt.maxMemory())); List memoryPools = new ArrayList(ManagementFactory.getMemoryPoolMXBeans()); long usedHeapMemory = 0; for (MemoryPoolMXBean memoryPool : memoryPools) { - if (memoryPool.getType().equals(MemoryType.HEAP)) { + if (memoryPool.getType() == MemoryType.HEAP) { MemoryUsage memoryUsage = memoryPool.getCollectionUsage(); runtimeProperties.setProperty("memory.heap." + memoryPool.getName().toLowerCase().replaceAll(" ", "."), df.format(memoryUsage.getUsed())); @@ -673,16 +602,18 @@ public synchronized Enumeration keys() { } } runtimeProperties.setProperty("memory.heap.total", df.format(usedHeapMemory)); - + try { + Method method = ManagementFactory.getOperatingSystemMXBean().getClass().getMethod("getTotalPhysicalMemorySize"); + method.setAccessible(true); + runtimeProperties.setProperty("memory.system.total", df.format((Long) method.invoke(ManagementFactory.getOperatingSystemMXBean()))); + } catch (Exception ignore) { + } OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); runtimeProperties.setProperty("os.name", System.getProperty("os.name") + " (" + System.getProperty("os.arch") + ")"); runtimeProperties.setProperty("os.processors", String.valueOf(osBean.getAvailableProcessors())); runtimeProperties.setProperty("os.load.average", String.valueOf(osBean.getSystemLoadAverage())); - runtimeProperties.setProperty("engine.is.started", Boolean.toString(engine.isStarted())); - 
runtimeProperties.setProperty("engine.last.restart", engine.getLastRestartTime() != null ? - engine.getLastRestartTime().toString() : ""); - + runtimeProperties.setProperty("engine.last.restart", engine.getLastRestartTime() != null ? engine.getLastRestartTime().toString() : ""); runtimeProperties.setProperty("time.server", new Date().toString()); runtimeProperties.setProperty("time.database", new Date(engine.getTargetDialect().getDatabaseTime()).toString()); runtimeProperties.setProperty("batch.unrouted.data.count", df.format(engine.getRouterService().getUnroutedDataCount())); @@ -692,17 +623,14 @@ public synchronized Enumeration keys() { df.format(engine.getOutgoingBatchService().countOutgoingBatchesUnsent())); runtimeProperties.setProperty("batch.incoming.errors.count", df.format(engine.getIncomingBatchService().countIncomingBatchesInError())); - List gaps = engine.getDataService().findDataGapsUnchecked(); runtimeProperties.setProperty("data.gap.count", df.format(gaps.size())); if (gaps.size() > 0) { runtimeProperties.setProperty("data.gap.start.id", df.format(gaps.get(0).getStartId())); runtimeProperties.setProperty("data.gap.end.id", df.format(gaps.get(gaps.size() - 1).getEndId())); } - runtimeProperties.setProperty("data.id.min", df.format(engine.getDataService().findMinDataId())); runtimeProperties.setProperty("data.id.max", df.format(engine.getDataService().findMaxDataId())); - String jvmTitle = Runtime.class.getPackage().getImplementationTitle(); runtimeProperties.put("jvm.title", jvmTitle != null ? 
jvmTitle : "Unknown"); String jvmVendor = Runtime.class.getPackage().getImplementationVendor(); @@ -712,10 +640,10 @@ public synchronized Enumeration keys() { RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean(); List arguments = runtimeMxBean.getInputArguments(); runtimeProperties.setProperty("jvm.arguments", arguments.toString()); + runtimeProperties.setProperty("jvm.bits", System.getProperty("sun.arch.data.model", System.getProperty("com.ibm.vm.bitmode"))); runtimeProperties.setProperty("hostname", AppUtils.getHostName()); runtimeProperties.setProperty("instance.id", engine.getClusterService().getInstanceId()); runtimeProperties.setProperty("server.id", engine.getClusterService().getServerId()); - try { MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); ObjectName oName = new ObjectName("java.lang:type=OperatingSystem"); @@ -723,21 +651,31 @@ public synchronized Enumeration keys() { runtimeProperties.setProperty("file.descriptor.max.count", mbeanServer.getAttribute(oName, "MaxFileDescriptorCount").toString()); } catch (Exception e) { } - - runtimeProperties.store(fos, "runtime-stats.properties"); + writeProperties(runtimeProperties, tmpDir, "runtime-stats.properties"); } catch (Exception e) { log.warn("Failed to export runtime-stats information", e); - } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException e) { } + } + } + + protected static void writeProperties(Properties properties, File tmpDir, String fileName) { + try (BufferedWriter bw = new BufferedWriter(new FileWriter(new File(tmpDir, fileName)))) { + List keys = new ArrayList(); + for (Object key : properties.keySet()) { + keys.add(key.toString()); } + Collections.sort(keys); + for (String key : keys) { + bw.write(key + "=" + properties.getProperty(key).replace("\n", "\\n").replace("\r", "\\r")); + bw.newLine(); + } + } catch (Exception e) { + log.warn("Failed to write " + fileName, e); } } protected static void writeJobsStats(ISymmetricEngine 
engine, File tmpDir) { - try(FileWriter writer = new FileWriter(new File(tmpDir, "jobs.txt"))) { + log.info("Writing job stats"); + try (FileWriter writer = new FileWriter(new File(tmpDir, "jobs.txt"))) { IJobManager jobManager = engine.getJobManager(); IClusterService clusterService = engine.getClusterService(); INodeService nodeService = engine.getNodeService(); @@ -745,7 +683,8 @@ protected static void writeJobsStats(ISymmetricEngine engine, File tmpDir) { + nodeService.findNodeHosts(nodeService.findIdentityNodeId()).size() + " instances in the cluster\n\n"); writer.write(StringUtils.rightPad("Job Name", 30) + StringUtils.rightPad("Schedule", 20) + StringUtils.rightPad("Status", 10) + StringUtils.rightPad("Server Id", 30) + StringUtils.rightPad("Last Server Id", 30) - + StringUtils.rightPad("Last Finish Time", 30) + StringUtils.rightPad("Last Run Period", 20) + + StringUtils.rightPad("Last Finish Time", 30) + StringUtils.rightPad("Next Run Time", 30) + + StringUtils.rightPad("Last Run Period", 20) + StringUtils.rightPad("Avg. Run Period", 20) + "\n"); List jobs = jobManager.getJobs(); Map locks = clusterService.findLocks(); @@ -757,17 +696,17 @@ protected static void writeJobsStats(ISymmetricEngine engine, File tmpDir) { if (lock != null) { lastServerId = lock.getLastLockingServerId(); } - String schedule = job.getSchedule(); - String lastFinishTime = getLastFinishTime(job, lock); - - writer.write(StringUtils.rightPad(job.getName().replace("_", " "), 30)+ - StringUtils.rightPad(schedule, 20) + StringUtils.rightPad(status, 10) + - StringUtils.rightPad(runningServerId == null ? "" : runningServerId, 30) + - StringUtils.rightPad(lastServerId == null ? "" : lastServerId, 30) + - StringUtils.rightPad(lastFinishTime == null ? "" : lastFinishTime, 30) + - StringUtils.rightPad(job.getLastExecutionTimeInMs() + "", 20) + + String nextRunTime = job.getNextExecutionTime() == null ? 
"" : + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(job.getNextExecutionTime()); + writer.write(StringUtils.rightPad(job.getName().replace("_", " "), 30) + + StringUtils.rightPad(schedule, 20) + StringUtils.rightPad(status, 10) + + StringUtils.left(StringUtils.rightPad(runningServerId == null ? "" : runningServerId, 30), 30) + + StringUtils.left(StringUtils.rightPad(lastServerId == null ? "" : lastServerId, 30), 30) + + StringUtils.rightPad(lastFinishTime == null ? "" : lastFinishTime, 30) + + StringUtils.rightPad(nextRunTime, 30) + + StringUtils.rightPad(job.getLastExecutionTimeInMs() + "", 20) + StringUtils.rightPad(job.getAverageExecutionTimeInMs() + "", 20) + "\n"); } } catch (Exception e) { @@ -799,14 +738,13 @@ protected static String getLastFinishTime(IJob job, Lock lock) { public static File createThreadsFile(String parent, boolean isFiltered) { File file = new File(parent, isFiltered ? "threads-filtered.txt" : "threads.txt"); - try(FileWriter fwriter = new FileWriter(file)) { + try (FileWriter fwriter = new FileWriter(file)) { ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); long[] threadIds = threadBean.getAllThreadIds(); for (long l : threadIds) { ThreadInfo info = threadBean.getThreadInfo(l, 100); if (info != null) { String threadName = info.getThreadName(); - boolean skip = isFiltered; if (isFiltered) { for (StackTraceElement element : info.getStackTrace()) { @@ -820,7 +758,6 @@ public static File createThreadsFile(String parent, boolean isFiltered) { } } } - if (!skip) { fwriter.append(StringUtils.rightPad(threadName, THREAD_INDENT_SPACE)); fwriter.append(AppUtils.formatStackTrace(info.getStackTrace(), THREAD_INDENT_SPACE, false)); @@ -833,14 +770,14 @@ public static File createThreadsFile(String parent, boolean isFiltered) { } return file; } - + public static File createThreadStatsFile(String parent) { - File file = new File(parent, "thread-stats.csv"); - try { - OutputStream outputStream = new 
FileOutputStream(file); - CsvWriter csvWriter = new CsvWriter(outputStream, ',', Charset.forName("ISO-8859-1")); - String[] heading = {"Thread", "Allocated Memory (Bytes)", "CPU Time (Seconds)"}; - csvWriter.writeRecord(heading); + File file = new File(parent, "threads-stats.csv"); + try (OutputStream outputStream = new FileOutputStream(file)) { + CsvWriter csvWriter = new CsvWriter(outputStream, ',', Charset.forName("ISO-8859-1")); + csvWriter.setEscapeMode(CsvWriter.ESCAPE_MODE_DOUBLED); + String[] heading = { "Thread", "Allocated Memory (Bytes)", "CPU Time (Seconds)" }; + csvWriter.writeRecord(heading); ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); long[] threadIds = threadBean.getAllThreadIds(); for (long l : threadIds) { @@ -849,27 +786,23 @@ public static File createThreadStatsFile(String parent) { String threadName = info.getThreadName(); long threadId = info.getThreadId(); long allocatedBytes = 0; - - try { - Method method = threadBean.getClass().getMethod("getThreadAllocatedBytes"); - method.setAccessible(true); - allocatedBytes = (Long) method.invoke(threadBean, threadId); - } catch (Exception ignore) { - } - - String[] row = {threadName, Long.toString(allocatedBytes), Float.toString(threadBean.getThreadCpuTime(threadId) / 1000000000f)}; + try { + Method method = threadBean.getClass().getMethod("getThreadAllocatedBytes"); + method.setAccessible(true); + allocatedBytes = (Long) method.invoke(threadBean, threadId); + } catch (Exception ignore) { + } + String[] row = { threadName, Long.toString(allocatedBytes), Float.toString(threadBean.getThreadCpuTime(threadId) / 1000000000f) }; csvWriter.writeRecord(row); } } csvWriter.flush(); - outputStream.flush(); - outputStream.close(); - } catch (IOException e) { - log.warn("Failed to export thread information", e); + } catch (Exception e) { + log.warn("Failed to export thread information", e); } return file; } - + private static File createTransactionsFile(ISymmetricEngine engine, String parent, 
List transactions) { Map transactionMap = new HashMap(); for (Transaction transaction : transactions) { @@ -881,9 +814,9 @@ private static File createTransactionsFile(ISymmetricEngine engine, String paren MonitorTypeBlock.filterTransactions(transaction, transactionMap, filteredTransactions, dbUser, false, false); } File file = new File(parent, "transactions.csv"); - try { - OutputStream outputStream = new FileOutputStream(file); + try (OutputStream outputStream = new FileOutputStream(file)) { CsvWriter csvWriter = new CsvWriter(outputStream, ',', Charset.forName("ISO-8859-1")); + csvWriter.setEscapeMode(CsvWriter.ESCAPE_MODE_DOUBLED); String[] heading = { "ID", "Username", "Remote IP", "Remote Host", "Status", "Reads", "Writes", "Blocking ID", "Duration", "Text" }; csvWriter.writeRecord(heading); @@ -897,14 +830,12 @@ private static File createTransactionsFile(ISymmetricEngine engine, String paren csvWriter.writeRecord(row); } csvWriter.flush(); - outputStream.flush(); - outputStream.close(); - } catch (IOException e) { + } catch (Exception e) { log.warn("Failed to create transactions file", e); } return file; } - + private static void addTableToMap(HashMap> catalogSchemas, CatalogSchema catalogSchema, Table table) { List
tables = catalogSchemas.get(catalogSchema); if (tables == null) { @@ -914,21 +845,11 @@ private static void addTableToMap(HashMap> catalogSch tables.add(table); } - static class SortedProperties extends Properties { - private static final long serialVersionUID = 1L; - - @Override - public synchronized Enumeration keys() { - return Collections.enumeration(new TreeSet(super.keySet())); - } - }; - public static void outputSymDataForBatchesInError(ISymmetricEngine engine, File tmpDir) { String tablePrefix = engine.getTablePrefix(); DbExport export = new DbExport(engine.getDatabasePlatform()); export.setFormat(Format.CSV_DQUOTE); export.setNoCreateInfo(true); - // Create files for each batch in error for (OutgoingBatch batch : engine.getOutgoingBatchService().getOutgoingBatchErrors(10000).getBatches()) { if (batch.getFailedDataId() > 0) { @@ -937,18 +858,18 @@ public static void outputSymDataForBatchesInError(ISymmetricEngine engine, File // Write sym_data to file String filenameCaptured = batch.getBatchId() + "_captured.csv"; String whereClause = "where data_id = " + data.getDataId(); - extract(export, 10000, whereClause, new File(tmpDir, filenameCaptured), - TableConstants.getTableName(tablePrefix, TableConstants.SYM_DATA)); - + extract(export, 10000, whereClause, new File(tmpDir, filenameCaptured), + TableConstants.getTableName(tablePrefix, TableConstants.SYM_DATA)); // Write parsed row data to file String filenameParsed = tmpDir + File.separator + batch.getBatchId() + "_parsed.csv"; CsvWriter writer = null; try { writer = new CsvWriter(filenameParsed); + writer.setEscapeMode(CsvWriter.ESCAPE_MODE_DOUBLED); writer.writeRecord(data.getTriggerHistory().getParsedColumnNames()); writer.writeRecord(data.toParsedRowData()); writer.writeRecord(data.toParsedOldData()); - } catch (IOException e) { + } catch (Exception e) { log.warn("Failed to write parsed row data from sym_data to file " + filenameParsed, e); } finally { if (writer != null) { @@ -962,4 +883,152 @@ public 
static void outputSymDataForBatchesInError(ISymmetricEngine engine, File } } + protected static void addTablesThatLoadIncoming(ISymmetricEngine engine, HashMap> catalogSchemas) { + ITriggerRouterService triggerRouterService = engine.getTriggerRouterService(); + IParameterService parameterService = engine.getParameterService(); + IDatabasePlatform targetPlatform = engine.getSymmetricDialect().getTargetPlatform(); + Node targetNode = engine.getNodeService().findIdentity(); + List nodes = engine.getNodeService().findAllNodes(); + Map sampleNodeForGroup = new HashMap(); + for (Node node : nodes) { + sampleNodeForGroup.put(node.getNodeGroupId(), node); + } + Map>> extractTransformMap = new HashMap>>(); + Map>> loadTransformMap = new HashMap>>(); + List triggerRouters = triggerRouterService.getTriggerRoutersForTargetNode(parameterService.getNodeGroupId()); + for (TriggerRouter triggerRouter : triggerRouters) { + Trigger trigger = triggerRouter.getTrigger(); + if (!trigger.isSourceWildCarded()) { + String catalog = null; + String schema = null; + String tableName = trigger.getSourceTableName(); + Router router = triggerRouter.getRouter(); + Node sourceNode = sampleNodeForGroup.get(router.getNodeGroupLink().getSourceNodeGroupId()); + if (router.isUseSourceCatalogSchema()) { + catalog = trigger.getSourceCatalogName(); + schema = trigger.getSourceSchemaName(); + } + if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetCatalogName())) { + catalog = null; + } else if (StringUtils.isNotBlank(router.getTargetCatalogName())) { + catalog = SymmetricUtils.replaceNodeVariables(sourceNode, targetNode, router.getTargetCatalogName()); + } + if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetSchemaName())) { + schema = null; + } else if (StringUtils.isNotBlank(router.getTargetSchemaName())) { + schema = SymmetricUtils.replaceNodeVariables(sourceNode, targetNode, router.getTargetSchemaName()); + } + if (StringUtils.isNotBlank(router.getTargetTableName())) { + tableName 
= router.getTargetTableName(); + } + List
tablesToLookup = new ArrayList
(); + Map> byTableExtractTransforms = getByTableTransforms(engine.getTransformService(), extractTransformMap, router + .getNodeGroupLink(), TransformPoint.EXTRACT); + String tableKey = Table.getFullyQualifiedTableName(catalog, schema, tableName).toLowerCase(); + List extractTransforms = byTableExtractTransforms.get(tableKey); + if (extractTransforms != null && extractTransforms.size() > 0) { + for (TransformTable transform : extractTransforms) { + tablesToLookup.add(new Table(transform.getTargetCatalogName(), transform.getTargetSchemaName(), transform.getTargetTableName())); + } + } + if (tablesToLookup.size() == 0) { + tablesToLookup.add(new Table(catalog, schema, tableName)); + } + Map> byTableLoadTransforms = getByTableTransforms(engine.getTransformService(), loadTransformMap, router + .getNodeGroupLink(), TransformPoint.LOAD); + ListIterator
iterator = tablesToLookup.listIterator(); + while (iterator.hasNext()) { + Table table = iterator.next(); + List loadTransforms = byTableLoadTransforms.get(table.getFullyQualifiedTableName().toLowerCase()); + if (loadTransforms != null && loadTransforms.size() > 0) { + iterator.remove(); + for (TransformTable transform : loadTransforms) { + iterator.add(new Table(transform.getTargetCatalogName(), transform.getTargetSchemaName(), transform.getTargetTableName())); + } + } + } + for (Table table : tablesToLookup) { + table = targetPlatform.getTableFromCache(table.getCatalog(), table.getSchema(), table.getName(), false); + if (table != null) { + addTableToMap(catalogSchemas, new CatalogSchema(table.getCatalog(), table.getSchema()), table); + } + } + } + } + } + + protected static Map> getByTableTransforms(ITransformService transformService, + Map>> transformMap, NodeGroupLink nodeGroupLink, TransformPoint transformPoint) { + Map> byTableTransforms = transformMap.get(nodeGroupLink); + if (byTableTransforms == null) { + List transforms = transformService.findTransformsFor(nodeGroupLink, transformPoint); + byTableTransforms = toMap(transforms); + transformMap.put(nodeGroupLink, byTableTransforms); + } + return byTableTransforms; + } + + protected static Map> toMap(List transforms) { + Map> transformsByTable = new HashMap>(); + if (transforms != null) { + for (TransformTable transformTable : transforms) { + String sourceTableName = transformTable.getFullyQualifiedSourceTableName().toLowerCase(); + List tables = transformsByTable.get(sourceTableName); + if (tables == null) { + tables = new ArrayList(); + transformsByTable.put(sourceTableName, tables); + } + tables.add(transformTable); + } + } + return transformsByTable; + } + + static class FileComparator implements Comparator { + @Override + public int compare(File o1, File o2) { + return o1.getPath().compareToIgnoreCase(o2.getPath()); + } + } + + static class PrintDirConfig { + SimpleDateFormat df = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + Comparator fileComparator = new FileComparator(); + int fileCount; + int maxCount; + File excludeDir; + + public PrintDirConfig(int maxCount) { + this.maxCount = maxCount; + } + + public PrintDirConfig(int maxCount, File excludeDir) { + this.maxCount = maxCount; + this.excludeDir = excludeDir; + } + + public Comparator getFileComparator() { + return fileComparator; + } + + public SimpleDateFormat getDateFormat() { + return df; + } + + public int incrementFileCount() { + return fileCount++; + } + + public int getFileCount() { + return fileCount; + } + + public int getMaxCount() { + return maxCount; + } + + public File getExcludeDir() { + return excludeDir; + } + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 04d4fb57eb..2299020cdc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -534,6 +534,9 @@ private ParameterConstants() { public final static String CLOUD_BULK_LOAD_AZURE_SAS_TOKEN = "cloud.bulk.load.azure.sas.token"; public final static String SNAPSHOT_FILE_INCLUDE_HOSTNAME = "snapshot.file.include.hostname"; + public final static String SNAPSHOT_MAX_FILES = "snapshot.max.files"; + public final static String SNAPSHOT_MAX_BATCHES = "snapshot.max.batches"; + public final static String SNAPSHOT_MAX_NODE_CHANNELS = "snapshot.max.node.channels"; public final static String REDSHIFT_APPEND_TO_COPY_COMMAND = "redshift.append.to.copy.command"; public final static String REDSHIFT_BULK_LOAD_MAX_ROWS_BEFORE_FLUSH = "redshift.bulk.load.max.rows.before.flush"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java index 
bd6f70d8c6..0cec5bfc64 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java @@ -112,6 +112,8 @@ public interface ITriggerRouterService { public void saveRouter(Router router); public List getAllTriggerRoutersForCurrentNode(String sourceNodeGroupId); + + public List getTriggerRoutersForTargetNode(String targetNodeGroupId); /** * Get a list of all the triggers that have been defined for the system. diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index 7fa8e4bf2f..a8c3732fb1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -982,6 +982,12 @@ public List getAllTriggerRoutersForCurrentNode(String sourceNodeG return triggerRouters; } + public List getTriggerRoutersForTargetNode(String targetNodeGroupId) { + List triggerRouters = enhanceTriggerRouters(sqlTemplate.query(getTriggerRouterSql("activeTriggersForTargetNodeGroupSql"), + new TriggerRouterMapper(), targetNodeGroupId)); + return triggerRouters; + } + public List getAllTriggerRoutersForReloadForCurrentNode( String sourceNodeGroupId, String targetNodeGroupId) { return enhanceTriggerRouters(sqlTemplate.query( diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java index 475f42c63e..1dd2eee8c9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java @@ -96,6 +96,8 @@ public 
TriggerRouterServiceSqlMap(IDatabasePlatform platform, putSql("activeTriggersForSourceNodeGroupSql", "" + "where r.source_node_group_id = ? "); + putSql("activeTriggersForTargetNodeGroupSql", "where r.target_node_group_id = ?"); + putSql("activeTriggersForReloadSql", "" + "where r.source_node_group_id = ? and " + " r.target_node_group_id = ? and t.channel_id != ? and tr.enabled=1 order by " diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/LogSummaryAppenderUtils.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/LogSummaryAppenderUtils.java index c1c18e2285..7c5d7e71c5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/LogSummaryAppenderUtils.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/LogSummaryAppenderUtils.java @@ -143,7 +143,7 @@ public static List getLogSummaries(String engineName, Level level) { } } - public File getLogDir() { + public static File getLogDir() { if (helper != null) { return helper.getLogDir(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java index b18c8661ce..86a0dea86b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java @@ -41,6 +41,7 @@ import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; +import org.jumpmind.symmetric.model.Node; import org.jumpmind.util.AppUtils; import org.jumpmind.util.CollectionUtils; import org.jumpmind.util.FormatUtils; @@ -141,7 +142,21 @@ public static final void replaceSystemAndEnvironmentVariables(Properties propert } } } - + + public static String replaceNodeVariables(Node sourceNode, Node targetNode, String str) { + if (sourceNode != null) { + str = FormatUtils.replace("sourceNodeId", 
sourceNode.getNodeId(), str); + str = FormatUtils.replace("sourceExternalId", sourceNode.getExternalId(), str); + str = FormatUtils.replace("sourceNodeGroupId", sourceNode.getNodeGroupId(), str); + } + if (targetNode != null) { + str = FormatUtils.replace("targetNodeId", targetNode.getNodeId(), str); + str = FormatUtils.replace("targetExternalId", targetNode.getExternalId(), str); + str = FormatUtils.replace("targetNodeGroupId", targetNode.getNodeGroupId(), str); + } + return str; + } + public static void logNotices() { synchronized (SymmetricUtils.class) { if (isNoticeLogged) { diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 08c1d63e5d..20bd043127 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -3019,6 +3019,27 @@ cloud.bulk.load.azure.sas.token= # Type: boolean snapshot.file.include.hostname=false +# Max number of files to write in directory listing for support snapshot. +# +# DatabaseOverridable: true +# Tags: other +# Type: integer +snapshot.max.files=50000 + +# Max number of batches to write to statistics listing for support snapshot. +# +# DatabaseOverridable: true +# Tags: other +# Type: integer +snapshot.max.batches=10000 + +# Max number of nodes and channels for batch statistics, after which it will group by node only. 
+# +# DatabaseOverridable: true +# Tags: other +# Type: integer +snapshot.max.node.channels=5000 + # Log Miner job to find changes from a database archive log # # DatabaseOverridable: false diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java index 407f502d06..677a8ee5d1 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java @@ -20,6 +20,7 @@ */ package org.jumpmind.symmetric.io.stage; +import java.io.File; import java.util.Set; public interface IStagingManager { @@ -36,4 +37,5 @@ public interface IStagingManager { public StagingFileLock acquireFileLock(String serverInfo, Object... path); + public File getStagingDirectory(); } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index d85cf12bd4..c20ce86d25 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -319,6 +319,11 @@ public StagingFileLock acquireFileLock(String serverInfo, Object... path) { return stagingFileLock; } + @Override + public File getStagingDirectory() { + return directory; + } + protected static final DirectoryStream.Filter STAGING_FILE_FILTER = new DirectoryStream.Filter() { @Override public boolean accept(Path entry) {