# Entwurf einer Python-Klasse zum Ziehen geschichteter Stichproben
Autor: Paul Schoppel

Datum: 02.10.2023

### Abhängigkeiten

In [92]:
import numpy as np
import pandas as pd
import warnings

### Source-Code der Klasse `StratifiedSample`

In [93]:
class StratifiedSample:
    """
    Draws stratified random samples from a DataFrame according to a sample plan.

    A sample plan is a DataFrame with one row per stratum. Every column except
    'NSIZE' is a strata column that must also exist in the population DataFrame;
    'NSIZE' holds the number of observations to draw from that stratum.
    """

    def __init__(self, dataframe):
        """
        Parameters:
        - dataframe: DataFrame holding the population to sample from.
        """
        self.dataframe = dataframe
        # Both attributes are populated by read_sample_plan.
        self.strata_cols = None
        self.sample_plan_df = None

    def read_sample_plan(self, sample_plan_df):
        """
        Reads the sample plan from a DataFrame and validates the strata columns.

        The sampler's state is only updated after all checks have passed, so a
        rejected plan leaves a previously loaded plan untouched.

        Parameters:
        - sample_plan_df: DataFrame containing the strata and NSIZE columns.

        Raises:
        - ValueError: if NSIZE is missing, if the plan has no strata columns, or if
          a strata column does not exist in the dataframe.
        """
        # Check if NSIZE column exists
        if 'NSIZE' not in sample_plan_df.columns:
            raise ValueError("NSIZE column is missing in the sample plan DataFrame.")

        # Identify strata columns from sample plan
        strata_cols = [col for col in sample_plan_df.columns if col != 'NSIZE']
        if not strata_cols:
            raise ValueError("The sample plan DataFrame does not contain any strata columns.")

        # Check if strata columns in sample plan exist in the dataframe
        missing = [col for col in strata_cols if col not in self.dataframe.columns]
        if missing:
            raise ValueError(
                f"The provided strata columns are not all present in the dataframe. Missing: {missing}"
            )

        self.strata_cols = strata_cols
        self.sample_plan_df = sample_plan_df

    def get_plan_template(self, strata_cols=None):
        """
        Generates a sample plan template with empty NSIZE column.

        Parameters:
        - strata_cols: List of strata columns (a single column name is also accepted).
          If given, the template lists the distinct combinations found in the dataframe.
          If None, the strata of the previously loaded sample plan are used.

        Returns:
        - A DataFrame with one row per stratum, sorted by the strata columns,
          and an NSIZE column filled with NaN.

        Raises:
        - ValueError: if no strata columns are given and no plan is loaded, or if
          a given strata column does not exist in the dataframe.
        """
        if strata_cols is not None:
            # Accept a bare column name as well as any iterable of names.
            strata_cols = [strata_cols] if isinstance(strata_cols, str) else list(strata_cols)

        if strata_cols:
            # Explicitly requested columns take precedence over a loaded plan.
            if not all(col in self.dataframe.columns for col in strata_cols):
                raise ValueError("One or more provided strata columns do not exist in the dataframe.")
            strata_source = self.dataframe[strata_cols]
        elif self.sample_plan_df is not None:
            strata_cols = self.strata_cols
            strata_source = self.sample_plan_df.drop(columns='NSIZE')
        else:
            raise ValueError("No strata columns provided and no sample plan loaded. Cannot generate template.")

        return (
            strata_source
            .drop_duplicates()
            .sort_values(by=strata_cols)
            .reset_index(drop=True)
            .assign(NSIZE=np.nan)
        )

    def sample_size(self):
        """
        Display the total number of observations that will be included in the stratified sample.

        Raises:
        - ValueError: if no sample plan has been loaded.
        """
        if self.sample_plan_df is None:
            raise ValueError("Sample plan has not been read. Please use read_sample_plan to load a sample plan.")

        n_total = self.sample_plan_df['NSIZE'].sum()
        print(f"The total number of observations that will be included in the stratified sample is {n_total}.")

    def _stratum_mask(self, strata_dict):
        """Return a boolean array marking the dataframe rows that belong to the given stratum."""
        mask = np.ones(len(self.dataframe), dtype=bool)
        for col, value in strata_dict.items():
            column = self.dataframe[col]
            if pd.api.types.is_scalar(value) and pd.isna(value):
                # NaN never compares equal to itself, so a missing stratum value needs isna().
                matches = column.isna()
            else:
                matches = column == value
            mask &= matches.to_numpy(dtype=bool, na_value=False)
        return mask

    def get_sample(self, random_state=None):
        """
        Draws a stratified sample based on the stored sample plan DataFrame.

        Strata are matched by comparing the plan values with the dataframe columns
        directly, so strata values of any dtype and arbitrary column names work.
        If a stratum holds fewer observations than requested, a warning is issued
        and all of its observations are taken.

        Parameters:
        - random_state: Seed for random sampling. It is passed to every per-stratum
          draw; NumPy's global random state is not modified.

        Returns:
        - Pandas DataFrame containing the stratified sample (empty if the plan has no rows).

        Raises:
        - ValueError: if no sample plan has been loaded, or if NSIZE contains
          missing or negative values.
        """
        if self.sample_plan_df is None:
            raise ValueError("Sample plan has not been read. Please use read_sample_plan to load the sample plan.")

        plan = self.sample_plan_df

        # Validate all sample sizes up front so a half-filled template fails with a clear message.
        sizes = []
        for raw_size in plan['NSIZE']:
            if pd.isna(raw_size):
                raise ValueError("NSIZE contains missing values. Please provide a sample size for every stratum.")
            size = int(raw_size)
            if size < 0:
                raise ValueError("NSIZE must not contain negative values.")
            sizes.append(size)

        # itertuples keeps each column's own dtype (iterrows would upcast mixed rows).
        strata_rows = plan[self.strata_cols].itertuples(index=False, name=None)

        samples = []
        for values, n in zip(strata_rows, sizes):
            strata_dict = dict(zip(self.strata_cols, values))
            subset = self.dataframe[self._stratum_mask(strata_dict)]

            if n > len(subset):
                warnings.warn(f"Sample size {n} for strata {strata_dict} exceeds available observations {len(subset)}. Taking all available observations.")
                n = len(subset)

            samples.append(subset.sample(n=n, random_state=random_state))

        if not samples:
            # pd.concat raises on an empty list; an empty plan yields an empty sample.
            return self.dataframe.iloc[0:0].reset_index(drop=True)

        return pd.concat(samples).reset_index(drop=True)

### Erstellen von simulierten Beispieldaten

In [94]:
# Simulated data for testing
# Names of the German federal states used as values of the 'Bundesland' column
bundeslaender = [
    'Baden-Württemberg', 'Bayern', 'Berlin', 'Brandenburg',
    'Bremen', 'Hamburg', 'Hessen', 'Mecklenburg-Vorpommern',
    'Niedersachsen', 'Nordrhein-Westfalen', 'Rheinland-Pfalz',
    'Saarland', 'Sachsen', 'Sachsen-Anhalt', 'Schleswig-Holstein', 'Thüringen'
]

# Seed NumPy's global RNG so the simulated population is identical on every run.
# The columns below are drawn in order from this one stream, so reordering them changes the data.
np.random.seed(0)
n = 1000  # number of simulated observations
data = {
    'ID': np.arange(n),  # running identifier 0 .. n-1
    'Geschlecht': np.random.choice(['M', 'W'], n),
    'Studienrichtung': np.random.choice(['Informatik', 'Medizin', 'Kunst'], n),
    'Bundesland': np.random.choice(bundeslaender, n),
    'Einkommen': np.random.normal(loc=3500, scale=200, size=n)  # normal draw with mean 3500 and sd 200
}
df = pd.DataFrame(data)
df.head()

Unnamed: 0,ID,Geschlecht,Studienrichtung,Bundesland,Einkommen
0,0,M,Kunst,Hessen,3236.456531
1,1,W,Informatik,Saarland,3608.201644
2,2,W,Informatik,Hamburg,3482.976879
3,3,M,Kunst,Niedersachsen,3387.139793
4,4,W,Informatik,Saarland,3693.353602


### Beispielhafte Verwendung der Klasse `StratifiedSample`

Initialisiere ein Sampler-Objekt mit der Grundgesamtheit an Daten

In [95]:
# Create the sampler on the full simulated population `df`
mein_sampler = StratifiedSample(df)

Mit der Methode `get_plan_template()` kann ein Vorlage-Stichprobenplan erstellt werden. Das Merkmal NSIZE im ausgegebenen Dataframe beschreibt die Stichprobengröße pro Schicht. In der Vorlage ist das Merkmal in jeder Zeile auf `NaN` gesetzt.

An das Argument `strata_cols` werden die Schichtungsmerkmale übergeben.

In [96]:
# Template with one row per distinct Bundesland/Geschlecht combination in the data; NSIZE is NaN
mein_sampler.get_plan_template(
    strata_cols = ['Bundesland', 'Geschlecht']
)

Unnamed: 0,Bundesland,Geschlecht,NSIZE
0,Baden-Württemberg,M,
1,Baden-Württemberg,W,
2,Bayern,M,
3,Bayern,W,
4,Berlin,M,
5,Berlin,W,
6,Brandenburg,M,
7,Brandenburg,W,
8,Bremen,M,
9,Bremen,W,


Nachfolgend wird anhand der Methode `get_plan_template()` ein vereinfachter, sehr kleiner, beispielhafter Stichprobenplan erstellt.

Die `NaN`-Werte in NSIZE werden durch zufällige Werte zwischen 1 und 10 ersetzt.

In [97]:
# Function generating random nsize values
def add_random_NSIZE(df, min_val, max_val, seed):
    """
    Return a copy of `df` with an 'NSIZE' column of random integers.

    Parameters:
    - df: DataFrame that receives the NSIZE column. It is not modified.
    - min_val: Smallest value that can be drawn (inclusive).
    - max_val: Upper bound of the draw (exclusive); the largest possible value is max_val - 1.
    - seed: Seed for the random number generator.

    Returns:
    - A new DataFrame with one random NSIZE value per row.
    """
    # A dedicated RandomState produces the same numbers as np.random.seed(seed)
    # followed by np.random.randint, but leaves NumPy's global RNG untouched.
    rng = np.random.RandomState(seed)
    random_values = rng.randint(min_val, max_val, size=len(df))
    # assign returns a new frame instead of mutating the caller's DataFrame.
    return df.assign(NSIZE=random_values)

# build sample plan
example_plan = (
    # Start from the full template: one row per Bundesland/Geschlecht combination
    mein_sampler.get_plan_template(strata_cols=['Bundesland', 'Geschlecht'])
    # Keep only every 4th row to get a small plan; the original index labels (0, 4, 8, ...) are kept
    .iloc[::4]
    # Drop the all-NaN NSIZE column of the template before filling in real values
    .drop('NSIZE', axis=1)
    # randint's upper bound is exclusive, so max_val=11 yields sizes from 1 to 10
    .pipe(add_random_NSIZE, min_val=1, max_val=11, seed = 1)
)

print("Beispielhafter Stichprobenplan")
example_plan

Beispielhafter Stichprobenplan


Unnamed: 0,Bundesland,Geschlecht,NSIZE
0,Baden-Württemberg,M,6
4,Berlin,M,9
8,Bremen,M,10
12,Hessen,M,6
16,Niedersachsen,M,1
20,Rheinland-Pfalz,M,1
24,Sachsen,M,2
28,Schleswig-Holstein,M,8


Stichprobenplan einlesen

In [98]:
# Load the example plan; every column except NSIZE is treated as a strata column
mein_sampler.read_sample_plan(example_plan)

Zur Kontrolle kann die Summe von NSIZE, also die Anzahl an geplanten Beobachtungen in der Stichprobe, mit der Methode `.sample_size()` ausgegeben werden.

In [99]:
# Print the planned total sample size (sum of the NSIZE column of the loaded plan)
mein_sampler.sample_size()

The total number of observations that will be included in the stratified sample is 43.


Stichprobe ziehen mit der Methode `.get_sample()`

In [71]:
# Draw the stratified sample. No random_state is passed, so the draw depends on the
# current state of NumPy's global RNG; pass random_state=<int> for a reproducible sample.
mein_sampler.get_sample()

Unnamed: 0,ID,Geschlecht,Studienrichtung,Bundesland,Einkommen
0,114,M,Medizin,Baden-Württemberg,3530.933767
1,707,M,Medizin,Baden-Württemberg,3195.992143
2,195,M,Medizin,Baden-Württemberg,3438.124481
3,925,M,Medizin,Baden-Württemberg,3474.586567
4,985,M,Kunst,Baden-Württemberg,3611.392824
5,549,M,Kunst,Baden-Württemberg,3508.507815
6,532,M,Medizin,Berlin,3198.067679
7,815,M,Informatik,Berlin,3477.317674
8,967,M,Medizin,Berlin,3303.1582
9,701,M,Medizin,Berlin,3279.175203
