Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.cloudera.cyber.IdentifiedMessage;
import com.cloudera.cyber.Message;
import com.cloudera.cyber.avro.AvroSchemas;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -22,7 +23,6 @@
import lombok.ToString;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -54,8 +54,7 @@ public long getTs() {
return message.getTs();
}

public static final Schema SCHEMA$ = SchemaBuilder.record(ScoredMessage.class.getName())
.namespace(ScoredMessage.class.getPackage().getName())
public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(ScoredMessage.class.getPackage().getName(), ScoredMessage.class.getName())
.fields()
.name("message").type(Message.SCHEMA$).noDefault()
.name("cyberScoresDetails").type(Schema.createArray(Scores.SCHEMA$)).noDefault()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

package com.cloudera.cyber;

import com.cloudera.cyber.avro.AvroSchemas;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class Message extends SpecificRecordBase implements SpecificRecord, Ident
@NonNull private String source;
private List<DataQualityMessage> dataQualityMessages;

public static final Schema SCHEMA$ = SchemaBuilder.record(Message.class.getName()).namespace(Message.class.getPackage().getName())
public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(Message.class.getPackage().getName(), Message.class.getName())
.fields()
.requiredString("id")
.requiredLong("ts")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.cloudera.cyber.avro;

import org.apache.avro.SchemaBuilder;

/**
 * Factory for Avro record schema builders shared by the cyber message types.
 *
 * <p>Every builder returned here carries the SSB (SQL Stream Builder)
 * event-time properties: the row-time attribute is the {@code ts} field and
 * the watermark expression lags it by 30 seconds. Centralizing this keeps the
 * hand-written {@code SCHEMA$} definitions (e.g. {@code Message},
 * {@code ScoredMessage}) consistent.
 */
public final class AvroSchemas {

    /** Utility class — static members only; no instances. */
    private AvroSchemas() {
    }

    /**
     * Creates an Avro record builder pre-configured with the SSB row-time and
     * watermark properties.
     *
     * @param namespace  Avro namespace for the record (typically the owning package name)
     * @param recordName name of the Avro record
     * @return a record builder with the SSB properties applied; callers chain
     *         {@code .fields()} to declare the record's fields
     */
    public static SchemaBuilder.RecordBuilder<org.apache.avro.Schema> createRecordBuilder(String namespace, String recordName) {
        return SchemaBuilder.record(recordName).namespace(namespace)
                .prop("ssb.rowtimeAttribute", "ts")
                .prop("ssb.watermarkExpression", "`ts` - INTERVAL '30' SECOND");
    }
}
108 changes: 108 additions & 0 deletions flink-cyber/flink-indexing/flink-indexing-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@
<version>${hive.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
Expand All @@ -116,6 +124,106 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-vector-code-gen</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-tez</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>ST4</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-druid</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder build
.option("partition.time-extractor.timestamp-pattern", "$dt $hr:00:00")
.option("sink.partition-commit.trigger", "process-time")
.option("sink.partition-commit.delay", "1 h")
.option("sink.partition-commit.policy.kind", "metastore,success-file");
.option("sink.partition-commit.policy.kind", "metastore,success-file")
.option("hive.storage.file-format", "orc");
}

private StreamTableEnvironment getTableEnvironment() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.cloudera.cyber.indexing.hive.util;

import com.cloudera.cyber.avro.AvroSchemas;
import com.cloudera.cyber.indexing.TableColumnDto;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -28,7 +29,8 @@ public static Schema convertToAvro(List<TableColumnDto> tableColumnList) {

//method that converts from flink Schema to avro Schema
public static Schema convertToAvro(ResolvedSchema schema) {
SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder.record("base").fields();
SchemaBuilder.FieldAssembler<Schema> fieldAssembler = AvroSchemas.createRecordBuilder("com.cloudera.cyber","base")
.fields();

for (Column col : schema.getColumns()) {
fieldAssembler = fieldAssembler.name(col.getName()).type().optional().type(AvroSchemaUtil.convertTypeToAvro(col.getName(), col.getDataType().getLogicalType()));
Expand All @@ -43,7 +45,11 @@ public static void putRowIntoAvro(Row row, GenericRecord record, String fieldNam
if (row == null) {
value = null;
} else {
try {
value = convertToAvroObject(record.getSchema().getField(avroFieldName).schema(), row.getField(fieldName));
} catch (Exception e) {
throw new RuntimeException(String.format("Error converting avro field %s", avroFieldName), e);
}
}
record.put(avroFieldName, value);
System.out.println("fieldName: " + fieldName + " value: " + value);
Expand Down