Skip to content

Commit

Permalink
adding metrics to presto pulsar connector (#2631)
Browse files Browse the repository at this point in the history
* adding metrics to presto pulsar connector

* rename batch size

* adding comments

* refactoring metrics

* modifying bytes read metric

* fixing tests

* deleting tmp file

* adding jars to LICENSE
  • Loading branch information
jerrypeng committed Oct 4, 2018
1 parent 606b4de commit 474645f
Show file tree
Hide file tree
Showing 10 changed files with 529 additions and 31 deletions.
2 changes: 1 addition & 1 deletion conf/presto/catalog/pulsar.properties
Expand Up @@ -24,7 +24,7 @@ pulsar.broker-service-url=http://localhost:8080
# URI of Zookeeper cluster
pulsar.zookeeper-uri=localhost:2181
# minimum number of entries to read at a single time
pulsar.entry-read-batch-size=100
pulsar.max-entry-read-batch-size=100
# default number of splits to use per query
pulsar.target-num-splits=2
# max message queue size
Expand Down
5 changes: 5 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Expand Up @@ -387,6 +387,7 @@ The Apache Software License, Version 2.0
- org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
- org.apache.distributedlog-distributedlog-core-4.7.2.jar
- org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.7.2.jar
* LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
Expand Down Expand Up @@ -459,6 +460,10 @@ The Apache Software License, Version 2.0
- io.kubernetes-client-java-proto-2.0.0.jar
* Joda Time
- joda-time-joda-time-2.9.3.jar
* Dropwizard
- io.dropwizard.metrics-metrics-core-3.1.0.jar
- io.dropwizard.metrics-metrics-graphite-3.1.0.jar
- io.dropwizard.metrics-metrics-jvm-3.1.0.jar


BSD 3-clause "New" or "Revised" License
Expand Down
12 changes: 12 additions & 0 deletions managed-ledger/pom.xml
Expand Up @@ -38,6 +38,18 @@
<artifactId>bookkeeper-server</artifactId>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>prometheus-metrics-provider</artifactId>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>codahale-metrics-provider</artifactId>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
9 changes: 9 additions & 0 deletions pulsar-sql/presto-distribution/LICENSE
Expand Up @@ -382,6 +382,15 @@ The Apache Software License, Version 2.0
- validation-api-1.1.0.Final.jar
* Objectsize
- objectsize-0.0.12.jar
* Dropwizard Metrics
- metrics-core-3.1.0.jar
- metrics-graphite-3.1.0.jar
- metrics-jvm-3.1.0.jar
* Prometheus
- simpleclient-0.0.23.jar
- simpleclient_common-0.0.23.jar
- simpleclient_hotspot-0.0.23.jar
- simpleclient_servlet-0.0.23.jar

Protocol Buffers License
* Protocol Buffers
Expand Down
Expand Up @@ -22,15 +22,27 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;

public class PulsarConnectorCache {

private static PulsarConnectorCache instance;

private final ManagedLedgerFactory managedLedgerFactory;

private final StatsProvider statsProvider;

private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
StatsProvider.class, getClass().getClassLoader());

// start stats provider
ClientConfiguration clientConfiguration = new ClientConfiguration();

pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));

this.statsProvider.start(clientConfiguration);
}

public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
Expand All @@ -55,9 +67,14 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerFactory;
}

public StatsProvider getStatsProvider() {
return statsProvider;
}

public static void shutdown() throws ManagedLedgerException, InterruptedException {
if (instance != null) {
instance.managedLedgerFactory.shutdown();
instance.statsProvider.stop();
instance = null;
}
}
Expand Down
Expand Up @@ -18,11 +18,17 @@
*/
package org.apache.pulsar.sql.presto;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;

import javax.validation.constraints.NotNull;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

public class PulsarConnectorConfig implements AutoCloseable {

Expand All @@ -32,6 +38,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int targetNumSplits = 2;
private int maxSplitMessageQueueSize = 10000;
private int maxSplitEntryQueueSize = 1000;
private String statsProvider = NullStatsProvider.class.getName();
private Map<String, String> statsProviderConfigs = new HashMap<>();
private PulsarAdmin pulsarAdmin;

@NotNull
Expand All @@ -57,12 +65,12 @@ public PulsarConnectorConfig setZookeeperUri(String zookeeperUri) {
}

@NotNull
public int getEntryReadBatchSize() {
public int getMaxEntryReadBatchSize() {
return this.entryReadBatchSize;
}

@Config("pulsar.entry-read-batch-size")
public PulsarConnectorConfig setEntryReadBatchSize(int batchSize) {
@Config("pulsar.max-entry-read-batch-size")
public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize) {
this.entryReadBatchSize = batchSize;
return this;
}
Expand Down Expand Up @@ -100,6 +108,29 @@ public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSiz
return this;
}

@NotNull
public String getStatsProvider() {
return statsProvider;
}

@Config("pulsar.stats-provider")
public PulsarConnectorConfig setStatsProvider(String statsProvider) {
this.statsProvider = statsProvider;
return this;
}

@NotNull
public Map<String, String> getStatsProviderConfigs() {
return statsProviderConfigs;
}

@Config("pulsar.stats-provider-configs")
public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
this.statsProviderConfigs = new Gson().fromJson(statsProviderConfigs, type);
return this;
}

@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
Expand Down

0 comments on commit 474645f

Please sign in to comment.