In [1]:
import os
import time
import psutil
import gc
import logging
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, avg
import xarray as xr
from contextlib import contextmanager

# Logging configuration: every record is written both to a log file and to
# the console, prefixed with a timestamp and the level name.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler("scalability_analysis.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger()  # root logger, used by all functions in this file

# Benchmark matrix: each dataset size is run against each cluster size.
DATASET_SIZES = ["small", "medium", "large"]
CLUSTER_SIZES = [2, 4, 8]

# Directory receiving the checkpoint and result CSV files; created up front
# so later writes cannot fail on a missing directory.
OUTPUT_DIR = "scalability_results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

@contextmanager
def memory_monitoring():
    """Log elapsed time, RSS memory growth and CPU utilisation of a block.

    Use as ``with memory_monitoring(): ...``. Nothing is yielded to the
    caller. When the block exits, whether normally or through an exception,
    three INFO records are written to the module logger:

    * wall-clock time spent inside the block, in seconds;
    * change in this process's resident set size, in MB (may be negative);
    * CPU utilisation reported by ``psutil.cpu_percent()`` for the block.
    """
    process = psutil.Process(os.getpid())
    mem_before = process.memory_info().rss / (1024 * 1024)  # MB
    # A non-blocking cpu_percent() call reports utilisation since the previous
    # call. This call therefore only resets the reference point: its return
    # value describes the time *before* the block and is deliberately ignored.
    psutil.cpu_percent()
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by adjustments of the system clock.
    start_time = time.perf_counter()

    try:
        yield
    finally:
        elapsed = time.perf_counter() - start_time
        mem_after = process.memory_info().rss / (1024 * 1024)  # MB
        # Utilisation accumulated since the priming call above, i.e. over the
        # monitored block. NOTE: for very short blocks this figure is coarse.
        cpu_during = psutil.cpu_percent()

        logger.info(f"Time elapsed: {elapsed:.2f} seconds")
        logger.info(f"Memory usage: {mem_after - mem_before:.2f} MB")
        logger.info(f"CPU usage: {cpu_during:.2f}%")

def get_spark_session(cluster_size):
    """Return a SparkSession connected to the standalone master.

    The session is obtained with ``getOrCreate()`` after applying a fixed set
    of memory settings plus ``spark.executor.cores`` taken from
    ``cluster_size``.

    NOTE(review): ``cluster_size`` is applied as ``spark.executor.cores``,
    which is a per-executor core count rather than a number of nodes --
    confirm this is the intended knob.

    Args:
        cluster_size: Value written (as a string) to ``spark.executor.cores``.

    Returns:
        The SparkSession produced by the builder.

    Raises:
        Exception: Any error raised while building the session is logged and
            re-raised unchanged.
    """
    try:
        # Key/value pairs are applied in this order, one config() call each.
        settings = (
            ("spark.executor.memory", "2g"),
            ("spark.driver.memory", "4g"),
            ("spark.memory.offHeap.enabled", "true"),
            ("spark.memory.offHeap.size", "2g"),
            ("spark.executor.cores", str(cluster_size)),
            ("spark.driver.maxResultSize", "1g"),
            ("spark.sql.adaptive.enabled", "true"),
        )

        builder = SparkSession.builder
        builder = builder.master("spark://192.168.2.156:7077")
        builder = builder.appName("scalability_group_28")
        for option, value in settings:
            builder = builder.config(option, value)

        return builder.getOrCreate()
    except Exception as e:
        logger.error(f"Failed to initialize Spark: {e}")
        raise

def process_netcdf_chunk(file_path, spark, chunk_size=500):
    """Compute one mean ``tg`` value per calendar year of a NetCDF file.

    The file is opened with xarray using chunks along ``time``. Each year is
    selected and reduced on its own, so only one year's mean is computed at a
    time. The resulting ``(year, mean)`` pairs are then turned into a Spark
    DataFrame, cached, collected and unpersisted.

    Args:
        file_path: Path of the NetCDF file to open.
        spark: SparkSession used to build the result DataFrame.
        chunk_size: Chunk length along the ``time`` dimension passed to
            ``xr.open_dataset``.

    Returns:
        The list returned by ``DataFrame.collect()`` with the columns
        ``year`` and ``yearly_avg_temperature``. An empty list is returned
        when the file has no ``tg`` variable, when no yearly value was
        produced, or when any exception occurred (the exception is logged,
        not raised).
    """
    logger.info(f"Processing file: {file_path}")

    try:
        with memory_monitoring():
            # Open the NetCDF file with chunking options
            with xr.open_dataset(file_path, chunks={'time': chunk_size}) as ds:
                # Selecting along time never drops data variables, so the
                # presence of 'tg' is checked once instead of once per year.
                if 'tg' not in ds:
                    logger.warning(f"Variable 'tg' not found in {file_path}")
                    return []

                # Years present in the file, and the year of every time step
                # (computed once and reused for each per-year selection).
                years = np.unique(pd.DatetimeIndex(ds.time.values).year)
                year_of_step = ds.time.dt.year

                yearly_avgs = []
                for yr in years:
                    logger.info(f"Processing year {yr}")
                    year_ds = ds.sel(time=year_of_step == yr)

                    # mean() without arguments reduces over every dimension.
                    yearly_avg = year_ds.tg.mean().compute()
                    # np.unique yields NumPy integers; store built-in int and
                    # float so Spark's schema inference only ever sees plain
                    # Python types.
                    yearly_avgs.append((int(yr), float(yearly_avg)))

                    # Release the per-year selection before the next one.
                    del year_ds
                    gc.collect()

                if not yearly_avgs:
                    return []

                schema = ["year", "yearly_avg_temperature"]
                spark_df = spark.createDataFrame(yearly_avgs, schema=schema)
                spark_df.cache()
                try:
                    return spark_df.collect()
                finally:
                    # Drop the cached data even when collect() fails.
                    spark_df.unpersist()

    except Exception as e:
        logger.error(f"Error processing {file_path}: {str(e)}")
        return []

# NetCDF files that make up each dataset size; every larger size extends the
# previous one by one more file.
_DATASET_FILES = {
    "small": [
        "tg_ens_mean_0.25deg_reg_1980-1994_v30.0e.nc",
    ],
    "medium": [
        "tg_ens_mean_0.25deg_reg_1980-1994_v30.0e.nc",
        "tg_ens_mean_0.25deg_reg_1995-2010_v30.0e.nc",
    ],
    "large": [
        "tg_ens_mean_0.25deg_reg_1980-1994_v30.0e.nc",
        "tg_ens_mean_0.25deg_reg_1995-2010_v30.0e.nc",
        "tg_ens_mean_0.25deg_reg_2011-2024_v30.0e.nc",
    ],
}


def run_netcdf_processing(dataset_size, cluster_size, data_dir=None):
    """Run the NetCDF processing for one configuration and return its metrics.

    A Spark session is created for ``cluster_size``, every file belonging to
    ``dataset_size`` is processed in turn, and the session is stopped again.
    Errors raised while processing are logged and do not propagate; metrics
    are returned in either case.

    Args:
        dataset_size: One of ``"small"``, ``"medium"`` or ``"large"``.
        cluster_size: Value forwarded to ``get_spark_session``.
        data_dir: Optional directory containing the NetCDF files. When omitted
            the bare file names are used, i.e. they are resolved against the
            current working directory.

    Returns:
        A dict with the keys ``dataset_size``, ``cluster_size``,
        ``processing_time`` (seconds), ``cpu_usage`` (mean of the sampled
        ``psutil.cpu_percent()`` values), ``memory_usage`` (RSS change of this
        process in MB) and ``result_count`` (number of collected rows).

    Raises:
        ValueError: If ``dataset_size`` is not a known size.
    """
    try:
        file_names = _DATASET_FILES[dataset_size]
    except (KeyError, TypeError):
        raise ValueError("Invalid dataset size") from None

    if data_dir:
        file_paths = [os.path.join(data_dir, name) for name in file_names]
    else:
        file_paths = list(file_names)

    logger.info(f"Starting processing for dataset_size={dataset_size}, cluster_size={cluster_size}")

    # Initialize metrics. perf_counter() is monotonic, so the duration is not
    # affected by system clock adjustments.
    start_time = time.perf_counter()
    process = psutil.Process(os.getpid())
    cpu_usage_samples = []
    memory_usage_start = process.memory_info().rss / (1024 * 1024)  # MB

    spark = None
    results = []

    try:
        # Start Spark session with the given cluster size
        spark = get_spark_session(cluster_size)

        for file_path in file_paths:
            # One CPU sample per file, taken just before it is processed
            cpu_usage_samples.append(psutil.cpu_percent())

            results.extend(process_netcdf_chunk(file_path, spark))

            gc.collect()

            # Let system resources recover between files
            time.sleep(1)

    except Exception as e:
        # NOTE(review): a failed run still returns metrics below; callers can
        # only recognise it through a low result_count.
        logger.error(f"Error in processing run: {str(e)}")

    finally:
        if spark:
            try:
                spark.stop()
                logger.info("Spark session stopped successfully")
            except Exception as stop_error:
                # Best effort: a failing stop() must not hide the metrics, but
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                logger.warning(f"Error stopping Spark session: {stop_error}")

    # Final measurements
    processing_time = time.perf_counter() - start_time
    cpu_usage_samples.append(psutil.cpu_percent())
    memory_usage_end = process.memory_info().rss / (1024 * 1024)  # MB

    cpu_usage = sum(cpu_usage_samples) / len(cpu_usage_samples) if cpu_usage_samples else 0
    memory_usage = memory_usage_end - memory_usage_start

    logger.info(f"Completed processing: time={processing_time:.2f}s, CPU={cpu_usage:.2f}%, memory={memory_usage:.2f}MB")

    return {
        "dataset_size": dataset_size,
        "cluster_size": cluster_size,
        "processing_time": processing_time,
        "cpu_usage": cpu_usage,
        "memory_usage": memory_usage,
        "result_count": len(results),
    }

def perform_scalability_analysis():
    """Run every dataset-size / cluster-size combination and save the metrics.

    Progress is checkpointed to ``checkpoint.csv`` inside ``OUTPUT_DIR`` after
    each successful configuration; configurations already present in that
    file are skipped on the next invocation. A configuration counts as
    successful only if it ran without raising and reported a non-zero
    ``result_count``; failed configurations are logged, left out of the
    results and therefore retried on the next invocation.

    On completion ``scalability_results.csv`` (one row per configuration) and
    ``scalability_summary.csv`` (mean time, CPU and memory per configuration)
    are written to ``OUTPUT_DIR``. If no configuration succeeded, nothing is
    written and a warning is logged.
    """
    checkpoint_file = os.path.join(OUTPUT_DIR, "checkpoint.csv")
    results = []
    completed_configs = set()

    # Resume from an earlier run if a checkpoint exists
    if os.path.exists(checkpoint_file):
        checkpoint_df = pd.read_csv(checkpoint_file)
        completed_configs.update(
            zip(checkpoint_df['dataset_size'], checkpoint_df['cluster_size'])
        )
        logger.info(f"Loaded {len(completed_configs)} completed configurations from checkpoint")
        results = checkpoint_df.to_dict('records')

    for dataset_size in DATASET_SIZES:
        for cluster_size in CLUSTER_SIZES:
            if (dataset_size, cluster_size) in completed_configs:
                logger.info(f"Skipping completed configuration: dataset_size={dataset_size}, cluster_size={cluster_size}")
                continue

            logger.info(f"Running scalability analysis for dataset_size={dataset_size}, cluster_size={cluster_size}")

            try:
                result = run_netcdf_processing(dataset_size, cluster_size)

                # The processing step logs its own errors and still returns
                # metrics, so an empty result set is the only visible sign of
                # failure. Such a run must not be checkpointed, otherwise it
                # would be skipped forever.
                if result.get("result_count") == 0:
                    raise RuntimeError("processing produced no results")

                results.append(result)

                # Update checkpoint file after each successful configuration
                pd.DataFrame(results).to_csv(checkpoint_file, index=False)
                completed_configs.add((dataset_size, cluster_size))

                # Give the system time to settle before the next run
                time.sleep(5)

            except Exception as e:
                logger.error(f"Failed analysis for dataset_size={dataset_size}, cluster_size={cluster_size}: {e}")

    # Without any rows the DataFrame has no columns and the groupby below
    # would raise KeyError.
    if not results:
        logger.warning("No successful configurations; no result files written")
        return

    results_df = pd.DataFrame(results)
    results_df.to_csv(os.path.join(OUTPUT_DIR, "scalability_results.csv"), index=False)

    # Mean of each metric per configuration
    summary = results_df.groupby(['dataset_size', 'cluster_size']).agg({
        'processing_time': 'mean',
        'cpu_usage': 'mean',
        'memory_usage': 'mean'
    }).reset_index()

    summary.to_csv(os.path.join(OUTPUT_DIR, "scalability_summary.csv"), index=False)

    logger.info("Scalability analysis complete. Results saved to scalability_results.csv")
    logger.info("Summary saved to scalability_summary.csv")

# Run scalability analysis
if __name__ == "__main__":
    try:
        perform_scalability_analysis()
    except Exception as e:
        # Top-level boundary: log instead of crashing, but keep the traceback
        # (exc_info=True) so the failure can still be diagnosed from the log.
        logger.critical(f"Unhandled exception in main: {str(e)}", exc_info=True)

2025-03-24 10:20:56,712 - INFO - Running scalability analysis for dataset_size=small, cluster_size=2
2025-03-24 10:20:56,715 - INFO - Starting processing for dataset_size=small, cluster_size=2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/24 10:20:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-03-24 10:21:01,773 - INFO - Processing file: tg_ens_mean_0.25deg_reg_1980-1994_v30.0e.nc
2025-03-24 10:21:01,817 - INFO - Time elapsed: 0.04 seconds
2025-03-24 10:21:01,819 - INFO - Memory usage: 9.80 MB
2025-03-24 10:21:01,821 - INFO - CPU usage: 50.00%
2025-03-24 10:21:01,824 - ERROR - Error processing tg_ens_mean_0.25deg_reg_1980-1994_v30.0e.nc: [Errno 2] No such file or directory: '/home/ubuntu/DataEngineering/project/results/scalability_results/tg_ens_mean_0.25deg_reg_1980-1994_v30.0e.nc'
2025-03-24 10:21:03,046 - INFO