Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for HerdDB database #183

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ plugins {
repositories {
maven { url 'http://maven.aliyun.com/nexus/content/groups/public' }
jcenter()
mavenCentral()
}

configurations {
Expand Down Expand Up @@ -64,6 +65,7 @@ dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-netflix-zuul', version: springBootVersion
compile group: 'org.mybatis.spring.boot', name: 'mybatis-spring-boot-starter', version: springMybatisVersion
compile group: 'mysql', name: 'mysql-connector-java', version: mysqlConnectorVersion
compile group: 'org.herddb', name: 'herddb-jdbc', version: herddbVersion
compile group: 'javax.validation', name: 'validation-api', version: javaxValidationVersion
compile group: 'io.jsonwebtoken', name: 'jjwt', version: jsonWebTokenVersion
compile group: 'org.xerial', name: 'sqlite-jdbc', version: sqliteVersion
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ swaggeruiVersion=2.9.2
apiMockitoVersion=1.7.1
mockitoJunit4Version=1.7.1
gsonVersion=2.8.2
herddbVersion=0.12.0
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Page<ConsumerStatsEntity> findByReplicationStatsId(Integer pageNum, Integ
}

public void remove(long timestamp, long timeInterval) {
consumerStatsMapper.delete(timestamp, timeInterval);
consumerStatsMapper.delete(timestamp - timeInterval);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Page<PublisherStatsEntity> findByTopicStatsId(Integer pageNum, Integer pa
}

public void remove(long timestamp, long timeInterval) {
publishersStatsMapper.delete(timestamp, timeInterval);
publishersStatsMapper.delete(timestamp - timeInterval);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public Page<ReplicationStatsEntity> findByTopicStatsId(Integer pageNum, Integer
}

public void remove(long timestamp, long timeInterval) {
replicationsStatsMapper.delete(timestamp, timeInterval);
replicationsStatsMapper.delete(timestamp - timeInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public Page<SubscriptionStatsEntity> findByTopicStatsId(Integer pageNum, Integer
}

public void remove(long timestamp, long timeInterval) {
subscriptionsStatsMapper.delete(timestamp, timeInterval);
subscriptionsStatsMapper.delete(timestamp - timeInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ public Page<TopicStatsEntity> findByMultiTopic(Integer pageNum,
}

public void remove(long timestamp, long timeInterval) {
topicsStatsMapper.delete(timestamp, timeInterval);
topicsStatsMapper.delete(timestamp - timeInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface ConsumerStatsMapper {

@Insert("INSERT INTO consumersStats(consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
"availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
"clientVersion,timestamp,metadata) " +
"clientVersion,`timestamp`,metadata) " +
"VALUES(#{consumer},#{topicStatsId},#{replicationStatsId},#{subscriptionStatsId},#{address}," +
"#{availablePermits},#{connectedSince},#{msgRateOut},#{msgThroughputOut},#{msgRateRedeliver}," +
"#{clientVersion},#{timestamp},#{metadata})")
Expand All @@ -31,25 +31,25 @@ public interface ConsumerStatsMapper {

@Select("SELECT consumerStatsId,consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
"availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
"clientVersion,timestamp,metadata FROM consumersStats " +
"where topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
"clientVersion,`timestamp`,metadata FROM consumersStats " +
"where topicStatsId=#{topicStatsId} and `timestamp`=#{timestamp}")
Page<ConsumerStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);

@Select("SELECT consumerStatsId,consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
"availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
"clientVersion,timestamp,metadata FROM consumersStats " +
"where subscriptionStatsId=#{subscriptionStatsId} and timestamp=#{timestamp}")
"clientVersion,`timestamp`,metadata FROM consumersStats " +
"where subscriptionStatsId=#{subscriptionStatsId} and `timestamp`=#{timestamp}")
Page<ConsumerStatsEntity> findBySubscriptionStatsId(@Param("subscriptionStatsId") long subscriptionStatsId,
@Param("timestamp") long timestamp);

@Select("SELECT consumerStatsId,consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
"availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
"clientVersion,timestamp,metadata FROM consumersStats " +
"where replicationStatsId=#{replicationStatsId} and timestamp=#{timestamp}")
"clientVersion,`timestamp`,metadata FROM consumersStats " +
"where replicationStatsId=#{replicationStatsId} and `timestamp`=#{timestamp}")
Page<ConsumerStatsEntity> findByReplicationStatsId(@Param("replicationStatsId") long replicationStatsId,
@Param("timestamp") long timestamp);

@Delete("DELETE FROM consumersStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
@Delete("DELETE FROM consumersStats WHERE `timestamp` <= #{refTime}")
void delete(@Param("refTime") long refTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
public interface PublishersStatsMapper {

@Insert("INSERT INTO publishersStats(producerId,topicStatsId,producerName,msgRateIn," +
"msgThroughputIn,averageMsgSize,address,connectedSince,clientVersion,metadata,timestamp) " +
"msgThroughputIn,averageMsgSize,address,connectedSince,clientVersion,metadata,`timestamp`) " +
"VALUES(#{producerId},#{topicStatsId},#{producerName},#{msgRateIn},#{msgThroughputIn}," +
"#{averageMsgSize},#{address},#{connectedSince},#{clientVersion},#{metadata},#{timestamp})")
@Options(useGeneratedKeys=true, keyProperty="publisherStatsId", keyColumn="publisherStatsId")
void save(PublisherStatsEntity publisherStatsEntity);

@Select("SELECT publisherStatsId,producerId,topicStatsId,producerName,msgRateIn,msgThroughputIn,averageMsgSize," +
"address,connectedSince,clientVersion,metadata,timestamp From publishersStats " +
"WHERE topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
"address,connectedSince,clientVersion,metadata,`timestamp` From publishersStats " +
"WHERE topicStatsId=#{topicStatsId} and `timestamp`=#{timestamp}")
Page<PublisherStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);

@Delete("DELETE FROM publishersStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
@Delete("DELETE FROM publishersStats WHERE `timestamp` <= #{refTime}")
void delete(@Param("refTime") long refTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface ReplicationsStatsMapper {

@Insert("INSERT INTO replicationsStats(topicStatsId,cluster,connected,msgRateIn,msgRateOut,msgThroughputIn," +
"msgThroughputOut,replicationBacklog,replicationDelayInSeconds,inboundConnection," +
"inboundConnectedSince,outboundConnection,outboundConnectedSince,timestamp,msgRateExpired) " +
"inboundConnectedSince,outboundConnection,outboundConnectedSince,`timestamp`,msgRateExpired) " +
"VALUES(#{topicStatsId},#{cluster},#{connected},#{msgRateIn},#{msgRateOut},#{msgThroughputIn}," +
"#{msgThroughputOut},#{replicationBacklog},#{replicationDelayInSeconds}," +
"#{inboundConnection},#{inboundConnectedSince},#{outboundConnection},#{outboundConnectedSince}," +
Expand All @@ -32,11 +32,11 @@ public interface ReplicationsStatsMapper {

@Select("SELECT replicationStatsId,topicStatsId,cluster,connected,msgRateIn,msgRateOut,msgThroughputIn,msgThroughputOut," +
"replicationBacklog,replicationDelayInSeconds,inboundConnection,inboundConnectedSince," +
"outboundConnection,outboundConnectedSince,timestamp,msgRateExpired FROM replicationsStats " +
"where topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
"outboundConnection,outboundConnectedSince,`timestamp`,msgRateExpired FROM replicationsStats " +
"where topicStatsId=#{topicStatsId} and `timestamp`=#{timestamp}")
Page<ReplicationStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);

@Delete("DELETE FROM replicationsStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
@Delete("DELETE FROM replicationsStats WHERE `timestamp` <= #{refTime}")
void delete(@Param("refTime") long refTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface SubscriptionsStatsMapper {

@Insert("INSERT INTO subscriptionsStats(topicStatsId,subscription,msgBacklog,msgRateExpired," +
"msgRateOut,msgThroughputOut,msgRateRedeliver,numberOfEntriesSinceFirstNotAckedMessage," +
"totalNonContiguousDeletedMessagesRange,subscriptionType,timestamp) " +
"totalNonContiguousDeletedMessagesRange,subscriptionType,`timestamp`) " +
"VALUES(#{topicStatsId},#{subscription},#{msgBacklog},#{msgRateExpired},#{msgRateOut}," +
"#{msgThroughputOut},#{msgRateRedeliver},#{numberOfEntriesSinceFirstNotAckedMessage}," +
"#{totalNonContiguousDeletedMessagesRange},#{subscriptionType}," +
Expand All @@ -32,11 +32,11 @@ public interface SubscriptionsStatsMapper {

@Select("SELECT subscriptionStatsId,topicStatsId,subscription,msgBacklog,msgRateExpired,msgRateOut," +
"msgThroughputOut,msgRateRedeliver,numberOfEntriesSinceFirstNotAckedMessage," +
"totalNonContiguousDeletedMessagesRange,subscriptionType,timestamp FROM subscriptionsStats " +
"where topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
"totalNonContiguousDeletedMessagesRange,subscriptionType,`timestamp` FROM subscriptionsStats " +
"where topicStatsId=#{topicStatsId} and `timestamp`=#{timestamp}")
Page<SubscriptionStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);

@Delete("DELETE FROM subscriptionsStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
@Delete("DELETE FROM subscriptionsStats WHERE `timestamp` <= #{refTime}")
void delete(@Param("refTime") long refTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,31 @@ public interface TopicsStatsMapper {

@Insert("INSERT INTO topicsStats(environment, cluster,broker,tenant,namespace,bundle,persistent,topic," +
"producerCount,subscriptionCount,msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut," +
"averageMsgSize,storageSize,timestamp) " +
"averageMsgSize,storageSize,`timestamp`) " +
"VALUES(#{environment},#{cluster},#{broker},#{tenant},#{namespace},#{bundle},#{persistent},#{topic}," +
"#{producerCount},#{subscriptionCount},#{msgRateIn},#{msgThroughputIn},#{msgRateOut},#{msgThroughputOut}," +
"#{averageMsgSize},#{storageSize},#{timestamp})")
@Options(useGeneratedKeys=true, keyProperty="topicStatsId", keyColumn="topicStatsId")
void insert(TopicStatsEntity topicStatsEntity);

@Select("SELECT topicStatsId,environment,cluster,broker,tenant,namespace,bundle,persistent,topic,producerCount,subscriptionCount," +
"msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,timestamp FROM topicsStats " +
"ORDER BY timestamp DESC limit 1 ")
"msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,`timestamp` FROM topicsStats " +
"ORDER BY `timestamp` DESC limit 1 ")
TopicStatsEntity findMaxTime();

@Select("SELECT topicStatsId,environment,cluster,broker,tenant,namespace,bundle,persistent,topic,producerCount,subscriptionCount," +
"msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,timestamp FROM topicsStats " +
"WHERE environment=#{environment} and cluster=#{cluster} and broker=#{broker} and timestamp=#{timestamp}")
"msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,`timestamp` FROM topicsStats " +
"WHERE environment=#{environment} and cluster=#{cluster} and broker=#{broker} and `timestamp`=#{timestamp}")
Page<TopicStatsEntity> findByClusterBroker(
@Param("environment") String environment,
@Param("cluster") String cluster,
@Param("broker") String broker,
@Param("timestamp") long timestamp);

@Select("SELECT topicStatsId,environment,cluster,tenant,namespace,bundle,persistent,topic,producerCount,subscriptionCount," +
"msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,timestamp FROM topicsStats " +
"msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,`timestamp` FROM topicsStats " +
"WHERE environment=#{environment} and tenant=#{tenant} and namespace=#{namespace} " +
"and timestamp=#{timestamp}")
"and `timestamp`=#{timestamp}")
Page<TopicStatsEntity> findByNamespace(
@Param("environment") String environment,
@Param("tenant") String tenant,
Expand All @@ -64,10 +64,10 @@ Page<TopicStatsEntity> findByNamespace(
+ "sum(msgRateOut) as msgRateOut,"
+ "sum(msgThroughputOut) as msgThroughputOut,"
+ "avg(averageMsgSize) as averageMsgSize,"
+ "sum(storageSize) as storageSize, timestamp FROM topicsStats",
"WHERE environment=#{environment} and tenant=#{tenant} and namespace=#{namespace} and timestamp=#{timestamp} and " +
+ "sum(storageSize) as storageSize, `timestamp` FROM topicsStats",
"WHERE environment=#{environment} and tenant=#{tenant} and namespace=#{namespace} and `timestamp`=#{timestamp} and " +
"topic IN <foreach collection='topicList' item='topic' open='(' separator=',' close=')'> #{topic} </foreach>" +
"GROUP BY cluster, persistent, topic" +
"GROUP BY environment, cluster, tenant, namespace, persistent, topic, `timestamp` " +
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this "group by" clause was incomplete and Apache Calcite wasn't happy

Copy link
Member

@tuteng tuteng Sep 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed it here streamnative#190

"</script>"})
Page<TopicStatsEntity> findByMultiTopic(
@Param("environment") String environment,
Expand All @@ -77,6 +77,6 @@ Page<TopicStatsEntity> findByMultiTopic(
@Param("topicList") List<String> topicList,
@Param("timestamp") long timestamp);

@Delete("DELETE FROM topicsStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
@Delete("DELETE FROM topicsStats WHERE `timestamp` <= #{refTime}")
void delete(@Param("refTime") long refTime);
}
110 changes: 110 additions & 0 deletions src/main/resources/META-INF/sql/herddb-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

CREATE TABLE IF NOT EXISTS environments (
name varchar(256) NOT NULL PRIMARY KEY,
broker varchar(1024) NOT NULL,
UNIQUE (broker)
) ;

CREATE TABLE IF NOT EXISTS topicsStats (
topicStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
environment varchar(255) NOT NULL,
cluster varchar(255) NOT NULL,
broker varchar(255) NOT NULL,
tenant varchar(255) NOT NULL,
namespace varchar(255) NOT NULL,
bundle varchar(255) NOT NULL,
persistent varchar(36) NOT NULL,
topic varchar(255) NOT NULL,
producerCount BIGINT,
subscriptionCount BIGINT,
msgRateIn double,
msgThroughputIn double,
msgRateOut double,
msgThroughputOut double,
averageMsgSize double,
storageSize double,
timestamp BIGINT
) ;

CREATE TABLE IF NOT EXISTS publishersStats (
publisherStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
producerId BIGINT,
topicStatsId BIGINT NOT NULL,
producerName varchar(255) NOT NULL,
msgRateIn double,
msgThroughputIn double,
averageMsgSize double,
address varchar(255),
connectedSince varchar(128),
clientVersion varchar(36),
metadata text,
timestamp BIGINT,
CONSTRAINT FK_publishers_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
);

CREATE TABLE IF NOT EXISTS replicationsStats (
replicationStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
topicStatsId BIGINT NOT NULL,
cluster varchar(255) NOT NULL,
connected BOOLEAN,
msgRateIn double,
msgRateOut double,
msgRateExpired double,
msgThroughputIn double,
msgThroughputOut double,
msgRateRedeliver double,
replicationBacklog BIGINT,
replicationDelayInSeconds BIGINT,
inboundConnection varchar(255),
inboundConnectedSince varchar(255),
outboundConnection varchar(255),
outboundConnectedSince varchar(255),
timestamp BIGINT
) ;

CREATE TABLE IF NOT EXISTS subscriptionsStats (
subscriptionStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
topicStatsId BIGINT NOT NULL,
subscription varchar(255) NULL,
msgBacklog BIGINT,
msgRateExpired double,
msgRateOut double,
msgThroughputOut double,
msgRateRedeliver double,
numberOfEntriesSinceFirstNotAckedMessage BIGINT,
totalNonContiguousDeletedMessagesRange BIGINT,
subscriptionType varchar(16),
blockedSubscriptionOnUnackedMsgs BOOLEAN,
timestamp BIGINT,
UNIQUE (topicStatsId, subscription)
) ;

CREATE TABLE IF NOT EXISTS consumersStats (
consumerStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
consumer varchar(255) NOT NULL,
topicStatsId BIGINT NOT NUll,
replicationStatsId BIGINT,
subscriptionStatsId BIGINT,
address varchar(255),
availablePermits BIGINT,
connectedSince varchar(255),
msgRateOut double,
msgThroughputOut double,
msgRateRedeliver double,
clientVersion varchar(36),
timestamp BIGINT,
metadata text
) ;
Loading