From 670e43cf19e005585be4eb17c809e76a24b99b08 Mon Sep 17 00:00:00 2001 From: Stewart McLay Date: Wed, 16 Sep 2020 18:22:47 +0200 Subject: [PATCH 1/5] Ported partner sites feature from ALMA NGAS source code --- .../ngamsServer/commands/retrieve.py | 41 +++-- .../ngamsServer/commands/status.py | 72 +++++--- src/ngamsServer/ngamsServer/ngamsFileUtils.py | 165 +++++++++++++++++- src/ngamsServer/ngamsServer/ngamsServer.py | 105 ++++++++++- 4 files changed, 343 insertions(+), 40 deletions(-) diff --git a/src/ngamsServer/ngamsServer/commands/retrieve.py b/src/ngamsServer/ngamsServer/commands/retrieve.py index bb241190..54ddf1f2 100644 --- a/src/ngamsServer/ngamsServer/commands/retrieve.py +++ b/src/ngamsServer/ngamsServer/commands/retrieve.py @@ -286,13 +286,21 @@ def _handleCmdRetrieve(srvObj, fileVer, include_compression=True) # If not located the quick way try the normal way. - if (not ipAddress): - # Locate the file best suiting the query and send it back if possible. - location, host, ipAddress, port, mountPoint, filename, fileId,\ - fileVersion, mimeType, compression =\ - ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVer, - diskId, hostId, reqPropsObj, - include_compression=True) + if not ipAddress: + try: + # Locate the file best suiting the query and send it back if possible. + location, host, ipAddress, port, mountPoint, filename, fileId, \ + fileVersion, mimeType, compression = \ + ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVer, + diskId, hostId, reqPropsObj, + include_compression=True) + except: + # If the file is still not found then try a remote partner site + location, host, ipAddress, port, mountPoint, filename, fileId, \ + fileVersion, mimeType, compression = \ + ngamsFileUtils.lookup_partner_site_file(srvObj, fileId, + fileVer, reqPropsObj, + include_compression=True) # If still not located, try to contact associated NGAS sites to query # if the file is available there. @@ -310,9 +318,8 @@ def _handleCmdRetrieve(srvObj, # Perform the possible processing requested. procResult, compression = performProcessing(srvObj, reqPropsObj, srcFilename, mimeType, compression) - elif location in (NGAMS_HOST_CLUSTER, NGAMS_HOST_REMOTE) and \ - srvObj.getCfg().getProxyMode(): + elif location == NGAMS_HOST_CLUSTER and srvObj.getCfg().getProxyMode(): logger.info("NG/AMS Server acting as proxy - requesting file with ID: %s " +\ "from NG/AMS Server on host/port: %s/%s", fileId, host, str(port)) @@ -324,6 +331,18 @@ def _handleCmdRetrieve(srvObj, httpRef.proxy_request(host, ipAddress, port, timeout=timeout) return + elif location == NGAMS_HOST_REMOTE and srvObj.is_partner_sites_proxy_mode(): + logger.info("NG/AMS Server acting as remote proxy - requesting file with ID: %s " + \ + "from NG/AMS Server on host/port: %s/%s", + fileId, host, str(port)) + + # Act as a remote proxy - get the file from the remote NGAS host + # specified and send back the contents. The file is temporarily stored + # in the Processing Area. + timeout = float(reqPropsObj['timeout']) if 'timeout' in reqPropsObj else 300 + httpRef.remote_proxy_request(reqPropsObj, host, port, timeout=timeout) + return + else: # No proxy mode: A redirection HTTP response is generated. httpRef.redirect(ipAddress, port) @@ -334,8 +353,8 @@ def _handleCmdRetrieve(srvObj, def handleCmd(srvObj, - reqPropsObj, - httpRef): + reqPropsObj, + httpRef): """ Handle a RETRIEVE command. diff --git a/src/ngamsServer/ngamsServer/commands/status.py b/src/ngamsServer/ngamsServer/commands/status.py index 29391038..da3a1a3f 100644 --- a/src/ngamsServer/ngamsServer/commands/status.py +++ b/src/ngamsServer/ngamsServer/commands/status.py @@ -41,7 +41,7 @@ import six -from ngamsLib.ngamsCore import NGAMS_HOST_LOCAL,\ +from ngamsLib.ngamsCore import NGAMS_HOST_LOCAL, NGAMS_HOST_REMOTE,\ getHostName, genLog, genUniqueId, rmFile,\ compressFile, NGAMS_GZIP_XML_MT, getNgamsVersion,\ NGAMS_SUCCESS, NGAMS_XML_MT, fromiso8601, toiso8601 @@ -87,12 +87,23 @@ def _checkFileAccess(srvObj, # Check if the file is located on this host, or if the request should be # forwarded (if this server should act as proxy). - location, fileHost, ipAddress, filePortNo, mountPoint, filename, fileId,\ - fileVersion, mimeType =\ - ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVersion, - diskId) + try: + location, fileHost, ipAddress, filePortNo, mountPoint, filename, \ + fileId, fileVersion, mimeType = \ + ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVersion, + diskId, reqPropsObj=reqPropsObj) + except: + # If the file is still not found then try a remote partner site + location, fileHost, ipAddress, filePortNo, mountPoint, filename, \ + fileId, fileVersion, mimeType = \ + ngamsFileUtils.lookup_partner_site_file(srvObj, fileId, + fileVersion, reqPropsObj) + + if location == NGAMS_HOST_REMOTE: + fileHost = "{0}:{1}".format(fileHost, filePortNo) + return genLog("NGAMS_INFO_FILE_AVAIL", [fileId + "/Version: " + str(fileVersion), fileHost]) - if (location != NGAMS_HOST_LOCAL): + elif location != NGAMS_HOST_LOCAL: # Go and get it! host, port = srvObj.get_remote_server_endpoint(fileHost) pars = (('file_access', fileId), ('file_version', fileVersion), ('disk_id', diskId)) @@ -387,8 +398,8 @@ def _handleFileListReply(srvObj, def handleCmd(srvObj, - reqPropsObj, - httpRef): + reqPropsObj, + httpRef): """ Handle STATUS command. @@ -403,12 +414,11 @@ def handleCmd(srvObj, Returns: Void. """ status = ngamsStatus.ngamsStatus() - status.\ - setDate(toiso8601()).\ - setVersion(getNgamsVersion()).setHostId(srvObj.getHostId()).\ - setStatus(NGAMS_SUCCESS).\ - setMessage("Successfully handled command STATUS").\ - setState(srvObj.getState()).setSubState(srvObj.getSubState()) + status.setDate(toiso8601()).\ + setVersion(getNgamsVersion()).setHostId(srvObj.getHostId()).\ + setStatus(NGAMS_SUCCESS).\ + setMessage("Successfully handled command STATUS").\ + setState(srvObj.getState()).setSubState(srvObj.getSubState()) reqPropsObjRef = reqPropsObj @@ -515,19 +525,31 @@ def handleCmd(srvObj, genDiskStatus = 1 elif (fileId): if (not fileVersion): fileVersion = -1 - fileObj = ngamsFileInfo.ngamsFileInfo() - fileObj.read(srvObj.getHostId(), srvObj.getDb(), fileId, fileVersion) - diskObj = ngamsDiskInfo.ngamsDiskInfo() try: - diskObj.read(srvObj.getDb(), fileObj.getDiskId()) + fileObj = ngamsFileInfo.ngamsFileInfo() + fileObj.read(srvObj.getHostId(), srvObj.getDb(), fileId, fileVersion) + diskObj = ngamsDiskInfo.ngamsDiskInfo() + try: + diskObj.read(srvObj.getDb(), fileObj.getDiskId()) + except: + errMsg = "Illegal Disk ID found: {0} for file with ID: {1}".format(fileObj.getDiskId(), fileId) + raise Exception(errMsg) + diskObj.addFileObj(fileObj) + status.addDiskStatus(diskObj) + genDiskStatus = 1 + genFileStatus = 1 except: - errMsg = "Illegal Disk ID found: %s for file with ID: %s" %\ - (fileObj.getDiskId(), fileId) - raise Exception(errMsg) - diskObj.addFileObj(fileObj) - status.addDiskStatus(diskObj) - genDiskStatus = 1 - genFileStatus = 1 + # The file was not found in the database. Check if it is available + # on a remote partner site. + host, port, status_info, disk_info, file_info = \ + ngamsFileUtils.lookup_partner_site_file_status(srvObj, fileId, + fileVersion, + reqPropsObj) + # Update status reply using the disk status and file status + # information retrieved from the partner site + status.addDiskStatus(disk_info) + genDiskStatus = 1 + genFileStatus = 1 elif (requestId): logger.debug("Checking status of request with ID: %s", requestId) reqPropsObjRef = srvObj.getRequest(requestId) diff --git a/src/ngamsServer/ngamsServer/ngamsFileUtils.py b/src/ngamsServer/ngamsServer/ngamsFileUtils.py index 1ad7e242..cd6d4e45 100644 --- a/src/ngamsServer/ngamsServer/ngamsFileUtils.py +++ b/src/ngamsServer/ngamsServer/ngamsFileUtils.py @@ -72,6 +72,169 @@ # `from_bytes` functions need to be aligned. checksum_info = collections.namedtuple('crc_info', 'init method final from_bytes equals') + +def lookup_partner_site_file_status(ngas_server, + file_id, + file_version, + request_properties): + """ + Lookup the file indicated by the File ID using a NGAS partner site (if one + is specified in the configuration). Returns a tuple of status objects. + + Parameters: + + ngas_server: Reference to NG/AMS server class object (ngamsServer). + + file_id: File ID of file to locate (string). + + file_version: Version of the file (integer). + + request_properties: Request Property object to keep track of actions done + during the request handling (ngamsReqProps|None). + + Returns: + + host: Partner site host name (or IP address) + + port: Partner site port number + + status_info: Status response object (ngamsStatus) + + disk_info: Status response disk information object (ngamsDiskInfo) + + file_info: Status response file information object (ngamsFileInfo) + """ + # If the request came from a partner site. We will not continue to + # propagate the request to avoid a death loop scenario. We will raise an + # exception. + if request_properties.hasHttpPar("partner_site_redirect"): + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_id]) + logger.debug(error_message) + raise Exception(error_message) + + # Check partner sites is enabled are available from the configuration + if not ngas_server.is_partner_sites_proxy_mode()\ + or not ngas_server.get_partner_sites_address_list(): + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_id]) + logger.debug(error_message) + raise Exception(error_message) + + # Lets query the partner sites for the availability of the requested file + authentication_header = ngamsSrvUtils.genIntAuthHdr(ngas_server) + parameter_list = [["file_id", file_id]] + if file_version != -1: + parameter_list.append(["file_version", file_version]) + parameter_list.append(["partner_site_redirect", 1]) + + host, port, status_info, disk_info, file_info = None, None, None, None, None + for partner_site in ngas_server.get_partner_sites_address_list(): + partner_address = partner_site.split(":")[0] + partner_port = int(partner_site.split(":")[-1]) + try: + logger.info("Looking up file ID %s on partner site %s", file_id, + partner_site) + response = ngamsHttpUtils.httpGet(partner_address, partner_port, + NGAMS_STATUS_CMD, + parameter_list, + auth=authentication_header) + with contextlib.closing(response): + response_info = response.read() + except: + # We ignore this error, and try the next partner site, if any + continue + + status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_info, 1) + logger.info("Result of File Access Query: {}".format(re.sub("\n", "", + str(status_info.genXml().toprettyxml(' ', '\n'))))) + if status_info.getStatus() == "FAILURE": + logger.info(genLog("NGAMS_INFO_FILE_NOT_AVAIL", [file_id, partner_address])) + else: + logger.info(genLog("NGAMS_INFO_FILE_AVAIL", [file_id, partner_address])) + disk_info = status_info.getDiskStatusList()[0] + file_info = disk_info.getFileObjList()[0] + host = partner_address + port = partner_port + break + + if status_info is None: + # Failed to find file on a partner site + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_id]) + logger.debug(error_message) + raise Exception(error_message) + + return host, port, status_info, disk_info, file_info + + +def lookup_partner_site_file(ngas_server, + file_id, + file_version, + request_properties, + include_compression): + """ + Lookup the file indicated by the File ID using a NGAS partner site (if one + is specified in the configuration). Returns a list containing the necessary + information for retrieving the file: + + [, , , , , + , , , ] + + - whereby: + + = Location of the file (NGAMS_HOST_LOCAL, + NGAMS_HOST_CLUSTER, NGAMS_HOST_DOMAIN, + NGAMS_HOST_REMOTE). + = Host ID of host to be contacted to get access to the + file. + = IP Address of host to be contacted to get access to the + file. + = Port number used by the NG/AMS Server. + = Mount point at which the file is residing. + = Name of file relative to mount point. + = ID of file. + = Version of file. + = Mime-type of file (as registered in NGAS). + + ngas_server: Reference to NG/AMS server class object (ngamsServer). + + file_id: File ID of file to locate (string). + + file_version: Version of the file (integer). + + request_properties: Request Property object to keep track of actions done + during the request handling (ngamsReqProps|None). + + Returns: List with information about file location (list). + """ + host, port, status_info, disk_info, file_info = \ + lookup_partner_site_file_status(ngas_server, file_id, file_version, + request_properties) + + location = NGAMS_HOST_REMOTE + ip_address = None + + # The file was found, get the info necessary for the acquiring the file. + file_attribute_list = [ location, host, ip_address, port, + disk_info.getMountPoint(), + file_info.getFilename(), + file_info.getFileId(), + file_info.getFileVersion(), + file_info.getFormat() ] + + if include_compression: + file_attribute_list.append(file_info.getCompression()) + + message = "Located suitable file for request - File ID: %s. " \ + + "Info for file found - Location: %s - Host ID/IP: %s/%s - " \ + + "Port Number: %s - File Version: %d - Filename: %s - " \ + + "Mime-type: %s" + logger.debug(message, file_id, location, host, ip_address, port, + file_info.getFileVersion(), + file_info.getFilename(), + file_info.getFormat()) + + return file_attribute_list + + def _locateArchiveFile(srvObj, fileId, fileVersion, @@ -764,4 +927,4 @@ def check_checksum(srvObj, fio, filename): "%s/%s/%s - skipping checksum check" logger.info(msg, fio.getDiskId(), fio.getFileId(), fio.getFileVersion()) -# EOF \ No newline at end of file +# EOF diff --git a/src/ngamsServer/ngamsServer/ngamsServer.py b/src/ngamsServer/ngamsServer/ngamsServer.py index 074dc5d9..eb453ea6 100644 --- a/src/ngamsServer/ngamsServer/ngamsServer.py +++ b/src/ngamsServer/ngamsServer/ngamsServer.py @@ -344,7 +344,6 @@ def write_data(self, data): def send_status(self, message, status=NGAMS_SUCCESS, code=None, http_message=None, hdrs={}): """Creates and sends an NGAS status XML document back to the client""" - if code is None: code = 200 if status == NGAMS_SUCCESS else 400 @@ -436,6 +435,68 @@ def proxy_request(self, host_id, host, port, timeout=300): self.send_data(data, mime_type, code=code, hdrs=hdrs) + def remote_proxy_request(self, request, host, port, timeout=300): + """Proxy the current request to remote host ``host``:``port``""" + + url = 'http://{0}:{1}/RETRIEVE'.format(host, port) + logger.info("Proxying request for /RETRIEVE to %s:%d", host, port) + + parameter_list = [] + for parameter in request.getHttpParNames(): + if parameter.lower() == "initiator": + continue + else: + parameter_list.append([parameter, request.getHttpPar(parameter)]) + + start_byte = 0 + header_list = [] + for header in request.getHttpHdrs(): + header_list.append([header, request.getHttpHdr(header)]) + if header.lower() == "range": + value = request.getHttpHdr(header) + start_byte = int(value.replace("bytes=", "").split("-")[0]) + + block_size = self.ngasServer.getCfg().getBlockSize() + + # Make sure the time_out parameters is within proper boundaries + timeout = min(max(timeout, 0), 1200) + + authorization_header = ngamsSrvUtils.genIntAuthHdr(self.ngasServer) + + response = ngamsHttpUtils.httpGetUrl(url, parameter_list, header_list, + timeout, authorization_header) + + logger.info("Received remote partner site proxy response from %s:%d, sending to client", host, port) + response_headers = {header[0]: header[1] for header in response.getheaders()} + logger.info("Headers from remote partner site proxy response: %r", response_headers) + + with contextlib.closing(response): + size = int(response.getheader("content-length")) + self.write_stream_data(response, response_headers, size, start_byte, block_size) + + def write_stream_data(self, response, headers, size, start_byte=0, block_size=65536): + """Streams the data from the remote host""" + + logger.info("Sending %d bytes of data and headers %r", size, headers) + + self.send_response(response.status, hdrs=headers) + self.end_headers() + + logger.info("Sending %d bytes to client, starting at byte %d", size, start_byte) + data_sent = start_byte + data_to_send = size + start_time = time.time() + + # TODO: Should we do something about https here? + self.wfile.flush() + while data_sent < data_to_send: + stream_buffer = response.read(block_size) + self.wfile.write(stream_buffer) + data_sent += len(stream_buffer) + + elapsed_time = time.time() - start_time + size_mb = size / 1024. / 1024. + logger.info("Sent data stream at %.3f [MB/s]", size_mb / elapsed_time) class logging_config(object): @@ -505,6 +566,10 @@ def __init__(self, cfg_fname, _cert=None): self._pid_file_created = False self._cert = _cert + # Handling partner sites proxy mode feature + self.partner_sites_proxy_mode = False + self.partner_site_address_list = [] + # Keep track of how many requests are being served, # This is slightly different from keeping track of the server's # sub-state because many commands don't bother changing it @@ -686,7 +751,7 @@ def fire_archive_event(self, file_id, file_version): try: s(evt) except: - msg = ("Error while trigerring archiving event subscriber, " + msg = ("Error while triggering archiving event subscriber, " "will continue with the rest anyway") logger.exception(msg) @@ -1426,6 +1491,24 @@ def getSrvListDic(self): return self.__srvListDic + def is_partner_sites_proxy_mode(self): + """ + Return reference to the Partner Sites Proxy Mode. + + Returns: Reference to Partner Sites Proxy Mode. + """ + return self.partner_sites_proxy_mode + + + def get_partner_sites_address_list(self): + """ + Return reference to the Partner Site Address List. + + Returns: Reference to Partner Site Address List. + """ + return self.partner_site_address_list + + def setDiskDic(self, diskDic): """ @@ -2034,6 +2117,22 @@ def noop(*args): if allowProcessingReq: checkCreatePath(self.cfg.getProcessingDirectory()) + # Extract partner site proxy mode setting + value = self.getCfg().getVal("PartnerSites[1].ProxyMode") + if value == "1": + self.partner_sites_proxy_mode = True + + # Extract partner site address list + idx = 1 + while True: + address_key = "PartnerSites[1].PartnerSite[{0}].Address".format(idx) + partner_site_address = self.getCfg().getVal(address_key) + if partner_site_address is None: + break + else: + logger.info("Registering partner site address: %s", partner_site_address) + self.partner_site_address_list.append(partner_site_address) + idx += 1 # Check if there is already a PID file. logger.debug("Check if NG/AMS PID file is existing ...") @@ -2355,4 +2454,4 @@ def main(args=None, prog='ngamsServer', server_class=ngamsServer): _parse_and_run(args, prog, server_class) if __name__ == '__main__': - main() \ No newline at end of file + main() From 3dcbe5bed946b45faffe86f993ece1308b5285a9 Mon Sep 17 00:00:00 2001 From: Stewart McLay Date: Fri, 18 Sep 2020 18:26:34 +0200 Subject: [PATCH 2/5] Apply some code cleanup according to feedback on pull request --- src/ngamsServer/ngamsServer/commands/status.py | 6 ++---- src/ngamsServer/ngamsServer/ngamsFileUtils.py | 10 +++++++--- src/ngamsServer/ngamsServer/ngamsServer.py | 12 ++++++++---- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/ngamsServer/ngamsServer/commands/status.py b/src/ngamsServer/ngamsServer/commands/status.py index da3a1a3f..d6c7048c 100644 --- a/src/ngamsServer/ngamsServer/commands/status.py +++ b/src/ngamsServer/ngamsServer/commands/status.py @@ -536,8 +536,6 @@ def handleCmd(srvObj, raise Exception(errMsg) diskObj.addFileObj(fileObj) status.addDiskStatus(diskObj) - genDiskStatus = 1 - genFileStatus = 1 except: # The file was not found in the database. Check if it is available # on a remote partner site. @@ -548,8 +546,8 @@ def handleCmd(srvObj, # Update status reply using the disk status and file status # information retrieved from the partner site status.addDiskStatus(disk_info) - genDiskStatus = 1 - genFileStatus = 1 + genDiskStatus = 1 + genFileStatus = 1 elif (requestId): logger.debug("Checking status of request with ID: %s", requestId) reqPropsObjRef = srvObj.getRequest(requestId) diff --git a/src/ngamsServer/ngamsServer/ngamsFileUtils.py b/src/ngamsServer/ngamsServer/ngamsFileUtils.py index cd6d4e45..8bcf59da 100644 --- a/src/ngamsServer/ngamsServer/ngamsFileUtils.py +++ b/src/ngamsServer/ngamsServer/ngamsFileUtils.py @@ -104,18 +104,22 @@ def lookup_partner_site_file_status(ngas_server, file_info: Status response file information object (ngamsFileInfo) """ + file_reference = file_id + if (file_version > 0): + file_reference += "/Version: " + str(file_version) + # If the request came from a partner site. We will not continue to # propagate the request to avoid a death loop scenario. We will raise an # exception. if request_properties.hasHttpPar("partner_site_redirect"): - error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_id]) + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) logger.debug(error_message) raise Exception(error_message) # Check partner sites is enabled are available from the configuration if not ngas_server.is_partner_sites_proxy_mode()\ or not ngas_server.get_partner_sites_address_list(): - error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_id]) + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) logger.debug(error_message) raise Exception(error_message) @@ -158,7 +162,7 @@ def lookup_partner_site_file_status(ngas_server, if status_info is None: # Failed to find file on a partner site - error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_id]) + error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) logger.debug(error_message) raise Exception(error_message) diff --git a/src/ngamsServer/ngamsServer/ngamsServer.py b/src/ngamsServer/ngamsServer/ngamsServer.py index eb453ea6..51eb2822 100644 --- a/src/ngamsServer/ngamsServer/ngamsServer.py +++ b/src/ngamsServer/ngamsServer/ngamsServer.py @@ -63,8 +63,8 @@ NGAMS_HTTP_REDIRECT, NGAMS_HTTP_INT_AUTH_USER, \ NGAMS_SUCCESS, NGAMS_FAILURE, NGAMS_OFFLINE_STATE,\ NGAMS_IDLE_SUBSTATE, NGAMS_BUSY_SUBSTATE, NGAMS_NOTIF_ERROR,\ - NGAMS_NOT_SET, NGAMS_XML_MT, loadPlugInEntryPoint, isoTime2Secs,\ - toiso8601 + NGAMS_NOT_SET, NGAMS_XML_MT, NGAMS_RETRIEVE_CMD, loadPlugInEntryPoint,\ + isoTime2Secs, toiso8601 from ngamsLib import ngamsHighLevelLib, ngamsLib, ngamsEvent, ngamsHttpUtils,\ utils, logutils from ngamsLib import ngamsDb, ngamsConfig, ngamsReqProps, pysendfile @@ -438,8 +438,9 @@ def proxy_request(self, host_id, host, port, timeout=300): def remote_proxy_request(self, request, host, port, timeout=300): """Proxy the current request to remote host ``host``:``port``""" - url = 'http://{0}:{1}/RETRIEVE'.format(host, port) - logger.info("Proxying request for /RETRIEVE to %s:%d", host, port) + url = 'http://{0}:{1}/{2}'.format(host, port, NGAMS_RETRIEVE_CMD) + logger.info("Proxying request for /%s to %s:%d", NGAMS_RETRIEVE_CMD, + host, port) parameter_list = [] for parameter in request.getHttpParNames(): @@ -495,6 +496,9 @@ def write_stream_data(self, response, headers, size, start_byte=0, block_size=65 data_sent += len(stream_buffer) elapsed_time = time.time() - start_time + # Avoid divide by zeros later on, let's say it took us 1 [us] to do this + if elapsed_time == 0: + elapsed_time = 0.000001 size_mb = size / 1024. / 1024. logger.info("Sent data stream at %.3f [MB/s]", size_mb / elapsed_time) From 951214966eb1b1776a42ab1265af80b83c3b25cc Mon Sep 17 00:00:00 2001 From: Stewart McLay Date: Mon, 21 Sep 2020 12:05:29 +0200 Subject: [PATCH 3/5] Some improvements in remote proxy streaming loop --- src/ngamsServer/ngamsServer/ngamsServer.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ngamsServer/ngamsServer/ngamsServer.py b/src/ngamsServer/ngamsServer/ngamsServer.py index 51eb2822..ddbc69c3 100644 --- a/src/ngamsServer/ngamsServer/ngamsServer.py +++ b/src/ngamsServer/ngamsServer/ngamsServer.py @@ -489,11 +489,15 @@ def write_stream_data(self, response, headers, size, start_byte=0, block_size=65 start_time = time.time() # TODO: Should we do something about https here? - self.wfile.flush() while data_sent < data_to_send: stream_buffer = response.read(block_size) + stream_buffer_size = len(stream_buffer) self.wfile.write(stream_buffer) - data_sent += len(stream_buffer) + data_sent += stream_buffer_size + if stream_buffer_size == 0 and data_sent < data_to_send: + logger.error("Data stream is incomplete. Only received %d bytes, expected %d bytes.", + data_sent, data_to_send) + break elapsed_time = time.time() - start_time # Avoid divide by zeros later on, let's say it took us 1 [us] to do this From 3be8318b7ca4fbead18f49bc8fde3c5995e8381a Mon Sep 17 00:00:00 2001 From: Stewart McLay Date: Tue, 22 Sep 2020 21:15:25 +0200 Subject: [PATCH 4/5] Added partner sites unit test --- src/ngamsServer/ngamsServer/ngamsFileUtils.py | 5 +- test/ngamsTestLib.py | 3 +- test/test_partner_sites.py | 124 ++++++++++++++++++ 3 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 test/test_partner_sites.py diff --git a/src/ngamsServer/ngamsServer/ngamsFileUtils.py b/src/ngamsServer/ngamsServer/ngamsFileUtils.py index 8bcf59da..e85ee96a 100644 --- a/src/ngamsServer/ngamsServer/ngamsFileUtils.py +++ b/src/ngamsServer/ngamsServer/ngamsFileUtils.py @@ -113,14 +113,12 @@ def lookup_partner_site_file_status(ngas_server, # exception. if request_properties.hasHttpPar("partner_site_redirect"): error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) - logger.debug(error_message) raise Exception(error_message) # Check partner sites is enabled are available from the configuration if not ngas_server.is_partner_sites_proxy_mode()\ or not ngas_server.get_partner_sites_address_list(): error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) - logger.debug(error_message) raise Exception(error_message) # Lets query the partner sites for the availability of the requested file @@ -160,10 +158,9 @@ def lookup_partner_site_file_status(ngas_server, port = partner_port break - if status_info is None: + if status_info is None or status_info.getStatus() == "FAILURE": # Failed to find file on a partner site error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference]) - logger.debug(error_message) raise Exception(error_message) return host, port, status_info, disk_info, file_info diff --git a/test/ngamsTestLib.py b/test/ngamsTestLib.py index cc082abb..6f8ad719 100644 --- a/test/ngamsTestLib.py +++ b/test/ngamsTestLib.py @@ -1484,7 +1484,8 @@ def point_to_sqlite_database(self, cfgObj, create): # Exceptional handling for SQLite. if 'sqlite' in cfgObj.getDbInterface().lower(): - sqlite_file = os.path.join(tmp_root, 'ngas.sqlite') + # sqlite_file = os.path.join(tmp_root, 'ngas.sqlite') + sqlite_file = genTmpFilename(suffix='.sqlite') if create: rmFile(sqlite_file) import sqlite3 diff --git a/test/test_partner_sites.py b/test/test_partner_sites.py new file mode 100644 index 00000000..b3931b30 --- /dev/null +++ b/test/test_partner_sites.py @@ -0,0 +1,124 @@ +# +# ALMA - Atacama Large Millimiter Array +# (c) European Southern Observatory, 2002 +# Copyright by ESO (in the framework of the ALMA collaboration), +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# + +""" +This module contains the Test Suite for the partner sites feature. +""" + +import contextlib +import functools +import os + +from ngamsLib import ngamsHttpUtils, ngamsStatus +from ngamsLib.ngamsCore import getHostName +from .ngamsTestLib import ngamsTestSuite, tmp_path + +class NgasPartnerSiteTest(ngamsTestSuite): + """ + Synopsis: + Test the partner site feature. + + Description: + NGAS offers a partner site feature for proxy requests to remote NGAS hosts + + Missing Test Cases: + """ + + def _prepare_partner_site_cluster(self, *original_server_list, **kwargs): + server_list = [] + for server_info in original_server_list: + port, property_list = server_info[0], list(server_info[1]) + server_list.append((port, property_list)) + + return self.prepCluster(server_list) + + def test_archive_status_retrieve_sequence(self): + host_name = getHostName() + sample_file_name = "SmallFile.fits" + sample_file_path = os.path.join("src", sample_file_name) +# sample_file_size = os.path.getsize(sample_file_path) + sample_mime_type = "application/octet-stream" + + # We create two cluster each container two NGAS nodes + # We configure the first cluster to use the second cluster as a partner site + config_list_1 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas1"), + ("NgamsCfg.PartnerSites[1].ProxyMode", "1"), + ("NgamsCfg.PartnerSites[1].PartnerSite[1].Address", "localhost:9011")] + self._prepare_partner_site_cluster((9001, config_list_1)) + + config_list_2 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas2")] + self._prepare_partner_site_cluster((9011, config_list_2)) + + # We archive a test sample file on the partner site cluster + self.archive(9011, sample_file_path, mimeType=sample_mime_type) + + # We check the status of a file ID found on the partner site cluster + command_status = functools.partial(ngamsHttpUtils.httpGet, "localhost", 9001, "STATUS", timeout=5) + argument_list = {"file_id": sample_file_name} + with contextlib.closing(command_status(pars=argument_list)) as response: + self.assertEqual(response.status, 200) + # Parse the response XML status information + response_data = response.read() + status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_data, 1) + disk_info = status_info.getDiskStatusList()[0] + file_info = disk_info.getFileObjList()[0] + self.assertEqual(disk_info.getHostId(), "{0}:9011".format(host_name)) + self.assertEqual(file_info.getFileId(), sample_file_name) + self.assertEqual(file_info.getFileVersion(), 1) + self.assertEqual(file_info.getFormat(), sample_mime_type) + + # Retrieve a file found on the partner site cluster + retrieve_file_path = tmp_path(sample_file_name) + self.retrieve(9001, sample_file_name, targetFile=retrieve_file_path) +# retrieve_file_size = os.path.getsize(retrieve_file_path) +# self.assertEqual(sample_file_size, retrieve_file_size) + + def test_status_retrieve_sequence(self): + sample_file_name = "SmallFile.fits" + sample_file_path = os.path.join("src", sample_file_name) + bad_file_name = "dummy.fits" + + # We create two cluster each container two NGAS nodes + # We configure the first cluster to use the second cluster as a partner site + config_list_1 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas1"), + ("NgamsCfg.PartnerSites[1].ProxyMode", "1"), + ("NgamsCfg.PartnerSites[1].PartnerSite[1].Address", "localhost:9011")] + self._prepare_partner_site_cluster((9001, config_list_1)) + + config_list_2 = [("NgamsCfg.Server[1].RootDirectory", "/tmp/ngas2")] + self._prepare_partner_site_cluster((9011, config_list_2)) + + # We check the status of a file ID found on the partner site cluster + command_status = functools.partial(ngamsHttpUtils.httpGet, "localhost", 9001, "STATUS", timeout=1000) + argument_list = {"file_id": bad_file_name} + with contextlib.closing(command_status(pars=argument_list)) as response: + self.assertEqual(response.status, 400) + # Parse the response XML status information + response_data = response.read() + status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_data, 1) + self.assertEqual(status_info.getStatus(), "FAILURE") + self.assertEqual(status_info.getMessage(), + "NGAMS_ER_UNAVAIL_FILE:4019:ERROR: File with ID: dummy.fits appears not to be available.") + + # Retrieve a file found on the partner site cluster + retrieve_file_path = tmp_path(bad_file_name) + self.retrieve(9001, bad_file_name, targetFile=retrieve_file_path, expectedStatus='FAILURE') From 39be688863544e72de0fb1f9fad42ff77496108d Mon Sep 17 00:00:00 2001 From: Rodrigo Tobar Date: Wed, 23 Sep 2020 12:05:27 +0800 Subject: [PATCH 5/5] Support more flexible server setups Until now every time we start an NGAS server during our tests, its sqlite database is stored not under the NGAS root but under a common directory. This path is fixed, and the only flexibility that further server instances have is to point to the same database without re-creating it, but not to point to a different one. This commit changes where the sqlite database is created in different use cases. When starting single NGAS servers, each server gets its own sqlite file created under its root directory, and when a cluster is created (via prepCluster()) a single database file is created. While this adds the ability to start independent servers that are not connected through a common database, this is true only for sqlite-based tests. Tests using other databases will still have the limitation of being connected to the same database instance. If full independence is needed, those tests should run only when the database backend is sqlite. --- test/commands/test_archive.py | 10 ++++++---- test/ngamsTestLib.py | 23 ++++++++++++++--------- test/test_config_handling.py | 4 ++-- test/test_db.py | 4 ++-- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/test/commands/test_archive.py b/test/commands/test_archive.py index 0dffa339..0d59195e 100644 --- a/test/commands/test_archive.py +++ b/test/commands/test_archive.py @@ -1126,8 +1126,9 @@ def test_ArchiveProxyMode_01(self): """ ports = range(8001, 8005) cfg_file, cfg_props = self._genArchProxyCfg(ports) - self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props) - self.prepCluster(ports, createDatabase = False) + sqlite_file = tmp_path('ngas.sqlite') + self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props, sqlite_file=sqlite_file) + self.prepCluster(ports, createDatabase = False, sqlite_file=sqlite_file) noOfNodes = len(ports) nodeCount = 0 counts = {p: 0 for p in ports} @@ -1228,8 +1229,9 @@ def test_ArchiveProxyMode_03(self): """ ports = range(8001, 8005) cfg_file, cfg_props = self._genArchProxyCfg(ports) - _, dbObj = self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props) - self.prepCluster(ports, createDatabase = False) + sqlite_file = tmp_path('ngas.sqlite') + _, dbObj = self.prepExtSrv(port=8000, cfgFile=cfg_file, cfgProps=cfg_props, sqlite_file=sqlite_file) + self.prepCluster(ports, createDatabase = False, sqlite_file=sqlite_file) # Set all Disks in unit :8002 to completed. dbObj.query2("UPDATE ngas_disks SET completed=1 WHERE host_id={0}", args=("%s:8002" % getHostName(),)) # Set :8004 to Offline. diff --git a/test/ngamsTestLib.py b/test/ngamsTestLib.py index cc082abb..f5de08fe 100644 --- a/test/ngamsTestLib.py +++ b/test/ngamsTestLib.py @@ -879,7 +879,8 @@ def prepExtSrv(self, srvModule = None, force=False, daemon = False, - cert_file=None): + cert_file=None, + sqlite_file=None): """ Prepare a standard server object, which runs as a separate process and serves via the standard HTTP interface. @@ -942,10 +943,16 @@ def prepExtSrv(self, cfgObj = self.env_aware_cfg(cfgFile) + # Calculate root directory and clear if needed + root_dir = root_dir or tmp_path('NGAS') + if delDirs: + shutil.rmtree(root_dir, True) + # Change what needs to be changed, like the position of the Sqlite # database file when necessary, the custom configuration items, and the # port number - self.point_to_sqlite_database(cfgObj, not dbCfgName and clearDb) + sqlite_file = sqlite_file or os.path.join(root_dir, 'ngas.sqlite') + self.point_to_sqlite_database(cfgObj, sqlite_file, create=(not dbCfgName and clearDb)) if (cfgProps): for cfgProp in cfgProps: # TODO: Handle Cfg. Group ID. @@ -956,10 +963,7 @@ def prepExtSrv(self, # Now connect to the database and perform any cleanups before we start # the server, like removing existing NGAS dirs and clearing tables - root_dir = root_dir or tmp_path('NGAS') dbObj = ngamsDb.from_config(cfgObj, maxpool=1) - if delDirs: - shutil.rmtree(root_dir, True) if (clearDb): logger.debug("Clearing NGAS DB ...") delNgasTbls(dbObj) @@ -1422,7 +1426,7 @@ def start_srv_in_cluster( return [srvId, port, cfg, db] def prepCluster(self, server_list, cfg_file='src/ngamsCfg.xml', createDatabase=True, - cfg_props=(), cert_file=None): + cfg_props=(), cert_file=None, sqlite_file=None): """ Prepare a common, simulated cluster. This consists of 1 to N servers running on the same node. It is ensured that each of @@ -1457,7 +1461,8 @@ def prepCluster(self, server_list, cfg_file='src/ngamsCfg.xml', createDatabase=T # Create the shared database first of all and generate a new config file tmpCfg = self.env_aware_cfg(cfg_file) - self.point_to_sqlite_database(tmpCfg, createDatabase) + sqlite_file = sqlite_file or tmp_path('ngas.sqlite') + self.point_to_sqlite_database(tmpCfg, sqlite_file, create=createDatabase) if createDatabase: with contextlib.closing(ngamsDb.from_config(tmpCfg, maxpool=1)) as db: delNgasTbls(db) @@ -1480,13 +1485,13 @@ def prepCluster(self, server_list, cfg_file='src/ngamsCfg.xml', createDatabase=T return d - def point_to_sqlite_database(self, cfgObj, create): + def point_to_sqlite_database(self, cfgObj, sqlite_file, create=True): # Exceptional handling for SQLite. if 'sqlite' in cfgObj.getDbInterface().lower(): - sqlite_file = os.path.join(tmp_root, 'ngas.sqlite') if create: rmFile(sqlite_file) + checkCreatePath(os.path.dirname(sqlite_file)) import sqlite3 fname = 'ngamsCreateTables-SQLite.sql' script = utils.b2s(pkg_resources.resource_string('ngamsSql', fname)) # @UndefinedVariable diff --git a/test/test_config_handling.py b/test/test_config_handling.py index 33499b29..23b893b6 100644 --- a/test/test_config_handling.py +++ b/test/test_config_handling.py @@ -40,7 +40,7 @@ from ngamsLib import ngamsConfig, ngamsDb, ngamsXmlMgr from .ngamsTestLib import delNgasTbls, ngamsTestSuite, \ - save_to_tmp + save_to_tmp, tmp_path dbIdAttr = 'Db-Test' @@ -130,7 +130,7 @@ def loadCfg(self, revAttr = "NgamsCfg.Header[1].Revision" cfgObj.storeVal(revAttr, "TEST-REVISION", "ngamsCfg-Test") - self.point_to_sqlite_database(cfgObj, createDatabase) + self.point_to_sqlite_database(cfgObj, tmp_path('ngas.sqlite'), create=createDatabase) dbObj = ngamsDb.from_config(cfgObj, maxpool=1) if (delDbTbls): delNgasTbls(dbObj) cfgObj.writeToDb(dbObj) diff --git a/test/test_db.py b/test/test_db.py index f52866d5..81527d77 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -28,7 +28,7 @@ class DbTests(ngamsTestLib.ngamsTestSuite): def setUp(self): super(DbTests, self).setUp() cfg = self.env_aware_cfg() - self.point_to_sqlite_database(cfg, True) + self.point_to_sqlite_database(cfg, ngamsTestLib.tmp_path('ngas.sqlite')) self.db = ngamsDb.from_config(cfg, maxpool=1) ngamsTestLib.delNgasTbls(self.db) @@ -47,4 +47,4 @@ def test_get_file_info_list_with_wildcards(self): file_info.setFileId('file-id') file_info.write('host-id', self.db, genSnapshot=0) res = list(self.db.getFileInfoList('disk-id', fileId="*")) - self.assertEqual(1, len(res)) + self.assertEqual(1, len(res)) \ No newline at end of file