In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import time
from datetime import datetime
from pathlib import Path


def log_timing(operation_name, start_time, timing_data):
    """Append a timing entry for one operation and return the current time.

    Args:
        operation_name: Label written at the start of the timing entry.
        start_time: ``time.time()`` value taken when the operation began.
        timing_data: List that receives the formatted entry (mutated in place).

    Returns:
        The ``time.time()`` value sampled by this call, so the caller can use
        it as the start time of the next operation.
    """
    now = time.time()
    # Elapsed wall-clock seconds, formatted to two decimals.
    entry = f"{operation_name}: {now - start_time:.2f} seconds"
    timing_data.append(entry)
    return now


def main():
    """Run five employee-record queries on Spark, timing each one, and save the results.

    Steps:
      1. Connect to the standalone Spark cluster.
      2. Read ``data/employee_records.csv`` (header row, inferred schema).
      3. Run five aggregation queries. Each result is cached and forced with
         ``count()`` so the logged duration covers the actual computation.
      4. Write the timing log to ``performance_results/`` and each query
         result as CSV to ``analysis_results/pyspark/``.

    The Spark session is stopped in a ``finally`` block so that it is released
    even when a step fails.
    """
    # Alternative session tuned for a single local machine, kept for reference:
    # spark = (
    #     SparkSession.builder.appName("Employee Analysis")
    #     # Memory configurations - carefully allocated for 16GB total RAM
    #     .config("spark.driver.memory", "10g")  # Leave ~6GB for OS and other processes
    #     .config("spark.memory.offHeap.enabled", "true")
    #     .config("spark.memory.offHeap.size", "2g")
    #     # Execution configurations - optimized for 8 cores
    #
    #     # Serialization and compression
    #     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    #     .config("spark.rdd.compress", "true")
    #     .config("spark.sql.inMemoryColumnarStorage.compressed", "true")
    #     # Local mode optimizations
    #     .config("spark.driver.host", "127.0.0.1")
    #     .config("spark.local.dir", "/tmp")  # Ensure adequate temp space
    #     .config("spark.sql.shuffle.partitions.local", "8")
    #     # Storage configurations
    #     .config(
    #         "spark.sql.files.maxPartitionBytes", "64MB"
    #     )  # Smaller chunks for local processing
    #     # Original config
    #     .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    #     .master("local[*]")  # Use all available cores
    #     .getOrCreate()
    # )
    spark = (
        SparkSession.builder.appName("ClusterStreamingApp")
        .master("spark://84.16.230.94:7077")
        .config("spark.executor.memory", "4g")
        .config("spark.executor.cores", "2")
        .config("spark.cores.max", "4")
        .config("spark.sql.shuffle.partitions", "8")
        .getOrCreate()
    )

    output_dir = Path("performance_results")
    results_dir = Path("analysis_results/pyspark")

    try:
        # Initialize timing data
        timing_data = []
        overall_start = time.time()

        # Expressions shared by several queries, defined once so they cannot drift apart.
        is_high_rated = F.col("last_rating").isin(
            ["Exceptional", "Exceeds Expectations"]
        )
        # `== True` builds a Spark Column expression; it is not a Python identity test.
        bonus_eligible_ratio = F.round(
            F.mean(F.when(F.col("bonus_eligible") == True, 1).otherwise(0)), 3
        ).alias("bonus_eligible_ratio")

        # Read CSV
        print("Starting PySpark analysis...")
        start_time = time.time()
        df = spark.read.csv(
            "data/employee_records.csv", header=True, inferSchema=True
        )
        current_time = log_timing("Read CSV", start_time, timing_data)

        # Query 1: Average salary by department and experience level
        start_time = current_time
        dept_exp_salary = df.groupBy("department", "experience_level").agg(
            F.count("salary").alias("count"),
            F.round(F.mean("salary"), 2).alias("mean"),
            F.round(F.stddev("salary"), 2).alias("std"),
            F.min("salary").alias("min"),
            F.max("salary").alias("max"),
        )
        dept_exp_salary.cache()  # Cache so the CSV export below reuses the result
        dept_exp_salary.count()  # Force computation so the timing is meaningful
        current_time = log_timing(
            "Query 1: Department-Experience Level Salary Analysis",
            start_time,
            timing_data,
        )

        # Query 2: Employee retention analysis
        start_time = current_time
        retention_analysis = (
            df.withColumn(
                "tenure_days",
                F.datediff(F.current_date(), F.to_date("join_date", "yyyy-MM-dd")),
            )
            .groupBy("department")
            .agg(
                F.round(F.mean("tenure_days"), 2).alias("mean_tenure_days"),
                F.min("tenure_days").alias("min_tenure_days"),
                F.max("tenure_days").alias("max_tenure_days"),
                F.count("*").alias("employee_count"),
                F.round(F.mean("salary"), 2).alias("mean_salary"),
            )
        )
        retention_analysis.cache()
        retention_analysis.count()
        current_time = log_timing(
            "Query 2: Employee Retention Analysis", start_time, timing_data
        )

        # Query 3: Complex performance metrics
        start_time = current_time
        # The window has no partitionBy on purpose: salary quartiles are taken
        # over all rows. Spark therefore moves all data to a single partition
        # for this step (it logs a WindowExec warning); partitioning the window
        # would change what salary_tile means.
        salary_quartile_window = Window.orderBy("salary")
        performance_metrics = (
            df.withColumn(
                "is_high_performer", F.when(is_high_rated, 1).otherwise(0)
            )
            .withColumn("salary_tile", F.ntile(4).over(salary_quartile_window))
            .groupBy("department", "salary_tile")
            .agg(
                F.round(F.mean("is_high_performer"), 3).alias("high_performer_ratio"),
                F.round(F.mean("projects_completed"), 2).alias("avg_projects"),
                F.sum("projects_completed").alias("total_projects"),
                F.round(
                    F.mean(
                        F.when(F.col("remote_work_eligible") == True, 1).otherwise(0)
                    ),
                    3,
                ).alias("remote_work_ratio"),
                bonus_eligible_ratio,
            )
        )
        performance_metrics.cache()
        performance_metrics.count()
        current_time = log_timing(
            "Query 3: Performance Metrics Analysis", start_time, timing_data
        )

        # Query 4: Location and compensation analysis
        start_time = current_time
        location_comp = df.groupBy("country", "office").agg(
            F.round(F.mean("salary"), 2).alias("mean_salary"),
            F.round(F.stddev("salary"), 2).alias("std_salary"),
            F.count("*").alias("employee_count"),
            F.round(F.mean("stock_options"), 2).alias("mean_stock_options"),
            F.sum("stock_options").alias("total_stock_options"),
            bonus_eligible_ratio,
        )
        location_comp.cache()
        location_comp.count()
        current_time = log_timing(
            "Query 4: Location Compensation Analysis", start_time, timing_data
        )

        # Query 5: Advanced filtering and window calculations
        start_time = current_time
        dept_salary_window = Window.partitionBy("department").orderBy(
            F.desc("salary")
        )
        top_performers = (
            df.withColumn("salary_rank", F.dense_rank().over(dept_salary_window))
            # Rank is computed before filtering, so this keeps the highly rated
            # employees among each department's top 10 salary ranks.
            .filter(is_high_rated & (F.col("salary_rank") <= 10))
            .groupBy("department")
            .agg(
                F.count("*").alias("count"),
                F.round(F.mean("salary"), 2).alias("mean_salary"),
                F.max("salary").alias("max_salary"),
                F.round(F.mean("projects_completed"), 2).alias("mean_projects"),
                F.sum("projects_completed").alias("total_projects"),
            )
        )
        top_performers.cache()
        top_performers.count()
        current_time = log_timing(
            "Query 5: Top Performers Analysis", start_time, timing_data
        )

        # Calculate overall execution time
        overall_duration = time.time() - overall_start
        timing_data.append(f"\nTotal execution time: {overall_duration:.2f} seconds")

        # Save timing results
        output_dir.mkdir(exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        timing_path = output_dir / f"pyspark_timing_{timestamp}.txt"
        with open(timing_path, "w", encoding="utf-8") as f:
            f.write("\n".join(timing_data))

        # Save analysis results (collected to the driver via pandas)
        results_dir.mkdir(parents=True, exist_ok=True)
        results = (
            ("dept_exp_salary", dept_exp_salary),
            ("retention_analysis", retention_analysis),
            ("performance_metrics", performance_metrics),
            ("location_comp", location_comp),
            ("top_performers", top_performers),
        )
        for name, frame in results:
            frame.toPandas().to_csv(results_dir / f"{name}.csv", index=False)
    finally:
        # Stop Spark session even if a query or export failed
        spark.stop()

    print(f"Analysis complete. Results saved in {results_dir}")
    print(f"Timing results saved in {output_dir}")

# Run the analysis only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

24/11/08 13:14:48 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


Starting PySpark analysis...


24/11/08 13:16:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/08 13:16:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/08 13:17:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/08 13:17:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Analysis complete. Results saved in analysis_results/pyspark
Timing results saved in performance_results
