Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

EAGLE-14: Hdfs User command re-assembler #19

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions eagle-assembly/src/main/bin/eagle-create-table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ def createEagleTable(admin, tableName)
createEagleTable(admin, 'hbaseResourceSensitivity')
createEagleTable(admin, 'mlmodel')
createEagleTable(admin, 'userprofile')
createEagleTable(admin, 'hfdsusercommandpattern')

exit
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eagle.alert.entity.AlertDataSourceEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.apache.commons.lang.time.DateUtils;
Expand All @@ -31,37 +32,16 @@

public class AlertDataSourceDAOImpl implements AlertDataSourceDAO{
private final Logger LOG = LoggerFactory.getLogger(AlertDataSourceDAOImpl.class);
private final EagleServiceConnector connector;

private final String eagleServiceHost;
private final Integer eagleServicePort;
private String username;
private String password;

public AlertDataSourceDAOImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password) {
this.eagleServiceHost = eagleServiceHost;
this.eagleServicePort = eagleServicePort;
this.username = username;
this.password = password;
}

public AlertDataSourceDAOImpl(String eagleServiceHost, Integer eagleServicePort) {
this(eagleServiceHost,eagleServicePort, null, null);
}

public AlertDataSourceDAOImpl(Config config) {
this.eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
this.eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
if (config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) &&
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)) {
this.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
this.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
}
public AlertDataSourceDAOImpl(EagleServiceConnector connector){
this.connector = connector;
}

@Override
public List<AlertDataSourceEntity> findAlertDataSourceBySite(String site) throws Exception{
try {
IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
IEagleServiceClient client = new EagleServiceClientImpl(connector);
String query = AlertConstants.ALERT_STREAM_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\"]{*}";
GenericServiceAPIResponseEntity<AlertDataSourceEntity> response = client.search()
.startTime(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package org.apache.eagle.alert.dao;

import com.typesafe.config.Config;
import org.apache.eagle.alert.common.AlertConstants;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
Expand All @@ -35,37 +34,17 @@
* Utility methods to load alert definitions for a program
*/
public class AlertDefinitionDAOImpl implements AlertDefinitionDAO {
private final String eagleServiceHost;
private final Integer eagleServicePort;
private String username;
private String password;
private final Logger LOG = LoggerFactory.getLogger(AlertDefinitionDAOImpl.class);
private final EagleServiceConnector connector;

public AlertDefinitionDAOImpl(String eagleServiceHost, Integer eagleServicePort) {
this(eagleServiceHost, eagleServicePort, null, null);
}

public AlertDefinitionDAOImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password) {
this.eagleServiceHost = eagleServiceHost;
this.eagleServicePort = eagleServicePort;
this.username = username;
this.password = password;
}

public AlertDefinitionDAOImpl(Config config){
this.eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
this.eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
if (config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) &&
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)) {
this.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
this.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
}
public AlertDefinitionDAOImpl(EagleServiceConnector connector){
this.connector = connector;
}

@Override
public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception {
try {
IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
IEagleServiceClient client = new EagleServiceClientImpl(connector);
String query = AlertConstants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}";
GenericServiceAPIResponseEntity<AlertDefinitionAPIEntity> response = client.search()
.pageSize(Integer.MAX_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package org.apache.eagle.alert.dao;

import com.typesafe.config.Config;
import org.apache.eagle.alert.common.AlertConstants;
import org.apache.eagle.alert.entity.AlertExecutorEntity;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.apache.commons.lang.time.DateUtils;
Expand All @@ -30,38 +29,17 @@
import java.util.List;

public class AlertExecutorDAOImpl implements AlertExecutorDAO{
private final Logger LOG = LoggerFactory.getLogger(AlertDataSourceDAOImpl.class);
private final Logger LOG = LoggerFactory.getLogger(AlertExecutorDAOImpl.class);
private final EagleServiceConnector connector;

private final String eagleServiceHost;
private final Integer eagleServicePort;
private String username;
private String password;

public AlertExecutorDAOImpl(String eagleServiceHost, Integer eagleServicePort) {
this(eagleServiceHost, eagleServicePort, null, null);
}

public AlertExecutorDAOImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password) {
this.eagleServiceHost = eagleServiceHost;
this.eagleServicePort = eagleServicePort;
this.username = username;
this.password = password;
}

public AlertExecutorDAOImpl(Config config) {
this.eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
this.eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
if (config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) &&
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)) {
this.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
this.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
}
public AlertExecutorDAOImpl(EagleServiceConnector connector){
this.connector = connector;
}

@Override
public List<AlertExecutorEntity> findAlertExecutorByDataSource(String dataSource) throws Exception{
try {
IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
IEagleServiceClient client = new EagleServiceClientImpl(connector);
String query = AlertConstants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
GenericServiceAPIResponseEntity<AlertExecutorEntity> response = client.search()
.startTime(0)
Expand All @@ -84,7 +62,7 @@ public List<AlertExecutorEntity> findAlertExecutorByDataSource(String dataSource
@Override
public List<AlertExecutorEntity> findAlertExecutor(String dataSource, String alertExecutorId) throws Exception{
try {
IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
IEagleServiceClient client = new EagleServiceClientImpl(connector);
String query = AlertConstants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\""
+ " AND @alertExecutorId=\"" + alertExecutorId + "\""
+ "]{*}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eagle.alert.entity.AlertStreamEntity;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.apache.commons.lang.time.DateUtils;
Expand All @@ -31,36 +32,15 @@

public class AlertStreamDAOImpl implements AlertStreamDAO{
private final Logger LOG = LoggerFactory.getLogger(AlertStreamDAOImpl.class);
private final EagleServiceConnector connector;

private final String eagleServiceHost;
private final Integer eagleServicePort;
private String username;
private String password;

public AlertStreamDAOImpl(String eagleServiceHost, Integer eagleServicePort) {
this(eagleServiceHost, eagleServicePort, null, null);
}

public AlertStreamDAOImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password) {
this.eagleServiceHost = eagleServiceHost;
this.eagleServicePort = eagleServicePort;
this.username = username;
this.password = password;
}

public AlertStreamDAOImpl(Config config) {
this.eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
this.eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
if (config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) &&
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)) {
this.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
this.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
}
public AlertStreamDAOImpl(EagleServiceConnector connector){
this.connector = connector;
}

public List<AlertStreamEntity> findAlertStreamByDataSource(String dataSource) throws Exception{
try {
IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
IEagleServiceClient client = new EagleServiceClientImpl(connector);
String query = AlertConstants.ALERT_STREAM_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
GenericServiceAPIResponseEntity<AlertStreamEntity> response = client.search()
.startTime(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,7 @@ public static String convertToStreamDef(String streamName){
StringBuilder sb = new StringBuilder();
sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object,");
for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){
String attrName = entry.getKey();
sb.append(attrName);
sb.append(" ");
String attrType = entry.getValue().getAttrType();
if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
sb.append("string");
}else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
sb.append("int");
}else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
sb.append("long");
}else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
sb.append("bool");
}else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
sb.append("float");
}else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
sb.append("double");
}else{
LOG.warn("AttrType is not recognized, ignore : " + attrType);
}
sb.append(",");
appendAttributeNameType(sb, entry.getKey(), entry.getValue().getAttrType());
}
if(sb.length() > 0){
sb.deleteCharAt(sb.length()-1);
Expand All @@ -80,6 +61,41 @@ public static String convertToStreamDef(String streamName){
return String.format(siddhiStreamDefFormat, sb.toString());
}

public static String convertToStreamDef(String streamName, Map<String, String> eventSchema){
StringBuilder sb = new StringBuilder();
sb.append("context" + " object,");
for(Map.Entry<String, String> entry : eventSchema.entrySet()){
appendAttributeNameType(sb, entry.getKey(), entry.getValue());
}
if(sb.length() > 0){
sb.deleteCharAt(sb.length()-1);
}

String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");";
return String.format(siddhiStreamDefFormat, sb.toString());
}

private static void appendAttributeNameType(StringBuilder sb, String attrName, String attrType){
sb.append(attrName);
sb.append(" ");
if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
sb.append("string");
}else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
sb.append("int");
}else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
sb.append("long");
}else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
sb.append("bool");
}else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
sb.append("float");
}else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
sb.append("double");
}else{
LOG.warn("AttrType is not recognized, ignore : " + attrType);
}
sb.append(",");
}

public static Object getAttrDefaultValue(String streamName, String attrName){
SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
AlertStreamSchemaEntity entity = map.get(attrName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -61,7 +62,7 @@ public static AlertExecutor[] createAlertExecutors(Config config, String alertEx
// Get map from alertExecutorId to alert stream
// (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
List<String> streamNames = new ArrayList<String>();
AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(config);
AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config));
List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource, alertExecutorId);
for(AlertExecutorEntity entity : alertExecutorEntities){
streamNames.add(entity.getTags().get(AlertConstants.STREAM_NAME));
Expand All @@ -70,7 +71,7 @@ public static AlertExecutor[] createAlertExecutors(Config config, String alertEx
if(streamNames.isEmpty()){
throw new IllegalStateException("upstream names should not be empty for alert " + alertExecutorId);
}
return createAlertExecutors(config, new AlertDefinitionDAOImpl(config),
return createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)),
streamNames, alertExecutorId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.eagle.datastream.Tuple2;
import org.apache.eagle.executor.AlertExecutor;
import junit.framework.Assert;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.junit.Test;

import java.util.*;
Expand Down Expand Up @@ -94,7 +95,7 @@ public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String da
SiddhiPolicyEvaluator evaluator = new SiddhiPolicyEvaluator(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"});
EagleAlertContext context = new EagleAlertContext();

AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(null, null) {
AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
@Override
public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -51,7 +52,7 @@ public void test() throws Exception{

String site = "sandbox";
String dataSource = "UnitTest";
AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(eagleServiceHost, eagleServicePort) {
AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort)) {
@Override
public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception {
List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();
Expand Down