Skip to content

Commit

Permalink
Notify clients when subscription exists
Browse files Browse the repository at this point in the history
This change is the core of #50: it checks if a subscription for the
given ID exists or not before attempting insertion. If one exists, the
server replies to the client with different HTTP codes depending on
whether the subscription is identical to the one the client is trying to
add or not.

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed May 21, 2021
1 parent 33342c6 commit f80f2fa
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
17 changes: 15 additions & 2 deletions src/ngamsCore/ngamsLib/ngamsDbNgasSubscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from . import ngamsDbCore
from .ngamsCore import fromiso8601
from .ngamsSubscriber import ngamsSubscriber


class ngamsDbNgasSubscribers(ngamsDbCore.ngamsDbCore):
Expand Down Expand Up @@ -132,6 +133,11 @@ def _getSubscriberInfo(self, tx, subscrId=None, hostId=None, portNo=-1):


def insertSubscriberEntry(self, sub_obj):
"""
Inserts the new subscription object into the NGAS subscription table.
If an object with the same subscription ID exists, its contents are
returned; otherwise the given object is returned.
"""

hostId = sub_obj.getHostId()
portNo = sub_obj.getPortNo()
Expand All @@ -157,8 +163,15 @@ def insertSubscriberEntry(self, sub_obj):
filterPlugIn, filterPlugInPars, \
lastFileIngDate, concurrent_threads)

self.query2(sql, args = vals)
self.triggerEvents()
# If a subscriber with the given ID already exists return that
with self.transaction() as tx:
existing_sub = self._getSubscriberInfo(tx, subscrId=subscrId)
if existing_sub:
sub_obj = ngamsSubscriber().unpackSqlResult(existing_sub[0])
else:
tx.execute(sql, vals)
self.triggerEvents()
return sub_obj


def updateSubscriberEntry(self, sub_obj):
Expand Down
27 changes: 17 additions & 10 deletions src/ngamsServer/ngamsServer/commands/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import time

from ngamsLib.ngamsCore import \
genLog, NGAMS_SUBSCRIBE_CMD, fromiso8601, toiso8601
genLog, NGAMS_SUBSCRIBE_CMD, fromiso8601, toiso8601, NGAMS_FAILURE,\
NGAMS_SUCCESS
from ngamsLib import ngamsSubscriber, ngamsLib


Expand All @@ -52,10 +53,16 @@ def addSubscriber(srvObj, subscrObj):
Returns: Void.
"""
srvObj.getDb().insertSubscriberEntry(subscrObj)
#subscrObj.write(srvObj.getDb())

srvObj.registerSubscriber(subscrObj)
subscr_in_db = srvObj.getDb().insertSubscriberEntry(subscrObj)
if subscr_in_db is subscrObj:
srvObj.registerSubscriber(subscrObj)
# Trigger the Data Susbcription Thread to make it check if there are
# files to deliver to the new Subscriber.
srvObj.addSubscriptionInfo([], [subscrObj]).triggerSubscriptionThread()
elif subscr_in_db == subscrObj:
return 'equal'
else:
return 'unequal'


def handleCmd(srvObj,
Expand Down Expand Up @@ -137,10 +144,10 @@ def handleCmd(srvObj,
subscrObj.setLastFileIngDate(lastIngDate)

# Register the Subscriber.
addSubscriber(srvObj, subscrObj)

# Trigger the Data Susbcription Thread to make it check if there are
# files to deliver to the new Subscriber.
srvObj.addSubscriptionInfo([], [subscrObj]).triggerSubscriptionThread()
existence_test = addSubscriber(srvObj, subscrObj)
if existence_test == 'equal':
return 201, NGAMS_SUCCESS, "Identical subscription with ID '%s' existed" % (id,)
elif existence_test == 'unequal':
return 409, NGAMS_FAILURE, "Different subscription with ID '%s' existed" % (id,)

return "Handled SUBSCRIBE command"
35 changes: 33 additions & 2 deletions test/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import requests
import trustme

from ngamsLib import ngamsHttpUtils, ngamsSubscriber
from ngamsLib.ngamsCore import getHostName
from ngamsLib import ngamsHttpUtils, ngamsDb, ngamsSubscriber
from ngamsLib.ngamsCore import getHostName, toiso8601
from .ngamsTestLib import ngamsTestSuite, tmp_path, genTmpFilename

try:
Expand Down Expand Up @@ -437,6 +437,37 @@ def test_subscription_equality(self):
subs2 = ngamsSubscriber.ngamsSubscriber(url=URL, subscrId='my-id')
self.assertEqual(subs1, subs2)

# Store in DB, check equality holds
cfg = self.env_aware_cfg()
self.point_to_sqlite_database(cfg, tmp_path('ngas.sqlite'))
db = ngamsDb.from_config(cfg, maxpool=1)
with contextlib.closing(db):
db_subs1 = db.insertSubscriberEntry(subs1)
db_subs2 = db.insertSubscriberEntry(subs2)
self.assertIs(subs1, db_subs1)
self.assertEqual(subs1, db_subs2)
self.assertEqual(subs2, db_subs2)

def test_duplicate_subscription(self):
"""
Test that creating multiple subscriptions with the same ID results in
different HTTP codes returned to the client
"""
URL = 'http://127.0.0.1:1234/path'
NOW = time.time()
START_DATE = toiso8601(NOW, local=True)
def assert_subscription(http_status, ngams_status, url=URL, start_date=START_DATE):
status = self.client.subscribe(url=url, startDate=start_date,
pars=[['subscr_id', 'my-id']])
self.assertEqual(status.http_status, http_status)
self.assertEqual(status.getStatus(), ngams_status)

self.prepExtSrv()
assert_subscription(200, 'SUCCESS')
assert_subscription(201, 'SUCCESS')
assert_subscription(409, 'FAILURE', url=URL + '/subpath')
assert_subscription(409, 'FAILURE', start_date=toiso8601(NOW + 1, local=True))

def upload_subscription_files(self, start_port, end_port, pars=[]):
# Initial archiving
self.qarchive(start_port, 'src/SmallFile.fits', mimeType='application/octet-stream', pars=pars)
Expand Down

0 comments on commit f80f2fa

Please sign in to comment.