class Settings(BaseSettings):
    """Application configuration for the OSW validation service.

    The class-level defaults are read from the process environment when the
    class body is evaluated; ``auth_provider`` re-reads ``AUTH_SIMULATE`` on
    every access.
    """

    app_name: str = 'python-osw-validation'
    event_bus = EventBusSettings()
    auth_permission_url: str = os.getenv('AUTH_PERMISSION_URL')

    @property
    def auth_provider(self) -> str:
        """Return ``'Simulated'`` when AUTH_SIMULATE is truthy, else ``'Hosted'``."""
        flag = os.getenv('AUTH_SIMULATE', 'False').lower()
        # Every value outside the truthy set, recognised or not, selects the
        # hosted provider.
        return 'Simulated' if flag in ('true', 'yes', '1') else 'Hosted'
@app.on_event('startup')
async def startup_event(settings: Settings = Depends(get_settings)) -> None:
    """Start the queue listener when the application boots.

    If the listener cannot be started, a configuration hint and the actual
    error are printed, then every child process and finally the current
    process are killed.

    Args:
        settings: Not used by the body; kept so the signature is unchanged.
    """
    try:
        OswValidator().start_listening()
    except Exception as error:
        # ``Exception`` instead of a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not reported as configuration problems.
        print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
        print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
        print()
        print('\x1b[31m UPLOAD_TOPIC=xxxx \x1b[0m')
        print('\x1b[31m UPLOAD_SUBSCRIPTION=xxxx \x1b[0m')
        print('\x1b[31m VALIDATION_TOPIC=xxxx \x1b[0m')
        print('\x1b[31m QUEUECONNECTION=xxxx \x1b[0m')
        print('\x1b[31m STORAGECONNECTION=xxxx \x1b[0m \n\n')
        # The hint above is only the most likely cause; show the real one too.
        print(f'\x1b[31m Startup error: {error!r} \x1b[0m')
        parent_pid = os.getpid()
        parent = psutil.Process(parent_pid)
        # Kill the children first so none of them outlives the parent.
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()
class UploadData:
    """Payload stored under the ``data`` key of an upload queue message.

    ``request``, ``meta`` and ``response`` are wrapped in ``Request``, ``Meta``
    and ``Response`` when the matching key holds a non-empty value; otherwise
    each of them is an empty dict.
    """

    # Attributes holding nested sections that to_json() has to flatten.
    _SECTION_FIELDS = ('request', 'meta', 'response')

    def __init__(self, data: dict):
        """Populate the payload from ``data``; missing keys use defaults."""
        request = data.get('request', None)
        meta = data.get('meta', None)
        response = data.get('response', None)

        self._stage = data.get('stage', '')
        self.request = Request(data=request) if request else {}
        self.meta = Meta(data=meta) if meta else {}
        self.response = Response(data=response) if response else {}
        self._tdei_record_id = data.get('tdei_record_id', '')
        self._tdei_org_id = data.get('tdei_org_id', '')
        self._user_id = data.get('user_id', '')

    @property
    def stage(self):
        return self._stage

    @stage.setter
    def stage(self, value):
        self._stage = value

    @property
    def tdei_record_id(self):
        return self._tdei_record_id

    @tdei_record_id.setter
    def tdei_record_id(self, value):
        self._tdei_record_id = value

    @property
    def tdei_org_id(self):
        return self._tdei_org_id

    @tdei_org_id.setter
    def tdei_org_id(self, value):
        self._tdei_org_id = value

    @property
    def user_id(self):
        return self._user_id

    @user_id.setter
    def user_id(self, value):
        self._user_id = value

    @staticmethod
    def _public_name(name: str) -> str:
        """Drop one leading underscore from ``name``, if present."""
        return name[1:] if name.startswith('_') else name

    @classmethod
    def _section_to_dict(cls, section) -> dict:
        """Return a nested section as a plain dict with public key names."""
        if isinstance(section, dict):
            # Absent sections are stored as ``{}``; copy so callers cannot
            # mutate the instance through the returned value.
            return dict(section)
        return {cls._public_name(key): value for key, value in vars(section).items()}

    def to_json(self):
        """Return a new dict describing this payload.

        Leading underscores are stripped from attribute names and the nested
        sections are flattened the same way. The instance itself is not
        modified, so the method can be called repeatedly and also works when
        a section is an empty dict.
        """
        result = {}
        for key, value in vars(self).items():
            if key in self._SECTION_FIELDS:
                value = self._section_to_dict(value)
            result[self._public_name(key)] = value
        return result
class Meta:
    """Metadata section of an upload message.

    Only ``file_upload_path`` is taken from the incoming dict; the validation
    fields always start from fixed defaults.
    """

    def __init__(self, data: dict):
        # Assign through the properties below; each setter stores the value in
        # the matching underscore-prefixed attribute.
        self.file_upload_path = data.get('file_upload_path', '')
        self.isValid = False
        self.validationMessage = ''
        self.validationTime = 90

    @property
    def file_upload_path(self):
        """Value of ``file_upload_path`` from the message, ``''`` if absent."""
        return self._file_upload_path

    @file_upload_path.setter
    def file_upload_path(self, new_path):
        self._file_upload_path = new_path

    @property
    def isValid(self):
        """Validation flag; starts as ``False``."""
        return self._isValid

    @isValid.setter
    def isValid(self, flag):
        self._isValid = flag

    @property
    def validationMessage(self):
        """Validation message; starts as an empty string."""
        return self._validationMessage

    @validationMessage.setter
    def validationMessage(self, text):
        self._validationMessage = text
def remove_underscore(string: str):
    """Return ``string`` without its first character when that is ``'_'``."""
    if string.startswith('_'):
        return string[1:]
    return string


def to_json(data: object):
    """Copy a mapping into a new dict, renaming keys via ``remove_underscore``.

    When two keys collapse to the same name, the one iterated later wins.
    """
    return {remove_underscore(name): data[name] for name in data}
def validate(self, received_message: Upload):
    """Validate the upload described by ``received_message`` and publish the result.

    The request is rejected when the incoming response is marked failed, when
    no file path is present, or when the permission check does not succeed.
    Otherwise the file is fetched from storage and handed to
    ``dummy_validation``. Any exception raised along the way is reported
    through ``send_status`` as a failed validation.

    Args:
        received_message: Parsed upload message taken from the queue.
    """
    tdei_record_id: str = ''
    try:
        tdei_record_id = received_message.data.tdei_record_id
        print(f'Received message for : {tdei_record_id} Message received for osw validation !')

        if received_message.data.response.success is False:
            error_msg = 'Received failed workflow request'
            print(tdei_record_id, error_msg, received_message)
            # Reported as a failure like the other precondition checks below,
            # instead of carrying on with the validation.
            raise Exception(error_msg)

        # Falsy check: a missing path is stored as '' rather than None.
        if not received_message.data.meta.file_upload_path:
            error_msg = 'Request does not have valid file path specified.'
            print(tdei_record_id, error_msg, received_message)
            raise Exception(error_msg)

        # has_permission() returns the authorizer response or False, never
        # None, so the result has to be tested for truthiness.
        if not self.has_permission(roles=['tdei-admin', 'poc', 'osw_data_generator'],
                                   queue_message=received_message):
            error_msg = 'Unauthorized request !'
            print(tdei_record_id, error_msg, received_message)
            raise Exception(error_msg)

        url = urllib.parse.unquote(received_message.data.meta.file_upload_path)
        file_entity = self.storage_client.get_file_from_url(container_name=self.container_name, full_url=url)
        if not file_entity:
            raise Exception('File entity not found')

        # TODO: Validation
        validation_result = self.dummy_validation(file_entity=file_entity, queue_message=received_message)
        self.send_status(result=validation_result, upload_message=received_message)
    except Exception as e:
        print(f'{tdei_record_id} Error occurred while validating osw request, {e}')
        result = ValidationResult()
        result.is_valid = False
        result.validation_message = f'Error occurred while validating osw request {e}'
        self.send_status(result=result, upload_message=received_message)
def has_permission(self, roles: List[str], queue_message: Upload) -> bool:
    """Ask the authorizer whether the message's user holds any of ``roles``.

    Returns the authorizer's response, or ``False`` when that response is
    ``None`` or when building or sending the request raises.
    """
    try:
        payload = queue_message.data
        request = PermissionRequest(
            user_id=payload.user_id,
            org_id=payload.tdei_org_id,
            permissions=roles,
            should_satisfy_all=False
        )
        verdict = self.auth.has_permission(request_params=request)
        if verdict is None:
            return False
        return verdict
    except Exception as error:
        print('Error validating the request authorization:', error)
        return False