Skip to content

Commit

Permalink
Parse XML processing instructions
Browse files Browse the repository at this point in the history
  • Loading branch information
filmackay committed Jan 22, 2023
1 parent fc772e4 commit 6e7e08e
Showing 1 changed file with 38 additions and 36 deletions.
74 changes: 38 additions & 36 deletions pyvantage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ def parse(self):
# Timer (with @VID and <Name> )
# Keypad (with @VID and <Name> )

areas = root.findall(".//Objects//Area[@VID]")
objects = root.find("Objects")
areas = objects.findall("Object/Area[@VID]")
for area_xml in areas:
if self.project_name is None:
self.project_name = area_xml.find('Name').text
Expand All @@ -404,14 +405,13 @@ def parse(self):
self.vid_to_area[area.vid] = area
self.last_area_vid = area.vid

irzones = root.findall(".//Objects//IRZone[@VID]")
irzones = objects.findall("Object/IRZone[@VID]")
for irzone_xml in irzones:
area = self._parse_irzone(irzone_xml)
_LOGGER.debug("IRZone = %s", area)
self.vid_to_area[area.vid] = area

loads = root.findall(".//Objects//Load[@VID]")
loads = loads + root.findall(".//Objects//Vantage.DDGColorLoad[@VID]")
loads = objects.findall("Object/Load[@VID]") + objects.findall("Object/Vantage.DDGColorLoad[@VID]")
other_loads = []
color_loads = []
open_loads = []
Expand All @@ -435,18 +435,18 @@ def parse(self):
_LOGGER.debug("Looking for close_name = %s", close_name)
_LOGGER.debug("Looking for stop_name = %s", stop_name)
_LOGGER.debug("Looking for isopen_name = %s", isopen_name)
close_load_xml = root.findall(
".//Objects//Load[Name='" + close_name + "']")
close_load_xml = objects.findall(
"Object/Load[Name='" + close_name + "']")
if len(close_load_xml) == 1:
close_load_xml = close_load_xml[0]
isopen_contact_xml = root.findall(
".//Objects//DryContact[Name='" + isopen_name + "']")
isopen_contact_xml = objects.findall(
"Object/DryContact[Name='" + isopen_name + "']")
if len(isopen_contact_xml) == 1:
isopen_contact_xml = isopen_contact_xml[0]
else:
isopen_contact_xml = None
stop_load_xml = root.findall(
".//Objects//Load[Name='" + stop_name + "']")
stop_load_xml = objects.findall(
"Object/Load[Name='" + stop_name + "']")
if len(stop_load_xml) == 1:
stop_load_xml = stop_load_xml[0]
else:
Expand All @@ -473,7 +473,7 @@ def parse(self):
_LOGGER.debug("Output = %s", output)
self.vid_to_area[output.area].add_output(output)

load_groups = root.findall(".//Objects//LoadGroup[@VID]")
load_groups = objects.findall("Object/LoadGroup[@VID]")
for lg_xml in load_groups:
lgroup = self._parse_load_group(lg_xml)
if lgroup is None:
Expand All @@ -484,12 +484,12 @@ def parse(self):
_LOGGER.debug("load group = %s", lgroup)
self.vid_to_area[lgroup.area].add_output(lgroup)

keypads = root.findall(".//Objects//Keypad[@VID]")
keypads = keypads + root.findall(".//Objects//DualRelayStation[@VID]")
keypads = keypads + root.findall(".//Objects//IRZone[@VID]")
keypads = keypads + root.findall(".//Objects//Dimmer[@VID]")
keypads = keypads + root.findall(".//Objects//EqCtrl[@VID]")
keypads = keypads + root.findall(".//Objects//EqUX[@VID]")
keypads = objects.findall("Object/Keypad[@VID]")
keypads = keypads + objects.findall("Object/DualRelayStation[@VID]")
keypads = keypads + objects.findall("Object/IRZone[@VID]")
keypads = keypads + objects.findall("Object/Dimmer[@VID]")
keypads = keypads + objects.findall("Object/EqCtrl[@VID]")
keypads = keypads + objects.findall("Object/EqUX[@VID]")
for kp_xml in keypads:
keypad = self._parse_keypad(kp_xml)
_LOGGER.debug("keypad = %s", keypad)
Expand All @@ -498,7 +498,7 @@ def parse(self):
self.vid_to_area[keypad.area].add_keypad(keypad)
self.keypads.append(keypad)

buttons = root.findall(".//Objects//Button[@VID]")
buttons = objects.findall("Object/Button[@VID]")
for button_xml in buttons:
b = self._parse_button(button_xml)
if not b:
Expand All @@ -509,7 +509,7 @@ def parse(self):
self.vid_to_area[b.area].add_button(b)
self.buttons.append(b)

drycontacts = root.findall(".//Objects//DryContact[@VID]")
drycontacts = objects.findall("Object/DryContact[@VID]")
for dc_xml in drycontacts:
dc = self._parse_drycontact(dc_xml)
if not dc:
Expand All @@ -518,31 +518,31 @@ def parse(self):
self.vid_to_button[dc.vid] = dc
self.buttons.append(dc)

variables = root.findall(".//Objects//GMem[@VID]")
variables = objects.findall("Object/GMem[@VID]")
for v in variables:
var = self._parse_variable(v)
_LOGGER.debug("var = %s", var)
self.vid_to_variable[var.vid] = var
# N.B. variables have categories, not areas, so no add to area
self.variables.append(var)

omnisensors = root.findall(".//Objects//OmniSensor[@VID]")
omnisensors = objects.findall("Object/OmniSensor[@VID]")
for s in omnisensors:
sensor = self._parse_omnisensor(s)
_LOGGER.debug("sensor = %s", sensor)
self.vid_to_sensor[sensor.vid] = sensor
# N.B. variables have categories, not areas, so no add to area
self.sensors.append(sensor)

lightsensors = root.findall(".//Objects//LightSensor[@VID]")
lightsensors = objects.findall("Object/LightSensor[@VID]")
for s in lightsensors:
sensor = self._parse_lightsensor(s)
_LOGGER.debug("sensor = %s", sensor)
self.vid_to_sensor[sensor.vid] = sensor
# N.B. variables have categories, not areas, so no add to area
self.sensors.append(sensor)

tasks = root.findall(".//Objects//Task[@VID]")
tasks = objects.findall("Object/Task[@VID]")
for t in tasks:
task = self._parse_task(t)
_LOGGER.debug("task = %s", task)
Expand All @@ -554,21 +554,21 @@ def parse(self):
# Lots of different shade types, one xpath for each kind of shade
# MechoShade driver shades
shades = \
root.findall(".//Objects//MechoShade.IQ2_Shade_Node_CHILD[@VID]")
objects.findall("Object/MechoShade.IQ2_Shade_Node_CHILD[@VID]")
shades = (shades +
root.findall(".//Objects//MechoShade.IQ2_Group_CHILD[@VID]"))
objects.findall("Object/MechoShade.IQ2_Group_CHILD[@VID]"))
# Native QIS QMotion shades
shades = shades + root.findall(".//Objects//QISBlind[@VID]")
shades = shades + root.findall(".//Objects//BlindGroup[@VID]")
shades = shades + objects.findall("Object/QISBlind[@VID]")
shades = shades + objects.findall("Object/BlindGroup[@VID]")
# Non-native QIS Driver QMotion shades (the old way)
shades = (shades +
root.findall(".//Objects//QMotion.QIS_Channel_CHILD[@VID]"))
objects.findall("Object/QMotion.QIS_Channel_CHILD[@VID]"))
# Somfy radio-controlled
shades = (shades +
root.findall(".//Objects//Somfy.URTSI_2_Shade_CHILD[@VID]"))
objects.findall("Object/Somfy.URTSI_2_Shade_CHILD[@VID]"))
# Somfy RS-485 SDN wired shades
shades = (shades +
root.findall(".//Objects//Somfy.RS-485_Shade_CHILD[@VID]"))
objects.findall("Object/Somfy.RS-485_Shade_CHILD[@VID]"))

for shade_xml in shades:
shade = self._parse_shade(shade_xml)
Expand Down Expand Up @@ -1369,12 +1369,14 @@ def load_xml_db(self, disable_cache=False, config_dir="./"):
_LOGGER.debug("done reading, size = %s", len(response))

response = response.decode('ascii')
response = response[response.find("</Result>\n")+10:]
response = response.replace('<?File Encode="Base64" /', '')
response = response.replace('?>', '')
response = response[:response.find('</return>')]
dbytes = base64.b64decode(response)
xml_db = dbytes.decode('utf-8')

# read XML preserving processing instructions
from xml.etree import ElementTree as ET
response = ET.fromstring(response, parser=ET.XMLParser(target=ET.TreeBuilder(insert_pis=True)))
response = response.find("GetFile/return")
response = next(response.iter(tag=ET.ProcessingInstruction))
response = response.text.split()[2][1:]
xml_db = base64.b64decode(response).decode('utf-8')
if len(xml_db) < 1000:
_LOGGER.warning("Downloaded short .dc file; "
" check saved cache file on disk")
Expand Down

0 comments on commit 6e7e08e

Please sign in to comment.