diff --git a/src/ngamsServer/ngamsServer/commands/retrieve.py b/src/ngamsServer/ngamsServer/commands/retrieve.py index bb241190..54ddf1f2 100644 --- a/src/ngamsServer/ngamsServer/commands/retrieve.py +++ b/src/ngamsServer/ngamsServer/commands/retrieve.py @@ -286,13 +286,21 @@ def _handleCmdRetrieve(srvObj, fileVer, include_compression=True) # If not located the quick way try the normal way. - if (not ipAddress): - # Locate the file best suiting the query and send it back if possible. - location, host, ipAddress, port, mountPoint, filename, fileId,\ - fileVersion, mimeType, compression =\ - ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVer, - diskId, hostId, reqPropsObj, - include_compression=True) + if not ipAddress: + try: + # Locate the file best suiting the query and send it back if possible. + location, host, ipAddress, port, mountPoint, filename, fileId, \ + fileVersion, mimeType, compression = \ + ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVer, + diskId, hostId, reqPropsObj, + include_compression=True) + except: + # If the file is still not found then try a remote partner site + location, host, ipAddress, port, mountPoint, filename, fileId, \ + fileVersion, mimeType, compression = \ + ngamsFileUtils.lookup_partner_site_file(srvObj, fileId, + fileVer, reqPropsObj, + include_compression=True) # If still not located, try to contact associated NGAS sites to query # if the file is available there. @@ -310,9 +318,8 @@ def _handleCmdRetrieve(srvObj, # Perform the possible processing requested. procResult, compression = performProcessing(srvObj, reqPropsObj, srcFilename, mimeType, compression) - elif location in (NGAMS_HOST_CLUSTER, NGAMS_HOST_REMOTE) and \ - srvObj.getCfg().getProxyMode(): + elif location == NGAMS_HOST_CLUSTER and srvObj.getCfg().getProxyMode(): logger.info("NG/AMS Server acting as proxy - requesting file with ID: %s " +\ "from NG/AMS Server on host/port: %s/%s", fileId, host, str(port)) @@ -324,6 +331,18 @@ def _handleCmdRetrieve(srvObj, httpRef.proxy_request(host, ipAddress, port, timeout=timeout) return + elif location == NGAMS_HOST_REMOTE and srvObj.is_partner_sites_proxy_mode(): + logger.info("NG/AMS Server acting as remote proxy - requesting file with ID: %s " + \ + "from NG/AMS Server on host/port: %s/%s", + fileId, host, str(port)) + + # Act as a remote proxy - get the file from the remote NGAS host + # specified and send back the contents. The file is temporarily stored + # in the Processing Area. + timeout = float(reqPropsObj['timeout']) if 'timeout' in reqPropsObj else 300 + httpRef.remote_proxy_request(reqPropsObj, host, port, timeout=timeout) + return + else: # No proxy mode: A redirection HTTP response is generated. httpRef.redirect(ipAddress, port) @@ -334,8 +353,8 @@ def _handleCmdRetrieve(srvObj, def handleCmd(srvObj, - reqPropsObj, - httpRef): + reqPropsObj, + httpRef): """ Handle a RETRIEVE command. diff --git a/src/ngamsServer/ngamsServer/commands/status.py b/src/ngamsServer/ngamsServer/commands/status.py index 29391038..d6c7048c 100644 --- a/src/ngamsServer/ngamsServer/commands/status.py +++ b/src/ngamsServer/ngamsServer/commands/status.py @@ -41,7 +41,7 @@ import six -from ngamsLib.ngamsCore import NGAMS_HOST_LOCAL,\ +from ngamsLib.ngamsCore import NGAMS_HOST_LOCAL, NGAMS_HOST_REMOTE,\ getHostName, genLog, genUniqueId, rmFile,\ compressFile, NGAMS_GZIP_XML_MT, getNgamsVersion,\ NGAMS_SUCCESS, NGAMS_XML_MT, fromiso8601, toiso8601 @@ -87,12 +87,23 @@ def _checkFileAccess(srvObj, # Check if the file is located on this host, or if the request should be # forwarded (if this server should act as proxy). - location, fileHost, ipAddress, filePortNo, mountPoint, filename, fileId,\ - fileVersion, mimeType =\ - ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVersion, - diskId) + try: + location, fileHost, ipAddress, filePortNo, mountPoint, filename, \ + fileId, fileVersion, mimeType = \ + ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVersion, + diskId, reqPropsObj=reqPropsObj) + except: + # If the file is still not found then try a remote partner site + location, fileHost, ipAddress, filePortNo, mountPoint, filename, \ + fileId, fileVersion, mimeType = \ + ngamsFileUtils.lookup_partner_site_file(srvObj, fileId, + fileVersion, reqPropsObj) + + if location == NGAMS_HOST_REMOTE: + fileHost = "{0}:{1}".format(fileHost, filePortNo) + return genLog("NGAMS_INFO_FILE_AVAIL", [fileId + "/Version: " + str(fileVersion), fileHost]) - if (location != NGAMS_HOST_LOCAL): + elif location != NGAMS_HOST_LOCAL: # Go and get it! host, port = srvObj.get_remote_server_endpoint(fileHost) pars = (('file_access', fileId), ('file_version', fileVersion), ('disk_id', diskId)) @@ -387,8 +398,8 @@ def _handleFileListReply(srvObj, def handleCmd(srvObj, - reqPropsObj, - httpRef): + reqPropsObj, + httpRef): """ Handle STATUS command. @@ -403,12 +414,11 @@ def handleCmd(srvObj, Returns: Void. """ status = ngamsStatus.ngamsStatus() - status.\ - setDate(toiso8601()).\ - setVersion(getNgamsVersion()).setHostId(srvObj.getHostId()).\ - setStatus(NGAMS_SUCCESS).\ - setMessage("Successfully handled command STATUS").\ - setState(srvObj.getState()).setSubState(srvObj.getSubState()) + status.setDate(toiso8601()).\ + setVersion(getNgamsVersion()).setHostId(srvObj.getHostId()).\ + setStatus(NGAMS_SUCCESS).\ + setMessage("Successfully handled command STATUS").\ + setState(srvObj.getState()).setSubState(srvObj.getSubState()) reqPropsObjRef = reqPropsObj @@ -515,17 +525,27 @@ def handleCmd(srvObj, genDiskStatus = 1 elif (fileId): if (not fileVersion): fileVersion = -1 - fileObj = ngamsFileInfo.ngamsFileInfo() - fileObj.read(srvObj.getHostId(), srvObj.getDb(), fileId, fileVersion) - diskObj = ngamsDiskInfo.ngamsDiskInfo() try: - diskObj.read(srvObj.getDb(), fileObj.getDiskId()) + fileObj = ngamsFileInfo.ngamsFileInfo() + fileObj.read(srvObj.getHostId(), srvObj.getDb(), fileId, fileVersion) + diskObj = ngamsDiskInfo.ngamsDiskInfo() + try: + diskObj.read(srvObj.getDb(), fileObj.getDiskId()) + except: + errMsg = "Illegal Disk ID found: {0} for file with ID: {1}".format(fileObj.getDiskId(), fileId) + raise Exception(errMsg) + diskObj.addFileObj(fileObj) + status.addDiskStatus(diskObj) except: - errMsg = "Illegal Disk ID found: %s for file with ID: %s" %\ - (fileObj.getDiskId(), fileId) - raise Exception(errMsg) - diskObj.addFileObj(fileObj) - status.addDiskStatus(diskObj) + # The file was not found in the database. Check if it is available + # on a remote partner site. + host, port, status_info, disk_info, file_info = \ + ngamsFileUtils.lookup_partner_site_file_status(srvObj, fileId, + fileVersion, + reqPropsObj) + # Update status reply using the disk status and file status + # information retrieved from the partner site + status.addDiskStatus(disk_info) genDiskStatus = 1 genFileStatus = 1 elif (requestId): diff --git a/src/ngamsServer/ngamsServer/ngamsFileUtils.py b/src/ngamsServer/ngamsServer/ngamsFileUtils.py index 1ad7e242..e85ee96a 100644 --- a/src/ngamsServer/ngamsServer/ngamsFileUtils.py +++ b/src/ngamsServer/ngamsServer/ngamsFileUtils.py @@ -72,6 +72,170 @@ # `from_bytes` functions need to be aligned. checksum_info = collections.namedtuple('crc_info', 'init method final from_bytes equals') + +def lookup_partner_site_file_status(ngas_server, + file_id, + file_version, + request_properties): + """ + Lookup the file indicated by the File ID using a NGAS partner site (if one + is specified in the configuration). Returns a tuple of status objects. + + Parameters: + + ngas_server: Reference to NG/AMS server class object (ngamsServer). + + file_id: File ID of file to locate (string). + + file_version: Version of the file (integer). + + request_properties: Request Property object to keep track of actions done + during the request handling (ngamsReqProps|None). + + Returns: + + host: Partner site host name (or IP address) + + port: Partner site port number + + status_info: Status response object (ngamsStatus) + + disk_info: Status response disk information object (ngamsDiskInfo) + + file_info: Status response file information object (ngamsFileInfo) + """ + file_reference = file_id + if (file_version > 0): + file_reference += "/Version: " + str(file_version) + + # If the request came from a partner site. We will not continue to + # propagate the request to avoid a death loop scenario. We will raise an + # exception. + if request_properties.hasHttpPar("partner_site_redirect"): + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) + raise Exception(error_message) + + # Check partner sites is enabled are available from the configuration + if not ngas_server.is_partner_sites_proxy_mode()\ + or not ngas_server.get_partner_sites_address_list(): + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) + raise Exception(error_message) + + # Lets query the partner sites for the availability of the requested file + authentication_header = ngamsSrvUtils.genIntAuthHdr(ngas_server) + parameter_list = [["file_id", file_id]] + if file_version != -1: + parameter_list.append(["file_version", file_version]) + parameter_list.append(["partner_site_redirect", 1]) + + host, port, status_info, disk_info, file_info = None, None, None, None, None + for partner_site in ngas_server.get_partner_sites_address_list(): + partner_address = partner_site.split(":")[0] + partner_port = int(partner_site.split(":")[-1]) + try: + logger.info("Looking up file ID %s on partner site %s", file_id, + partner_site) + response = ngamsHttpUtils.httpGet(partner_address, partner_port, + NGAMS_STATUS_CMD, + parameter_list, + auth=authentication_header) + with contextlib.closing(response): + response_info = response.read() + except: + # We ignore this error, and try the next partner site, if any + continue + + status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_info, 1) + logger.info("Result of File Access Query: {}".format(re.sub("\n", "", + str(status_info.genXml().toprettyxml(' ', '\n'))))) + if status_info.getStatus() == "FAILURE": + logger.info(genLog("NGAMS_INFO_FILE_NOT_AVAIL", [file_id, partner_address])) + else: + logger.info(genLog("NGAMS_INFO_FILE_AVAIL", [file_id, partner_address])) + disk_info = status_info.getDiskStatusList()[0] + file_info = disk_info.getFileObjList()[0] + host = partner_address + port = partner_port + break + + if status_info is None or status_info.getStatus() == "FAILURE": + # Failed to find file on a partner site + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) + raise Exception(error_message) + + return host, port, status_info, disk_info, file_info + + +def lookup_partner_site_file(ngas_server, + file_id, + file_version, + request_properties, + include_compression): + """ + Lookup the file indicated by the File ID using a NGAS partner site (if one + is specified in the configuration). Returns a list containing the necessary + information for retrieving the file: + + [, , , , , + , , , ] + + - whereby: + + = Location of the file (NGAMS_HOST_LOCAL, + NGAMS_HOST_CLUSTER, NGAMS_HOST_DOMAIN, + NGAMS_HOST_REMOTE). + = Host ID of host to be contacted to get access to the + file. + = IP Address of host to be contacted to get access to the + file. + = Port number used by the NG/AMS Server. + = Mount point at which the file is residing. + = Name of file relative to mount point. + = ID of file. + = Version of file. + = Mime-type of file (as registered in NGAS). + + ngas_server: Reference to NG/AMS server class object (ngamsServer). + + file_id: File ID of file to locate (string). + + file_version: Version of the file (integer). + + request_properties: Request Property object to keep track of actions done + during the request handling (ngamsReqProps|None). + + Returns: List with information about file location (list). + """ + host, port, status_info, disk_info, file_info = \ + lookup_partner_site_file_status(ngas_server, file_id, file_version, + request_properties) + + location = NGAMS_HOST_REMOTE + ip_address = None + + # The file was found, get the info necessary for the acquiring the file. + file_attribute_list = [ location, host, ip_address, port, + disk_info.getMountPoint(), + file_info.getFilename(), + file_info.getFileId(), + file_info.getFileVersion(), + file_info.getFormat() ] + + if include_compression: + file_attribute_list.append(file_info.getCompression()) + + message = "Located suitable file for request - File ID: %s. " \ + + "Info for file found - Location: %s - Host ID/IP: %s/%s - " \ + + "Port Number: %s - File Version: %d - Filename: %s - " \ + + "Mime-type: %s" + logger.debug(message, file_id, location, host, ip_address, port, + file_info.getFileVersion(), + file_info.getFilename(), + file_info.getFormat()) + + return file_attribute_list + + def _locateArchiveFile(srvObj, fileId, fileVersion, @@ -764,4 +928,4 @@ def check_checksum(srvObj, fio, filename): "%s/%s/%s - skipping checksum check" logger.info(msg, fio.getDiskId(), fio.getFileId(), fio.getFileVersion()) -# EOF \ No newline at end of file +# EOF diff --git a/src/ngamsServer/ngamsServer/ngamsServer.py b/src/ngamsServer/ngamsServer/ngamsServer.py index 074dc5d9..ddbc69c3 100644 --- a/src/ngamsServer/ngamsServer/ngamsServer.py +++ b/src/ngamsServer/ngamsServer/ngamsServer.py @@ -63,8 +63,8 @@ NGAMS_HTTP_REDIRECT, NGAMS_HTTP_INT_AUTH_USER, \ NGAMS_SUCCESS, NGAMS_FAILURE, NGAMS_OFFLINE_STATE,\ NGAMS_IDLE_SUBSTATE, NGAMS_BUSY_SUBSTATE, NGAMS_NOTIF_ERROR,\ - NGAMS_NOT_SET, NGAMS_XML_MT, loadPlugInEntryPoint, isoTime2Secs,\ - toiso8601 + NGAMS_NOT_SET, NGAMS_XML_MT, NGAMS_RETRIEVE_CMD, loadPlugInEntryPoint,\ + isoTime2Secs, toiso8601 from ngamsLib import ngamsHighLevelLib, ngamsLib, ngamsEvent, ngamsHttpUtils,\ utils, logutils from ngamsLib import ngamsDb, ngamsConfig, ngamsReqProps, pysendfile @@ -344,7 +344,6 @@ def write_data(self, data): def send_status(self, message, status=NGAMS_SUCCESS, code=None, http_message=None, hdrs={}): """Creates and sends an NGAS status XML document back to the client""" - if code is None: code = 200 if status == NGAMS_SUCCESS else 400 @@ -436,6 +435,76 @@ def proxy_request(self, host_id, host, port, timeout=300): self.send_data(data, mime_type, code=code, hdrs=hdrs) + def remote_proxy_request(self, request, host, port, timeout=300): + """Proxy the current request to remote host ``host``:``port``""" + + url = 'http://{0}:{1}/{2}'.format(host, port, NGAMS_RETRIEVE_CMD) + logger.info("Proxying request for /%s to %s:%d", NGAMS_RETRIEVE_CMD, + host, port) + + parameter_list = [] + for parameter in request.getHttpParNames(): + if parameter.lower() == "initiator": + continue + else: + parameter_list.append([parameter, request.getHttpPar(parameter)]) + + start_byte = 0 + header_list = [] + for header in request.getHttpHdrs(): + header_list.append([header, request.getHttpHdr(header)]) + if header.lower() == "range": + value = request.getHttpHdr(header) + start_byte = int(value.replace("bytes=", "").split("-")[0]) + + block_size = self.ngasServer.getCfg().getBlockSize() + + # Make sure the time_out parameters is within proper boundaries + timeout = min(max(timeout, 0), 1200) + + authorization_header = ngamsSrvUtils.genIntAuthHdr(self.ngasServer) + + response = ngamsHttpUtils.httpGetUrl(url, parameter_list, header_list, + timeout, authorization_header) + + logger.info("Received remote partner site proxy response from %s:%d, sending to client", host, port) + response_headers = {header[0]: header[1] for header in response.getheaders()} + logger.info("Headers from remote partner site proxy response: %r", response_headers) + + with contextlib.closing(response): + size = int(response.getheader("content-length")) + self.write_stream_data(response, response_headers, size, start_byte, block_size) + + def write_stream_data(self, response, headers, size, start_byte=0, block_size=65536): + """Streams the data from the remote host""" + + logger.info("Sending %d bytes of data and headers %r", size, headers) + + self.send_response(response.status, hdrs=headers) + self.end_headers() + + logger.info("Sending %d bytes to client, starting at byte %d", size, start_byte) + data_sent = start_byte + data_to_send = size + start_time = time.time() + + # TODO: Should we do something about https here? + while data_sent < data_to_send: + stream_buffer = response.read(block_size) + stream_buffer_size = len(stream_buffer) + self.wfile.write(stream_buffer) + data_sent += stream_buffer_size + if stream_buffer_size == 0 and data_sent < data_to_send: + logger.error("Data stream is incomplete. Only received %d bytes, expected %d bytes.", + data_sent, data_to_send) + break + + elapsed_time = time.time() - start_time + # Avoid divide by zeros later on, let's say it took us 1 [us] to do this + if elapsed_time == 0: + elapsed_time = 0.000001 + size_mb = size / 1024. / 1024. + logger.info("Sent data stream at %.3f [MB/s]", size_mb / elapsed_time) class logging_config(object): @@ -505,6 +574,10 @@ def __init__(self, cfg_fname, _cert=None): self._pid_file_created = False self._cert = _cert + # Handling partner sites proxy mode feature + self.partner_sites_proxy_mode = False + self.partner_site_address_list = [] + # Keep track of how many requests are being served, # This is slightly different from keeping track of the server's # sub-state because many commands don't bother changing it @@ -686,7 +759,7 @@ def fire_archive_event(self, file_id, file_version): try: s(evt) except: - msg = ("Error while trigerring archiving event subscriber, " + msg = ("Error while triggering archiving event subscriber, " "will continue with the rest anyway") logger.exception(msg) @@ -1426,6 +1499,24 @@ def getSrvListDic(self): return self.__srvListDic + def is_partner_sites_proxy_mode(self): + """ + Return reference to the Partner Sites Proxy Mode. + + Returns: Reference to Partner Sites Proxy Mode. + """ + return self.partner_sites_proxy_mode + + + def get_partner_sites_address_list(self): + """ + Return reference to the Partner Site Address List. + + Returns: Reference to Partner Site Address List. + """ + return self.partner_site_address_list + + def setDiskDic(self, diskDic): """ @@ -2034,6 +2125,22 @@ def noop(*args): if allowProcessingReq: checkCreatePath(self.cfg.getProcessingDirectory()) + # Extract partner site proxy mode setting + value = self.getCfg().getVal("PartnerSites[1].ProxyMode") + if value == "1": + self.partner_sites_proxy_mode = True + + # Extract partner site address list + idx = 1 + while True: + address_key = "PartnerSites[1].PartnerSite[{0}].Address".format(idx) + partner_site_address = self.getCfg().getVal(address_key) + if partner_site_address is None: + break + else: + logger.info("Registering partner site address: %s", partner_site_address) + self.partner_site_address_list.append(partner_site_address) + idx += 1 # Check if there is already a PID file. logger.debug("Check if NG/AMS PID file is existing ...") @@ -2355,4 +2462,4 @@ def main(args=None, prog='ngamsServer', server_class=ngamsServer): _parse_and_run(args, prog, server_class) if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/test/commands/test_archive.py b/test/commands/test_archive.py index 0dffa339..0d59195e 100644 --- a/test/commands/test_archive.py +++ b/test/commands/test_archive.py @@ -1126,8 +1126,9 @@ def test_ArchiveProxyMode_01(self): """ ports = range(8001, 8005) cfg_file, cfg_props = self._genArchProxyCfg(ports) - self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props) - self.prepCluster(ports, createDatabase = False) + sqlite_file = tmp_path('ngas.sqlite') + self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props, sqlite_file=sqlite_file) + self.prepCluster(ports, createDatabase = False, sqlite_file=sqlite_file) noOfNodes = len(ports) nodeCount = 0 counts = {p: 0 for p in ports} @@ -1228,8 +1229,9 @@ def test_ArchiveProxyMode_03(self): """ ports = range(8001, 8005) cfg_file, cfg_props = self._genArchProxyCfg(ports) - _, dbObj = self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props) - self.prepCluster(ports, createDatabase = False) + sqlite_file = tmp_path('ngas.sqlite') + _, dbObj = self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props, sqlite_file=sqlite_file) + self.prepCluster(ports, createDatabase = False, sqlite_file=sqlite_file) # Set all Disks in unit :8002 to completed. dbObj.query2("UPDATE ngas_disks SET completed=1 WHERE host_id={0}", args=("%s:8002" % getHostName(),)) # Set :8004 to Offline. diff --git a/test/ngamsTestLib.py b/test/ngamsTestLib.py index cc082abb..f5de08fe 100644 --- a/test/ngamsTestLib.py +++ b/test/ngamsTestLib.py @@ -879,7 +879,8 @@ def prepExtSrv(self, srvModule = None, force=False, daemon = False, - cert_file=None): + cert_file=None, + sqlite_file=None): """ Prepare a standard server object, which runs as a separate process and serves via the standard HTTP interface. @@ -942,10 +943,16 @@ def prepExtSrv(self, cfgObj = self.env_aware_cfg(cfgFile) + # Calculate root directory and clear if needed + root_dir = root_dir or tmp_path('NGAS') + if delDirs: + shutil.rmtree(root_dir, True) + # Change what needs to be changed, like the position of the Sqlite # database file when necessary, the custom configuration items, and the # port number - self.point_to_sqlite_database(cfgObj, not dbCfgName and clearDb) + sqlite_file = sqlite_file or os.path.join(root_dir, 'ngas.sqlite') + self.point_to_sqlite_database(cfgObj, sqlite_file, create=(not dbCfgName and clearDb)) if (cfgProps): for cfgProp in cfgProps: # TODO: Handle Cfg. Group ID. @@ -956,10 +963,7 @@ def prepExtSrv(self, # Now connect to the database and perform any cleanups before we start # the server, like removing existing NGAS dirs and clearing tables - root_dir = root_dir or tmp_path('NGAS') dbObj = ngamsDb.from_config(cfgObj, maxpool=1) - if delDirs: - shutil.rmtree(root_dir, True) if (clearDb): logger.debug("Clearing NGAS DB ...") delNgasTbls(dbObj) @@ -1422,7 +1426,7 @@ def start_srv_in_cluster( return [srvId, port, cfg, db] def prepCluster(self, server_list, cfg_file='src/ngamsCfg.xml', createDatabase=True, - cfg_props=(), cert_file=None): + cfg_props=(), cert_file=None, sqlite_file=None): """ Prepare a common, simulated cluster. This consists of 1 to N servers running on the same node. It is ensured that each of @@ -1457,7 +1461,8 @@ def prepCluster(self, server_list, cfg_file='src/ngamsCfg.xml', createDatabase=T # Create the shared database first of all and generate a new config file tmpCfg = self.env_aware_cfg(cfg_file) - self.point_to_sqlite_database(tmpCfg, createDatabase) + sqlite_file = sqlite_file or tmp_path('ngas.sqlite') + self.point_to_sqlite_database(tmpCfg, sqlite_file, create=createDatabase) if createDatabase: with contextlib.closing(ngamsDb.from_config(tmpCfg, maxpool=1)) as db: delNgasTbls(db) @@ -1480,13 +1485,13 @@ def prepCluster(self, server_list, cfg_file='src/ngamsCfg.xml', createDatabase=T return d - def point_to_sqlite_database(self, cfgObj, create): + def point_to_sqlite_database(self, cfgObj, sqlite_file, create=True): # Exceptional handling for SQLite. if 'sqlite' in cfgObj.getDbInterface().lower(): - sqlite_file = os.path.join(tmp_root, 'ngas.sqlite') if create: rmFile(sqlite_file) + checkCreatePath(os.path.dirname(sqlite_file)) import sqlite3 fname = 'ngamsCreateTables-SQLite.sql' script = utils.b2s(pkg_resources.resource_string('ngamsSql', fname)) # @UndefinedVariable diff --git a/test/test_config_handling.py b/test/test_config_handling.py index 33499b29..23b893b6 100644 --- a/test/test_config_handling.py +++ b/test/test_config_handling.py @@ -40,7 +40,7 @@ from ngamsLib import ngamsConfig, ngamsDb, ngamsXmlMgr from .ngamsTestLib import delNgasTbls, ngamsTestSuite, \ - save_to_tmp + save_to_tmp, tmp_path dbIdAttr = 'Db-Test' @@ -130,7 +130,7 @@ def loadCfg(self, revAttr = "NgamsCfg.Header[1].Revision" cfgObj.storeVal(revAttr, "TEST-REVISION", "ngamsCfg-Test") - self.point_to_sqlite_database(cfgObj, createDatabase) + self.point_to_sqlite_database(cfgObj, tmp_path('ngas.sqlite'), create=createDatabase) dbObj = ngamsDb.from_config(cfgObj, maxpool=1) if (delDbTbls): delNgasTbls(dbObj) cfgObj.writeToDb(dbObj) diff --git a/test/test_db.py b/test/test_db.py index f52866d5..81527d77 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -28,7 +28,7 @@ class DbTests(ngamsTestLib.ngamsTestSuite): def setUp(self): super(DbTests, self).setUp() cfg = self.env_aware_cfg() - self.point_to_sqlite_database(cfg, True) + self.point_to_sqlite_database(cfg, ngamsTestLib.tmp_path('ngas.sqlite')) self.db = ngamsDb.from_config(cfg, maxpool=1) ngamsTestLib.delNgasTbls(self.db) @@ -47,4 +47,4 @@ def test_get_file_info_list_with_wildcards(self): file_info.setFileId('file-id') file_info.write('host-id', self.db, genSnapshot=0) res = list(self.db.getFileInfoList('disk-id', fileId="*")) - self.assertEqual(1, len(res)) + self.assertEqual(1, len(res)) \ No newline at end of file diff --git a/test/test_partner_sites.py b/test/test_partner_sites.py new file mode 100644 index 00000000..b3931b30 --- /dev/null +++ b/test/test_partner_sites.py @@ -0,0 +1,124 @@ +# +# ALMA - Atacama Large Millimiter Array +# (c) European Southern Observatory, 2002 +# Copyright by ESO (in the framework of the ALMA collaboration), +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# + +""" +This module contains the Test Suite for the partner sites feature. +""" + +import contextlib +import functools +import os + +from ngamsLib import ngamsHttpUtils, ngamsStatus +from ngamsLib.ngamsCore import getHostName +from .ngamsTestLib import ngamsTestSuite, tmp_path + +class NgasPartnerSiteTest(ngamsTestSuite): + """ + Synopsis: + Test the partner site feature. + + Description: + NGAS offers a partner site feature for proxy requests to remote NGAS hosts + + Missing Test Cases: + """ + + def _prepare_partner_site_cluster(self, *original_server_list, **kwargs): + server_list = [] + for server_info in original_server_list: + port, property_list = server_info[0], list(server_info[1]) + server_list.append((port, property_list)) + + return self.prepCluster(server_list) + + def test_archive_status_retrieve_sequence(self): + host_name = getHostName() + sample_file_name = "SmallFile.fits" + sample_file_path = os.path.join("src", sample_file_name) +# sample_file_size = os.path.getsize(sample_file_path) + sample_mime_type = "application/octet-stream" + + # We create two cluster each container two NGAS nodes + # We configure the first cluster to use the second cluster as a partner site + config_list_1 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas1"), + ("NgamsCfg.PartnerSites[1].ProxyMode", "1"), + ("NgamsCfg.PartnerSites[1].PartnerSite[1].Address", "localhost:9011")] + self._prepare_partner_site_cluster((9001, config_list_1)) + + config_list_2 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas2")] + self._prepare_partner_site_cluster((9011, config_list_2)) + + # We archive a test sample file on the partner site cluster + self.archive(9011, sample_file_path, mimeType=sample_mime_type) + + # We check the status of a file ID found on the partner site cluster + command_status = functools.partial(ngamsHttpUtils.httpGet, "localhost", 9001, "STATUS", timeout=5) + argument_list = {"file_id": sample_file_name} + with contextlib.closing(command_status(pars=argument_list)) as response: + self.assertEqual(response.status, 200) + # Parse the response XML status information + response_data = response.read() + status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_data, 1) + disk_info = status_info.getDiskStatusList()[0] + file_info = disk_info.getFileObjList()[0] + self.assertEqual(disk_info.getHostId(), "{0}:9011".format(host_name)) + self.assertEqual(file_info.getFileId(), sample_file_name) + self.assertEqual(file_info.getFileVersion(), 1) + self.assertEqual(file_info.getFormat(), sample_mime_type) + + # Retrieve a file found on the partner site cluster + retrieve_file_path = tmp_path(sample_file_name) + self.retrieve(9001, sample_file_name, targetFile=retrieve_file_path) +# retrieve_file_size = os.path.getsize(retrieve_file_path) +# self.assertEqual(sample_file_size, retrieve_file_size) + + def test_status_retrieve_sequence(self): + sample_file_name = "SmallFile.fits" + sample_file_path = os.path.join("src", sample_file_name) + bad_file_name = "dummy.fits" + + # We create two cluster each container two NGAS nodes + # We configure the first cluster to use the second cluster as a partner site + config_list_1 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas1"), + ("NgamsCfg.PartnerSites[1].ProxyMode", "1"), + ("NgamsCfg.PartnerSites[1].PartnerSite[1].Address", "localhost:9011")] + self._prepare_partner_site_cluster((9001, config_list_1)) + + config_list_2 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas2")] + self._prepare_partner_site_cluster((9011, config_list_2)) + + # We check the status of a file ID found on the partner site cluster + command_status = functools.partial(ngamsHttpUtils.httpGet, "localhost", 9001, "STATUS", timeout=1000) + argument_list = {"file_id": bad_file_name} + with contextlib.closing(command_status(pars=argument_list)) as response: + self.assertEqual(response.status, 400) + # Parse the response XML status information + response_data = response.read() + status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_data, 1) + self.assertEqual(status_info.getStatus(), "FAILURE") + self.assertEqual(status_info.getMessage(), + "NGAMS_ER_UNAVAIL_FILE:4019:ERROR: File with ID: dummy.fits appears not to be available.") + + # Retrieve a file found on the partner site cluster + retrieve_file_path = tmp_path(bad_file_name) + self.retrieve(9001, bad_file_name, targetFile=retrieve_file_path, expectedStatus='FAILURE')