Skip to content

Commit

Permalink
0001811: Add new transform types that will put the source catalog, sc…
Browse files Browse the repository at this point in the history
…hema or table names into a column
  • Loading branch information
chenson42 committed Jul 15, 2014
1 parent 3a9d354 commit 92dd5fa
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 0 deletions.
@@ -0,0 +1,41 @@
package org.jumpmind.symmetric.io.data.transform;

import java.util.Map;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.model.Data;

public class SourceCatalogNameColumnTransform implements ISingleNewAndOldValueColumnTransform,
IBuiltInExtensionPoint {

@Override
public String getName() {
return "source_catalog_name";
}

@Override
public NewAndOldValue transform(IDatabasePlatform platform, DataContext context,
TransformColumn column, TransformedData data, Map<String, String> sourceValues,
String newValue, String oldValue) throws IgnoreColumnException, IgnoreRowException {
NewAndOldValue value = new NewAndOldValue();
Data csvData = (Data)context.get(ExtractDataReader.DATA_CONTEXT_CURRENT_CSV_DATA);
if (csvData != null && csvData.getTriggerHistory() != null) {
value.setNewValue(csvData.getTriggerHistory().getSourceCatalogName());
}
return value;
}

@Override
public boolean isExtractColumnTransform() {
return true;
}

@Override
public boolean isLoadColumnTransform() {
return false;
}

}
@@ -0,0 +1,41 @@
package org.jumpmind.symmetric.io.data.transform;

import java.util.Map;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.model.Data;

public class SourceSchemaNameColumnTransform implements ISingleNewAndOldValueColumnTransform,
IBuiltInExtensionPoint {

@Override
public String getName() {
return "source_schema_name";
}

@Override
public NewAndOldValue transform(IDatabasePlatform platform, DataContext context,
TransformColumn column, TransformedData data, Map<String, String> sourceValues,
String newValue, String oldValue) throws IgnoreColumnException, IgnoreRowException {
NewAndOldValue value = new NewAndOldValue();
Data csvData = (Data)context.get(ExtractDataReader.DATA_CONTEXT_CURRENT_CSV_DATA);
if (csvData != null && csvData.getTriggerHistory() != null) {
value.setNewValue(csvData.getTriggerHistory().getSourceSchemaName());
}
return value;
}

@Override
public boolean isExtractColumnTransform() {
return true;
}

@Override
public boolean isLoadColumnTransform() {
return false;
}

}
@@ -0,0 +1,41 @@
package org.jumpmind.symmetric.io.data.transform;

import java.util.Map;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.model.Data;

public class SourceTableNameColumnTransform implements ISingleNewAndOldValueColumnTransform,
IBuiltInExtensionPoint {

@Override
public String getName() {
return "source_table_name";
}

@Override
public NewAndOldValue transform(IDatabasePlatform platform, DataContext context,
TransformColumn column, TransformedData data, Map<String, String> sourceValues,
String newValue, String oldValue) throws IgnoreColumnException, IgnoreRowException {
NewAndOldValue value = new NewAndOldValue();
Data csvData = (Data)context.get(ExtractDataReader.DATA_CONTEXT_CURRENT_CSV_DATA);
if (csvData != null && csvData.getTriggerHistory() != null) {
value.setNewValue(csvData.getTriggerHistory().getSourceTableName());
}
return value;
}

@Override
public boolean isExtractColumnTransform() {
return true;
}

@Override
public boolean isLoadColumnTransform() {
return false;
}

}
Expand Up @@ -36,6 +36,9 @@
import org.jumpmind.symmetric.io.data.transform.DeleteAction;
import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
import org.jumpmind.symmetric.io.data.transform.ParameterColumnTransform;
import org.jumpmind.symmetric.io.data.transform.SourceCatalogNameColumnTransform;
import org.jumpmind.symmetric.io.data.transform.SourceSchemaNameColumnTransform;
import org.jumpmind.symmetric.io.data.transform.SourceTableNameColumnTransform;
import org.jumpmind.symmetric.io.data.transform.TransformColumn;
import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
Expand Down Expand Up @@ -65,6 +68,9 @@ public TransformService(IParameterService parameterService, ISymmetricDialect sy

columnTransforms = TransformWriter.buildDefaultColumnTransforms();
addColumnTransform(new ParameterColumnTransform(parameterService));
addColumnTransform(new SourceCatalogNameColumnTransform());
addColumnTransform(new SourceSchemaNameColumnTransform());
addColumnTransform(new SourceTableNameColumnTransform());

setSqlMap(new TransformServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
Expand Down
Expand Up @@ -46,6 +46,8 @@
public class ExtractDataReader implements IDataReader {

protected static final Logger log = LoggerFactory.getLogger(ExtractDataReader.class);

public static final String DATA_CONTEXT_CURRENT_CSV_DATA = CsvData.class.getName();

protected Map<Batch, Statistics> statistics = new HashMap<Batch, Statistics>();

Expand All @@ -60,6 +62,8 @@ public class ExtractDataReader implements IDataReader {
protected Table table;

protected CsvData data;

protected DataContext dataContext;

public ExtractDataReader(IDatabasePlatform platform, IExtractDataReaderSource source) {
this.sourcesToUse = new ArrayList<IExtractDataReaderSource>();
Expand All @@ -73,6 +77,7 @@ public ExtractDataReader(IDatabasePlatform platform, List<IExtractDataReaderSour
}

public void open(DataContext context) {
this.dataContext = context;
}

public Batch nextBatch() {
Expand Down Expand Up @@ -125,6 +130,7 @@ public CsvData nextData() {

CsvData dataToReturn = this.data;
this.data = null;
this.dataContext.put(DATA_CONTEXT_CURRENT_CSV_DATA, dataToReturn);
return dataToReturn;
}

Expand Down

0 comments on commit 92dd5fa

Please sign in to comment.