From 46b72a3960b73f1cf80096222d7e7710416b1b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 1 May 2018 19:14:56 +0200 Subject: [PATCH 1/2] Detach storm-kafka-client from main Storm project, make storm projects using storm-kafka-client point to 2.0.0-SNAPSHOT until the first release, copy release notes script for use during storm-kafka-client release --- dev-tools/release_notes.py | 2 +- dev-tools/storm-kafka-client/release_notes.py | 118 ++++++++++++++++++ docs/storm-kafka-client.md | 10 +- examples/storm-kafka-client-examples/pom.xml | 4 +- examples/storm-perf/pom.xml | 2 +- external/storm-kafka-client/pom.xml | 114 ++++++++++++----- external/storm-kafka-migration/pom.xml | 2 +- external/storm-kafka-monitor/pom.xml | 2 +- flux/flux-core/pom.xml | 2 +- flux/flux-examples/pom.xml | 2 +- pom.xml | 9 +- 11 files changed, 220 insertions(+), 47 deletions(-) create mode 100644 dev-tools/storm-kafka-client/release_notes.py diff --git a/dev-tools/release_notes.py b/dev-tools/release_notes.py index c1e053a2d9b..ca4a7421935 100644 --- a/dev-tools/release_notes.py +++ b/dev-tools/release_notes.py @@ -57,7 +57,7 @@ def issue_link(issue): if __name__ == "__main__": apache = JIRA(JIRA_BASE_URL) - issues = get_issues(apache, 'project=STORM and fixVersion=%s' % version) + issues = get_issues(apache, 'project=STORM and fixVersion=%s and component!=storm-kafka-client' % version) if not issues: print >>sys.stderr, "Didn't find any issues for the target fix version" sys.exit(1) diff --git a/dev-tools/storm-kafka-client/release_notes.py b/dev-tools/storm-kafka-client/release_notes.py new file mode 100644 index 00000000000..0c8e76872dd --- /dev/null +++ b/dev-tools/storm-kafka-client/release_notes.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Usage: release_notes.py > RELEASE_NOTES.html + +Depends on https://pypi.python.org/pypi/jira/, please use pip to install this module. + +Generates release notes for a Storm-kafka-client release by generating an HTML doc containing some introductory information about the + release with links to the Storm docs followed by a list of issues resolved in the release. The script will fail if it finds + any unresolved issues still marked with the target release. You should run this script after either resolving all issues or + moving outstanding issues to a later release. + +""" + +from jira import JIRA +import itertools, sys + +if len(sys.argv) < 2: + print >>sys.stderr, "Usage: release_notes.py " + sys.exit(1) + +version = sys.argv[1] + +JIRA_BASE_URL = 'https://issues.apache.org/jira' +MAX_RESULTS = 100 # This is constrained for cloud instances so we need to fix this value + +def get_issues(jira, query, **kwargs): + """ + Get all issues matching the JQL query from the JIRA instance. This handles expanding paginated results for you. Any additional keyword arguments are forwarded to the JIRA.search_issues call. + """ + results = [] + startAt = 0 + new_results = None + while new_results == None or len(new_results) == MAX_RESULTS: + new_results = jira.search_issues(query, startAt=startAt, maxResults=MAX_RESULTS, **kwargs) + results += new_results + startAt += len(new_results) + return results + +def issue_link(issue): + return "%s/browse/%s" % (JIRA_BASE_URL, issue.key) + + +if __name__ == "__main__": + apache = JIRA(JIRA_BASE_URL) + issues = get_issues(apache, 'project=STORM and fixVersion=%s and component=storm-kafka-client' % version) + if not issues: + print >>sys.stderr, "Didn't find any issues for the target fix version" + sys.exit(1) + + # Some resolutions, including a lack of resolution, indicate that the bug hasn't actually been addressed and we shouldn't even be able to create a release until they are fixed + UNRESOLVED_RESOLUTIONS = [None, + "Unresolved", + "Duplicate", + "Invalid", + "Not A Problem", + "Not A Bug", + "Won't Fix", + "Incomplete", + "Cannot Reproduce", + "Later", + "Works for Me", + "Workaround", + "Information Provided" + ] + unresolved_issues = [issue for issue in issues if issue.fields.resolution in UNRESOLVED_RESOLUTIONS or issue.fields.resolution.name in UNRESOLVED_RESOLUTIONS] + if unresolved_issues: + print >>sys.stderr, "The release is not completed since unresolved issues or improperly resolved issues were found still tagged with this release as the fix version:" + for issue in unresolved_issues: + print >>sys.stderr, "Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue)) + print >>sys.stderr + print >>sys.stderr, "Note that for some resolutions, you should simply remove the fix version as they have not been truly fixed in this release." + sys.exit(1) + + # Get list of (issue type, [issues]) sorted by the issue ID type, with each subset of issues sorted by their key so they + # are in increasing order of bug #. To get a nice ordering of the issue types we customize the key used to sort by issue + # type a bit to ensure features and improvements end up first. + def issue_type_key(issue): + if issue.fields.issuetype.name == 'New Feature': + return -2 + if issue.fields.issuetype.name == 'Improvement': + return -1 + return issue.fields.issuetype.id + by_group = [(k,sorted(g, key=lambda issue: issue.id)) for k,g in itertools.groupby(sorted(issues, key=issue_type_key), lambda issue: issue.fields.issuetype.name)] + + print "" + print "" + print "" + print "" + print "Storm-kafka-client %(version)s Release Notes" % { 'version': version } + print "" + print "" + print "

Release Notes for Storm-kafka-client %s

" % version + print """

JIRA issues addressed in the %(version)s release of Storm-kafka-client. Documentation for this + release is available at the Apache Storm + project site.

""" % { 'version': version } + for itype, issues in by_group: + print "

%s

" % itype + print "
    " + for issue in issues: + print '
  • [%(key)s] - %(summary)s
  • ' % {'key': issue.key, 'link': issue_link(issue), 'summary': issue.fields.summary} + print "
" + print "" + print "" diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index a6f7ac1ed86..252799b1b43 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -8,7 +8,15 @@ This includes the new Apache Kafka consumer API. ## Compatibility -Apache Kafka versions 0.10 onwards +Apache Kafka versions 0.10 onwards. +Note that unlike Storm 1.x, this module requires Java 8. +Compatible Storm versions are listed below, for each active Storm release line. + +| Storm release line | Earliest supported version | +| --- | --- | +| 1.1.x | 1.1.3 | +| 1.2.x | 1.2.2 | +| 2.x | 2.0.0 | ## Writing to Kafka as part of your topology You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml index 74d761485ac..c983a3aa659 100644 --- a/examples/storm-kafka-client-examples/pom.xml +++ b/examples/storm-kafka-client-examples/pom.xml @@ -41,13 +41,13 @@ org.apache.storm storm-kafka-client - ${project.version} + ${storm.kafka.client.version} compile org.apache.kafka kafka-clients - ${storm.kafka.client.version} + ${kafka.client.version} compile diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml index 4bbabfa392e..16369ffbb94 100644 --- a/examples/storm-perf/pom.xml +++ b/examples/storm-perf/pom.xml @@ -120,7 +120,7 @@ org.apache.storm storm-kafka-client - ${project.version} + ${storm.kafka.client.version} org.apache.kafka diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index acb041bc8d6..af224bb5f4a 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -1,21 +1,21 @@ +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/--> 4.0.0 @@ -28,6 +28,7 @@ storm-kafka-client + 2.0.0-SNAPSHOT storm-kafka-client jar @@ -40,26 +41,63 @@ - - - - org.apache.storm - storm-client - ${project.version} - ${provided.scope} - - - org.apache.storm - storm-server - ${project.version} - test - + + 0.10.1.0 + 2.x + + + + + storm-2.x + + true + + + 2.0.0-SNAPSHOT + + + + org.apache.storm + storm-client + ${storm.version} + ${provided.scope} + + + org.apache.storm + storm-server + ${storm.version} + test + + + + + storm-1.x + + + storm.release.line + 1.x + + + + 1.2.2-SNAPSHOT + + + + org.apache.storm + storm-core + ${storm.version} + ${provided.scope} + + + + + org.apache.kafka kafka-clients - ${storm.kafka.client.version} + ${kafka.client.version} org.apache.zookeeper @@ -83,6 +121,10 @@ com.google.guava guava + + commons-lang + commons-lang + org.mockito @@ -100,7 +142,7 @@ org.apache.kafka kafka_2.11 - ${storm.kafka.client.version} + ${kafka.client.version} test test @@ -113,14 +155,14 @@ org.apache.kafka kafka-clients - ${storm.kafka.client.version} + ${kafka.client.version} test test org.apache.kafka kafka_2.11 - ${storm.kafka.client.version} + ${kafka.client.version} test @@ -129,6 +171,10 @@ + + com.googlecode.json-simple + json-simple + diff --git a/external/storm-kafka-migration/pom.xml b/external/storm-kafka-migration/pom.xml index c935954b2cc..9556d8810c7 100644 --- a/external/storm-kafka-migration/pom.xml +++ b/external/storm-kafka-migration/pom.xml @@ -42,7 +42,7 @@ org.apache.kafka kafka-clients - ${storm.kafka.client.version} + ${kafka.client.version} provided diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml index 711dbea3de1..514ac6181fa 100644 --- a/external/storm-kafka-monitor/pom.xml +++ b/external/storm-kafka-monitor/pom.xml @@ -46,7 +46,7 @@ org.apache.kafka kafka-clients - ${storm.kafka.client.version} + ${kafka.client.version} org.apache.kafka diff --git a/flux/flux-core/pom.xml b/flux/flux-core/pom.xml index d29e619c0f5..27363d60032 100644 --- a/flux/flux-core/pom.xml +++ b/flux/flux-core/pom.xml @@ -39,7 +39,7 @@ org.apache.storm storm-kafka-client - ${project.version} + ${storm.kafka.client.version} test diff --git a/flux/flux-examples/pom.xml b/flux/flux-examples/pom.xml index dca5a836c47..05aca4c560b 100644 --- a/flux/flux-examples/pom.xml +++ b/flux/flux-examples/pom.xml @@ -95,7 +95,7 @@ org.apache.storm storm-kafka-client - ${project.version} + ${storm.kafka.client.version} diff --git a/pom.xml b/pom.xml index c486ac6d328..cced5f94965 100644 --- a/pom.xml +++ b/pom.xml @@ -324,8 +324,10 @@ kafka_2.10 - 0.10.1.0 - + 0.10.1.0 + + + 2.0.0-SNAPSHOT org.apache.storm.testing.IntegrationTest @@ -383,7 +385,6 @@ external/storm-cassandra external/storm-mqtt external/storm-mongodb - external/storm-kafka-client external/storm-kafka-migration external/storm-opentsdb external/storm-kafka-monitor @@ -1075,7 +1076,7 @@ org.apache.kafka kafka-clients - ${storm.kafka.client.version} + ${kafka.client.version} provided From 111e7a1db6066f8b49f26fa8f5cba30075dfee25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 1 May 2018 20:21:09 +0200 Subject: [PATCH 2/2] Remove generic types in a few places to be compatible with Storm 1.x --- .../src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java | 4 ++-- .../main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 +- .../storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java | 4 ++-- .../apache/storm/kafka/trident/TridentKafkaStateFactory.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java index 2ff1a54d3ce..4984f50337a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -99,12 +99,12 @@ public KafkaBolt withProducerProperties(Properties producerProperties) { } @Override - public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) { + public void prepare(@SuppressWarnings("rawtypes") Map topoConf, TopologyContext context, OutputCollector collector) { LOG.info("Preparing bolt with configuration {}", this); //for backward compatibility. if (mapper == null) { LOG.info("Mapper not specified. Setting default mapper to {}", FieldNameBasedTupleToKafkaMapper.class.getSimpleName()); - this.mapper = new FieldNameBasedTupleToKafkaMapper(); + this.mapper = new FieldNameBasedTupleToKafkaMapper<>(); } //for backward compatibility. diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 901e97f5897..a08fd34f203 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -118,7 +118,7 @@ public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { } @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; // Spout internals diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java index 3257be799e6..8fb352a4711 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java @@ -43,12 +43,12 @@ public KafkaTridentSpoutOpaque(KafkaSpoutConfig kafkaSpoutConfig) { @Override public Emitter>, KafkaTridentSpoutTopicPartition, Map> getEmitter( - Map conf, TopologyContext context) { + @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { return new KafkaTridentSpoutEmitter<>(kafkaSpoutConfig, context); } @Override - public Coordinator>> getCoordinator(Map conf, TopologyContext context) { + public Coordinator>> getCoordinator(@SuppressWarnings("rawtypes") Map conf, TopologyContext context) { return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaSpoutConfig); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java index 38f5c6e5bcb..6136f9209e0 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java @@ -53,7 +53,7 @@ public TridentKafkaStateFactory withProducerProperties(Properties props) { } @Override - public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions); TridentKafkaState state = new TridentKafkaState<>(); state.withKafkaTopicSelector(this.topicSelector)