Skip to content

Commit

Permalink
Migrate to IOPathMatch, code cleanup/fixes
Browse files Browse the repository at this point in the history
- Migrate to IOPathMatch via IOService pathing over IONameMatch (removes PXSX rename need, and may help with some AMD setups)
- Add safe-guards for no acpi-path - should fix "'NoneType' object has no attribute 'split'" in #17 and #19
- Check for no valid controllers before discovery
- Add option to generate config.plist renames for conflicting controller names
- Removed unused method and cleaned some code
  • Loading branch information
corpnewt authored Dec 24, 2020
1 parent 7c097c0 commit f3be689
Showing 1 changed file with 134 additions and 38 deletions.
172 changes: 134 additions & 38 deletions USBMap.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self):
self.map_list = self.get_map_list()
self.port_map_list = self.get_port_map_list()
self.discover_wait = 5
self.default_names = ("XHC1","EHC1","EHC2","PXSX")
self.default_names = ("XHC1","EHC1","EHC2")
self.cs = u"\u001b[32;1m"
self.ce = u"\u001b[0m"
self.bs = u"\u001b[36;1m"
Expand All @@ -47,6 +47,8 @@ def __init__(self):
self.rsdt_path = os.path.join(self.output,"SSDT-RHUB-Reset.dsl")
self.kext_path = os.path.join(self.output,"USBMap.kext")
self.info_path = os.path.join(self.kext_path,"Contents","Info.plist")
self.oc_patches = os.path.join(self.output,"patches_OC.plist")
self.clover_patches = os.path.join(self.output,"patches_Clover.plist")
self.merged_list = OrderedDict()
# Load the USB list as needed
if os.path.exists(self.usb_list):
Expand All @@ -64,7 +66,7 @@ def __init__(self):
def get_illegal_names(self):
if not self.smbios or not os.path.exists(self.plugin_path):
return [x for x in self.default_names] # No SMBIOS, fall back on defaults
illegal_names = ["PXSX"] # Always start with the default PXSX name
illegal_names = []
for plugin in os.listdir(self.plugin_path):
plug_path = os.path.join(self.plugin_path,plugin)
info_path = os.path.join(plug_path,"Contents","Info.plist")
Expand Down Expand Up @@ -111,6 +113,9 @@ def get_matching_controller(self,controller_name,from_cont=None,into_cont=None):
# If it has a location - we want to match that
cont_adj = next((x for x in into_cont if from_cont[controller_name]["locationid"] == into_cont[x].get("locationid",None)),None)
if cont_adj: return cont_adj
# Let's try by matching the IOService path?
cont_adj = next((x for x in into_cont if from_cont[controller_name].get("ioservice_path",None) == into_cont[x].get("ioservice_path","Unknown")),None)
if cont_adj: return cont_adj
# Let's try by matching the ACPI path?
cont_adj = next((x for x in into_cont if from_cont[controller_name].get("acpi_path",None) == into_cont[x].get("acpi_path","Unknown")),None)
if cont_adj: return cont_adj
Expand Down Expand Up @@ -297,17 +302,31 @@ def get_populated_count_for_controller(self,controller):
port_dict = self.get_ports_and_devices_for_controller(controller)
return len([x for x in port_dict if len(port_dict[x])])

def get_ioservice_path(self,check_line):
line_id = "id "+check_line.split(", registered")[0].split("id ")[-1]
for index,line in enumerate(self.ioreg):
if not line_id in line: continue
# Walk backwards, keeping track of any stepped up path elements
indent = -1
path = []
for l in self.ioreg[index::-1]:
if not " <class" in l: continue # Only consider classes
l_check = l.replace("|"," ").replace("+-o","")
i_check = len(l_check) - len(l_check.lstrip())
if i_check < indent or indent == -1: # Got a step up - add it
entry = l_check.lstrip().split(" <class")[0]
if entry == self.smbios: break # We got our SMBIOS - bail
path.append(entry)
indent = i_check
return "IOService:/"+"/".join(path[::-1])
return None

def populate_controllers(self):
assert self.ioreg != None # Error if we have no ioreg to iterate
self.smbios = None
controllers = OrderedDict()
if os.path.exists("ioreg.txt"):
with open("ioreg.txt","rb") as f:
ioreg = f.read().decode("utf-8",errors="ignore")
else:
ioreg = self.r.run({"args":["ioreg","-c","IOUSBDevice","-w0"]})[0]
# Trim the list down to only what we want
valid = [x.replace("|"," ").replace("+-o ","").split(", registered")[0] for x in ioreg.split("\n") if any((y.search(x) for y in self.map_list))]
valid = [x.replace("|"," ").replace("+-o ","").split(", registered")[0] for x in self.ioreg if any((y.search(x) for y in self.map_list))]
for index,wline in enumerate(valid):
# Walk until we find a port, then walk backward to figure out the controller
if not any((x.search(wline) for x in self.port_map_list)): continue
Expand Down Expand Up @@ -401,6 +420,11 @@ def populate_controllers(self):
controllers[current_obj]["acpi_address"] = acpi_addr if acpi_addr else "Zero"
# Reset the temp vars
parent_name = acpi_addr = acpi_path = None
# Let's get the IOService path for each controller
for controller in controllers:
path = self.get_ioservice_path(controllers[controller]["line"])
if not path: continue
controllers[controller]["ioservice_path"] = path
return controllers

def build_kext(self):
Expand Down Expand Up @@ -494,13 +518,18 @@ def build_info_plist(self,skip_empty=True):
},
"model": self.smbios
}
if "ioservice_path" in self.merged_list[x]:
# We have a more elegant way to match than the ham-fisted IONameMatch
new_entry.pop("IONameMatch",None)
new_entry["IOPathMatch"] = self.merged_list[x]["ioservice_path"]
if "locationid" in self.merged_list[x]:
# We have a hub - save the loc id and up the IOProbeScore - use the most recent though - by the name's address
try: new_entry["locationID"] = self.hex_dec(x.split("-")[-1])
except: new_entry["locationID"] = self.merged_list[x]["locationid"] # Fall back on the original locationid
new_entry["IOProbeScore"] = 5000
# No need to name match as we're using locationID instead
# No need to name/path match as we're using locationID instead
new_entry.pop("IONameMatch",None)
new_entry.pop("IOPathMatch",None)
if "XHCI" in self.merged_list[x]["type"]:
# Only add the kUSBMuxEnabled property to XHCI controllers
new_entry["IOProviderMergeProperties"]["kUSBMuxEnabled"] = True
Expand Down Expand Up @@ -560,26 +589,6 @@ def hex_dec(self, value):
except: return None
return dec

def get_os_from_build(self, build_number):
# Returns the best-guess OS version for the build number
alpha = "abcdefghijklmnopqrstuvwxyz"
os_version = "Unknown"
major = minor = ""
try:
# Formula looks like this: AAB; AA - 4 = 10.## version
# B index in "ABCDEFGHIJKLMNOPQRSTUVXYZ" = 10.##.## version
split = re.findall(r"[^\W\d_]+|\d+", build_number)
major = int(split[0])-4
minor = alpha.index(split[1].lower())
# Account for 11.0 at 10.16+
osx = 11 if major >= 16 else 10
major = major-16 if major >= 16 else major
os_version = "{}.{}.{}".format(osx, major, minor)
# Python also has string comparisons so "10.15.0" > "10.14.0" would return True
except:
pass
return os_version

def discover_ports(self):
# Iterates every 5 seconds showing any newly populated ports
self.check_controllers()
Expand Down Expand Up @@ -900,14 +909,83 @@ def edit_plist(self):
continue

def get_safe_acpi_path(self, path):
return ".".join([x.split("@")[0] for x in path.split("/") if len(x) and not ":" in x])
return None if path == None else ".".join([x.split("@")[0] for x in path.split("/") if len(x) and not ":" in x])

def get_numbered_name(self, base_name, number, use_hex=True):
if use_hex: number = hex(number).replace("0x","").upper()
else: number = str(number)
return base_name[:-1*len(number)]+number

def generate_renames(self, cont_list):
used_names = [x for x in self.illegal_names]
used_names.extend([self.connected_controllers[x]["parent_name"].upper() for x in self.connected_controllers if self.connected_controllers[x].get("parent_name",None)])
self.u.head("Rename Conflicting Controllers")
print("")
oc_patches = {"ACPI":{"Patch":[]}}
clover_patches = {"ACPI":{"DSDT":{"Patches":[]}}}
zero = plist.wrap_data(binascii.unhexlify("00000000"))
for cont in cont_list:
con_type = "XHCI"
print("Checking {}...".format(cont))
c_type = self.connected_controllers[cont]["type"]
if "XHCI" in c_type:
print(" - XHCI device")
elif "EHCI" in c_type:
print(" - EHCI device")
con_type = "EH01"
else: print(" - Unknown type - using XHCI")
print(" - Gathering unique name...")
# Now we have the base - let's increment!
starting_number = 1 if con_type == "EH01" else 2
while True:
name = self.get_numbered_name(con_type,starting_number)
if not name in used_names:
used_names.append(name)
break
starting_number += 1
# We should have a unique name here, add the info
print(" --> Got {}".format(name))
cname = cont.split("@")[0].ljust(4,"_")
find = plist.wrap_data(cname.encode("utf-8"))
repl = plist.wrap_data(name.encode("utf-8"))
comm = "Rename {} to {}".format(cname,name)
c_patch = {
"Comment":comm,
"Disabled":False,
"Find":find,
"Replace":repl
}
oc_patch = {
"Comment":comm,
"Count":0,
"Enabled":True,
"Find":find,
"Limit":0,
"Mask":plist.wrap_data(b""),
"OemTableId": zero,
"Replace":repl,
"ReplaceMask":plist.wrap_data(b""),
"Skip":0,
"TableLength":0,
"TableSignature":zero
}
clover_patches["ACPI"]["DSDT"]["Patches"].append(c_patch)
oc_patches["ACPI"]["Patch"].append(oc_patch)
print("Saving patches_OC.plist...")
os.chdir(os.path.dirname(os.path.realpath(__file__)))
if not os.path.exists(self.output): os.mkdir(self.output)
with open(self.oc_patches,"wb") as f:
plist.dump(oc_patches,f)
print("Saving patches_Clover.plist...")
with open(self.clover_patches,"wb") as f:
plist.dump(clover_patches,f)
self.re.reveal(self.oc_patches,True)
print("")
print("Done.")
print("")
self.u.grab("Press [enter] to return to the menu...")

def generate_acpi_renames(self, cont_list):
used_names = [x for x in self.illegal_names]
used_names.extend([self.connected_controllers[x]["parent_name"].upper() for x in self.connected_controllers if self.connected_controllers[x].get("parent_name",None)])
self.u.head("Rename Devices")
Expand All @@ -929,6 +1007,9 @@ def generate_renames(self, cont_list):
print("Checking {}...".format(cont))
c_type = self.connected_controllers[cont]["type"]
acpi_path = self.get_safe_acpi_path(self.connected_controllers[cont]["acpi_path"])
if not acpi_path:
print(" - ACPI path not found - skipping.")
continue
acpi_parent = ".".join(acpi_path.split(".")[:-1])
acpi_addr = self.connected_controllers[cont]["acpi_address"]
if "XHCI" in c_type:
Expand All @@ -953,6 +1034,10 @@ def generate_renames(self, cont_list):
print(" --> Got {}".format(name))
parents.append(acpi_parent)
devices.append((acpi_path,name,acpi_addr,acpi_parent))
if not len(devices):
print("No valid devices - nothing to build.")
print("")
return self.u.grab("Press [enter] to return to the menu...")
print("Building SSDT-USB-Reset.dsl...")
# Add the parents as needed
for parent in sorted(list(set(parents))):
Expand Down Expand Up @@ -1084,20 +1169,21 @@ def main(self):
names = [self.connected_controllers[x]["parent_name"] for x in c_check]
for x in c_check:
if "locationid" in self.connected_controllers[x]: continue # don't show hubs in this list
acpi = self.get_safe_acpi_path(self.connected_controllers[x].get("acpi_path","Unknown ACPI Path"))
acpi = self.get_safe_acpi_path(self.connected_controllers[x].get("acpi_path",None))
name = self.connected_controllers[x]["parent_name"]
par = self.connected_controllers[x]["parent"]
if name in self.illegal_names or names.count(name) > 1:
if name in self.illegal_names:
needs_rename.append(x)
self.controllers.pop(x,None) # Remove it from the controllers to map
print(" - {}{}{} @ {} ({}{}{})".format(self.rs,par.rjust(pad),self.ce,acpi,self.rs,"Needs Rename" if name in self.illegal_names else "Not Unique",self.ce))
else: print(" - {}{}{} @ {}".format(self.cs,par.rjust(pad),self.ce,acpi))
print(" - {}{}{} @ {} ({}{}{})".format(self.rs,par.rjust(pad),self.ce,acpi if acpi else "Unknown ACPI Path",self.rs,"Needs Rename" if name in self.illegal_names else "Not Unique",self.ce))
else: print(" - {}{}{} @ {}".format(self.cs,par.rjust(pad),self.ce,acpi if acpi else "Unknown ACPI Path"))
if not "XHCI" in self.connected_controllers[x]["type"]: continue # Only check XHCI for RHUB paths
# Get the RHUB name - mirrors the controller name if actually "RHUB"
rhub_name = "RHUB" if x.split("@")[0].upper() == self.connected_controllers[x]["parent_name"] else x.split("@")[0].upper()
rhub_path = ".".join([acpi,rhub_name])
rhub_paths.append(rhub_path)
print(" \\-> {}RHUB{} @ {}".format(self.bs,self.ce,rhub_path))
if acpi:
rhub_name = "RHUB" if x.split("@")[0].upper() == self.connected_controllers[x]["parent_name"] else x.split("@")[0].upper()
rhub_path = ".".join([acpi,rhub_name])
rhub_paths.append(rhub_path)
print(" \\-> {}RHUB{} @ {}".format(self.bs,self.ce,rhub_path))
print("")
print("{}D. Discover Ports{}{}".format(
self.rs if needs_rename else "",
Expand All @@ -1112,6 +1198,7 @@ def main(self):
print("R. Reset All Detected Ports")
if needs_rename:
print("A. Generate ACPI Renames For Conflicting Controllers")
print("L. Generate Plist Renames For Conflicting Controllers")
if rhub_paths:
print("H. Generate ACPI To Reset RHUBs ({}May Conflict With Existing SSDT-USB-Reset.aml!{})".format(self.rs,self.ce))
print("")
Expand All @@ -1133,10 +1220,19 @@ def main(self):
print("Failed to remove USB.plist! {}".format(e))
return
elif menu.lower() == "d":
if not len(self.controllers):
self.u.head("No Valid Controllers")
print("")
print("No valid controllers found for port discovery!")
print("You may need plist/ACPI renames in order to discover.")
print("")
return self.u.grab("Press [enter] to return...")
self.discover_ports()
elif menu.lower() == "p" and self.merged_list:
self.edit_plist()
elif menu.lower() == "a" and needs_rename:
self.generate_acpi_renames(needs_rename)
elif menu.lower() == "l" and needs_rename:
self.generate_renames(needs_rename)
elif menu.lower() == "h" and rhub_paths:
self.reset_rhubs(rhub_paths)
Expand Down

0 comments on commit f3be689

Please sign in to comment.