[FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

This closes #1039
rmetzger authored and StephanEwen committed Aug 27, 2015
1 parent b9892a0 commit 76fcaca
Showing 226 changed files with 224 additions and 23,489 deletions.
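
The commit message says the connector now implements its own topic metadata retrieval instead of shipping copied Kafka code; that implementation is not part of the hunks shown below. Purely for orientation, here is a minimal, hypothetical sketch of how partition metadata can be fetched from a single broker with Kafka 0.8's SimpleConsumer API (the classes and methods come from Kafka's public javaapi; the host, port, client id, and error handling are illustrative):

```java
import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class TopicMetadataLookup {

    // Asks one broker for the partition metadata of a single topic.
    public static List<PartitionMetadata> getPartitionsForTopic(String brokerHost, int brokerPort, String topic) {
        SimpleConsumer consumer = new SimpleConsumer(brokerHost, brokerPort, 30000, 64 * 1024, "metadata-lookup");
        try {
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata metadata : response.topicsMetadata()) {
                if (metadata.topic().equals(topic)) {
                    // Each PartitionMetadata carries the partition id and its current leader broker.
                    return metadata.partitionsMetadata();
                }
            }
            throw new RuntimeException("Broker returned no metadata for topic " + topic);
        } finally {
            consumer.close();
        }
    }
}
```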
4 changes: 2 additions & 2 deletions docs/_includes/navbar.html
@@ -81,8 +81,8 @@
<li><a href="{{ apis }}/web_client.html">Web Client</a></li>
<li><a href="{{ apis }}/iterations.html">Iterations</a></li>
<li><a href="{{ apis }}/java8.html">Java 8</a></li>
<li><a href="{{ apis }}/hadoop_compatibility.html">Hadoop Compatability <span class="badge">Beta</span></a></li>
<li><a href="{{ apis }}/storm_compatibility.html">Storm Compatability <span class="badge">Beta</span></a></li>
<li><a href="{{ apis }}/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
<li><a href="{{ apis }}/storm_compatibility.html">Storm Compatibility <span class="badge">Beta</span></a></li>
</ul>
</li>

5 changes: 2 additions & 3 deletions docs/apis/kafka.md
@@ -57,8 +57,7 @@ Please pick a package (maven artifact id) and class name for your use-case and e
| ------------- | ------------- | ----- | ------ | ------ | ------ |
| flink-connector-kafka | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
| flink-connector-kafka | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
| flink-connector-kafka-083 | 0.10 | `FlinkKafkaConsumer081` | 0.8.1 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
| flink-connector-kafka-083 | 0.10 | `FlinkKafkaConsumer082` | 0.8.2 | **yes** | Uses the new, unreleased consumer API of Kafka 0.9.3 internally. Offsets are committed to ZK manually |
| flink-connector-kafka-083 | 0.10 | `FlinkKafkaConsumer083` | 0.8.3 | **yes** | **EXPERIMENTAL** Uses the new, unreleased consumer of Kafka 0.9.3. Offsets are committed using the Consumer API |
| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
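
As a point of reference for the table, a checkpointing consumer such as `FlinkKafkaConsumer082` is typically wired into a streaming job roughly as follows. This is a hedged sketch in the spirit of the Flink 0.9.x documentation of that period; the package names, property keys, topic, and broker addresses are assumptions, while the connector class and its (topic, deserialization schema, properties) constructor follow the table above.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaReadExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // used for offset commits
        props.setProperty("bootstrap.servers", "localhost:9092"); // brokers for metadata and fetches
        props.setProperty("group.id", "example-group");

        // Read the topic as strings and print the records.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));
        stream.print();

        env.execute("Kafka read example");
    }
}
```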


@@ -41,7 +41,7 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina

public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
ClusterUtil.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks(), -1);
ClusterUtil.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
}

public void killTopology(final String topologyName) {
@@ -60,9 +60,7 @@ public void deactivate(final String topologyName) {
public void rebalance(final String name, final RebalanceOptions options) {
}

public void shutdown() {
ClusterUtil.stopOnMiniCluster();
}
public void shutdown() {}

public String getTopologyConf(final String id) {
return null;
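
For orientation, a caller of this local-cluster facade might look like the sketch below. It relies only on the method signatures visible in this hunk; the direct constructor call and the imports of FlinkLocalCluster and FlinkTopology from the storm compatibility API are assumptions, and building the FlinkTopology itself is out of scope here.

```java
import java.util.HashMap;
import java.util.Map;

// Assumes FlinkLocalCluster and FlinkTopology are imported from the storm compatibility API package.
public class LocalSubmitSketch {

    public static void runLocally(FlinkTopology topology) throws Exception {
        FlinkLocalCluster cluster = new FlinkLocalCluster(); // assumed to be directly instantiable
        Map<String, Object> conf = new HashMap<String, Object>();

        cluster.submitTopology("example-topology", conf, topology);
        // ... let the topology run ...
        cluster.killTopology("example-topology");
        cluster.shutdown(); // after this commit: no longer stops the mini cluster here
    }
}
```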
5 changes: 4 additions & 1 deletion flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -48,8 +48,10 @@ public static String getHostnameFromFQDN(String fqdn) {
* Works also for ipv6.
*
* See: http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
*
* @return URL object for accessing host and Port
*/
public static void ensureCorrectHostnamePort(String hostPort) {
public static URL getCorrectHostnamePort(String hostPort) {
try {
URL u = new URL("http://"+hostPort);
if(u.getHost() == null) {
@@ -58,6 +60,7 @@ public static void ensureCorrectHostnamePort(String hostPort) {
if(u.getPort() == -1) {
throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid port");
}
return u;
} catch (MalformedURLException e) {
throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);
}
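
A minimal usage sketch of the renamed helper, assuming a caller that wants an InetSocketAddress out of a "host:port" string (the address value is illustrative):

```java
import java.net.InetSocketAddress;
import java.net.URL;

import org.apache.flink.util.NetUtils;

public class HostPortExample {

    public static void main(String[] args) {
        // Validates the "host:port" string and returns a URL wrapping both parts,
        // instead of only throwing on invalid input as the old ensureCorrectHostnamePort did.
        URL url = NetUtils.getCorrectHostnamePort("localhost:6123");
        InetSocketAddress address = new InetSocketAddress(url.getHost(), url.getPort());
        System.out.println("Resolved to " + address);
    }
}
```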
