In [None]:
# === RDKit hierarchical clustering ===

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
from rdkit.ML.Cluster import Butina

def ClusterFps(fps,cutoff=0.2):
    """Cluster fingerprints with Butina.ClusterData using Tanimoto distances.

    fps: indexable, sliceable sequence of fingerprints accepted by DataStructs.BulkTanimotoSimilarity
    cutoff: threshold handed to Butina.ClusterData together with the distances. Defaults to 0.2.

    Returns the result of Butina.ClusterData unchanged.
    """
    n_fingerprints = len(fps)

    # Flat list of pairwise distances: fingerprint 1 vs [0], fingerprint 2 vs [0, 1], and so on.
    # Each Tanimoto similarity is turned into a distance as 1 - similarity.
    distances = [
        1-similarity
        for idx in range(1,n_fingerprints)
        for similarity in DataStructs.BulkTanimotoSimilarity(fps[idx],fps[:idx])
    ]

    # isDistData=True tells ClusterData that the list already holds distances
    return Butina.ClusterData(distances,n_fingerprints,cutoff,isDistData=True)


# Morgan bit-vector fingerprints (radius 2, 1024 bits) for every entry in `molecules`,
# then cluster them with a cutoff of 0.6 instead of the ClusterFps default
fps = [AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=1024) for mol in molecules]
clusters = ClusterFps(fps, cutoff=0.6)

clusters

In [None]:
def extract_reactions_with_numpy(file_path:str): # -> List[str]:
    """Extracts the reactions from a .rdf file and returns them as a list of strings.

    file_path: Path to the .rdf file to read.

    Returns a list with one string per reaction block. A block starts on a line that is
    exactly "$RXN"; its lines are joined with a newline character, so the text of each
    line is returned as it appears in the file. Returns an empty list when the file
    contains no "$RXN" line.
    """

    section_separator = "$RXN"

    # === Parse File ===
    with open(file_path, "r") as f:
        lines = f.read().splitlines() # read all lines into a list

    # numpy is only used to locate the $RXN lines; the blocks themselves are sliced from the
    # plain list so nothing is reformatted on the way back to a string.
    # dtype=str keeps the comparison element-wise even when the file is empty.
    rxn_block_idxs = np.where(np.array(lines, dtype=str) == section_separator)[0]
    num_reactions = len(rxn_block_idxs)

    # === Extract Reactions ===
    rxn_blocks = [] # this will store each extracted reaction block
    for ii, rxn_block_start in enumerate(rxn_block_idxs):
        if ii + 1 < num_reactions:
            # The slice end is exclusive, so the line directly above the next $RXN is left out of this block.
            # NOTE(review): presumably that line is the header of the next record - confirm against the input files.
            rxn_block_end = rxn_block_idxs[ii + 1] - 1
        else:
            # there is no following $RXN, so the last reaction runs to the end of the file
            rxn_block_end = len(lines)

        rxn_blocks.append("\n".join(lines[rxn_block_start:rxn_block_end]))

    return rxn_blocks

In [None]:
# high_dpi = True
# # adjust basic settings for high DPI screen
# if high_dpi:
#     plt.rcParams['font.size'] = 14 # set the font size for all plots
#     plt.rcParams['figure.figsize'] = (10,10) # set the figure size for all plots

In [None]:
def extract_sensor_data(pccr_file_path: str,
                        sensor_name: str,
                        stop_after_first: bool = False): # -> pd.DataFrame: # adding the return type hints messes up the syntax highlightning in notebooks in VSCode
    """Extracts values and timestamps from the given pcrr file and sensor.

    pccr_file_path: Path to the pcrr file
    sensor_name: The name of the sensor to read data for
    stop_after_first: Defaults to False. Set True to stop parsing the document after the first instance of the sensor is found.

    Returns a DataFrame indexed by "timestamp" (datetime64[ns]) with one float column named after the sensor.
    Prints a message and returns None if check_file_extension rejects the path.
    """

    # === Check for correct file type ===
    # NOTE(review): the parameter is spelled "pccr" while the extension checked is ".pcrr" - confirm which spelling is intended
    pccr_file_ext = ".pcrr"
    if not check_file_extension(pccr_file_path, pccr_file_ext):
        print(f"{pccr_file_path} is not a {pccr_file_ext} file, aborting.") # report the extension that was actually checked
        return None

    sensor_data_value = []
    sensor_data_timestamp = []

    # === XML Parsing ===
    # open the file ourselves so it is closed again, also when we stop early via stop_after_first
    with open(pccr_file_path, "rb") as xml_file:
        doc = pulldom.parse(xml_file) # these xml files are very large, use pulldom to extract the parts we need
        for event, node in doc:
            if event != pulldom.START_ELEMENT:
                continue

            # could extend this here by iterating over multiple sensors, meaning we would only have to process the xml file once rather than n-sensor times
            if node.tagName == "sensor_data" and node.getAttribute("name") == sensor_name: # find the sensor_data tag with the name attribute of the sensor we're interested in
                doc.expandNode(node) # expand the node so we can parse it with elementtree and xpath
                root = ET.fromstring(node.toxml()) # load the xml into elementtree

                # find all instances of the sensor_data_record tag within this node, record the values and timestamps
                sensor_data_value += [x.text for x in root.findall("./sensor_data_record/value")]
                sensor_data_timestamp += [x.text for x in root.findall("./sensor_data_record/timestamp")]

                if stop_after_first: # sensor data is present in multiple nodes. If set, stop parsing after the first instance of the target tag, useful for debugging/demo purposes as it is otherwise quite slow because of the large file size
                    break

    # === Create Dataframe ===
    df = pd.DataFrame(data={sensor_name: sensor_data_value, "timestamp": sensor_data_timestamp}) # convert to a dataframe
    df = df.astype({sensor_name: float, "timestamp": "datetime64[ns]"}) # set datatypes

    return df.set_index("timestamp") # set the timestamp as the index