Skip to content

Commit

Permalink
ARROW-4625: [Flight][Java] Add method to await Flight server terminat…
Browse files Browse the repository at this point in the history
…ion in Java

Apologies for all the tiny PRs...trying to clean up some small API things we've run into.

Author: David Li <David.M.Li@twosigma.com>

Closes #4110 from lihalite/arrow-4625 and squashes the following commits:

1e0d67f <David Li> Add method to await Flight server termination in Java
  • Loading branch information
David Li authored and wesm committed Apr 4, 2019
1 parent af4f529 commit e20dc90
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public int getPort() {
return server.getPort();
}

/** Block until the server shuts down. */
public void awaitTermination() throws InterruptedException {
server.awaitTermination();
}

public void close() throws InterruptedException {
server.shutdown();
final boolean terminated = server.awaitTermination(3000, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public void start() throws IOException {
flightServer.start();
}

public void awaitTermination() throws InterruptedException {
flightServer.awaitTermination();
}

public InMemoryStore getStore() {
return mem;
}
Expand All @@ -74,8 +78,6 @@ public static void main(String[] args) throws Exception {
e.printStackTrace();
}
}));
while (true) {
Thread.sleep(30000);
}
efs.awaitTermination();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ private void run(String[] args) throws Exception {
}
}));

while (true) {
Thread.sleep(30000);
}
efs.awaitTermination();
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ public void throughput() throws Exception {
new Location(FlightTestUtil.LOCALHOST, port)));
final FlightClient client = new FlightClient(a, server.getLocation());
) {

server.start();

final FlightInfo info = client.getInfo(getPerfFlightDescriptor(50_000_000L, 4095, 2));
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
List<ListenableFuture<Result>> results = info.getEndpoints()
Expand All @@ -104,7 +101,7 @@ public void throughput() throws Exception {

double seconds = r.nanos * 1.0d / 1000 / 1000 / 1000;
System.out.println(String.format(
"Transferred %d records totaling %s bytes at %f mb/s. %f record/s. %f batch/s.",
"Transferred %d records totaling %s bytes at %f MiB/s. %f record/s. %f batch/s.",
r.rows,
r.bytes,
(r.bytes * 1.0d / 1024 / 1024) / seconds,
Expand Down

0 comments on commit e20dc90

Please sign in to comment.