# HDF5 file from NANOPORE SEQUENCING SUMMARY

In [1]:
import json
import h5py
import numpy as np
from bs4 import BeautifulSoup
import ast

In [2]:
jsson_filename = 'L4085/report_PAG26975_20240513_1631_a46c69aa.json'
html_filename = 'L4085/report_PAG26975_20240513_1631_a46c69aa.html'

## Metadati json

In [3]:
with open(jsson_filename) as json_file:
    data = json.load(json_file)

In [4]:
args_list = data['protocol_run_info']['args']

In [5]:
# trasformo in dict la lista data['protocol_run_info']['args']

def convert_value(value):
    # Prova a convertire in int
    try:
        return int(value)
    except ValueError:
        pass
    
    # Prova a convertire in float
    try:
        return float(value)
    except ValueError:
        pass
    
    # Ritorna la stringa così com'è se non è int o float
    return value

args_dict = {}

for i in range(len(args_list)):
    item = args_list[i]
    if '=' in item:
        key, value = item.lstrip('--').split('=', 1)
        args_dict[key] = convert_value(value)
    elif item.startswith('--'):
        key = item.lstrip('--')
        value = args_list[i + 1] if i + 1 < len(args_list) and not args_list[i + 1].startswith('--') else None
        args_dict[key] = convert_value(value)

In [6]:
data['protocol_run_info']['args'] = args_dict

In [7]:
# trasformo in dict la lista data['protocol_run_info']['device']['firmware_version']
# ogni elemento della lista diventa un nuovo dizionario firmware_i

firmware_dict = {f'firmware_{i+1}': firmware for i, firmware in enumerate(data['protocol_run_info']['device']['firmware_version'])}
data['protocol_run_info']['device']['firmware_version'] = firmware_dict

In [8]:
# elimino un layer di dict in data['protocol_run_info']['meta_info']['tags'] mantenendo i tipi appropriati
transformed_dict = {}

for key, sub_dict in data['protocol_run_info']['meta_info']['tags'].items():
    if 'string_value' in sub_dict:
        transformed_dict[key] = sub_dict['string_value']
    elif 'bool_value' in sub_dict:
        transformed_dict[key] = sub_dict['bool_value']
    elif 'array_value' in sub_dict:
        transformed_dict[key] = ast.literal_eval(sub_dict['array_value'])  # Converts the string representation of a list into an actual list

data['protocol_run_info']['meta_info']['tags'] = transformed_dict

In [9]:
# trasformo in dict la lista data['acquisitions']
# ogni elemento della lista diventa un nuovo dizionario acquisition_i

acquisition_dict = {f'acquisition_{i+1}': firmware for i, firmware in enumerate(data['acquisitions'])}
data['acquisitions'] = acquisition_dict

In [10]:
# per ciascuna acquisition_i trasformo la lista 
# data['acquisitions']['acquisition_i']['acquisition_run_info']['config_summary']['channel_state_info']['groups']
# in un dizionario

for acquisition_key in data['acquisitions']:
    groups = data['acquisitions'][acquisition_key]['acquisition_run_info']['config_summary']['channel_state_info']['groups']
    transformed_groups = {d['name']: {k: v for k, v in d.items() if k != 'name'} for d in groups}
    data['acquisitions'][acquisition_key]['acquisition_run_info']['config_summary']['channel_state_info']['groups'] = transformed_groups

In [11]:
# per ciascuna acquisition_i trasformo la lista 
# data['acquisitions']['acquisition_i']['bias_voltage']
# in un dizionario

for acquisition_key in data['acquisitions']:
        bias_voltage = data['acquisitions'][acquisition_key]['bias_voltage']
        transformed_bias_voltage = {f'bias_voltage_{i+1}': bias_voltage[i] for i in range(len(bias_voltage))}
        for bv_key in transformed_bias_voltage:
            tmp = transformed_bias_voltage[bv_key]['bias_voltages']
            transformed_bias_voltage2 = {f'info_{k+1}': tmp[k] for k in range(len(tmp))}
            transformed_bias_voltage[bv_key]['bias_voltages'] = transformed_bias_voltage2
        data['acquisitions'][acquisition_key]['bias_voltage'] = transformed_bias_voltage

In [12]:
# per ciascuna acquisition_i trasformo la lista 
# data['acquisitions']['acquisition_1']['temperature']
# in un dizionario

for temperature_key in data['acquisitions']:
        temperature = data['acquisitions'][temperature_key]['temperature']
        transformed_temperature = {f'temperature_{i+1}': temperature[i] for i in range(len(temperature))}
        for child_key in transformed_temperature:
            tmp = transformed_temperature[child_key]['temperatures']
            transformed_temperature2 = {f'info_{k+1}': tmp[k] for k in range(len(tmp))}
            transformed_temperature[child_key]['temperatures'] = transformed_temperature2
        data['acquisitions'][temperature_key]['temperature'] = transformed_temperature

In [13]:
data['acquisitions'][acquisition_key]['acquisition_run_info']['config_summary']['channel_state_info']['groups'].keys()

dict_keys(['sequencing', 'pore', 'recovering', 'inactive', 'unclassified'])

In [14]:
# per ciascuna acquisition_i trasformo la lista 
# data['acquisitions']['acquisition_i']['acquisition_run_info']['config_summary']['channel_state_info']['groups'][group_key]['states']
# in un dizionario

for acquisition_key in data['acquisitions']:
    for group_key in data['acquisitions'][acquisition_key]['acquisition_run_info']['config_summary']['channel_state_info']['groups']:
        states = data['acquisitions'][acquisition_key]['acquisition_run_info']['config_summary']['channel_state_info']['groups'][group_key]['states']
        transformed_states = {d['name']: {k: v for k, v in d.items() if k != 'name'} for d in states}
        data['acquisitions'][acquisition_key]['acquisition_run_info']['config_summary']['channel_state_info']['groups'][group_key]['states'] = transformed_states


In [15]:
def dict_to_hdf5(d, hdf5_group, path=''):
    """
    Recursively converts a nested dictionary to an HDF5 file structure.

    Args:
        d (dict): The dictionary to iterate through.
        hdf5_group (h5py.Group): The current HDF5 group to write to.
        path (str): The current path in the HDF5 file (for naming purposes).
    """
    #excluded_keys = ["acquisitions",'user_messages','report_data_generation_time']
    excluded_keys = ['user_messages',
                     'report_data_generation_time',
                     'duty_time',
                     'writer_output',
                     'acquisition_output',
                     'read_length_histogram',
                     'basecall_boxplot',
                     'qscore_histograms'
                     ]
    for key, value in d.items():
        if key in excluded_keys:
            # Skip this key and its children
            continue

        # Determine if the key name already exists and adjust if necessary
        group_name = key
        count = 1
        while group_name in hdf5_group:
            count += 1
            group_name = f"{key}_{count}"

        if isinstance(value, dict):
            # Create a new group for this key
            new_group = hdf5_group.create_group(group_name)
            # Recurse into the dictionary
            dict_to_hdf5(value, new_group, path=f"{path}/{group_name}")
        else:
            # Handle non-dict values by checking their type
            if isinstance(value, str):
                # Convert a single string to variable-length strings
                dt = h5py.string_dtype(encoding='utf-8')
                hdf5_group.create_dataset(group_name, data=value, dtype=dt)
            elif isinstance(value, (list, tuple)):
                if all(isinstance(item, str) for item in value):
                    # Convert list of strings to numpy array with appropriate dtype
                    dt = h5py.string_dtype(encoding='utf-8')
                    hdf5_group.create_dataset(group_name, data=np.array(value, dtype=dt), dtype=dt)
                elif all(isinstance(item, (int, float)) for item in value):
                    # Convert list of numbers to numpy array
                    hdf5_group.create_dataset(group_name, data=np.array(value))
                else:
                    # Convert mixed-type lists to a JSON string and store it
                    json_data = json.dumps(value)
                    dt = h5py.string_dtype(encoding='utf-8')
                    hdf5_group.create_dataset(group_name, data=json_data, dtype=dt)
            elif isinstance(value, np.ndarray):
                if value.dtype == object:
                    # Convert numpy arrays with dtype=object to JSON string and store it
                    json_data = json.dumps(value.tolist())
                    dt = h5py.string_dtype(encoding='utf-8')
                    hdf5_group.create_dataset(group_name, data=json_data, dtype=dt)
                else:
                    hdf5_group.create_dataset(group_name, data=value)
            elif isinstance(value, (int, float, np.number)):
                # Directly store numerical values
                hdf5_group.create_dataset(group_name, data=value)
            else:
                # If the value is not a supported type, raise an error
                raise TypeError(f"Unsupported data type for key '{group_name}': {type(value)}")


In [16]:
# Creating the HDF5 file
with h5py.File('report.hdf5', 'w') as hdf5_file:
    dict_to_hdf5(data, hdf5_file)


## Summary html

In [None]:
# Carica e analizza l'HTML
with open(html_filename, 'r', encoding='utf-8') as file:
    content = file.read()

# Utilizza BeautifulSoup per analizzare l'HTML
soup = BeautifulSoup(content, 'html.parser')

# Trova il tag script che contiene `reportData`
script_tag = soup.find('script', text=lambda text: text and 'reportData=' in text)

if script_tag:
    # Estrai il testo del tag script
    script_content = script_tag.string

    # Trova il punto in cui inizia `reportData`
    start_index = script_content.find('reportData=') + len('reportData=')
    
    # Trova la fine dell'oggetto JSON
    end_index = script_content.find(';', start_index)
    
    # Estrai la stringa JSON
    json_data = script_content[start_index:end_index].strip()
    
    # Converti la stringa JSON in un oggetto Python
    report_data = json.loads(json_data)


In [18]:
# riapro il file in rw
f = h5py.File('report.hdf5', "r+")

In [None]:
# creo gruppo summary
f.create_group("summary")

In [None]:
for i in report_data['run_setup']:
    print(i['title'], i['value'])

In [21]:
# creo e popolo sottogruppi
f['/summary'].create_group("run_setup")
for i in report_data['run_setup']:
    f['/summary/run_setup'].create_dataset(i['title'], data=i['value'])

In [22]:
f['/summary'].create_group("run_settings")
for i in report_data['run_settings']:
    f['/summary/run_settings'].create_dataset(i['title'], data=i['value'])

In [23]:
f['/summary'].create_group("data_output_settings")
for i in report_data['data_output_settings']:
    f['/summary/data_output_settings'].create_dataset(i['title'], data=i['value'])


In [24]:
f['/summary'].create_group("software_versions")
for i in report_data['software_versions']:
    f['/summary/software_versions'].create_dataset(i['title'], data=i['value'])

In [25]:
f['/summary'].create_group("data_output")
for key, value in report_data['data_output'].items():
    f['/summary/data_output'].create_dataset(key, data=value)

In [26]:
f['/summary'].create_group("basecalling")
for key, value  in report_data['basecalling'].items():
    f['/summary/basecalling'].create_dataset(key, data=value)

In [27]:
f.close()

## Grafici

In [28]:
import matplotlib.pyplot as plt
import numpy as np

from matplotlib import colors
from matplotlib.ticker import PercentFormatter

### READ LENGTHS · OUTLIERS REMOVED

In [29]:
r_lengths = data['acquisitions']['acquisition_4']['read_length_histogram']

In [None]:
r_lengths[5].keys()

In [31]:
test_5 = r_lengths[5]

In [32]:
y = list(map(int, test_5['plot']['histogram_data'][0]['bucket_values']))

In [33]:
x = list(range(0,len(y)))

In [None]:
# Creazione dell'istogramma
plt.figure(figsize=(12, 8))
plt.bar(x, y)
plt.xlabel('Read length (k)')
plt.ylabel('Bases (Mb)')
plt.title('READ LENGTHS · OUTLIERS REMOVED')
#plt.xticks(rotation=90)  # Ruota le etichette per renderle più leggibili
plt.show()

### CUMULATIVE OUTPUT

In [35]:
test_0 = data['acquisitions']['acquisition_4']['acquisition_output'][0]['plot'][0]['snapshots'][0]['snapshots']

In [36]:
time = [int(i["seconds"]) for i in test_0]


In [37]:
def convert_seconds_to_hhmm(seconds_list):
    hhmm_list = []
    for seconds in seconds_list:
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        hhmm_list.append(f"{int(hours):02d}:{int(minutes):02d}")
    return hhmm_list

In [38]:
new_time = convert_seconds_to_hhmm(time)

In [39]:
bases_passed = [int(i["yield_summary"]["basecalled_pass_bases"]) for i in test_0]
bases_failed = [int(i["yield_summary"]["basecalled_fail_bases"]) for i in test_0]

In [40]:
bases_passed_Gb = [i*1e-9 for i in bases_passed]
bases_failed_Gb = [i*1e-9 for i in bases_failed]

In [41]:
bases_total_Gb = [x + y for x, y in zip(bases_passed_Gb, bases_failed_Gb)]

In [None]:
# Creazione dell'istogramma
plt.figure(figsize=(30, 8))
plt.plot(new_time, bases_passed_Gb)
plt.plot(new_time, bases_failed_Gb)
plt.plot(new_time, bases_total_Gb)
plt.xlabel('Time (Hrs:Min)')
plt.ylabel('Bases (Gb)')
plt.title('CUMULATIVE OUTPUT: BASES')
#plt.xticks(rotation=90)  # Ruota le etichette per renderle più leggibili
plt.show()

### QUALITY SCORES

In [43]:
test_qs = data['acquisitions']['acquisition_4']['qscore_histograms'][0]['histogram_data']

In [None]:
test_qs[0]['filtering']

In [None]:
test_qs[1]['filtering']

In [46]:
reads_qs_passed = [int(x) for x in test_qs[0]['bucket_values']]
reads_qs_failed = [int(x) for x in test_qs[1]['bucket_values']]

In [47]:
x_qs = list(range(0,len(reads_qs_passed)))

In [None]:
# Creazione dell'istogramma
plt.figure(figsize=(12, 8))
plt.bar(x_qs, reads_qs_passed)
plt.bar(x_qs, reads_qs_failed)
plt.xlabel('Q Score')
plt.ylabel('Reads')
plt.title('QUALITY SCORE')
#plt.xticks(rotation=90)  # Ruota le etichette per renderle più leggibili
plt.show()

## Plots to HDF5

In [49]:
# riapro il file in rw
f = h5py.File('report.hdf5', "r+")

In [50]:
f.create_group("plots")
f['/plots'].attrs["NX_class"] = "NXentry"
f['/plots'].attrs["default"] = "cumulative_output"

In [None]:
# cumulative output
f['/plots'].create_group("cumulative_output")
f['/plots/cumulative_output'].attrs["NX_class"] = "NXdata"
f['/plots/cumulative_output'].attrs["signal"] = "Bases_passed"
f['/plots/cumulative_output'].attrs["auxiliary_signals"] = ['Bases_total','Bases_failed']
f['/plots/cumulative_output'].attrs["axes"] = 'Time'
f['/plots/cumulative_output'].create_dataset('title',data='Cumulative Output')

f['/plots/cumulative_output'].create_dataset('Time', data=time)
f['/plots/cumulative_output'].create_dataset('Bases_passed', data=bases_passed_Gb)
f['/plots/cumulative_output'].create_dataset('Bases_failed', data=bases_failed_Gb)
f['/plots/cumulative_output'].create_dataset('Bases_total', data=bases_total_Gb)

In [None]:
# Read lengths
f['/plots'].create_group("read_lengths")
f['/plots/read_lengths'].attrs["NX_class"] = "NXdata"
f['/plots/read_lengths'].attrs["signal"] = "Bases"
f['/plots/read_lengths'].attrs["axes"] = 'Read_length'
f['/plots/read_lengths'].create_dataset('title',data='Read Lengths')

f['/plots/read_lengths'].create_dataset('Bases', data=y)
f['/plots/read_lengths'].create_dataset('Read_length', data=x)

In [None]:
# quality score
f['/plots'].create_group("q_score")
f['/plots/q_score'].attrs["NX_class"] = "NXdata"
f['/plots/q_score'].attrs["signal"] = "Read_passed"
f['/plots/q_score'].attrs["auxiliary_signals"] = ['Read_failed']
f['/plots/q_score'].attrs["axes"] = 'Q_Score'
f['/plots/q_score'].create_dataset('title',data='Quality Score')

f['/plots/q_score'].create_dataset('Q_Score', data=x_qs)
f['/plots/q_score'].create_dataset('Read_passed', data=reads_qs_passed)
f['/plots/q_score'].create_dataset('Read_failed', data=reads_qs_failed)

In [54]:
f.close()