Skip to content

Commit

Permalink
Refactor modules and tests #minor
Browse files Browse the repository at this point in the history
Module structure:
 - Detached jfpv1 logic into a separate _jfpv1 module
 - Detached basic validators into a separate _validators module

Tests:
 - Simplified test runner execution with test module loading via
 the parent __init__.py
 - Extended and clarified docstrings for several tests
 - Split existing and new tests into a per-module type structure
 - Added more detailed tests (positive-negative testing) for exceptions
 - Extended 'manual' jfpv1 testing, from raw flattened json output to
 actual fingerprint

CI:
 - Updated test runner execution
  • Loading branch information
cobaltine committed Jan 1, 2021
1 parent 5445bef commit 130be43
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 308 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
coverage run -m unittest discover -s json_fingerprint -p "test_*.py"
coverage run -m unittest json_fingerprint.tests
coveralls
91 changes: 91 additions & 0 deletions json_fingerprint/_jfpv1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import hashlib
import json

from typing import (
Any,
Dict,
List,
)


def _create_json_sha256_hash(data: Any) -> str:
    """Return the hex SHA-256 digest of ``data`` serialised as key-sorted JSON."""
    # sort_keys=True makes the serialisation independent of dict key order.
    encoded = json.dumps(data, sort_keys=True).encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()


def _create_sorted_sha256_hash_list(data: List[Any]) -> List[str]:
    """Hash every item of ``data`` and return the hex digests in sorted order.

    Each item is hashed with ``_create_json_sha256_hash``. Sorting the digests
    makes the result independent of the order of the items in ``data``.

    Args:
        data: Items to hash; each must be serialisable by ``json.dumps``.

    Returns:
        The hex digest strings, sorted ascending.
    """
    return sorted(_create_json_sha256_hash(item) for item in data)


def _build_path(key: str, base_path: str):
    """Return ``key`` appended to ``base_path`` with a ``|`` separator.

    When ``base_path`` is empty, ``key`` is returned on its own.
    """
    return f'{base_path}|{key}' if base_path else key


def _build_element(path: str, siblings: str, value: Any):
    """Return an element dict for ``value``; sibling data is included only when present."""
    element = {'path': path}
    # The 'siblings' key is left out entirely when there is no sibling data.
    if siblings:
        element['siblings'] = siblings
    element['value'] = value
    return element


def _flatten_json(data: Any, path: str = '', siblings: Any = None, debug: bool = False) -> List:
    """Flatten json data structures into a sibling-aware data element list.

    Args:
        data: Decoded JSON value (dict, list or scalar) to flatten.
        path: Path of the enclosing containers, as built by ``_build_path``.
        siblings: Sibling data attached to every element produced from a list
            item: a hash string normally, or the raw sibling element list in
            debug mode. Defaults to no sibling data.
        debug: When true, sibling structures are kept unhashed so that they
            can be inspected and tested against.

    Returns:
        A list of element dictionaries as produced by ``_build_element``.
        Empty dicts and empty lists contribute no elements.
    """
    # A fresh list per call replaces the former shared mutable default ([]).
    if siblings is None:
        siblings = []

    if type(data) is dict:
        out = []
        for key, value in data.items():
            # Dict keys are wrapped in braces in the path.
            p = _build_path(key=f'{{{key}}}', base_path=path)
            out.extend(_flatten_json(data=value, path=p, siblings=siblings, debug=debug))
        return out

    if type(data) is list:
        # Lists are identified in the path by their length, not by item index.
        p = _build_path(key=f'[{len(data)}]', base_path=path)

        # Iterate and collect sibling structures, which are then attached to each sibling element
        siblings = []
        for item in data:
            siblings.extend(_flatten_json(data=item, path=p, debug=debug))

        # Debug mode, which allows non-hashed sibling structures to be inspected and tested against
        if not debug:
            siblings = _create_sorted_sha256_hash_list(siblings)
            siblings = _create_json_sha256_hash(siblings)

        # Recurse with each value in list to typecheck it and eventually get the element value
        out = []
        for item in data:
            out.extend(_flatten_json(data=item, path=p, siblings=siblings, debug=debug))
        return out

    # Anything that is neither a dict nor a list is a leaf value.
    return [_build_element(path=path, siblings=siblings, value=data)]


def _create_jfpv1_fingerprint(data: Any, hash_function: str, version: int):
    """Return the ``jfpv1$<hash_function>$<hex digest>`` fingerprint string for ``data``.

    ``version`` is accepted but not used by this function.
    """
    # Flatten, hash each element, sort the hashes, then hash the sorted list.
    element_hashes = _create_sorted_sha256_hash_list(data=_flatten_json(data=data))
    digest = _create_json_sha256_hash(data=element_hashes)
    return f'jfpv1${hash_function}${digest}'
39 changes: 39 additions & 0 deletions json_fingerprint/_validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Hash function names accepted by _validate_hash_function.
JFPV1_HASH_FUNCTIONS = ('sha256',)

# JSON fingerprint versions accepted by _validate_version.
JSON_FINGERPRINT_VERSIONS = (1,)


class FingerprintHashFunctionError(Exception):
    """Raised when the requested hash function is not a supported one."""


class FingerprintInputDataTypeError(Exception):
    """Raised when the input is not a ``str`` (JSON in string format)."""


class FingerprintVersionError(Exception):
    """Raised when the requested JSON fingerprint version is not a supported one."""


def _validate_hash_function(hash_function: str, version: int):
    """Raise FingerprintHashFunctionError unless ``hash_function`` is in JFPV1_HASH_FUNCTIONS.

    ``version`` is accepted but not consulted by this check.
    """
    if hash_function in JFPV1_HASH_FUNCTIONS:
        return

    err = (f'Expected one of supported hash functions \'{JFPV1_HASH_FUNCTIONS}\', '
           f'instead got \'{hash_function}\'')
    raise FingerprintHashFunctionError(err)


def _validate_input_type(input: str):
    """Raise FingerprintInputDataTypeError unless ``input`` is exactly of type ``str``."""
    # Exact type comparison: subclasses of str are rejected as well.
    if type(input) is str:
        return

    err = f'Expected data type \'{type("")}\' (JSON in string format), instead got \'{type(input)}\''
    raise FingerprintInputDataTypeError(err)


def _validate_version(version: int):
    """Raise FingerprintVersionError unless ``version`` is in JSON_FINGERPRINT_VERSIONS."""
    if version in JSON_FINGERPRINT_VERSIONS:
        return

    err = (f'Expected one of supported JSON fingerprint versions \'{JSON_FINGERPRINT_VERSIONS}\', '
           f'instead got \'{version}\'')
    raise FingerprintVersionError(err)
126 changes: 10 additions & 116 deletions json_fingerprint/json_fingerprint.py
Original file line number Diff line number Diff line change
@@ -1,133 +1,27 @@
import hashlib
import json

from typing import (
Any,
Dict,
List,
)

JFPV1_HASH_FUNCTIONS = (
'sha256',
)

JSON_FINGERPRINT_VERSIONS = (
1,
from ._jfpv1 import _create_jfpv1_fingerprint
from ._validators import (
_validate_hash_function,
_validate_input_type,
_validate_version,
)


class FingerprintJSONLoadError(Exception):
    """Raised when the input string cannot be loaded as JSON."""


class FingerprintInputDataTypeError(Exception):
    """Raised when the input is not a ``str`` (JSON in string format)."""


class FingerprintHashFunctionError(Exception):
    """Raised when the requested hash function is not a supported one."""


class FingerprintVersionError(Exception):
    """Raised when the requested JSON fingerprint version is not a supported one."""


def _create_json_sha256_hash(data) -> str:
    """Return the hex SHA-256 digest of ``data`` serialised as key-sorted JSON."""
    # sort_keys=True makes the serialisation independent of dict key order.
    encoded = json.dumps(data, sort_keys=True).encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()


def _create_sorted_sha256_hash_list(data: List[Any]) -> List[str]:
    """Hash every item of ``data`` and return the hex digests in sorted order.

    Each item is hashed with ``_create_json_sha256_hash``. Sorting the digests
    makes the result independent of the order of the items in ``data``.

    Args:
        data: Items to hash; each must be serialisable by ``json.dumps``.

    Returns:
        The hex digest strings, sorted ascending.
    """
    return sorted(_create_json_sha256_hash(item) for item in data)


def _build_path(key: str, base_path: str):
    """Return ``key`` appended to ``base_path`` with a ``|`` separator.

    When ``base_path`` is empty, ``key`` is returned on its own.
    """
    return f'{base_path}|{key}' if base_path else key


def _build_element(path: str, siblings: List, value: Any):
    """Return an element dict for ``value``; sibling data is included only when present."""
    element = {'path': path}
    # The 'siblings' key is left out entirely when there is no sibling data.
    if siblings:
        element['siblings'] = siblings
    element['value'] = value
    return element


def _flatten_json(data: Any, path: str = '', siblings: Any = None, debug: bool = False) -> List:
    """Flatten json data structures into a sibling-aware data element list.

    Args:
        data: Decoded JSON value (dict, list or scalar) to flatten.
        path: Path of the enclosing containers, as built by ``_build_path``.
        siblings: Sibling data attached to every element produced from a list
            item: a hash string normally, or the raw sibling element list in
            debug mode. Defaults to no sibling data.
        debug: When true, sibling structures are kept unhashed so that they
            can be inspected and tested against.

    Returns:
        A list of element dictionaries as produced by ``_build_element``.
        Empty dicts and empty lists contribute no elements.
    """
    # A fresh list per call replaces the former shared mutable default ([]).
    if siblings is None:
        siblings = []

    if type(data) is dict:
        out = []
        for key, value in data.items():
            # Dict keys are wrapped in braces in the path.
            p = _build_path(key=f'{{{key}}}', base_path=path)
            out.extend(_flatten_json(data=value, path=p, siblings=siblings, debug=debug))
        return out

    if type(data) is list:
        # Lists are identified in the path by their length, not by item index.
        p = _build_path(key=f'[{len(data)}]', base_path=path)

        # Iterate and collect sibling structures, which are then attached to each sibling element
        siblings = []
        for item in data:
            siblings.extend(_flatten_json(data=item, path=p, debug=debug))

        # Debug mode, which allows non-hashed sibling structures to be inspected and tested against
        if not debug:
            siblings = _create_sorted_sha256_hash_list(siblings)
            siblings = _create_json_sha256_hash(siblings)

        # Recurse with each value in list to typecheck it and eventually get the element value
        out = []
        for item in data:
            out.extend(_flatten_json(data=item, path=p, siblings=siblings, debug=debug))
        return out

    # Anything that is neither a dict nor a list is a leaf value.
    return [_build_element(path=path, siblings=siblings, value=data)]


# NOTE(review): this span is a diff hunk without +/- markers; it appears to
# interleave the pre-commit and post-commit versions of json_fingerprint
# (two `def` lines, inline validation followed by validator calls) - confirm
# against the repository before editing the code itself.
def json_fingerprint(input: str, hash_function: str, version: str) -> str:
def json_fingerprint(input: str, hash_function: str, version: int) -> str:
"""Create json fingerprints with the selected hash function and jfp version."""
# Inline validation: input type, hash function, then version.
if type(input) is not str:
err = f'Expected data type \'{type("")}\' (JSON in string format), instead got \'{type(input)}\''
raise FingerprintInputDataTypeError(err)

if hash_function not in JFPV1_HASH_FUNCTIONS:
err = (f'Expected one of supported hash functions \'{JFPV1_HASH_FUNCTIONS}\', '
f'instead got \'{hash_function}\'')
raise FingerprintHashFunctionError(err)

if version not in JSON_FINGERPRINT_VERSIONS:
err = (f'Expected one of supported JSON fingerprint versions \'{JSON_FINGERPRINT_VERSIONS}\', '
f'instead got \'{version}\'')
raise FingerprintVersionError(err)
# Validator-helper form of the same checks; note the version is checked first here.
_validate_version(version=version)
_validate_input_type(input=input)
_validate_hash_function(hash_function=hash_function, version=version)

# Any failure while decoding is reported as FingerprintJSONLoadError;
# `from None` suppresses the original exception context.
try:
loaded = json.loads(input)
except Exception:
err = 'Unable to load JSON'
raise FingerprintJSONLoadError(err) from None

# Inline fingerprint construction: flatten, hash each element, sort, hash the list.
flattened_json = _flatten_json(data=loaded)
sorted_hash_list = _create_sorted_sha256_hash_list(data=flattened_json)
hex_digest = _create_json_sha256_hash(sorted_hash_list)
return f'jfpv1${hash_function}${hex_digest}'
# Delegating form of the fingerprint construction.
return _create_jfpv1_fingerprint(data=loaded, hash_function=hash_function, version=version)
3 changes: 3 additions & 0 deletions json_fingerprint/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .test_json_fingerprint import TestJsonFingerprint
from .test_jfpv1 import TestJfpv1
from .test_validators import TestValidators
Loading

0 comments on commit 130be43

Please sign in to comment.