Skip to content

Commit

Permalink
internal cleanup
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 357245049
  • Loading branch information
dhercher authored and cloud-teleport committed Feb 12, 2021
1 parent 4e90531 commit c8d3404
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
Expand Up @@ -117,28 +117,38 @@ public FailsafeElement<String, String> apply(GenericRecord record) {
}

// General DataStream Metadata
String sourceType = getSourceType(record);

outputObject.put("_metadata_stream", getStreamName(record));
outputObject.put("_metadata_timestamp", getSourceTimestamp(record));
outputObject.put("_metadata_read_timestamp", getMetadataTimestamp(record));
outputObject.put("_metadata_read_method", record.get("read_method").toString());
outputObject.put("_metadata_source_type", sourceType);

// Source Specific Metadata
outputObject.put("_metadata_deleted", getMetadataIsDeleted(record));
outputObject.put("_metadata_schema", getMetadataSchema(record));
outputObject.put("_metadata_table", getMetadataTable(record));
outputObject.put("_metadata_change_type", getMetadataChangeType(record));

// Oracle Specific Metadata
outputObject.put("_metadata_row_id", getOracleRowId(record));
outputObject.put("_metadata_scn", getOracleScn(record));
outputObject.put("_metadata_ssn", getOracleSsn(record));
outputObject.put("_metadata_rs_id", getOracleRsId(record));
outputObject.put("_metadata_tx_id", getOracleTxId(record));

if (sourceType.equals("mysql")) {
// MySQL Specific Metadata
outputObject.put("_metadata_schema", getMetadataDatabase(record));
outputObject.put("_metadata_log_file", getSourceMetadata(record, "log_file"));
outputObject.put("_metadata_log_position", getSourceMetadata(record, "log_position"));
} else {
// Oracle Specific Metadata
outputObject.put("_metadata_schema", getMetadataSchema(record));
outputObject.put("_metadata_row_id", getOracleRowId(record));
outputObject.put("_metadata_scn", getOracleScn(record));
outputObject.put("_metadata_ssn", getOracleSsn(record));
outputObject.put("_metadata_rs_id", getOracleRsId(record));
outputObject.put("_metadata_tx_id", getOracleTxId(record));
}
// Hash columns supplied to be hashed
applyHashToColumns(record, outputObject);

// All Raw Metadata
outputObject.put("_metadata_source", getSourceMetadata(record));
outputObject.put("_metadata_source", getSourceMetadataJson(record));

return FailsafeElement.of(outputObject.toString(), outputObject.toString());
}
Expand All @@ -159,7 +169,7 @@ private ObjectNode getLowerCaseObject(ObjectNode outputObject) {
return loweredOutputObject;
}

private JsonNode getSourceMetadata(GenericRecord record) {
private JsonNode getSourceMetadataJson(GenericRecord record) {
ObjectMapper mapper = new ObjectMapper();
JsonNode dataInput;
try {
Expand All @@ -178,6 +188,12 @@ private String getStreamName(GenericRecord record) {
return this.streamName;
}

private String getSourceType(GenericRecord record) {
String sourceType = record.get("read_method").toString().split("-")[0];
// TODO: consider validating the value is mysql or oracle
return sourceType;
}

private long getMetadataTimestamp(GenericRecord record) {
long unixTimestampMilli = (long) record.get("read_timestamp");
return unixTimestampMilli / 1000;
Expand All @@ -190,10 +206,22 @@ private long getSourceTimestamp(GenericRecord record) {
return unixTimestampSec;
}

private String getSourceMetadata(GenericRecord record, String fieldName) {
if (((GenericRecord) record.get("source_metadata")).get(fieldName) != null) {
return ((GenericRecord) record.get("source_metadata")).get(fieldName).toString();
}

return null;
}

private String getMetadataSchema(GenericRecord record) {
return ((GenericRecord) record.get("source_metadata")).get("schema").toString();
}

private String getMetadataDatabase(GenericRecord record) {
return ((GenericRecord) record.get("source_metadata")).get("database").toString();
}

private String getMetadataTable(GenericRecord record) {
return ((GenericRecord) record.get("source_metadata")).get("table").toString();
}
Expand Down
Expand Up @@ -43,10 +43,12 @@ public class FormatDatastreamRecordToJsonTest {
+ "\"_metadata_stream\":\"projects/596161805475/locations/us-central1/streams/dylan-stream-20200810test2\","
+ "\"_metadata_timestamp\":1597101230,"
+ "\"_metadata_read_timestamp\":1597101230,"
+ "\"_metadata_read_method\":\"oracle_dump\","
+ "\"_metadata_source_type\":\"oracle_dump\","
+ "\"_metadata_deleted\":false,"
+ "\"_metadata_schema\":\"HR\","
+ "\"_metadata_table\":\"LOCATIONS\","
+ "\"_metadata_change_type\":null,"
+ "\"_metadata_schema\":\"HR\","
+ "\"_metadata_row_id\":\"AAAEALAAEAAAACdAAB\","
+ "\"_metadata_scn\":null,"
+ "\"_metadata_ssn\":null,"
Expand Down

0 comments on commit c8d3404

Please sign in to comment.