
Querying Sketched Data from Sustain DHT

Thilina Buddhika edited this page Jun 13, 2020 · 2 revisions

Sketched data can be retrieved from the Sustain DHT using its query API, which is implemented with gRPC and Protocol Buffers. The definitions of the gRPC service and message types are available here.

A working example of a query client is available in the samples module.

You need JDK 1.8 or above to compile and run the code.

To run the example query client:

  1. Download/clone the source

  2. We use Gradle to build the source. It is recommended to use the embedded Gradle wrapper to avoid Gradle version-related issues.
    cd synopsis-dht-master
    ./gradlew build

  3. Unzip the ingestion-client distribution.
    cd distribution/build/distributions
    unzip synopsis-ingestion-client-0.1.zip

  4. Run the sample query client.
    cd synopsis-ingestion-client-0.1/bin
    sh run_class sustain.synopsis.samples.client.query.NOAAQueryClient proxy_hostname:proxy_port
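The sample client takes the proxy address as a single `proxy_hostname:proxy_port` argument. The following is a minimal sketch of splitting such an argument into the host and port later passed to `ManagedChannelBuilder.forAddress(host, port)`. `ProxyAddress` is a hypothetical helper, not code taken from `NOAAQueryClient`, and it does not handle bracketed IPv6 literals.

```java
/**
 * Hypothetical helper (not part of the samples module): splits a
 * "proxy_hostname:proxy_port" argument into a host and a port.
 */
final class ProxyAddress {
    final String host;
    final int port;

    private ProxyAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static ProxyAddress parse(String arg) {
        // Split at the last ':' so only the final segment is treated as the port.
        int idx = arg.lastIndexOf(':');
        if (idx <= 0 || idx == arg.length() - 1) {
            throw new IllegalArgumentException("expected proxy_hostname:proxy_port, got: " + arg);
        }
        String host = arg.substring(0, idx);
        // Throws NumberFormatException (an IllegalArgumentException) if the port is not numeric.
        int port = Integer.parseInt(arg.substring(idx + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return new ProxyAddress(host, port);
    }

    public static void main(String[] args) {
        ProxyAddress addr = parse(args.length > 0 ? args[0] : "localhost:9090");
        System.out.println(addr.host + " " + addr.port);
    }
}
```

Failing fast on a malformed argument gives a clearer error than a connection failure later in the gRPC channel.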

The following are the key steps in implementing a query client.

Initializing a client stub

Channel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
TargetedQueryServiceGrpc.TargetedQueryServiceBlockingStub stub = TargetedQueryServiceGrpc.newBlockingStub(channel);

If your setup contains more than one storage node, it is recommended to query the DHT through a proxy node. The proxy consolidates the response streams from all DHT nodes into a single stream, which simplifies the client implementation.
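To illustrate what consolidating several per-node streams into one means for the client, here is a sketch that chains a list of iterators into a single iterator. It is a hypothetical illustration only: the proxy's actual merging strategy is not documented on this page, and a real proxy may forward responses as they arrive from each node rather than draining each node's stream in turn as this version does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical illustration: present several per-node streams as one stream. */
final class ConsolidatedStream {

    static <T> Iterator<T> consolidate(List<Iterator<T>> perNodeStreams) {
        final Deque<Iterator<T>> pending = new ArrayDeque<>(perNodeStreams);
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                // Discard exhausted node streams until one with remaining elements is found.
                while (!pending.isEmpty()) {
                    if (pending.peekFirst().hasNext()) {
                        return true;
                    }
                    pending.pollFirst();
                }
                return false;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return pending.peekFirst().next();
            }
        };
    }

    public static void main(String[] args) {
        List<Iterator<String>> nodes = new ArrayList<>();
        nodes.add(Arrays.asList("node1-a", "node1-b").iterator());
        nodes.add(Collections.<String>emptyIterator());
        nodes.add(Collections.singletonList("node3-a").iterator());
        List<String> merged = new ArrayList<>();
        consolidate(nodes).forEachRemaining(merged::add);
        System.out.println(merged); // [node1-a, node1-b, node3-a]
    }
}
```

From the client's point of view, the result has the same shape as the single `Iterator` returned by the blocking stub, which is why querying through a proxy keeps the client loop simple.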

Constructing the query request

Query requests are Protocol Buffers messages of the TargetQueryRequest type defined in the service definition.

Temporal constraints

You can include multiple temporal intervals in a single query request, and multiple predicates can be combined using combine operators. All temporal predicate values should be provided as epoch timestamps in the UTC time zone. The following example shows how to construct a temporal interval for [00:00 Jan 01, 2015, 23:59 Jan 31, 2015).

// t >= 00:00 Jan 01, 2015
Predicate fromPredicate = Predicate.newBuilder().setComparisonOp(Predicate.ComparisonOperator.GREATER_THAN_OR_EQUAL)
                            .setIntegerValue(CommonUtil.localDateTimeToEpoch(LocalDateTime.of(2015, Month.JANUARY, 1, 0, 0))).build();

// t < 23:59 Jan 31, 2015
Predicate toPredicate = Predicate.newBuilder().setComparisonOp(Predicate.ComparisonOperator.LESS_THAN)
                            .setIntegerValue(CommonUtil.localDateTimeToEpoch(LocalDateTime.of(2015, Month.JANUARY, 31, 23, 59))).build();

// combine both predicates such that 00:00 Jan 01, 2015 <= t < 23:59 Jan 31, 2015
Expression temporalExp = Expression.newBuilder().setPredicate1(fromPredicate).setCombineOp(Expression.CombineOperator.AND)
                            .setPredicate2(toPredicate).build();
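The epochs must be computed in UTC; converting with the JVM's default time zone would shift both bounds. The sketch below computes the two bounds with `java.time` and evaluates the GREATER_THAN_OR_EQUAL AND LESS_THAN combination as a half-open interval. Assumption: the epochs are in milliseconds. The unit used by `CommonUtil.localDateTimeToEpoch` is not stated on this page; if it is seconds, use `toEpochSecond(ZoneOffset.UTC)` instead. `TemporalInterval` is a hypothetical helper.

```java
import java.time.LocalDateTime;
import java.time.Month;
import java.time.ZoneOffset;

/** Hypothetical helper: UTC epoch bounds and half-open interval membership. */
final class TemporalInterval {

    // Assumption: milliseconds since the Unix epoch, interpreting the local date-time as UTC.
    static long utcEpochMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Mirrors GREATER_THAN_OR_EQUAL (from) AND LESS_THAN (to): from <= ts < to.
    static boolean inInterval(long ts, long from, long to) {
        return ts >= from && ts < to;
    }

    public static void main(String[] args) {
        long from = utcEpochMillis(LocalDateTime.of(2015, Month.JANUARY, 1, 0, 0));
        long to = utcEpochMillis(LocalDateTime.of(2015, Month.JANUARY, 31, 23, 59));
        System.out.println(from);                       // 1420070400000
        System.out.println(to);                         // 1422748740000
        System.out.println(inInterval(to, from, to));   // false: the upper bound is excluded
    }
}
```

Because the upper bound is exclusive, this interval leaves out the final minute of January 31. If the intent is to cover all of January, a LESS_THAN bound of 00:00 February 1, 2015 includes that minute as well.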

Spatial constraints

Spatial scopes are provided as a list of geohash prefixes, each wrapped in its own Predicate.

Predicate spatialPredicate = Predicate.newBuilder().setStringValue(geohash).build();
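A geohash prefix names a rectangular cell, and a point lies inside that cell exactly when its geohash starts with the prefix. The sketch below implements the standard base32 geohash encoding (alternating longitude and latitude bisection bits, 5 bits per character) to check whether a coordinate falls within a given prefix. It is an illustration of geohash semantics, not the DHT's own code; `GeohashScope` and its methods are hypothetical names.

```java
/** Standard base32 geohash encoding, for illustration only. */
final class GeohashScope {
    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";

    static String encode(double lat, double lon, int precision) {
        double latMin = -90, latMax = 90, lonMin = -180, lonMax = 180;
        StringBuilder hash = new StringBuilder(precision);
        boolean lonBit = true; // bits alternate, starting with longitude
        int bits = 0;
        int ch = 0;
        while (hash.length() < precision) {
            if (lonBit) {
                double mid = (lonMin + lonMax) / 2;
                if (lon >= mid) { ch = (ch << 1) | 1; lonMin = mid; } else { ch <<= 1; lonMax = mid; }
            } else {
                double mid = (latMin + latMax) / 2;
                if (lat >= mid) { ch = (ch << 1) | 1; latMin = mid; } else { ch <<= 1; latMax = mid; }
            }
            lonBit = !lonBit;
            if (++bits == 5) { // every 5 bits form one base32 character
                hash.append(BASE32.charAt(ch));
                bits = 0;
                ch = 0;
            }
        }
        return hash.toString();
    }

    /** True if the point lies in the cell named by the (lowercase) geohash prefix. */
    static boolean inScope(double lat, double lon, String prefix) {
        return encode(lat, lon, prefix.length()).equals(prefix);
    }

    public static void main(String[] args) {
        System.out.println(encode(57.64911, 10.40744, 5));      // u4pru
        System.out.println(inScope(57.64911, 10.40744, "u4p")); // true
    }
}
```

Shorter prefixes cover larger areas, so a coarse prefix selects a wide region and a longer one narrows the spatial scope.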

Other feature constraints

Support for other feature constraints will be added in the future through range query support.

Combining constraints

TargetQueryRequest targetQueryRequest =
                    TargetQueryRequest.newBuilder().setDataset("noaa_2015_jan").addSpatialScope(spatialPredicate)
                                      .setTemporalScope(temporalExp).build();
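When reasoning about which observations a combined request selects, it can help to model its scopes on the client side. The sketch below does this under two stated assumptions that this page does not confirm: multiple spatial scopes form a union (a geohash matches if it starts with any listed prefix), and the spatial and temporal scopes are intersected. `QueryScope` is a hypothetical model class, not part of the query API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical client-side model of a TargetQueryRequest's scopes.
 * Assumptions: spatial prefixes are OR'ed; spatial and temporal scopes are AND'ed.
 */
final class QueryScope {
    final String dataset;
    final List<String> geohashPrefixes;
    final long fromInclusive; // GREATER_THAN_OR_EQUAL bound
    final long toExclusive;   // LESS_THAN bound

    QueryScope(String dataset, List<String> geohashPrefixes, long fromInclusive, long toExclusive) {
        this.dataset = dataset;
        this.geohashPrefixes = Collections.unmodifiableList(geohashPrefixes);
        this.fromInclusive = fromInclusive;
        this.toExclusive = toExclusive;
    }

    boolean matchesSpace(String geohash) {
        for (String prefix : geohashPrefixes) {
            if (geohash.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    boolean matchesTime(long epoch) {
        return epoch >= fromInclusive && epoch < toExclusive;
    }

    boolean matches(String geohash, long epoch) {
        return matchesSpace(geohash) && matchesTime(epoch);
    }

    public static void main(String[] args) {
        QueryScope scope = new QueryScope("noaa_2015_jan", Arrays.asList("9x", "9w"), 100L, 200L);
        System.out.println(scope.matches("9xj5", 150L)); // true
        System.out.println(scope.matches("9xj5", 200L)); // false: upper bound is exclusive
        System.out.println(scope.matches("c2b1", 150L)); // false: outside both prefixes
    }
}
```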

Executing the query request

Query responses are sent as a stream of TargetQueryResponse messages, each containing one or more strands.

Iterator<TargetQueryResponse> queryResponseIterator = stub.query(targetQueryRequest);

A strand summarizes a set of similar observations that occurred within close temporal and spatial proximity.
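A summary of this kind can be maintained incrementally as observations arrive. The sketch below keeps a count, min, max, and M2 for a single feature using Welford's online algorithm. Assumption: a strand's M2 value corresponds to Welford's M2 (the sum of squared deviations from the mean); the page does not define M2 or S2, and S2 is not modeled here. `RunningSummary` is a hypothetical class.

```java
/**
 * Running summary for one feature, updated with Welford's algorithm.
 * Assumption: M2 here is Welford's sum of squared deviations from the mean.
 */
final class RunningSummary {
    long count;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;
    double mean;
    double m2;

    void add(double x) {
        count++;
        min = Math.min(min, x);
        max = Math.max(max, x);
        double delta = x - mean;
        mean += delta / count;
        m2 += delta * (x - mean); // second factor uses the updated mean
    }

    /** Sample variance; NaN for fewer than two observations. */
    double sampleVariance() {
        return count > 1 ? m2 / (count - 1) : Double.NaN;
    }

    public static void main(String[] args) {
        RunningSummary s = new RunningSummary();
        for (double x : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
            s.add(x);
        }
        System.out.println(s.count + " " + s.min + " " + s.max + " " + s.mean + " " + s.m2);
    }
}
```

With a single observation, min, max, and the value itself coincide and M2 is zero, which is consistent with the page's advice to read the feature list directly when the observation count is 1.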

while (queryResponseIterator.hasNext()) {
    TargetQueryResponse response = queryResponseIterator.next();
    for (ProtoBuffSerializedStrand strand : response.getStrandsList()) {
        System.out.println("Geohash: " + strand.getGeohash());
        System.out.println("Start TS: " + strand.getStartTS());
        System.out.println("Features: " + strand.getFeaturesList());
        System.out.println("Observation Count: " + strand.getObservationCount());
        /* Data container values only need to be accessed if the observation
           count is > 1. Otherwise, use the feature list. */
        if (strand.getObservationCount() > 1) {
            System.out.println("Min values: " + strand.getMinList());
            System.out.println("Max values: " + strand.getMaxList());
            System.out.println("M2 values: " + strand.getM2List());
            System.out.println("S2 values: " + strand.getS2List());
        }
    }
}
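A client often needs an overall view across all strands returned by a query. The sketch below folds strands into a per-feature envelope (overall min and max) plus a total observation count. Only these three are merged because their combination rules do not depend on the exact definitions of M2 and S2. `StrandAggregate` is hypothetical; inside the loop above it could be fed with `strand.getObservationCount()`, `strand.getMinList()`, and `strand.getMaxList()`, assuming those repeated fields are doubles. For single-observation strands, following the comment in the loop, pass the feature list as both the min and max lists.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical accumulator for per-feature min/max and total observation count across strands. */
final class StrandAggregate {
    long totalObservations;
    double[] min;
    double[] max;

    /** For observationCount > 1 pass the min/max lists; for a single observation pass the feature list twice. */
    void accept(long observationCount, List<Double> mins, List<Double> maxs) {
        if (mins.size() != maxs.size()) {
            throw new IllegalArgumentException("min/max lists differ in length");
        }
        if (min == null) {
            min = new double[mins.size()];
            max = new double[maxs.size()];
            Arrays.fill(min, Double.POSITIVE_INFINITY);
            Arrays.fill(max, Double.NEGATIVE_INFINITY);
        } else if (min.length != mins.size()) {
            throw new IllegalArgumentException("strands have different feature counts");
        }
        for (int i = 0; i < min.length; i++) {
            min[i] = Math.min(min[i], mins.get(i));
            max[i] = Math.max(max[i], maxs.get(i));
        }
        totalObservations += observationCount;
    }

    public static void main(String[] args) {
        StrandAggregate agg = new StrandAggregate();
        agg.accept(3, Arrays.asList(1.0, 10.0), Arrays.asList(4.0, 12.0));
        agg.accept(1, Arrays.asList(0.5, 11.0), Arrays.asList(0.5, 11.0)); // single observation
        System.out.println(agg.totalObservations + " " + Arrays.toString(agg.min) + " " + Arrays.toString(agg.max));
        // 4 [0.5, 10.0] [4.0, 12.0]
    }
}
```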