diff --git a/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java b/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java
index b2ce2b6a2..f38eaab00 100644
--- a/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java
+++ b/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java
@@ -14,6 +14,7 @@
 import com.cloudera.cyber.IdentifiedMessage;
 import com.cloudera.cyber.Message;
+import com.cloudera.cyber.avro.AvroSchemas;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -22,7 +23,6 @@
 import lombok.ToString;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,8 +54,7 @@ public long getTs() {
         return message.getTs();
     }
 
-    public static final Schema SCHEMA$ = SchemaBuilder.record(ScoredMessage.class.getName())
-            .namespace(ScoredMessage.class.getPackage().getName())
+    public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(ScoredMessage.class.getPackage().getName(), ScoredMessage.class.getName())
             .fields()
             .name("message").type(Message.SCHEMA$).noDefault()
             .name("cyberScoresDetails").type(Schema.createArray(Scores.SCHEMA$)).noDefault()
diff --git a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java
index e871a85bb..44e5dc8c1 100644
--- a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java
+++ b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java
@@ -12,6 +12,7 @@
 
 package com.cloudera.cyber;
 
+import com.cloudera.cyber.avro.AvroSchemas;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -52,7 +53,7 @@ public class Message extends SpecificRecordBase implements SpecificRecord, Ident
     @NonNull private String source;
     private List<DataQualityMessage> dataQualityMessages;
 
-    public static final Schema SCHEMA$ = SchemaBuilder.record(Message.class.getName()).namespace(Message.class.getPackage().getName())
+    public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(Message.class.getPackage().getName(), Message.class.getName())
             .fields()
             .requiredString("id")
             .requiredLong("ts")
diff --git a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java
new file mode 100644
index 000000000..9bec6f190
--- /dev/null
+++ b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java
@@ -0,0 +1,12 @@
+package com.cloudera.cyber.avro;
+
+import org.apache.avro.SchemaBuilder;
+
+public class AvroSchemas {
+
+    public static SchemaBuilder.RecordBuilder<org.apache.avro.Schema> createRecordBuilder(String namespace, String recordName) {
+        return SchemaBuilder.record(recordName).namespace(namespace)
+                .prop("ssb.rowtimeAttribute", "ts")
+                .prop("ssb.watermarkExpression", "`ts` - INTERVAL '30' SECOND");
+    }
+}
diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml
index d89ea2012..76f345fdf 100644
--- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml
+++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml
@@ -104,6 +104,14 @@
             <version>${hive.version}</version>
             <scope>compile</scope>
             <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>commons-compiler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>janino</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>org.apache.calcite</groupId>
                     <artifactId>*</artifactId>
@@ -116,6 +124,106 @@
                     <groupId>commons-cli</groupId>
                     <artifactId>commons-cli</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-vector-code-gen</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-httpclient</groupId>
+                    <artifactId>commons-httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>ST4</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.ant</groupId>
+                    <artifactId>ant</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.ivy</groupId>
+                    <artifactId>ivy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>apache-curator</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-framework</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.groovy</groupId>
+                    <artifactId>groovy-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>stax</groupId>
+                    <artifactId>stax-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>apache-log4j-extras</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java
index c2243f0a2..c9e8116d5 100644
--- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java
+++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java
@@ -172,7 +172,8 @@ protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder build
                 .option("partition.time-extractor.timestamp-pattern", "$dt $hr:00:00")
                 .option("sink.partition-commit.trigger", "process-time")
                 .option("sink.partition-commit.delay", "1 h")
-                .option("sink.partition-commit.policy.kind", "metastore,success-file");
+                .option("sink.partition-commit.policy.kind", "metastore,success-file")
+                .option("hive.storage.file-format", "orc");
     }
 
     private StreamTableEnvironment getTableEnvironment() {
diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java
index 563d92288..413638750 100644
--- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java
+++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java
@@ -1,5 +1,6 @@
 package com.cloudera.cyber.indexing.hive.util;
 
+import com.cloudera.cyber.avro.AvroSchemas;
 import com.cloudera.cyber.indexing.TableColumnDto;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -28,7 +29,8 @@ public static Schema convertToAvro(List<TableColumnDto> tableColumnList) {
 
     //method that converts from flink Schema to avro Schema
     public static Schema convertToAvro(ResolvedSchema schema) {
-        SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder.record("base").fields();
+        SchemaBuilder.FieldAssembler<Schema> fieldAssembler = AvroSchemas.createRecordBuilder("com.cloudera.cyber","base")
+                .fields();
 
         for (Column col : schema.getColumns()) {
             fieldAssembler = fieldAssembler.name(col.getName()).type().optional().type(AvroSchemaUtil.convertTypeToAvro(col.getName(), col.getDataType().getLogicalType()));
@@ -43,7 +45,11 @@ public static void putRowIntoAvro(Row row, GenericRecord record, String fieldNam
         if (row == null) {
             value = null;
         } else {
+            try {
             value = convertToAvroObject(record.getSchema().getField(avroFieldName).schema(), row.getField(fieldName));
+            } catch (Exception e) {
+                throw new RuntimeException(String.format("Error converting avro field %s", avroFieldName), e);
+            }
         }
         record.put(avroFieldName, value);
         System.out.println("fieldName: " + fieldName + " value: " + value);