Skip to content

Commit

Permalink
Merge pull request #1130 from Marketcetera/MATP-1019
Browse files Browse the repository at this point in the history
Matp 1019 Photon market data view should allow specific exchange
  • Loading branch information
colinduplantis committed Mar 30, 2023
2 parents a3d899f + 7dbeadc commit 31b9243
Show file tree
Hide file tree
Showing 27 changed files with 893 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.marketcetera.core.InMemoryIDFactory;
import org.marketcetera.core.InternalID;
import org.marketcetera.core.NoMoreIDsException;
import org.marketcetera.core.publisher.Subscriber;
import org.marketcetera.core.publisher.PublisherEngine;
import org.marketcetera.core.publisher.Subscriber;
import org.marketcetera.event.AggregateEvent;
import org.marketcetera.event.Event;
import org.marketcetera.event.EventTranslator;
Expand Down Expand Up @@ -120,8 +120,7 @@ public abstract class AbstractMarketDataFeed<T extends AbstractMarketDataFeedTok
private static final Callable<Boolean> PUBLISHING_CONDITION = ConditionsFactory.createSamplingCondition(100,
"metc.metrics.marketdata.sampling.interval"); //$NON-NLS-1$
/**
* Indicates if the feed is allowed to simulate market data if the normal source is not
* available.
* Indicates if the feed is allowed to simulate market data if the normal source is not available.
*
* @return a <code>boolean</code> value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.marketcetera.util.log.SLF4JLoggerProxy;
import org.marketcetera.util.misc.ClassVersion;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -171,51 +173,57 @@ public final void requestData(DataRequest inRequest,
}
} else if (requestPayload instanceof MarketDataRequest) {
request = (MarketDataRequest)requestPayload;
} else if(requestPayload instanceof FeedStatusRequest) {
doFeedStatusRequest((FeedStatusRequest)requestPayload,
inRequest,
inSupport);
} else {
throw new UnsupportedRequestParameterType(instanceURN,
requestPayload);
}
try {
Subscriber subscriber = new Subscriber() {
@Override
public boolean isInteresting(Object inData)
{
return inData instanceof Event;
}
@Override
public void publishTo(final Object inEvent)
{
if(inEvent instanceof Event) {
requestLock.executeRead(new Runnable() {
@Override
public void run()
{
Event event = (Event)inEvent;
Object token = event.getSource();
RequestID requestID = requests.get(token);
event.setSource(requestID);
}
});
if(request != null) {
try {
Subscriber subscriber = new Subscriber() {
@Override
public boolean isInteresting(Object inData)
{
return inData instanceof Event;
}
ThreadedMetric.event("mdata-OUT"); //$NON-NLS-1$
inSupport.send(inEvent);
}
};
MarketDataFeedTokenSpec spec = MarketDataFeedTokenSpec.generateTokenSpec(request,
subscriber);
final T token = feed.execute(spec);
requestLock.executeWrite(new Runnable() {
@Override
public void run()
{
requests.put(token,
inSupport.getRequestID());
}
});
} catch (Exception e) {
throw new IllegalRequestParameterValue(instanceURN,
requestPayload,
e);
@Override
public void publishTo(final Object inEvent)
{
if(inEvent instanceof Event) {
requestLock.executeRead(new Runnable() {
@Override
public void run()
{
Event event = (Event)inEvent;
Object token = event.getSource();
RequestID requestID = requests.get(token);
event.setSource(requestID);
}
});
}
ThreadedMetric.event("mdata-OUT"); //$NON-NLS-1$
inSupport.send(inEvent);
}
};
MarketDataFeedTokenSpec spec = MarketDataFeedTokenSpec.generateTokenSpec(request,
subscriber);
final T token = feed.execute(spec);
requestLock.executeWrite(new Runnable() {
@Override
public void run()
{
requests.put(token,
inSupport.getRequestID());
}
});
} catch (Exception e) {
throw new IllegalRequestParameterValue(instanceURN,
requestPayload,
e);
}
}
}
/* (non-Javadoc)
Expand Down Expand Up @@ -369,6 +377,14 @@ private void setFeedStatus(FeedStatus inNewFeedStatus)
String newStatusString = inNewFeedStatus.toString();
String oldStatusString = feedStatus.toString();
feedStatus = inNewFeedStatus;
for(FeedStatusRequestData feedStatusRequestData : feedStatusRequestDataByDataFlowId.asMap().values()) {
try {
feedStatusRequestData.getDataEmitterSupport().send(feedStatus);
} catch (Exception e) {
SLF4JLoggerProxy.warn(this,
e);
}
}
mNotificationDelegate.sendNotification(new AttributeChangeNotification(this,
mSequence.getAndIncrement(),
System.currentTimeMillis(),
Expand All @@ -378,6 +394,57 @@ private void setFeedStatus(FeedStatus inNewFeedStatus)
oldStatusString,
newStatusString));
}
/**
* Create a feed status request.
*
* @param inRequestPayload a <code>FeedStatusRequest</code> value
* @param inRequest a <code>DataRequest</code> value
* @param inSupport a <code>DataEmitterSupport</code> value
*/
private void doFeedStatusRequest(FeedStatusRequest inRequestPayload,
DataRequest inRequest,
DataEmitterSupport inSupport)
{
FeedStatusRequestData metaData = new FeedStatusRequestData(inSupport);
feedStatusRequestDataByDataFlowId.put(inSupport.getFlowID(),
metaData);
}
/**
* Holds data relevant to a feed status request as part of a module data flow.
*
* @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
* @version $Id$
* @since $Release$
*/
private static class FeedStatusRequestData
{
/**
* Get the dataEmitterSupport value.
*
* @return a <code>DataEmitterSupport</code> value
*/
protected DataEmitterSupport getDataEmitterSupport()
{
return dataEmitterSupport;
}
/**
* Create a new FeedStatusRequestData instance.
*
* @param inDataEmitterSupport a <code>DataEmitterSupport</code> value
*/
private FeedStatusRequestData(DataEmitterSupport inDataEmitterSupport)
{
dataEmitterSupport = inDataEmitterSupport;
}
/**
* data emitter support value
*/
private final DataEmitterSupport dataEmitterSupport;
}
/**
* holds feed status requests by data flow id
*/
private final Cache<DataFlowID,FeedStatusRequestData> feedStatusRequestDataByDataFlowId = CacheBuilder.newBuilder().build();
/**
* tracks feeds by provider name as the feeds are instantiated (not started) - may not be active
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,13 @@ CollectionPageResponse<Event> getSnapshot(Instrument inInstrument,
* @return a <code>Set&lt;Capability&gt;</code> value
*/
Set<Capability> getAvailableCapability();
/**
* Gets the active providers.
*
* <p>Providers may or may not be connected at this time, these are the providers known
* to the system.</p>
*
* @return a <code>Set&lt;String&gt;</code> value
*/
Set<String> getProviders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ CollectionPageResponse<Event> getSnapshot(Instrument inInstrument,
* @return a <code>Set&lt;Capability&gt;</code> value
*/
Set<Capability> getAvailableCapability();
/**
* Gets the active providers.
*
* <p>Providers may or may not be connected at this time, these are the providers known
* to the system.</p>
*
* @return a <code>Set&lt;String&gt;</code> value
*/
Set<String> getProviders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.marketcetera.core.publisher.Subscriber;
import org.marketcetera.event.Event;
import org.marketcetera.marketdata.IFeedComponent.FeedType;
import org.marketcetera.marketdata.service.MarketDataService;
import org.marketcetera.metrics.ThreadedMetric;
import org.marketcetera.module.AutowiredModule;
import org.marketcetera.module.DataEmitter;
Expand Down Expand Up @@ -387,7 +388,13 @@ private void setFeedStatus(FeedStatus inNewFeedStatus)
for(MarketDataStatusBroadcaster marketDataStatusPublisher : marketDataStatusBroadcasters) {
marketDataStatusPublisher.reportMarketDataStatus(status);
}
marketDataService.reportMarketDataStatus(status);
}
/**
* provides access to market data services
*/
@Autowired
private MarketDataService marketDataService;
/**
* optional market data status publishers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public Void call()
@Override
public Set<Capability> getAvailableCapability()
{
return executeCall(new Callable<Set<Capability>>(){
return executeCall(new Callable<Set<Capability>>() {
@Override
public Set<Capability> call()
throws Exception
Expand Down Expand Up @@ -299,6 +299,41 @@ public Set<Capability> call()
}
});
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.MarketDataClient#getProviders()
*/
@Override
public Set<String> getProviders()
{
return executeCall(new Callable<Set<String>>() {
@Override
public Set<String> call()
throws Exception
{
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} getting available providers",
getSessionId());
MarketDataRpc.GetMarketDataProvidersRequest.Builder requestBuilder = MarketDataRpc.GetMarketDataProvidersRequest.newBuilder();
requestBuilder.setSessionId(getSessionId().getValue());
MarketDataRpc.GetMarketDataProvidersRequest request = requestBuilder.build();
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} sending {}",
getSessionId(),
request);
MarketDataRpc.GetMarketDataProvidersResponse response = getBlockingStub().getMarketDataProviders(request);
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} received {}",
getSessionId(),
response);
Set<String> providers = Sets.newHashSet(response.getProviderList());
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} returning {}",
getSessionId(),
providers);
return providers;
}
});
}
/**
* Create a new MarketDataRpcClient instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ message RemoveMarketDataStatusListenerRequest {
message RemoveMarketDataStatusListenerResponse {
}

message GetMarketDataProvidersRequest {
string sessionId = 1;
}

message GetMarketDataProvidersResponse {
repeated string provider = 1;
}

service MarketDataRpcService {
rpc login(LoginRequest) returns (LoginResponse);
rpc logout(LogoutRequest) returns (LogoutResponse);
Expand All @@ -77,4 +85,5 @@ service MarketDataRpcService {
rpc getAvailableCapability(AvailableCapabilityRequest) returns (AvailableCapabilityResponse);
rpc addMarketDataStatusListener(AddMarketDataStatusListenerRequest) returns (stream MarketDataStatusListenerResponse);
rpc removeMarketDataStatusListener(RemoveMarketDataStatusListenerRequest) returns (RemoveMarketDataStatusListenerResponse);
rpc getMarketDataProviders(GetMarketDataProvidersRequest) returns (GetMarketDataProvidersResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.marketcetera.marketdata.MarketDataStatus;
import org.marketcetera.marketdata.MarketDataStatusListener;
import org.marketcetera.marketdata.core.rpc.MarketDataRpc;
import org.marketcetera.marketdata.core.rpc.MarketDataRpc.GetMarketDataProvidersRequest;
import org.marketcetera.marketdata.core.rpc.MarketDataRpc.GetMarketDataProvidersResponse;
import org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc;
import org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc.MarketDataRpcServiceImplBase;
import org.marketcetera.marketdata.core.rpc.MarketDataTypesRpc;
Expand Down Expand Up @@ -283,6 +285,33 @@ public void getAvailableCapability(MarketDataRpc.AvailableCapabilityRequest inRe
inResponseObserver.onCompleted();
}
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc.MarketDataRpcServiceImplBase#getMarketDataProviders(org.marketcetera.marketdata.core.rpc.MarketDataRpc.GetMarketDataProvidersRequest, io.grpc.stub.StreamObserver)
*/
@Override
public void getMarketDataProviders(GetMarketDataProvidersRequest inRequest,
StreamObserver<GetMarketDataProvidersResponse> inResponseObserver)
{
try {
validateAndReturnSession(inRequest.getSessionId());
SLF4JLoggerProxy.trace(MarketDataRpcService.this,
"Received market data providers request {}",
inRequest);
MarketDataRpc.GetMarketDataProvidersResponse.Builder responseBuilder = MarketDataRpc.GetMarketDataProvidersResponse.newBuilder();
Set<String> providers = marketDataService.getProviders();
providers.forEach(provider -> responseBuilder.addProvider(provider));
MarketDataRpc.GetMarketDataProvidersResponse response = responseBuilder.build();
SLF4JLoggerProxy.trace(MarketDataRpcService.this,
"Sending response: {}",
response);
inResponseObserver.onNext(response);
inResponseObserver.onCompleted();
} catch (Exception e) {
handleError(e,
inResponseObserver);
inResponseObserver.onCompleted();
}
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc.MarketDataRpcServiceImplBase#addMarketDataStatusListener(org.marketcetera.marketdata.core.rpc.MarketDataRpc.AddMarketDataStatusListenerRequest, io.grpc.stub.StreamObserver)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ public void reportCapability(Collection<Capability> inCapabilities)
{
throw new UnsupportedOperationException(); // TODO
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.service.MarketDataService#getProviders()
*/
@Override
public Set<String> getProviders()
{
throw new UnsupportedOperationException(); // TODO
}
/**
* Get the requests value.
*
Expand Down Expand Up @@ -218,9 +226,7 @@ public Set<Capability> getCapabilitiesToReturn()
return capabilitiesToReturn;
}
/**
*
*
*
* Reset the object.
*/
public void reset()
{
Expand Down
Loading

0 comments on commit 31b9243

Please sign in to comment.