This repository has been archived by the owner on Jul 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6b5cbb9
commit 6effee0
Showing
4 changed files
with
291 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
import json | ||
from pystackpath.util import BaseObject, api_time_format | ||
from datetime import datetime as dt | ||
from datetime import timedelta | ||
|
||
|
||
ACCEPTED_ACTIONS = ( | ||
'ANY_ACTION', 'ALLOW_ACTION', | ||
'BLOCK_ACTION', 'CAPTCHA_ACTION', | ||
'HANDSHAKE_ACTION', 'MONITOR_ACTION' | ||
) | ||
ACCEPTED_RESULTS = ('ANY_RESULT', 'BLOCKED_RESULT') | ||
ACCEPTED_SORT_BY = ('TIMESTAMP', 'COUNTRY', 'RULE_NAME') | ||
ACCEPTED_SORT_ORDER = ('ASCENDING', 'DESCENDING') | ||
DEFAULT_DELTA_TIME = 1 # day | ||
|
||
|
||
class Events(BaseObject): | ||
|
||
def get(self, event_id): | ||
""" | ||
Get an Event by its ID | ||
""" | ||
response = self._client.get(f"{self._base_api}/events/{event_id}") | ||
return self.loaddict(response.json()) | ||
|
||
def index( | ||
self, | ||
page_request_first=None, | ||
page_request_after=None, | ||
page_request_filter=None, | ||
page_request_sort_by=None, | ||
start_date=None, | ||
end_date=None, | ||
filter_action_value=ACCEPTED_ACTIONS[0], | ||
filter_result_value=ACCEPTED_RESULTS[0], | ||
filter_client_ip=None, | ||
filter_reference_id=None, | ||
sort_by=ACCEPTED_SORT_BY[0], | ||
sort_order=ACCEPTED_SORT_ORDER[0] | ||
): | ||
""" | ||
Get all the Events. | ||
You can use the parameters to add options to your request. | ||
""" | ||
|
||
params = Events._common_params(start_date, end_date, filter_action_value, \ | ||
filter_result_value, filter_client_ip, filter_reference_id) | ||
|
||
if page_request_first: | ||
params['page_request.first'] = page_request_first | ||
|
||
if page_request_after: | ||
params['page_request.after'] = page_request_after | ||
|
||
if page_request_filter: | ||
params['page_request.filter'] = page_request_filter | ||
|
||
if page_request_sort_by: | ||
params['page_request.sort_by'] = page_request_sort_by | ||
|
||
if sort_by not in ACCEPTED_SORT_BY: | ||
raise ValueError(f"{sort_by} is not a valid sort type: {ACCEPTED_SORT_BY}") | ||
params['sort_by'] = sort_by | ||
|
||
if sort_order not in ACCEPTED_SORT_ORDER: | ||
raise ValueError(f"{sort_order} is not a valid sort type: {ACCEPTED_SORT_ORDER}") | ||
params['sort_order'] = sort_order | ||
|
||
response = self._client.get(f"{self._base_api}/events", params=params) | ||
return self.loaddict(response.json()) | ||
|
||
|
||
def get_event_statistics( | ||
self, | ||
start_date=None, | ||
end_date=None, | ||
filter_action_value=ACCEPTED_ACTIONS[0], | ||
filter_result_value=ACCEPTED_RESULTS[0], | ||
filter_client_ip=None, | ||
filter_reference_id=None | ||
): | ||
""" | ||
Get WAF Event statistics | ||
You can use the parameters to add options to your request. | ||
""" | ||
params = Events._common_params(start_date, end_date, filter_action_value, \ | ||
filter_result_value, filter_client_ip, filter_reference_id) | ||
response = self._client.get(f"{self._base_api}/event_stats", params=params) | ||
return self.loaddict(response.json()) | ||
|
||
|
||
@staticmethod | ||
def _common_params( | ||
start_date, | ||
end_date, | ||
filter_action_value, | ||
filter_result_value, | ||
filter_client_ip, | ||
filter_reference_id | ||
): | ||
params = dict() | ||
|
||
if end_date is None: | ||
end_date = dt.today() | ||
if start_date is None: | ||
start_date = end_date - timedelta(days=DEFAULT_DELTA_TIME) | ||
|
||
end_date_iso = api_time_format(end_date) | ||
start_date_iso = api_time_format(start_date) | ||
|
||
if start_date_iso > end_date_iso: | ||
raise ValueError(f"Search start date, \"{start_date_iso}\", is later than end date, \"{end_date_iso}\"!") | ||
|
||
params['start_date'] = start_date_iso | ||
params['end_date'] = end_date_iso | ||
|
||
if filter_action_value not in ACCEPTED_ACTIONS: | ||
raise ValueError(f"{filter_action_value} is not a valid action filter: {ACCEPTED_ACTIONS}") | ||
params['filter.action_value'] = filter_action_value | ||
|
||
if filter_result_value not in ACCEPTED_RESULTS: | ||
raise ValueError(f"{filter_result_value} is not a valid action filter: {ACCEPTED_RESULTS}") | ||
params['filter.result_value'] = filter_result_value | ||
|
||
if filter_client_ip: | ||
params['filter.client_ip'] = filter_client_ip | ||
|
||
if filter_reference_id: | ||
params['filter.reference_id'] = filter_reference_id | ||
|
||
return params |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import json | ||
from pystackpath.util import BaseObject, api_time_format | ||
from datetime import datetime as dt | ||
from datetime import timedelta | ||
|
||
|
||
ACCEPTED_GRANULARITY = ('AUTO', 'PT5M', 'PT1H', 'P1D', 'P1M') | ||
ACCEPTED_GROUP_BY = ( | ||
'NONE', 'SITE', 'PLATFORM', 'POP', | ||
'REGION', 'STATUS', 'STATUS_CATEGORY' | ||
) | ||
ACCEPTED_METRIC_TYPE = ('TRANSFER', 'STATUS_CODE') | ||
DEFAULT_DELTA_TIME = 1 # day | ||
|
||
|
||
class Metrics(BaseObject): | ||
|
||
def index( | ||
self, | ||
start_date=None, | ||
end_date=None, | ||
granularity=ACCEPTED_GRANULARITY[0], | ||
group_by=ACCEPTED_GROUP_BY[0], | ||
status_category=[], | ||
status_code=[], | ||
sites=[], | ||
billing_regions=[], | ||
pops=[], | ||
platforms=[], | ||
site_type_filter=[], | ||
metric_type=ACCEPTED_METRIC_TYPE[0] | ||
): | ||
""" | ||
Get all the Metrics. | ||
You can use the parameters to add options to your request. | ||
""" | ||
|
||
params = dict() | ||
|
||
if end_date is None: | ||
end_date = dt.today() | ||
if start_date is None: | ||
start_date = end_date - timedelta(days=DEFAULT_DELTA_TIME) | ||
|
||
end_date_iso = api_time_format(end_date) | ||
start_date_iso = api_time_format(start_date) | ||
|
||
if start_date_iso > end_date_iso: | ||
raise ValueError(f"Search start date, \"{start_date_iso}\", is later than end date, \"{end_date_iso}\"!") | ||
|
||
params['start_date'] = start_date_iso | ||
params['end_date'] = end_date_iso | ||
|
||
if granularity not in ACCEPTED_GRANULARITY: | ||
raise ValueError(f"{granularity} is not a valid granularity value: {ACCEPTED_GRANULARITY}") | ||
params['granularity'] = granularity | ||
|
||
if group_by not in ACCEPTED_GROUP_BY: | ||
raise ValueError(f"{group_by} is not a valid group_by value: {ACCEPTED_GROUP_BY}") | ||
params['group_by'] = group_by | ||
|
||
if metric_type not in ACCEPTED_METRIC_TYPE: | ||
raise ValueError(f"{metric_type} is not a valid metric type: {ACCEPTED_METRIC_TYPE}") | ||
params['metric_type'] = metric_type | ||
|
||
if status_category: | ||
params['status_category'] = Metrics._list_to_string(status_category) | ||
|
||
if status_code: | ||
params['status_code'] = Metrics._list_to_string(status_code) | ||
|
||
if sites: | ||
params['sites'] = Metrics._list_to_string(sites) | ||
|
||
if billing_regions: | ||
params['billing_regions'] = Metrics._list_to_string(billing_regions) | ||
|
||
if pops: | ||
params['pops'] = Metrics._list_to_string(pops) | ||
|
||
if platforms: | ||
params['platforms'] = Metrics._list_to_string(platforms) | ||
|
||
if site_type_filter: | ||
params['site_type_filter'] = Metrics._list_to_string(site_type_filter) | ||
|
||
response = self._client.get(f"{self._base_api}/metrics", params=params) | ||
return self.loaddict(response.json()) | ||
|
||
|
||
@staticmethod | ||
def _list_to_string(items, separator=','): | ||
if not isinstance(items, list): | ||
raise ValueError(f"{items} is not a list") | ||
|
||
try: | ||
return separator.join(items) | ||
except Exception as e: | ||
raise ValueError(f"{items} is not valid data: {e}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import json | ||
from pystackpath.util import BaseObject, api_time_format | ||
from datetime import datetime as dt | ||
from datetime import timedelta | ||
|
||
|
||
ACCEPTED_RESOLUTIONS = ('HOURLY', 'MINUTELY') | ||
DEFAULT_DELTA_TIME = 1 # day | ||
|
||
|
||
class Traffic(BaseObject): | ||
|
||
def get( | ||
self, | ||
site_id=None, | ||
start_date=None, | ||
end_date=None, | ||
resolution=ACCEPTED_RESOLUTIONS[0] | ||
): | ||
""" | ||
Get Waf Traffic | ||
""" | ||
|
||
params = dict() | ||
|
||
if site_id is None: | ||
params['site_id'] = site_id | ||
|
||
if end_date is None: | ||
end_date = dt.today() | ||
if start_date is None: | ||
start_date = end_date - timedelta(days=DEFAULT_DELTA_TIME) | ||
|
||
end_date_iso = api_time_format(end_date) | ||
start_date_iso = api_time_format(start_date) | ||
|
||
if start_date_iso > end_date_iso: | ||
raise ValueError(f"Search start date, \"{start_date_iso}\", is later than end date, \"{end_date_iso}\"!") | ||
|
||
params['start_date'] = start_date_iso | ||
params['end_date'] = end_date_iso | ||
|
||
if resolution not in ACCEPTED_RESOLUTIONS: | ||
raise ValueError(f"{resolution} is not a valid resolution: {ACCEPTED_RESOLUTIONS}") | ||
params['resolution'] = resolution | ||
|
||
response = self._client.get(f"{self._base_api}/traffic", params=params) | ||
return self.loaddict(response.json()) |