Skip to content

Commit

Permalink
chore(agw): activate mypy code scanning in lte except integ_tests (ma…
Browse files Browse the repository at this point in the history
…gma#13187)

Signed-off-by: Alex Jahl <alexander.jahl@tngtech.com>
  • Loading branch information
ajahl authored and emakeev committed Aug 5, 2022
1 parent a25d26d commit b7b9d69
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 73 deletions.
2 changes: 1 addition & 1 deletion lte/gateway/python/magma/enodebd/enodebd_iptables_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def check_rules(
port: str,
enodebd_public_ip: str,
private_ip: str,
) -> None:
) -> bool:
unexpected_rules = []
expected_rules_present = False
pattern = r'DNAT\s+tcp\s+--\s+anywhere\s+{pub_ip}\s+tcp\s+dpt:{dport} to:{ip}'.format(
Expand Down
5 changes: 2 additions & 3 deletions lte/gateway/python/magma/enodebd/stats_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,10 @@ def _get_enb_label_from_request(self, request) -> str:
logger.error("Couldn't find serial for ip", ip)
return label

@asyncio.coroutine
def _post_and_put_handler(self, request) -> web.Response:
async def _post_and_put_handler(self, request) -> web.Response:
""" HTTP POST handler """
# Read request body and convert to XML tree
body = yield from request.read()
body = await request.read()

root = ElementTree.fromstring(body)
label = self._get_enb_label_from_request(request)
Expand Down
93 changes: 39 additions & 54 deletions lte/gateway/python/magma/pipelined/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import gzip
import hashlib
import logging
from typing import Union
from typing import Optional

from Crypto.Cipher import AES, ARC4
from Crypto.Hash import HMAC
Expand All @@ -27,94 +27,79 @@ def pad(m):
return m + ' ' * (16 - len(m) % 16)


def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: bytes = None):
ret: Union[str, bytes]
def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: Optional[bytes] = None):
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
ret = cipher.encrypt(s.encode('utf-8')).hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
iv = get_random_bytes(16)
key_val = key
key_mac = mac

cipher = AES.new(key_val, AES.MODE_CBC, iv)
enc = cipher.encrypt(pad(s).encode('utf-8'))
return cipher.encrypt(s.encode('utf-8')).hex()

hmac = HMAC.new(key_mac)
hmac.update(iv + enc)

ret = hmac.hexdigest() + iv.hex() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_ECB_HMAC_MD5:
if mac is not None:
key_val = key
key_mac = mac

cipher = AES.new(key_val, AES.MODE_ECB)
enc = cipher.encrypt(pad(s).encode('utf-8'))

hmac = HMAC.new(key_mac)
hmac.update(enc)

ret = hmac.hexdigest() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
key_val = key
key_mac = mac

cipher = AES.new(key_val, AES.MODE_ECB)
enc = cipher.encrypt(pad(s).encode('utf-8'))

hmac = HMAC.new(key_mac)
hmac.update(enc)
ret = gzip.compress(hmac.digest() + enc)
else:
logging.error("Unsupported encryption algorithm")
return ret
if encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
iv = get_random_bytes(16)
aes_cipher = AES.new(key_val, AES.MODE_CBC, iv)
enc = aes_cipher.encrypt(pad(s).encode('utf-8'))
hmac.update(iv + enc)
return hmac.hexdigest() + iv.hex() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_ECB_HMAC_MD5:
aes_cipher = AES.new(key_val, AES.MODE_ECB)
enc = aes_cipher.encrypt(pad(s).encode('utf-8'))
hmac.update(enc)
return hmac.hexdigest() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
aes_cipher = AES.new(key_val, AES.MODE_ECB)
enc = aes_cipher.encrypt(pad(s).encode('utf-8'))
hmac.update(enc)
return gzip.compress(hmac.digest() + enc)

raise ValueError("Unsupported encryption algorithm")


def decrypt_str(data, key: bytes, encryption_algorithm, mac) -> str:
ret = ""
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
ret = cipher.decrypt(data).hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
return cipher.decrypt(data).hex()

hmac = HMAC.new(mac)

if encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
verify = data[0:32]
hmac = HMAC.new(mac)
hmac.update(codecs.decode(data[32:], 'hex_codec'))

if hmac.hexdigest() != verify:
return ""

iv = codecs.decode(data[32:64], 'hex_codec')
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(codecs.decode(data[64:], 'hex_codec'))
ret = decrypted.decode("utf-8").strip()
aes_cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = aes_cipher.decrypt(codecs.decode(data[64:], 'hex_codec'))
return decrypted.decode("utf-8").strip()

elif encryption_algorithm == PipelineD.HEConfig.AES256_ECB_HMAC_MD5:
verify = data[0:32]
hmac = HMAC.new(mac)
hmac.update(codecs.decode(data[32:], 'hex_codec'))

if hmac.hexdigest() != verify:
return ""

cipher = AES.new(key, AES.MODE_ECB)
decrypted = cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))
ret = decrypted.decode("utf-8").strip()
aes_cipher = AES.new(key, AES.MODE_ECB)
decrypted = aes_cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))
return decrypted.decode("utf-8").strip()

elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
# Convert to hex str
data = gzip.decompress(data).hex()

verify = data[0:32]
hmac = HMAC.new(mac)
hmac.update(codecs.decode(data[32:], 'hex_codec'))

if hmac.hexdigest() != verify:
return ""

cipher = AES.new(key, AES.MODE_ECB)
decrypted = cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))
ret = decrypted.decode("utf-8").strip()
else:
logging.error("Unsupported encryption algorithm")
return ret
aes_cipher = AES.new(key, AES.MODE_ECB)
decrypted = aes_cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))
return decrypted.decode("utf-8").strip()
raise ValueError("Unsupported encryption algorithm")


def get_hash(s, hash_function) -> bytes:
Expand Down
2 changes: 1 addition & 1 deletion lte/gateway/python/magma/pipelined/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import List, OrderedDict

import grpc
from lte.protos import pipelined_pb2_grpc
from lte.protos import pipelined_pb2_grpc # type: ignore[attr-defined]
from lte.protos.apn_pb2 import AggregatedMaximumBitrate
from lte.protos.mobilityd_pb2 import IPAddress
from lte.protos.pipelined_pb2 import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging

from grpc import StatusCode
from lte.protos import (
from lte.protos import ( # type: ignore[attr-defined]
diam_errors_pb2,
subscriberauth_pb2,
subscriberauth_pb2_grpc,
Expand Down
6 changes: 5 additions & 1 deletion lte/gateway/python/magma/subscriberdb/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
from typing import NamedTuple

import grpc
from lte.protos import apn_pb2, subscriberdb_pb2, subscriberdb_pb2_grpc
from lte.protos import ( # type: ignore[attr-defined]
apn_pb2,
subscriberdb_pb2,
subscriberdb_pb2_grpc,
)
from magma.common.rpc_utils import print_grpc, return_void
from magma.subscriberdb.sid import SIDUtils
from magma.subscriberdb.store.base import (
Expand Down
16 changes: 8 additions & 8 deletions lte/gateway/python/scripts/generate_oai_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def _get_congestion_control_config(service_mconfig):
return True


def _get_converged_core_config(service_mconfig: object) -> bool:
def _get_converged_core_config(service_mconfig: MME) -> bool:
"""Retrieve enable5g_features config value. If it does not exist it defaults to False. It gives precedence to the service_mconfig file.
Args:
Expand All @@ -273,7 +273,7 @@ def _get_converged_core_config(service_mconfig: object) -> bool:
return False


def _get_default_slice_service_type_config(service_mconfig: object) -> str:
def _get_default_slice_service_type_config(service_mconfig: MME) -> str:
"""Retrieve default_slice_service_type config value. If it does not exist, it defaults to DEFAULT_NGAP_S_NSSAI_SST.
Args:
Expand All @@ -294,7 +294,7 @@ def _get_default_slice_service_type_config(service_mconfig: object) -> str:
return service_mconfig.amf_default_slice_service_type or DEFAULT_NGAP_S_NSSAI_SST


def _get_default_slice_differentiator_type_config(service_mconfig: object) -> str:
def _get_default_slice_differentiator_type_config(service_mconfig: MME) -> str:
"""Retrieve default_slice_differentiator config value. If it does not exist it defaults to DEFAULT_NGAP_S_NSSAI_SD.
Args:
Expand All @@ -313,7 +313,7 @@ def _get_default_slice_differentiator_type_config(service_mconfig: object) -> st
return service_mconfig.amf_default_slice_differentiator or DEFAULT_NGAP_S_NSSAI_SD


def _get_amf_name_config(service_mconfig: object) -> str:
def _get_amf_name_config(service_mconfig: MME) -> str:
"""Retrieve amf_name config value. If it does not exist, it defaults to DEFAULT_NGAP_AMF_NAME.
Args:
Expand Down Expand Up @@ -360,7 +360,7 @@ def _get_default_auth_timer_expire_msec() -> str:
)


def _get_default_dnn_config(service_mconfig: object) -> str:
def _get_default_dnn_config(service_mconfig: MME) -> str:
"""Retrieve default_dnn config value. If it does not exist, it defaults to DEFAULT_DEFAULT_DNN.
Args:
Expand All @@ -379,7 +379,7 @@ def _get_default_dnn_config(service_mconfig: object) -> str:
return DEFAULT_DEFAULT_DNN


def _get_amf_region_id(service_mconfig: object) -> str:
def _get_amf_region_id(service_mconfig: MME) -> str:
"""Retrieve amf_region_id config value. If it does not exist it defaults to DEFAULT_NGAP_AMF_REGION_ID.
Args:
Expand All @@ -398,7 +398,7 @@ def _get_amf_region_id(service_mconfig: object) -> str:
return service_mconfig.amf_region_id or DEFAULT_NGAP_AMF_REGION_ID


def _get_amf_set_id(service_mconfig: object) -> str:
def _get_amf_set_id(service_mconfig: MME) -> str:
"""Retrieve amf_set_id config value. If it does not exist it defaults to DEFAULT_NGAP_SET_ID.
Args:
Expand All @@ -417,7 +417,7 @@ def _get_amf_set_id(service_mconfig: object) -> str:
return service_mconfig.amf_set_id or DEFAULT_NGAP_SET_ID


def _get_amf_pointer(service_mconfig: object) -> str:
def _get_amf_pointer(service_mconfig: MME) -> str:
"""Retrieve amf_pointer config value. If it does not exist it defaults to DEFAULT_NGAP_AMF_POINTER.
Args:
Expand Down
4 changes: 0 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,4 @@ install_types = True
non_interactive = True
exclude = (?x)(
^lte/gateway/python/integ_tests/ |
^lte/gateway/python/magma/pipelined/ |
^lte/gateway/python/magma/enodebd/ |
^lte/gateway/python/magma/subscriberdb/protocols/m5g_auth_servicer.py$ |
^lte/gateway/python/scripts/generate_oai_config.py$
)

0 comments on commit b7b9d69

Please sign in to comment.