Skip to content

Commit

Permalink
Merge pull request #231 from eclecticiq/nested-job-and-details
Browse files Browse the repository at this point in the history
Nest details inside job and allow counts without details
  • Loading branch information
erwin-eiq committed Jun 5, 2022
2 parents 9538a6e + 04eb509 commit e0da85c
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 216 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changelog
=========

0.8.0 (2022-06-05)
------------------
* Nest details inside taxii2 job and allow counts without details

0.7.0 (2022-05-27)
------------------
* Nest taxii2 endpoints under `/taxii2/`
Expand Down
2 changes: 1 addition & 1 deletion opentaxii/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
This module defines the package version for use in __init__.py and setup.py.
"""

__version__ = '0.7.0'
__version__ = '0.8.0'
11 changes: 8 additions & 3 deletions opentaxii/common/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ def sorted_dicts(obj):
"""
sort all dicts contained in obj, for repeatable repr
"""
from opentaxii.taxii2.entities import JobDetails

if isinstance(obj, dict):
response = {}
for key, value in sorted(obj.items()):
value = sorted_dicts(value)
response[key] = value
elif isinstance(obj, JobDetails):
response = type(obj)(*[sorted_dicts(item) for item in obj])
elif isinstance(obj, (list, tuple)):
response = type(obj)(sorted_dicts(item) for item in obj)
else:
Expand All @@ -15,11 +19,12 @@ def sorted_dicts(obj):


class Entity:
'''Abstract TAXII entity class.
'''
"""Abstract TAXII entity class."""

def __repr__(self):
pairs = ["%s=%s" % (k, v) for k, v in sorted(sorted_dicts(self.__dict__).items())]
pairs = [
"%s=%s" % (k, v) for k, v in sorted(sorted_dicts(self.__dict__).items())
]
return "%s(%s)" % (self.__class__.__name__, ", ".join(pairs))

def to_dict(self):
Expand Down
6 changes: 3 additions & 3 deletions opentaxii/persistence/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from typing import Dict, List, Optional, Tuple

from opentaxii.taxii2.entities import (ApiRoot, Collection, Job, JobDetail,
from opentaxii.taxii2.entities import (ApiRoot, Collection, Job,
ManifestRecord, STIXObject,
VersionRecord)

Expand Down Expand Up @@ -297,7 +297,7 @@ def get_api_root(self, api_root_id: str) -> Optional[ApiRoot]:

def get_job_and_details(
self, api_root_id: str, job_id: str
) -> Tuple[Optional[Job], List[JobDetail]]:
) -> Optional[Job]:
raise NotImplementedError

def get_collections(self, api_root_id: str) -> List[Collection]:
Expand Down Expand Up @@ -334,7 +334,7 @@ def get_objects(
) -> Tuple[List[STIXObject], bool, Optional[str]]:
raise NotImplementedError

def add_objects(self, api_root_id: str, collection_id: str, objects: List[Dict]) -> Tuple[Job, List[JobDetail]]:
def add_objects(self, api_root_id: str, collection_id: str, objects: List[Dict]) -> Job:
raise NotImplementedError

def get_object(
Expand Down
39 changes: 8 additions & 31 deletions opentaxii/persistence/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime
from typing import Dict, List, NamedTuple, Optional, Tuple
from typing import Dict, List, Optional, Tuple

import structlog
from opentaxii.local import context
Expand All @@ -9,7 +9,7 @@
NoWritePermission)
from opentaxii.signals import (CONTENT_BLOCK_CREATED, INBOX_MESSAGE_CREATED,
SUBSCRIPTION_CREATED)
from opentaxii.taxii2.entities import (ApiRoot, Collection, Job, JobDetail,
from opentaxii.taxii2.entities import (ApiRoot, Collection, Job,
ManifestRecord, STIXObject,
VersionRecord)

Expand Down Expand Up @@ -384,13 +384,6 @@ def delete_content_blocks(
return count


class JobDetailsResponse(NamedTuple):
total_count: int
success: List[JobDetail]
failure: List[JobDetail]
pending: List[JobDetail]


class Taxii2PersistenceManager(BasePersistenceManager):
"""Manager responsible for persisting and retrieving data.
Expand Down Expand Up @@ -427,26 +420,11 @@ def get_api_root(self, api_root_id: str) -> ApiRoot:
raise DoesNotExistError()
return api_root

def _get_job_details_response(
self, job_details: List[JobDetail]
) -> JobDetailsResponse:
job_details_response = JobDetailsResponse(
total_count=len(job_details), success=[], failure=[], pending=[]
)
for job_detail in job_details:
getattr(job_details_response, job_detail.status).append(job_detail)
return job_details_response

def get_job_and_details(
self, api_root_id: str, job_id: str
) -> Tuple[Job, JobDetailsResponse]:
job, job_details = self.api.get_job_and_details(
api_root_id=api_root_id, job_id=job_id
)
def get_job_and_details(self, api_root_id: str, job_id: str) -> Job:
job = self.api.get_job_and_details(api_root_id=api_root_id, job_id=job_id)
if job is None:
raise DoesNotExistError()
job_details_response = self._get_job_details_response(job_details)
return (job, job_details_response)
return job

def get_collections(self, api_root_id: str) -> List[Collection]:
return self.api.get_collections(api_root_id=api_root_id)
Expand Down Expand Up @@ -522,19 +500,18 @@ def add_objects(
api_root_id: str,
collection_id_or_alias: str,
data: Dict,
) -> Tuple[Job, JobDetailsResponse]:
) -> Job:
collection = self.get_collection(
api_root_id=api_root_id, collection_id_or_alias=collection_id_or_alias
)
if not collection.can_write(context.account):
raise NoWritePermission()
job, job_details = self.api.add_objects(
job = self.api.add_objects(
api_root_id=api_root_id,
collection_id=collection.id,
objects=data["objects"],
)
job_details_response = self._get_job_details_response(job_details)
return (job, job_details_response)
return job

def get_object(
self,
Expand Down
83 changes: 40 additions & 43 deletions opentaxii/persistence/sqldb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,36 @@ def add_api_root(
is_public=is_public,
)

def _job_and_details_to_entity(
self, job: taxii2models.Job, job_details: List[taxii2models.JobDetail]
) -> entities.Job:
job_entity = entities.Job(
id=job.id,
api_root_id=job.api_root_id,
status=job.status,
request_timestamp=job.request_timestamp,
completed_timestamp=job.completed_timestamp,
total_count=job.total_count,
success_count=job.success_count,
failure_count=job.failure_count,
pending_count=job.pending_count,
)
for job_detail in job_details:
getattr(job_entity.details, job_detail.status).append(
entities.JobDetail(
id=job_detail.id,
job_id=job_detail.job_id,
stix_id=job_detail.stix_id,
version=job_detail.version,
message=job_detail.message,
status=job_detail.status,
)
)
return job_entity

def get_job_and_details(
self, api_root_id: str, job_id: str
) -> Tuple[Optional[entities.Job], List[entities.JobDetail]]:
) -> Optional[entities.Job]:
job = (
self.db.session.query(taxii2models.Job)
.filter(
Expand All @@ -586,7 +613,7 @@ def get_job_and_details(
.one_or_none()
)
if job is None:
return None, []
return None
job_details = (
self.db.session.query(taxii2models.JobDetail)
.filter(
Expand All @@ -595,26 +622,8 @@ def get_job_and_details(
.order_by(taxii2models.JobDetail.stix_id)
.all()
)
return (
entities.Job(
id=job.id,
api_root_id=job.api_root_id,
status=job.status,
request_timestamp=job.request_timestamp,
completed_timestamp=job.completed_timestamp,
),
[
entities.JobDetail(
id=job_detail.id,
job_id=job_detail.job_id,
stix_id=job_detail.stix_id,
version=job_detail.version,
message=job_detail.message,
status=job_detail.status,
)
for job_detail in job_details
],
)
job_entity = self._job_and_details_to_entity(job, job_details)
return job_entity

def job_cleanup(self) -> int:
"""
Expand Down Expand Up @@ -954,11 +963,15 @@ def get_objects(

def add_objects(
self, api_root_id: str, collection_id: str, objects: List[Dict]
) -> Tuple[entities.Job, List[entities.JobDetail]]:
) -> entities.Job:
job = taxii2models.Job(
api_root_id=api_root_id,
status="pending",
request_timestamp=datetime.datetime.now(datetime.timezone.utc),
total_count=0,
success_count=0,
failure_count=0,
pending_count=0,
)
self.db.session.add(job)
self.db.session.commit()
Expand Down Expand Up @@ -1004,29 +1017,13 @@ def add_objects(
)
job_details.append(job_detail)
self.db.session.add(job_detail)
job.total_count += 1
job.success_count += 1
job.status = "complete"
job.completed_timestamp = datetime.datetime.now(datetime.timezone.utc)
self.db.session.commit()
return (
entities.Job(
id=job.id,
api_root_id=job.api_root_id,
status=job.status,
request_timestamp=job.request_timestamp,
completed_timestamp=job.completed_timestamp,
),
[
entities.JobDetail(
id=job_detail.id,
job_id=job_detail.job_id,
stix_id=job_detail.stix_id,
version=job_detail.version,
message=job_detail.message,
status=job_detail.status,
)
for job_detail in job_details
],
)
job_entity = self._job_and_details_to_entity(job, job_details)
return job_entity

def get_object(
self,
Expand Down
8 changes: 7 additions & 1 deletion opentaxii/persistence/sqldb/taxii2models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class Job(Base):
)
request_timestamp = sqlalchemy.Column(UTCDateTime, nullable=True)
completed_timestamp = sqlalchemy.Column(UTCDateTime, nullable=True)
total_count = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
success_count = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
failure_count = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
pending_count = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)

details = relationship("JobDetail", back_populates="job")

Expand All @@ -80,7 +84,9 @@ def cleanup(cls, session: sqlalchemy.orm.Session) -> int:
@classmethod
def from_entity(cls, entity: entities.Job):
"""Generate database model from input entity."""
return cls(**entity.to_dict())
kwargs = entity.to_dict()
kwargs.pop("details")
return cls(**kwargs)


class JobDetail(Base):
Expand Down
46 changes: 7 additions & 39 deletions opentaxii/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,32 +506,17 @@ def api_root_handler(self, api_root_id):
@register_handler(r"^/taxii2/(?P<api_root_id>[^/]+)/status/(?P<job_id>[^/]+)/$")
def job_handler(self, api_root_id, job_id):
try:
job, job_details = self.persistence.get_job_and_details(
job = self.persistence.get_job_and_details(
api_root_id=api_root_id, job_id=job_id
)
except DoesNotExistError:
raise NotFound()
response = {
"id": job.id,
"status": job.status,
"request_timestamp": taxii2_datetimeformat(job.request_timestamp),
"total_count": job_details.total_count,
"success_count": len(job_details.success),
"successes": [
job_detail.as_taxii2_dict() for job_detail in job_details.success
],
"failure_count": len(job_details.failure),
"failures": [
job_detail.as_taxii2_dict() for job_detail in job_details.failure
],
"pending_count": len(job_details.pending),
"pendings": [
job_detail.as_taxii2_dict() for job_detail in job_details.pending
],
}
response = job.as_taxii2_dict()
return make_taxii2_response(response)

@register_handler(r"^/taxii2/(?P<api_root_id>[^/]+)/collections/$", handles_own_auth=True)
@register_handler(
r"^/taxii2/(?P<api_root_id>[^/]+)/collections/$", handles_own_auth=True
)
def collections_handler(self, api_root_id):
try:
api_root = self.persistence.get_api_root(api_root_id=api_root_id)
Expand Down Expand Up @@ -691,7 +676,7 @@ def objects_get_handler(self, api_root_id, collection_id_or_alias):
def objects_post_handler(self, api_root_id, collection_id_or_alias):
validate_envelope(request.data)
try:
job, job_details = self.persistence.add_objects(
job = self.persistence.add_objects(
api_root_id=api_root_id,
collection_id_or_alias=collection_id_or_alias,
data=request.get_json(),
Expand All @@ -700,24 +685,7 @@ def objects_post_handler(self, api_root_id, collection_id_or_alias):
if context.account is None:
raise Unauthorized()
raise NotFound()
response = {
"id": job.id,
"status": job.status,
"request_timestamp": taxii2_datetimeformat(job.request_timestamp),
"total_count": job_details.total_count,
"success_count": len(job_details.success),
"successes": [
job_detail.as_taxii2_dict() for job_detail in job_details.success
],
"failure_count": len(job_details.failure),
"failures": [
job_detail.as_taxii2_dict() for job_detail in job_details.failure
],
"pending_count": len(job_details.pending),
"pendings": [
job_detail.as_taxii2_dict() for job_detail in job_details.pending
],
}
response = job.as_taxii2_dict()
headers = {}
return make_taxii2_response(
response,
Expand Down

0 comments on commit e0da85c

Please sign in to comment.