Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import com.epam.deltix.tbwg.webapp.interceptors.TimebaseLoginInterceptor;
import com.epam.deltix.tbwg.webapp.interceptors.RestLogInterceptor;
import com.epam.deltix.tbwg.webapp.utils.json.JsonBigIntEncodingArgumentResolver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.util.UrlPathHelper;

import java.util.List;

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

Expand All @@ -37,6 +41,9 @@ public class WebMvcConfig implements WebMvcConfigurer {
private final RestLogInterceptor logInterceptor;
private final TimebaseLoginInterceptor timebaseLoginInterceptor;

@Autowired
private JsonBigIntEncodingArgumentResolver argumentResolver;

@Autowired
public WebMvcConfig(AsyncTaskExecutor asyncTaskExecutor,
RestLogInterceptor logInterceptor,
Expand Down Expand Up @@ -65,4 +72,9 @@ public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(logInterceptor);
registry.addInterceptor(timebaseLoginInterceptor);
}

@Override
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers) {
resolvers.add(argumentResolver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.epam.deltix.tbwg.webapp.config.WebSocketConfig;
import com.epam.deltix.tbwg.webapp.services.timebase.MonitorService;
import com.epam.deltix.tbwg.webapp.utils.HeaderAccessorHelper;
import com.epam.deltix.tbwg.webapp.utils.json.JsonBigIntEncoding;
import com.epam.deltix.tbwg.webapp.websockets.subscription.Subscription;
import com.epam.deltix.tbwg.webapp.websockets.subscription.SubscriptionChannel;
import com.epam.deltix.tbwg.webapp.websockets.subscription.SubscriptionController;
Expand Down Expand Up @@ -53,8 +54,9 @@ public Subscription onSubscribe(SimpMessageHeaderAccessor headerAccessor, Subscr
long fromTimestamp = headerAccessorHelper.getTimestamp(headerAccessor);
List<String> symbols = headerAccessorHelper.getSymbols(headerAccessor);
List<String> types = headerAccessorHelper.getTypes(headerAccessor);
JsonBigIntEncoding bigIntEncoding = HeaderAccessorHelper.getJsonBigIntEncoding(headerAccessor);

monitorService.subscribe(sessionId, subscriptionId, stream, null, fromTimestamp, types, symbols, channel::sendMessage);
monitorService.subscribe(sessionId, subscriptionId, stream, null, fromTimestamp, types, symbols, channel::sendMessage, bigIntEncoding);
return () -> monitorService.unsubscribe(sessionId, subscriptionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package com.epam.deltix.tbwg.webapp.controllers;

import com.epam.deltix.gflog.api.Log;
import com.epam.deltix.gflog.api.LogFactory;
import com.epam.deltix.tbwg.webapp.config.WebSocketConfig;
import com.epam.deltix.tbwg.webapp.services.timebase.MonitorService;
import com.epam.deltix.tbwg.webapp.utils.HeaderAccessorHelper;
import com.epam.deltix.tbwg.webapp.utils.json.JsonBigIntEncoding;
import com.epam.deltix.tbwg.webapp.websockets.subscription.Subscription;
import com.epam.deltix.tbwg.webapp.websockets.subscription.SubscriptionChannel;
import com.epam.deltix.tbwg.webapp.websockets.subscription.SubscriptionController;
Expand Down Expand Up @@ -55,8 +54,9 @@ public Subscription onSubscribe(SimpMessageHeaderAccessor headerAccessor, Subscr
long fromTimestamp = headerAccessorHelper.getTimestamp(headerAccessor);
List<String> symbols = headerAccessorHelper.getSymbols(headerAccessor);
List<String> types = headerAccessorHelper.getTypes(headerAccessor);
JsonBigIntEncoding bigIntEncoding = HeaderAccessorHelper.getJsonBigIntEncoding(headerAccessor);

monitorService.subscribe(sessionId, subscriptionId, null, qql, fromTimestamp, types, symbols, channel::sendMessage);
monitorService.subscribe(sessionId, subscriptionId, null, qql, fromTimestamp, types, symbols, channel::sendMessage, bigIntEncoding);
return () -> monitorService.unsubscribe(sessionId, subscriptionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.epam.deltix.qsrv.hf.tickdb.pub.lock.LockType;
import com.epam.deltix.qsrv.hf.tickdb.ui.tbshell.TickDBShell;
import com.epam.deltix.tbwg.webapp.model.smd.CurrencyDef;
import com.epam.deltix.tbwg.webapp.utils.json.JsonBigIntEncoding;
import com.epam.deltix.timebase.messages.IdentityKey;
import com.epam.deltix.timebase.messages.InstrumentKey;
import com.epam.deltix.timebase.messages.InstrumentMessage;
Expand Down Expand Up @@ -175,14 +176,14 @@ public long correlationId() {
*/
@PreAuthorize("hasAnyAuthority('TB_ALLOW_READ', 'TB_ALLOW_WRITE')")
@RequestMapping(value = "/select", method = {RequestMethod.POST}, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<StreamingResponseBody> select(@Valid @RequestBody(required = false) SelectRequest select)
throws NoStreamsException {
public ResponseEntity<StreamingResponseBody> select(@Valid @RequestBody(required = false) SelectRequest select,
JsonBigIntEncoding bigIntEncoding) throws NoStreamsException {
if (select == null) {
select = new SelectRequest();
}
return ResponseEntity.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(selectService.select(select, MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET));
.body(selectService.select(select, MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET, bigIntEncoding));
}

/**
Expand Down Expand Up @@ -217,7 +218,8 @@ public ResponseEntity<StreamingResponseBody> select(
@RequestParam(required = false) Long offset,
@RequestParam(required = false) Integer rows,
@RequestParam(required = false) String space,
@RequestParam(required = false) boolean reverse) throws NoStreamsException {
@RequestParam(required = false) boolean reverse,
JsonBigIntEncoding bigIntEncoding) throws NoStreamsException {
SelectRequest request = new SelectRequest();
request.streams = streams;
request.symbols = symbols;
Expand All @@ -230,7 +232,7 @@ public ResponseEntity<StreamingResponseBody> select(
request.reverse = reverse;
request.depth = depth;
request.space = space;
return select(request);
return select(request, bigIntEncoding);
}

/**
Expand All @@ -249,13 +251,14 @@ public ResponseEntity<StreamingResponseBody> select(
@PreAuthorize("hasAnyAuthority('TB_ALLOW_READ', 'TB_ALLOW_WRITE')")
@RequestMapping(value = "/{streamId}/select", method = {RequestMethod.POST}, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<StreamingResponseBody> select(@PathVariable String streamId,
@Valid @RequestBody(required = false) StreamRequest select)
@Valid @RequestBody(required = false) StreamRequest select,
JsonBigIntEncoding bigIntEncoding)
throws NoStreamsException {
if (select == null)
select = new StreamRequest();

return select(streamId, select.symbols, select.types, null, select.from, select.to, select.offset,
select.rows, select.space, select.reverse);
select.rows, select.space, select.reverse, bigIntEncoding);
}

/**
Expand Down Expand Up @@ -297,11 +300,12 @@ public ResponseEntity<StreamingResponseBody> select(
@RequestParam(required = false) Long offset,
@RequestParam(required = false) Integer rows,
@RequestParam(required = false) String space,
@RequestParam(required = false) boolean reverse) throws NoStreamsException {
@RequestParam(required = false) boolean reverse,
JsonBigIntEncoding bigIntEncoding) throws NoStreamsException {
if (TextUtils.isEmpty(streamId))
throw new NoStreamsException();

return select(new String[]{streamId}, symbols, types, depth, from, to, offset, rows, space, reverse);
return select(new String[]{streamId}, symbols, types, depth, from, to, offset, rows, space, reverse, bigIntEncoding);
}

// download operation is permitted for any user
Expand Down Expand Up @@ -1025,7 +1029,7 @@ ResponseEntity<StreamingResponseBody> checkWritable(String error) {
@RequestMapping(value = "/{streamId}/{symbolId}/select", method = {RequestMethod.POST}, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<StreamingResponseBody> select(@PathVariable String streamId, @PathVariable String symbolId,
@Valid @RequestBody(required = false) InstrumentRequest select,
OutputStream outputStream) {
OutputStream outputStream, JsonBigIntEncoding bigIntEncoding) {
DXTickStream stream = service.getStream(streamId);

if (stream == null)
Expand Down Expand Up @@ -1054,7 +1058,7 @@ public ResponseEntity<StreamingResponseBody> select(@PathVariable String streamI
.contentType(MediaType.APPLICATION_JSON)
.body(new MessageSource2ResponseStream(
stream.select(startTime, options, select.types, ids), select.getEndTime(), startIndex, endIndex,
MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET)
MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET, bigIntEncoding)
);
}

Expand Down Expand Up @@ -1090,14 +1094,15 @@ public ResponseEntity<StreamingResponseBody> select(
@RequestParam(required = false) Long offset,
@RequestParam(required = false) Integer rows,
@RequestParam(required = false) String space,
@RequestParam(required = false) boolean reverse) throws NoStreamsException {
@RequestParam(required = false) boolean reverse,
JsonBigIntEncoding bigIntEncoding) throws NoStreamsException {
if (TextUtils.isEmpty(streamId))
throw new NoStreamsException();

if (TextUtils.isEmpty(symbolId))
return ResponseEntity.notFound().build();

return select(new String[]{streamId}, new String[]{symbolId}, types, depth, from, to, offset, rows, space, reverse);
return select(new String[]{streamId}, new String[]{symbolId}, types, depth, from, to, offset, rows, space, reverse, bigIntEncoding);
}

private SelectionOptions getSelectionOption(BaseRequest r) {
Expand Down Expand Up @@ -1535,9 +1540,8 @@ public ResponseEntity<StreamDef[]> streams(@RequestParam(required = false, defau
*/
@PreAuthorize("hasAnyAuthority('TB_ALLOW_READ', 'TB_ALLOW_WRITE')")
@RequestMapping(value = "/query", method = {RequestMethod.POST})
public ResponseEntity<StreamingResponseBody> query(Principal principal, @Valid @RequestBody(required = false) QueryRequest select)
throws InvalidQueryException, WriteOperationsException {

public ResponseEntity<StreamingResponseBody> query(Principal principal, @Valid @RequestBody(required = false) QueryRequest select,
JsonBigIntEncoding bigIntEncoding) throws InvalidQueryException, WriteOperationsException {
if (select == null || StringUtils.isEmpty(select.query))
throw new InvalidQueryException(select == null ? "" : select.query);

Expand All @@ -1559,7 +1563,34 @@ public ResponseEntity<StreamingResponseBody> query(Principal principal, @Valid @
.body(new MessageSource2ResponseStream(
service.getConnection().executeQuery(
select.query, options, null, null, select.getStartTime(Long.MIN_VALUE), select.getEndTime(Long.MIN_VALUE)),
select.getEndTime(), startIndex, endIndex, MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET));
select.getEndTime(), startIndex, endIndex, MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET, bigIntEncoding));
}

/**
* Executes an QQL query and returns the maximum possible number of records
*/
@PreAuthorize("hasAnyAuthority('TB_ALLOW_READ', 'TB_ALLOW_WRITE')")
@RequestMapping(value = "/unlimitedQuery", method = {RequestMethod.POST})
public ResponseEntity<StreamingResponseBody> unlimitedQuery(Principal principal, @Valid @RequestBody(required = false) QueryRequest select,
JsonBigIntEncoding bigIntEncoding)
throws InvalidQueryException, WriteOperationsException {

if (select == null || StringUtils.isEmpty(select.query))
throw new InvalidQueryException(select == null ? "" : select.query);
if (service.isReadonly() && (select.query.toLowerCase().contains("drop") || select.query.toLowerCase().contains("create")))
throw new WriteOperationsException("CREATE or DROP");
if (isDdlQuery(select.query) && !hasAuthority(principal, "TB_ALLOW_WRITE")) {
throw new AccessDeniedException("TB_ALLOW_WRITE permission required.");
}

SelectionOptions options = getSelectionOption(select);
LOGGER.info().append("UNLIMITED QUERY: (").append(select.query).append(")").commit();
return ResponseEntity.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(new MessageSource2ResponseStream(
service.getConnection().executeQuery(
select.query, options, null, null, select.getStartTime(Long.MIN_VALUE), select.getEndTime(Long.MIN_VALUE)),
select.getEndTime(), 0, Integer.MAX_VALUE, Integer.MAX_VALUE, bigIntEncoding));
}

private boolean isDdlQuery(String query) {
Expand Down Expand Up @@ -1663,8 +1694,8 @@ public Set<ShortFunctionDef> queryFunctionsShort() {
@PreAuthorize("hasAnyAuthority('TB_ALLOW_READ', 'TB_ALLOW_WRITE')")
@RequestMapping(value = "/{streamId}/filter", method = {RequestMethod.POST}, consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<StreamingResponseBody> filter(@PathVariable String streamId, @Valid @RequestBody FilterRequest filter)
throws UnknownStreamException {
public ResponseEntity<StreamingResponseBody> filter(@PathVariable String streamId, @Valid @RequestBody FilterRequest filter,
JsonBigIntEncoding bigIntEncoding) throws UnknownStreamException {
DXTickStream stream = service.getStream(streamId);
if (stream == null)
throw new UnknownStreamException(streamId);
Expand Down Expand Up @@ -1697,7 +1728,7 @@ public ResponseEntity<StreamingResponseBody> filter(@PathVariable String streamI
.contentType(MediaType.APPLICATION_JSON)
.body(new MessageSource2ResponseStream(service.getConnection()
.executeQuery(query, options, null, null, startTime), endTime, startIndex, endIndex,
MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET));
MAX_NUMBER_OF_RECORDS_PER_REST_RESULTSET, bigIntEncoding));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.epam.deltix.tbwg.webapp.model.tree.TreeNodeDef;
import com.epam.deltix.tbwg.webapp.services.timebase.MonitorService;
import com.epam.deltix.tbwg.webapp.services.topic.TopicService;
import com.epam.deltix.tbwg.webapp.utils.HeaderAccessorHelper;
import com.epam.deltix.tbwg.webapp.utils.json.JsonBigIntEncoding;
import com.epam.deltix.tbwg.webapp.websockets.subscription.Subscription;
import com.epam.deltix.tbwg.webapp.websockets.subscription.SubscriptionChannel;
import com.epam.deltix.tbwg.webapp.websockets.subscription.SubscriptionController;
Expand Down Expand Up @@ -126,8 +128,9 @@ public Subscription onSubscribe(SimpMessageHeaderAccessor header, SubscriptionCh
String topicKey = URLDecoder.decode(extractId(destination), StandardCharsets.UTF_8);
String sessionId = header.getSessionId();
String subscriptionId = header.getSubscriptionId();
JsonBigIntEncoding bigIntEncoding = HeaderAccessorHelper.getJsonBigIntEncoding(header);

monitorService.subscribeTopic(sessionId, subscriptionId, topicKey, channel::sendMessage);
monitorService.subscribeTopic(sessionId, subscriptionId, topicKey, channel::sendMessage, bigIntEncoding);
return () -> monitorService.unsubscribe(sessionId, subscriptionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import com.epam.deltix.qsrv.util.json.JSONRawMessagePrinter;
import com.epam.deltix.tbwg.messages.Message;
import com.epam.deltix.tbwg.webapp.model.charting.line.RawElementDef;
import com.epam.deltix.tbwg.webapp.utils.json.WebGatewayJsonRawMessagePrinterFactory;

import java.util.Collections;

@Deprecated // not implemented on the front-end side
public class QqlConversionTransformation extends AbstractChartTransformation<RawElementDef, RawMessage> {

private final StringBuilder sb = new StringBuilder();
private final JSONRawMessagePrinter rawMessagePrinter = new JSONRawMessagePrinter();
private final JSONRawMessagePrinter rawMessagePrinter = WebGatewayJsonRawMessagePrinterFactory.create();

public QqlConversionTransformation() {
super(Collections.singletonList(RawMessage.class), Collections.singletonList(RawElementDef.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@
*/
package com.epam.deltix.tbwg.webapp.services.timebase;

import com.epam.deltix.tbwg.webapp.utils.json.JsonBigIntEncoding;

import java.util.List;
import java.util.function.Consumer;

public interface MonitorService {

void subscribe(String sessionId, String subscriptionId, String key, String qql, long fromTimestamp, List<String> types,
List<String> symbols, Consumer<String> consumer);
List<String> symbols, Consumer<String> consumer, JsonBigIntEncoding bigIntEncoding);

void subscribeTopic(String sessionId, String subscriptionId, String key,
Consumer<String> consumer);
Consumer<String> consumer, JsonBigIntEncoding bigIntEncoding);

void unsubscribe(String sessionId, String subscriptionId);

Expand Down
Loading