Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
[EAGLE-911]: Have mongo metadata dbname configurable
Browse files Browse the repository at this point in the history
Support configurable database for mongo metadata implementation, this would help to ease prod migration (use a test db first)

Author: ralphsu
This closes #
  • Loading branch information
RalphSu committed Feb 18, 2017
1 parent 73d03b9 commit e0f04d4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 35 deletions.
Expand Up @@ -18,20 +18,20 @@

package org.apache.eagle.alert.metadata;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import com.typesafe.config.Config;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import com.typesafe.config.Config;

public class MetadataUtils {

Expand All @@ -45,6 +45,7 @@ public class MetadataUtils {
public static final String JDBC_CONNECTION_PATH = "jdbc.connection";
public static final String JDBC_CONNECTION_PROPERTIES_PATH = "jdbc.connectionProperties";
public static final String MONGO_CONNECTION_PATH = "mongo.connection";
public static final String MONGO_DATABASE = "mongo.database";

public static <T> String getKey(T t) {
if (t instanceof StreamDefinition) {
Expand Down
Expand Up @@ -16,63 +16,80 @@
*/
package org.apache.eagle.alert.metadata.impl;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.mongodb.Block;
import com.mongodb.Function;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.typesafe.config.Config;
import org.apache.eagle.alert.coordination.model.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition;
import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;

import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.mongodb.Block;
import com.mongodb.Function;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.typesafe.config.Config;

/**
* @since Apr 11, 2016.
*/
public class MongoMetadataDaoImpl implements IMetadataDao {

private static final String DB_NAME = "ump_alert_metadata";
private static final String DEFAULT_DB_NAME = "ump_alert_metadata";
private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class);
private static final ObjectMapper mapper = new ObjectMapper();
private static final int DEFAULT_CAPPED_MAX_SIZE = 500 * 1024 * 1024;
private static final int DEFAULT_CAPPED_MAX_DOCUMENTS = 20000;
private static final String MANGO_CAPPED_MAX_SIZE = "mongo.cappedMaxSize";
private static final String MANGO_CAPPED_MAX_DOCUMENTS = "mongo.cappedMaxDocuments";
private static final String MONGO_CAPPED_MAX_SIZE = "mongo.cappedMaxSize";
private static final String MONGO_CAPPED_MAX_DOCUMENTS = "mongo.cappedMaxDocuments";

static {
    // Be lenient when deserializing persisted metadata documents: silently
    // ignore JSON fields that have no counterpart on the target model class
    // instead of throwing, so schema evolution does not break reads.
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private final String connection;
private final String dbname;
private final MongoClient client;
private final int cappedMaxSize;
private final int cappedMaxDocuments;
Expand Down Expand Up @@ -101,9 +118,10 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
@Inject
public MongoMetadataDaoImpl(Config config) {
this.connection = config.getString(MetadataUtils.MONGO_CONNECTION_PATH);
this.cappedMaxSize = config.hasPath(MANGO_CAPPED_MAX_SIZE) ? config.getInt(MANGO_CAPPED_MAX_SIZE) : DEFAULT_CAPPED_MAX_SIZE;
this.cappedMaxDocuments = config.hasPath(MANGO_CAPPED_MAX_DOCUMENTS) ? config.getInt(MANGO_CAPPED_MAX_DOCUMENTS) : DEFAULT_CAPPED_MAX_DOCUMENTS;
this.cappedMaxSize = config.hasPath(MONGO_CAPPED_MAX_SIZE) ? config.getInt(MONGO_CAPPED_MAX_SIZE) : DEFAULT_CAPPED_MAX_SIZE;
this.cappedMaxDocuments = config.hasPath(MONGO_CAPPED_MAX_DOCUMENTS) ? config.getInt(MONGO_CAPPED_MAX_DOCUMENTS) : DEFAULT_CAPPED_MAX_DOCUMENTS;
this.client = new MongoClient(new MongoClientURI(this.connection));
this.dbname = config.hasPath(MetadataUtils.MONGO_DATABASE) ? config.getString(MetadataUtils.MONGO_DATABASE) : DEFAULT_DB_NAME;
init();
}

Expand Down Expand Up @@ -135,7 +153,7 @@ private MongoCollection<Document> getCollection(String collectionName) {
}

private void init() {
db = client.getDatabase(DB_NAME);
db = client.getDatabase(this.dbname);
IndexOptions io = new IndexOptions().background(true).name("nameIndex");
BsonDocument doc = new BsonDocument();
doc.append("name", new BsonInt32(1));
Expand Down
Expand Up @@ -19,6 +19,7 @@ metadata {
mongo {
connection = "mongodb://localhost:27017"
cappedSize = 20000
    database = "testdb"
}
}

0 comments on commit e0f04d4

Please sign in to comment.