From f71286a44843d1f322ff2d8e2bfee80b6ac0ca69 Mon Sep 17 00:00:00 2001 From: vvaks Date: Wed, 27 Sep 2017 13:23:23 -0400 Subject: [PATCH 1/5] NIFI-4428: - Implement PutDruid Processor and Controller --- .../nifi-druid-bundle-nar/pom.xml | 39 ++ .../pom.xml | 37 ++ .../nifi-druid-controller-service-api/pom.xml | 37 ++ .../api/DruidTranquilityService.java | 28 ++ .../nifi-druid-controller-service/pom.xml | 57 +++ .../DruidTranquilityController.java | 416 ++++++++++++++++++ ...g.apache.nifi.controller.ControllerService | 16 + .../nifi-druid-processors/pom.xml | 56 +++ .../org/apache/nifi/processors/PutDruid.java | 206 +++++++++ .../org.apache.nifi.processor.Processor | 16 + nifi-nar-bundles/nifi-druid-bundle/pom.xml | 82 ++++ 11 files changed, 990 insertions(+) create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java create mode 100644 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-druid-bundle/pom.xml diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml new file mode 100644 index 000000000000..b872cf9062d2 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-druid-bundle + 1.0-SNAPSHOT + + + nifi-druid-bundle-nar + nar + + + + org.apache.nifi + nifi-druid-controller-service + 1.0-SNAPSHOT + + + org.apache.nifi + nifi-druid-processors + 1.0-SNAPSHOT + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml new file mode 100644 index 000000000000..1439062ef0f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-druid-bundle + 1.0-SNAPSHOT + + + nifi-druid-controller-service-api-nar + nar + + + 1.1.1 + + + + org.apache.nifi + nifi-druid-controller-service-api + 1.0-SNAPSHOT + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml new file mode 100644 index 000000000000..4c48d7b3e20b --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-druid-bundle + 1.0-SNAPSHOT + + + nifi-druid-controller-service-api + + + 1.1.1 + + + + + org.apache.nifi + nifi-api + ${nifi.version} + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java new file mode 100644 index 000000000000..705303f4eda3 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.api; + +import java.util.Map; + +import org.apache.nifi.controller.ControllerService; + +import com.metamx.tranquility.tranquilizer.Tranquilizer; + +public interface DruidTranquilityService extends ControllerService{ + Tranquilizer> getTranquilizer(); +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml new file mode 100644 index 000000000000..b15aff0db39f --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -0,0 +1,57 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-druid-bundle + 1.0-SNAPSHOT + + + nifi-druid-controller-service + jar + + + 1.1.1 + + + + org.apache.nifi + nifi-api + ${nifi.version} + + + org.apache.nifi + nifi-utils + ${nifi.version} + + + org.apache.nifi + nifi-processor-utils + ${nifi.version} + + + org.apache.nifi + nifi-druid-controller-service-api + 1.0-SNAPSHOT + + + io.druid + tranquility-core_2.10 + 0.8.2 + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java new file mode 100644 index 000000000000..2b5023d9abf4 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.codehaus.jackson.map.ObjectMapper; +import org.joda.time.DateTime; +import org.joda.time.Period; + +import com.metamx.common.Granularity; +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.beam.ClusteredBeamTuning; +import com.metamx.tranquility.druid.DruidBeamConfig; +import com.metamx.tranquility.druid.DruidBeams; +import com.metamx.tranquility.druid.DruidDimensions; +import com.metamx.tranquility.druid.DruidEnvironment; +import com.metamx.tranquility.druid.DruidLocation; +import com.metamx.tranquility.druid.DruidRollup; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.metamx.tranquility.typeclass.Timestamper; + +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongMinAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; + +@Tags({"Druid","Timeseries","OLAP","ingest"}) +@CapabilityDescription("Asyncronously sends flowfiles to Druid Indexing Task using Tranquility API. " + + "If aggregation and roll-up of data is required, an Aggregator JSON desriptor needs to be provided." + + "Details on how desribe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html") +public class DruidTranquilityController extends AbstractControllerService implements org.apache.nifi.controller.api.DruidTranquilityService{ + private String firehosePattern = "druid:firehose:%s"; + private int clusterPartitions = 1; + private int clusterReplication = 1 ; + private String indexRetryPeriod = "PT10M"; + + private Tranquilizer tranquilizer = null; + + public static final PropertyDescriptor DATASOURCE = new PropertyDescriptor.Builder() + .name("data_source") + .description("Druid Data Source") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + public static final PropertyDescriptor CONNECT_STRING = new PropertyDescriptor.Builder() + .name("zk_connect_string") + .description("ZK Connect String for Druid ") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new PropertyDescriptor.Builder() + .name("index_service_path") + .description("Druid Index Service path as defined via the Druid Overlord druid.service property.") + .required(true) + .defaultValue("druid/overlord") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new PropertyDescriptor.Builder() + .name("discovery_path") + .description("Druid Discovery Path as configured in Druid Common druid.discovery.curator.path property") + .required(true) + .defaultValue("/druid/discovery") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder() + .name("timestamp_field") + .description("The name of the field that will be used as the timestamp. Should be in ISO format.") + .required(true) + //.allowableValues("json", "xml") + .defaultValue("timestamp") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor AGGREGATOR_JSON = new PropertyDescriptor.Builder() + .name("aggregators_descriptor") + .description("Tranquility compliant JSON string that defines what aggregators to apply on ingest." + + "Example: " + + "[" + + "{" + + " \"type\" : \"count\"," + + " \"name\" : \"count\"," + + "}," + + "{" + + " \"name\" : \"value_sum\"," + + " \"type\" : \"doubleSum\"," + + " \"fieldName\" : \"value\"" + + "}," + + "{" + + " \"fieldName\" : \"value\"," + + " \"name\" : \"value_min\"," + + " \"type\" : \"doubleMin\"" + + "}," + + "{" + + " \"type\" : \"doubleMax\"," + + " \"name\" : \"value_max\"," + + " \"fieldName\" : \"value\"" + + "}" + + "]") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DIMENSIONS_LIST = new PropertyDescriptor.Builder() + .name("dimensions_list") + .description("A comma separated list of field names that will be stored as dimensions on ingest.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SEGMENT_GRANULARITY = new PropertyDescriptor.Builder() + .name("segment_granularity") + .description("Time unit by which to group and aggregate/rollup events.") + .required(true) + .allowableValues("NONE","SECOND","MINUTE","TEN_MINUTE","HOUR","DAY","MONTH","YEAR") + .defaultValue("MINUTE") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUERY_GRANULARITY = new PropertyDescriptor.Builder() + .name("query_granularity") + .description("Time unit by which to group and aggregate/rollup events.") + .required(true) + .allowableValues("NONE","SECOND","MINUTE","TEN_MINUTE","HOUR","DAY","MONTH","YEAR") + .defaultValue("TEN_MINUTE") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor WINDOW_PERIOD = new PropertyDescriptor.Builder() + .name("window_period") + .description("Grace period to allow late arriving events for real time ingest.") + .required(true) + .allowableValues("PT1M","PT10M","PT60M") + .defaultValue("PT10M") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List properties; + + static{ + final List props = new ArrayList<>(); + props.add(DATASOURCE); + props.add(CONNECT_STRING); + props.add(DRUID_INDEX_SERVICE_PATH); + props.add(DRUID_DISCOVERY_PATH); + props.add(DIMENSIONS_LIST); + props.add(AGGREGATOR_JSON); + props.add(SEGMENT_GRANULARITY); + props.add(QUERY_GRANULARITY); + props.add(WINDOW_PERIOD); + props.add(TIMESTAMP_FIELD); + + properties = Collections.unmodifiableList(props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException{ + getLogger().info("Starting Druid Tranquility Controller Service..."); + + final String dataSource = context.getProperty(DATASOURCE).getValue(); + final String zkConnectString = context.getProperty(CONNECT_STRING).getValue(); + final String indexService = context.getProperty(DRUID_INDEX_SERVICE_PATH).getValue(); + final String discoveryPath = context.getProperty(DRUID_DISCOVERY_PATH).getValue(); + final String timestampField = context.getProperty(TIMESTAMP_FIELD).getValue(); + final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue(); + final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue(); + final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue(); + final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).getValue(); + final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).getValue(); + + final List dimensions = getDimensions(dimensionsStringList); + final List aggregator = getAggregatorList(aggregatorJSON); + + final Timestamper> timestamper = new Timestamper>(){ + private static final long serialVersionUID = 1L; + + @Override + public DateTime timestamp(Map theMap){ + return new DateTime(theMap.get(timestampField)); + } + }; + + Iterator aggIterator = aggregator.iterator(); + AggregatorFactory currFactory; + getLogger().debug("Number of Aggregations Defined: " + aggregator.size()); + while(aggIterator.hasNext()){ + currFactory = aggIterator.next(); + getLogger().debug("Verifying Aggregator Definition"); + getLogger().debug("Aggregator Name: " + currFactory.getName()); + getLogger().debug("Aggregator Type: " + currFactory.getTypeName()); + getLogger().debug("Aggregator Req Fields: " + currFactory.requiredFields()); + } + // Tranquility uses ZooKeeper (through Curator) for coordination. + final CuratorFramework curator = CuratorFrameworkFactory + .builder() + .connectString(zkConnectString) + .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) + .build(); + curator.start(); + + // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, + // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. + final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null); + + final Beam> beam = DruidBeams.builder(timestamper) + .curator(curator) + .discoveryPath(discoveryPath) + .location(DruidLocation.create(DruidEnvironment.create(indexService, firehosePattern),dataSource)) + .timestampSpec(timestampSpec) + .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularity.fromString(queryGranularity))) + .tuning( + ClusteredBeamTuning + .builder() + //.segmentGranularity(Granularity.MINUTE) + .segmentGranularity(getSegmentGranularity(segmentGranularity)) + .windowPeriod(new Period(windowPeriod)) + .partitions(clusterPartitions) + .replicants(clusterReplication) + .build() + ) + .druidBeamConfig( + DruidBeamConfig + .builder() + .indexRetryPeriod(new Period(indexRetryPeriod)) + .build()) + .buildBeam(); + + tranquilizer = Tranquilizer.builder() + .maxBatchSize(10000000) + .maxPendingBatches(1000) + .lingerMillis(1000) + .blockOnFull(true) + .build(beam); + + tranquilizer.start(); + } + + public Tranquilizer getTranquilizer(){ + return tranquilizer; + } + + private List> parseJsonString(String aggregatorJson) { + ObjectMapper mapper = new ObjectMapper(); + List> aggSpecList = null; + try { + getLogger().debug("Druid Tranquility Service: Aggregator Spec as String: " + aggregatorJson); + aggSpecList = mapper.readValue(aggregatorJson, List.class); + getLogger().debug("Druid Tranquility Service: Aggregator Spec as List: " + aggSpecList); + return aggSpecList; + } catch (IOException e) { + throw new IllegalArgumentException("Exception while parsing the aggregratorJson"); + } + } + + private List getDimensions(String dimensionStringList){ + List dimensionList = new LinkedList(Arrays.asList(dimensionStringList.split(","))); + return dimensionList; + } + + private List getAggregatorList(String aggregatorJSON) { + List aggregatorList = new LinkedList<>(); + List> aggregatorInfo = parseJsonString(aggregatorJSON); + for (Map aggregator : aggregatorInfo) { + + if (aggregator.get("type").equalsIgnoreCase("count")) { + //Map map = aggregator.get("count"); + aggregatorList.add(getCountAggregator(aggregator)); + } + else if (aggregator.get("type").equalsIgnoreCase("doublesum")) { + //Map map = aggregator.get("doublesum"); + aggregatorList.add(getDoubleSumAggregator(aggregator)); + } + else if (aggregator.get("type").equalsIgnoreCase("doublemax")) { + //Map map = aggregator.get("doublemax"); + aggregatorList.add(getDoubleMaxAggregator(aggregator)); + } + else if (aggregator.get("type").equalsIgnoreCase("doublemin")) { + //Map map = aggregator.get("doublemin"); + aggregatorList.add(getDoubleMinAggregator(aggregator)); + } + else if (aggregator.get("type").equalsIgnoreCase("longsum")) { + //Map map = aggregator.get("longsum"); + aggregatorList.add(getLongSumAggregator(aggregator)); + } + else if (aggregator.get("type").equalsIgnoreCase("longmax")) { + //Map map = aggregator.get("longmax"); + aggregatorList.add(getLongMaxAggregator(aggregator)); + } + else if (aggregator.get("type").equalsIgnoreCase("longmin")) { + //Map map = aggregator.get("longmin"); + aggregatorList.add(getLongMinAggregator(aggregator)); + } + } + + return aggregatorList; + } + + private AggregatorFactory getLongMinAggregator(Map map) { + return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getLongMaxAggregator(Map map) { + return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getLongSumAggregator(Map map) { + return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleMinAggregator(Map map) { + return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleMaxAggregator(Map map) { + return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleSumAggregator(Map map) { + return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getCountAggregator(Map map) { + return new CountAggregatorFactory(map.get("name")); + } + + private Granularity getSegmentGranularity(String segmentGranularity) { + Granularity granularity = Granularity.HOUR; + + switch (segmentGranularity) { + case "SECOND": + granularity = Granularity.SECOND; + break; + case "MINUTE": + granularity = Granularity.MINUTE; + break; + case "FIVE_MINUTE": + granularity = Granularity.FIVE_MINUTE; + break; + case "TEN_MINUTE": + granularity = Granularity.TEN_MINUTE; + break; + case "FIFTEEN_MINUTE": + granularity = Granularity.FIFTEEN_MINUTE; + break; + case "HOUR": + granularity = Granularity.HOUR; + break; + case "SIX_HOUR": + granularity = Granularity.SIX_HOUR; + break; + case "DAY": + granularity = Granularity.DAY; + break; + case "WEEK": + granularity = Granularity.WEEK; + break; + case "MONTH": + granularity = Granularity.MONTH; + break; + case "YEAR": + granularity = Granularity.YEAR; + break; + default: + break; + } + return granularity; + } +} diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 000000000000..53d6d0626dc7 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.controller.DruidTranquilityController \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml new file mode 100644 index 000000000000..5cf0456b3bb6 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml @@ -0,0 +1,56 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-druid-bundle + 1.0-SNAPSHOT + + + nifi-druid-processors + + + 1.1.1 + + + + org.apache.nifi + nifi-api + ${nifi.version} + + + org.apache.nifi + nifi-utils + ${nifi.version} + + + org.apache.nifi + nifi-processor-utils + ${nifi.version} + + + org.apache.nifi + nifi-druid-controller-service-api + 1.0-SNAPSHOT + + + io.druid + tranquility-core_2.10 + 0.8.2 + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java new file mode 100644 index 000000000000..630b94fa1861 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java @@ -0,0 +1,206 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.nifi.controller.api.DruidTranquilityService; +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.twitter.util.Await; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; + +import scala.runtime.BoxedUnit; + +@SideEffectFree +@Tags({"Druid","Timeseries","OLAP","ingest"}) +@CapabilityDescription("Sends events to Apache Druid for Indexing. " + + "Leverages Druid Tranquility Controller service." + + "Incoming flow files are expected to contain 1 or many JSON objects, one JSON object per line") +public class PutDruid extends AbstractSessionFactoryProcessor { + + private List properties; + private Set relationships; + private final Map messageStatus = new HashMap(); + + public static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder() + .name("druid_tranquility_service") + .description("Tranquility Service to use for sending events to Druid") + .required(true) + .identifiesControllerService(DruidTranquilityService.class) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("SUCCESS") + .description("Succes relationship") + .build(); + + public static final Relationship REL_FAIL = new Relationship.Builder() + .name("FAIL") + .description("FlowFiles are routed to this relationship when they cannot be parsed") + .build(); + + public static final Relationship REL_DROPPED = new Relationship.Builder() + .name("DROPPED") + .description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...") + .build(); + + public void init(final ProcessorInitializationContext context){ + List properties = new ArrayList<>(); + properties.add(DRUID_TRANQUILITY_SERVICE); + this.properties = Collections.unmodifiableList(properties); + + Set relationships = new HashSet(); + relationships.add(REL_SUCCESS); + relationships.add(REL_DROPPED); + relationships.add(REL_FAIL); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships(){ + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + //Method breaks down incoming flow file and sends it to Druid Indexing servic + private void processFlowFile(ProcessContext context, ProcessSession session){ + //Get handle on Druid Tranquility session + DruidTranquilityService tranquilityController = context.getProperty(DRUID_TRANQUILITY_SERVICE) + .asControllerService(DruidTranquilityService.class); + Tranquilizer> tranquilizer = tranquilityController.getTranquilizer(); + + final FlowFile flowFile = session.get(); + if (flowFile == null || flowFile.getSize() == 0) { + return; + } + + //Get data from flow file body + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + + String contentString = new String(buffer, StandardCharsets.UTF_8); + Map contentMap = null; + + //Create payload array from flow file content, one element per line + String[] messageArray = contentString.split("\\R"); + + //Convert each array element from JSON to HashMap and send to Druid + for(String message: messageArray){ + try { + contentMap = new ObjectMapper().readValue(message, HashMap.class); + //contentMap = new ObjectMapper().readValue(message, HashMap.class); + } catch (JsonParseException e) { + getLogger().error("Error parsing incoming message array in the flowfile body"); + } catch (JsonMappingException e) { + getLogger().error("Error parsing incoming message array in the flowfile body"); + } catch (IOException e) { + getLogger().error("Error parsing incoming message array in the flowfile body"); + } + + getLogger().debug("Tranquilizer Status: " + tranquilizer.status().toString()); + messageStatus.put(flowFile, "pending"); + //Send data element to Druid, Asynch + Future future = tranquilizer.send(contentMap); + getLogger().debug(" Sent Payload to Druid: " + contentMap); + + //Wait for Druid to call back with status + future.addEventListener(new FutureEventListener() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof MessageDroppedException) { + //This happens when event timestamp targets a Druid Indexing task that has closed (Late Arriving Data) + getLogger().error(" FlowFile Dropped due to MessageDroppedException: " + cause.getMessage() + " : " + cause); + cause.getStackTrace(); + getLogger().error(" Transfering FlowFile to DROPPED relationship"); + session.transfer(flowFile, REL_DROPPED); + } else { + getLogger().error(" FlowFile Processing Failed due to: " + cause.getMessage() + " : " + cause); + cause.printStackTrace(); + getLogger().error(" Transfering FlowFile to FAIL relationship"); + session.transfer(flowFile, REL_FAIL); + } + } + + @Override + public void onSuccess(Object value) { + getLogger().debug(" FlowFile Processing Success : "+ value.toString()); + session.transfer(flowFile, REL_SUCCESS); + } + }); + + try { + //Wait for result from Druid + //This method will be asynch since this is a SessionFactoryProcessor and OnTrigger will create a new Thread + Await.result(future); + } catch (Exception e) { + e.printStackTrace(); + } + } + //session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } + + public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException { + final ProcessSession session = factory.createSession(); + //Create new Thread to ensure that waiting for callback does not reduce throughput + new Thread(() -> { + processFlowFile(context, session); + }).start(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..4136d5e5a6ba --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.processors.PutDruid \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml new file mode 100644 index 000000000000..f123e3961bf7 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -0,0 +1,82 @@ + + + + 4.0.0 + org.apache.nifi + nifi-druid-bundle + 1.0-SNAPSHOT + pom + + UTF-8 + 1.8 + 1.8 + 1.1.1 + + + nifi-druid-bundle-nar + nifi-druid-controller-service-api-nar + nifi-druid-controller-service-api + nifi-druid-controller-service + nifi-druid-processors + + + + org.apache.nifi + nifi-api + ${nifi.version} + + + org.apache.nifi + nifi-utils + ${nifi.version} + + + org.apache.nifi + nifi-processor-utils + ${nifi.version} + + + io.druid + tranquility-core_2.10 + 0.8.2 + + + commons-configuration + commons-configuration + 1.10 + + + com.sun.jersey + jersey-client + 1.19.1 + + + + + + org.apache.nifi + nifi-nar-maven-plugin + 1.0.0-incubating + true + + + org.apache.maven.plugins + maven-surefire-plugin + 2.15 + + + + \ No newline at end of file From 5f77e50243d68f6e017d75c700d71b67fa5177b5 Mon Sep 17 00:00:00 2001 From: vvaks Date: Wed, 11 Oct 2017 21:26:40 -0400 Subject: [PATCH 2/5] update --- .../nifi-druid-bundle-nar/pom.xml | 6 +++--- .../nifi-druid-controller-service-api-nar/pom.xml | 8 ++++---- .../nifi-druid-controller-service-api/pom.xml | 8 ++++---- .../nifi-druid-controller-service/pom.xml | 14 +++++++------- .../controller/DruidTranquilityController.java | 10 ---------- .../nifi-druid-processors/pom.xml | 14 +++++++------- .../java/org/apache/nifi/processors/PutDruid.java | 15 +++++---------- nifi-nar-bundles/nifi-druid-bundle/pom.xml | 10 +++++----- 8 files changed, 35 insertions(+), 50 deletions(-) diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml index b872cf9062d2..3c0403a71b04 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml @@ -18,7 +18,7 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-bundle-nar @@ -28,12 +28,12 @@ org.apache.nifi nifi-druid-controller-service - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT org.apache.nifi nifi-druid-processors - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml index 1439062ef0f9..62982b652a43 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml @@ -18,20 +18,20 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-controller-service-api-nar nar - + org.apache.nifi nifi-druid-controller-service-api - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml index 4c48d7b3e20b..d5e9643e22b7 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml @@ -18,20 +18,20 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-controller-service-api - + org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml index b15aff0db39f..253ca8b16036 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -18,35 +18,35 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-controller-service jar - + org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 org.apache.nifi nifi-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-druid-controller-service-api - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT io.druid diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java index 2b5023d9abf4..ffe4e6359686 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java @@ -259,7 +259,6 @@ public DateTime timestamp(Map theMap){ .tuning( ClusteredBeamTuning .builder() - //.segmentGranularity(Granularity.MINUTE) .segmentGranularity(getSegmentGranularity(segmentGranularity)) .windowPeriod(new Period(windowPeriod)) .partitions(clusterPartitions) @@ -291,9 +290,7 @@ private List> parseJsonString(String aggregatorJson) { ObjectMapper mapper = new ObjectMapper(); List> aggSpecList = null; try { - getLogger().debug("Druid Tranquility Service: Aggregator Spec as String: " + aggregatorJson); aggSpecList = mapper.readValue(aggregatorJson, List.class); - getLogger().debug("Druid Tranquility Service: Aggregator Spec as List: " + aggSpecList); return aggSpecList; } catch (IOException e) { throw new IllegalArgumentException("Exception while parsing the aggregratorJson"); @@ -311,31 +308,24 @@ private List getAggregatorList(String aggregatorJSON) { for (Map aggregator : aggregatorInfo) { if (aggregator.get("type").equalsIgnoreCase("count")) { - //Map map = aggregator.get("count"); aggregatorList.add(getCountAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) { - //Map map = aggregator.get("doublesum"); aggregatorList.add(getDoubleSumAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) { - //Map map = aggregator.get("doublemax"); aggregatorList.add(getDoubleMaxAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) { - //Map map = aggregator.get("doublemin"); aggregatorList.add(getDoubleMinAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("longsum")) { - //Map map = aggregator.get("longsum"); aggregatorList.add(getLongSumAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("longmax")) { - //Map map = aggregator.get("longmax"); aggregatorList.add(getLongMaxAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("longmin")) { - //Map map = aggregator.get("longmin"); aggregatorList.add(getLongMinAggregator(aggregator)); } } diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml index 5cf0456b3bb6..4e536d758490 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml @@ -18,34 +18,34 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-processors - + org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 org.apache.nifi nifi-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-druid-controller-service-api - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT io.druid diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java index 630b94fa1861..648627c81e54 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java @@ -143,15 +143,10 @@ public void process(final InputStream in) throws IOException { //Convert each array element from JSON to HashMap and send to Druid for(String message: messageArray){ try { - contentMap = new ObjectMapper().readValue(message, HashMap.class); - //contentMap = new ObjectMapper().readValue(message, HashMap.class); - } catch (JsonParseException e) { - getLogger().error("Error parsing incoming message array in the flowfile body"); - } catch (JsonMappingException e) { - getLogger().error("Error parsing incoming message array in the flowfile body"); - } catch (IOException e) { - getLogger().error("Error parsing incoming message array in the flowfile body"); - } + contentMap = new ObjectMapper().readValue(message, HashMap.class); + } catch (IOException e1) { + getLogger().error("Error parsing incoming message array in the flowfile body"); + } getLogger().debug("Tranquilizer Status: " + tranquilizer.status().toString()); messageStatus.put(flowFile, "pending"); @@ -189,7 +184,7 @@ public void onSuccess(Object value) { //This method will be asynch since this is a SessionFactoryProcessor and OnTrigger will create a new Thread Await.result(future); } catch (Exception e) { - e.printStackTrace(); + getLogger().error(" Caught exception while waiting for result of put request: " + e.getMessage()); } } //session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml index f123e3961bf7..c34571a31de2 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -17,13 +17,13 @@ 4.0.0 org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT pom UTF-8 1.8 1.8 - 1.1.1 + nifi-druid-bundle-nar @@ -36,17 +36,17 @@ org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 org.apache.nifi nifi-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.3.0 io.druid From b55db604c4701f73e1529772412d6d9445b1d6a7 Mon Sep 17 00:00:00 2001 From: vvaks Date: Wed, 11 Oct 2017 21:26:40 -0400 Subject: [PATCH 3/5] NIFI-4428: - Implement PutDruid Processor and Controller --- .../nifi-druid-bundle-nar/pom.xml | 6 +++--- .../nifi-druid-controller-service-api-nar/pom.xml | 8 ++++---- .../nifi-druid-controller-service-api/pom.xml | 8 ++++---- .../nifi-druid-controller-service/pom.xml | 14 +++++++------- .../controller/DruidTranquilityController.java | 10 ---------- .../nifi-druid-processors/pom.xml | 14 +++++++------- .../java/org/apache/nifi/processors/PutDruid.java | 15 +++++---------- nifi-nar-bundles/nifi-druid-bundle/pom.xml | 10 +++++----- 8 files changed, 35 insertions(+), 50 deletions(-) diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml index b872cf9062d2..3c0403a71b04 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-bundle-nar/pom.xml @@ -18,7 +18,7 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-bundle-nar @@ -28,12 +28,12 @@ org.apache.nifi nifi-druid-controller-service - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT org.apache.nifi nifi-druid-processors - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml index 1439062ef0f9..62982b652a43 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml @@ -18,20 +18,20 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-controller-service-api-nar nar - + org.apache.nifi nifi-druid-controller-service-api - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml index 4c48d7b3e20b..d5e9643e22b7 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml @@ -18,20 +18,20 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-controller-service-api - + org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml index b15aff0db39f..253ca8b16036 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -18,35 +18,35 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-controller-service jar - + org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 org.apache.nifi nifi-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-druid-controller-service-api - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT io.druid diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java index 2b5023d9abf4..ffe4e6359686 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java @@ -259,7 +259,6 @@ public DateTime timestamp(Map theMap){ .tuning( ClusteredBeamTuning .builder() - //.segmentGranularity(Granularity.MINUTE) .segmentGranularity(getSegmentGranularity(segmentGranularity)) .windowPeriod(new Period(windowPeriod)) .partitions(clusterPartitions) @@ -291,9 +290,7 @@ private List> parseJsonString(String aggregatorJson) { ObjectMapper mapper = new ObjectMapper(); List> aggSpecList = null; try { - getLogger().debug("Druid Tranquility Service: Aggregator Spec as String: " + aggregatorJson); aggSpecList = mapper.readValue(aggregatorJson, List.class); - getLogger().debug("Druid Tranquility Service: Aggregator Spec as List: " + aggSpecList); return aggSpecList; } catch (IOException e) { throw new IllegalArgumentException("Exception while parsing the aggregratorJson"); @@ -311,31 +308,24 @@ private List getAggregatorList(String aggregatorJSON) { for (Map aggregator : aggregatorInfo) { if (aggregator.get("type").equalsIgnoreCase("count")) { - //Map map = aggregator.get("count"); aggregatorList.add(getCountAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) { - //Map map = aggregator.get("doublesum"); aggregatorList.add(getDoubleSumAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) { - //Map map = aggregator.get("doublemax"); aggregatorList.add(getDoubleMaxAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) { - //Map map = aggregator.get("doublemin"); aggregatorList.add(getDoubleMinAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("longsum")) { - //Map map = aggregator.get("longsum"); aggregatorList.add(getLongSumAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("longmax")) { - //Map map = aggregator.get("longmax"); aggregatorList.add(getLongMaxAggregator(aggregator)); } else if (aggregator.get("type").equalsIgnoreCase("longmin")) { - //Map map = aggregator.get("longmin"); aggregatorList.add(getLongMinAggregator(aggregator)); } } diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml index 5cf0456b3bb6..4e536d758490 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml @@ -18,34 +18,34 @@ org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT nifi-druid-processors - + org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 org.apache.nifi nifi-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-druid-controller-service-api - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT io.druid diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java index 630b94fa1861..648627c81e54 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java @@ -143,15 +143,10 @@ public void process(final InputStream in) throws IOException { //Convert each array element from JSON to HashMap and send to Druid for(String message: messageArray){ try { - contentMap = new ObjectMapper().readValue(message, HashMap.class); - //contentMap = new ObjectMapper().readValue(message, HashMap.class); - } catch (JsonParseException e) { - getLogger().error("Error parsing incoming message array in the flowfile body"); - } catch (JsonMappingException e) { - getLogger().error("Error parsing incoming message array in the flowfile body"); - } catch (IOException e) { - getLogger().error("Error parsing incoming message array in the flowfile body"); - } + contentMap = new ObjectMapper().readValue(message, HashMap.class); + } catch (IOException e1) { + getLogger().error("Error parsing incoming message array in the flowfile body"); + } getLogger().debug("Tranquilizer Status: " + tranquilizer.status().toString()); messageStatus.put(flowFile, "pending"); @@ -189,7 +184,7 @@ public void onSuccess(Object value) { //This method will be asynch since this is a SessionFactoryProcessor and OnTrigger will create a new Thread Await.result(future); } catch (Exception e) { - e.printStackTrace(); + getLogger().error(" Caught exception while waiting for result of put request: " + e.getMessage()); } } //session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml index f123e3961bf7..c34571a31de2 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -17,13 +17,13 @@ 4.0.0 org.apache.nifi nifi-druid-bundle - 1.0-SNAPSHOT + 1.5.0-SNAPSHOT pom UTF-8 1.8 1.8 - 1.1.1 + nifi-druid-bundle-nar @@ -36,17 +36,17 @@ org.apache.nifi nifi-api - ${nifi.version} + 1.3.0 org.apache.nifi nifi-utils - ${nifi.version} + 1.3.0 org.apache.nifi nifi-processor-utils - ${nifi.version} + 1.3.0 io.druid From 621a9eaac4cfe6e86cdc0fa4200be0edcedb9778 Mon Sep 17 00:00:00 2001 From: vvaks Date: Thu, 12 Oct 2017 15:02:10 -0400 Subject: [PATCH 4/5] added provenance report --- .../src/main/java/org/apache/nifi/processors/PutDruid.java | 1 + 1 file changed, 1 insertion(+) diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java index 648627c81e54..e89674213d20 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java @@ -176,6 +176,7 @@ public void onFailure(Throwable cause) { public void onSuccess(Object value) { getLogger().debug(" FlowFile Processing Success : "+ value.toString()); session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, "Druid Tranquility Service"); } }); From 027f6abbdbb5a9bcf0428dc91986c56c95a9c76a Mon Sep 17 00:00:00 2001 From: vvaks Date: Thu, 12 Oct 2017 15:15:01 -0400 Subject: [PATCH 5/5] added parameters for batch control --- .../DruidTranquilityController.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java index ffe4e6359686..196b4901dd0c 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java @@ -177,6 +177,32 @@ public class DruidTranquilityController extends AbstractControllerService implem .defaultValue("PT10M") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("maxBatchSize") + .description("Maximum number of messages to send at once.") + .required(true) + .defaultValue("2000") + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_PENDING_BATCHES = new PropertyDescriptor.Builder() + .name("maxPendingBatches") + .description("Maximum number of batches that may be in flight before service blocks and waits for one to finish.") + .required(true) + .defaultValue("5") + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .build(); + + public static final PropertyDescriptor LINGER_MILLIS = new PropertyDescriptor.Builder() + .name("lingerMillis") + .description("Wait this long for batches to collect more messages (up to maxBatchSize) before sending them. " + + "Set to zero to disable waiting. " + + "Set to -1 to always wait for complete batches before sending. ") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .build(); private static final List properties; @@ -192,6 +218,9 @@ public class DruidTranquilityController extends AbstractControllerService implem props.add(QUERY_GRANULARITY); props.add(WINDOW_PERIOD); props.add(TIMESTAMP_FIELD); + props.add(MAX_BATCH_SIZE); + props.add(MAX_PENDING_BATCHES); + props.add(LINGER_MILLIS); properties = Collections.unmodifiableList(props); } @@ -215,6 +244,9 @@ public void onConfigured(final ConfigurationContext context) throws Initializati final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue(); final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).getValue(); final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).getValue(); + final int maxBatchSize = Integer.valueOf(context.getProperty(MAX_BATCH_SIZE).getValue()); + final int maxPendingBatches = Integer.valueOf(context.getProperty(MAX_PENDING_BATCHES).getValue()); + final int lingerMillis = Integer.valueOf(context.getProperty(LINGER_MILLIS).getValue()); final List dimensions = getDimensions(dimensionsStringList); final List aggregator = getAggregatorList(aggregatorJSON); @@ -273,9 +305,9 @@ public DateTime timestamp(Map theMap){ .buildBeam(); tranquilizer = Tranquilizer.builder() - .maxBatchSize(10000000) - .maxPendingBatches(1000) - .lingerMillis(1000) + .maxBatchSize(maxBatchSize) + .maxPendingBatches(maxPendingBatches) + .lingerMillis(lingerMillis) .blockOnFull(true) .build(beam);