Skip to content

Commit

Permalink
ARROW-8176: [FlightRPC] bind to a free port for integration tests
Browse files Browse the repository at this point in the history
This should fix issues with Flight integration tests stomping on each others' ports.

Closes #6693 from lidavidm/arrow-8176-flight-integration-port

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
lidavidm authored and wesm committed Mar 24, 2020
1 parent bc873dc commit 8a33313
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 16 deletions.
6 changes: 4 additions & 2 deletions cpp/src/arrow/flight/test_integration_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ class FlightIntegrationTestServer : public FlightServerBase {
}
auto flight = data->second;

FlightEndpoint endpoint1({{request.path[0]}, {}});
Location server_location;
RETURN_NOT_OK(Location::ForGrpcTcp("127.0.0.1", port(), &server_location));
FlightEndpoint endpoint1({{request.path[0]}, {server_location}});

FlightInfo::Data flight_data;
RETURN_NOT_OK(internal::SchemaToString(*flight.schema, &flight_data.schema));
Expand Down Expand Up @@ -166,7 +168,7 @@ int main(int argc, char** argv) {
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(g_server->SetShutdownOnSignals({SIGTERM}));

std::cout << "Server listening on localhost:" << FLAGS_port << std::endl;
std::cout << "Server listening on localhost:" << g_server->port() << std::endl;
ARROW_CHECK_OK(g_server->Serve());
return 0;
}
5 changes: 2 additions & 3 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from .tester_java import JavaTester
from .tester_js import JSTester
from .util import (ARROW_ROOT_DEFAULT, guid, SKIP_ARROW, SKIP_FLIGHT,
find_unused_port, printer)
printer)
from . import datagen


Expand Down Expand Up @@ -280,8 +280,7 @@ def _run_flight_test_case(self, producer, consumer, test_case):

else:
try:
port = find_unused_port()
with producer.flight_server(port):
with producer.flight_server() as port:
# Have the client upload the file, then download and
# compare
consumer.flight_request(port, json_path)
Expand Down
7 changes: 6 additions & 1 deletion dev/archery/archery/integration/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ def file_to_stream(self, file_path, stream_path):
def validate(self, json_path, arrow_path):
raise NotImplementedError

def flight_server(self, port):
def flight_server(self):
"""Start the Flight server on a free port.
This should be a context manager that returns the port as the
managed object, and cleans up the server on exit.
"""
raise NotImplementedError

def flight_request(self, port, json_path):
Expand Down
9 changes: 5 additions & 4 deletions dev/archery/archery/integration/tester_cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,24 @@ def file_to_stream(self, file_path, stream_path):
self.run_shell_command(cmd)

@contextlib.contextmanager
def flight_server(self, port):
cmd = self.FLIGHT_SERVER_CMD + ['-port=' + str(port)]
def flight_server(self):
cmd = self.FLIGHT_SERVER_CMD + ['-port=0']
if self.debug:
log(' '.join(cmd))
server = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
output = server.stdout.readline().decode()
if not output.startswith("Server listening on localhost"):
if not output.startswith("Server listening on localhost:"):
server.kill()
out, err = server.communicate()
raise RuntimeError(
"Flight-C++ server did not start properly, "
"stdout:\n{}\n\nstderr:\n{}\n"
.format(output + out.decode(), err.decode()))
yield
port = int(output.split(":")[1])
yield port
finally:
server.kill()
server.wait(5)
Expand Down
9 changes: 5 additions & 4 deletions dev/archery/archery/integration/tester_java.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,25 @@ def flight_request(self, port, json_path):
run_cmd(cmd)

@contextlib.contextmanager
def flight_server(self, port):
def flight_server(self):
cmd = ['java'] + self.JAVA_OPTS + \
['-cp', self.ARROW_FLIGHT_JAR, self.ARROW_FLIGHT_SERVER,
'-port', str(port)]
'-port', '0']
if self.debug:
log(' '.join(cmd))
server = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
output = server.stdout.readline().decode()
if not output.startswith("Server listening on localhost"):
if not output.startswith("Server listening on localhost:"):
server.kill()
out, err = server.communicate()
raise RuntimeError(
"Flight-Java server did not start properly, "
"stdout:\n{}\n\nstderr:\n{}\n"
.format(output + out.decode(), err.decode()))
yield
port = int(output.split(":")[1])
yield port
finally:
server.kill()
server.wait(5)
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public Location getLocation() {
return location;
}

public int getPort() {
return this.flightServer.getPort();
}

public void start() throws IOException {
flightServer.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class InMemoryStore implements FlightProducer, AutoCloseable {

private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = new ConcurrentHashMap<>();
private final BufferAllocator allocator;
private final Location location;
private Location location;

/**
* Constructs a new instance.
Expand All @@ -59,6 +59,15 @@ public InMemoryStore(BufferAllocator allocator, Location location) {
this.location = location;
}

/**
* Update the location after server start.
*
* <p>Useful for binding to port 0 to get a free port.
*/
public void setLocation(Location location) {
this.location = location;
}

@Override
public void getStream(CallContext context, Ticket ticket,
ServerStreamListener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ private void run(String[] args) throws Exception {
final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
final ExampleFlightServer efs = new ExampleFlightServer(allocator, Location.forGrpcInsecure("localhost", port));
efs.start();
efs.getStore().setLocation(Location.forGrpcInsecure("localhost", efs.getPort()));
// Print out message for integration test script
System.out.println("Server listening on localhost:" + port);
System.out.println("Server listening on localhost:" + efs.getPort());

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
Expand Down

0 comments on commit 8a33313

Please sign in to comment.