Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matp 1019 Photon market data view should allow specific exchange #1130

Merged
merged 4 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.marketcetera.core.InMemoryIDFactory;
import org.marketcetera.core.InternalID;
import org.marketcetera.core.NoMoreIDsException;
import org.marketcetera.core.publisher.Subscriber;
import org.marketcetera.core.publisher.PublisherEngine;
import org.marketcetera.core.publisher.Subscriber;
import org.marketcetera.event.AggregateEvent;
import org.marketcetera.event.Event;
import org.marketcetera.event.EventTranslator;
Expand Down Expand Up @@ -120,8 +120,7 @@ public abstract class AbstractMarketDataFeed<T extends AbstractMarketDataFeedTok
private static final Callable<Boolean> PUBLISHING_CONDITION = ConditionsFactory.createSamplingCondition(100,
"metc.metrics.marketdata.sampling.interval"); //$NON-NLS-1$
/**
* Indicates if the feed is allowed to simulate market data if the normal source is not
* available.
* Indicates if the feed is allowed to simulate market data if the normal source is not available.
*
* @return a <code>boolean</code> value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.marketcetera.util.log.SLF4JLoggerProxy;
import org.marketcetera.util.misc.ClassVersion;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -171,51 +173,57 @@ public final void requestData(DataRequest inRequest,
}
} else if (requestPayload instanceof MarketDataRequest) {
request = (MarketDataRequest)requestPayload;
} else if(requestPayload instanceof FeedStatusRequest) {
doFeedStatusRequest((FeedStatusRequest)requestPayload,
inRequest,
inSupport);
} else {
throw new UnsupportedRequestParameterType(instanceURN,
requestPayload);
}
try {
Subscriber subscriber = new Subscriber() {
@Override
public boolean isInteresting(Object inData)
{
return inData instanceof Event;
}
@Override
public void publishTo(final Object inEvent)
{
if(inEvent instanceof Event) {
requestLock.executeRead(new Runnable() {
@Override
public void run()
{
Event event = (Event)inEvent;
Object token = event.getSource();
RequestID requestID = requests.get(token);
event.setSource(requestID);
}
});
if(request != null) {
try {
Subscriber subscriber = new Subscriber() {
@Override
public boolean isInteresting(Object inData)
{
return inData instanceof Event;
}
ThreadedMetric.event("mdata-OUT"); //$NON-NLS-1$
inSupport.send(inEvent);
}
};
MarketDataFeedTokenSpec spec = MarketDataFeedTokenSpec.generateTokenSpec(request,
subscriber);
final T token = feed.execute(spec);
requestLock.executeWrite(new Runnable() {
@Override
public void run()
{
requests.put(token,
inSupport.getRequestID());
}
});
} catch (Exception e) {
throw new IllegalRequestParameterValue(instanceURN,
requestPayload,
e);
@Override
public void publishTo(final Object inEvent)
{
if(inEvent instanceof Event) {
requestLock.executeRead(new Runnable() {
@Override
public void run()
{
Event event = (Event)inEvent;
Object token = event.getSource();
RequestID requestID = requests.get(token);
event.setSource(requestID);
}
});
}
ThreadedMetric.event("mdata-OUT"); //$NON-NLS-1$
inSupport.send(inEvent);
}
};
MarketDataFeedTokenSpec spec = MarketDataFeedTokenSpec.generateTokenSpec(request,
subscriber);
final T token = feed.execute(spec);
requestLock.executeWrite(new Runnable() {
@Override
public void run()
{
requests.put(token,
inSupport.getRequestID());
}
});
} catch (Exception e) {
throw new IllegalRequestParameterValue(instanceURN,
requestPayload,
e);
}
}
}
/* (non-Javadoc)
Expand Down Expand Up @@ -369,6 +377,14 @@ private void setFeedStatus(FeedStatus inNewFeedStatus)
String newStatusString = inNewFeedStatus.toString();
String oldStatusString = feedStatus.toString();
feedStatus = inNewFeedStatus;
for(FeedStatusRequestData feedStatusRequestData : feedStatusRequestDataByDataFlowId.asMap().values()) {
try {
feedStatusRequestData.getDataEmitterSupport().send(feedStatus);
} catch (Exception e) {
SLF4JLoggerProxy.warn(this,
e);
}
}
mNotificationDelegate.sendNotification(new AttributeChangeNotification(this,
mSequence.getAndIncrement(),
System.currentTimeMillis(),
Expand All @@ -378,6 +394,57 @@ private void setFeedStatus(FeedStatus inNewFeedStatus)
oldStatusString,
newStatusString));
}
/**
* Create a feed status request.
*
* @param inRequestPayload a <code>FeedStatusRequest</code> value
* @param inRequest a <code>DataRequest</code> value
* @param inSupport a <code>DataEmitterSupport</code> value
*/
private void doFeedStatusRequest(FeedStatusRequest inRequestPayload,
DataRequest inRequest,
DataEmitterSupport inSupport)
{
FeedStatusRequestData metaData = new FeedStatusRequestData(inSupport);
feedStatusRequestDataByDataFlowId.put(inSupport.getFlowID(),
metaData);
}
/**
* Holds data relevant to a feed status request as part of a module data flow.
*
* @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
* @version $Id$
* @since $Release$
*/
private static class FeedStatusRequestData
{
/**
* Get the dataEmitterSupport value.
*
* @return a <code>DataEmitterSupport</code> value
*/
protected DataEmitterSupport getDataEmitterSupport()
{
return dataEmitterSupport;
}
/**
* Create a new FeedStatusRequestData instance.
*
* @param inDataEmitterSupport a <code>DataEmitterSupport</code> value
*/
private FeedStatusRequestData(DataEmitterSupport inDataEmitterSupport)
{
dataEmitterSupport = inDataEmitterSupport;
}
/**
* data emitter support value
*/
private final DataEmitterSupport dataEmitterSupport;
}
/**
* holds feed status requests by data flow id
*/
private final Cache<DataFlowID,FeedStatusRequestData> feedStatusRequestDataByDataFlowId = CacheBuilder.newBuilder().build();
/**
* tracks feeds by provider name as the feeds are instantiated (not started) - may not be active
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,13 @@ CollectionPageResponse<Event> getSnapshot(Instrument inInstrument,
* @return a <code>Set&lt;Capability&gt;</code> value
*/
Set<Capability> getAvailableCapability();
/**
* Gets the active providers.
*
* <p>Providers may or may not be connected at this time, these are the providers known
* to the system.</p>
*
* @return a <code>Set&lt;String&gt;</code> value
*/
Set<String> getProviders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ CollectionPageResponse<Event> getSnapshot(Instrument inInstrument,
* @return a <code>Set&lt;Capability&gt;</code> value
*/
Set<Capability> getAvailableCapability();
/**
* Gets the active providers.
*
* <p>Providers may or may not be connected at this time, these are the providers known
* to the system.</p>
*
* @return a <code>Set&lt;String&gt;</code> value
*/
Set<String> getProviders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.marketcetera.core.publisher.Subscriber;
import org.marketcetera.event.Event;
import org.marketcetera.marketdata.IFeedComponent.FeedType;
import org.marketcetera.marketdata.service.MarketDataService;
import org.marketcetera.metrics.ThreadedMetric;
import org.marketcetera.module.AutowiredModule;
import org.marketcetera.module.DataEmitter;
Expand Down Expand Up @@ -387,7 +388,13 @@ private void setFeedStatus(FeedStatus inNewFeedStatus)
for(MarketDataStatusBroadcaster marketDataStatusPublisher : marketDataStatusBroadcasters) {
marketDataStatusPublisher.reportMarketDataStatus(status);
}
marketDataService.reportMarketDataStatus(status);
}
/**
* provides access to market data services
*/
@Autowired
private MarketDataService marketDataService;
/**
* optional market data status publishers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public Void call()
@Override
public Set<Capability> getAvailableCapability()
{
return executeCall(new Callable<Set<Capability>>(){
return executeCall(new Callable<Set<Capability>>() {
@Override
public Set<Capability> call()
throws Exception
Expand Down Expand Up @@ -299,6 +299,41 @@ public Set<Capability> call()
}
});
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.MarketDataClient#getProviders()
*/
@Override
public Set<String> getProviders()
{
return executeCall(new Callable<Set<String>>() {
@Override
public Set<String> call()
throws Exception
{
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} getting available providers",
getSessionId());
MarketDataRpc.GetMarketDataProvidersRequest.Builder requestBuilder = MarketDataRpc.GetMarketDataProvidersRequest.newBuilder();
requestBuilder.setSessionId(getSessionId().getValue());
MarketDataRpc.GetMarketDataProvidersRequest request = requestBuilder.build();
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} sending {}",
getSessionId(),
request);
MarketDataRpc.GetMarketDataProvidersResponse response = getBlockingStub().getMarketDataProviders(request);
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} received {}",
getSessionId(),
response);
Set<String> providers = Sets.newHashSet(response.getProviderList());
SLF4JLoggerProxy.trace(MarketDataRpcClient.this,
"{} returning {}",
getSessionId(),
providers);
return providers;
}
});
}
/**
* Create a new MarketDataRpcClient instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ message RemoveMarketDataStatusListenerRequest {
message RemoveMarketDataStatusListenerResponse {
}

message GetMarketDataProvidersRequest {
string sessionId = 1;
}

message GetMarketDataProvidersResponse {
repeated string provider = 1;
}

service MarketDataRpcService {
rpc login(LoginRequest) returns (LoginResponse);
rpc logout(LogoutRequest) returns (LogoutResponse);
Expand All @@ -77,4 +85,5 @@ service MarketDataRpcService {
rpc getAvailableCapability(AvailableCapabilityRequest) returns (AvailableCapabilityResponse);
rpc addMarketDataStatusListener(AddMarketDataStatusListenerRequest) returns (stream MarketDataStatusListenerResponse);
rpc removeMarketDataStatusListener(RemoveMarketDataStatusListenerRequest) returns (RemoveMarketDataStatusListenerResponse);
rpc getMarketDataProviders(GetMarketDataProvidersRequest) returns (GetMarketDataProvidersResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.marketcetera.marketdata.MarketDataStatus;
import org.marketcetera.marketdata.MarketDataStatusListener;
import org.marketcetera.marketdata.core.rpc.MarketDataRpc;
import org.marketcetera.marketdata.core.rpc.MarketDataRpc.GetMarketDataProvidersRequest;
import org.marketcetera.marketdata.core.rpc.MarketDataRpc.GetMarketDataProvidersResponse;
import org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc;
import org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc.MarketDataRpcServiceImplBase;
import org.marketcetera.marketdata.core.rpc.MarketDataTypesRpc;
Expand Down Expand Up @@ -283,6 +285,33 @@ public void getAvailableCapability(MarketDataRpc.AvailableCapabilityRequest inRe
inResponseObserver.onCompleted();
}
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc.MarketDataRpcServiceImplBase#getMarketDataProviders(org.marketcetera.marketdata.core.rpc.MarketDataRpc.GetMarketDataProvidersRequest, io.grpc.stub.StreamObserver)
*/
@Override
public void getMarketDataProviders(GetMarketDataProvidersRequest inRequest,
StreamObserver<GetMarketDataProvidersResponse> inResponseObserver)
{
try {
validateAndReturnSession(inRequest.getSessionId());
SLF4JLoggerProxy.trace(MarketDataRpcService.this,
"Received market data providers request {}",
inRequest);
MarketDataRpc.GetMarketDataProvidersResponse.Builder responseBuilder = MarketDataRpc.GetMarketDataProvidersResponse.newBuilder();
Set<String> providers = marketDataService.getProviders();
providers.forEach(provider -> responseBuilder.addProvider(provider));
MarketDataRpc.GetMarketDataProvidersResponse response = responseBuilder.build();
SLF4JLoggerProxy.trace(MarketDataRpcService.this,
"Sending response: {}",
response);
inResponseObserver.onNext(response);
inResponseObserver.onCompleted();
} catch (Exception e) {
handleError(e,
inResponseObserver);
inResponseObserver.onCompleted();
}
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.core.rpc.MarketDataRpcServiceGrpc.MarketDataRpcServiceImplBase#addMarketDataStatusListener(org.marketcetera.marketdata.core.rpc.MarketDataRpc.AddMarketDataStatusListenerRequest, io.grpc.stub.StreamObserver)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ public void reportCapability(Collection<Capability> inCapabilities)
{
throw new UnsupportedOperationException(); // TODO
}
/* (non-Javadoc)
* @see org.marketcetera.marketdata.service.MarketDataService#getProviders()
*/
@Override
public Set<String> getProviders()
{
throw new UnsupportedOperationException(); // TODO
}
/**
* Get the requests value.
*
Expand Down Expand Up @@ -218,9 +226,7 @@ public Set<Capability> getCapabilitiesToReturn()
return capabilitiesToReturn;
}
/**
*
*
*
* Reset the object.
*/
public void reset()
{
Expand Down
Loading