Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ RESPONSE_TOPIC=xxxx
REQUEST_SUBSCRIPTION=xxxx
QUEUECONNECTION=xxxx
STORAGECONNECTION=xxxx
MAX_CONCURRENT_MESSAGES=xx
```
The application connect with the `STORAGECONNECTION` string provided in `.env` file and validates downloaded zipfile using `tdei-gtfs-csv-validator` package.
`QUEUECONNECTION` is not being used in this application but this is the main requirement for `python-ms-core` package

`MAX_CONCURRENT_MESSAGES` is the maximum number of concurrent messages that the service can handle. If not provided, defaults to 1

### How to Setup and Build
Follow the steps to install the node packages required for both building and running the application.

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ fastapi~=0.111.1
pydantic==1.10.4
html_testRunner==1.2.1
uvicorn==0.20.0
python-ms-core==0.0.19
python-ms-core==0.0.22
tcat-gtfs-csv-validator~=0.0.38
5 changes: 5 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import uuid
from dotenv import load_dotenv
from pydantic import BaseSettings

Expand All @@ -11,3 +12,7 @@ class Settings(BaseSettings):
response_topic_name: str = os.environ.get('RESPONSE_TOPIC', None)
request_subscription: str = os.environ.get('REQUEST_SUBSCRIPTION', None)
storage_container_name: str = os.environ.get('CONTAINER_NAME', 'gtfsflex')
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)

def get_unique_id(self) -> str:
return str(uuid.uuid4())
22 changes: 14 additions & 8 deletions src/gtfs_flex_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

class GTFSFlexValidation:
def __init__(self, file_path=None, storage_client=None):
settings = Settings()
self.container_name = settings.storage_container_name
self.settings = Settings()
self.container_name = self.settings.storage_container_name
self.storage_client = storage_client
self.file_path = file_path
self.file_relative_path = file_path.split('/')[-1]
Expand All @@ -44,9 +44,9 @@ def is_gtfs_flex_valid(self) -> tuple[Union[bool, Any], Union[str, Any]]:
validation_message = ''
root, ext = os.path.splitext(self.file_relative_path)
if ext and ext.lower() == '.zip':
downloaded_file_path = self.download_single_file(self.file_path)
logger.info(f' Downloaded file path: {downloaded_file_path}')
try:
downloaded_file_path = self.download_single_file(self.file_path)
logger.info(f' Downloaded file path: {downloaded_file_path}')
gcv_test_release.test_release(DATA_TYPE, SCHEMA_VERSION, downloaded_file_path)
is_valid = True
except Exception as err:
Expand All @@ -67,17 +67,23 @@ def download_single_file(self, file_upload_path=None) -> str:
if not is_exists:
os.makedirs(DOWNLOAD_FILE_PATH)

unique_folder = self.settings.get_unique_id()
dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_folder)

# Ensure the unique folder path is created
os.makedirs(dl_folder_path, exist_ok=True)

file = self.storage_client.get_file_from_url(self.container_name, file_upload_path)
try:
if file.file_path:
file_path = os.path.basename(file.file_path)
with open(f'{DOWNLOAD_FILE_PATH}/{file_path}', 'wb') as blob:
with open(f'{dl_folder_path}/{file_path}', 'wb') as blob:
blob.write(file.get_stream())
logger.info(f' File downloaded to location: {DOWNLOAD_FILE_PATH}/{file_path}')
return f'{DOWNLOAD_FILE_PATH}/{file_path}'
logger.info(f' File downloaded to location: {dl_folder_path}/{file_path}')
return f'{dl_folder_path}/{file_path}'
else:
logger.info(' File not found!')
raise Exception('File not found!')
raise Exception('File not found!')
except Exception as e:
traceback.print_exc()
logger.error(e)
Expand Down
80 changes: 44 additions & 36 deletions src/gtfx_flex_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .serializer.gtfx_flex_serializer import GTFSFlexUpload
from .models.file_upload_msg import FileUploadMsg
import threading
import time

logging.basicConfig()
logger = logging.getLogger('FLEX_VALIDATOR')
Expand All @@ -18,14 +19,15 @@ class GTFSFlexValidator:
_settings = Settings()

def __init__(self):
core = Core()
settings = Settings()
self._subscription_name = settings.request_subscription
self.request_topic = core.get_topic(topic_name=settings.request_topic_name)
self.response_topic = core.get_topic(topic_name=settings.response_topic_name)
self.logger = core.get_logger()
self.storage_client = core.get_storage_client()
self.subscribe()
self.core = Core()
self.settings = Settings()
self._subscription_name = self.settings.request_subscription
self.request_topic = self.core.get_topic(topic_name=self.settings.request_topic_name,max_concurrent_messages=self.settings.max_concurrent_messages)
# self.response_topic = self.core.get_topic(topic_name=self.settings.response_topic_name)
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.listening_thread = threading.Thread(target=self.subscribe)
self.listening_thread.start()

def subscribe(self) -> None:
# Process the incoming message
Expand All @@ -34,47 +36,53 @@ def process(message) -> None:
gtfs_upload_message = QueueMessage.to_dict(message)
upload_msg = FileUploadMsg.from_dict(gtfs_upload_message)
logger.info(upload_msg)
# upload_message = GTFSFlexUpload.data_from(gtfs_upload_message)
process_thread = threading.Thread(target=self.process_message,args=[upload_msg])
process_thread.start()
self.process_message(upload_msg)
else:
logger.info(' No Message')

self.request_topic.subscribe(subscription=self._subscription_name, callback=process)

def process_message(self, upload_msg: FileUploadMsg) -> None:
file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path)
logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}')
logger.info(file_upload_path)
if file_upload_path:
# Do the validation in the other class
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client)
validation = validator.validate()
self.send_status(valid=validation[0], upload_message=upload_msg,
validation_message=validation[1])
else:
logger.info(' No file Path found in message!')

try:
file_upload_path = urllib.parse.unquote(upload_msg.data.file_upload_path)
logger.info(f' Received message for Project Group: {upload_msg.data.tdei_project_group_id}')
logger.info(file_upload_path)
if file_upload_path:
# Do the validation in the other class
validator = GTFSFlexValidation(file_path=file_upload_path, storage_client=self.storage_client)
validation = validator.validate()
self.send_status(valid=validation[0], upload_message=upload_msg,
validation_message=validation[1])
else:
logger.info(' No file Path found in message!')
except Exception as e:
logger.error(f' Error processing message: {e}')
self.send_status(valid=False, upload_message=upload_msg, validation_message=str(e))

def send_status(self, valid: bool, upload_message: FileUploadMsg, validation_message: str = '') -> None:
# upload_message.data.stage = 'flex-validation'
# upload_message.data.meta.isValid = valid
# upload_message.data.meta.validationMessage = validation_message or 'Validation successful'
# upload_message.data.response.success = valid
# upload_message.data.response.message = validation_message or 'Validation successful'
# message_id = uuid.uuid1().hex[0:24]
response_message = {
"file_upload_path": upload_message.data.file_upload_path,
"user_id": upload_message.data.user_id ,
"tdei_project_group_id": upload_message.data.tdei_project_group_id,
"success": valid,
"message": validation_message
}
"file_upload_path": upload_message.data.file_upload_path,
"user_id": upload_message.data.user_id ,
"tdei_project_group_id": upload_message.data.tdei_project_group_id,
"success": valid,
"message": validation_message
}
logger.info(f' Publishing new message with ID: {upload_message.messageId} with status: {valid} and Message: {validation_message}')
data = QueueMessage.data_from({
'messageId': upload_message.messageId,
'message': 'Validation complete',
'messageType': upload_message.messageType,
'data': response_message
})
self.response_topic.publish(data=data)
return
self.send_response(data=data)

def send_response(self, data: QueueMessage) -> None:
try:
response_topic = self.core.get_topic(self.settings.response_topic_name)
response_topic.publish(data=data)
except Exception as e:
logger.error(f'Error sending response: {e}')

def stop_listening(self):
self.listening_thread.join(timeout=0)
8 changes: 7 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .gtfx_flex_validator import GTFSFlexValidator

app = FastAPI()
app.flex_validator = None

prefix_router = APIRouter(prefix='/health')

Expand All @@ -18,7 +19,7 @@ def get_settings():
@app.on_event('startup')
async def startup_event(settings: Settings = Depends(get_settings)) -> None:
try:
GTFSFlexValidator()
app.flex_validator = GTFSFlexValidator()
except:
print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
Expand All @@ -34,6 +35,11 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
child.kill()
parent.kill()

@app.on_event('shutdown')
async def shutdown_event():
if app.flex_validator:
app.flex_validator.shutdown()


@app.get('/', status_code=status.HTTP_200_OK)
@prefix_router.get('/', status_code=status.HTTP_200_OK)
Expand Down
4 changes: 2 additions & 2 deletions test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Define your test cases
from tests.unit_tests.test_gtfs_flex_serializer import TestGTFSFlexUpload, TestGTFSFlexUploadData, TestRequest, \
TestMeta, TestResponse
from tests.unit_tests.test_gtfs_flex_validation import TestSuccessGTFSFlexValidation, TestFailureGTFSFlexValidation, TestSuccessWithWithMacOSFile
from tests.unit_tests.test_gtfs_flex_validation import TestSuccessGTFSFlexValidation, TestFailureGTFSFlexValidation, TestSuccessWithMacOSFile
from tests.unit_tests.test_gtfx_flex_validator import TestGTFSFlexValidator
from tests.unit_tests.test_file_upload_msg import TestFileUploadMsg
from tests.unit_tests.test_main import TestApp
Expand All @@ -19,7 +19,7 @@
test_suite.addTest(unittest.makeSuite(TestMeta))
test_suite.addTest(unittest.makeSuite(TestResponse))
test_suite.addTest(unittest.makeSuite(TestSuccessGTFSFlexValidation))
test_suite.addTest(unittest.makeSuite(TestSuccessWithWithMacOSFile))
test_suite.addTest(unittest.makeSuite(TestSuccessWithMacOSFile))
test_suite.addTest(unittest.makeSuite(TestFailureGTFSFlexValidation))
test_suite.addTest(unittest.makeSuite(TestGTFSFlexValidator))
test_suite.addTest(unittest.makeSuite(TestApp))
Expand Down
62 changes: 48 additions & 14 deletions tests/unit_tests/test_gtfs_flex_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from unittest.mock import patch, MagicMock
from src.gtfs_flex_validation import GTFSFlexValidation
from src.config import Settings

DOWNLOAD_FILE_PATH = f'{Path.cwd()}/downloads'
SAVED_FILE_PATH = f'{Path.cwd()}/tests/unit_tests/test_files'
Expand All @@ -16,7 +17,7 @@
SCHEMA_VERSION = 'v2.0'


class TestSuccessWithWithMacOSFile(unittest.TestCase):
class TestSuccessWithMacOSFile(unittest.TestCase):
@patch.object(GTFSFlexValidation, 'download_single_file')
def setUp(self, mock_download_single_file):
os.makedirs(DOWNLOAD_FILE_PATH, exist_ok=True)
Expand All @@ -33,6 +34,7 @@ def setUp(self, mock_download_single_file):
self.validator.file_path = file_path
self.validator.file_relative_path = MAC_SUCCESS_FILE_NAME
self.validator.container_name = None
self.validator.settings = Settings()
mock_download_single_file.return_value = file_path

def tearDown(self):
Expand All @@ -47,11 +49,11 @@ def test_validate_with_valid_file(self):

# Act
is_valid, _ = self.validator.validate()
(is_valid)

# Assert
self.assertTrue(is_valid)


class TestSuccessGTFSFlexValidation(unittest.TestCase):

@patch.object(GTFSFlexValidation, 'download_single_file')
Expand All @@ -64,13 +66,16 @@ def setUp(self, mock_download_single_file):
shutil.copyfile(source, destination)

file_path = f'{DOWNLOAD_FILE_PATH}/{SUCCESS_FILE_NAME}'
dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, 'dummy-uuid') # Mock the UUID generation
os.makedirs(dl_folder_path, exist_ok=True) # Ensure this directory is created in the test

with patch.object(GTFSFlexValidation, '__init__', return_value=None):
self.validator = GTFSFlexValidation(file_path=file_path, storage_client=MagicMock())
self.validator.file_path = file_path
self.validator.file_relative_path = SUCCESS_FILE_NAME
self.validator.container_name = None
mock_download_single_file.return_value = file_path
self.validator.settings = Settings()
mock_download_single_file.return_value = os.path.join(dl_folder_path, SUCCESS_FILE_NAME)

def tearDown(self):
pass
Expand Down Expand Up @@ -123,6 +128,33 @@ def test_download_single_file(self):
content = f.read()
self.assertEqual(content, b'file_content')

def test_download_multiple_file_with_same_name(self):
# Arrange
file_upload_path = DOWNLOAD_FILE_PATH
self.validator.storage_client = MagicMock()
self.validator.storage_client.get_file_from_url = MagicMock()
file = MagicMock()
file.file_path = 'text_file.txt'
file.get_stream = MagicMock(return_value=b'file_content')
self.validator.storage_client.get_file_from_url.return_value = file

# Act
first_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path)
second_downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path)

# Assert
self.assertNotEqual(first_downloaded_file_path, second_downloaded_file_path,
"The downloaded file paths should be different for files with the same name.")

# Check if the get_file_from_url was called for both download attempts
self.assertEqual(self.validator.storage_client.get_file_from_url.call_count, 2,
"get_file_from_url should be called twice for two downloads.")
file.get_stream.assert_called()

# Additional assertions to verify that the paths indeed point to different locations
self.assertTrue(first_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH))
self.assertTrue(second_downloaded_file_path.startswith(DOWNLOAD_FILE_PATH))

def test_clean_up_file(self):
# Arrange
file_upload_path = DOWNLOAD_FILE_PATH
Expand All @@ -149,7 +181,7 @@ def test_clean_up_folder(self):
GTFSFlexValidation.clean_up = MagicMock()

# Assert
# self.assertFalse(os.path.exists(directory_name))
self.assertFalse(os.path.exists(directory_name))


class TestFailureGTFSFlexValidation(unittest.TestCase):
Expand All @@ -170,6 +202,7 @@ def setUp(self, mock_download_single_file):
self.validator.file_path = file_path
self.validator.file_relative_path = FAILURE_FILE_NAME
self.validator.container_name = None
self.validator.settings = MagicMock()
mock_download_single_file.return_value = file_path

def tearDown(self):
Expand Down Expand Up @@ -223,19 +256,20 @@ def test_download_single_file_exception(self):
self.validator.storage_client.get_file_from_url = MagicMock()
file = MagicMock()
file.file_path = 'text_file.txt'
file.get_stream = MagicMock(return_value=b'file_content')
file.get_stream = MagicMock(side_effect=FileNotFoundError("Mocked FileNotFoundError"))
self.validator.storage_client.get_file_from_url.return_value = file

# Act
downloaded_file_path = self.validator.download_single_file(file_upload_path=file_upload_path)
# Create the mock folder that would be used
unique_id = "mocked-uuid"
self.validator.settings.get_unique_id = MagicMock()
self.validator.settings.get_unique_id.return_value = unique_id

# Assert
self.validator.storage_client.get_file_from_url.assert_called_once_with(self.validator.container_name,
file_upload_path)
file.get_stream.assert_called_once()
with open(downloaded_file_path, 'rb') as f:
content = f.read()
self.assertEqual(content, b'file_content')
dl_folder_path = os.path.join(DOWNLOAD_FILE_PATH, unique_id)
os.makedirs(dl_folder_path, exist_ok=True)

# Act & Assert
with self.assertRaises(FileNotFoundError):
self.validator.download_single_file(file_upload_path=file_upload_path)


if __name__ == '__main__':
Expand Down