Skip to content

Commit

Permalink
Fixed control code validation in encoders, Limmer developement should…
Browse files Browse the repository at this point in the history
… continue now
  • Loading branch information
adalfarus committed Jun 8, 2024
1 parent 6183b6d commit 3fee928
Show file tree
Hide file tree
Showing 25 changed files with 488,655 additions and 35 deletions.
41,242 changes: 41,242 additions & 0 deletions aplustools/security/dicts/12-dicts/2of12.txt

Large diffs are not rendered by default.

81,883 changes: 81,883 additions & 0 deletions aplustools/security/dicts/12-dicts/2of12inf.txt

Large diffs are not rendered by default.

60,388 changes: 60,388 additions & 0 deletions aplustools/security/dicts/12-dicts/2of4brif.txt

Large diffs are not rendered by default.

4,690 changes: 4,690 additions & 0 deletions aplustools/security/dicts/12-dicts/2of5core.txt

Large diffs are not rendered by default.

21,877 changes: 21,877 additions & 0 deletions aplustools/security/dicts/12-dicts/3esl.txt

Large diffs are not rendered by default.

82,532 changes: 82,532 additions & 0 deletions aplustools/security/dicts/12-dicts/3of6all.txt

Large diffs are not rendered by default.

64,662 changes: 64,662 additions & 0 deletions aplustools/security/dicts/12-dicts/3of6game.txt

Large diffs are not rendered by default.

67,725 changes: 67,725 additions & 0 deletions aplustools/security/dicts/12-dicts/5d+2a.txt

Large diffs are not rendered by default.

32,193 changes: 32,193 additions & 0 deletions aplustools/security/dicts/12-dicts/6of12.txt

Large diffs are not rendered by default.

22,227 changes: 22,227 additions & 0 deletions aplustools/security/dicts/12-dicts/6phrase.txt

Large diffs are not rendered by default.

File renamed without changes.
7,776 changes: 7,776 additions & 0 deletions aplustools/security/dicts/eff-large-dict.txt

Large diffs are not rendered by default.

1,296 changes: 1,296 additions & 0 deletions aplustools/security/dicts/eff-short-dict.txt

Large diffs are not rendered by default.

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
19 changes: 16 additions & 3 deletions aplustools/security/passwords.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from cryptography.hazmat.primitives.ciphers import Cipher as _Cipher, algorithms as _algorithms, modes as _modes
from typing import Union as _Union, Literal as _Literal, Optional as _Optional, Dict as _Dict, Tuple as _Tuple
from cryptography.hazmat.backends import default_backend as _default_backend
Expand Down Expand Up @@ -306,20 +308,26 @@ def _debug_print(self, *args, **kwargs):
print(*args, **kwargs)

def load_def_dict(self):
with _resources.path("aplustools.security", "def-dict.txt") as file_path:
with _resources.path("aplustools.security.dicts", "def-dict.txt") as file_path:
with open(file_path, "r") as f:
self.words.extend([line.strip() for line in f])

def load_google_10000_dict(self):
with _resources.path("aplustools.security", "google-10000-dict.txt") as file_path:
with _resources.path("aplustools.security.dicts", "google-10000-dict.txt") as file_path:
with open(file_path, "r") as f:
self.words.extend([line.strip() for line in f])

def load_scowl_dict(self, size: _Literal[50, 60, 70, 80, 95] = 50):
with _resources.path("aplustools.security", f"scowl-{size}-dict.txt") as file_path:
with _resources.path("aplustools.security.dicts", f"scowl-{size}-dict.txt") as file_path:
with open(file_path, "r") as f:
self.words.extend([line.strip() for line in f])

def load_12_dicts(self):
with _resources.path("aplustools.security.dicts", "12-dicts") as file_path:
for file in os.listdir(file_path):
with open(os.path.join(file_path, file), "r") as f:
self.words.extend([line.strip() for line in f])

def unload_dicts(self):
self.words = []

Expand Down Expand Up @@ -713,6 +721,11 @@ def generate_secure_password(self, predetermined: _Optional[_Literal["random", "
return result


class PasswordReGenerator:
def __init__(self):
pass


@_strict
class SecurePasswordManager:
def __init__(self, key: bytes = _secrets.token_bytes(32), debug: bool = False):
Expand Down
20 changes: 19 additions & 1 deletion aplustools/security/protocols/control_code_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def __init__(self, comm_code: Optional[str] = None, exec_code_delimiter: str = "
def get_exec_code_delimiters(self) -> tuple:
return self._exec_code_start, self._exec_code_end

def get_comm_code(self) -> str:
return self._comm_code

@staticmethod
def _generate_random_string(length):
# Calculate how many bytes are needed to get the desired string length after Base64 encoding
Expand All @@ -48,8 +51,17 @@ def _generate_random_string(length):
# Return the required length
return random_string_base64[:length]

def _escape(self, string: str):
for escape in [self._exec_code_start, self._exec_code_end, '"']:
string = string.replace(escape, f"\\{escape}")
return string

def get_control_code(self, control_code: str, add_in: Optional[str] = None) -> str:
add_in_str = f"{self._exec_code_delimiter}{add_in}" if add_in else ""
if add_in:
add_in = ('"' + self._escape(add_in) + '"')
add_in_str = f"{self._exec_code_delimiter}{add_in}"
else:
add_in_str = ""
return (f"{self._exec_code_start}"
f"{self._comm_code}{self._exec_code_delimiter}{self._control_codes.get(control_code.lower())}"
f"{add_in_str}{self._exec_code_end}")
Expand Down Expand Up @@ -99,3 +111,9 @@ def __repr__(self):

def is_control_code(obj: Any) -> bool:
return isinstance(obj, _ControlCode)


if __name__ == "__main__":
protocol = ControlCodeProtocol()
print(protocol.get_control_code("input", '"HELLO WORLD" [Please input your message]: '))
print(protocol.get_control_code("input", f'[{protocol.get_control_code.__self__._comm_code}::IN::"EXX"]'))
103 changes: 82 additions & 21 deletions aplustools/security/protocols/secure_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,39 +207,52 @@ def _decrypt_and_validate_chunk(self, chunk):

return actual_message

def add_chunk(self, encrypted_chunk):
# Decrypt and validate the chunk
try:
decrypted_chunk = self._decrypt_and_validate_chunk(encrypted_chunk).decode('utf-8')
except Exception as e:
print(f"Error in decryption/validation: {e}")
return
self._buffer += decrypted_chunk

def _split_buffer_into_parts(self):
# Process complete and partial messages in buffer
exec_start, exec_end = self._protocol.get_exec_code_delimiters()
pattern = fr'(\{exec_start}{exec_start}^\{exec_end}{exec_end}+\{exec_end})|({exec_start}^\{exec_start}\{exec_end}{exec_end}+)'
matches = re.findall(pattern, self._buffer)
exec_start, exec_end = map(re.escape, self._protocol.get_exec_code_delimiters())
comm_code = re.escape(self._protocol.get_comm_code())
pattern = re.compile(r'(' + exec_start + comm_code + r'::.*?[^\\]' + exec_end + ')')
matches = pattern.finditer(self._buffer)

if not matches:
return

# Flatten the tuple results and filter out empty matches
parsed_parts = [part for match in matches for part in match if part]
validations = []
# Initial positions
last_end = 0
results = []

for match in matches:
start, end = match.span()
# Append everything before the current match that hasn't been matched yet
if start > last_end:
results.append(self._buffer[last_end:start])
# Append the current match
results.append(match.group(0))
last_end = end

# Append any remaining part of the text after the last match
if last_end < len(self._buffer):
results.append(self._buffer[last_end:])

return results

def _process_parsed_parts(self, parsed_parts):
# Validate and process the parsed parts
validations = []
last_end = 0

for i, expression in enumerate(parsed_parts):
try:
validation_result, add_in = self._protocol.validate_control_code(expression)
except ValueError:
continue

if validation_result == "end":
# Consider message as complete
self._complete_buffer[-1] += ''.join(parsed_parts[last_end:i])
len_to_remove = len(self._complete_buffer[-1]) + len(parsed_parts[i])
self._buffer = self._buffer[len_to_remove:]
last_end = i
last_end = i + 1 # Move to the next part
self._complete_buffer.extend([_ControlCode(validation_result, add_in)] + validations + [""])
validations.clear()
elif validation_result in ["Invalid control code", "Invalid key"]:
Expand All @@ -251,12 +264,26 @@ def add_chunk(self, encrypted_chunk):
start_index = self._buffer.find(expression)
end_index = start_index + len(expression)
self._buffer = self._buffer[:start_index] + self._buffer[end_index:]

if self._complete_buffer[-1] == "":
self._complete_buffer = self._complete_buffer[:-1]

self._complete_buffer.extend(validations)
self._complete_buffer.append("")
validations.clear()

def add_chunk(self, encrypted_chunk):
# Decrypt and validate the chunk
try:
decrypted_chunk = self._decrypt_and_validate_chunk(encrypted_chunk).decode('utf-8')
except Exception as e:
print(f"Error in decryption/validation: {e}")
return
self._buffer += decrypted_chunk

parsed_parts = self._split_buffer_into_parts()
self._process_parsed_parts(parsed_parts)

def get_complete(self):
return_lst = self._complete_buffer[:-1] if self._complete_buffer[-1] == "" else self._complete_buffer
self._complete_buffer = [""]
Expand All @@ -271,15 +298,49 @@ def get_all(self):


if __name__ == "__main__":
from aplustools.security.rand import SecretsRandomGenerator
from aplustools.package.timid import TimidTimer

timer = TimidTimer()
protocol = ControlCodeProtocol()

decoder = MessageDecoder(protocol=protocol)
encoder = MessageEncoder(protocol=protocol, public_key_bytes=decoder.get_public_key_bytes())

encoder.add_message("HEllo")
message = encoder.flush()
while True:
try:
# Generate and add random messages
random_messages = [SecretsRandomGenerator.generate_random_string(length=50) for _ in range(5)]

for msg in random_messages:
encoder.add_message(msg)

message = encoder.flush()

for chunk in message:
decoder.add_chunk(chunk)
for chunk in message:
decoder.add_chunk(chunk)

print(decoder.get_complete())
decoded_messages = decoder.get_complete()

# Verify the results
correct = True
for msg in random_messages:
if msg not in decoded_messages:
print(f"Missing message: {msg}")
correct = False

control_codes_correct = all(
isinstance(part, _ControlCode) or isinstance(part, str) for part in decoded_messages
)

if correct and control_codes_correct:
print("All messages and control codes are correct.")
else:
print("There was an error in the encoding/decoding process.")
raise KeyboardInterrupt

end = timer.tock()
print("Decoded messages:", decoded_messages)
print("Time taken:", end)
except KeyboardInterrupt:
print("Ending")
30 changes: 30 additions & 0 deletions aplustools/security/protocols/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import re

# Variable part that should be matched exactly
variable_part = "a4lXzWuMmk5lIw792npa4Jh5FMwqqZxYCDyQkANqhmlaR9EmLA"

# Construct the regex pattern using the variable part
pattern = re.compile(r'(' + re.escape("[") + re.escape(variable_part) + r'::.*?[^\\]' + re.escape("]") + ')')

text = '[a4lXzWuMmk5lIw792npa4Jh5FMwqqZxYCDyQkANqhmlaR9EmLA::NEWLINE]Meow'

matches = pattern.finditer(text)

# Initial positions
last_end = 0
results = []

for match in matches:
start, end = match.span()
# Append everything before the current match that hasn't been matched yet
if start > last_end:
results.append(text[last_end:start])
# Append the current match
results.append(match.group(0))
last_end = end

# Append any remaining part of the text after the last match
if last_end < len(text):
results.append(text[last_end:])

print(results)
10 changes: 3 additions & 7 deletions aplustools/security/rand.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def string_sample(cls, s: Union[str, list], k: int) -> str:
return ''.join(cls.sample(s, k))

@classmethod
def generate_random_string(cls, length: int, char_set: str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") -> str:
def generate_random_string(cls, length: int, char_set: str = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~') -> str:
return ''.join(cls.choice(char_set) for _ in range(length))

@classmethod
Expand Down Expand Up @@ -223,8 +223,8 @@ def secrets_random() -> float:

class WeightedRandom:
def __init__(self, generator: Literal["random", "np_random", "os", "sys_random", "secrets"] = "sys_random"):
self._generator = {"random": random, "np_random": np_random, "os": OSRandomGenerator, "sys_random": secrets.SystemRandom(),
"secrets": SecretsRandomGenerator}[generator]
self._generator = {"random": random, "np_random": np_random, "os": OSRandomGenerator,
"sys_random": secrets.SystemRandom(), "secrets": SecretsRandomGenerator}[generator]

def random(self) -> float:
return self._generator.random()
Expand Down Expand Up @@ -470,10 +470,6 @@ def piecewise(x):

return self._transform_and_scale(self._generator.random(), piecewise, lower_bound, upper_bound)

def binomial(self, n, p):
secrets.SystemRandom().bi
OSRandomGenerator.bi


if __name__ == "__main__":
for generator in ("random", "os", "sys_random", "secrets"):
Expand Down
17 changes: 14 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,30 @@ packages = [
"aplustools.tests",
"aplustools.io.gui",
"aplustools.security",
"aplustools.security.protocols"
"aplustools.security.protocols",
"aplustools.security.dicts"
]
include-package-data = true

[tool.setuptools.package-data]
"aplustools.security" = [
"aplustools.security.dicts" = [
"def-dict.txt",
"google-10000-dict.txt",
"scowl-50-dict.txt",
"scowl-60-dict.txt",
"scowl-70-dict.txt",
"scowl-80-dict.txt",
"scowl-95-dict.txt"
"scowl-95-dict.txt",
"12-dicts/2of4brif.txt",
"12-dicts/2of5core.txt",
"12-dicts/2of12.txt",
"12-dicts/2of12inf.txt",
"12-dicts/3esl.txt",
"12-dicts/3of6all.txt",
"12-dicts/3of6game.txt",
"12-dicts/5d+2a.txt",
"12-dicts/6of12.txt",
"12-dicts/6phrase.txt"
]

[project]
Expand Down

0 comments on commit 3fee928

Please sign in to comment.