Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added castle/api/__init__.py
Empty file.
Empty file added castle/client_id/__init__.py
Empty file.
Empty file added castle/commands/__init__.py
Empty file.
Empty file added castle/context/__init__.py
Empty file.
Empty file added castle/core/__init__.py
Empty file.
3 changes: 2 additions & 1 deletion castle/core/process_response.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from castle.errors import BadRequestError, UnauthorizedError, ForbiddenError, NotFoundError, \
UserUnauthorizedError, InvalidParametersError, APIError, InternalServerError

from castle.logger import Logger

RESPONSE_ERRORS = {
400: BadRequestError,
Expand All @@ -22,6 +22,7 @@ def call(self):

self.verify()

Logger.call("response:", self.response.text)
return self.response.json()

def verify(self):
Expand Down
19 changes: 19 additions & 0 deletions castle/core/process_webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from castle.errors import APIError
from castle.logger import Logger


class CoreProcessWebhook(object):
def __init__(self, webhook):
self.webhook = webhook

def call(self):
self.verify()

Logger.call("webhook:", self.webhook.data)
return self.webhook.data

def verify(self):
if self.webhook.data is not None and len(self.webhook.data) != 0:
return

raise APIError("Invalid webhook from Castle API")
4 changes: 4 additions & 0 deletions castle/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class APIError(CastleError):
pass


class WebhookVerificationError(CastleError):
pass


class InvalidParametersError(APIError):
pass

Expand Down
Empty file added castle/failover/__init__.py
Empty file.
Empty file added castle/headers/__init__.py
Empty file.
Empty file added castle/ip/__init__.py
Empty file.
Empty file added castle/payload/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions castle/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
'castle.test.context.prepare_test',
'castle.test.context.sanitize_test',
'castle.test.core.process_response_test',
'castle.test.core.process_webhook_test',
'castle.test.core.send_request_test',
'castle.test.failover.prepare_response_test',
'castle.test.failover.strategy_test',
Expand All @@ -47,6 +48,7 @@
'castle.test.validators.present_test',
'castle.test.verdict_test',
'castle.test.payload.prepare_test',
'castle.test.webhooks.verify_test',
]

# pylint: disable=redefined-builtin
Expand Down
45 changes: 45 additions & 0 deletions castle/test/core/process_webhook_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import requests

from castle.test import unittest
from castle.core.process_webhook import CoreProcessWebhook
from castle.errors import APIError


def webhook(data=None):
req = requests.Request()
req.data = None if data is None else bytes(data)
return req


class CoreProcessResponseTestCase(unittest.TestCase):
def test_webhook_none(self):
with self.assertRaises(APIError):
CoreProcessWebhook(webhook()).call()

def test_webhook_empty(self):
with self.assertRaises(APIError):
CoreProcessWebhook(webhook(data=b'')).call()

def test_webhook_valid(self):
data_stream = str({
'type': '$incident.confirmed',
'created_at': '2020-12-18T12:55:21.779Z',
'data': {
'id': 'test',
'device_token': 'token',
'user_id': '',
'trigger': '$login.succeeded',
'context': {},
'location': {},
'user_agent': {}
},
'user_traits': {},
'properties': {},
'policy': {}
}).encode('utf-8')

self.assertEqual(
CoreProcessWebhook(
webhook(data=data_stream)).call(),
data_stream
)
24 changes: 24 additions & 0 deletions castle/test/webhooks/verify_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import requests

from castle.test import unittest
from castle.webhooks.verify import WebhooksVerify
from castle.errors import WebhookVerificationError


def webhook(signature):
req = requests.Request(headers={"X-Castle-Signature": signature})
str_dict = "{'user_traits': {}, 'policy': {}, 'data': {'location': {}, 'id': 'test', 'user_agent': {}, 'user_id': '', 'device_token': 'token', 'context': {}, 'trigger': '$login.succeeded'}, 'created_at': '2020-12-18T12:55:21.779Z', 'type': '$incident.confirmed', 'properties': {}}"
req.data = bytes(str_dict.encode('utf-8'))
return req


class WebhooksVerifyTestCase(unittest.TestCase):
def test_webhook_malformed(self):
with self.assertRaises(WebhookVerificationError):
WebhooksVerify().call(webhook("123"))

def test_webhook_valid(self):
self.assertEqual(
WebhooksVerify().call(webhook("v61Bn6ItuClDcRqrr6++csm2Ub3Jfyos4BMR3PslhBY=")),
None
)
Empty file added castle/utils/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions castle/utils/secure_compare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import sys


class UtilsSecureCompare(object):

@staticmethod
def call(str_a, str_b):
"""
Compare two strings securely

:param str_a: First string to be compared
:param str_b: Second string to be compared
"""
if not sys.getsizeof(str_a) == sys.getsizeof(str_b):
return False

comp_a = [int(str_a_char) for str_a_char in bytes(str_a.encode('utf-8'))]

res = 0
for str_b_char in bytes(str_b.encode('utf-8')):
res |= str_b_char ^ comp_a.pop(0)

return res == 0
Empty file added castle/webhooks/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions castle/webhooks/verify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import base64
import hashlib
import binascii
import hmac

from castle.configuration import configuration
from castle.core.process_webhook import CoreProcessWebhook
from castle.errors import WebhookVerificationError
from castle.utils.secure_compare import UtilsSecureCompare


class WebhooksVerify(object):
@classmethod
def call(cls, webhook):
expected_signature = cls._compute_signature(webhook)
signature = webhook.headers.get('X-Castle-Signature')
return cls._verify_signature(signature, expected_signature)

@staticmethod
def _compute_signature(webhook):
encoded_str = hmac.new(
bytes(configuration.api_secret.encode('utf-8')),
CoreProcessWebhook(webhook).call(),
hashlib.sha256
).hexdigest()
return base64.b64encode(binascii.unhexlify(encoded_str)).decode('utf-8')

@staticmethod
def _verify_signature(signature, expected_signature):
if UtilsSecureCompare.call(signature, expected_signature):
return

raise WebhookVerificationError("Invalid webhook from Castle API")