Skip to content

Commit

Permalink
Fix subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesagnew committed Aug 13, 2017
1 parent be5c5eb commit 04f1629
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 299 deletions.
Expand Up @@ -1077,7 +1077,7 @@ public <R extends IBaseResource> Set<Long> processMatchUrl(String theMatchUrl, C
RuntimeResourceDefinition resourceDef = getContext().getResourceDefinition(theResourceType);

SearchParameterMap paramMap = translateMatchUrl(this, myContext, theMatchUrl, resourceDef);
paramMap.setPersistResults(false);
paramMap.setLoadSynchronous(true);

if (paramMap.isEmpty() && paramMap.getLastUpdated() == null) {
throw new InvalidRequestException("Invalid match URL[" + theMatchUrl + "] - URL has no search parameters");
Expand Down
Expand Up @@ -20,21 +20,26 @@
* #L%
*/

import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;

import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;

public abstract class BaseRestHookSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseRestHookSubscriptionInterceptor.class);
import javax.annotation.PostConstruct;
import java.util.concurrent.*;

public abstract class BaseRestHookSubscriptionInterceptor extends ServerOperationInterceptorAdapter {
protected static final Integer MAX_SUBSCRIPTION_RESULTS = 10000;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseRestHookSubscriptionInterceptor.class);
protected ExecutorService myExecutor;
private int myExecutorThreadCount = 1;

protected abstract IFhirResourceDao<?> getSubscriptionDao();

Expand All @@ -47,6 +52,18 @@ protected void checkSubscriptionCriterias(String theCriteria) {
}
}

@PostConstruct
public void postConstruct() {
try {
myExecutor = new ThreadPoolExecutor(myExecutorThreadCount, myExecutorThreadCount,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000));

myExecutor = Executors.newFixedThreadPool(myExecutorThreadCount);
} catch (Exception e) {
throw new RuntimeException("Unable to get DAO from PROXY");
}
}

private IBundleProvider executeSubscriptionCriteria(String theCriteria, IIdType idType) {
String criteria = theCriteria;

Expand All @@ -64,7 +81,7 @@ private IBundleProvider executeSubscriptionCriteria(String theCriteria, IIdType

/**
* Search based on a query criteria
*
*
* @param theCheckOnly Is this just a test that the search works
*/
protected IBundleProvider getBundleProvider(String theCriteria, boolean theCheckOnly) {
Expand All @@ -75,13 +92,13 @@ protected IBundleProvider getBundleProvider(String theCriteria, boolean theCheck
req.setSubRequest(true);

IFhirResourceDao<? extends IBaseResource> responseDao = getSubscriptionDao().getDao(responseResourceDef.getImplementingClass());

if (theCheckOnly) {
responseCriteriaUrl.setLoadSynchronousUpTo(1);
} else {
responseCriteriaUrl.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
}

IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req);
return responseResults;
}
Expand Down
Expand Up @@ -20,25 +20,6 @@
* #L%
*/

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.http.client.methods.*;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
Expand All @@ -49,24 +30,36 @@
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionChannelTypeEnum;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionStatusEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.*;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.TokenParam;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class RestHookSubscriptionDstu2Interceptor extends BaseRestHookSubscriptionInterceptor {
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.List;

private static volatile ExecutorService executor;
private final static int MAX_THREADS = 1;
public class RestHookSubscriptionDstu2Interceptor extends BaseRestHookSubscriptionInterceptor {

private static final Logger ourLog = LoggerFactory.getLogger(RestHookSubscriptionDstu2Interceptor.class);

private final List<Subscription> myRestHookSubscriptions = new ArrayList<>();
@Autowired
private FhirContext myFhirContext;

private boolean myNotifyOnDelete = false;

private final List<Subscription> myRestHookSubscriptions = new ArrayList<Subscription>();
@Autowired
@Qualifier("mySubscriptionDaoDstu2")
private IFhirResourceDao<Subscription> mySubscriptionDao;
Expand Down Expand Up @@ -117,14 +110,13 @@ private void checkSubscriptions(IIdType idType, String resourceType, RestOperati
ourLog.info("Found match: queueing rest-hook notification for resource: {}", next.getIdElement());
HttpUriRequest request = createRequest(subscription, next, theOperation);
if (request != null) {
executor.submit(new HttpRequestDstu2Job(request, subscription));
myExecutor.submit(new HttpRequestDstu2Job(request, subscription));
}
}
}
}



/**
* Creates an HTTP Post for a subscription
*/
Expand Down Expand Up @@ -242,6 +234,10 @@ protected IFhirResourceDao<?> getSubscriptionDao() {
return mySubscriptionDao;
}

public void setSubscriptionDao(IFhirResourceDao<Subscription> theSubscriptionDao) {
mySubscriptionDao = theSubscriptionDao;
}

@Override
public void incomingRequestPreHandled(RestOperationTypeEnum theOperation, ActionRequestDetails theDetails) {
// check the subscription criteria to see if its valid before creating or updating a subscription
Expand Down Expand Up @@ -286,22 +282,17 @@ public boolean isNotifyOnDelete() {
return myNotifyOnDelete;
}

public void setNotifyOnDelete(boolean notifyOnDelete) {
this.myNotifyOnDelete = notifyOnDelete;
}

/**
* Subclasses may override
*/
protected String massageCriteria(String theCriteria) {
return theCriteria;
}

@PostConstruct
public void postConstruct() {
try {
executor = Executors.newFixedThreadPool(MAX_THREADS);
} catch (Exception e) {
throw new RuntimeException("Unable to get DAO from PROXY");
}
}

/**
* Remove subscription from cache
*
Expand Down Expand Up @@ -330,8 +321,8 @@ public void resourceCreated(RequestDetails theRequest, IBaseResource theResource
if (theResource instanceof Subscription) {
Subscription subscription = (Subscription) theResource;
if (subscription.getChannel() != null
&& subscription.getChannel().getTypeElement().getValueAsEnum() == SubscriptionChannelTypeEnum.REST_HOOK
&& subscription.getStatusElement().getValueAsEnum() == SubscriptionStatusEnum.REQUESTED) {
&& subscription.getChannel().getTypeElement().getValueAsEnum() == SubscriptionChannelTypeEnum.REST_HOOK
&& subscription.getStatusElement().getValueAsEnum() == SubscriptionStatusEnum.REQUESTED) {
removeLocalSubscription(subscription.getIdElement().getIdPart());
subscription.setStatus(SubscriptionStatusEnum.ACTIVE);
myRestHookSubscriptions.add(subscription);
Expand All @@ -345,16 +336,13 @@ public void resourceCreated(RequestDetails theRequest, IBaseResource theResource
/**
* Check subscriptions to see if there is a matching subscription when there is delete
*
* @param theRequest
* A bean containing details about the request that is about to be processed, including details such as the
* resource type and logical ID (if any) and other FHIR-specific aspects of the request which have been
* pulled out of the {@link HttpServletRequest servlet request}.
* @param theRequest
* The incoming request
* @param theResource
* The response. Note that interceptors may choose to provide a response (i.e. by calling
* {@link HttpServletResponse#getWriter()}) but in that case it is important to return <code>false</code>
* to indicate that the server itself should not also provide a response.
* @param theRequest A bean containing details about the request that is about to be processed, including details such as the
* resource type and logical ID (if any) and other FHIR-specific aspects of the request which have been
* pulled out of the {@link HttpServletRequest servlet request}.
* @param theRequest The incoming request
* @param theResource The response. Note that interceptors may choose to provide a response (i.e. by calling
* {@link HttpServletResponse#getWriter()}) but in that case it is important to return <code>false</code>
* to indicate that the server itself should not also provide a response.
*/
@Override
public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
Expand Down Expand Up @@ -401,12 +389,4 @@ public void setFhirContext(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
}

public void setNotifyOnDelete(boolean notifyOnDelete) {
this.myNotifyOnDelete = notifyOnDelete;
}

public void setSubscriptionDao(IFhirResourceDao<Subscription> theSubscriptionDao) {
mySubscriptionDao = theSubscriptionDao;
}

}

0 comments on commit 04f1629

Please sign in to comment.