Skip to content

Commit

Permalink
[FLINK-2490][streaming]rerun CI
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangWHWHW committed Sep 6, 2015
1 parent 4911462 commit d6e6247
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
static int CONNECTION_RETRY_SLEEP = 1000;
protected long retries;

private volatile boolean isRunning;

Expand All @@ -67,9 +68,9 @@ public void run(SourceContext<String> ctx) throws Exception {
streamFromSocket(ctx, socket);
}

public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
private void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
try {
StringBuffer buffer = new StringBuffer();
StringBuilder buffer = new StringBuilder();
BufferedReader reader = new BufferedReader(new InputStreamReader(
socket.getInputStream()));

Expand All @@ -87,11 +88,11 @@ public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Ex

if (data == -1) {
socket.close();
long retry = 0;
boolean success = false;
while (retry < maxRetry && !success) {
retries = 0;
while ((retries < maxRetry || retryForever) && !success) {
if (!retryForever) {
retry++;
retries++;
}
LOG.warn("Lost connection to server socket. Retrying in "
+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
Expand All @@ -118,7 +119,7 @@ public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Ex

if (data == delimiter) {
ctx.collect(buffer.toString());
buffer = new StringBuffer();
buffer = new StringBuilder();
} else if (data != '\r') { // ignore carriage return
buffer.append((char) data);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.source;

import java.io.DataOutputStream;
import java.net.Socket;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Test;

import static java.lang.Thread.sleep;
import static org.junit.Assert.*;

import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicReference;

/**
* Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
*/
public class SocketTextStreamFunctionTest{

final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
private final String host = "127.0.0.1";

SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
public String result;

@Override
public void collect(String element) {
result = element;
}

@Override
public String toString() {
return this.result;
}

@Override
public void collectWithTimestamp(String element, long timestamp) {

}

@Override
public void emitWatermark(Watermark mark) {

}

@Override
public Object getCheckpointLock() {
return null;
}

@Override
public void close() {

}
};

public SocketTextStreamFunctionTest() {
}

class SocketSource extends Thread {

SocketTextStreamFunction socketSource;

public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception {
this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
}

public void run() {
try {
this.socketSource.open(new Configuration());
this.socketSource.run(ctx);
}catch(Exception e){
error.set(e);
}
}

public void cancel(){
this.socketSource.cancel();
}
}

@Test
public void testSocketSourceRetryForever() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, -1);
source.start();

int count = 0;
Socket channel;
while (count < 100) {
channel = serverSo.accept();
count++;
channel.close();
assertEquals(0, source.socketSource.retries);
}
source.cancel();

if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}

assertEquals(100, count);
}

@Test
public void testSocketSourceRetryTenTimes() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 10);
source.socketSource.CONNECTION_RETRY_SLEEP = 200;

assertEquals(0, source.socketSource.retries);

source.start();

Socket channel;
channel = serverSo.accept();
channel.close();
serverSo.close();
while(source.socketSource.retries < 10){
long lastRetry = source.socketSource.retries;
sleep(100);
assertTrue(source.socketSource.retries >= lastRetry);
};
assertEquals(10, source.socketSource.retries);
source.cancel();

if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}

assertEquals(10, source.socketSource.retries);
}

@Test
public void testSocketSourceNeverRetry() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 0);
source.start();

Socket channel;
channel = serverSo.accept();
channel.close();
serverSo.close();
sleep(2000);
source.cancel();

if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}

assertEquals(0, source.socketSource.retries);
}

@Test
public void testSocketSourceRetryTenTimesWithFirstPass() throws Exception{
error.set(null);
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 10);
source.socketSource.CONNECTION_RETRY_SLEEP = 200;

assertEquals(0, source.socketSource.retries);

source.start();

Socket channel;
channel = serverSo.accept();
DataOutputStream dataOutputStream = new DataOutputStream(channel.getOutputStream());
dataOutputStream.write("testFirstSocketpass\n".getBytes());
channel.close();
serverSo.close();
while(source.socketSource.retries < 10){
long lastRetry = source.socketSource.retries;
sleep(100);
assertTrue(source.socketSource.retries >= lastRetry);
};
assertEquals(10, source.socketSource.retries);
source.cancel();

if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}

assertEquals("testFirstSocketpass", ctx.toString());
assertEquals(10, source.socketSource.retries);
}
}

0 comments on commit d6e6247

Please sign in to comment.