In [16]:
import pandas as pd
import numpy as np
import random, math, time
from scipy import stats
from scipy.stats import percentileofscore, rankdata, skew, kurtosis
from copulas.multivariate import GaussianMultivariate
import warnings
warnings.filterwarnings('ignore')

# Import Spark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf, broadcast, rand, randn
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField
import pyspark.sql.functions as F

# Kh·ªüi t·∫°o Spark Session v·ªõi c·∫•u h√¨nh t·ªëi ∆∞u nh∆∞ng consistent
def init_spark(app_name="SyntheticDataGenerator"):
    """Kh·ªüi t·∫°o Spark Session v·ªõi c·∫•u h√¨nh t·ªëi ∆∞u"""
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    return spark

# ----------- GI·ªÆ NGUY√äN C√ÅC H√ÄM PH√ÇN PH·ªêI T·ª™ CODE G·ªêC -----------
def fuzz_param(param, IN):
    """Fuzz parameter v·ªõi noise level IN - GI·ªÆ NGUY√äN"""
    return random.uniform(param*(1-IN), param*(1+IN))

def generate_fuzzed_erlang(k, expected_value, IN, size):

    theta_mean = k / expected_value  # rate parameter = k/mean
    results = []
    for _ in range(size):
        # Sinh k bi·∫øn exponential v·ªõi rate parameter ƒë∆∞·ª£c fuzz
        xN = sum(-math.log(1 - random.uniform(0.001, 0.999)) / fuzz_param(theta_mean, IN) 
                  for _ in range(k))
        results.append(xN)
    return np.array(results)

def generate_fuzzed_gamma(alpha, beta, IN_alpha, k, size):

    IN_beta = k * IN_alpha  # Li√™n k·∫øt noise levels
    results = []
    for _ in range(size):
        # Fuzz alpha v√† beta
        alpha_fuzz = fuzz_param(alpha, IN_alpha)
        beta_fuzz = max(0.01, fuzz_param(beta, IN_beta))  # ƒê·∫£m b·∫£o beta > 0
        
        # Sinh t·ª´ Gamma distribution
        sample = np.random.gamma(shape=alpha_fuzz, scale=1/beta_fuzz)
        results.append(float(sample))
    return np.array(results)

def generate_fuzzed_gaussian(mu, sigma, IN_sigma, k, size):
    """GI·ªÆ NGUY√äN 100% logic t·ª´ code g·ªëc"""
    IN_mu = k * IN_sigma  # Li√™n k·∫øt noise levels
    data = []
    
    for _ in range(size // 2 + 1):  # +1 ƒë·ªÉ ƒë·∫£m b·∫£o ƒë·ªß samples
        # Fuzz mu v√† sigma
        fuzzed_mu = fuzz_param(mu, IN_mu)
        fuzzed_sigma = max(1e-6, fuzz_param(sigma, IN_sigma))  # ƒê·∫£m b·∫£o sigma > 0
        
        # Sinh 2 bi·∫øn chu·∫©n b·∫±ng Box-Muller
        u1, u2 = np.random.uniform(0, 1, 2)
        z0 = np.sqrt(-2 * np.log(u1)) * np.cos(2 * np.pi * u2)
        z1 = np.sqrt(-2 * np.log(u1)) * np.sin(2 * np.pi * u2)
        
        data.append(fuzzed_mu + fuzzed_sigma * z0)
        data.append(fuzzed_mu + fuzzed_sigma * z1)
    
    return np.array(data[:size])

def transform_uniform_to_distribution(uniform_vals, target_samples):
    """GI·ªÆ NGUY√äN logic transform t·ª´ code g·ªëc"""
    # T·∫°o empirical CDF t·ª´ target samples
    sorted_target = np.sort(target_samples)
    n = len(sorted_target)
    
    # Transform uniform values
    transformed = []
    for u in uniform_vals:
        # T√¨m percentile t∆∞∆°ng ·ª©ng trong target distribution
        idx = int(u * (n - 1))
        if idx >= n - 1:
            transformed.append(sorted_target[-1])
        else:
            # Linear interpolation
            alpha = (u * (n - 1)) - idx
            val = sorted_target[idx] * (1 - alpha) + sorted_target[idx + 1] * alpha
            transformed.append(val)
    
    return np.array(transformed)

# ----------- TH·ªêNG K√ä T∆Ø∆†NG TH√çCH V·ªöI CODE G·ªêC -----------
def compute_column_stats_spark_compatible(spark_df):
    """
    T√≠nh th·ªëng k√™ t∆∞∆°ng th√≠ch 100% v·ªõi code g·ªëc
    - Kh√¥ng sample ƒë·ªÉ ƒë·∫£m b·∫£o ch√≠nh x√°c tuy·ªát ƒë·ªëi
    - C√πng logic t√≠nh to√°n
    """
    stats_dict = {}
    
    print("üîÑ T√≠nh to√°n th·ªëng k√™ (compatible mode)...")
    
    # T√≠nh basic stats b·∫±ng Spark (t·ªëi ∆∞u)
    stat_exprs = []
    for col_name in spark_df.columns:
        stat_exprs.extend([
            F.mean(col(col_name)).alias(f"{col_name}_mean"),
            F.stddev(col(col_name)).alias(f"{col_name}_std"),
            F.min(col(col_name)).alias(f"{col_name}_min"),
            F.max(col(col_name)).alias(f"{col_name}_max"),
            F.count(col(col_name)).alias(f"{col_name}_count"),
            F.countDistinct(col(col_name)).alias(f"{col_name}_unique")
        ])
    
    stats_row = spark_df.select(*stat_exprs).collect()[0]
    
    # ‚ö†Ô∏è QUAN TR·ªåNG: Convert TO√ÄN B·ªò data ƒë·ªÉ t√≠nh skew/kurtosis ch√≠nh x√°c
    # ƒê√¢y l√† ƒëi·ªÉm kh√°c bi·ªát ch√≠nh so v·ªõi optimized version
    print("   Converting to pandas for exact skew/kurtosis calculation...")
    df_pandas = spark_df.toPandas()
    
    for col_name in spark_df.columns:
        # L·∫•y basic stats t·ª´ Spark
        mean_val = stats_row[f"{col_name}_mean"]
        std_val = stats_row[f"{col_name}_std"]
        count_val = stats_row[f"{col_name}_count"]
        unique_val = stats_row[f"{col_name}_unique"]
        min_val = stats_row[f"{col_name}_min"]
        max_val = stats_row[f"{col_name}_max"]
        
        # T√≠nh skew/kurtosis t·ª´ TO√ÄN B·ªò data (kh√¥ng sample)
        col_data = df_pandas[col_name].values
        col_data = col_data[np.isfinite(col_data)]  # Remove NaN/inf
        
        if len(col_data) > 0:
            skew_val = skew(col_data) 
            kurt_val = kurtosis(col_data)
            unique_ratio = unique_val / count_val
            cv = std_val / (abs(mean_val) + 1e-8)
        else:
            skew_val = 0
            kurt_val = 0
            unique_ratio = 0
            cv = 0
        
        stats_dict[col_name] = {
            "mean": mean_val,
            "std": std_val,
            "skewness": skew_val,
            "kurtosis": kurt_val,
            "min": min_val,
            "max": max_val,
            "unique_ratio": unique_ratio,
            "cv": cv
        }
    
    return stats_dict

def select_best_distribution(stats_dict, data=None):
    """GI·ªÆ NGUY√äN 100% logic selection t·ª´ code g·ªëc"""
    skew_val = stats_dict["skewness"]
    kurt_val = stats_dict["kurtosis"]
    min_val = stats_dict["min"]
    cv = stats_dict["cv"]
    unique_ratio = stats_dict["unique_ratio"]
    std = stats_dict["std"]

    # Hard constraints (ch·ªâ gi·ªØ c√°i th·∫≠t s·ª± b·∫Øt bu·ªôc)
    if std <= 0:
        return "gaussian", "zero_variance"
    if min_val < 0:
        return "gaussian", "negative_values"

    # Heuristic rules
    if skew_val > 1.5:
        if cv > 0.7:
            return "erlang", f"high_skew={skew_val:.2f}_cv={cv:.2f}"
        else:
            return "gamma", f"high_skew={skew_val:.2f}_low_cv={cv:.2f}"
    elif 0.5 <= skew_val <= 1.5:
        if cv > 1:
            return "erlang", f"moderate_skew={skew_val:.2f}_high_cv={cv:.2f}"
        else:
            return "gamma", f"moderate_skew={skew_val:.2f}_cv={cv:.2f}"
    elif abs(skew_val) < 0.5:
        return "gaussian", f"symmetric_skew={skew_val:.2f}"
    elif skew_val < -0.5:
        return "gaussian", f"negative_skew={skew_val:.2f}"
    else:
        return "gaussian", f"fallback_skew={skew_val:.2f}"

    # Secondary check (n·∫øu mu·ªën v·∫´n note unique_ratio nh∆∞ng kh√¥ng override)
    if unique_ratio < 0.1:
        return "gaussian", f"low_unique_ratio_skew={skew_val:.2f}"

def auto_select_distributions_spark_compatible(spark_df):
    """T·ª± ƒë·ªông ch·ªçn ph√¢n ph·ªëi t∆∞∆°ng th√≠ch v·ªõi code g·ªëc"""
    # S·ª≠ d·ª•ng stats function t∆∞∆°ng th√≠ch
    column_stats = compute_column_stats_spark_compatible(spark_df)
    
    distribution_map = {}
    print("üîç Ph√¢n t√≠ch t·ª´ng c·ªôt ƒë·ªÉ ch·ªçn ph√¢n ph·ªëi...")
    print("-" * 70)
    
    for col_name, stats in column_stats.items():
        chosen_dist, reason = select_best_distribution(stats)
        
        analysis = stats.copy()
        analysis["distribution"] = chosen_dist
        analysis["reason"] = reason
        distribution_map[col_name] = analysis
        
        print(f"üìä {col_name:15} ‚Üí {chosen_dist:8} "
              f"(skew={stats['skewness']:5.2f}, reason: {reason})")
    
    print("-" * 70)
    
    # Th·ªëng k√™ t·ªïng quan
    dist_counts = {}
    for info in distribution_map.values():
        dist = info['distribution']
        dist_counts[dist] = dist_counts.get(dist, 0) + 1
    
    print("üìà Ph√¢n ph·ªëi ƒë∆∞·ª£c ch·ªçn:")
    for dist, count in dist_counts.items():
        print(f"   {dist.capitalize():10}: {count} c·ªôt(s)")
    
    return distribution_map

# ----------- MAIN FUNCTION T∆Ø∆†NG TH√çCH 100% -----------
def generate_adaptive_synthetic_with_spark_compatible(spark_df, global_params=None, column_overrides=None, size=None):
    """
    Version t∆∞∆°ng th√≠ch 100% v·ªõi code g·ªëc
    - C√πng logic sinh marginal distributions
    - C√πng copula fitting approach  
    - C√πng transform method
    """

    if size is None:
        size = spark_df.count()
    
    # Default global parameters - GI·ªÆ NGUY√äN
    if global_params is None:
        global_params = {
            "erlang": {"k": 2, "IN": 0.15},
            "gamma": {"k_link": 0.2, "IN_alpha": 0.15}, 
            "gaussian": {"k_link": 0.2, "IN_sigma": 0.10}
        }
    
    # Step 1: Auto-select distributions cho t·ª´ng c·ªôt - T∆Ø∆†NG TH√çCH
    distribution_map = auto_select_distributions_spark_compatible(spark_df)
    
    # Apply column overrides n·∫øu c√≥
    if column_overrides:
        for col_name, override_dist in column_overrides.items():
            if col_name in distribution_map:
                old_dist = distribution_map[col_name]['distribution']
                distribution_map[col_name]['distribution'] = override_dist
                distribution_map[col_name]['reason'] = f"manual_override_from_{old_dist}"
                print(f"üîÑ Override {col_name}: {old_dist} ‚Üí {override_dist}")
    
    print(f"\nüîÑ Sinh {size} m·∫´u synthetic...")
    
    # Convert Spark DataFrame to Pandas ƒë·ªÉ l√†m vi·ªác v·ªõi Copula
    # GI·ªÆ NGUY√äN approach n√†y
    df_pandas = spark_df.toPandas()
    
    # Step 2: Sinh marginal distributions cho m·ªói c·ªôt - GI·ªÆ NGUY√äN LOGIC
    marginal_samples = {}
    
    for col_name in df_pandas.columns:
        col_data = df_pandas[col_name].values
        col_mean = np.mean(col_data)
        col_std = np.std(col_data, ddof=1)
        dist_info = distribution_map[col_name]
        dist_type = dist_info['distribution']
        
        print(f"   {col_name}: {dist_type}")
        
        if dist_type == "erlang":
            params = global_params["erlang"]
            marginal_samples[col_name] = generate_fuzzed_erlang(
                k=params["k"], 
                expected_value=col_mean, 
                IN=params["IN"], 
                size=size
            )
            
        elif dist_type == "gamma":
            params = global_params["gamma"]
            # Fit Gamma: alpha = (mean/std)^2, beta = mean/std^2
            if col_std > 0:
                alpha = (col_mean / col_std) ** 2
                beta = col_mean / (col_std ** 2)
            else:
                alpha, beta = 1.0, 1.0
                
            marginal_samples[col_name] = generate_fuzzed_gamma(
                alpha=alpha,
                beta=beta,
                IN_alpha=params["IN_alpha"],
                k=params["k_link"],
                size=size
            )
            
        elif dist_type == "gaussian":
            params = global_params["gaussian"]
            marginal_samples[col_name] = generate_fuzzed_gaussian(
                mu=col_mean,
                sigma=max(1e-6, col_std),  # Avoid zero std
                IN_sigma=params["IN_sigma"],
                k=params["k_link"],
                size=size
            )
        else:
            # Fallback to Gaussian
            params = global_params["gaussian"]
            marginal_samples[col_name] = generate_fuzzed_gaussian(
                mu=col_mean,
                sigma=max(1e-6, col_std),
                IN_sigma=params["IN_sigma"],
                k=params["k_link"],
                size=size
            )
    
    # Step 3: Fit Gaussian Copula tr√™n d·ªØ li·ªáu g·ªëc - GI·ªÆ NGUY√äN
    print("üîÑ Fitting Gaussian Copula...")
    copula_model = GaussianMultivariate()
    copula_model.fit(df_pandas)
    
    # Step 4: Sample t·ª´ copula ƒë·ªÉ c√≥ dependency structure - GI·ªÆ NGUY√äN
    print("üîÑ Sampling t·ª´ copula...")
    copula_samples = copula_model.sample(size)
    
    # Step 5: Transform copula samples th√†nh target distribution - GI·ªÆ NGUY√äN
    print("üîÑ Transform copula samples...")
    synthetic_df = pd.DataFrame(index=range(size), columns=df_pandas.columns)
    
    for col_name in df_pandas.columns:
        # L·∫•y copula values cho c·ªôt n√†y
        copula_vals = copula_samples[col_name].values
        
        # Transform v·ªÅ uniform [0,1] b·∫±ng empirical CDF c·ªßa d·ªØ li·ªáu g·ªëc - GI·ªÆ NGUY√äN
        uniform_vals = []
        for val in copula_vals:
            # T√≠nh percentile c·ªßa val trong d·ªØ li·ªáu g·ªëc
            percentile = percentileofscore(df_pandas[col_name], val, kind='rank') / 100
            uniform_vals.append(percentile)
        
        uniform_vals = np.array(uniform_vals)
        uniform_vals = np.clip(uniform_vals, 0.001, 0.999)  # Tr√°nh extremes
        
        # Transform uniform values th√†nh marginal distribution - GI·ªÆ NGUY√äN
        synthetic_col = transform_uniform_to_distribution(
            uniform_vals, 
            marginal_samples[col_name]
        )
        synthetic_df[col_name] = synthetic_col
    
    return synthetic_df, distribution_map

# ----------- Utility functions - GI·ªÆ NGUY√äN -----------
def read_dataset_spark(spark, path):
    """ƒê·ªçc CSV dataset b·∫±ng Spark v·ªõi coalesce t·ªëi ∆∞u"""
    return spark.read.csv(path, header=True, inferSchema=True) \
                    .coalesce(spark.sparkContext.defaultParallelism)

def detect_binary_columns_spark(spark_df, threshold=0.1):
    """T·ª± ƒë·ªông ph√°t hi·ªán c·ªôt binary b·∫±ng Spark - GI·ªÆ NGUY√äN LOGIC"""
    binary_cols = []
    total_rows = spark_df.count()
    
    for col_name in spark_df.columns:
        unique_count = spark_df.select(col_name).distinct().count()
        unique_ratio = unique_count / total_rows
        
        if unique_ratio <= threshold and unique_count <= 10:
            binary_cols.append(col_name)
    
    return binary_cols

def handle_binary_columns(df_synthetic, df_original_pandas, binary_cols=None):
    """X·ª≠ l√Ω c√°c c·ªôt binary v·ªÅ gi√° tr·ªã discrete - GI·ªÆ NGUY√äN"""
    if binary_cols is None:
        binary_cols = []
        for col in df_original_pandas.columns:
            unique_ratio = df_original_pandas[col].nunique() / len(df_original_pandas)
            if unique_ratio <= 0.1 and df_original_pandas[col].nunique() <= 10:
                binary_cols.append(col)
    
    for col in binary_cols:
        if col in df_synthetic.columns:
            original_values = sorted(df_original_pandas[col].unique())
            
            if len(original_values) == 2:
                # Binary column
                threshold = np.median(df_synthetic[col])
                df_synthetic[col] = np.where(
                    df_synthetic[col] >= threshold, 
                    original_values[1], 
                    original_values[0]
                )
            else:
                # Multi-class categorical 
                synthetic_vals = df_synthetic[col].values
                quantized = []
                for val in synthetic_vals:
                    # T√¨m gi√° tr·ªã g·∫ßn nh·∫•t
                    closest_val = min(original_values, key=lambda x: abs(x - val))
                    quantized.append(closest_val)
                df_synthetic[col] = quantized
    
    return binary_cols

# ----------- Main execution - T∆Ø∆†NG TH√çCH 100% -----------
def main():
    # Set random seed for reproducibility - QUAN TR·ªåNG!
    random.seed(42)
    np.random.seed(42)
    
    # Kh·ªüi t·∫°o Spark
    spark = init_spark("CompatibleSyntheticDataGenerator")
    
    try:
        # ƒê·ªçc d·ªØ li·ªáu b·∫±ng Spark
        print("üîÑ ƒêang ƒë·ªçc d·ªØ li·ªáu v·ªõi Spark...")
        spark_df = read_dataset_spark(spark, "diabetes.csv")
        row_count = spark_df.count()
        col_count = len(spark_df.columns)
        print(f" ƒê√£ ƒë·ªçc {row_count} rows, {col_count} columns\n")
        sizes = [None,100000]
        
        # L·∫∑p qua t·ª´ng size, sinh v√† l∆∞u + ƒëo th·ªùi gian
        for sz in sizes:
            # Format t√™n & actual_size
            if sz is None:
                actual_size = row_count
                file_tag = f"{actual_size}"
            else:
                actual_size = int(sz)
                file_tag = f"{actual_size}"
            
            print(f"\n{'='*60}")
            print(f"üîÑ B·∫Øt ƒë·∫ßu sinh synthetic size = {file_tag} (COMPATIBLE MODE)")
            start_time = time.time()
            
            # C·∫£nh b√°o khi size l·ªõn
            if actual_size >= 500_000:
                print("‚ö†Ô∏è  L∆∞u √Ω: size l·ªõn (>=500k). ƒê·∫£m b·∫£o m√°y c√≥ ƒë·ªß RAM ƒë·ªÉ toPandas() v√† x·ª≠ l√Ω.")
            
            # G·ªçi h√†m sinh - S·ª¨ D·ª§NG COMPATIBLE VERSION
            df_synthetic, distribution_info = generate_adaptive_synthetic_with_spark_compatible(
                spark_df,
                global_params = {
                    "erlang": {"k": 5, "IN": 0.35},        
                    "gamma": {"k_link": 0.3, "IN_alpha": 0.35},  
                    "gaussian": {"k_link": 0.3, "IN_sigma": 0.35}
                },
                size = (None if sz is None else int(sz))
            )
            
            # X·ª≠ l√Ω binary columns - GI·ªêNG CODE G·ªêC
            df_original_pandas = spark_df.toPandas()
            print("\nüîÑ X·ª≠ l√Ω binary columns...")
            binary_cols = handle_binary_columns(df_synthetic, df_original_pandas)
            print(f" ƒê√£ x·ª≠ l√Ω binary columns: {binary_cols}")
            
            # L∆∞u file k·∫øt qu·∫£
            # output_file = f"synthetic_adaptive_data_spark35.csv"
            output_file = f"synthetic_compatible_spark_{file_tag}.csv"

            df_synthetic.to_csv(output_file, index=False)
            
            # L∆∞u th√¥ng tin distribution ƒë∆∞·ª£c ch·ªçn
            dist_summary = pd.DataFrame([
                {
                    "Column": col, 
                    "Distribution": info['distribution'],
                    "Reason": info['reason'],
                    "Skewness": info.get('skewness', 'N/A'),
                    "Mean": info.get('mean', 'N/A'),
                    "Std": info.get('std', 'N/A')
                }
                for col, info in distribution_info.items()
            ])
            dist_summary_file = f"distribution_compatible_spark_{file_tag}.csv"
            dist_summary.to_csv(dist_summary_file, index=False)
            
            elapsed = time.time() - start_time
            mins = int(elapsed // 60)
            secs = int(elapsed % 60)
            print(f" ƒê√£ l∆∞u {output_file} | Th·ªùi gian: {mins}m {secs}s")
            print(f"{'='*60}\n")
        
        print("\nüéâ Ho√†n t·∫•t v·ªõi compatible mode!")
        
    finally:
        # ƒê√≥ng Spark session
        spark.stop()

if __name__ == "__main__":
    main()

üîÑ ƒêang ƒë·ªçc d·ªØ li·ªáu v·ªõi Spark...
‚úÖ ƒê√£ ƒë·ªçc 768 rows, 9 columns


üîÑ B·∫Øt ƒë·∫ßu sinh synthetic size = 768 (COMPATIBLE MODE)
üîÑ T√≠nh to√°n th·ªëng k√™ (compatible mode)...
   Converting to pandas for exact skew/kurtosis calculation...
üîç Ph√¢n t√≠ch t·ª´ng c·ªôt ƒë·ªÉ ch·ªçn ph√¢n ph·ªëi...
----------------------------------------------------------------------
üìä Pregnancies     ‚Üí gamma    (skew= 0.90, reason: moderate_skew=0.90_cv=0.88)
üìä Glucose         ‚Üí gaussian (skew= 0.17, reason: symmetric_skew=0.17)
üìä BloodPressure   ‚Üí gaussian (skew=-1.84, reason: negative_skew=-1.84)
üìä SkinThickness   ‚Üí gaussian (skew= 0.11, reason: symmetric_skew=0.11)
üìä Insulin         ‚Üí erlang   (skew= 2.27, reason: high_skew=2.27_cv=1.44)
üìä BMI             ‚Üí gaussian (skew=-0.43, reason: symmetric_skew=-0.43)
üìä DiabetesPedigreeFunction ‚Üí erlang   (skew= 1.92, reason: high_skew=1.92_cv=0.70)
üìä Age             ‚Üí gamma    (skew= 1.13, reason: mo

In [92]:
import pandas as pd
import numpy as np
import random, math, time
from scipy import stats
from scipy.stats import percentileofscore, rankdata, skew, kurtosis
from copulas.multivariate import GaussianMultivariate
import warnings
warnings.filterwarnings('ignore')

# Import Spark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf, broadcast, rand, randn, lit, when, array, struct
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, IntegerType
import pyspark.sql.functions as F
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT

# ============================================================================
# PH·∫¶N 1: SPARK SESSION OPTIMIZATION
# ============================================================================

def init_spark_optimized(app_name="OptimizedSparkSynthetic"):
    """Spark Session ƒë∆∞·ª£c t·ªëi ∆∞u cho synthetic data generation"""
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.shuffle.partitions", "200") \
        .config("spark.default.parallelism", "16") \
        .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000") \
        .config("spark.sql.broadcastTimeout", "600") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.maxResultSize", "2g") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    return spark

# ============================================================================
# PH·∫¶N 2: DISTRIBUTION FUNCTIONS - OPTIMIZED FOR SPARK
# ============================================================================

def fuzz_param(param, IN):
    """GI·ªÆ NGUY√äN - ƒë∆∞·ª£c s·ª≠ d·ª•ng trong UDF"""
    return random.uniform(param*(1-IN), param*(1+IN))

# Broadcast global parameters to all executors
def create_optimized_distribution_udfs(spark, global_params):
    """T·∫°o UDF ƒë∆∞·ª£c t·ªëi ∆∞u v·ªõi broadcast variables"""
    
    # Broadcast parameters ƒë·ªÉ tr√°nh serialize nhi·ªÅu l·∫ßn
    broadcast_params = spark.sparkContext.broadcast(global_params)
    
    def erlang_udf_factory(k_val, expected_value_val, IN_val):
        @udf(returnType=DoubleType())
        def erlang_sample(seed_col):
            # S·ª≠ d·ª•ng seed t·ª´ row ƒë·ªÉ ƒë·∫£m b·∫£o reproducibility
            random.seed(int(seed_col) + 42)
            
            params = broadcast_params.value
            theta_mean = k_val / expected_value_val
            
            xN = 0.0
            for _ in range(int(k_val)):
                u = random.uniform(0.001, 0.999)
                theta_fuzzed = fuzz_param(theta_mean, IN_val)
                xN += -math.log(u) / theta_fuzzed
            
            return float(xN)
        return erlang_sample
    
    def gamma_udf_factory(alpha_val, beta_val, IN_alpha_val, k_val):
        @udf(returnType=DoubleType())
        def gamma_sample(seed_col):
            random.seed(int(seed_col) + 42)
            np.random.seed(int(seed_col) + 42)
            
            IN_beta = k_val * IN_alpha_val
            alpha_fuzz = fuzz_param(alpha_val, IN_alpha_val)
            beta_fuzz = max(0.01, fuzz_param(beta_val, IN_beta))
            
            # S·ª≠ d·ª•ng numpy gamma
            sample = np.random.gamma(shape=alpha_fuzz, scale=1/beta_fuzz)
            return float(sample)
        return gamma_sample
    
    def gaussian_udf_factory(mu_val, sigma_val, IN_sigma_val, k_val):
        @udf(returnType=DoubleType())
        def gaussian_sample(seed_col):
            random.seed(int(seed_col) + 42)
            
            IN_mu = k_val * IN_sigma_val
            fuzzed_mu = fuzz_param(mu_val, IN_mu)
            fuzzed_sigma = max(1e-6, fuzz_param(sigma_val, IN_sigma_val))
            
            # Box-Muller transform
            u1, u2 = random.uniform(0, 1), random.uniform(0, 1)
            z0 = math.sqrt(-2 * math.log(u1)) * math.cos(2 * math.pi * u2)
            
            return float(fuzzed_mu + fuzzed_sigma * z0)
        return gaussian_sample
    
    return {
        "erlang_factory": erlang_udf_factory,
        "gamma_factory": gamma_udf_factory,
        "gaussian_factory": gaussian_udf_factory
    }

# ============================================================================
# PH·∫¶N 3: SPARK-NATIVE STATISTICS - MAJOR IMPROVEMENT
# ============================================================================

def compute_advanced_stats_spark_native(spark_df):
    """
    MAJOR IMPROVEMENT: T√≠nh skewness v√† kurtosis ho√†n to√†n tr√™n Spark
    Kh√¥ng c·∫ßn toPandas() -> ti·∫øt ki·ªám memory v√† tƒÉng t·ªëc ƒë√°ng k·ªÉ
    """
    stats_dict = {}
    print("T√≠nh to√°n th·ªëng k√™ ho√†n to√†n tr√™n Spark (kh√¥ng toPandas)...")
    
    # B∆∞·ªõc 1: T√≠nh basic stats
    stat_exprs = []
    for col_name in spark_df.columns:
        stat_exprs.extend([
            F.mean(col(col_name)).alias(f"{col_name}_mean"),
            F.stddev(col(col_name)).alias(f"{col_name}_std"),
            F.min(col(col_name)).alias(f"{col_name}_min"),
            F.max(col(col_name)).alias(f"{col_name}_max"),
            F.count(col(col_name)).alias(f"{col_name}_count"),
            F.countDistinct(col(col_name)).alias(f"{col_name}_unique")
        ])
    
    basic_stats = spark_df.select(*stat_exprs).collect()[0]
    
    # B∆∞·ªõc 2: T√≠nh skewness v√† kurtosis tr√™n Spark
    for col_name in spark_df.columns:
        mean_val = basic_stats[f"{col_name}_mean"]
        std_val = basic_stats[f"{col_name}_std"]
        
        if std_val > 0:
            # T√≠nh skewness: E[((X - Œº)/œÉ)¬≥]
            skew_expr = F.mean(
                F.pow((col(col_name) - lit(mean_val)) / lit(std_val), 3)
            ).alias(f"{col_name}_skew")
            
            # T√≠nh kurtosis: E[((X - Œº)/œÉ)‚Å¥] - 3  
            kurt_expr = F.mean(
                F.pow((col(col_name) - lit(mean_val)) / lit(std_val), 4)
            ).alias(f"{col_name}_kurt")
            
            advanced_stats = spark_df.select(skew_expr, kurt_expr).collect()[0]
            skew_val = advanced_stats[f"{col_name}_skew"]
            kurt_val = advanced_stats[f"{col_name}_kurt"] - 3  # Fisher's definition
        else:
            skew_val = 0
            kurt_val = 0
        
        # Compile final stats
        count_val = basic_stats[f"{col_name}_count"]
        unique_val = basic_stats[f"{col_name}_unique"]
        min_val = basic_stats[f"{col_name}_min"]
        max_val = basic_stats[f"{col_name}_max"]
        
        stats_dict[col_name] = {
            "mean": mean_val,
            "std": std_val,
            "skewness": skew_val,
            "kurtosis": kurt_val,
            "min": min_val,
            "max": max_val,
            "unique_ratio": unique_val / count_val,
            "cv": std_val / (abs(mean_val) + 1e-8)
        }
    
    return stats_dict

def select_best_distribution(stats_dict):
    """GI·ªÆ NGUY√äN logic selection"""
    skew_val = stats_dict["skewness"]
    kurt_val = stats_dict["kurtosis"]
    min_val = stats_dict["min"]
    cv = stats_dict["cv"]
    std = stats_dict["std"]

    if std <= 0:
        return "gaussian", "zero_variance"
    if min_val < 0:
        return "gaussian", "negative_values"

    if skew_val > 1.5:
        if cv > 0.7:
            return "erlang", f"high_skew={skew_val:.2f}_cv={cv:.2f}"
        else:
            return "gamma", f"high_skew={skew_val:.2f}_low_cv={cv:.2f}"
    elif 0.5 <= skew_val <= 1.5:
        if cv > 1:
            return "erlang", f"moderate_skew={skew_val:.2f}_high_cv={cv:.2f}"
        else:
            return "gamma", f"moderate_skew={skew_val:.2f}_cv={cv:.2f}"
    elif abs(skew_val) < 0.5:
        return "gaussian", f"symmetric_skew={skew_val:.2f}"
    else:
        return "gaussian", f"fallback_skew={skew_val:.2f}"

# ============================================================================
# PH·∫¶N 4: VECTORIZED MARGINAL GENERATION ON SPARK
# ============================================================================

def generate_all_marginals_spark_vectorized(spark, distribution_map, stats_dict, global_params, size):
    """
    MAJOR IMPROVEMENT: Sinh t·∫•t c·∫£ marginal distributions trong 1 Spark job
    thay v√¨ nhi·ªÅu jobs ri√™ng bi·ªát
    """
    print(f"Sinh t·∫•t c·∫£ marginal distributions trong 1 Spark job...")
    
    # T·∫°o base DataFrame v·ªõi seed column
    base_df = spark.range(size).select(
        col("id").alias("row_id"),
        (F.lit(42) + col("id")).cast(IntegerType()).alias("seed")
    )
    
    # T·∫°o t·∫•t c·∫£ UDF factories
    udf_factories = create_optimized_distribution_udfs(spark, global_params)
    
    # Build expressions cho t·∫•t c·∫£ columns trong 1 l·∫ßn
    column_exprs = [col("row_id")]
    
    for col_name, dist_info in distribution_map.items():
        dist_type = dist_info['distribution']
        col_stats = stats_dict[col_name]
        
        if dist_type == "erlang":
            params = global_params["erlang"]
            udf_func = udf_factories["erlang_factory"](
                params["k"], 
                col_stats["mean"], 
                params["IN"]
            )
            column_exprs.append(udf_func(col("seed")).alias(f"marginal_{col_name}"))
            
        elif dist_type == "gamma":
            params = global_params["gamma"]
            col_mean = col_stats["mean"]
            col_std = col_stats["std"]
            
            if col_std > 0:
                alpha = (col_mean / col_std) ** 2
                beta = col_mean / (col_std ** 2)
            else:
                alpha, beta = 1.0, 1.0
            
            udf_func = udf_factories["gamma_factory"](
                alpha, beta, 
                params["IN_alpha"], 
                params["k_link"]
            )
            column_exprs.append(udf_func(col("seed")).alias(f"marginal_{col_name}"))
            
        elif dist_type == "gaussian":
            params = global_params["gaussian"]
            udf_func = udf_factories["gaussian_factory"](
                col_stats["mean"],
                max(1e-6, col_stats["std"]),
                params["IN_sigma"],
                params["k_link"]
            )
            column_exprs.append(udf_func(col("seed")).alias(f"marginal_{col_name}"))
        
        else:  # Fallback
            params = global_params["gaussian"]
            udf_func = udf_factories["gaussian_factory"](
                col_stats["mean"],
                max(1e-6, col_stats["std"]),
                params["IN_sigma"], 
                params["k_link"]
            )
            column_exprs.append(udf_func(col("seed")).alias(f"marginal_{col_name}"))
    
    # Execute t·∫•t c·∫£ marginals trong 1 Spark action
    marginals_spark_df = base_df.select(*column_exprs)
    
    # Cache ƒë·ªÉ tr√°nh recomputation
    marginals_spark_df.cache()
    
    # Convert to dictionary format (ch·ªâ collect 1 l·∫ßn)
    marginal_data = marginals_spark_df.collect()
    
    marginal_samples = {}
    for original_col in distribution_map.keys():
        marginal_col = f"marginal_{original_col}"
        marginal_samples[original_col] = np.array([
            row[marginal_col] for row in marginal_data
        ])
    
    return marginal_samples

# ============================================================================
# PH·∫¶N 5: SPARK-OPTIMIZED COPULA PROCESSING
# ============================================================================

def fit_copula_with_spark_correlation(spark_df):
    """
    S·ª≠ d·ª•ng Spark MLlib ƒë·ªÉ t√≠nh correlation matrix
    T·ªëi ∆∞u cho large datasets
    """
    print("Fitting copula s·ª≠ d·ª•ng Spark MLlib correlation...")
    
    # Chuy·ªÉn th√†nh vector format cho MLlib
    assembler = VectorAssembler(
        inputCols=spark_df.columns,
        outputCol="features"
    )
    vector_df = assembler.transform(spark_df).select("features")
    
    # T√≠nh correlation matrix b·∫±ng Spark MLlib
    corr_matrix = Correlation.corr(vector_df, "features", "pearson").head()[0]
    correlation_array = corr_matrix.toArray()
    
    # T·∫°o copula model v·ªõi correlation matrix n√†y
    copula_model = GaussianMultivariate()
    copula_model._correlation = correlation_array
    copula_model.columns = list(spark_df.columns)
    
    return copula_model

def transform_uniform_to_distribution(uniform_vals, target_samples):
    """GI·ªÆ NGUY√äN logic transform"""
    sorted_target = np.sort(target_samples)
    n = len(sorted_target)
    
    transformed = []
    for u in uniform_vals:
        idx = int(u * (n - 1))
        if idx >= n - 1:
            transformed.append(sorted_target[-1])
        else:
            alpha = (u * (n - 1)) - idx
            val = sorted_target[idx] * (1 - alpha) + sorted_target[idx + 1] * alpha
            transformed.append(val)
    
    return np.array(transformed)

# ============================================================================
# PH·∫¶N 6: MAIN IMPROVED FUNCTION
# ============================================================================

def generate_adaptive_synthetic_spark_improved(spark_df, global_params=None, column_overrides=None, size=None):
    """
    IMPROVED VERSION: T·ªëi ∆∞u c√°c bottlenecks ch√≠nh trong Spark approach
    
    Key Improvements:
    1. Spark-native skewness/kurtosis computation (no toPandas for stats)
    2. Vectorized marginal generation (all columns in 1 Spark job)  
    3. MLlib correlation matrix for large datasets
    4. Broadcast variables for UDF optimization
    5. Proper caching and partition management
    """
    
    if size is None:
        size = spark_df.count()
    
    if global_params is None:
        global_params = {
            "erlang": {"k": 5, "IN": 0.35},
            "gamma": {"k_link": 0.3, "IN_alpha": 0.35}, 
            "gaussian": {"k_link": 0.3, "IN_sigma": 0.35}
        }
    
    spark = spark_df.sql_ctx.sparkSession
    
    
    # Step 1: IMPROVED - Spark-native statistics (major speedup)
    t1 = time.time()
    stats_dict = compute_advanced_stats_spark_native(spark_df)
    print(f"Stats computed (Spark-native) in {time.time() - t1:.2f}s")
    
    # Step 2: Distribution selection - GI·ªÆ NGUY√äN
    distribution_map = {}
    for col_name, stats in stats_dict.items():
        chosen_dist, reason = select_best_distribution(stats)
        analysis = stats.copy()
        analysis["distribution"] = chosen_dist
        analysis["reason"] = reason
        distribution_map[col_name] = analysis
        print(f"   {col_name}: {chosen_dist} ({reason})")
    
    # Apply overrides
    if column_overrides:
        for col_name, override_dist in column_overrides.items():
            if col_name in distribution_map:
                distribution_map[col_name]['distribution'] = override_dist
    
    # Step 3: IMPROVED - Vectorized marginal generation 
    t2 = time.time()
    marginal_samples = generate_all_marginals_spark_vectorized(
        spark, distribution_map, stats_dict, global_params, size
    )
    print(f" All marginals generated (vectorized) in {time.time() - t2:.2f}s")
    
    # Step 4: IMPROVED - Spark MLlib copula fitting cho large dataset
    t3 = time.time()
    dataset_size = spark_df.count()
    
    if dataset_size > 100000:  # Large dataset
        copula_model = fit_copula_with_spark_correlation(spark_df)
        print(f"Copula fitted (Spark MLlib) in {time.time() - t3:.2f}s")
    else:  # Small dataset - traditional approach
        df_pandas = spark_df.toPandas()
        copula_model = GaussianMultivariate()
        copula_model.fit(df_pandas)
        print(f"Copula fitted (traditional) in {time.time() - t3:.2f}s")
    
    # Step 5: Copula sampling and transform - GI·ªÆ NGUY√äN (kh√¥ng th·ªÉ t·ªëi ∆∞u th√™m)
    t4 = time.time()
    copula_samples = copula_model.sample(size)
    
    # Ch·ªâ toPandas() 1 l·∫ßn cho vi·ªác t√≠nh percentile
    if dataset_size <= 100000:
        df_pandas_ref = df_pandas  # ƒê√£ c√≥ t·ª´ b∆∞·ªõc 4
    else:
        df_pandas_ref = spark_df.toPandas()  # C·∫ßn toPandas() ƒë·ªÉ t√≠nh percentile
    
    synthetic_df = pd.DataFrame(index=range(size), columns=spark_df.columns)
    
    for col_name in spark_df.columns:
        copula_vals = copula_samples[col_name].values
        
        # Transform v·ªÅ uniform [0,1]
        uniform_vals = []
        for val in copula_vals:
            percentile = percentileofscore(df_pandas_ref[col_name], val, kind='rank') / 100
            uniform_vals.append(percentile)
        
        uniform_vals = np.array(uniform_vals)
        uniform_vals = np.clip(uniform_vals, 0.001, 0.999)
        
        # Transform uniform th√†nh target distribution
        synthetic_col = transform_uniform_to_distribution(
            uniform_vals, 
            marginal_samples[col_name]
        )
        synthetic_df[col_name] = synthetic_col
    
    print(f"‚úÖ Copula sampling & transform in {time.time() - t4:.2f}s")
    
    return synthetic_df, distribution_map

# ============================================================================
# PH·∫¶N 7: UTILITY & MAIN EXECUTION
# ============================================================================

def handle_binary_columns_spark(df_synthetic, spark_df_original, binary_cols=None):
    """GI·ªÆ NGUY√äN logic x·ª≠ l√Ω binary columns"""
    if binary_cols is None:
        df_pandas = spark_df_original.toPandas()
        binary_cols = []
        for col in df_pandas.columns:
            unique_ratio = df_pandas[col].nunique() / len(df_pandas)
            if unique_ratio <= 0.1 and df_pandas[col].nunique() <= 10:
                binary_cols.append(col)
    
    # X·ª≠ l√Ω binary columns - GI·ªÆ NGUY√äN
    df_pandas_original = spark_df_original.toPandas()
    for col in binary_cols:
        if col in df_synthetic.columns:
            original_values = sorted(df_pandas_original[col].unique())
            
            if len(original_values) == 2:
                threshold = np.median(df_synthetic[col])
                df_synthetic[col] = np.where(
                    df_synthetic[col] >= threshold, 
                    original_values[1], 
                    original_values[0]
                )
            else:
                synthetic_vals = df_synthetic[col].values
                quantized = []
                for val in synthetic_vals:
                    closest_val = min(original_values, key=lambda x: abs(x - val))
                    quantized.append(closest_val)
                df_synthetic[col] = quantized
    
    return binary_cols

def main():
    random.seed(42)
    np.random.seed(42)
    
    # Kh·ªüi t·∫°o Spark v·ªõi config t·ªëi ∆∞u
    spark = init_spark_optimized("ImprovedSparkSynthetic")
    
    try:
        print("ƒê·ªçc dataset...")
        spark_df = spark.read.csv("diabetes.csv", header=True, inferSchema=True)
        row_count = spark_df.count()
        col_count = len(spark_df.columns)
        print(f" Dataset: {row_count} rows x {col_count} columns")
        
        sizes = [768]
        IN_values = [0.1, 0.15, 0.2, 0.25, 0.3, 0.35]
        
        for size in sizes:
            for IN in IN_values:
                print(f"\n{'='*60}")
                print(f" Spark Generation: size={size}, IN={IN}")
                
                start_time = time.time()
                
                global_params = {
                    "erlang": {"k": 3, "IN": IN},        
                    "gamma": {"k_link": 0.5, "IN_alpha": IN},  
                    "gaussian": {"k_link": 0.3, "IN_sigma": IN}
                }
                
                df_synthetic, distribution_info = generate_adaptive_synthetic_spark_improved(
                    spark_df,
                    global_params=global_params,
                    size=size
                )
                
                # Handle binary columns
                binary_cols = handle_binary_columns_spark(df_synthetic, spark_df)
                
                # Save results
                output_file = f"synthetic_spark_improved_{size}_{int(IN*100)}.csv"
                df_synthetic.to_csv(output_file, index=False)
                
                elapsed = time.time() - start_time
                print(f" Completed in {elapsed:.2f}s | Speed: {size/elapsed:.0f} samples/sec")
        
        print("\nüéâ All improved Spark generations completed!")
        
    finally:
        spark.stop()

if __name__ == "__main__":
    main()


ƒê·ªçc dataset...
 Dataset: 768 rows x 9 columns

üîÑ Spark Generation: size=768, IN=0.1
T√≠nh to√°n th·ªëng k√™ ho√†n to√†n tr√™n Spark (kh√¥ng toPandas)...
Stats computed (Spark-native) in 1.06s
   Pregnancies: gamma (moderate_skew=0.90_cv=0.88)
   Glucose: gaussian (symmetric_skew=0.17)
   BloodPressure: gaussian (fallback_skew=-1.84)
   SkinThickness: gaussian (symmetric_skew=0.11)
   Insulin: erlang (high_skew=2.26_cv=1.44)
   BMI: gaussian (symmetric_skew=-0.43)
   DiabetesPedigreeFunction: erlang (high_skew=1.91_cv=0.70)
   Age: gamma (moderate_skew=1.13_cv=0.35)
   Outcome: erlang (moderate_skew=0.63_high_cv=1.37)
Sinh t·∫•t c·∫£ marginal distributions trong 1 Spark job...
 All marginals generated (vectorized) in 1.01s
Copula fitted (traditional) in 3.29s
‚úÖ Copula sampling & transform in 2.77s
 Completed in 8.17s | Speed: 94 samples/sec

üîÑ Spark Generation: size=768, IN=0.15
T√≠nh to√°n th·ªëng k√™ ho√†n to√†n tr√™n Spark (kh√¥ng toPandas)...
Stats computed (Spark-native)