diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index c4d2884..e735b5a 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -40,4 +40,8 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | + mkdir keys + echo '${{secrets.BARRACUDA_ATKEYS}}' > keys/@27barracuda_key.atKeys + echo '${{secrets.AMATEUR_ATKEYS}}' > keys/@amateur93_key.atKeys + echo '${{secrets.UNIVERSALALOO_ATKEYS}}' > keys/@universal27aloo_key.atKeys python -m unittest discover -s test -p '*_test.py' -v diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b178a1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,212 @@ +.idea/ + +### Django ### +*.log +*.pot +*.pyc +__pycache__/ +local_settings.py +db.sqlite3 +media + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### macOS ### +*.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +### PyCharm ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# CMake +cmake-build-debug/ + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +### PyCharm Patch ### +# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 + +# *.iml +# modules.xml +# .idea/misc.xml +# *.ipr + +# Sonarlint plugin +.idea/sonarlint + +### Python ### +# Byte-compiled / optimized / DLL files +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo + +# Django stuff: + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +/keys \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 022514b..c782fbe 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -28,8 +28,10 @@ describe. The atsign-foundation GitHub organization's conventions and configurat ### Prerequisites - ``` sh - # show how to install the tools needed to work with the code here +1. Download latest python from https://www.python.org/downloads/ +2. Install required libraries using below command. + ``` + pip install -r requirements.txt ``` @@ -37,22 +39,22 @@ describe. The atsign-foundation GitHub organization's conventions and configurat To prepare your dedicated GitHub repository: -1. Fork in GitHub https://github.com/atsign-foundation/REPO -2. Clone *your forked repository* (e.g., `git clone git@github.com:yourname/REPO`) +1. Fork in GitHub https://github.com/atsign-foundation/at_python +2. Clone *your forked repository* (e.g., `git clone git@github.com:yourname/at_python`) 3. Set your remotes as follows: ```sh - cd REPO - git remote add upstream git@github.com:atsign-foundation/REPO.git + cd at_python + git remote add upstream git@github.com:atsign-foundation/at_python.git git remote set-url upstream --push DISABLED ``` Running `git remote -v` should give something similar to: ```text - origin git@github.com:yourname/REPO.git (fetch) - origin git@github.com:yourname/REPO.git (push) - upstream git@github.com:atsign-foundation/REPO.git (fetch) + origin git@github.com:yourname/at_python.git (fetch) + origin git@github.com:yourname/at_python.git (push) + upstream git@github.com:atsign-foundation/at_python.git (fetch) upstream DISABLED (push) ``` @@ -90,44 +92,14 @@ To prepare your dedicated GitHub repository: 1. How to run tests: ``` sh - # explain tests here + python -m unittest discover -s test -p '*_test.py' -v ``` 1. Open a new Pull Request to the main repository using your `trunk` branch - -## atLibrary release process - -The Atsign Foundation produces several widgets and libraries that the app developer -can make use of to develop apps on atProtocol. These libraries are developed in -Dart & Flutter and published to [pub.dev](https://pub.dev/publishers/atsign.org/packages). - -![alt_text](images/image1.png "Version flow") - -## Following the changes - -The Atsign Foundation publishes libraries and widgets to -[https://pub.dev/publishers/atsign.org/packages](https://pub.dev/publishers/atsign.org/packages). -Each of these libraries contains a tab called “Changelog” that shows various -published versions and a short description of what changes that went in. - -![alt_text](images/image2.png "Changelog screenshot") - -Also the “Versions” tab shows the versions published in the reverse -chronological order. - -![alt_text](images/image3.png "Versions screenshot") - ## Reporting a bug -The best place to start reporting bugs on the libraries published by -atProtocol would be the “View/report issues” link available on -[pub.dev](https://pub.dev/publishers/atsign.org/packages). - -![alt_text](images/image4.png "View/report issues highlight") - -Once the link is clicked, one should be redirected to GitHub repo where the -issue can be reported by clicking on the “New issue” button. +Issue can be reported by clicking on the “New issue” button in the GitHub repo. ![alt_text](images/image5.png "Issues list") diff --git a/README.md b/README.md index 165cc0a..f39fa2d 100644 --- a/README.md +++ b/README.md @@ -1,73 +1,26 @@ -# Sample README +# The atPlatform for Python developers -Open with intent - we welcome contributions - we want pull requests and to hear about issues. +This repo contains library, samples and examples for developers who wish +to work with the atPlatform from Python code. -## Who is this for? +## Open source usage and contributions -The README should be addressed to somebody who's never seen this before. -But also don't assume that they're a novice. +This is open source code, so feel free to use it as is, suggest changes or +enhancements or create your own version. See [CONTRIBUTING.md](./CONTRIBUTING.md) +for detailed guidance on how to setup tools, tests and make a pull request. -### Code user +## Acknowledgement/attribution -Does this repo publish to [pub.dev](https://pub.dev) or similar? -In which case the code user just needs a pointer there - e.g. [at_client on pub.dev](https://pub.dev/packages/at_client) +This project was originally created by [Umang Shah](https://github.com/shahumang19). -### Contributor +## Installation -This is the person who we want working with us here. -[CONTRIBUTING.md](CONTRIBUTING.md) is going to have the detailed guidance on how to setup their tools, -tests and how to make a pull request. - -## Why, What, How? - -### Why? - -What is the purpose of this project? - -### What? - -What is needed to get the project and its dependencies installed? - -### How? - -How does this work? How is this used to fulfil its intended purpose? - -## Checklist - -### Writing - -Does the writing flow, with proper grammar and correct spelling? - -### Links - -Are the links to external resources correct? -Are the links to other parts of the project correct -(beware stuff carried over from previous repos where the -project might have lived during earlier development)? - -### Description - -Has the Description field been filled out? - -### Acknowledgement/Attribution - -Have we correctly acknowledged the work of others (and their Trademarks etc.) -where appropriate (per the conditions of their LICENSE? - -### LICENSE - -Which LICENSE are we using? -Is the LICENSE(.md) file present? -Does it have the correct dates, legal entities etc.? +``` +pip install -r requirements.txt +``` ## Maintainers -Who created this? - -Do they have complete GitHub profiles? - -How can they be contacted? - -Who is going to respond to pull requests? +This project is currently maintained by [Umang Shah](https://github.com/shahumang19) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fcb674b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +cffi==1.15.1 +cryptography==40.0.2 +pycparser==2.21 +python-dateutil==2.8.2 +six==1.16.0 diff --git a/src/common/__init__.py b/src/common/__init__.py index 1d37656..a10f4dc 100644 --- a/src/common/__init__.py +++ b/src/common/__init__.py @@ -1 +1,4 @@ -from atsign import AtSign \ No newline at end of file +from .atsign import AtSign +from .atclient import AtClient +from .metadata import Metadata +from .keys import * \ No newline at end of file diff --git a/src/common/atclient.py b/src/common/atclient.py new file mode 100644 index 0000000..da42379 --- /dev/null +++ b/src/common/atclient.py @@ -0,0 +1,79 @@ +import json +from src.common import AtSign +from src.util.verbbuilder import * +from src.util.encryptionutil import EncryptionUtil +from src.util.keysutil import KeysUtil +from src.common.keys import Keys +from src.common.metadata import Metadata +from src.common.exception.atexception import AtException +from src.connections.atrootconnection import AtRootConnection +from src.connections.atsecondaryconnection import AtSecondaryConnection +from src.connections.address import Address + +class AtClient(ABC): + def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, verbose:bool = False): + self.atsign = atsign + self.keys = KeysUtil.load_keys(atsign) + self.verbose = verbose + if secondary_address is None: + self.root_connection = AtRootConnection.get_instance(host=root_address.host, + port=root_address.port, + verbose=verbose) + secondary_address = self.root_connection.find_secondary(atsign) + self.secondary_connection = AtSecondaryConnection(secondary_address, verbose=verbose) + self.secondary_connection.connect() + self.authenticated = self.pkam_authenticate() + + def pkam_authenticate(self): + command = FromVerbBuilder().set_shared_by(self.atsign).build() + from_response = self.secondary_connection.execute_command(command) + + try: + signature = EncryptionUtil.sign_sha256_rsa(from_response, self.keys[KeysUtil.pkam_private_key_name]) + except: + raise Exception("Failed to create SHA256 signature") + + command = PKAMVerbBuilder().set_digest(signature).build() + pkam_response = self.secondary_connection.execute_command(command) + + if self.verbose: + print("Authentication Successful") + + return True + + def get_at_keys(self, regex, fetch_metadata): + scan_command = ScanVerbBuilder().set_regex(regex).set_show_hidden(True).build() + try: + scan_raw_response = self.secondary_connection.execute_command(scan_command) + except Exception as e: + raise AtException(f"Failed to execute : {scan_command} : {e}") + + keys_list = [] + if len(scan_raw_response) > 0: + keys_list = json.loads(scan_raw_response) + # print(keys_list) + at_keys = [] + for at_key_raw in keys_list: + at_key = Keys.from_string(at_key_raw) + if fetch_metadata: + llookup_command = "llookup:meta:" + at_key_raw + try: + llookup_meta_response = self.secondary_connection.execute_command(llookup_command, read_the_response=True) + except Exception as e: + raise AtException(f"Failed to execute : {llookup_command} : {e}") + + try: + at_key.metadata = Metadata.squash(at_key.metadata, Metadata.from_json(llookup_meta_response)) + except Exception as e: + raise AtException(f"Failed to parse JSON : {llookup_meta_response} : {e}") + + at_keys.append(at_key) + + return at_keys + + def is_authenticated(self): + return self.authenticated + + def __del__(self): + if self.secondary_connection: + self.secondary_connection.disconnect() diff --git a/src/common/atsign.py b/src/common/atsign.py index 87f4e58..a094e7e 100644 --- a/src/common/atsign.py +++ b/src/common/atsign.py @@ -3,13 +3,14 @@ def __init__(self, atsign): if atsign is None or atsign.strip() == "": raise ValueError("atSign may not be null or empty") - self.atsign = self.format_atsign(atsign) + self._atsign = self.format_atsign(atsign) - if self.atsign == "@": - raise ValueError(f"'{self.atsign}' is not a valid atSign") + if self._atsign == "@": + raise ValueError(f"'{self._atsign}' is not a valid atSign") - self.without_prefix = self.atsign[1:] + self._without_prefix_str = self._atsign[1:] + @property def without_prefix(self): """ Returns an atsign without @ prefix @@ -19,17 +20,17 @@ def without_prefix(self): str An atsign without prefix (e.g. "@alice " --> "alice"). """ - return self.without_prefix + return self._without_prefix_str def __str__(self): - return self.atsign + return self._atsign def __eq__(self, other): if self is other: return True if not isinstance(other, AtSign): return False - return self.atsign == other.atsign + return self._atsign == other.atsign @staticmethod def format_atsign(atsign_str): diff --git a/src/common/exception/__init__.py b/src/common/exception/__init__.py index 8b30ad5..6a3f83e 100644 --- a/src/common/exception/__init__.py +++ b/src/common/exception/__init__.py @@ -1 +1 @@ -from atexception import AtException \ No newline at end of file +from src.common.exception.atexception import AtException \ No newline at end of file diff --git a/src/common/keys.py b/src/common/keys.py new file mode 100644 index 0000000..ce0bd13 --- /dev/null +++ b/src/common/keys.py @@ -0,0 +1,144 @@ +from src.common.atsign import AtSign +from src.common.metadata import Metadata +from src.common.exception import AtException +from src.util.keystringutil import KeyStringUtil, KeyType + +class Keys: + @staticmethod + def from_string(full_at_key_name: str): + key_string_util = KeyStringUtil(full_at_key_name) + key_type = key_string_util.get_key_type() + key_name = key_string_util.get_key_name() + shared_by = AtSign(key_string_util.get_shared_by()) + shared_with = AtSign(key_string_util.get_shared_with()) if key_string_util.get_shared_with() else None + namespace = key_string_util.get_namespace() + is_cached = key_string_util.is_cached() + is_hidden = key_string_util.is_hidden() + + at_key = None + if key_type == KeyType.PUBLIC_KEY: + at_key = PublicKey(key_name, shared_by) + elif key_type == KeyType.SHARED_KEY: + at_key = SharedKey(key_name, shared_by, shared_with) + elif key_type == KeyType.SELF_KEY: + at_key = SelfKey(key_name, shared_by, shared_with) + elif key_type == KeyType.PRIVATE_HIDDEN_KEY: + at_key = PrivateHiddenKey(key_name, shared_by) + else: + raise AtException(f"Could not find KeyType for Key {full_at_key_name}") + + at_key.set_namespace(namespace) + at_key.metadata.is_cached = is_cached + if not at_key.metadata.is_hidden: + at_key.metadata.is_hidden = is_hidden # If KeyBuilders constructor did not already evaluate is_hidden, then do it here + + return at_key + + + +class AtKey: + def __init__(self, name, shared_by): + self.name = name + self.shared_with = None + self.shared_by = shared_by + self.namespace = None + self.metadata = Metadata() + + def __str__(self): + s = "" + if self.metadata.is_public: + s += "public:" + elif self.shared_with: + s += str(self.shared_with) + ":" + s += self.get_fully_qualified_key_name() + if self.shared_by: + s += str(self.shared_by) + return s + + def get_namespace(self): + return self.namespace + + def set_namespace(self, namespace): + if namespace: + while namespace.startswith("."): + namespace = namespace[1:] + namespace = namespace.strip() + self.namespace = namespace + return self + + def get_fully_qualified_key_name(self): + return self.name + (f".{self.namespace}" if self.namespace else "") + + def set_name(self, name): + self.name = name.strip() + return self + + def set_time_to_live(self, ttl: int): + self.metadata.ttl = ttl + return self + + def set_time_to_birth(self, ttb: int): + self.metadata.ttb = ttb + return self + + +class PublicKey(AtKey): + def __init__(self, name, shared_by: AtSign): + super().__init__(name, shared_by=shared_by) + self.metadata.is_public = True + self.metadata.is_encrypted = False + self.metadata.is_hidden = False + + def cache(self, ttr, ccd): + self.metadata.ttr = ttr + self.metadata.ccd = ccd + self.metadata.is_cached = (ttr != 0) + return self + + +class SelfKey(AtKey): + def __init__(self, name, shared_by: AtSign, shared_with: AtSign = None): + super().__init__(name, shared_by=shared_by) + self.shared_with = shared_with + self.metadata.is_public = False + self.metadata.is_encrypted = True + self.metadata.is_hidden = False + + +class SharedKey(AtKey): + def __init__(self, name, shared_by: AtSign, shared_with: AtSign): + super().__init__(name, shared_by=shared_by) + if not shared_with: + raise AtException("SharedKey: shared_with may not be null") + self.shared_with = shared_with + self.metadata.is_public = False + self.metadata.is_encrypted = True + self.metadata.is_hidden = False + + def cache(self, ttr, ccd): + self.metadata.ttr = ttr + self.metadata.ccd = ccd + self.metadata.is_cached = (ttr != 0) + return self + + @staticmethod + def from_string(key: str) -> 'SharedKey': + if not key: + raise AtException("SharedKey.from_string(key): key may not be null") + split_by_colon = key.split(":") + if len(split_by_colon) != 2: + raise AtException("SharedKey.from_string('" + key + "'): key must have structure @bob:foo.bar@alice") + shared_with = split_by_colon[0] + split_by_at_sign = split_by_colon[1].split("@") + if len(split_by_at_sign) != 2: + raise AtException("SharedKey.from_string('" + key + "'): key must have structure @bob:foo.bar@alice") + key_name = split_by_at_sign[0] + shared_by = split_by_at_sign[1] + shared_key = SharedKey(AtSign(shared_by), AtSign(shared_with)) + shared_key.name = key_name + return shared_key + + +class PrivateHiddenKey(AtKey): + def __init__(self, name, shared_by: AtSign): + super().__init__(name, shared_by=shared_by) \ No newline at end of file diff --git a/src/common/metadata.py b/src/common/metadata.py new file mode 100644 index 0000000..796b046 --- /dev/null +++ b/src/common/metadata.py @@ -0,0 +1,120 @@ +import json +import datetime +from dateutil.parser import parse +from dataclasses import dataclass + + +@dataclass +class Metadata: + ttl: int = 0 + ttb: int = 0 + ttr: int = 0 + ccd: bool = False + created_by: str = None + updated_by: str = None + available_at: datetime.datetime = None + expires_at: datetime.datetime = None + refresh_at: datetime.datetime = None + created_at: datetime.datetime = None + updated_at: datetime.datetime = None + status: str = None + version: int = 0 + data_signature: str = None + shared_key_status: str = None + is_public: bool = False + is_encrypted: bool = True + is_hidden: bool = False + namespace_aware: bool = True + is_binary: bool = False + is_cached: bool = False + shared_key_enc: str = None + pub_key_cs: str = None + encoding: str = None + + def parse_datetime(datetime_str): + if datetime_str is not None: + return parse(datetime_str) + return None + + @staticmethod + def from_json(json_str): + data = json.loads(json_str) + metadata = Metadata() + metadata.ttl = data.get('ttl') + metadata.ttb = data.get('ttb') + metadata.ttr = data.get('ttr') + metadata.ccd = data.get('ccd') + metadata.created_by = data.get('createdBy') + metadata.updated_by = data.get('updatedBy') + metadata.available_at = Metadata.parse_datetime(data.get('availableAt')) + metadata.expires_at = Metadata.parse_datetime(data.get('expiresAt')) + metadata.refresh_at = Metadata.parse_datetime(data.get('refreshAt')) + metadata.created_at = Metadata.parse_datetime(data.get('createdAt')) + metadata.updated_at = Metadata.parse_datetime(data.get('updatedAt')) + metadata.status = data.get('status') + metadata.version = data.get('version') + metadata.data_signature = data.get('dataSignature') + metadata.shared_key_status = data.get('sharedKeyStatus') + metadata.is_public = data.get('isPublic', False) + metadata.is_encrypted = data.get('isEncrypted', True) + metadata.is_hidden = data.get('isHidden', False) + metadata.namespace_aware = data.get('namespaceAware', True) + metadata.is_binary = data.get('isBinary', False) + metadata.is_cached = data.get('isCached', False) + metadata.shared_key_enc = data.get('sharedKeyEnc') + metadata.pub_key_cs = data.get('pubKeyCS') + metadata.encoding = data.get('encoding') + return metadata + + + def __str__(self): + s = "" + if self.ttl: + s += f":ttl:{self.ttl}" + if self.ttb: + s += f":ttb:{self.ttb}" + if self.ttr: + s += f":ttr:{self.ttr}" + if self.ccd: + s += f":ccd:{self.ccd}" + if self.data_signature: + s += f":dataSignature:{self.data_signature}" + if self.shared_key_status: + s += f":sharedKeyStatus:{self.shared_key_status}" + if self.shared_key_enc: + s += f":sharedKeyEnc:{self.shared_key_enc}" + if self.pub_key_cs: + s += f":pubKeyCS:{self.pub_key_cs}" + if self.is_binary: + s += f":isBinary:{self.is_binary}" + if self.is_encrypted: + s += f":isEncrypted:{self.is_encrypted}" + if self.encoding: + s += f":encoding:{self.encoding}" + return s + + @staticmethod + def squash(first_metadata, second_metadata): + metadata = Metadata() + metadata.ttl = first_metadata.ttl if first_metadata.ttl is not None else second_metadata.ttl + metadata.ttb = first_metadata.ttb if first_metadata.ttb is not None else second_metadata.ttb + metadata.ttr = first_metadata.ttr if first_metadata.ttr is not None else second_metadata.ttr + metadata.ccd = first_metadata.ccd if first_metadata.ccd is not None else second_metadata.ccd + metadata.available_at = first_metadata.available_at if first_metadata.available_at is not None else second_metadata.available_at + metadata.expires_at = first_metadata.expires_at if first_metadata.expires_at is not None else second_metadata.expires_at + metadata.refresh_at = first_metadata.refresh_at if first_metadata.refresh_at is not None else second_metadata.refresh_at + metadata.created_at = first_metadata.created_at if first_metadata.created_at is not None else second_metadata.created_at + metadata.updated_at = first_metadata.updated_at if first_metadata.updated_at is not None else second_metadata.updated_at + metadata.data_signature = first_metadata.data_signature if first_metadata.data_signature is not None else second_metadata.data_signature + metadata.shared_key_status = first_metadata.shared_key_status if first_metadata.shared_key_status is not None else second_metadata.shared_key_status + metadata.shared_key_enc = first_metadata.shared_key_enc if first_metadata.shared_key_enc is not None else second_metadata.shared_key_enc + metadata.is_public = first_metadata.is_public if first_metadata.is_public is not None else second_metadata.is_public + metadata.is_encrypted = first_metadata.is_encrypted if first_metadata.is_encrypted is not None else second_metadata.is_encrypted + metadata.is_hidden = first_metadata.is_hidden if first_metadata.is_hidden is not None else second_metadata.is_hidden + metadata.namespace_aware = first_metadata.namespace_aware if first_metadata.namespace_aware is not None else second_metadata.namespace_aware + metadata.is_binary = first_metadata.is_binary if first_metadata.is_binary is not None else second_metadata.is_binary + metadata.is_cached = first_metadata.is_cached if first_metadata.is_cached is not None else second_metadata.is_cached + metadata.shared_key_enc = first_metadata.shared_key_enc if first_metadata.shared_key_enc is not None else second_metadata.shared_key_enc + metadata.pub_key_cs = first_metadata.pub_key_cs if first_metadata.pub_key_cs is not None else second_metadata.pub_key_cs + metadata.encoding = first_metadata.encoding if first_metadata.encoding is not None else second_metadata.encoding + return metadata diff --git a/src/connections/__init__.py b/src/connections/__init__.py index c3a1597..18b7fda 100644 --- a/src/connections/__init__.py +++ b/src/connections/__init__.py @@ -1,2 +1,4 @@ -from atconnection import AtConnection -from atrootconnection import AtRootConnection \ No newline at end of file +from src.connections.address import Address +from src.connections.atconnection import AtConnection +from src.connections.atrootconnection import AtRootConnection +from src.connections.atsecondaryconnection import AtSecondaryConnection \ No newline at end of file diff --git a/src/connections/address.py b/src/connections/address.py new file mode 100644 index 0000000..7a5631b --- /dev/null +++ b/src/connections/address.py @@ -0,0 +1,27 @@ +class Address: + def __init__(self, host, port): + self._host = host + self._port = port + + @property + def host(self): + return self._host + + @property + def port(self): + return self._port + + def __str__(self): + return self._host + ":" + str(self._port) + + @staticmethod + def from_string(host_and_port): + split = host_and_port.split(":") + if len(split) != 2: + raise ValueError("Cannot construct Address from malformed host:port string '" + host_and_port + "'") + host = split[0] + try: + port = int(split[1]) + except ValueError: + raise ValueError("Cannot construct Address from malformed host:port string '" + host_and_port + "'") + return Address(host, port) diff --git a/src/connections/atconnection.py b/src/connections/atconnection.py index cf65ccd..af249e2 100644 --- a/src/connections/atconnection.py +++ b/src/connections/atconnection.py @@ -2,7 +2,7 @@ import ssl from abc import ABC, abstractmethod -from common.exception import AtException +from src.common.exception.atexception import AtException class AtConnection(ABC): @@ -20,13 +20,14 @@ def __init__(self, host:str, port:int, context:ssl.SSLContext, verbose:bool=Fals - context (ssl.SSLContext): The SSL context for secure connections. - verbose (bool, optional): Indicates if verbose output is enabled (default is False). """ - self.host = host - self.port = port - self.context = context - self.addr_info = socket.getaddrinfo(host, port)[0][-1] + self._host = host + self._port = port + self._context = context + self._addr_info = socket.getaddrinfo(host, port)[0][-1] self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.secure_root_socket = None - self.verbose = verbose + self._secure_root_socket = None + self._verbose = verbose + self._connected = False def __str__(self): """ @@ -35,7 +36,7 @@ def __str__(self): Returns: - str: A string representation of the AtConnection object in the format "host:port". """ - return f"{self.host}:{self.port}" + return f"{self._host}:{self._port}" def write(self, data: str): """ @@ -44,7 +45,7 @@ def write(self, data: str): Parameters: - data (str): The data to be written to the socket. """ - self.secure_root_socket.write(data.encode()) + self._secure_root_socket.write(data.encode()) def read(self): """ @@ -54,8 +55,11 @@ def read(self): - str: The data read from the socket. """ response = b'' - data = self.secure_root_socket.read(2048) - response += data + while True: + chunk = self._secure_root_socket.read() # Receive data in chunks of 1024 bytes + response += chunk + if chunk == b'@' or b'\n' in chunk: + break return response.decode() def is_connected(self): @@ -65,26 +69,26 @@ def is_connected(self): Returns: - bool: True if the connection is established, False otherwise. """ - return self.connected + return self._connected def connect(self): """ Establish a connection to the server. Throws IOException """ - if not self.connected: - self._socket.connect(self.addr_info) - self.secure_root_socket = self.context.wrap_socket( - self._socket, server_hostname=self.host, do_handshake_on_connect=True + if not self._connected: + self._socket.connect(self._addr_info) + self._secure_root_socket = self._context.wrap_socket( + self._socket, server_hostname=self._host, do_handshake_on_connect=True ) - self.connected = True + self._connected = True self.read() def disconnect(self): """ Close the socket connection. """ - self.secure_root_socket.close() - self.connected = False + self._secure_root_socket.close() + self._connected = False @abstractmethod def parse_raw_response(self, raw_response:str): @@ -113,12 +117,12 @@ def execute_command(self, command:str, retry_on_exception:int=0, read_the_respon command += "\n" self.write(command) - if self.verbose: + if self._verbose: print(f"\tSENT: {repr(command.strip())}") if read_the_response: raw_response = self.read() - if self.verbose: + if self._verbose: print(f"\tRCVD: {repr(raw_response)}") return self.parse_raw_response(raw_response) @@ -135,5 +139,5 @@ def execute_command(self, command:str, retry_on_exception:int=0, read_the_respon traceback.print_exc() raise AtException(f"Failed to reconnect after original exception {str(first)}: ", second) else: - self.connected = False + self._connected = False raise AtException(str(first)) diff --git a/src/connections/atrootconnection.py b/src/connections/atrootconnection.py index 80dd4b5..d20ed79 100644 --- a/src/connections/atrootconnection.py +++ b/src/connections/atrootconnection.py @@ -1,7 +1,8 @@ import ssl -from common import AtSign -from common.exception import AtException -from atconnection import AtConnection +from src.common import AtSign +from src.common.exception.atexception import AtException +from src.connections.atconnection import AtConnection +from src.connections.address import Address class AtRootConnection(AtConnection): @@ -54,17 +55,15 @@ def __init__(self, host:str, port:int, context:ssl.SSLContext, verbose:bool): if AtRootConnection.__instance is not None: raise Exception("Singleton class - use AtRootConnection.get_instance() instead") else: - self.host = host - self.port = port - self.context = context - self.verbose = verbose AtRootConnection.__instance = self super().__init__(host, port, context, verbose) def connect(self): - """Establish a connection to the root server.""" + """ + Establish a connection to the root server. + """ super().connect() - if self.verbose: + if self._verbose: print("Root Connection Successful") def parse_raw_response(self, raw_response:str): @@ -113,12 +112,12 @@ def find_secondary(self, atsign:AtSign): # Connect will only throw an AtException if authentication fails. Root connections do not require authentication. raise AtException(f"Root Connection failed - {e}") - response = self.execute_command(atsign.without_prefix()) + response = self.execute_command(atsign.without_prefix) if response == "null": raise AtException(f"Root lookup returned null for {atsign}") else: try: - return response + return Address.from_string(response) except ValueError as e: raise AtException(f"Received malformed response {response} from lookup of {atsign} on root server") diff --git a/src/connections/atsecondaryconnection.py b/src/connections/atsecondaryconnection.py new file mode 100644 index 0000000..aae0c60 --- /dev/null +++ b/src/connections/atsecondaryconnection.py @@ -0,0 +1,67 @@ +import ssl +from src.connections.atconnection import AtConnection +from src.connections.address import Address +from src.common.exception import AtException + + +class AtSecondaryConnection(AtConnection): + """ + Subclass of AtConnection representing a connection to the secondary server in the atprotocol. + """ + + def __init__(self, address: Address, context:ssl.SSLContext=ssl.create_default_context(), verbose:bool=False): + """ + Initialize the AtSecondaryConnection object. + + Parameters + ---------- + host : str + The host name or IP address of the secondary server. + port : int + The port number of the secondary server. + context : ssl.SSLContext, optional + The SSL context for secure connections (default is ssl.create_default_context()). + verbose : bool, optional + Indicates if verbose output is enabled (default is False). + """ + super().__init__(address.host, address.port, context, verbose) + + def connect(self): + """ + Establish a connection to the secondary server. + """ + super().connect() + if self._verbose: + print("Secondary Connection Successful") + + def parse_raw_response(self, raw_response:str): + """ + Parse the raw response from the secondary server. + + Parameters + ---------- + raw_response : str + The raw response received from the secondary server. + + Returns + ------- + str + The parsed response from the secondary server. + """ + if raw_response.endswith("@"): + raw_response = raw_response[:-1] + raw_response = raw_response.strip() + + # return raw_response + data_index = raw_response.find("data:") + + error_index = raw_response.find("error:") + notification_index = raw_response.find("notification") + if data_index > -1: + return raw_response[data_index+len("data:"):].split("\n")[0] + elif error_index > -1: + raise AtException(raw_response[error_index+len("error:"):]) + elif notification_index > -1: + return raw_response[notification_index+len("notification"):] + else: + raise ValueError(f"Invalid response from server: {raw_response}") diff --git a/src/util/__init__.py b/src/util/__init__.py new file mode 100644 index 0000000..f7a3ec1 --- /dev/null +++ b/src/util/__init__.py @@ -0,0 +1,3 @@ +from src.util.encryptionutil import EncryptionUtil +from src.util.keysutil import KeysUtil +from src.util.verbbuilder import * \ No newline at end of file diff --git a/src/util/encryptionutil.py b/src/util/encryptionutil.py new file mode 100644 index 0000000..3bfd253 --- /dev/null +++ b/src/util/encryptionutil.py @@ -0,0 +1,89 @@ +import base64 +import os +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives import padding, serialization +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import rsa, padding as rsa_padding +from cryptography.hazmat.primitives.serialization import load_der_private_key, load_der_public_key +from cryptography.hazmat.primitives import hashes + + +class EncryptionUtil: + @staticmethod + def aes_encrypt_from_base64(clear_text, key_base64, iv=b'\x00' * 16): + # clear_text = clear_text.encode('utf-8') + key = base64.b64decode(key_base64) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend()) + encryptor = cipher.encryptor() + padder = padding.PKCS7(algorithms.AES.block_size).padder() + padded_plaintext = padder.update(clear_text) + padder.finalize() + cipher_text = encryptor.update(padded_plaintext) + encryptor.finalize() + return base64.b64encode(cipher_text).decode() + + @staticmethod + def aes_decrypt_from_base64(encrypted_text, self_encryption_key, iv=b'\x00' * 16): + cipher_text = base64.b64decode(encrypted_text) + key = base64.b64decode(self_encryption_key) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend()) + decryptor = cipher.decryptor() + plain_text = decryptor.update(cipher_text) + decryptor.finalize() + + # Unpad the plain_text using PKCS7 padding + padder = padding.PKCS7(algorithms.AES.block_size).unpadder() + plain_text = padder.update(plain_text) + padder.finalize() + + # Print the decrypted plaintext + return plain_text.decode('utf-8') + + @staticmethod + def generate_rsa_key_pair(): + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) + public_key = private_key.public_key() + private_key_bytes = private_key.private_bytes(encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption()) + public_key_bytes = public_key.public_bytes(encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo) + return private_key_bytes, public_key_bytes + + @staticmethod + def generate_aes_key_base64(): + return base64.b64encode(os.urandom(32)).decode("utf-8") + + @staticmethod + def rsa_decrypt_from_base64(cipher_text, private_key_bytes): + private_key = EncryptionUtil.private_key_from_base64(private_key_bytes) + decrypted_bytes = private_key.decrypt( + base64.b64decode(cipher_text), + rsa_padding.PKCS1v15() + ) + return decrypted_bytes.decode('utf-8') + + @staticmethod + def rsa_encrypt_to_base64(clear_text, public_key_bytes): + public_key = EncryptionUtil.public_key_from_base64(public_key_bytes) + encrypted_bytes = public_key.encrypt( + clear_text.encode('utf-8'), + rsa_padding.PKCS1v15() + ) + return base64.b64encode(encrypted_bytes).decode('utf-8') + + @staticmethod + def sign_sha256_rsa(input_data, private_key_bytes): + private_key = EncryptionUtil.private_key_from_base64(private_key_bytes) + signature = private_key.sign( + input_data.encode('utf-8'), + rsa_padding.PKCS1v15(), + hashes.SHA256() + ) + return base64.b64encode(signature).decode('utf-8') + + @staticmethod + def private_key_from_base64(s): + key_bytes = base64.b64decode(s.encode('utf-8')) + return load_der_private_key(key_bytes, password=None) + + @staticmethod + def public_key_from_base64(s): + key_bytes = base64.b64decode(s.encode('utf-8')) + return load_der_public_key(key_bytes) diff --git a/src/util/keystringutil.py b/src/util/keystringutil.py new file mode 100644 index 0000000..920559b --- /dev/null +++ b/src/util/keystringutil.py @@ -0,0 +1,90 @@ +class KeyType: + PUBLIC_KEY = "PUBLIC_KEY" + SHARED_KEY = "SHARED_KEY" + SELF_KEY = "SELF_KEY" + PRIVATE_HIDDEN_KEY = "PRIVATE_HIDDEN_KEY" + + +class KeyStringUtil: + def __init__(self, full_key_name): + self._full_key_name = full_key_name + self._key_name = None + self._key_type = None + self._namespace = None + self._shared_by = None + self._shared_with = None + self._is_cached = False + self._is_hidden = False + + self._evaluate(full_key_name) + + def get_full_key_name(self): + return self._full_key_name + + def get_key_name(self): + return self._key_name + + def get_namespace(self): + return self._namespace + + def get_key_type(self): + return self._key_type + + def get_shared_by(self): + return self._shared_by + + def get_shared_with(self): + return self._shared_with + + def is_cached(self): + return self._is_cached + + def is_hidden(self): + return self._is_hidden + + def _evaluate(self, full_key_name): + split1 = full_key_name.split(":") + + if len(split1) > 1: + if split1[0] == "public" or (split1[0] == "cached" and split1[1] == "public"): + self._key_type = KeyType.PUBLIC_KEY + elif split1[0] == "private" or split1[0] == "privatekey": + self._key_type = KeyType.PRIVATE_HIDDEN_KEY + self._is_hidden = True + + if split1[0].startswith("@") or split1[1].startswith("@"): + if self._key_type is None: + self._key_type = KeyType.SHARED_KEY + if split1[0].startswith("@"): + self._shared_with = split1[0][1:] + else: + self._shared_with = split1[1][1:] + + split2 = split1[-1].split("@") + self._key_name = split2[0] + self._shared_by = split2[1] + + if split1[0] == "cached": + self._is_cached = True + + if self._shared_by == self._shared_with: + self._key_type = KeyType.SELF_KEY + else: + if split1[0].startswith("_"): + self._key_type = KeyType.PRIVATE_HIDDEN_KEY + else: + self._key_type = KeyType.SELF_KEY + + split2 = split1[0].split("@") + self._key_name = split2[0] + self._shared_by = split2[1] + + if self._key_name.startswith("shared_key"): + self._namespace = None + + if self._shared_by is not None: + self._shared_by = "@" + self._shared_by + if self._shared_with is not None: + self._shared_with = "@" + self._shared_with + if not self._is_hidden: + self._is_hidden = self._key_name.startswith("_") diff --git a/src/util/keysutil.py b/src/util/keysutil.py new file mode 100644 index 0000000..1a0be57 --- /dev/null +++ b/src/util/keysutil.py @@ -0,0 +1,47 @@ +import os +import json +import base64 +from typing import Dict, Tuple + +# from src.common.atsign import AtSign +from src.util.encryptionutil import EncryptionUtil + + +class KeysUtil: + expected_keys_files_location = os.path.expanduser("~/.atsign/keys/") + legacy_keys_files_location = os.path.join(os.getcwd(), "keys") + keys_file_suffix = "_key.atKeys" + + pkam_public_key_name = "aesPkamPublicKey" + pkam_private_key_name = "aesPkamPrivateKey" + encryption_public_key_name = "aesEncryptPublicKey" + encryption_private_key_name = "aesEncryptPrivateKey" + self_encryption_key_name = "selfEncryptionKey" + + @staticmethod + def load_keys(at_sign: str) -> Dict[str, str]: + + file = KeysUtil.get_keys_file(at_sign, KeysUtil.expected_keys_files_location) + if not os.path.exists(file): + file = KeysUtil.get_keys_file(at_sign, KeysUtil.legacy_keys_files_location) + if not os.path.exists(file): + raise Exception(f"load_keys: No file called {at_sign}{KeysUtil.keys_file_suffix} at {KeysUtil.expected_keys_files_location} or {KeysUtil.legacy_keys_files_location}\n" + "\tKeys files are expected to be in ~/.atsign/keys/ (canonical location) or ./keys/ (legacy location)") + + with open(file) as f: + encrypted_keys = json.load(f) + + self_encryption_key = encrypted_keys[KeysUtil.self_encryption_key_name] + keys = { + KeysUtil.self_encryption_key_name: self_encryption_key, + KeysUtil.pkam_public_key_name: EncryptionUtil.aes_decrypt_from_base64(encrypted_keys[KeysUtil.pkam_public_key_name], self_encryption_key), + KeysUtil.pkam_private_key_name: EncryptionUtil.aes_decrypt_from_base64(encrypted_keys[KeysUtil.pkam_private_key_name], self_encryption_key), + KeysUtil.encryption_public_key_name: EncryptionUtil.aes_decrypt_from_base64(encrypted_keys[KeysUtil.encryption_public_key_name], self_encryption_key), + KeysUtil.encryption_private_key_name: EncryptionUtil.aes_decrypt_from_base64(encrypted_keys[KeysUtil.encryption_private_key_name], self_encryption_key), + } + + return keys + + @staticmethod + def get_keys_file(at_sign: str, folder_to_look_in: str) -> str: + return os.path.join(folder_to_look_in, "{}{}".format(at_sign, KeysUtil.keys_file_suffix)) diff --git a/src/util/verbbuilder.py b/src/util/verbbuilder.py new file mode 100644 index 0000000..a58c818 --- /dev/null +++ b/src/util/verbbuilder.py @@ -0,0 +1,63 @@ +from enum import Enum +from abc import ABC, abstractmethod + +class VerbBuilder(ABC): + @abstractmethod + def build(self): + raise NotImplementedError("Subclasses must implement the build() method") + + +class FromVerbBuilder(VerbBuilder): + def __init__(self): + self.shared_by = "" + + def set_shared_by(self, shared_by): + self.shared_by = shared_by + return self + + def build(self): + return f"from:{self.shared_by}" + +class PKAMVerbBuilder(VerbBuilder): + def __init__(self): + self.digest = "" + + def set_digest(self, digest): + self.digest = digest + return self + + def build(self): + return f"pkam:{self.digest}" + + +class ScanVerbBuilder: + def __init__(self): + self.regex = None + self.from_at_sign = None + self.show_hidden = False + + def set_regex(self, regex): + self.regex = regex + return self + + def set_from_at_sign(self, from_at_sign): + self.from_at_sign = from_at_sign + return self + + def set_show_hidden(self, show_hidden): + self.show_hidden = show_hidden + return self + + def build(self): + command = "scan" + + if self.show_hidden: + command += ":showHidden:true" + + if self.from_at_sign is not None and self.from_at_sign.strip() != "": + command += ":" + self.from_at_sign + + if self.regex is not None and self.regex.strip() != "": + command += " " + self.regex + + return command diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..84b544b --- /dev/null +++ b/test/__init__.py @@ -0,0 +1,8 @@ +import os +import sys + +PROJECT_PATH = os.getcwd() +SOURCE_PATH = os.path.join( + PROJECT_PATH,"src" +) +sys.path.append(SOURCE_PATH) \ No newline at end of file diff --git a/test/atclient_test.py b/test/atclient_test.py new file mode 100644 index 0000000..d9048c5 --- /dev/null +++ b/test/atclient_test.py @@ -0,0 +1,23 @@ +import unittest + +from src.common import AtSign, AtClient + +class AtClientTest(unittest.TestCase): + verbose = False + + def test_atsign_pkam_authenticate(self): + """Test PKAM Authentication""" + atsign = AtSign("27barracuda") + atclient = AtClient(atsign, verbose=AtClientTest.verbose) + self.assertTrue(atclient.is_authenticated()) + + def test_get_at_keys(self): + atsign = AtSign("@27barracuda") + atclient = AtClient(atsign, verbose=AtClientTest.verbose) + my_keys = atclient.get_at_keys("", fetch_metadata=True) + + + +if __name__ == '__main__': + unittest.main() + \ No newline at end of file diff --git a/test/atrootconnection_test.py b/test/atrootconnection_test.py new file mode 100644 index 0000000..c4eabab --- /dev/null +++ b/test/atrootconnection_test.py @@ -0,0 +1,52 @@ +import unittest +from src.common import AtSign +from src.common.exception import AtException +from src.connections import AtRootConnection + +class AtRootConnectionTest(unittest.TestCase): + verbose = False + + def test_root_connection(self): + """ + Test root connection establishment. + """ + root_connection = AtRootConnection.get_instance(verbose=AtRootConnectionTest.verbose) + root_connection.connect() + self.assertTrue(root_connection.is_connected()) + + def test_find_secondary(self): + """ + Test finding a secondary server address. + """ + root_connection = AtRootConnection.get_instance(verbose=AtRootConnectionTest.verbose) + secondary_address = root_connection.find_secondary(AtSign("@27barracuda")) + self.assertIsNotNone(secondary_address) + + def test_find_secondary_failure(self): + """ + Test finding a secondary server address for a non-existent AtSign. + """ + try: + root_connection = AtRootConnection.get_instance(verbose=AtRootConnectionTest.verbose) + secondary_address = root_connection.find_secondary(AtSign("@wrongAtSign")) + except AtException as e: + self.assertEqual("Root lookup returned null for @wrongAtSign", str(e)) + + def test_find_multiple_secondary_addresses(self): + """ + Test finding multiple secondary server addresses. + """ + root_connection = AtRootConnection.get_instance(verbose=AtRootConnectionTest.verbose) + secondary_address1 = root_connection.find_secondary(AtSign("@27barracuda")) + secondary_address2 = root_connection.find_secondary(AtSign("@19total67")) + secondary_address3 = root_connection.find_secondary(AtSign("@wildgreen")) + secondary_address4 = root_connection.find_secondary(AtSign("@colin")) + + self.assertIsNotNone(secondary_address1) + self.assertIsNotNone(secondary_address2) + self.assertIsNotNone(secondary_address3) + self.assertIsNotNone(secondary_address4) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/test/atsecondaryconnection_test.py b/test/atsecondaryconnection_test.py new file mode 100644 index 0000000..ecb6b4e --- /dev/null +++ b/test/atsecondaryconnection_test.py @@ -0,0 +1,64 @@ +import unittest +import socket +from src.common import AtSign +from src.connections import AtRootConnection, AtSecondaryConnection, Address + + +class AtSecondaryConnectionTest(unittest.TestCase): + verbose = False + + def test_secondary_connection(self): + """ + Test secondary connection establishment. + """ + root_connection = AtRootConnection.get_instance(verbose=AtSecondaryConnectionTest.verbose) + secondary_address = root_connection.find_secondary(AtSign("@27barracuda")) + secondary_connection = AtSecondaryConnection(secondary_address, verbose=AtSecondaryConnectionTest.verbose) + secondary_connection.connect() + self.assertTrue(secondary_connection.is_connected()) + secondary_connection.disconnect() + + def test_secondary_connection_failure(self): + """ + Test secondary connection failure. + """ + try: + root_connection = AtRootConnection.get_instance(verbose=AtSecondaryConnectionTest.verbose) + secondary_address = root_connection.find_secondary(AtSign("@27barracuda")) + wrong_address = Address(secondary_address.host+"0", secondary_address.port) + secondary_connection = AtSecondaryConnection(wrong_address, verbose=AtSecondaryConnectionTest.verbose) + secondary_connection.connect() + secondary_connection.disconnect() + except Exception as e: + self.assertTrue(isinstance(e, socket.gaierror)) + + def test_multiple_secondary_connections(self): + """ + Test multiple secondary connections. + """ + root_connection = AtRootConnection.get_instance(verbose=AtSecondaryConnectionTest.verbose) + secondary_address1 = root_connection.find_secondary(AtSign("@27barracuda")) + secondary_connection1 = AtSecondaryConnection(secondary_address1, verbose=AtSecondaryConnectionTest.verbose) + secondary_connection1.connect() + secondary_address2 = root_connection.find_secondary(AtSign("@19total67")) + secondary_connection2 = AtSecondaryConnection(secondary_address2, verbose=AtSecondaryConnectionTest.verbose) + secondary_connection2.connect() + secondary_address3 = root_connection.find_secondary(AtSign("@wildgreen")) + secondary_connection3 = AtSecondaryConnection(secondary_address3, verbose=AtSecondaryConnectionTest.verbose) + secondary_connection3.connect() + secondary_address4 = root_connection.find_secondary(AtSign("@colin")) + secondary_connection4 = AtSecondaryConnection(secondary_address4, verbose=AtSecondaryConnectionTest.verbose) + secondary_connection4.connect() + + self.assertIsNotNone(secondary_connection1) + self.assertIsNotNone(secondary_connection2) + self.assertIsNotNone(secondary_connection3) + self.assertIsNotNone(secondary_connection4) + secondary_connection1.disconnect() + secondary_connection2.disconnect() + secondary_connection3.disconnect() + secondary_connection4.disconnect() + + +if __name__ == '__main__': + unittest.main() diff --git a/test/encryptionutil_test.py b/test/encryptionutil_test.py new file mode 100644 index 0000000..3f1237b --- /dev/null +++ b/test/encryptionutil_test.py @@ -0,0 +1,25 @@ +import unittest, base64 + +from src.util import EncryptionUtil + +class EncryptionUtilTest(unittest.TestCase): + + def test_aes_encryption(self): + """Test generating an AES key and encryption/decryption.""" + secret_key = EncryptionUtil.generate_aes_key_base64() + plain_text = b"AES" + encrypted_text = EncryptionUtil.aes_encrypt_from_base64(plain_text, secret_key) + decrypted_text = EncryptionUtil.aes_decrypt_from_base64(encrypted_text, secret_key) + self.assertEqual(plain_text.decode("utf-8"), decrypted_text) + + def test_rsa_encryption(self): + """Test generating RSA key pair and encryption/decryption.""" + private_key, public_key = EncryptionUtil.generate_rsa_key_pair() + plain_text = "RSA" + encrypted_text = EncryptionUtil.rsa_encrypt_to_base64(plain_text, base64.b64encode(public_key).decode("utf-8")) + decrypted_text = EncryptionUtil.rsa_decrypt_from_base64(encrypted_text, base64.b64encode(private_key).decode("utf-8")) + self.assertEqual(plain_text, decrypted_text) + +if __name__ == '__main__': + unittest.main() + \ No newline at end of file diff --git a/test/keysutil_test.py b/test/keysutil_test.py new file mode 100644 index 0000000..8ae8190 --- /dev/null +++ b/test/keysutil_test.py @@ -0,0 +1,20 @@ +import unittest + +from src.common import AtSign +from src.util import KeysUtil + +class KeysUtilTest(unittest.TestCase): + + def test_load_keys(self): + """Test atKeys Loading""" + keys = KeysUtil.load_keys(AtSign("27barracuda")) + self.assertIsNotNone(keys[KeysUtil.self_encryption_key_name]) + self.assertIsNotNone(keys[KeysUtil.pkam_private_key_name]) + self.assertIsNotNone(keys[KeysUtil.pkam_public_key_name]) + self.assertIsNotNone(keys[KeysUtil.encryption_private_key_name]) + self.assertIsNotNone(keys[KeysUtil.encryption_public_key_name]) + + +if __name__ == '__main__': + unittest.main() + \ No newline at end of file diff --git a/test/verbbuilder_test.py b/test/verbbuilder_test.py new file mode 100644 index 0000000..df091ec --- /dev/null +++ b/test/verbbuilder_test.py @@ -0,0 +1,58 @@ +import unittest +from src.util import FromVerbBuilder, PKAMVerbBuilder, ScanVerbBuilder + +class AtVerbBuilderTest(unittest.TestCase): + verbose = False + + def test_from_verb_builder(self): + """ + Test From Verb Builder. + """ + command = FromVerbBuilder().set_shared_by("@bob").build() + self.assertEqual(command, "from:@bob") + + def test_pkam_verb_builder(self): + """ + Test PKAM Verb Builder. + """ + command = PKAMVerbBuilder().set_digest("digest").build() + self.assertEqual(command, "pkam:digest") + + def test_scan_verb_builder(self): + """ + Test Scan Verb Builder. + """ + # Test not setting any parameters + command = ScanVerbBuilder().build() + self.assertEqual(command, "scan") + + # Test setting just regex + command = ScanVerbBuilder().set_regex("*.public").build() + self.assertEqual(command, "scan *.public") + + # Test setting just fromAtSign + command = ScanVerbBuilder().set_from_at_sign("@other").build() + self.assertEqual(command, "scan:@other") + + # Test setting just showHidden + command = ScanVerbBuilder().set_show_hidden(True).build() + self.assertEqual(command, "scan:showHidden:true") + + # Test setting regex & fromAtSign + command = ScanVerbBuilder().set_regex("*.public").set_from_at_sign("@other").build() + self.assertEqual(command, "scan:@other *.public") + + # Test setting regex & showHidden + command = ScanVerbBuilder().set_regex("*.public").set_show_hidden(True).build() + self.assertEqual(command, "scan:showHidden:true *.public") + + # Test setting fromAtSign & showHidden + command = ScanVerbBuilder().set_from_at_sign("@other").set_show_hidden(True).build() + self.assertEqual(command, "scan:showHidden:true:@other") + + # Test setting regex & fromAtSign & showHidden + command = ScanVerbBuilder().set_regex("*.public").set_from_at_sign("@other").set_show_hidden(True).build() + self.assertEqual(command, "scan:showHidden:true:@other *.public") + +if __name__ == '__main__': + unittest.main()