In [1]:
# Import libraries
import numpy as np
from parametriLancio import get_parametri  # Make sure parametri.py is in the same directory

import pandas as pd
import seaborn as sns

from sampling import Sampling

In [2]:
import numpy as np

class Generate_Population:
    def __init__(self, params):
        """
        Initialize the population generator with parameters.
        """
        np.random.seed(params['random_seed'])  # For reproducibility

        self.Q = params['Q']                  # Number of frames
        self.N = params['N']                  # Population size
        self.mean = np.array(params['mean'])  # Mean for multivariate normal
        self.cov = np.array(params['cov'])    # Covariance matrix
        self.Nd_pct = np.array(params['Nd_pct'])  # Proportion per domain
        self.mx = params["mx"]                # Target mean x
        self.my = params["my"]                # Target mean y

    def frame_membership(self):
        """
        Create binary matrix D x Q, where each row represents 
        frame membership for a domain.
        """
        self.D = 2**self.Q - 1
        self.d = np.arange(1, self.D + 1)
        self.q = np.arange(1, self.Q + 1)
        self.dBinary = np.array([
            list(map(int, np.binary_repr(i, self.Q)[::-1]))
            for i in range(1, self.D + 1)
        ])

    def evaluate_multiplicity(self):
        """
        Compute multiplicity (number of frames) per domain.
        """
        self.md = self.dBinary.sum(axis=1)

    def generate_population(self):
        """
        Generate positive-valued multivariate normal data (y, x).
        """
        yx = []
        while len(yx) < self.N:
            vals = np.random.multivariate_normal(self.mean, self.cov)
            if np.all(vals > 0):
                yx.append(vals)
        yx = np.array(yx) / 100.0
        self.y = yx[:, 0]
        self.x = yx[:, 1]

        # Check means
        assert np.isclose(self.y.mean()*100, self.my, atol=2), f"y mean off: {self.y.mean()*100}"
        assert np.isclose(self.x.mean()*100, self.mx, atol=8), f"x mean off: {self.x.mean()*100}"

    def population_size_per_domain(self):
        """
        Allocate population to domains according to Nd_pct.
        """
        self.Nd = np.round(self.Nd_pct * self.N).astype(int)
        diff = self.N - self.Nd.sum()
        if diff != 0:
            self.Nd[0] += diff
        assert self.Nd.sum() == self.N

    def assign_domains(self):
        """
        Randomly assign individuals to domains.
        """
        labels = np.arange(self.N)
        np.random.shuffle(labels)

        self.domains = []
        start = 0
        for nd in self.Nd:
            self.domains.append(labels[start:start+nd].tolist())
            start += nd

    def assign_frames(self):
        """
        Determine which individuals belong to each frame.
        """
        self.frames = []
        for iq in range(self.Q):
            idxs = []
            for idom in range(self.D):
                if self.dBinary[idom, iq] == 1:
                    idxs.extend(self.domains[idom])
            self.frames.append(sorted(idxs))

    def create_per_frame_variables(self):
        """
        Compute x and y values per frame.
        """
        self.x_kq = [[self.x[i] for i in frame] for frame in self.frames]
        self.y_kq = [[self.y[i] for i in frame] for frame in self.frames]

    def compute_totals(self):
        """
        Compute population and domain totals for y and x.
        """
        self.T_y = self.y.sum()
        self.T_x = self.x.sum()
        self.T_yd = [self.y[self.domains[idom]].sum() for idom in range(self.D)]
        self.T_xd = [self.x[self.domains[idom]].sum() for idom in range(self.D)]

    def multiplicity_per_unit_per_frame(self):
        """
        Compute the multiplicity vector for each unit in each frame.
        """
        self.m_kq = []
        for iq in range(self.Q):
            mkq = []
            for idom in range(self.D):
                if self.dBinary[idom, iq] == 1:
                    mkq += [self.md[idom]] * len(self.domains[idom])
            self.m_kq.append(mkq)

    def domain_membership_matrix(self):
        """
        Create a N x D binary matrix indicating domain membership.
        """
        self.domain_kd = np.zeros((self.N, self.D), dtype=int)
        for idom, dom in enumerate(self.domains):
            self.domain_kd[dom, idom] = 1

    def compute_g_list(self):
        """
        Create list of (g, domain_id, frame_id) for each domain-frame pair.
        """
        self.g_list = []
        g = 1
        for idom in range(self.D):
            for iq in range(self.Q):
                if self.dBinary[idom, iq] == 1:
                    self.g_list.append((g, idom + 1, iq + 1))
                    g += 1

    def run_generate(self):
        """
        Convenience method to run the full pipeline.
        """
        self.frame_membership()
        self.evaluate_multiplicity()
        self.generate_population()
        self.population_size_per_domain()
        self.assign_domains()
        self.assign_frames()
        self.create_per_frame_variables()
        self.compute_totals()
        self.multiplicity_per_unit_per_frame()
        self.domain_membership_matrix()
        self.compute_g_list()

    def explain_methods(self):
        """
        Print explanations of what each method does and their order of execution.
        """
        print("\n Methods Overview for Generate_Population\n")

        explanations = {
            "frame_membership": "1. Builds the D x Q binary matrix `dBinary` representing which domains belong to which frames.",
            "evaluate_multiplicity": "2. Computes `md`, the number of frames each domain is part of (i.e., domain multiplicity).",
            "generate_population": "3. Generates the full synthetic population (N individuals) from a multivariate normal distribution, ensuring all values are positive.",
            "population_size_per_domain": "4. Allocates the population into D domains based on the percentages in `Nd_pct`.",
            "assign_domains": "5. Randomly assigns individuals to the domains, ensuring sizes from step 4 are met.",
            "assign_frames": "6. Creates lists of individual indices per frame based on domain-frame memberships.",
            "create_per_frame_variables": "7. Extracts the `x` and `y` values for each frame from the full population data.",
            "compute_totals": "8. Computes total sums for `x` and `y` globally and per domain.",
            "multiplicity_per_unit_per_frame": "9. Computes for each individual in a frame the number of frames (multiplicity) they belong to.",
            "domain_membership_matrix": "10. Builds a binary matrix (N x D) showing domain membership for each individual.",
            "compute_g_list": "11. Generates a list of tuples (g, domain_id, frame_id) for indexing domain-frame combinations.",
            "run_all": "🚀 Runs all of the above methods in the correct order for full population setup.",
        }

        for method, desc in explanations.items():
            print(f"`{method}()`:\n    {desc}\n")

    def explain_attributes(self):
        """
        Print an explanation of each attribute (self.<...>) in the class.
        """
        print("\n Attributes of Generate_Population:\n")

        attributes = {
            "Q": "Number of frames (sampling sources). Set in __init__.",
            "N": "Total population size. Set in __init__.",
            "mean": "Mean vector for the multivariate normal distribution. Set in __init__.",
            "cov": "Covariance matrix for the multivariate normal distribution. Set in __init__.",
            "Nd_pct": "Proportion of population assigned to each domain. Set in __init__.",
            "mx": "Expected mean of x (auxiliary variable). Set in __init__.",
            "my": "Expected mean of y (target variable). Set in __init__.",
            "D": "Number of non-empty domains, computed as 2^Q - 1. Set in frame_membership().",
            "d": "Array of domain IDs (1 to D). Set in frame_membership().",
            "q": "Array of frame IDs (1 to Q). Set in frame_membership().",
            "dBinary": "D x Q binary matrix indicating domain membership in frames. Set in frame_membership().",
            "md": "Array with the number of frames each domain belongs to (domain multiplicity). Set in evaluate_multiplicity().",
            "y": "Generated values for the target variable. Set in generate_population().",
            "x": "Generated values for the auxiliary variable. Set in generate_population().",
            "Nd": "Number of individuals in each domain. Set in population_size_per_domain().",
            "domains": "List of index arrays, one per domain, indicating which individuals belong to which domain. Set in assign_domains().",
            "frames": "List of index arrays, one per frame, indicating which individuals belong to which frame. Set in assign_frames().",
            "x_kq": "List of lists: x values per individual per frame. Set in create_per_frame_variables().",
            "y_kq": "List of lists: y values per individual per frame. Set in create_per_frame_variables().",
            "T_y": "Total y value across the whole population. Set in compute_totals().",
            "T_x": "Total x value across the whole population. Set in compute_totals().",
            "T_yd": "List of y totals for each domain. Set in compute_totals().",
            "T_xd": "List of x totals for each domain. Set in compute_totals().",
            "m_kq": "List of multiplicity values for individuals in each frame. Set in multiplicity_per_unit_per_frame().",
            "domain_kd": "Binary matrix (N x D) indicating domain membership of each individual. Set in domain_membership_matrix().",
            "g_list": "List of tuples (g, domain_id, frame_id), used to reference domain-frame relationships. Set in compute_g_list()."
        }

        for attr, desc in attributes.items():
            print(f" `self.{attr}`:\n    {desc}\n")


In [6]:
class SampleExtractor(Generate_Population):
    def __init__(self, params):
        """
        Inherits from Generate_Population to access population generation.
        Then prepare the sampling configuration.
        """
        super().__init__(params)
        self.run_generate()  # Generate full population first

        # Sampling setup
        self.sample_designs = params['sample_design']
        self.nq = [int(np.ceil(self.Nd_pct[q] * params['fq'][q])) for q in range(self.Q)]
        self.random_seed = params['random_seed']
        self.Nrun = params.get('Nrun', 1)

    def extract_samples(self):
        """
        Extracts samples from each frame based on the specified sampling design,
        and computes first- and second-order inclusion probabilities.
        """
        self.s_q = []
        self.pi_kq_pop = []
        self.pi_kq = []
        self.pi_klq = []
        self.m_kq_sample = []
        self.rejects_q = []

        for q in range(self.Q):
            design = self.sample_designs[q] if isinstance(self.sample_designs, list) else self.sample_designs
            N = len(self.frames[q])
            n = self.nq[q]
            x = self.x_kq[q] if design in ("pareto", "sampford") else None

            sampling = Make(design, N=N, n=n, x=x)
            sample, rejects = sampling.get_sample(seed=self.random_seed + q)

            self.s_q.append(sample)
            self.rejects_q.append(rejects)
            self.pi_kq_pop.append(list(sampling.get_πi()))
            self.pi_kq.append(list(sampling.get_πi_sample(sample)))
            self.pi_klq.append(sampling.get_πij_sample(sample))
            self.m_kq_sample.append([self.m_kq[q][i] for i in sample])

    def classify_samples_by_domain(self):
        """
        Classifies sample units into their corresponding (domain, frame) pairs.
        """
        self.s_dq = []
        self.d_q = []

        for idom in range(self.D):
            for iq in range(self.Q):
                if self.dBinary[idom, iq] == 1:
                    frame_idxs = self.frames[iq]
                    dom_set = set(self.domains[idom])
                    sample = self.s_q[iq]
                    idxs_in_dom = [
                        i for i, idx in enumerate(sample)
                        if frame_idxs[idx] in dom_set
                    ]
                    self.s_dq.append(idxs_in_dom)
                    self.d_q.append((idom, iq))

    def compute_subsample_sizes(self):
        """
        Computes the size of each domain-frame subsample (d|q).
        """
        self.n_dq = [len(idxs) for idxs in self.s_dq]

    def run_extraction(self):
        """
        Runs the full sampling procedure after population generation.
        """
        self.extract_samples()
        self.classify_samples_by_domain()
        self.compute_subsample_sizes()

    def get_sampling_results(self):
        """
        Returns a dictionary of all sampling results.
        """
        return {
            's_q': self.s_q,
            'pi_kq_pop': self.pi_kq_pop,
            'pi_kq': self.pi_kq,
            'pi_klq': self.pi_klq,
            'm_kq': self.m_kq_sample,
            'nq': self.nq,
            'frames': self.frames,
            'x_kq': self.x_kq,
            'y_kq': self.y_kq,
            'pop': self.__dict__,  # or use a more selective population export
            's_dq': self.s_dq,
            'd_q': self.d_q,
            'n_dq': self.n_dq,
            'rejects_q': self.rejects_q
        }

    def explain_sampling_methods(self):
        """
        Explains all the sampling-related methods.
        """
        print("n SampleExtractor (child of Generate_Population) methods:\n")
        print(" extract_samples(): Draws a sample from each frame and computes inclusion probabilities.")
        print(" classify_samples_by_domain(): Maps sampled units to their domains within each frame.")
        print(" compute_subsample_sizes(): Counts sampled units per (domain, frame) pair.")
        print(" run_sampling(): Executes all three steps above in the correct order.")
        print(" get_sampling_results(): Returns a dictionary of all sampling-related outputs.\n")


class Estimates(SampleExtractor):

    def __init__(self, camp):
        """
        Inherits from SampleExtractor (which in turn inherits Generate_Population).
        Needs sampling results (`camp`) as input.
        """
        super().__init__(camp)


        self.run_extraction()
        self.Q = self.pop['Q']
        self.D = self.pop['D']
        self.dBinary = self.pop['dBinary']
        self.md = self.pop['md']
        self.Nq = self.pop['Nq']
        self.g_list = self.pop['g_list']

        self.deff = np.ones(self.Q)  # placeholder
        self.s_q = camp['s_q']
        self.nq = camp['nq']
        self.pi_kq = camp['pi_kq']
        self.m_kq = camp['m_kq']
        self.y_kq = self.pop['y_kq']
        self.frames = self.pop['frames']

        # results containers
        self.results = {}

    # ---------------- PIPELINE STEPS ---------------- #

    def compute_design_weights(self):
        """Step 1: Design weights and multiplicities"""
        self.w_kq = [[1/pi for pi in self.pi_kq[q]] for q in range(self.Q)]
        self.m_kq_full = [[m for m in self.m_kq[q]] for q in range(self.Q)]
        self.md_vec = self.md.copy()
        self.alpha_kq = [[1/m for m in self.m_kq[q]] for q in range(self.Q)]

    def compute_simple_multiplicity_estimator(self):
        """Step 2: Simple multiplicity estimator"""
        self.Y_SM = sum(
            self.y_kq[q][j] * self.alpha_kq[q][j] * self.w_kq[q][j]
            for q in range(self.Q)
            for j in range(len(self.s_q[q]))
        )

    def compute_Nhat(self):
        """Step 3: Nhat_q, Nhat_dq, Hhat_dq"""
        self.Nhat_q = [sum(self.w_kq[q]) for q in range(self.Q)]
        self.Nhat_dq = []
        self.y_dq = []
        self.wPML_kdq = []

        for g, id, iq in self.g_list:
            q = iq - 1
            d = id - 1
            dom_idx = [i for i, k in enumerate(self.s_q[q])
                       if self.pop['domain_kd'][self.frames[q][k], d] == 1]

            Nhat = sum(self.w_kq[q][i] for i in dom_idx)
            self.Nhat_dq.append(Nhat)
            ysum = sum(self.y_kq[q][i] for i in dom_idx)
            self.y_dq.append(ysum)
            self.wPML_kdq.append([0.0 for _ in dom_idx])  # placeholder

        self.Hhat_dq = self.dBinary.copy().astype(float)
        g_idx = 0
        for d in range(self.D):
            for q in range(self.Q):
                if self.dBinary[d, q] == 1:
                    self.Hhat_dq[d, q] = self.Nhat_dq[g_idx]
                    g_idx += 1

    def compute_NPML0(self):
        """Step 4: NPML0 initial estimates"""
        self.NPML0 = np.zeros(self.D)
        for d in range(self.D):
            idx_q = np.where(self.dBinary[d] == 1)[0]
            if len(idx_q) == 1:
                self.NPML0[d] = self.Hhat_dq[d, idx_q[0]]
            else:
                n_dq = [sum([self.pop['domain_kd'][self.frames[q][k], d] for k in self.s_q[q]])
                        for q in idx_q]
                qmax = idx_q[np.argmax(n_dq)]
                self.NPML0[d] = self.Hhat_dq[d, qmax]

    def iterative_npml(self, max_iter=25):
        """Step 5: Iterative NPML estimates"""
        Mplus = scipy.linalg.pinv(self.dBinary)
        fdeff = [self.nq[q] / self.Nq[q] / self.deff[q] for q in range(self.Q)]

        def A(x):
            A1 = (np.identity(self.D) - self.dBinary @ Mplus) @ scipy.linalg.inv(np.diag(x)) @ np.diag(self.dBinary @ fdeff)
            A2 = self.dBinary.T
            return np.concatenate((A1, A2))

        def b(x):
            b1 = (np.identity(self.D) - self.dBinary @ Mplus) @ scipy.linalg.inv(np.diag(x)) @ self.Hhat_dq @ fdeff
            b2 = np.array(self.Nq)
            return np.concatenate((b1, b2))

        x_old = np.array(self.NPML0)
        for _ in range(max_iter):
            try:
                A_mat = A(x_old)
                b_vec = b(x_old)
                x_new = np.linalg.pinv(A_mat) @ b_vec
            except Exception:
                x_new = x_old
            if np.allclose(x_old, x_new):
                break
            x_old = x_new

        self.NPML_d = x_old

    def compute_pml_weights(self):
        """Step 6: Compute PML weights and estimates"""
        self.wPML_kdq = []
        g_idx = 0
        for g, id, iq in self.g_list:
            q = iq - 1
            d = id - 1
            dom_idx = [i for i, k in enumerate(self.s_q[q])
                       if self.pop['domain_kd'][self.frames[q][k], d] == 1]
            Nhat = self.Nhat_dq[g_idx]
            wPML = [self.NPML_d[d] * self.w_kq[q][i] / Nhat if Nhat > 0 else 0.0 for i in dom_idx]
            self.wPML_kdq.append(wPML)
            g_idx += 1

        # totals
        self.t_ywPML_g = []
        g_idx = 0
        for g, id, iq in self.g_list:
            q = iq - 1
            d = id - 1
            dom_idx = [i for i, k in enumerate(self.s_q[q])
                       if self.pop['domain_kd'][self.frames[q][k], d] == 1]
            t_yw = sum(self.y_kq[q][i] * self.wPML_kdq[g_idx][j] for j, i in enumerate(dom_idx))
            self.t_ywPML_g.append(t_yw)
            g_idx += 1

    def compute_pml_alpha(self):
        """Step 7: Compute PML alpha (pPML_g)"""
        self.pPML_g = []
        fdeff = [self.nq[q] / self.Nq[q] / self.deff[q] for q in range(self.Q)]

        for g, id, iq in self.g_list:
            d = id - 1
            idx_q = np.where(self.dBinary[d] == 1)[0]
            if self.md[d] == 1:
                self.pPML_g.append(1.0)
            elif self.md[d] == 2:
                q1, q2 = idx_q
                f1 = fdeff[q1] * self.Hhat_dq[d, q1]
                f2 = fdeff[q2] * self.Hhat_dq[d, q2]
                if iq - 1 == q1:
                    self.pPML_g.append(f1 / (f1 + f2) if (f1 + f2) > 0 else 0.5)
                else:
                    self.pPML_g.append(f2 / (f1 + f2) if (f1 + f2) > 0 else 0.5)
            else:
                # generalize to 3+ frames
                weights = [fdeff[q] * self.Hhat_dq[d, q] for q in idx_q]
                tot = sum(weights)
                qcur = iq - 1
                self.pPML_g.append(weights[list(idx_q).index(qcur)] / tot if tot > 0 else 1 / len(idx_q))

    def compute_final_estimates(self):
        """Step 8: Final estimates dictionary"""
        self.Y_PML = sum(self.pPML_g[g] * self.t_ywPML_g[g] for g in range(len(self.g_list)))

        self.results = {
            'Y_SM': self.Y_SM,
            'w_kq': self.w_kq,
            'm_kq': self.m_kq_full,
            'md': self.md_vec,
            'alpha_kq': self.alpha_kq,
            'Nhat_q': self.Nhat_q,
            'Nhat_dq': self.Nhat_dq,
            'Hhat_dq': self.Hhat_dq,
            'NPML0': self.NPML0,
            'NPML_d': self.NPML_d,
            'wPML_kdq': self.wPML_kdq,
            't_ywPML_g': self.t_ywPML_g,
            'pPML_g': self.pPML_g,
            'Y_PML': self.Y_PML
        }
        return self.results

    # ---------------- PIPELINE RUNNER ---------------- #

    def run(self):
        """Run all steps in correct sequence."""
        self.compute_design_weights()
        self.compute_simple_multiplicity_estimator()
        self.compute_Nhat()
        self.compute_NPML0()
        self.iterative_npml()
        self.compute_pml_weights()
        self.compute_pml_alpha()
        return self.compute_final_estimates()


In [7]:
gen_pop = Generate_Population(get_parametri())
gen_pop.run_generate()

In [8]:
sample_pop = SampleExtractor(get_parametri())

In [10]:
estimates = Estimates(get_parametri())

KeyError: 'pop'