In [1]:
from dask.distributed import Client
import multiprocessing

# Detect how many logical CPUs this machine exposes
num_cpus = multiprocessing.cpu_count()

# Start a local Dask cluster with one single-threaded worker process per CPU.
# Previously the worker count was hard-coded to 8 and num_cpus was never used.
# memory_limit applies per worker, so the cluster-wide budget scales with num_cpus.
client = Client(n_workers=num_cpus, threads_per_worker=1, memory_limit='3GB', timeout="120s")

print(client)

<Client: 'tcp://127.0.0.1:36989' processes=8 threads=8, memory=22.35 GiB>




In [2]:
from google.cloud import storage
from scipy.stats import normaltest
import dask.dataframe as dd
import numpy as np
from scipy.stats.mstats import winsorize

In [4]:
import dask.dataframe as dd

# ✅ Define the GCS path for the Parquet file
parquet_gcs_path = "gs://demo-shubham/df.parquet"

# Lazily open the Parquet dataset.
# `assume_missing` and `low_memory` were removed: they are `read_csv` options and
# have no meaning for Parquet, where column dtypes come from the file schema.
df = dd.read_parquet(
    parquet_gcs_path,
    blocksize="50MB",
)
# ✅ Show a preview (Dask loads data lazily)
print(df.head(2))  # Reads only a small portion

# ✅ Check column names
print(df.columns.tolist())

  Country (region)  Ladder  SD of Ladder  Positive affect  Negative affect  \
0          Finland       1             4             41.0             10.0   
1          Denmark       2            13             24.0             26.0   

   Social support  Freedom  Corruption  Generosity  Log of GDP\nper capita  \
0             2.0      5.0         4.0        47.0                    22.0   
1             4.0      6.0         3.0        22.0                    14.0   

   Healthy life\nexpectancy  
0                      27.0  
1                      23.0  
['Country (region)', 'Ladder', 'SD of Ladder', 'Positive affect', 'Negative affect', 'Social support', 'Freedom', 'Corruption', 'Generosity', 'Log of GDP\nper capita', 'Healthy life\nexpectancy']


In [5]:
# Persist the DataFrame so its partitions are computed once and kept in distributed memory
df = df.persist()  # NOTE: persist() keeps the existing partitioning; it does not reduce partitions

In [6]:
# Identify columns whose values are all NaN.
# The per-column flags are computed in a single pass over the data instead of
# triggering one separate compute() per column.
all_nan_flags = df.isna().all().compute()
columns_to_drop = [col for col, is_all_nan in all_nan_flags.items() if is_all_nan]

# Drop the columns
df = df.drop(columns=columns_to_drop)

In [7]:
import dask.dataframe as dd

import pandas as pd

# Rows taken from the start of the data to decide whether a column holds dates
DATETIME_SAMPLE_ROWS = 1000
# Minimum share of non-null sampled values that must parse for a column to count as dates
DATETIME_MIN_PARSED_FRACTION = 0.9


def _looks_like_datetime(sample, min_parsed_fraction=DATETIME_MIN_PARSED_FRACTION):
    """Return True when most non-null values of a pandas Series parse as datetimes.

    Parameters:
    - sample: pandas Series holding a preview of one column.
    - min_parsed_fraction: required fraction of non-null values that parse.

    Returns:
    - bool; False for an empty or all-null sample.
    """
    values = sample.dropna()
    if values.empty:
        return False
    parsed = pd.to_datetime(values, errors='coerce')
    return parsed.notna().mean() >= min_parsed_fraction


# Convert only the object columns that actually look like dates.
# Coercing every object column would silently turn ordinary text columns
# (e.g. names) into NaT, destroying their content.
object_cols = [col for col in df.columns if df[col].dtype == 'object']
if object_cols:
    # head() only reads from the first partition, so this stays cheap
    preview = df[object_cols].head(DATETIME_SAMPLE_ROWS)
    for col in object_cols:
        if _looks_like_datetime(preview[col]):
            df[col] = dd.to_datetime(df[col], errors='coerce')

# Render every datetime column as a 'YYYY-MM-DD HH:MM:SS' string, partition by partition
df = df.map_partitions(lambda d: d.assign(
    **{col: d[col].dt.strftime('%Y-%m-%d %H:%M:%S') for col in d.select_dtypes(include=['datetime64']).columns}
))

In [8]:
import dask.dataframe as dd
import numpy as np

# Number of columns imputed per round; adjust to the available memory
batch_size = 2

# Columns eligible for median imputation
numerical_cols = df.select_dtypes(include=["float64","int64"]).columns

# Wrap the data in a Dask DataFrame when something else was handed over
if not isinstance(df, dd.DataFrame):
    df = dd.from_pandas(df, npartitions=10)  # partition count depends on dataset size

# Walk over the numeric columns, batch_size columns at a time
for batch_start in range(0, len(numerical_cols), batch_size):
    batch_end = batch_start + batch_size
    batch = numerical_cols[batch_start:batch_end]
    print(f"Processing batch: {batch}")

    # One approximate-median computation covers every column of the batch
    median_values = df[batch].median_approximate().compute()

    # Replace the missing entries of each column with that column's median
    for col in batch:
        col_median = median_values[col]
        df[col] = df[col].fillna(col_median)
        print(f"Imputed median for {col}: {col_median}")


Processing batch: Index(['Ladder', 'SD of Ladder'], dtype='object')
Imputed median for Ladder: 78.5
Imputed median for SD of Ladder: 78.5
Processing batch: Index(['Positive affect', 'Negative affect'], dtype='object')
Imputed median for Positive affect: 78.0
Imputed median for Negative affect: 78.0
Processing batch: Index(['Social support', 'Freedom'], dtype='object')
Imputed median for Social support: 78.0
Imputed median for Freedom: 78.0
Processing batch: Index(['Corruption', 'Generosity'], dtype='object')
Imputed median for Corruption: 74.5
Imputed median for Generosity: 78.0
Processing batch: Index(['Log of GDP\nper capita', 'Healthy life\nexpectancy'], dtype='object')
Imputed median for Log of GDP
per capita: 76.5
Imputed median for Healthy life
expectancy: 75.5


In [9]:
def detect_outliers_dask(df, method=None, threshold=1.5, random_state=None):
    """
    Detects outliers using IQR or Z-score, with an option to auto-select the best method.

    Parameters:
    - df: Dask DataFrame; only its numeric columns are analysed.
    - method: "iqr", "zscore", or None to choose automatically. With None, a
      ~10% random sample is tested for normality per column; Z-score is used
      only when every column has p > 0.05, otherwise IQR.
    - threshold: IQR multiplier for "iqr", or absolute Z-score cut-off for "zscore".
    - random_state: optional seed for the random sample, so the automatic
      method choice can be made reproducible.

    Returns:
    - Dictionary mapping each numeric column to its outlier percentage (float).

    Raises:
    - ValueError: if method is not None, "iqr" or "zscore".
    """
    import dask  # local import: only dask.compute is needed here

    # Reject a bad method before any computation is triggered
    if method not in (None, "iqr", "zscore"):
        raise ValueError("Method must be 'iqr' or 'zscore'.")

    numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
    if not numeric_cols:
        return {}

    # The row count is the same for every column, so compute it once
    total_rows = len(df)
    if total_rows == 0:
        return {col: 0.0 for col in numeric_cols}

    if method is None:
        # Take a small sample (10%) efficiently using random_split
        sample_frac = 0.1
        df_sample, _ = df.random_split(
            [sample_frac, 1 - sample_frac], random_state=random_state
        )
        df_sample = df_sample[numeric_cols].compute()  # pandas DataFrame

        # normaltest relies on skewtest, which needs at least 8 observations
        min_observations = 8

        def _normality_pvalue(values):
            values = values.dropna()
            if len(values) < min_observations:
                return np.nan  # too little data: treated as "not shown to be normal"
            return normaltest(values)[1]

        normality_pvals = df_sample.apply(_normality_pvalue)

        # NaN p-values compare False, so they push the choice towards IQR
        if (normality_pvals > 0.05).all():
            method = "zscore"  # If p > 0.05, assume normal distribution
        else:
            method = "iqr"  # Otherwise, assume non-normal distribution

        print(f"Auto-selected method: {method}")

    if method == "iqr":
        # One quantile computation for all columns
        quantiles = df[numeric_cols].quantile([0.25, 0.75]).compute()
        q1, q3 = quantiles.loc[0.25], quantiles.loc[0.75]
        iqr = q3 - q1
        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr
        masks = [
            (df[col] < lower_bound[col]) | (df[col] > upper_bound[col])
            for col in numeric_cols
        ]
    else:  # method == "zscore"
        means, stds = dask.compute(df[numeric_cols].mean(), df[numeric_cols].std())
        masks = [
            abs((df[col] - means[col]) / stds[col]) > threshold
            for col in numeric_cols
        ]

    # Evaluate all per-column outlier counts in a single graph execution
    outlier_counts = dask.compute(*[mask.sum() for mask in masks])

    return {
        col: float(count) / total_rows * 100
        for col, count in zip(numeric_cols, outlier_counts)
    }

In [10]:
def clean_or_winsorize_dask(df, outlier_percentages, threshold=5,
                            iqr_multiplier=1.5, winsor_limits=(0.05, 0.05)):
    """
    Cleans or applies Winsorization based on outlier percentage.

    Columns with at most `threshold` percent outliers have the values outside
    the IQR fences replaced by NaN (rows are kept). Columns above the threshold
    are winsorized: values are clipped to the column's global lower/upper
    quantiles, so every partition uses the same limits.

    Parameters:
    - df: Dask DataFrame
    - outlier_percentages: Dictionary of outlier percentages per column.
    - threshold: Percentage threshold to decide between cleaning or Winsorization.
    - iqr_multiplier: Multiplier applied to the IQR to build the outlier fences.
    - winsor_limits: (lower, upper) fractions of the data clipped at each tail.

    Returns:
    - Dask DataFrame with outliers cleaned or Winsorized.

    Raises:
    - ValueError: if winsor_limits are negative or together cover all the data.
    """
    lower_limit, upper_limit = winsor_limits
    if lower_limit < 0 or upper_limit < 0 or lower_limit + upper_limit >= 1:
        raise ValueError("winsor_limits must be non-negative and sum to less than 1.")

    df_out = df.copy()  # Create a copy to avoid modifying the original

    numeric_cols = list(outlier_percentages.keys())
    if not numeric_cols:
        return df_out

    # Compute every quantile needed (IQR fences and winsor limits) in one pass
    winsor_low_q, winsor_high_q = lower_limit, 1 - upper_limit
    wanted_quantiles = sorted({0.25, 0.75, winsor_low_q, winsor_high_q})
    stats = df[numeric_cols].quantile(wanted_quantiles).compute()

    Q1, Q3 = stats.loc[0.25], stats.loc[0.75]
    IQR = Q3 - Q1
    lower_bound = Q1 - iqr_multiplier * IQR
    upper_bound = Q3 + iqr_multiplier * IQR
    winsor_low, winsor_high = stats.loc[winsor_low_q], stats.loc[winsor_high_q]

    replacements = {}
    for col in numeric_cols:
        if outlier_percentages[col] <= threshold:
            print(f"Removing {outlier_percentages[col]:.2f}% outliers from column '{col}'.")
            inside_fences = (df[col] >= lower_bound[col]) & (df[col] <= upper_bound[col])
            replacements[col] = df[col].where(inside_fences)
        else:
            print(f"Applying Winsorization to column '{col}' with {outlier_percentages[col]:.2f}% outliers.")
            # Clipping at global quantiles keeps the result a Series with its
            # index intact, unlike a per-partition scipy winsorize call.
            replacements[col] = df[col].astype("float64").clip(
                lower=winsor_low[col], upper=winsor_high[col]
            )

    if replacements:
        df_out = df_out.assign(**replacements)

    return df_out

In [11]:
# Estimate the outlier percentage of every numeric column; method=None lets the helper choose between IQR and Z-score
outlier_percentages = detect_outliers_dask(df, method=None)

  return hypotest_fun_in(*args, **kwds)


Auto-selected method: iqr


In [12]:
# Clean or winsorize each column, using 5% outliers as the cut-off between the two strategies.
# NOTE(review): the later cells visible here keep working on `df`, not `df_cleaned` — confirm whether that is intended.
df_cleaned = clean_or_winsorize_dask(df, outlier_percentages, threshold=5)

Removing 0.00% outliers from column 'Ladder'.
Removing 0.00% outliers from column 'SD of Ladder'.
Removing 0.00% outliers from column 'Positive affect'.
Removing 0.00% outliers from column 'Negative affect'.
Removing 0.00% outliers from column 'Social support'.
Removing 0.00% outliers from column 'Freedom'.
Removing 0.00% outliers from column 'Corruption'.
Removing 0.00% outliers from column 'Generosity'.
Removing 0.00% outliers from column 'Log of GDP
per capita'.
Removing 0.00% outliers from column 'Healthy life
expectancy'.


In [13]:
# ✅ Convert FL_DATE to datetime format
#df = df.map_partitions(lambda d: d.assign(FL_DATE=dd.to_datetime(d["FL_DATE"], errors="coerce")))

In [15]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask_ml.preprocessing import LabelEncoder
from dask import delayed
from sklearn.preprocessing import OneHotEncoder

categorical_cols = df.select_dtypes(include=["object", "category","string"]).columns

# Encode categorical variables.
# Categories are collected once over the whole dataset so every partition is
# encoded identically; fitting an encoder per partition would give the same
# value different codes (or different one-hot columns) in different partitions.
for col in categorical_cols:
    # Sorted distinct non-null values; position in this list is the category's code
    categories = sorted(df[col].dropna().unique().compute())
    unique_count = len(categories)

    if unique_count <= 10:
        # **One-Hot Encoding** for low-cardinality categorical columns:
        # one float indicator column per category, named "<col>_<code>".
        # Missing values produce 0.0 in every indicator column.
        indicator_cols = {
            f"{col}_{i}": df[col].isin([category]).astype("float64")
            for i, category in enumerate(categories)
        }
        df = df.assign(**indicator_cols)

        df = df.drop(columns=[col])  # Drop original column after encoding
        print(f"Applied One-Hot Encoding to {col}")

    else:
        # **Label Encoding** for high-cardinality categorical columns:
        # integer code = position in the sorted category list; missing values become -1.
        category_dtype = pd.CategoricalDtype(categories=categories)
        df[col] = df[col].astype(category_dtype).cat.codes.astype("int64")
        print(f"Applied Label Encoding to {col}")

# ✅ Check the transformed DataFrame
#print(df.dtypes)

Applied Label Encoding to Country (region)


In [16]:
print(df.columns)  # Inspect the current column names (the output below contains no 'FL_DATE' column)


Index(['Country (region)', 'Ladder', 'SD of Ladder', 'Positive affect',
       'Negative affect', 'Social support', 'Freedom', 'Corruption',
       'Generosity', 'Log of GDP\nper capita', 'Healthy life\nexpectancy'],
      dtype='object')


In [17]:
import dask.dataframe as dd
from dask_ml.preprocessing import MinMaxScaler
import numpy as np


# Identify column types
numerical_cols = df.select_dtypes(include=['float64', 'int64']).columns
categorical_cols = df.select_dtypes(include=['object', 'string[pyarrow]']).columns

# Initialize MinMaxScaler
scaler = MinMaxScaler()

# Scale the numerical columns with min/max taken over the WHOLE dataset.
# Fitting the scaler inside map_partitions would rescale every partition with
# its own min/max (inconsistent results) and would modify the persisted
# partitions of `df` in place.
if len(numerical_cols) > 0:
    scaled_numeric = scaler.fit_transform(df[numerical_cols])
    # Non-numerical columns are carried over unchanged; column order is preserved
    df_scaled = df.assign(**{col: scaled_numeric[col] for col in numerical_cols})
else:
    # Nothing to scale
    df_scaled = df

In [18]:
#✅ Preview the scaled data
#df_scaled.head()

In [19]:
import dask.dataframe as dd
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from google.cloud import storage  # GCS upload

# Define the GCS destination once and derive everything else from it
bucket_name = "demo-shubham"
blob_name = "scaled-data.parquet"
output_path = f"gs://{bucket_name}/{blob_name}"

gcs_client = storage.Client()
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(blob_name)

# Infer the Parquet schema from the first few rows
schema = pa.Schema.from_pandas(df_scaled.head())
# The complete Parquet file is assembled in this in-memory buffer before upload
sink = pa.BufferOutputStream()

with pq.ParquetWriter(sink, schema=schema, compression="snappy") as writer:
    for i, partition in enumerate(df_scaled.to_delayed()):  # Iterate over Dask partitions
        print(f"Processing chunk {i + 1}...")
        df_chunk = partition.compute()  # Convert only this partition to Pandas
        # Build the table with the writer's schema so a partition whose inferred
        # types differ (e.g. an all-null column) cannot make write_table fail
        table = pa.Table.from_pandas(df_chunk, schema=schema)
        writer.write_table(table)  # Append to Parquet file

# Upload the finished file to GCS in one request
blob.upload_from_string(sink.getvalue().to_pybytes(), content_type="application/octet-stream")
print(f"✅ Successfully uploaded processed data to: {output_path}")


Processing chunk 1...
✅ Successfully uploaded processed data to: gs://demo-shubham/scaled-data.parquet


In [20]:
!jupyter nbconvert --to script Normalization.ipynb

[NbConvertApp] Converting notebook Normalization.ipynb to script
[NbConvertApp] Writing 10432 bytes to Normalization.py
