testing create table during initial load
chenson42 committed May 1, 2012
1 parent 8297a56 commit f2f8c99
Showing 7 changed files with 86 additions and 59 deletions.
@@ -120,14 +120,28 @@ public void beforeRouting() {
.getTime() + transactionViewClockSyncThresholdInMs)) {
if (dataService.countDataInRange(dataGap.getStartId() - 1,
dataGap.getEndId() + 1) == 0) {
log.info("Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database.",
dataGap.getStartId(), dataGap.getEndId());
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId(), dataGap.getEndId());
}
dataService.updateDataGap(dataGap, DataGap.Status.SK);
}
}
} else if (isDataGapExpired(dataGap.getEndId() + 1)) {
log.info("Found a gap in data_id from {} to {}. Skipping it because the gap expired.", dataGap.getStartId(),
dataGap.getEndId());
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because the gap expired",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because the gap expired",
dataGap.getStartId(), dataGap.getEndId());
}
dataService.updateDataGap(dataGap, DataGap.Status.SK);
}
} else {
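The hunk above only splits the log message by gap shape (a single missing data_id versus a range); the skip behavior itself is unchanged. A rough illustration, assuming the two-argument DataGap constructor implied by the getStartId()/getEndId() calls:

// Hypothetical values; the constructor is assumed from the accessors used above.
DataGap single = new DataGap(100, 100); // logs "Found a gap in data_id at 100"
DataGap range = new DataGap(100, 110);  // logs "Found a gap in data_id from 100 to 110"
// Either shape is then marked skipped: dataService.updateDataGap(gap, DataGap.Status.SK);
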
@@ -182,13 +182,13 @@ stream.to.file.enabled=true
# entire payload of the synchronization will be buffered in memory up to this number (at
# which point it will be written and continue to stream to disk.)
#
# DatabaseOverridable: false
# DatabaseOverridable: true
# Tags: transport
stream.to.file.threshold.bytes=32767

# If stream.to.file.enabled is true, then this is how long a file will be retained in the
# staging directory after it has been marked as done.
# DatabaseOverridable: false
# DatabaseOverridable: true
# Tags: transport
stream.to.file.ttl.ms=3600000

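With DatabaseOverridable flipped to true, both settings can now be changed through SymmetricDS parameters at runtime instead of only in this file. A minimal sketch of reading them back, assuming a standard IParameterService handle named parameterService:

// Sketch only: parameterService is assumed; the keys are the property names above.
long thresholdBytes = parameterService.getLong("stream.to.file.threshold.bytes");
long stagingTtlMs = parameterService.getLong("stream.to.file.ttl.ms");
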
@@ -231,4 +231,8 @@ public Map<String, String> toColumnNameValuePairs(Table table, String key) {
return new HashMap<String, String>(0);
}
}

public boolean requiresTable() {
return dataEventType != null && dataEventType != DataEventType.CREATE;
}
}
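
The new requiresTable() helper encodes that every event type except CREATE needs a resolved target table; the forEachDataInTable change below uses it to let CREATE events through even when the table itself cannot be processed. A quick sketch against the accessors used elsewhere in this commit:

CsvData create = new CsvData();
create.setDataEventType(DataEventType.CREATE);
create.requiresTable(); // false: the CREATE payload carries its own table definition

CsvData row = new CsvData();
row.setDataEventType(DataEventType.INSERT);
row.requiresTable(); // true: row events need a resolved target table
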
@@ -28,18 +28,18 @@
import java.io.Reader;
import java.io.Writer;

import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.csv.CsvReader;
import org.jumpmind.symmetric.csv.CsvWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CsvUtils {

static final Logger log = LoggerFactory.getLogger(CsvUtils.class);

public static final String DELIMITER = ", ";

public static final String LINE_SEPARATOR = System.getProperty("line.separator");

public static CsvReader getCsvReader(Reader reader) {
@@ -52,7 +52,8 @@ public static CsvReader getCsvReader(Reader reader) {
public static String[] tokenizeCsvData(String csvData) {
String[] tokens = null;
if (csvData != null) {
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(csvData.getBytes()));
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(
csvData.getBytes()));
CsvReader csvReader = getCsvReader(reader);
try {
if (csvReader.readRecord()) {
@@ -76,11 +77,11 @@ public static String escapeCsvData(String data) {
}
return out.toString();
}

public static String escapeCsvData(String[] data) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
for (String s : data) {
try {
writer.write(s);
@@ -91,31 +92,40 @@ public static String escapeCsvData(String[] data) {
return out.toString();
}

public static int write(Writer writer, String... data) throws IOException {
StringBuilder buffer = new StringBuilder();
for (String string : data) {
buffer.append(string);
}
public static int write(Writer writer, String... data) {
try {
StringBuilder buffer = new StringBuilder();
for (String string : data) {
buffer.append(string);
}

writer.write(buffer.toString());
if (log.isDebugEnabled()) {
log.debug(buffer.toString());
writer.write(buffer.toString());
if (log.isDebugEnabled()) {
log.debug(buffer.toString());
}
return buffer.length();
} catch (IOException ex) {
throw new IoException(ex);
}
return buffer.length();
}

public static void writeSql(String sql, Writer writer) throws IOException {
public static void writeSql(String sql, Writer writer) {
write(writer, CsvConstants.SQL, DELIMITER, sql);
writeLineFeed(writer);
}
public static void writeBsh(String script, Writer writer) throws IOException {

public static void writeBsh(String script, Writer writer) {
write(writer, CsvConstants.BSH, DELIMITER, script);
writeLineFeed(writer);
writeLineFeed(writer);
}

public static void writeLineFeed(Writer writer) throws IOException {
writer.write(LINE_SEPARATOR);

public static void writeLineFeed(Writer writer) {
try {
writer.write(LINE_SEPARATOR);
} catch (IOException ex) {
throw new IoException(ex);
}

}

}
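
The rework above moves write, writeSql, writeBsh, and writeLineFeed from checked IOException to the unchecked org.jumpmind.exception.IoException, so callers no longer need throws clauses or try/catch blocks. A small usage sketch (the payload strings are made up):

// An I/O failure now surfaces as an unchecked IoException.
java.io.StringWriter writer = new java.io.StringWriter();
CsvUtils.writeSql("update sym_node set sync_enabled = 1", writer);
CsvUtils.writeBsh("log.info(\"loaded\")", writer);
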
@@ -152,7 +152,7 @@ protected int forEachDataInTable(DataContext context, boolean processTable, Batc
batch.incrementDataReadMillis(batch.endTimer(STAT_READ_DATA));
if (currentData != null) {
dataRow++;
if (processTable) {
if (processTable || !currentData.requiresTable()) {
try {
batch.startTimer(STAT_WRITE_DATA);
batch.incrementLineCount();
@@ -34,8 +34,9 @@
import org.slf4j.LoggerFactory;

public class ProtocolDataReader implements IDataReader {

public static final String CTX_LINE_NUMBER = ProtocolDataReader.class.getSimpleName() + ".lineNumber";

public static final String CTX_LINE_NUMBER = ProtocolDataReader.class.getSimpleName()
+ ".lineNumber";

protected Logger log = LoggerFactory.getLogger(getClass());

@@ -90,7 +91,7 @@ public ProtocolDataReader(File file) {
throw new IoException(ex);
}
}

public IStagedResource getStagedResource() {
return stagedResource;
}
@@ -138,9 +139,11 @@ protected Object readNext() {
} else if (tokens[0].equals(CsvConstants.CHANNEL)) {
this.channelId = tokens[1];
} else if (tokens[0].equals(CsvConstants.SCHEMA)) {
schemaName = StringUtils.isBlank(tokens[1]) ? null : tokens[1];
schemaName = tokens.length == 1 || StringUtils.isBlank(tokens[1]) ? null
: tokens[1];
} else if (tokens[0].equals(CsvConstants.CATALOG)) {
catalogName = StringUtils.isBlank(tokens[1]) ? null : tokens[1];
catalogName = tokens.length == 1 || StringUtils.isBlank(tokens[1]) ? null
: tokens[1];
} else if (tokens[0].equals(CsvConstants.TABLE)) {
String tableName = tokens[1];
table = tables.get(Table.getFullyQualifiedTableName(catalogName, schemaName,
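The added tokens.length == 1 checks guard protocol lines where the schema or catalog keyword arrives with no value at all, which the tokenizer returns as a single-element array. Roughly, using tokenizeCsvData from this same commit:

String[] bare = CsvUtils.tokenizeCsvData("schema");   // length == 1: tokens[1] used to throw ArrayIndexOutOfBoundsException
String[] blank = CsvUtils.tokenizeCsvData("schema,"); // blank[1] is "", already handled by StringUtils.isBlank
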
@@ -181,8 +184,8 @@ protected Object readNext() {
data.setDataEventType(DataEventType.UPDATE);
data.putParsedData(CsvData.ROW_DATA,
CollectionUtils.copyOfRange(tokens, 1, table.getColumnCount() + 1));
data.putParsedData(CsvData.PK_DATA,
CollectionUtils.copyOfRange(tokens, table.getColumnCount() + 1, tokens.length));
data.putParsedData(CsvData.PK_DATA, CollectionUtils.copyOfRange(tokens,
table.getColumnCount() + 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
return data;
} else if (tokens[0].equals(CsvConstants.DELETE)) {
@@ -195,17 +198,17 @@
} else if (tokens[0].equals(CsvConstants.SQL)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.SQL);
data.putCsvData(CsvData.ROW_DATA, tokens[1]);
data.putParsedData(CsvData.ROW_DATA, new String[] {tokens[1]});
return data;
} else if (tokens[0].equals(CsvConstants.BSH)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.BSH);
data.putCsvData(CsvData.ROW_DATA, tokens[1]);
data.putParsedData(CsvData.ROW_DATA, new String[] {tokens[1]});
return data;
} else if (tokens[0].equals(CsvConstants.CREATE)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.CREATE);
data.putCsvData(CsvData.ROW_DATA, tokens[1]);
data.putParsedData(CsvData.ROW_DATA, new String[] {tokens[1]});
return data;
} else {
log.info("Unable to handle unknown csv values: " + Arrays.toString(tokens));
@@ -277,11 +280,11 @@ public void close() {
if (csvReader != null) {
csvReader.close();
}

if (stagedResource != null) {
stagedResource.close();
}

}

public Map<Batch, Statistics> getStatistics() {
@@ -14,7 +14,6 @@
import org.jumpmind.db.io.DatabaseIO;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.ModelException;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.DmlStatement;
@@ -550,7 +549,7 @@ protected void logFailure(SqlException e, CsvData data) {
protected boolean script(CsvData data) {
try {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
String script = data.getCsvData(CsvData.ROW_DATA);
String script = data.getParsedData(CsvData.ROW_DATA)[0];
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("SOURCE_NODE_ID", batch.getNodeId());
ISqlTemplate template = platform.getSqlTemplate();
@@ -581,23 +580,20 @@ protected boolean script(CsvData data) {
}

protected boolean create(CsvData data) {
String xml = null;
try {
try {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
String xml = data.getCsvData(CsvData.ROW_DATA);
if (log.isDebugEnabled()) {
log.debug("About to create table using the following definition: ", xml);
}
StringReader reader = new StringReader(xml);
Database db = (Database) new DatabaseIO().read(reader);
platform.alterTables(false, db.getTables());
platform.resetCachedTableModel();
statistics.get(batch).increment(DataWriterStatisticConstants.CREATECOUNT);
return true;
} catch (Exception e) {
throw new ModelException(e);
}

statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
xml = data.getParsedData(CsvData.ROW_DATA)[0];
log.info("About to create table using the following definition: ", xml);
StringReader reader = new StringReader(xml);
Database db = (Database) new DatabaseIO().read(reader);
platform.alterTables(false, db.getTables());
platform.resetCachedTableModel();
statistics.get(batch).increment(DataWriterStatisticConstants.CREATECOUNT);
return true;
} catch (RuntimeException ex) {
log.error("Failed to alter table using the following xml: {}", xml);
throw ex;
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}
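
create() hands ROW_DATA to DatabaseIO and then alterTables, so the payload is expected to be an XML table definition. A hedged sketch of the flow with a made-up definition (the exact element and attribute names are whatever DatabaseIO reads):

// Hypothetical payload, mirroring the read/alter calls above.
String xml = "<database name=\"sample\">"
        + "<table name=\"demo\">"
        + "<column name=\"id\" type=\"INTEGER\" primaryKey=\"true\" required=\"true\"/>"
        + "</table></database>";
Database db = (Database) new DatabaseIO().read(new StringReader(xml));
platform.alterTables(false, db.getTables()); // creates the table if it is missing
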
@@ -607,7 +603,7 @@ protected boolean sql(CsvData data) {
protected boolean sql(CsvData data) {
try {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
String sql = data.getCsvData(CsvData.ROW_DATA);
String sql = data.getParsedData(CsvData.ROW_DATA)[0];
transaction.prepare(sql);
log.info("About to run: {}", sql);
long count = transaction.prepareAndExecute(sql);