-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Module structure:
- Detached jfpv1 logic into a separate _jfpv1 module
- Detached basic validators into a separate _validators module

Tests:
- Simplified test runner execution with test module loading via the parent __init__.py
- Extended and clarified docstrings for several tests
- Split existing and new tests into a per-module type structure
- Added more detailed tests (positive-negative testing) for exceptions
- Extended 'manual' jfpv1 testing, from raw flattened json output to actual fingerprint

CI:
- Updated test runner execution
- Loading branch information
Showing
10 changed files
with
387 additions
and
308 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import hashlib | ||
import json | ||
|
||
from typing import ( | ||
Any, | ||
Dict, | ||
List, | ||
) | ||
|
||
|
||
def _create_json_sha256_hash(data: Any) -> str:
    """Return the hex SHA-256 digest of ``data`` serialized as JSON.

    The value is serialized with ``json.dumps(..., sort_keys=True)`` and the
    resulting text is UTF-8 encoded before hashing, so dictionaries that
    differ only in key order produce the same digest.
    """
    serialized = json.dumps(data, sort_keys=True).encode('utf-8')
    return hashlib.sha256(serialized).hexdigest()
|
||
|
||
def _create_sorted_sha256_hash_list(data: List[Any]) -> List[str]:
    """Hash every item of ``data`` and return the digests in sorted order.

    Args:
        data: Iterable of JSON-serializable objects; each one is hashed
            independently with ``_create_json_sha256_hash``.

    Returns:
        A new list holding one hex digest string per input item, sorted in
        ascending order. An empty input yields an empty list.
    """
    # Sorting the digests makes the result independent of the input order.
    return sorted(_create_json_sha256_hash(item) for item in data)
|
||
|
||
def _build_path(key: str, base_path: str):
    """Return ``key`` appended to ``base_path`` using ``|`` as the separator.

    When ``base_path`` is empty, the key alone is the whole path.
    """
    return f'{base_path}|{key}' if base_path else key
|
||
|
||
def _build_element(path: str, siblings: str, value: Any):
    """Return the element dictionary for one flattened value.

    The ``'siblings'`` key is only included when ``siblings`` is truthy; the
    ``'path'`` and ``'value'`` keys are always present.
    """
    element = {'path': path}
    if siblings:
        element['siblings'] = siblings
    # 'value' is added last so the key order stays path, siblings, value.
    element['value'] = value
    return element
|
||
|
||
def _flatten_json(data: Any, path: str = '', siblings: Any = None, debug: bool = False) -> List[Dict]:
    """Flatten json data structures into a sibling-aware data element list.

    Args:
        data: Decoded JSON value to flatten (dict, list or scalar).
        path: Path accumulated so far; dict keys are appended as ``{key}``
            and lists as ``[<length>]``, joined by ``_build_path``.
        siblings: Sibling information inherited from the closest enclosing
            list: a hash string in normal mode, the raw flattened element
            list in debug mode, or ``None`` when there is no enclosing list.
        debug: When true, sibling structures are left unhashed so they can
            be inspected and tested against.

    Returns:
        A list of element dictionaries as produced by ``_build_element``.
        Empty dicts and empty lists contribute no elements.
    """
    # Exact type checks are kept on purpose: switching to isinstance() would
    # change how dict/list subclasses are flattened and so alter the output.
    if type(data) is dict:
        out = []
        for key, value in data.items():
            p = _build_path(key=f'{{{key}}}', base_path=path)
            # Dict members inherit the sibling data of the enclosing list.
            out.extend(_flatten_json(data=value, path=p, siblings=siblings, debug=debug))
        return out

    if type(data) is list:
        p = _build_path(key=f'[{len(data)}]', base_path=path)

        # First pass: collect the sibling structures, which are then attached
        # to each element of this list.
        sibling_data = []
        for item in data:
            sibling_data.extend(_flatten_json(data=item, path=p, debug=debug))

        # Outside debug mode the collected structures are reduced to one hash.
        if not debug:
            sibling_data = _create_json_sha256_hash(_create_sorted_sha256_hash_list(sibling_data))

        # Second pass: recurse into each value so it is type-checked and
        # eventually turned into elements carrying the sibling data.
        out = []
        for item in data:
            out.extend(_flatten_json(data=item, path=p, siblings=sibling_data, debug=debug))
        return out

    # Scalar value: emit a single element.
    return [_build_element(path=path, siblings=siblings, value=data)]
|
||
|
||
def _create_jfpv1_fingerprint(data: Any, hash_function: str, version: int):
    """Return the ``jfpv1$<hash_function>$<hex digest>`` fingerprint of ``data``.

    The data is flattened, every flattened element is hashed, and the sorted
    list of element hashes is hashed once more to give the final digest.
    ``version`` is accepted but not used in the computation.
    """
    element_hashes = _create_sorted_sha256_hash_list(data=_flatten_json(data=data))
    digest = _create_json_sha256_hash(data=element_hashes)
    return f'jfpv1${hash_function}${digest}'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# Hash function names accepted by _validate_hash_function.
JFPV1_HASH_FUNCTIONS = ('sha256',)
|
||
# Fingerprint versions accepted by _validate_version.
JSON_FINGERPRINT_VERSIONS = (1,)
|
||
|
||
class FingerprintHashFunctionError(Exception):
    """Raised when the requested hash function is not a supported one."""
|
||
|
||
class FingerprintInputDataTypeError(Exception):
    """Raised when the input to fingerprint is not a ``str``."""
|
||
|
||
class FingerprintVersionError(Exception):
    """Raised when the requested fingerprint version is not a supported one."""
|
||
|
||
def _validate_hash_function(hash_function: str, version: int):
    """Raise ``FingerprintHashFunctionError`` unless ``hash_function`` is supported.

    Supported names are those listed in ``JFPV1_HASH_FUNCTIONS``. ``version``
    is accepted but not consulted by this check.
    """
    if hash_function in JFPV1_HASH_FUNCTIONS:
        return
    raise FingerprintHashFunctionError(
        f"Expected one of supported hash functions '{JFPV1_HASH_FUNCTIONS}', "
        f"instead got '{hash_function}'"
    )
|
||
|
||
def _validate_input_type(input: str):
    """Raise ``FingerprintInputDataTypeError`` unless ``input`` is exactly a ``str``.

    The check compares the exact type, so instances of ``str`` subclasses
    are rejected as well.
    """
    received = type(input)
    if received is str:
        return
    raise FingerprintInputDataTypeError(
        f"Expected data type '{str}' (JSON in string format), instead got '{received}'"
    )
|
||
|
||
def _validate_version(version: int):
    """Raise ``FingerprintVersionError`` unless ``version`` is supported.

    Supported versions are those listed in ``JSON_FINGERPRINT_VERSIONS``.
    """
    if version in JSON_FINGERPRINT_VERSIONS:
        return
    raise FingerprintVersionError(
        f"Expected one of supported JSON fingerprint versions '{JSON_FINGERPRINT_VERSIONS}', "
        f"instead got '{version}'"
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,133 +1,27 @@ | ||
import hashlib | ||
import json | ||
|
||
from typing import ( | ||
Any, | ||
Dict, | ||
List, | ||
) | ||
|
||
JFPV1_HASH_FUNCTIONS = ( | ||
'sha256', | ||
) | ||
|
||
JSON_FINGERPRINT_VERSIONS = ( | ||
1, | ||
from ._jfpv1 import _create_jfpv1_fingerprint | ||
from ._validators import ( | ||
_validate_hash_function, | ||
_validate_input_type, | ||
_validate_version, | ||
) | ||
|
||
|
||
class FingerprintJSONLoadError(Exception):
    """Raised when the input string cannot be loaded as JSON."""
|
||
|
||
class FingerprintInputDataTypeError(Exception):
    """Raised when the input to fingerprint is not a ``str``."""
|
||
|
||
class FingerprintHashFunctionError(Exception):
    """Raised when the requested hash function is not a supported one."""
|
||
|
||
class FingerprintVersionError(Exception):
    """Raised when the requested fingerprint version is not a supported one."""
|
||
|
||
def _create_json_sha256_hash(data) -> str:
    """Return the hex SHA-256 digest of ``data`` serialized as JSON.

    The value is serialized with ``json.dumps(..., sort_keys=True)`` and the
    resulting text is UTF-8 encoded before hashing, so dictionaries that
    differ only in key order produce the same digest.
    """
    serialized = json.dumps(data, sort_keys=True).encode('utf-8')
    return hashlib.sha256(serialized).hexdigest()
|
||
|
||
def _create_sorted_sha256_hash_list(data: List[Any]) -> List[str]:
    """Hash every item of ``data`` and return the digests in sorted order.

    Args:
        data: Iterable of JSON-serializable objects; each one is hashed
            independently with ``_create_json_sha256_hash``.

    Returns:
        A new list holding one hex digest string per input item, sorted in
        ascending order. An empty input yields an empty list.
    """
    # Sorting the digests makes the result independent of the input order.
    return sorted(_create_json_sha256_hash(item) for item in data)
|
||
|
||
def _build_path(key: str, base_path: str):
    """Return ``key`` appended to ``base_path`` using ``|`` as the separator.

    When ``base_path`` is empty, the key alone is the whole path.
    """
    return f'{base_path}|{key}' if base_path else key
|
||
|
||
def _build_element(path: str, siblings: List, value: Any):
    """Return the element dictionary for one flattened value.

    The ``'siblings'`` key is only included when ``siblings`` is truthy; the
    ``'path'`` and ``'value'`` keys are always present.
    """
    element = {'path': path}
    if siblings:
        element['siblings'] = siblings
    # 'value' is added last so the key order stays path, siblings, value.
    element['value'] = value
    return element
|
||
|
||
def _flatten_json(data: Any, path: str = '', siblings: Any = None, debug: bool = False) -> List[Dict]:
    """Flatten json data structures into a sibling-aware data element list.

    Args:
        data: Decoded JSON value to flatten (dict, list or scalar).
        path: Path accumulated so far; dict keys are appended as ``{key}``
            and lists as ``[<length>]``, joined by ``_build_path``.
        siblings: Sibling information inherited from the closest enclosing
            list: a hash string in normal mode, the raw flattened element
            list in debug mode, or ``None`` when there is no enclosing list.
        debug: When true, sibling structures are left unhashed so they can
            be inspected and tested against.

    Returns:
        A list of element dictionaries as produced by ``_build_element``.
        Empty dicts and empty lists contribute no elements.
    """
    # Exact type checks are kept on purpose: switching to isinstance() would
    # change how dict/list subclasses are flattened and so alter the output.
    if type(data) is dict:
        out = []
        for key, value in data.items():
            p = _build_path(key=f'{{{key}}}', base_path=path)
            # Dict members inherit the sibling data of the enclosing list.
            out.extend(_flatten_json(data=value, path=p, siblings=siblings, debug=debug))
        return out

    if type(data) is list:
        p = _build_path(key=f'[{len(data)}]', base_path=path)

        # First pass: collect the sibling structures, which are then attached
        # to each element of this list.
        sibling_data = []
        for item in data:
            sibling_data.extend(_flatten_json(data=item, path=p, debug=debug))

        # Outside debug mode the collected structures are reduced to one hash.
        if not debug:
            sibling_data = _create_json_sha256_hash(_create_sorted_sha256_hash_list(sibling_data))

        # Second pass: recurse into each value so it is type-checked and
        # eventually turned into elements carrying the sibling data.
        out = []
        for item in data:
            out.extend(_flatten_json(data=item, path=p, siblings=sibling_data, debug=debug))
        return out

    # Scalar value: emit a single element.
    return [_build_element(path=path, siblings=siblings, value=data)]
|
||
|
||
def json_fingerprint(input: str, hash_function: str, version: int) -> str:
    """Create json fingerprints with the selected hash function and jfp version.

    Args:
        input: JSON document in string format.
        hash_function: Name of the hash function to use; checked by
            ``_validate_hash_function``.
        version: JSON fingerprint version; checked by ``_validate_version``.

    Returns:
        The fingerprint string produced by ``_create_jfpv1_fingerprint``.

    Raises:
        FingerprintJSONLoadError: If ``input`` cannot be loaded as JSON.
        Exceptions raised by the ``_validate_*`` helpers propagate unchanged.
    """
    _validate_version(version=version)
    _validate_input_type(input=input)
    _validate_hash_function(hash_function=hash_function, version=version)

    try:
        loaded = json.loads(input)
    except Exception:
        # Every loading failure is reported as the package's own error type;
        # 'from None' hides the underlying parser traceback from callers.
        err = 'Unable to load JSON'
        raise FingerprintJSONLoadError(err) from None

    return _create_jfpv1_fingerprint(data=loaded, hash_function=hash_function, version=version)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .test_json_fingerprint import TestJsonFingerprint | ||
from .test_jfpv1 import TestJfpv1 | ||
from .test_validators import TestValidators |
Oops, something went wrong.