Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ported partner sites feature from ALMA NGAS source code #28

Merged
merged 6 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions src/ngamsServer/ngamsServer/commands/retrieve.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,21 @@ def _handleCmdRetrieve(srvObj,
fileVer, include_compression=True)

# If not located the quick way try the normal way.
if (not ipAddress):
# Locate the file best suiting the query and send it back if possible.
location, host, ipAddress, port, mountPoint, filename, fileId,\
fileVersion, mimeType, compression =\
ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVer,
diskId, hostId, reqPropsObj,
include_compression=True)
if not ipAddress:
try:
# Locate the file best suiting the query and send it back if possible.
location, host, ipAddress, port, mountPoint, filename, fileId, \
fileVersion, mimeType, compression = \
ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVer,
diskId, hostId, reqPropsObj,
include_compression=True)
except:
# If the file is still not found then try a remote partner site
location, host, ipAddress, port, mountPoint, filename, fileId, \
fileVersion, mimeType, compression = \
ngamsFileUtils.lookup_partner_site_file(srvObj, fileId,
fileVer, reqPropsObj,
include_compression=True)

# If still not located, try to contact associated NGAS sites to query
# if the file is available there.
Expand All @@ -310,9 +318,8 @@ def _handleCmdRetrieve(srvObj,
# Perform the possible processing requested.
procResult, compression = performProcessing(srvObj, reqPropsObj, srcFilename,
mimeType, compression)
elif location in (NGAMS_HOST_CLUSTER, NGAMS_HOST_REMOTE) and \
srvObj.getCfg().getProxyMode():

elif location == NGAMS_HOST_CLUSTER and srvObj.getCfg().getProxyMode():
logger.info("NG/AMS Server acting as proxy - requesting file with ID: %s " +\
"from NG/AMS Server on host/port: %s/%s",
fileId, host, str(port))
Expand All @@ -324,6 +331,18 @@ def _handleCmdRetrieve(srvObj,
httpRef.proxy_request(host, ipAddress, port, timeout=timeout)
return

elif location == NGAMS_HOST_REMOTE and srvObj.is_partner_sites_proxy_mode():
logger.info("NG/AMS Server acting as remote proxy - requesting file with ID: %s " + \
"from NG/AMS Server on host/port: %s/%s",
fileId, host, str(port))

# Act as a remote proxy - get the file from the remote NGAS host
# specified and send back the contents. The file is temporarily stored
# in the Processing Area.
timeout = float(reqPropsObj['timeout']) if 'timeout' in reqPropsObj else 300
httpRef.remote_proxy_request(reqPropsObj, host, port, timeout=timeout)
return

else:
# No proxy mode: A redirection HTTP response is generated.
httpRef.redirect(ipAddress, port)
Expand All @@ -334,8 +353,8 @@ def _handleCmdRetrieve(srvObj,


def handleCmd(srvObj,
reqPropsObj,
httpRef):
reqPropsObj,
httpRef):
"""
Handle a RETRIEVE command.

Expand Down
66 changes: 43 additions & 23 deletions src/ngamsServer/ngamsServer/commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

import six

from ngamsLib.ngamsCore import NGAMS_HOST_LOCAL,\
from ngamsLib.ngamsCore import NGAMS_HOST_LOCAL, NGAMS_HOST_REMOTE,\
getHostName, genLog, genUniqueId, rmFile,\
compressFile, NGAMS_GZIP_XML_MT, getNgamsVersion,\
NGAMS_SUCCESS, NGAMS_XML_MT, fromiso8601, toiso8601
Expand Down Expand Up @@ -87,12 +87,23 @@ def _checkFileAccess(srvObj,

# Check if the file is located on this host, or if the request should be
# forwarded (if this server should act as proxy).
location, fileHost, ipAddress, filePortNo, mountPoint, filename, fileId,\
fileVersion, mimeType =\
ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVersion,
diskId)
try:
location, fileHost, ipAddress, filePortNo, mountPoint, filename, \
fileId, fileVersion, mimeType = \
ngamsFileUtils.locateArchiveFile(srvObj, fileId, fileVersion,
diskId, reqPropsObj=reqPropsObj)
except:
# If the file is still not found then try a remote partner site
location, fileHost, ipAddress, filePortNo, mountPoint, filename, \
fileId, fileVersion, mimeType = \
ngamsFileUtils.lookup_partner_site_file(srvObj, fileId,
fileVersion, reqPropsObj)

if location == NGAMS_HOST_REMOTE:
fileHost = "{0}:{1}".format(fileHost, filePortNo)
return genLog("NGAMS_INFO_FILE_AVAIL", [fileId + "/Version: " + str(fileVersion), fileHost])

if (location != NGAMS_HOST_LOCAL):
elif location != NGAMS_HOST_LOCAL:
# Go and get it!
host, port = srvObj.get_remote_server_endpoint(fileHost)
pars = (('file_access', fileId), ('file_version', fileVersion), ('disk_id', diskId))
Expand Down Expand Up @@ -387,8 +398,8 @@ def _handleFileListReply(srvObj,


def handleCmd(srvObj,
reqPropsObj,
httpRef):
reqPropsObj,
httpRef):
"""
Handle STATUS command.

Expand All @@ -403,12 +414,11 @@ def handleCmd(srvObj,
Returns: Void.
"""
status = ngamsStatus.ngamsStatus()
status.\
setDate(toiso8601()).\
setVersion(getNgamsVersion()).setHostId(srvObj.getHostId()).\
setStatus(NGAMS_SUCCESS).\
setMessage("Successfully handled command STATUS").\
setState(srvObj.getState()).setSubState(srvObj.getSubState())
status.setDate(toiso8601()).\
setVersion(getNgamsVersion()).setHostId(srvObj.getHostId()).\
setStatus(NGAMS_SUCCESS).\
setMessage("Successfully handled command STATUS").\
setState(srvObj.getState()).setSubState(srvObj.getSubState())

reqPropsObjRef = reqPropsObj

Expand Down Expand Up @@ -515,17 +525,27 @@ def handleCmd(srvObj,
genDiskStatus = 1
elif (fileId):
if (not fileVersion): fileVersion = -1
fileObj = ngamsFileInfo.ngamsFileInfo()
fileObj.read(srvObj.getHostId(), srvObj.getDb(), fileId, fileVersion)
diskObj = ngamsDiskInfo.ngamsDiskInfo()
try:
diskObj.read(srvObj.getDb(), fileObj.getDiskId())
fileObj = ngamsFileInfo.ngamsFileInfo()
fileObj.read(srvObj.getHostId(), srvObj.getDb(), fileId, fileVersion)
diskObj = ngamsDiskInfo.ngamsDiskInfo()
try:
diskObj.read(srvObj.getDb(), fileObj.getDiskId())
except:
errMsg = "Illegal Disk ID found: {0} for file with ID: {1}".format(fileObj.getDiskId(), fileId)
raise Exception(errMsg)
diskObj.addFileObj(fileObj)
status.addDiskStatus(diskObj)
except:
errMsg = "Illegal Disk ID found: %s for file with ID: %s" %\
(fileObj.getDiskId(), fileId)
raise Exception(errMsg)
diskObj.addFileObj(fileObj)
status.addDiskStatus(diskObj)
# The file was not found in the database. Check if it is available
# on a remote partner site.
host, port, status_info, disk_info, file_info = \
ngamsFileUtils.lookup_partner_site_file_status(srvObj, fileId,
fileVersion,
reqPropsObj)
# Update status reply using the disk status and file status
# information retrieved from the partner site
status.addDiskStatus(disk_info)
genDiskStatus = 1
genFileStatus = 1
elif (requestId):
Expand Down
166 changes: 165 additions & 1 deletion src/ngamsServer/ngamsServer/ngamsFileUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,170 @@
# `from_bytes` functions need to be aligned.
checksum_info = collections.namedtuple('crc_info', 'init method final from_bytes equals')


def lookup_partner_site_file_status(ngas_server,
file_id,
file_version,
request_properties):
"""
Lookup the file indicated by the File ID using a NGAS partner site (if one
is specified in the configuration). Returns a tuple of status objects.

Parameters:

ngas_server: Reference to NG/AMS server class object (ngamsServer).

file_id: File ID of file to locate (string).

file_version: Version of the file (integer).

request_properties: Request Property object to keep track of actions done
during the request handling (ngamsReqProps|None).

Returns:

host: Partner site host name (or IP address)

port: Partner site port number

status_info: Status response object (ngamsStatus)

disk_info: Status response disk information object (ngamsDiskInfo)

file_info: Status response file information object (ngamsFileInfo)
"""
file_reference = file_id
if (file_version > 0):
file_reference += "/Version: " + str(file_version)

# If the request came from a partner site. We will not continue to
# propagate the request to avoid a death loop scenario. We will raise an
# exception.
if request_properties.hasHttpPar("partner_site_redirect"):
error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference])
raise Exception(error_message)

# Check partner sites is enabled are available from the configuration
if not ngas_server.is_partner_sites_proxy_mode()\
or not ngas_server.get_partner_sites_address_list():
error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference])
raise Exception(error_message)

# Lets query the partner sites for the availability of the requested file
authentication_header = ngamsSrvUtils.genIntAuthHdr(ngas_server)
parameter_list = [["file_id", file_id]]
if file_version != -1:
parameter_list.append(["file_version", file_version])
parameter_list.append(["partner_site_redirect", 1])

host, port, status_info, disk_info, file_info = None, None, None, None, None
for partner_site in ngas_server.get_partner_sites_address_list():
partner_address = partner_site.split(":")[0]
partner_port = int(partner_site.split(":")[-1])
try:
logger.info("Looking up file ID %s on partner site %s", file_id,
partner_site)
response = ngamsHttpUtils.httpGet(partner_address, partner_port,
NGAMS_STATUS_CMD,
parameter_list,
auth=authentication_header)
with contextlib.closing(response):
response_info = response.read()
except:
# We ignore this error, and try the next partner site, if any
continue

status_info = ngamsStatus.ngamsStatus().unpackXmlDoc(response_info, 1)
logger.info("Result of File Access Query: {}".format(re.sub("\n", "",
str(status_info.genXml().toprettyxml(' ', '\n')))))
if status_info.getStatus() == "FAILURE":
logger.info(genLog("NGAMS_INFO_FILE_NOT_AVAIL", [file_id, partner_address]))
else:
logger.info(genLog("NGAMS_INFO_FILE_AVAIL", [file_id, partner_address]))
disk_info = status_info.getDiskStatusList()[0]
file_info = disk_info.getFileObjList()[0]
host = partner_address
port = partner_port
break
rtobar marked this conversation as resolved.
Show resolved Hide resolved

if status_info is None or status_info.getStatus() == "FAILURE":
# Failed to find file on a partner site
error_message = genLog("NGAMS_ER_UNAVAIL_FILE", [file_reference])
raise Exception(error_message)

return host, port, status_info, disk_info, file_info


def lookup_partner_site_file(ngas_server,
file_id,
file_version,
request_properties,
include_compression):
"""
Lookup the file indicated by the File ID using a NGAS partner site (if one
is specified in the configuration). Returns a list containing the necessary
information for retrieving the file:

[<Location>, <File Host>, <IP Address>, <Port No>, <Mount Point>,
<Filename>, <File ID>, <File Version>, <Mime-Type>]

- whereby:

<Location> = Location of the file (NGAMS_HOST_LOCAL,
NGAMS_HOST_CLUSTER, NGAMS_HOST_DOMAIN,
NGAMS_HOST_REMOTE).
<File Host> = Host ID of host to be contacted to get access to the
file.
<IP Address> = IP Address of host to be contacted to get access to the
file.
<Port No> = Port number used by the NG/AMS Server.
<Mount Point> = Mount point at which the file is residing.
<Filename> = Name of file relative to mount point.
<File ID> = ID of file.
<File Version> = Version of file.
<Mime-Type> = Mime-type of file (as registered in NGAS).

ngas_server: Reference to NG/AMS server class object (ngamsServer).

file_id: File ID of file to locate (string).

file_version: Version of the file (integer).

request_properties: Request Property object to keep track of actions done
during the request handling (ngamsReqProps|None).

Returns: List with information about file location (list).
"""
host, port, status_info, disk_info, file_info = \
lookup_partner_site_file_status(ngas_server, file_id, file_version,
request_properties)

location = NGAMS_HOST_REMOTE
ip_address = None

# The file was found, get the info necessary for the acquiring the file.
file_attribute_list = [ location, host, ip_address, port,
disk_info.getMountPoint(),
file_info.getFilename(),
file_info.getFileId(),
file_info.getFileVersion(),
file_info.getFormat() ]

if include_compression:
file_attribute_list.append(file_info.getCompression())

message = "Located suitable file for request - File ID: %s. " \
+ "Info for file found - Location: %s - Host ID/IP: %s/%s - " \
+ "Port Number: %s - File Version: %d - Filename: %s - " \
+ "Mime-type: %s"
logger.debug(message, file_id, location, host, ip_address, port,
file_info.getFileVersion(),
file_info.getFilename(),
file_info.getFormat())

return file_attribute_list


def _locateArchiveFile(srvObj,
fileId,
fileVersion,
Expand Down Expand Up @@ -764,4 +928,4 @@ def check_checksum(srvObj, fio, filename):
"%s/%s/%s - skipping checksum check"
logger.info(msg, fio.getDiskId(), fio.getFileId(), fio.getFileVersion())

# EOF
# EOF
Loading