From f2412805e12b5c7c7fda3d4d6a33d5af1159aa72 Mon Sep 17 00:00:00 2001 From: Adam Carpenter Date: Thu, 21 May 2026 18:10:17 -0400 Subject: [PATCH] Manually compress data if acceptable by client and periodically flush it. This removes the compression responsibility from the Tomcat connector via use of vnd+json mime type. Periodically flushing the output stream keeps the compression buffer from taking too long to flush and hitting a timeout when a constant data stream is being sampled from. --- .../org/jlab/myquery/MySamplerController.java | 34 ++++++++++++++---- .../org/jlab/myquery/QueryController.java | 36 +++++++++++++++++++ 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/jlab/myquery/MySamplerController.java b/src/main/java/org/jlab/myquery/MySamplerController.java index 16f6940..7895aed 100644 --- a/src/main/java/org/jlab/myquery/MySamplerController.java +++ b/src/main/java/org/jlab/myquery/MySamplerController.java @@ -16,6 +16,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.PatternSyntaxException; +import java.util.zip.GZIPOutputStream; import org.jlab.mya.ExtraInfo; import org.jlab.mya.Metadata; import org.jlab.mya.MyaDataType; @@ -47,12 +48,6 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { String jsonp = request.getParameter("jsonp"); - if (jsonp != null) { - response.setContentType("application/javascript"); - } else { - response.setContentType("application/json"); - } - String errorReason = null; List channels = null; MySamplerWebService service = null; @@ -155,7 +150,26 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) boolean adjustMillisWithServerOffset = (a != null); try { - OutputStream out = response.getOutputStream(); + OutputStream rawOut = response.getOutputStream(); + OutputStream out = rawOut; + GZIPOutputStream gzipOut = null; + + String acceptEncoding = request.getHeader("Accept-Encoding"); + boolean useGzip = acceptEncoding != null && acceptEncoding.contains("gzip"); + if (useGzip) { + // Use a vnd tree content type for JSON with GZIP compression to avoid compression by the + // Tomcat connector + response.setHeader("Content-Type", "application/vnd.org.jlab.myquery+json"); + response.setHeader("Content-Encoding", "gzip"); + response.setHeader("Vary", "Accept-Encoding"); + // Need syncflush=true here so that a flush on the stream also flushes the deflate buffer. + gzipOut = new GZIPOutputStream(rawOut, true); + out = gzipOut; + } else if (jsonp != null) { + response.setContentType("application/javascript"); + } else { + response.setContentType("application/json"); + } if (jsonp != null) { out.write((jsonp + "(").getBytes(StandardCharsets.UTF_8)); @@ -195,6 +209,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) service, deployment, gen, + gzipOut, channelName, begin, intervalMillis, @@ -220,6 +235,10 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) if (jsonp != null) { out.write((");").getBytes(StandardCharsets.UTF_8)); } + + if (gzipOut != null) { + gzipOut.finish(); + } } } catch (Exception ex) { response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); @@ -233,6 +252,7 @@ private boolean processChannelRequest( MySamplerWebService service, String deployment, JsonGenerator gen, + GZIPOutputStream gzipOut, String channel, Instant begin, long intervalMillis, diff --git a/src/main/java/org/jlab/myquery/QueryController.java b/src/main/java/org/jlab/myquery/QueryController.java index e895f0d..21054c8 100644 --- a/src/main/java/org/jlab/myquery/QueryController.java +++ b/src/main/java/org/jlab/myquery/QueryController.java @@ -385,6 +385,12 @@ public long generateIntStream( IntEvent event; while ((event = stream.read()) != null) { count++; + // Run a periodic flush to keep any compression buffers flushed. Highly compressible data + // might not trigger + // a flush before a timeout is hit in certain pathological cases + if (count % 1000 == 0) { + gen.flush(); + } writeIntEvent( null, gen, @@ -437,6 +443,12 @@ public long generateStatisticsStream( timestampFormatter, sigFigs); count++; + // Run a periodic flush to keep any compression buffers flushed. Highly compressible data + // might not trigger + // a flush before a timeout is hit in certain pathological cases + if (count % 1000 == 0) { + gen.flush(); + } } gen.writeEnd(); @@ -468,6 +480,12 @@ public long generateFloatStream( FloatEvent event; while ((event = stream.read()) != null) { count++; + // Run a periodic flush to keep any compression buffers flushed. Highly compressible data + // might not trigger + // a flush before a timeout is hit in certain pathological cases + if (count % 1000 == 0) { + gen.flush(); + } writeFloatEvent( null, gen, @@ -505,6 +523,12 @@ public long generateAnalyzedFloatStream( AnalyzedFloatEvent event; while ((event = stream.read()) != null) { count++; + // Run a periodic flush to keep any compression buffers flushed. Highly compressible data + // might not trigger + // a flush before a timeout is hit in certain pathological cases + if (count % 1000 == 0) { + gen.flush(); + } writeAnalyzedFloatEvent( null, gen, @@ -538,6 +562,12 @@ public long generateLabeledEnumStream( long count = 0; while ((event = stream.read()) != null) { count++; + // Run a periodic flush to keep any compression buffers flushed. Highly compressible data + // might not trigger + // a flush before a timeout is hit in certain pathological cases + if (count % 1000 == 0) { + gen.flush(); + } writeLabeledEnumEvent( null, gen, @@ -570,6 +600,12 @@ public long generateMultiStringStream( long count = 0; while ((event = stream.read()) != null) { count++; + // Run a periodic flush to keep any compression buffers flushed. Highly compressible data + // might not trigger + // a flush before a timeout is hit in certain pathological cases + if (count % 1000 == 0) { + gen.flush(); + } writeMultiStringEvent( null, gen,