Merged

27 commits
ddb1197
cyb-4 temp
stas-panasiuk Mar 2, 2023
9a71819
cyb-4 finished main storage + fields logic
stas-panasiuk Mar 9, 2023
579967b
cyb-4 partition commit based on the system time with 1h frequency
stas-panasiuk Mar 15, 2023
d8a6b46
cyb-4 cleanup
stas-panasiuk Mar 15, 2023
3a26c4c
CYB-137 Added index mapping parser UI tab that provides data for fiel…
stas-panasiuk Mar 17, 2023
1f633ea
CYB-137 cleanup
stas-panasiuk Mar 17, 2023
61d267a
CYB-137 cleanup #2
stas-panasiuk Mar 17, 2023
fd2955d
CYB-4 moved DTOs to higher level dependency
stas-panasiuk Mar 20, 2023
adf9f6f
CYB-4 indexing DTOs
stas-panasiuk Mar 20, 2023
333bd51
CYB-137 indexing file and source selection implemented
stas-panasiuk Mar 22, 2023
40f1b8e
CYB-137 removed unnecessary sourceList variable
stas-panasiuk Mar 22, 2023
0ba0ece
CYB-4 hive.writer->flink.writer
stas-panasiuk Apr 3, 2023
81ab197
CYB-4 switched table creation from SQL to avoid hive dialect
stas-panasiuk May 2, 2023
e83d421
CYB-4 added support for Kafka connector
stas-panasiuk May 8, 2023
c943c91
fixed index properties file
stas-panasiuk May 8, 2023
7eac6df
Kafka table creation fix + insert logs
stas-panasiuk May 8, 2023
0db248a
Kafka table creation fix #2
stas-panasiuk May 8, 2023
023dea4
Merge branch 'develop' into cyb-4
stas-panasiuk May 9, 2023
cc3edd4
Merge branch 'CYB-137' into cyb-4-137
stas-panasiuk Jun 12, 2023
bab003e
Merge branch 'develop' into cyb-4-137
stas-panasiuk Jun 12, 2023
2a7cbd7
CYB-4 merge fix
stas-panasiuk Jun 14, 2023
0d928ef
CYB-4 index properties comments added
stas-panasiuk Jun 19, 2023
d3ab953
[CYB-4] Added readme for the new logic
stas-panasiuk Jun 20, 2023
3f4e16f
Merge branch 'develop' into cyb-4-137
stas-panasiuk Jun 22, 2023
f363082
Merge branch 'develop' into cyb-4-137
stas-panasiuk Jul 3, 2023
41dfd7a
[CYB-4] review fixes
stas-panasiuk Jul 3, 2023
4e3b200
[CYB-4] git bug fix
stas-panasiuk Jul 5, 2023
@@ -2,6 +2,16 @@ hive.catalog=hive
hive.schema=cyber
hive.table=events

#flink.writer=tableapi

#Uncomment one of the following lines if flink.writer=tableapi is specified
#flink.output-connector=hive
#flink.output-connector=kafka

#Uncomment the following two lines if flink.writer=tableapi is specified
#flink.tables-init-file=PIPELINE/index/table-config.json
#flink.mapping-file=PIPELINE/index/mapping-config.json

#hive.confdir=/etc/hive/conf/

#hive.transaction.messages=500
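For reference, a working Table API configuration, with the comments above resolved for the Kafka connector, might look like the following. The pipeline name is an illustrative assumption; PIPELINE is the token the generator script substitutes with the real pipeline name:

flink.writer=tableapi
flink.output-connector=kafka
flink.tables-init-file=mypipeline/index/table-config.json
flink.mapping-file=mypipeline/index/mapping-config.json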
@@ -0,0 +1,21 @@
{
"squid": {
"table_name": "hive_table",
"ignore_fields": [
"code"
],
"column_mapping": [
{
"name": "full_hostname"
},
{
"name": "action"
}
]
},
"test": {
"table_name": "another_hive_table",
"column_mapping": [
]
}
}
@@ -0,0 +1,14 @@
{
"hive_table": [
{
"name": "full_hostname",
"type": "string"
},
{
"name": "action",
"type": "string"
}
],
"another_hive_table": [
]
}
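The two JSON files above appear to work as a pair: the first routes a message source (for example "squid") to a target table and selects or ignores fields, while the second declares each table's column names and types. Below is a hedged Java sketch of how they might be read together; the plain-map shapes are assumptions, since the PR's actual DTO classes are not shown in this excerpt:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class IndexConfigSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Source name -> {table_name, ignore_fields, column_mapping}; shape assumed.
        Map<String, Map<String, Object>> mapping = mapper.readValue(
                IndexConfigSketch.class.getResourceAsStream("/mapping-config.json"),
                new TypeReference<Map<String, Map<String, Object>>>() {});

        // Table name -> list of {name, type} columns; shape assumed.
        Map<String, List<Map<String, String>>> tables = mapper.readValue(
                IndexConfigSketch.class.getResourceAsStream("/table-config.json"),
                new TypeReference<Map<String, List<Map<String, String>>>>() {});

        // The "squid" source writes to "hive_table", whose columns come from the table config.
        String tableName = (String) mapping.get("squid").get("table_name");
        System.out.println(tableName + " columns: " + tables.get(tableName));
    }
}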
@@ -35,7 +35,7 @@ fi
pipeline=$1
index_dir=$pipeline/index

mkdir -p $index_dir
mkdir -p $index_dir/conf
for FILE in $TEMPLATES_DIR/index/*; do
sed -e s/PIPELINE/$pipeline/g $FILE > $index_dir/${FILE##*/}
done
@@ -1,12 +1,12 @@
/*
* Copyright 2020 - 2022 Cloudera. All Rights Reserved.
*
* This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file
* except in compliance with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. Refer to the License for the specific permissions and
* limitations governing your use of the file.
*/

@@ -25,6 +25,9 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.util.List;

@@ -59,6 +62,18 @@ public long getTs() {
.optionalDouble("cyberScore")
.endRecord();

public static final TypeInformation<Row> FLINK_TYPE_INFO = Types.ROW_NAMED(
new String[]{"message", "cyberScoresDetails", "cyberScore"},
Message.FLINK_TYPE_INFO, Types.OBJECT_ARRAY(Scores.FLINK_TYPE_INFO), Types.DOUBLE);

public Row toRow() {
return Row.of(message == null ? null : message.toRow(),
cyberScoresDetails == null ? null : cyberScoresDetails.stream()
.map(Scores::toRow)
.toArray(Row[]::new),
cyberScore);
}

@Override
public Schema getSchema() {
return SCHEMA$;
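The FLINK_TYPE_INFO constants and toRow() converters added in this class (and mirrored in Scores, Message, SignedSourceKey and DataQualityMessage below) are what allow the new tableapi writer to hand these Avro-backed POJOs to Flink's Table API as typed Rows. A minimal sketch of that hand-off, assuming the class above is ScoredMessage (the scrape dropped file names, so the name and package are inferred) and that a scored stream already exists; the target table name is illustrative:

import com.cloudera.cyber.scoring.ScoredMessage; // package assumed
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableApiWriterSketch {
    public static void write(StreamExecutionEnvironment env, DataStream<ScoredMessage> scored) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // toRow() flattens the nested Avro structure; returns(...) attaches the
        // matching ROW_NAMED type information so the planner knows the schema.
        DataStream<Row> rows = scored
                .map(ScoredMessage::toRow)
                .returns(ScoredMessage.FLINK_TYPE_INFO);

        // Register the stream and append it to a pre-created connector table.
        tableEnv.fromDataStream(rows).executeInsert("hive_table");
    }
}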
@@ -1,12 +1,12 @@
/*
* Copyright 2020 - 2022 Cloudera. All Rights Reserved.
*
* This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file
* except in compliance with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. Refer to the License for the specific permissions and
* limitations governing your use of the file.
*/

@@ -22,8 +22,9 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;

import java.util.UUID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

@Data
@Builder
@@ -42,6 +43,14 @@ public class Scores extends SpecificRecordBase implements SpecificRecord {
.requiredString("reason")
.endRecord();

public static final TypeInformation<Row> FLINK_TYPE_INFO = Types.ROW_NAMED(
new String[]{"ruleId", "score", "reason"},
Types.STRING, Types.DOUBLE, Types.STRING);

public Row toRow() {
return Row.of(ruleId, score, reason);
}

@Override
public Schema getSchema() {
return SCHEMA$;
@@ -50,20 +59,31 @@ public Schema getSchema() {
@Override
public Object get(int field$) {
switch (field$) {
case 0: return ruleId;
case 1: return score;
case 2: return reason;
default: throw new AvroRuntimeException("Bad index");
case 0:
return ruleId;
case 1:
return score;
case 2:
return reason;
default:
throw new AvroRuntimeException("Bad index");
}
}

@Override
public void put(int field$, Object value$) {
switch (field$) {
case 0: ruleId = value$.toString(); break;
case 1: score = (Double)value$; break;
case 2: reason = value$.toString(); break;
default: throw new AvroRuntimeException("Bad Index");
case 0:
ruleId = value$.toString();
break;
case 1:
score = (Double) value$;
break;
case 2:
reason = value$.toString();
break;
default:
throw new AvroRuntimeException("Bad Index");
}
}
}
@@ -12,6 +12,9 @@

package com.cloudera.cyber.flink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;

@@ -34,8 +37,6 @@
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
@@ -67,6 +68,8 @@ public class Utils {
public static final String K_KEYSTORE_PASSWORD = "keyStorePassword";
public static final String PASSWORD = "password";

public static final ObjectMapper MAPPER = new ObjectMapper();

public static Properties readProperties(Properties properties, String prefix) {
Properties targetProperties = new Properties();
for (String key : properties.stringPropertyNames()) {
@@ -162,6 +165,13 @@ public static Map<String, Object> readSchemaRegistryProperties(ParameterTool par
return schemaRegistryConf;
}

private static <T> T jsonToObject(String json, TypeReference<T> typeReference) throws JsonProcessingException {
return MAPPER.readValue(json, typeReference);
}

public static <T> T readResourceFile(String resourceLocation, Class<?> cls, TypeReference<T> typeReference) throws IOException {
return jsonToObject(readResourceFile(resourceLocation, cls), typeReference);
}

public static String readResourceFile(String resourceLocation, Class<?> cls) {
try {
@@ -271,8 +281,18 @@ public static ParameterTool getParamToolsFromProperties(String[] pathToPropertyF
}


public static <T> T readFile(String path, TypeReference<T> typeReference) throws IOException {
return jsonToObject(readFile(path), typeReference);
}

public static String readFile(String path) throws IOException {
return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
final Path filePath = new Path(path);
final FileSystem fileSystem = filePath.getFileSystem();
try (FSDataInputStream fsDataInputStream = fileSystem.open(filePath)) {
return IOUtils.toString(fsDataInputStream, StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(String.format("Wasn't able to read file [%s]!", filePath), e);
}
}


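Since readFile now resolves paths through Flink's FileSystem abstraction rather than java.nio, the same helper should serve local files and, presumably, any scheme Flink supports, such as hdfs://. A hedged usage sketch; the paths and the JSON shape are illustrative:

import com.cloudera.cyber.flink.Utils;
import com.fasterxml.jackson.core.type.TypeReference;

import java.util.List;
import java.util.Map;

public class ReadFileSketch {
    public static void main(String[] args) throws Exception {
        // Plain text from the local filesystem.
        String properties = Utils.readFile("file:///tmp/index/index.properties");

        // JSON from HDFS, deserialized through the new TypeReference overload.
        Map<String, List<Map<String, String>>> tableConfig = Utils.readFile(
                "hdfs:///pipelines/mypipeline/index/table-config.json",
                new TypeReference<Map<String, List<Map<String, String>>>>() {});

        System.out.println(properties.length() + " chars; tables: " + tableConfig.keySet());
    }
}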
5 changes: 5 additions & 0 deletions flink-cyber/flink-cyber-api/pom.xml
@@ -50,6 +50,11 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -22,6 +22,9 @@
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

@Data
@Builder(toBuilder = true)
@@ -45,6 +48,14 @@ public class DataQualityMessage extends SpecificRecordBase implements SpecificRe
.requiredString("message")
.endRecord();

public static final TypeInformation<Row> FLINK_TYPE_INFO = Types.ROW_NAMED(
new String[]{"level", "feature", "field", "message"},
Types.STRING, Types.STRING, Types.STRING, Types.STRING);

public Row toRow() {
return Row.of(level, feature, field, message);
}

@Override
public Schema getSchema() {
return SCHEMA$;
@@ -71,5 +82,4 @@ public void put(int field$, Object value$) {
default: throw new AvroRuntimeException("Bad index");
}
}

}
@@ -12,17 +12,25 @@

package com.cloudera.cyber;

import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static com.cloudera.cyber.AvroTypes.toListOf;
import static com.cloudera.cyber.AvroTypes.utf8toStringMap;
@@ -56,6 +64,31 @@ public class Message extends SpecificRecordBase implements SpecificRecord, Ident
.name("dataQualityMessages").type().optional().type(Schema.createArray(DataQualityMessage.SCHEMA$))
.endRecord();

public static final TypeInformation<Row> FLINK_TYPE_INFO = Types.ROW_NAMED(
new String[]{"id", "ts", "originalSource", "message", "threats", "extensions", "source", "dataQualityMessages"},
Types.STRING, Types.LONG, SignedSourceKey.FLINK_TYPE_INFO, Types.STRING,
Types.MAP(Types.STRING, Types.OBJECT_ARRAY(ThreatIntelligence.FLINK_TYPE_INFO)),
Types.MAP(Types.STRING, Types.STRING), Types.STRING, Types.OBJECT_ARRAY(DataQualityMessage.FLINK_TYPE_INFO));

public Row toRow() {
return Row.of(id,
ts,
originalSource.toRow(),
message,
threats == null ? null : threats.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().stream()
.map(ThreatIntelligence::toRow)
.toArray(Row[]::new))),
extensions,
source,
dataQualityMessages == null ? null : dataQualityMessages.stream()
.map(DataQualityMessage::toRow)
.toArray(Row[]::new)
);
}

public static Schema getClassSchema() {
return SCHEMA$;
}
@@ -21,6 +21,9 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.nio.ByteBuffer;

@@ -34,6 +37,7 @@ public class SignedSourceKey extends SpecificRecordBase implements SpecificRecor
private int partition;
private long offset;
private byte[] signature;

public static final Schema SCHEMA$ = SchemaBuilder
.record(SignedSourceKey.class.getName())
.namespace(SignedSourceKey.class.getPackage().getName())
@@ -44,6 +48,15 @@ public class SignedSourceKey extends SpecificRecordBase implements SpecificRecor
.name("signature").type().bytesBuilder().endBytes().noDefault()
.endRecord();

public static final TypeInformation<Row> FLINK_TYPE_INFO = Types.ROW_NAMED(
new String[]{"topic", "partition", "offset", "signature"},
Types.STRING, Types.INT, Types.LONG, Types.PRIMITIVE_ARRAY(Types.BYTE));


public Row toRow() {
return Row.of(topic, partition, offset, signature);
}

// static public class SignedSourceKeyBuilder {
// public SignedSourceKeyBuilder signature(byte[] bytes) {
// this.signature = ByteBuffer.wrap(bytes);