In [None]:
import os

def find_project_root(start_path=None, marker="TecMul_Intercom"):
    """
    Find the project root by walking up the directory tree until `marker` is found.

    :param start_path: Path where the search starts (defaults to the current
        working directory). Relative paths are resolved against the current
        working directory.
    :param marker: Name of a directory or file whose presence inside a
        directory identifies that directory as the project root
        (e.g., 'src', '.git').
    :return: The absolute, normalised path of the closest directory (starting
        at `start_path` and moving towards the filesystem root) that contains
        `marker`.
    :raises RuntimeError: If the filesystem root is reached without finding
        `marker`.
    """
    if start_path is None:
        start_path = os.getcwd()

    # Normalise up front so the returned value is always absolute, even when
    # the marker lives directly inside a relative or "a/.."-style start path.
    current_path = os.path.abspath(start_path)
    while True:
        # os.path.exists() is true for files and directories alike, so a
        # separate isdir() check is not needed.
        if os.path.exists(os.path.join(current_path, marker)):
            return current_path
        parent_path = os.path.dirname(current_path)
        if parent_path == current_path:  # Reached the root of the filesystem
            raise RuntimeError(f"Could not find project root containing '{marker}'.")
        current_path = parent_path
        
# Anchor the session at the project root (the closest ancestor of the current
# working directory that contains "src"), because later code builds its
# "src" and "data" paths from os.getcwd().
project_root = find_project_root(start_path=os.getcwd(), marker="src")
os.chdir(project_root)
print(f"Project root set to: {project_root}")

In [None]:
import math
import subprocess
import logging
import os
import sys
import numpy as np
import matplotlib.pyplot as plt

class Processing:
    """
    Runs the coding scripts found under ``<cwd>/src`` as subprocesses.

    Every ``call_*`` method launches one script with the current Python
    interpreter, passing ``--show_stats``, ``-f <cwd>/data/<SONG_FILENAME>``
    and ``-t <DURATION>``, and returns the text the script wrote to stdout.
    Paths are resolved against the working directory at call time.
    """

    # Audio file (inside <cwd>/data) handed to every script through "-f".
    SONG_FILENAME = "AviadorDro_LaZonaFantasma.oga"
    # Value handed to every script through "-t".
    DURATION = "20"

    def __init__(self):
        # Powers of two from 1 to 8192 (the original list ended in "8182",
        # a typo that broke the series).
        self.quant = [str(2 ** exponent) for exponent in range(14)]
        self.wavelets = ["db1", "db2", "db3", "db4", "db5", "sym4", "sym5", "bior3.3", "bior3.5"]
        self.levels = [str(level) for level in range(1, 11)]

    def _run_script(self, script_name, quant, extra_args=()):
        """
        Run ``src/<script_name>`` and return its standard output.

        :param script_name: File name of the script inside ``<cwd>/src``.
        :param quant: Value passed through ``-q`` (converted with ``str``).
        :param extra_args: Additional command-line tokens inserted right after
            ``-q <quant>`` (each converted with ``str``).
        :return: The captured stdout when the script exits with code 0;
            ``None`` when it exits with another code or cannot be started.
        """
        base_dir = os.getcwd()
        cmd = [
            sys.executable,
            os.path.join(base_dir, "src", script_name),
            "-q", str(quant),
            *(str(arg) for arg in extra_args),
            "--show_stats",
            "-f", os.path.join(base_dir, "data", self.SONG_FILENAME),
            "-t", self.DURATION,
        ]

        # Logging the command being executed for debugging
        logging.info(f"Executing command: {' '.join(cmd)}")

        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # The interpreter could not be launched or an argument was invalid.
            logging.error(f"An error occurred: {e}")
            return None

        if result.returncode != 0:
            logging.error(f"Script failed with error:\n{result.stderr}")
            return None

        logging.info(f"Script output:\n{result.stdout}")
        return result.stdout

    def call_temporal_script_no_overlapping_16(self, quant, wavelet, level):
        """
        Run 'temporal_no_overlapped_DWT_coding_16.py' and return its stdout.

        :param quant: Quantization value, passed through ``-q``.
        :param wavelet: Wavelet name, passed through ``-w``.
        :param level: Level value, passed through ``-e``.
        :return: The script's stdout, or ``None`` if the run failed.
        """
        return self._run_script(
            "temporal_no_overlapped_DWT_coding_16.py",
            quant,
            ("-w", wavelet, "-e", level),
        )

    def call_script_MST_16(self, quant):
        """
        Run 'stereo_MST_coding_16.py' and return its stdout.

        :param quant: Quantization value, passed through ``-q``.
        :return: The script's stdout, or ``None`` if the run failed.
        """
        return self._run_script("stereo_MST_coding_16.py", quant)

    def call_script_MST_32(self, quant):
        """
        Run 'stereo_MST_coding_32.py' and return its stdout.

        :param quant: Quantization value, passed through ``-q``.
        :return: The script's stdout, or ``None`` if the run failed.
        """
        return self._run_script("stereo_MST_coding_32.py", quant)

    def call_temporal_script_no_overlapping_32(self, quant, wavelet, level):
        """
        Run 'temporal_no_overlapped_DWT_coding_32.py' and return its stdout.

        :param quant: Quantization value, passed through ``-q``.
        :param wavelet: Wavelet name, passed through ``-w``.
        :param level: Level value, passed through ``-e``.
        :return: The script's stdout, or ``None`` if the run failed.
        """
        return self._run_script(
            "temporal_no_overlapped_DWT_coding_32.py",
            quant,
            ("-w", wavelet, "-e", level),
        )

    def call_temporal_script_overlapping(self, quant, wavelet, level):
        """
        Run 'temporal_overlapped_DWT_coding.py' and return its stdout.

        :param quant: Quantization value, passed through ``-q``.
        :param wavelet: Wavelet name, passed through ``-w``.
        :param level: Level value, passed through ``-e``.
        :return: The script's stdout, or ``None`` if the run failed.
        """
        return self._run_script(
            "temporal_overlapped_DWT_coding.py",
            quant,
            ("-w", wavelet, "-e", level),
        )

# Logging setup: configure the root logger at INFO level so that the
# logging.info(...) progress messages emitted below are not filtered out.
logging.basicConfig(level=logging.INFO)

### Coding-MST 16: temporal DWT, no overlapping

In [None]:
# Initialize the processor
processor = Processing()

# Deepest level to try for each wavelet.
# The sym*/bior* wavelets are only explored at levels 4 and 5 (see
# level_window_wavelets below). They previously fell back to the default
# maximum of 2, which combined with that 4-5 window meant they were never
# processed at all; give them an explicit maximum of 5.
max_levels_per_wavelet = {
    "db1": 10, "db2": 10, "db3": 10, "db4": 10, "db5": 10,
    "sym4": 5, "sym5": 5, "bior3.3": 5, "bior3.5": 5,
}

# Wavelets restricted to levels 4 and 5.
level_window_wavelets = {"sym4", "sym5", "bior3.3", "bior3.5"}


def _parse_rd_point(output):
    """
    Extract one (payload sent average, mean RMSE) point from a script's stdout.

    Looks for the "Payload sent average " line (first number after "=") and
    the "Average RMSE (Root Mean Square Error) per sample " line (mean of the
    whitespace-separated numbers inside the brackets after "=").

    :param output: Full stdout text of one script run.
    :return: ``(payload_sent_average, average_rmse)``, or ``None`` when either
        value is missing or could not be parsed.
    """
    payload_sent_average = None
    average_rmse = None

    for line in output.splitlines():
        try:
            if "Payload sent average " in line:
                payload_sent_average = float(line.split("=")[1].strip().split()[0])
            elif "Average RMSE (Root Mean Square Error) per sample " in line:
                numbers_str = line.split("=")[1].strip().strip("[]")
                numbers = [float(num) for num in numbers_str.split()]
                average_rmse = sum(numbers) / len(numbers) if numbers else None
        except (IndexError, ValueError):
            # One malformed line must not abort the whole sweep.
            logging.warning(f"Could not parse statistics line: {line!r}")

    if payload_sent_average is None or average_rmse is None:
        return None
    return payload_sent_average, average_rmse


# Data storage
points_per_wavelet = {}  # To store points for each wavelet and level

# Iterate through combinations with constraint on levels
for wavelet in processor.wavelets:
    max_level = max_levels_per_wavelet.get(wavelet, 2)  # Default to 2 if undefined
    points_per_wavelet[wavelet] = {}

    for level in processor.levels:
        level_number = int(level)
        if level_number > max_level:
            continue

        if wavelet in level_window_wavelets and not 4 <= level_number <= 5:
            continue

        logging.info(f"Processing wavelet={wavelet}, level={level}")
        points = []  # Points collected for this wavelet/level pair

        for quant in processor.quant:
            output = processor.call_temporal_script_no_overlapping_16(quant, wavelet, level)
            if not output:
                # The run failed (already logged by the processor) or printed nothing.
                continue

            point = _parse_rd_point(output)
            if point is None:
                logging.warning(f"Missing data for quant={quant}. Skipping this point.")
                continue

            payload_sent_average, average_rmse = point
            logging.info(f"KBPS: {payload_sent_average}, RMSE: {average_rmse}")
            points.append(point)

        if points:
            # Sort by the first coordinate so each curve is drawn left to right.
            points.sort()
            points_per_wavelet[wavelet][level] = points

logging.info("Data collection completed.")

In [None]:
# Persist the collected points so the plots can be regenerated later without
# repeating the sweep.
import pickle

with open("points_per_wavelet_nol16.pkl", mode="wb") as out_file:
    out_file.write(pickle.dumps(points_per_wavelet))

print("Data saved to 'points_per_wavelet_nol16.pkl'.")

In [None]:
# Load points_per_wavelet
import logging
import pickle

# NOTE: pickle.load can execute arbitrary code; only load a file that was
# written by this notebook.
with open("points_per_wavelet_nol16.pkl", "rb") as f:
    points_per_wavelet = pickle.load(f)

# Generate one figure per wavelet, with one curve per level.
for wavelet, levels_data in points_per_wavelet.items():
    curves = {level: points for level, points in levels_data.items() if points}
    if not curves:
        # Nothing was collected for this wavelet: an empty figure would only
        # add noise and make plt.legend() warn about missing labels.
        logging.warning(f"No data points for wavelet={wavelet}. Skipping plot.")
        continue

    plt.figure()
    plt.title(f"RD Tradeoff for Wavelet: {wavelet}")
    # NOTE(review): the collection step logs this quantity as "KBPS"; confirm
    # that the x-axis label below names the right unit.
    plt.xlabel("R (Estimated Bits per Sample) [Entropy]")
    plt.ylabel("D (Root Mean Square Error)")
    plt.grid(True)

    for level, points in curves.items():
        # points is a list of (x, y) pairs; zip(*points) splits it into xs and ys.
        plt.plot(*zip(*points), marker="o", linestyle="-", label=f"Level {level}")

    plt.legend(title="Levels")
    plt.show()

### Coding - Overlapped (temporal DWT)

In [None]:
# Initialize the processor
processor = Processing()

# Deepest level to try for each wavelet.
# The sym*/bior* wavelets are only explored at levels 4 and 5 (see
# level_window_wavelets below). They previously fell back to the default
# maximum of 2, which combined with that 4-5 window meant they were never
# processed at all; give them an explicit maximum of 5.
max_levels_per_wavelet = {
    "db1": 9, "db2": 8, "db3": 7, "db4": 7, "db5": 6,
    "sym4": 5, "sym5": 5, "bior3.3": 5, "bior3.5": 5,
}

# Wavelets restricted to levels 4 and 5.
level_window_wavelets = {"sym4", "sym5", "bior3.3", "bior3.5"}


def _parse_rd_point(output):
    """
    Extract one (payload sent average, mean RMSE) point from a script's stdout.

    Looks for the "Payload sent average " line (first number after "=") and
    the "Average RMSE (Root Mean Square Error) per sample " line (mean of the
    whitespace-separated numbers inside the brackets after "=").

    :param output: Full stdout text of one script run.
    :return: ``(payload_sent_average, average_rmse)``, or ``None`` when either
        value is missing or could not be parsed.
    """
    payload_sent_average = None
    average_rmse = None

    for line in output.splitlines():
        try:
            if "Payload sent average " in line:
                payload_sent_average = float(line.split("=")[1].strip().split()[0])
            elif "Average RMSE (Root Mean Square Error) per sample " in line:
                numbers_str = line.split("=")[1].strip().strip("[]")
                numbers = [float(num) for num in numbers_str.split()]
                average_rmse = sum(numbers) / len(numbers) if numbers else None
        except (IndexError, ValueError):
            # One malformed line must not abort the whole sweep.
            logging.warning(f"Could not parse statistics line: {line!r}")

    if payload_sent_average is None or average_rmse is None:
        return None
    return payload_sent_average, average_rmse


# Data storage
points_per_wavelet = {}  # To store points for each wavelet and level

# Iterate through combinations with constraint on levels
for wavelet in processor.wavelets:
    max_level = max_levels_per_wavelet.get(wavelet, 2)  # Default to 2 if undefined
    points_per_wavelet[wavelet] = {}

    for level in processor.levels:
        level_number = int(level)
        if level_number > max_level:
            continue

        if wavelet in level_window_wavelets and not 4 <= level_number <= 5:
            continue

        logging.info(f"Processing wavelet={wavelet}, level={level}")
        points = []  # Points collected for this wavelet/level pair

        for quant in processor.quant:
            output = processor.call_temporal_script_overlapping(quant, wavelet, level)
            if not output:
                # The run failed (already logged by the processor) or printed nothing.
                continue

            point = _parse_rd_point(output)
            if point is None:
                logging.warning(f"Missing data for quant={quant}. Skipping this point.")
                continue

            payload_sent_average, average_rmse = point
            logging.info(f"KBPS: {payload_sent_average}, RMSE: {average_rmse}")
            points.append(point)

        if points:
            # Sort by the first coordinate so each curve is drawn left to right.
            points.sort()
            points_per_wavelet[wavelet][level] = points

logging.info("Data collection completed.")

In [None]:
# Persist the collected points so the plots can be regenerated later without
# repeating the sweep.
import pickle

with open("points_per_wavelet_ol.pkl", mode="wb") as out_file:
    out_file.write(pickle.dumps(points_per_wavelet))

print("Data saved to 'points_per_wavelet_ol.pkl'.")

In [None]:
# Load points_per_wavelet
import logging
import pickle

# NOTE: pickle.load can execute arbitrary code; only load a file that was
# written by this notebook.
with open("points_per_wavelet_ol.pkl", "rb") as f:
    points_per_wavelet = pickle.load(f)

# Generate one figure per wavelet, with one curve per level.
for wavelet, levels_data in points_per_wavelet.items():
    curves = {level: points for level, points in levels_data.items() if points}
    if not curves:
        # Nothing was collected for this wavelet: an empty figure would only
        # add noise and make plt.legend() warn about missing labels.
        logging.warning(f"No data points for wavelet={wavelet}. Skipping plot.")
        continue

    plt.figure()
    plt.title(f"RD Tradeoff for Wavelet: {wavelet}")
    # NOTE(review): the collection step logs this quantity as "KBPS"; confirm
    # that the x-axis label below names the right unit.
    plt.xlabel("R (Estimated Bits per Sample) [Entropy]")
    plt.ylabel("D (Root Mean Square Error)")
    plt.grid(True)

    for level, points in curves.items():
        # points is a list of (x, y) pairs; zip(*points) splits it into xs and ys.
        plt.plot(*zip(*points), marker="o", linestyle="-", label=f"Level {level}")

    plt.legend(title="Levels")
    plt.show()