Skip to content

Commit

Permalink
Merge pull request #127 from BaQs/code_cleanup
Browse files Browse the repository at this point in the history
Improve error messages for device api, cleanup and simplify camera.
  • Loading branch information
RenierM26 committed Jul 4, 2023
2 parents f012768 + e978dd9 commit 56af67e
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 74 deletions.
116 changes: 58 additions & 58 deletions pyezviz/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from .constants import DeviceSwitchType, SoundMode
from .exceptions import PyEzvizError
from .utils import fetch_nested_value, string_to_list

if TYPE_CHECKING:
from .client import EzvizClient
Expand All @@ -29,29 +30,32 @@ def __init__(
)
self._last_alarm: dict[str, Any] = {}
self._switch: dict[int, bool] = {
switch["type"]: switch["enable"]
for switch in self._device.get("SWITCH", {})
switch["type"]: switch["enable"] for switch in self._device["SWITCH"]
}

def fetch_key(self, keys: list, default_value: Any = None) -> Any:
"""Fetch dictionary key."""
return fetch_nested_value(self._device, keys, default_value)

def _alarm_list(self) -> None:
"""Get last alarm info for this camera's self._serial."""
_alarmlist = self._client.get_alarminfo(self._serial)

if _alarmlist["page"].get("totalResults") > 0:
if fetch_nested_value(_alarmlist, ["page", "totalResults"], 0) > 0:
self._last_alarm = _alarmlist["alarms"][0]
return self._motion_trigger()

def _local_ip(self) -> Any:
"""Fix empty ip value for certain cameras."""
if (
self._device["WIFI"].get("address")
self.fetch_key(["WIFI", "address"])
and self._device["WIFI"]["address"] != "0.0.0.0"
):
return self._device["WIFI"]["address"]

# Seems to return none or 0.0.0.0 on some.
if (
self._device["CONNECTION"].get("localIp")
self.fetch_key(["CONNECTION", "localIp"])
and self._device["CONNECTION"]["localIp"] != "0.0.0.0"
):
return self._device["CONNECTION"]["localIp"]
Expand Down Expand Up @@ -82,7 +86,7 @@ def _motion_trigger(self) -> None:
def _is_alarm_schedules_enabled(self) -> bool:
"""Check if alarm schedules enabled."""
_alarm_schedules = [
item for item in self._device.get("TIME_PLAN", {}) if item.get("type") == 2
item for item in self._device["TIME_PLAN"] if item.get("type") == 2
]

if _alarm_schedules:
Expand All @@ -96,75 +100,71 @@ def status(self) -> dict[Any, Any]:

return {
"serial": self._serial,
"name": self._device["deviceInfos"].get("name"),
"version": self._device["deviceInfos"].get("version"),
"name": self.fetch_key(["deviceInfos", "name"]),
"version": self.fetch_key(["deviceInfos", "version"]),
"upgrade_available": bool(
self._device["UPGRADE"].get("isNeedUpgrade") == 3
self.fetch_key(["UPGRADE", "isNeedUpgrade"]) == 3
),
"status": self._device["deviceInfos"].get("status"),
"device_category": self._device["deviceInfos"].get("deviceCategory"),
"device_sub_category": self._device["deviceInfos"].get("deviceSubCategory"),
"sleep": self._switch.get(DeviceSwitchType.SLEEP.value)
or self._switch.get(DeviceSwitchType.AUTO_SLEEP.value),
"privacy": self._switch.get(DeviceSwitchType.PRIVACY.value),
"audio": self._switch.get(DeviceSwitchType.SOUND.value),
"ir_led": self._switch.get(DeviceSwitchType.INFRARED_LIGHT.value),
"state_led": self._switch.get(DeviceSwitchType.LIGHT.value),
"upgrade_percent": self._device["STATUS"].get("upgradeProcess"),
"status": self.fetch_key(["deviceInfos", "status"]),
"device_category": self.fetch_key(["deviceInfos", "deviceCategory"]),
"device_sub_category": self.fetch_key(["deviceInfos", "deviceSubCategory"]),
"upgrade_percent": self.fetch_key(["STATUS", "upgradeProcess"]),
"upgrade_in_progress": bool(
self._device["STATUS"].get("upgradeStatus") == 0
self.fetch_key(["STATUS", "upgradeStatus"]) == 0
),
"latest_firmware_info": self._device["UPGRADE"].get("upgradePackageInfo"),
"follow_move": self._switch.get(DeviceSwitchType.MOBILE_TRACKING.value),
"alarm_notify": bool(self._device["STATUS"].get("globalStatus")),
"latest_firmware_info": self.fetch_key(["UPGRADE", "upgradePackageInfo"]),
"alarm_notify": bool(self.fetch_key(["STATUS", "globalStatus"])),
"alarm_schedules_enabled": self._is_alarm_schedules_enabled(),
"alarm_sound_mod": SoundMode(
self._device["STATUS"].get("alarmSoundMode", -1)
self.fetch_key(["STATUS", "alarmSoundMode"], -1)
).name,
"encrypted": bool(self._device["STATUS"].get("isEncrypt")),
"encrypted_pwd_hash": self._device["STATUS"].get("encryptPwd"),
"encrypted": bool(self.fetch_key(["STATUS", "isEncrypt"])),
"encrypted_pwd_hash": self.fetch_key(["STATUS", "encryptPwd"]),
"local_ip": self._local_ip(),
"wan_ip": self._device["CONNECTION"].get("netIp"),
"mac_address": self._device["deviceInfos"].get("mac"),
"local_rtsp_port": self._device["CONNECTION"].get("localRtspPort", "554")
if self._device["CONNECTION"].get("localRtspPort", "554") != 0
"wan_ip": self.fetch_key(["CONNECTION", "netIp"]),
"mac_address": self.fetch_key(["deviceInfos", "mac"]),
"local_rtsp_port": self.fetch_key(["CONNECTION", "localRtspPort"], "554")
if self.fetch_key(["CONNECTION", "localRtspPort"], "554") != 0
else "554",
"supported_channels": self._device["deviceInfos"].get("channelNumber"),
"battery_level": self._device["STATUS"]
.get("optionals", {})
.get("powerRemaining"),
"battery_camera_work_mode": self._device["STATUS"]
.get("optionals", {})
.get("batteryCameraWorkMode"),
"Alarm_DetectHumanCar": self._device["STATUS"]
.get("optionals", {})
.get("Alarm_DetectHumanCar"),
"NightVision_Model": self._device["STATUS"]
.get("optionals", {})
.get("NightVision_Model"),
"PIR_Status": self._device["STATUS"].get("pirStatus"),
"Motion_Trigger": self._alarmmotiontrigger.get("alarm_trigger_active"),
"Seconds_Last_Trigger": self._alarmmotiontrigger.get("timepassed"),
"last_alarm_time": self._last_alarm.get("alarmStartTimeStr"),
"supported_channels": self.fetch_key(["deviceInfos", "channelNumber"]),
"battery_level": self.fetch_key(["STATUS", "optionals", "powerRemaining"]),
"PIR_Status": self.fetch_key(["STATUS", "pirStatus"]),
"Motion_Trigger": self._alarmmotiontrigger["alarm_trigger_active"],
"Seconds_Last_Trigger": self._alarmmotiontrigger["timepassed"],
"last_alarm_time": self._last_alarm.get(
"alarmStartTimeStr", "2000-01-01 00:00:00"
),
"last_alarm_pic": self._last_alarm.get(
"picUrl",
"https://eustatics.ezvizlife.com/ovs_mall/web/img/index/EZVIZ_logo.png?ver=3007907502",
),
"last_alarm_type_code": self._last_alarm.get("alarmType", "0000"),
"last_alarm_type_name": self._last_alarm.get("sampleName", "NoAlarm"),
"alarm_light_luminance": int(
"".join(
filter(
str.isdigit,
self._device["STATUS"]
.get("optionals", {})
.get("Alarm_Light", "0"),
)
)
), # extract the value from the string and cater for the case where the value is not set.
"wifiInfos": self._device.get("WIFI"),
"cam_timezone": self.fetch_key(["STATUS", "optionals", "timeZone"]),
"push_notify_alarm": not bool(self.fetch_key(["NODISTURB", "alarmEnable"])),
"push_notify_call": not bool(
self.fetch_key(["NODISTURB", "callingEnable"])
),
"alarm_light_luminance": self.fetch_key(
["STATUS", "optionals", "Alarm_Light", "luminance"]
),
"Alarm_DetectHumanCar": self.fetch_key(
["STATUS", "optionals", "Alarm_DetectHumanCar", "type"]
),
"diskCapacity": string_to_list(
self.fetch_key(["STATUS", "optionals", "diskCapacity"])
),
"NightVision_Model": self.fetch_key(
["STATUS", "optionals", "NightVision_Model"]
),
"batteryCameraWorkMode": self.fetch_key(
["STATUS", "optionals", "workMode"]
),
"wifiInfos": self._device["WIFI"],
"switches": self._switch,
"optionals": self.fetch_key(["STATUS", "optionals"]),
"supportExt": self._device["deviceInfos"]["supportExt"],
"ezDeviceCapability": self.fetch_key(["deviceInfos", "ezDeviceCapability"]),
}

def move(self, direction: str, speed: int = 5) -> bool:
Expand Down
6 changes: 3 additions & 3 deletions pyezviz/cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def xor_enc_dec(msg, xor_key=XOR_KEY):
class EzvizCAS:
"""Ezviz CAS server client."""

def __init__(self, token):
def __init__(self, token) -> None:
"""Initialize the client object."""
self._session = None
self._token = token or {
Expand All @@ -38,7 +38,7 @@ def cas_get_encryption(self, devserial):
"""Fetch encryption code from ezviz cas server."""

# Random hex 64 characters long.
rand_hex = random.randrange(10 ** 80)
rand_hex = random.randrange(10**80)
rand_hex = "%064x" % rand_hex
rand_hex = rand_hex[:64]

Expand Down Expand Up @@ -94,7 +94,7 @@ def set_camera_defence_state(self, serial, enable=1):
"""Enable alarm notifications."""

# Random hex 64 characters long.
rand_hex = random.randrange(10 ** 80)
rand_hex = random.randrange(10**80)
rand_hex = "%064x" % rand_hex
rand_hex = rand_hex[:64]

Expand Down
50 changes: 39 additions & 11 deletions pyezviz/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
InvalidURL,
PyEzvizError,
)
from .utils import convert_to_dict

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -733,7 +734,11 @@ def get_storage_status(self, serial: str, max_retries: int = 0) -> Any:

if json_output["resultCode"] != "0":
if json_output["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Can't get storage status from device %s, retrying %s",
serial,
max_retries,
)
return self.get_storage_status(serial, max_retries + 1)
raise PyEzvizError(
f"Could not get device storage status: Got {json_output})"
Expand Down Expand Up @@ -948,7 +953,11 @@ def reboot_camera(

if json_output["resultCode"] != "0":
if json_output["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Unable to reboot camera, camera %s is unreachable, retrying %s",
serial,
max_retries,
)
return self.reboot_camera(serial, delay, operation, max_retries + 1)
raise PyEzvizError(f"Could not reboot device {json_output})")

Expand Down Expand Up @@ -1112,6 +1121,7 @@ def get_device_infos(self, serial: str | None = None) -> dict[Any, Any]:
result[_serial]["deviceInfos"]["supportExt"] = json.loads(
result[_serial]["deviceInfos"]["supportExt"]
)
convert_to_dict(result[_serial]["STATUS"].get("optionals"))

if not serial:
return result
Expand Down Expand Up @@ -1214,7 +1224,11 @@ def get_cam_key(

if json_output["resultCode"] != "0":
if json_output["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Camera %s encryption key not found, retrying %s",
serial,
max_retries,
)
return self.get_cam_key(serial, smscode, max_retries + 1)
raise PyEzvizError(
f"Could not get camera encryption key: Got {json_output})"
Expand Down Expand Up @@ -1258,7 +1272,11 @@ def create_panoramic(self, serial: str, max_retries: int = 0) -> Any:

if json_output["resultCode"] != "0":
if json_output["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Create panoramic failed on device %s retrying %s",
serial,
max_retries,
)
return self.create_panoramic(serial, max_retries + 1)
raise PyEzvizError(
f"Could not send command to create panoramic photo: Got {json_output})"
Expand Down Expand Up @@ -1302,7 +1320,9 @@ def return_panoramic(self, serial: str, max_retries: int = 0) -> Any:

if json_output["resultCode"] != "0":
if json_output["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Camera %s busy or unreachable, retrying %s", serial, max_retries
)
return self.return_panoramic(serial, max_retries + 1)
raise PyEzvizError(f"Could retrieve panoramic photo: Got {json_output})")

Expand Down Expand Up @@ -1507,7 +1527,9 @@ def api_set_defence_schedule(

if json_output["resultCode"] != "0":
if json_output["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Camara %s offline or unreachable, retrying %s", serial, max_retries
)
return self.api_set_defence_schedule(
serial, schedule, enable, max_retries + 1
)
Expand Down Expand Up @@ -1562,7 +1584,7 @@ def do_not_disturb(
enable: int = 1,
channelno: int = 1,
max_retries: int = 0,
) -> bool | str:
) -> bool:
"""Set do not disturb on camera with specified serial."""
if max_retries > MAX_RETRIES:
raise PyEzvizError("Can't gather proper data. Max retries exceeded.")
Expand All @@ -1585,7 +1607,7 @@ def do_not_disturb(
if err.response.status_code == 401:
# session is wrong, need to re-log-in
self.login()
return self.do_not_disturb(serial, enable, max_retries + 1)
return self.do_not_disturb(serial, enable, channelno, max_retries + 1)

raise HTTPError from err

Expand All @@ -1596,7 +1618,7 @@ def do_not_disturb(
raise PyEzvizError("Could not decode response:" + str(err)) from err

if json_output["meta"]["code"] != 200:
raise PyEzvizError(f"Could not set defence mode: Got {json_output})")
raise PyEzvizError(f"Could not set do not disturb: Got {json_output})")

return True

Expand Down Expand Up @@ -1704,7 +1726,11 @@ def detection_sensibility(

if response_json["resultCode"] != "0":
if response_json["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Camera %s is offline, can't set sensitivity, retrying %s",
serial,
max_retries,
)
return self.detection_sensibility(
serial, sensibility, type_value, max_retries + 1
)
Expand Down Expand Up @@ -1752,7 +1778,9 @@ def get_detection_sensibility(

if response_json["resultCode"] != "0":
if response_json["resultCode"] == "-1":
_LOGGER.warning("Server busy, retrying %s", max_retries)
_LOGGER.warning(
"Camera %s is offline, retrying %s", serial, max_retries
)
return self.get_detection_sensibility(
serial, type_value, max_retries + 1
)
Expand Down
2 changes: 1 addition & 1 deletion pyezviz/test_cam_rtsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(
username=None,
password=None,
test_uri="",
):
) -> None:
"""Initialize RTSP credential test."""
self._rtsp_details = {
"bufLen": 1024,
Expand Down
Loading

0 comments on commit 56af67e

Please sign in to comment.