This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Merge pull request #116 from CSCfi/dev
Release 1.3.0
teemukataja committed Sep 29, 2021
2 parents 295c415 + 69229e9 commit 0452af1
Showing 19 changed files with 351 additions and 138 deletions.
28 changes: 26 additions & 2 deletions .github/workflows/int.yml
@@ -39,7 +39,31 @@ jobs:
BRANCH=dev
fi
echo "BUILD_BRANCH=$BRANCH" >> $GITHUB_ENV
- name: Clone beacon 2
uses: actions/checkout@v2
with:
repository: 'CSCfi/beacon-2.x'
ref: 'test'
path: beacon2

- name: Download beacon 2 data
env:
BEACON_2_DATA_URL: ${{ secrets.BEACON_2_DATA_URL }}
run: |
cd beacon2/deploy/db
rm data.sql.gz
wget $BEACON_2_DATA_URL
- name: Run beacon 2
run: |
cd beacon2/deploy
docker network create beacon-network_apps
docker-compose up -d db
sleep 30
docker-compose up -d
sleep 30
- name: Build
uses: docker/build-push-action@v2
with:
@@ -56,12 +80,12 @@ jobs:
run: |
docker-compose -f docker-compose-test.yml up -d
sleep 30
- name: Set up external services for integration
run: ./tests/test_files/add_fixtures.sh

- name: Install integration tests dependencies
run: pip install asyncio httpx
run: pip install asyncio httpx ujson

- name: Run Integration tests
run: python tests/integration/run_tests.py
4 changes: 3 additions & 1 deletion .github/workflows/unit-agg.yml
@@ -22,7 +22,9 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install libcurl-devel
run: sudo apt-get install libcurl4-openssl-dev
run: |
sudo apt-get update
sudo apt-get install libcurl4-openssl-dev
- name: Install dependencies
run: |
python -m pip install --upgrade pip
4 changes: 3 additions & 1 deletion .github/workflows/unit-reg.yml
@@ -22,7 +22,9 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install libcurl-devel
run: sudo apt-get install libcurl4-openssl-dev
run: |
sudo apt-get update
sudo apt-get install libcurl4-openssl-dev
- name: Install dependencies
run: |
python -m pip install --upgrade pip
4 changes: 2 additions & 2 deletions Dockerfile
@@ -4,7 +4,7 @@ LABEL maintainer "CSC Developers"

RUN apk add --update \
&& apk add --no-cache build-base curl-dev linux-headers bash git \
&& apk add --no-cache libressl-dev libffi-dev \
&& apk add --no-cache libressl-dev libffi-dev libstdc++ \
&& apk add --no-cache supervisor \
&& rm -rf /var/cache/apk/*

@@ -20,7 +20,7 @@ RUN pip install --upgrade pip && \

FROM python:3.8-alpine3.13

RUN apk add --no-cache --update bash
RUN apk add --no-cache --update libstdc++

LABEL maintainer "CSC Developers"
LABEL org.label-schema.schema-version="1.0"
4 changes: 2 additions & 2 deletions aggregator/config/__init__.py
@@ -1,7 +1,7 @@
"""Aggregator Configuration."""

import os
import json
import ujson

from configparser import ConfigParser
from collections import namedtuple
@@ -16,7 +16,7 @@ def load_json(json_file):
data = {}
if os.path.isfile(json_file):
with open(json_file, "r") as contents:
data = json.loads(contents.read())
data = ujson.loads(contents.read())
return data


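Note on the json → ujson switch in this release (this note is not part of the diff): ujson escapes forward slashes by default, which is why the ujson.dumps calls added to aggregator/utils/utils.py below pass escape_forward_slashes=False. A minimal sketch, with made-up example data:

import json
import ujson

payload = {"url": "https://localhost:8080/services"}

json.dumps(payload)                                 # forward slashes left as-is
ujson.dumps(payload)                                # forward slashes escaped as \/ by default
ujson.dumps(payload, escape_forward_slashes=False)  # matches the stdlib behaviour for URLs
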
2 changes: 1 addition & 1 deletion aggregator/config/registries.json
@@ -1,6 +1,6 @@
[
{
"url": "https://localhost:8080/services",
"url": "http://localhost:9090/services",
"key": "secret"
}
]
189 changes: 117 additions & 72 deletions aggregator/utils/utils.py
@@ -2,7 +2,7 @@

import os
import sys
import json
import ujson
import ssl

from urllib import parse
@@ -97,32 +97,36 @@ async def process_url(url):
New in Beacon 2.0: `/g_variants` endpoint replaces the 1.0 `/query` endpoint.
"""
LOG.debug("Processing URLs.")

# convert tuple to list for processing
url = list(url)

# Check which endpoint to use, Beacon 1.0 or 2.0
query_endpoint = "query"
query_endpoints = ["query"]
if url[1] == 2:
query_endpoint = "g_variants"
LOG.debug(f"Using endpoint {query_endpoint}")
query_endpoints = ["individuals", "g_variants", "biosamples", "runs", "analyses", "interactors", "cohorts"]

LOG.debug(f"Using endpoint {query_endpoints}")
urls = []
# Add endpoint
if url[0].endswith("/"):
url[0] += query_endpoint
for endpoint in query_endpoints:
urls.append([url[0] + endpoint, url[1]])
elif url[0].endswith("/service-info"):
url[0] = url[0].replace("service-info", query_endpoint)
for endpoint in query_endpoints:
urls.append([url[0].replace("service-info", endpoint), url[1]])
else:
# Unknown case
# One case is observed, where URL was similar to https://service.institution.org/beacon
# For URLs where the info endpoint is /, but / is not present, let's add /query
url[0] += "/" + query_endpoint
for endpoint in query_endpoints:
urls.append([url[0] + "/" + endpoint, url[1]])

pass

# convert back to tuple after processing
url = tuple(url)

return url
urlTuples = []
for url in urls:
urlTuples.append(tuple(url))
return urlTuples
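
For context (not part of the commit), a sketch of what the reworked process_url is expected to return, assuming a hypothetical Beacon 2.0 registration; the import path and URLs are assumptions:

import asyncio
from aggregator.utils.utils import process_url  # assumed import path

# Hypothetical Beacon 2.0 registration: (base URL, spec version)
endpoints = asyncio.run(process_url(("https://beacon.example.org/", 2)))
# Expected, per the diff above: one (endpoint URL, version) tuple per query endpoint, e.g.
# [("https://beacon.example.org/individuals", 2),
#  ("https://beacon.example.org/g_variants", 2),
#  ("https://beacon.example.org/biosamples", 2),
#  ("https://beacon.example.org/runs", 2),
#  ("https://beacon.example.org/analyses", 2),
#  ("https://beacon.example.org/interactors", 2),
#  ("https://beacon.example.org/cohorts", 2)]
# A Beacon 1.0 registration such as ("https://beacon1.example.org/", 1)
# still expands to the single /query endpoint.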


async def remove_self(url_self, urls):
@@ -134,10 +138,12 @@ async def remove_self(url_self, urls):
LOG.debug("Look for self from service URLs.")

for url in urls:
url_split = url[0].split("/")
if url_self in url_split:
urls.remove(url)
LOG.debug("Found and removed self from service URLs.")
url = list(url)
for u in url[0]:
url_split = str(u).split("/")
if url_self in url_split:
urls.remove(url)
LOG.debug("Found and removed self from service URLs.")

return urls

@@ -180,32 +186,40 @@ async def pre_process_payload(version, params):

# parse the query string into a dict
raw_data = dict(parse.parse_qsl(params))

if version == 2:
# default data which is always present
data = {"assemblyId": raw_data.get("assemblyId", "GRCh38"), "includeDatasetResponses": raw_data.get("includeDatasetResponses", "ALL")}

# optionals
if (rn := raw_data.get("referenceName")) is not None:
data["referenceName"] = rn
if (vt := raw_data.get("variantType")) is not None:
data["variantType"] = vt
if (rb := raw_data.get("referenceBases")) is not None:
data["referenceBases"] = rb
if (ab := raw_data.get("alternateBases")) is not None:
data["alternateBases"] = ab

# exact coordinates
if (s := raw_data.get("start")) is not None:
data["start"] = s
if (e := raw_data.get("end")) is not None:
data["end"] = e

# range coordinates
if (smin := raw_data.get("startMin")) is not None and (smax := raw_data.get("startMax")) is not None:
data["start"] = ",".join([smin, smax])
if (emin := raw_data.get("endMin")) is not None and (emax := raw_data.get("endMax")) is not None:
data["end"] = ",".join([emin, emax])
# checks if a query is a listing search
if (raw_data.get("referenceName")) is not None:
# default data which is always present
data = {
"assemblyId": raw_data.get("assemblyId"),
"includeDatasetResponses": raw_data.get("includeDatasetResponses"),
}

# optionals
if (rn := raw_data.get("referenceName")) is not None:
data["referenceName"] = rn
if (vt := raw_data.get("variantType")) is not None:
data["variantType"] = vt
if (rb := raw_data.get("referenceBases")) is not None:
data["referenceBases"] = rb
if (ab := raw_data.get("alternateBases")) is not None:
data["alternateBases"] = ab

# exact coordinates
if (s := raw_data.get("start")) is not None:
data["start"] = s
if (e := raw_data.get("end")) is not None:
data["end"] = e

# range coordinates
if (smin := raw_data.get("startMin")) is not None and (smax := raw_data.get("startMax")) is not None:
data["start"] = ",".join([smin, smax])
if (emin := raw_data.get("endMin")) is not None and (emax := raw_data.get("endMax")) is not None:
data["end"] = ",".join([emin, emax])
else:
# beaconV2 expects some data but in listing search these are not needed thus they are empty
data = {"assemblyId": "", "includeDatasetResponses": ""}

else:
# convert string digits into integers
# Beacon 1.0 uses integer coordinates, while Beacon 2.0 uses string coordinates (ignore referenceName, it should stay as a string)
@@ -217,6 +231,33 @@ async def pre_process_payload(version, params):
return data
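
For context (not part of the commit), a sketch of how the reworked pre_process_payload is expected to translate an incoming query string for a Beacon 2.0 service; parameter values and the import path are assumptions:

import asyncio
from urllib import parse
from aggregator.utils.utils import pre_process_payload  # assumed import path

# Variant query: referenceName is present, so the full payload is built
params = parse.urlencode({
    "assemblyId": "GRCh38",
    "referenceName": "MT",
    "start": "9843",
    "referenceBases": "T",
    "alternateBases": "C",
    "includeDatasetResponses": "HIT",
})
asyncio.run(pre_process_payload(2, params))
# -> {"assemblyId": "GRCh38", "includeDatasetResponses": "HIT", "referenceName": "MT",
#     "referenceBases": "T", "alternateBases": "C", "start": "9843"}

# Listing search: no referenceName, so only empty placeholder fields are sent
asyncio.run(pre_process_payload(2, parse.urlencode({"searchInInput": "individuals"})))
# -> {"assemblyId": "", "includeDatasetResponses": ""}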


async def find_query_endpoint(service, params):
"""Find endpoint for queries by parameters."""
# since beaconV2 has multiple endpoints this method is used to define those endpoints from parameters
endpoints = service
# if length is 1 then beacon is v1
raw_data = dict(parse.parse_qsl(params))
if len(endpoints) <= 1 and raw_data.get("searchInInput") is None:
return service[0]
else:
for endpoint in endpoints:
if raw_data.get("searchInInput") is not None:
if raw_data.get("searchInInput") in endpoint[0]:
if raw_data.get("id") != "0" and raw_data.get("id") is not None:
if raw_data.get("searchByInput") != "" and raw_data.get("searchByInput") is not None:

url = list(endpoint)
url[0] += "/" + raw_data.get("id") + "/" + raw_data.get("searchByInput")
endpoint = tuple(url)
return endpoint

url = list(endpoint)
url[0] += "/" + raw_data.get("id")
endpoint = tuple(url)
return endpoint
return endpoint
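
For context (not part of the commit), a sketch of how the new find_query_endpoint is expected to pick and extend an endpoint from the query parameters; the service tuples, parameter values, and import path below are assumptions:

import asyncio
from urllib import parse
from aggregator.utils.utils import find_query_endpoint  # assumed import path

# Endpoint tuples as produced by process_url for a hypothetical Beacon 2.0 service
service = [
    ("https://beacon.example.org/individuals", 2),
    ("https://beacon.example.org/g_variants", 2),
    ("https://beacon.example.org/biosamples", 2),
]

# Listing search scoped to one endpoint type, one record id and a sub-resource
params = parse.urlencode({"searchInInput": "individuals", "id": "HG00096", "searchByInput": "biosamples"})
asyncio.run(find_query_endpoint(service, params))
# -> ("https://beacon.example.org/individuals/HG00096/biosamples", 2)

# A Beacon 1.0 service exposes a single endpoint, which is returned untouched
asyncio.run(find_query_endpoint([("https://beacon1.example.org/query", 1)], "referenceName=MT"))
# -> ("https://beacon1.example.org/query", 1)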


async def _service_response(response, ws):
"""Process response to web socket or HTTP."""
result = await response.json()
@@ -234,7 +275,7 @@ async def _service_response(response, ws):
else:
# The response came from a beacon and is a single object (dict {})
# Send result to websocket
return await ws.send_str(json.dumps(result))
return await ws.send_str(ujson.dumps(result, escape_forward_slashes=False))
else:
# Standard response
return result
@@ -253,7 +294,7 @@ async def _get_request(session, service, params, headers, ws):
error = {"service": service[0], "queryParams": params, "responseStatus": response.status, "exists": None}
LOG.error(f"Query to {service} failed: {response}.")
if ws is not None:
return await ws.send_str(json.dumps(error))
return await ws.send_str(ujson.dumps(error, escape_forward_slashes=False))
else:
return error

@@ -262,37 +303,40 @@ async def query_service(service, params, access_token, ws=None):
"""Query service with params."""
LOG.debug("Querying service.")
headers = {}

if access_token:
headers.update({"Authorization": f"Bearer {access_token}"})

endpoint = await find_query_endpoint(service, params)
# Pre-process query string into payload format
data = await pre_process_payload(service[1], params)

# Query service in a session
async with aiohttp.ClientSession() as session:
try:
async with session.post(service[0], json=data, headers=headers, ssl=await request_security()) as response:
LOG.info(f"POST query to service: {service[0]}")
# On successful response, forward response
if response.status == 200:
return await _service_response(response, ws)

elif response.status == 405:
return await _get_request(session, service, params, headers, ws)

else:
# HTTP errors
error = {"service": service[0], "queryParams": params, "responseStatus": response.status, "exists": None}
LOG.error(f"Query to {service} failed: {response}.")
if ws is not None:
return await ws.send_str(json.dumps(error))
if endpoint is not None:
data = await pre_process_payload(endpoint[1], params)

# Query service in a session
async with aiohttp.ClientSession() as session:
try:
async with session.post(endpoint[0], json=data, headers=headers, ssl=await request_security()) as response:
LOG.info(f"POST query to service: {endpoint}")
# On successful response, forward response
if response.status == 200:
return await _service_response(response, ws)
elif response.status == 405:
return await _get_request(session, endpoint, params, headers, ws)
else:
return error

except Exception as e:
LOG.debug(f"Query error {e}.")
web.HTTPInternalServerError(text="An error occurred while attempting to query services.")
# HTTP errors
error = {
"service": endpoint[0],
"queryParams": params,
"responseStatus": response.status,
"exists": None,
}

LOG.error(f"Query to {service} failed: {response}.")
if ws is not None:
return await ws.send_str(ujson.dumps(error, escape_forward_slashes=False))
else:
return error
except Exception as e:
LOG.debug(f"Query error {e}.")
web.HTTPInternalServerError(text="An error occurred while attempting to query services.")


async def ws_bundle_return(result, ws):
@@ -301,7 +345,7 @@ async def ws_bundle_return(result, ws):

# A simple function to bundle up websocket returns
# when broken down from an aggregator response list
return await ws.send_str(json.dumps(result))
return await ws.send_str(ujson.dumps(result, escape_forward_slashes=False))


async def validate_service_key(key):
@@ -366,7 +410,8 @@ def load_certs(ssl_context):

try:
ssl_context.load_cert_chain(
os.environ.get("PATH_SSL_CERT_FILE", "/etc/ssl/certs/cert.pem"), keyfile=os.environ.get("PATH_SSL_KEY_FILE", "/etc/ssl/certs/key.pem")
os.environ.get("PATH_SSL_CERT_FILE", "/etc/ssl/certs/cert.pem"),
keyfile=os.environ.get("PATH_SSL_KEY_FILE", "/etc/ssl/certs/key.pem"),
)
ssl_context.load_verify_locations(cafile=os.environ.get("PATH_SSL_CA_FILE", "/etc/ssl/certs/ca.pem"))
except Exception as e:
2 changes: 1 addition & 1 deletion deploy/app.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/bin/sh

THE_HOST=${APP_HOST:="0.0.0.0"}
THE_PORT=${APP_PORT:="8080"}
