# Class NDExpo¶
https://expostats.ca/site/app-local/NDExpo/aide_fr.htm

In [2]:
import pandas as pd
import numpy as np

In [3]:
# Fonction de parsing pour gérer `<x` et `[a-b]`
def parse_value(val):
    if isinstance(val, str):
        val = val.strip()
        # Valeur censurée
        if val.startswith('<'):
            try:
                censured_value = float(val[1:])
                return censured_value, True, 1
            except:
                return np.nan, True
        # Intervalle [a-b]
        elif val.startswith('[') and val.endswith(']'):
            try:
                a_b = val[1:-1].split('-')
                a = float(a_b[0])
                b = float(a_b[1])
                # Prend la moyenne de l'intervalle
                return (a + b) / 2, True, 2
            except:
                return np.nan, False
        # Valeur numérique simple
        else:
            try:
                return float(val), False, 0
            except:
                return np.nan, False, 0
    else:
        try:
            return float(val), False, 0
        except:
            return np.nan, False, 0

In [4]:
class Datum:
    def __init__(self, isND, value):
        self.isND = isND  # 1 si non détecté, 0 sinon
        if isND == 1:
            self.detectionLimitValue = value
        else:
            self.value = value
        self.position = -1
        self.index = -1
        self.plottingPosition = -1
        self.score = -1
        self.predicted = -1
        self.finalValue = -1
        self.dlGroup = None

    def getValue(self):
        if self.isNotDetected():
            return self.detectionLimitValue
        else:
            return self.value

    def isNotDetected(self):
        return self.isND == 1
        
    def isND(self):
        return self.isND == 1
        
    def __str__(self):
        return f"{self.position} : {'<' if self.isNotDetected() else ''}{self.getValue()} : {self.index} : {self.plottingPosition} : {self.score} : {self.finalValue}"

In [5]:
class DetectionLimit:
    def __init__(self, value):
        self.value = value
        self.A = 0
        self.B = 0
        self.C = 0
        self.overProbability = 0
        self.nextOverProbability = 0
        self.previousValues = []

    def getPlottingPosition(self, datum):
        if datum.isNotDetected():
            return datum.index * (1 - self.overProbability) / (self.C + 1)
        else:
            denom = self.A + self.B
            if denom == 0:
                return 0.5  # ou une valeur par défaut
            return (1 - self.overProbability) + ((self.overProbability - self.nextOverProbability) * (datum.index / denom))
            
    def __str__(self):
        return f"DetectionLimit({self.value}) A={self.A} B={self.B} C={self.C} prob={self.overProbability}"

In [6]:
class ND:
    def __init__(self):
        self.dataSet = []
        self.detectionLimitArray = []
        self.count = {'nd': 0, 'total': 0}
        self.error = 0

        # Constantes d'erreur
        self.ERR_TOOMANY_ND = 1
        self.ERR_GRTSTDL = 2
        self.ERR_NENGH_DATA = 3
        self.WARN_NO_ND = 4
        self.ERR_NENGH_DET = 5

        self.grouping = True

    def compare_value(self, datum1, datum2):
        diff = datum1.getValue() - datum2.getValue()
        if diff != 0:
            return diff
        return datum2.isND - datum1.isND
        
    def addDatum(self, isND, value, position=None):
        datum = Datum(isND, value)
        if position is not None:
            datum.position = position
        else:
            datum.position = len(self.dataSet)
        self.dataSet.append(datum)
        self.count['total'] += 1
        if isND == 1:
            self.count['nd'] += 1    
            
    def __str__(self):
        sumX = getattr(self, 'sumX', 'N/A')
        sumY = getattr(self, 'sumY', 'N/A')
        slope = getattr(self, 'slope', 'N/A')
        intercept = getattr(self, 'intercept', 'N/A')
        return (
            f"Objet ND\n"
            f"------------------------\n"
            f"Total données : {len(self.dataSet)}\n"
            f"NDétectés : {self.count['nd']}\n"
            f"Total : {self.count['total']}\n"
            f"Erreur : {self.error}\n"
            f"Limites detection : {len(self.detectionLimitArray)}\n"
            f"SumX (scores) : {sumX}\n"
            f"SumY (log valeurs) : {sumY}\n"
            f"Slope : {slope}\n"
            f"Intercept : {intercept}\n"
        )

    def do_calc(self):
        # Tri des données
        self.dataSet.sort(key=lambda d: d.getValue())

        total = self.count['total']
        nd_count = self.count['nd']

        # Vérifications
        if total < 5:
            self.error = self.ERR_NENGH_DATA
            return
        if (total - nd_count) < 3:
            self.error = self.ERR_NENGH_DET
            return
        if nd_count > np.floor(0.8 * total):
            self.error = self.ERR_TOOMANY_ND
            return
        if self.dataSet and self.dataSet[-1].isNotDetected():
            self.error = self.ERR_GRTSTDL
            return
        if nd_count == 0:
            self.error = self.WARN_NO_ND
            
        if self.error == 0:
            self.detectionLimitArray = []
    
            currentLimit = DetectionLimit(0)
            self.detectionLimitArray.append(currentLimit)
    
            first_datum = self.dataSet[0]
            if first_datum.isNotDetected():
                currentLimit = DetectionLimit(first_datum.detectionLimitValue)
                currentLimit.C += 1
                self.detectionLimitArray.append(currentLimit)
            else:
                currentLimit.A += 1
    
            currentPosition = 1
            first_datum.index = 1
            first_datum.dlGroup = currentLimit
    
            # Boucle sur le reste des données
            for i in range(1, len(self.dataSet)):
                datum = self.dataSet[i]
                prevDatum = self.dataSet[i - 1]
                if prevDatum.isND != datum.isND:
                    if datum.isNotDetected():
                        currentLimit = DetectionLimit(datum.detectionLimitValue)
                        self.detectionLimitArray.append(currentLimit)
                    currentPosition = 0
                else:
                    if datum.isNotDetected() and (prevDatum.detectionLimitValue != datum.detectionLimitValue):
                        if self.grouping:
                            currentLimit.previousValues.append(currentLimit.value)
                            currentLimit.value = datum.detectionLimitValue
                        else:
                            currentLimit = DetectionLimit(datum.detectionLimitValue)
                            self.detectionLimitArray.append(currentLimit)
                            currentPosition = 0
                currentPosition += 1
                datum.index = currentPosition
                datum.dlGroup = currentLimit
                if datum.isNotDetected():
                    currentLimit.C += 1
                else:
                    currentLimit.A += 1
    
            # Ajout d'un dernier groupe fictif
            last_value = self.dataSet[-1].getValue() + 1
            currentLimit = DetectionLimit(last_value)
            self.detectionLimitArray.append(currentLimit)
    
            # Calcul des B
            for i in range(1, len(self.detectionLimitArray)):
                prevLimit = self.detectionLimitArray[i - 1]
                currLimit = self.detectionLimitArray[i]
                currLimit.B = currLimit.C + prevLimit.B + prevLimit.A
    
            # Calcul des probabilités
            for i in reversed(range(len(self.detectionLimitArray) - 1)):
                currentLimit = self.detectionLimitArray[i]
                nextProb = self.detectionLimitArray[i + 1].overProbability
                currentLimit.nextOverProbability = nextProb
                denom = currentLimit.A + currentLimit.B
                if denom != 0:
                    currentLimit.overProbability = nextProb + (1 - nextProb) * (currentLimit.A / denom)
                else:
                    currentLimit.overProbability = nextProb
    
            # Initialiser la première overProbability
            self.detectionLimitArray[0].overProbability = 1
    
            # Calcul des scores et positions
            for datum in self.dataSet:
                datum.plottingPosition = datum.dlGroup.getPlottingPosition(datum)
                datum.score = self.inverseNormalcdf(datum.plottingPosition)
    
            # Calcul de la régression globale
            self.global_vals = self.GlobalValues(self.dataSet)
    
            # Prédictions et valeurs finales
            for datum in self.dataSet:
                if datum.isNotDetected():
                    datum.predicted = (datum.score * self.global_vals.slope) + self.global_vals.intercept
                    datum.finalValue = np.exp(datum.predicted)
                else:
                    datum.finalValue = datum.value
    
            # Trier par finalValue
            self.dataSet.sort(key=lambda d: d.finalValue)
            # Calcul du percentile 20
            self.global_vals.per20 = self.global_vals.getPercentile(20, self.dataSet)
    
            # Revenir à l'ordre initial
            self.dataSet.sort(key=lambda d: d.position)

    def inverseNormalcdf(self, p):
        return norm.ppf(p)

    class GlobalValues:
        def __init__(self, dataSet):
            self.slope = 0
            self.intercept = 0
            self.per20 = None
            self.compute(dataSet)
    
        def compute(self, dataSet):
            sumX = 0
            sumY = 0
            sumXY = 0
            sumXX = 0
            count = 0
            for d in dataSet:
                if not d.isNotDetected():
                    y = np.log(d.value)
                    x = d.score
                    sumX += x
                    sumY += y
                    sumXY += x * y
                    sumXX += x * x
                    count += 1
            if count>0 :
                denom = count * sumXX - sumX * sumX
                if denom != 0:
                    self.slope = (count * sumXY - sumX * sumY) / denom
                    self.intercept = (sumXX * sumY - sumX * sumXY) / denom
    
        def getPercentile(self, percentile, data):
            index = int(len(data) * percentile / 100)
            if data:
                return data[index].finalValue
            return None

In [None]:
def CheckNDL(df):
#application NDL
    values = []
    min_lvalue=0
    for v in df['Value']:
        val, isND, typ = parse_value(v)
        values.append((val, isND, typ))
        if isND: min_lvalue = val
    
    # Création de l'objet ND et ajout des données
    nd = ND()
    n_left_censored = 0
    n_interval_censored = 0
    n_exact_values = 0
    for i, (val, isND, typ) in enumerate(values):
        if val is not None:
            nd.addDatum(isND, val, i)
            if isND: 
                if typ==1: n_left_censored +=1 
                else: n_interval_censored +=1
            else: 
                n_exact_values+=1
    nd.do_calc()
    
    # données NDL
    ndl = [d for d in nd.dataSet]
    detected = [d for d in nd.dataSet if not d.isNotDetected()]
    ndetected = [d for d in nd.dataSet if d.isNotDetected()]
    
    # print(f"Detected ({len(detected)}):")
    # print(f"Not detected ({len(ndetected)}):")
    
    for d in ndl:
        df.at[d.position, 'NDL'] = d.isNotDetected()
        
    df['ValueNDL'] = df['Value']
    for d in ndetected:
        # print(f"Position: {d.position}, Valeur: {d.getValue():.3f} -> {d.finalValue:.3f}")
        df.at[d.position, 'ValueNDL'] = d.finalValue
           
    df['ValueNDL'] = pd.to_numeric(df['ValueNDL'], errors='coerce')
    
    return df