In [1]:
# This script:
#   - Checks if CUDA-enabled GPUs are available.
#   - Configures Spark for 6 cores and ~10GB total (5g each for driver and executor).
#   - If GPUs are available and your environment supports RAPIDS, extra GPU configurations are applied.
#   - Ingests and filters a massive CSV file using a defined schema.
#   - Performs anomaly filtering (keeps rows where soil_moisture is within ±3 standard deviations).
#   - Sorts data by multiple columns and applies further filtering.
#   - Computes descriptive statistics (mean, median, stddev, Q1, Q3) for carbon_percent and nitrogen_percent.
#   - Interpolates missing carbon_percent values (if null, sets value to soil_moisture * 0.05).
#   - Builds and tests a Linear Regression model (using Spark ML) to predict carbon_percent from soil_moisture and temperature.
#   - Generates a bar chart (in a separate thread) of the nutrient statistics.
#   - Reports total runtime.
  
# Note: To truly leverage GPUs, make sure your Spark cluster is configured for GPU acceleration (e.g., with the RAPIDS Accelerator) and that the necessary drivers and discovery scripts are available.

import os
import time
import subprocess
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from threading import Thread

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, percentile_approx, when
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression



In [2]:
# Function to check for CUDA-enabled GPUs.
def has_cuda(smi_timeout=10.0):
    """Report whether a CUDA-capable GPU appears to be usable on this machine.

    PyTorch is asked first. If PyTorch cannot be imported, or fails while
    probing, the check falls back to running ``nvidia-smi`` and treats an
    exit status of 0 as "GPU present".

    Args:
        smi_timeout: Seconds to wait for ``nvidia-smi`` before giving up.

    Returns:
        bool: True if a CUDA device was detected, otherwise False.
    """
    try:
        import torch
        return bool(torch.cuda.is_available())
    except Exception:
        # Deliberately broader than ImportError: an installed-but-broken torch
        # may fail with a different exception type, and the CLI probe below is
        # still worth trying in that case.
        pass

    try:
        result = subprocess.run(
            ["nvidia-smi"],
            # The output is never inspected, so discard it instead of buffering it.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Without a timeout an unresponsive nvidia-smi would block forever.
            timeout=smi_timeout,
        )
    except Exception:
        # Missing binary, timeout, permission problem, ...: treat all as "no GPU".
        return False
    return result.returncode == 0


In [3]:
# Probe for a GPU once and announce which compute mode the run will use.
use_gpu = has_cuda()
print(
    "CUDA cores detected. Configuring Spark for GPU acceleration."
    if use_gpu
    else "No CUDA cores detected. Running on CPU."
)



No CUDA cores detected. Running on CPU.


In [4]:
# Wall-clock reference point; the final cell subtracts this to report total runtime.
start_time = time.time()

# Explicit CSV schema: one nullable integer timestamp followed by seven
# nullable float measurements, in file-column order.
schema = StructType(
    [StructField("timestamp", IntegerType(), True)]
    + [
        StructField(measurement, FloatType(), True)
        for measurement in (
            "soil_moisture",
            "soil_water_content",
            "carbon_percent",
            "nitrogen_percent",
            "atmospheric_humidity",
            "temperature",
            "pH",
        )
    ]
)

# Create Spark session with resource settings.
#
# Machine-specific locations can be overridden through environment variables so
# the notebook does not have to be edited per machine; the fallbacks are the
# values the notebook was originally written with.
spark_temp_dir = os.environ.get("SPARK_TEMP_DIR", "D:/spark_temp/")
gpu_discovery_script = os.environ.get("GPU_DISCOVERY_SCRIPT", "/path/to/gpuDiscoveryScript.sh")

# NOTE(review): with a local[...] master the tasks presumably run inside the
# driver JVM, so spark.executor.memory may have no effect here — confirm.
spark_builder = (
    SparkSession.builder
    .appName("1BRC_Round2")
    .master("local[6]")
    .config("spark.driver.memory", "5g")
    .config("spark.executor.memory", "5g")
    .config("spark.local.dir", spark_temp_dir)
    .config("spark.sql.shuffle.partitions", "20")
    .config("spark.default.parallelism", "12")
    .config("spark.sql.adaptive.enabled", "true")
)

if use_gpu:
    # These configurations are examples; proper GPU use requires a GPU-enabled Spark setup.
    spark_builder = (
        spark_builder
        .config("spark.rapids.sql.enabled", "true")
        .config("spark.executor.resource.gpu.amount", "1")
        .config("spark.task.resource.gpu.amount", "0.5")
        .config("spark.executor.resource.gpu.discoveryScript", gpu_discovery_script)
    )

spark = spark_builder.getOrCreate()
print("Spark session created.")



Spark session created.


In [5]:
# Task 1: Data Ingestion & Preparation.
print("Task 1: Data Ingestion & Preparation")
file_path = "combined.csv"  # Update with your CSV file location.

# Share of rows sampled to extrapolate the row count; used for both the sample
# and the scale-up so the two can never drift apart.
ROW_ESTIMATE_FRACTION = 0.0001

try:
    # Read with the explicit schema and keep only rows whose readings fall in
    # the accepted ranges (a null value never satisfies `between`, so rows with
    # a null in any of these columns are dropped as well).
    df = spark.read.schema(schema).csv(file_path, header=True) \
            .filter(
                (col("soil_moisture").between(0, 100)) &
                (col("soil_water_content").between(0, 100)) &
                (col("carbon_percent").between(0, 10)) &
                (col("nitrogen_percent").between(0, 5)) &
                (col("atmospheric_humidity").between(0, 100)) &
                (col("temperature").between(0, 40)) &
                (col("pH").between(4, 9))
            )
    print("Data loaded with initial filtering.")

    # Estimate the (post-filter) row count from a small sample instead of a full count.
    sample_df = df.sample(fraction=ROW_ESTIMATE_FRACTION, seed=42)
    sample_count = sample_df.count()
    # round(), not int(): float division can land just below the exact integer
    # and int() would then truncate the estimate down by one.
    estimated_rows = round(sample_count / ROW_ESTIMATE_FRACTION)
    print(f"Estimated total rows: ~{estimated_rows}")
except Exception as e:
    print(f"Error loading CSV: {e}")
    spark.stop()
    # Re-raise rather than calling exit(): exit is an interactive-shell helper
    # and not a dependable way to abort a notebook run. Propagating the error
    # halts execution at this cell and keeps the original traceback.
    raise



Task 1: Data Ingestion & Preparation
Data loaded with initial filtering.
Estimated total rows: ~798410000


In [None]:
# Task 2: Cleaning – Anomaly Filtering Only.
print("Task 2: Cleaning Data (Anomaly Filtering Only)")

# Mean and standard deviation of soil_moisture, computed on a seeded 0.1% sample.
moisture_aggregates = [
    mean("soil_moisture").alias("mean_moisture"),
    stddev("soil_moisture").alias("stddev_moisture"),
]
stats_row = df.sample(fraction=0.001, seed=42).select(*moisture_aggregates).collect()[0]
mean_moisture = stats_row["mean_moisture"]
stddev_moisture = stats_row["stddev_moisture"]

# Keep only the rows whose soil_moisture z-score lies within [-3, 3].
moisture_z_score = (col("soil_moisture") - mean_moisture) / stddev_moisture
df_clean = df.filter(moisture_z_score.between(-3, 3))

print("Sample of anomaly-filtered data:")
df_clean.show(5)

Task 2: Cleaning Data (Anomaly Filtering Only)
Sample of anomaly-filtered data:
+---------+-------------+------------------+--------------+----------------+--------------------+-----------+----+
|timestamp|soil_moisture|soil_water_content|carbon_percent|nitrogen_percent|atmospheric_humidity|temperature|  pH|
+---------+-------------+------------------+--------------+----------------+--------------------+-----------+----+
|        2|          1.5|             59.92|          8.34|            3.59|               53.55|       9.82|8.01|
|        3|        59.39|              82.4|          2.05|            2.71|               67.54|      39.78|5.03|
|        5|        54.58|             77.19|          5.34|            3.32|               30.31|       32.0|8.44|
|        6|        66.24|             12.15|          1.49|            2.69|                3.43|       1.22|8.07|
|        7|        81.44|             60.34|          6.56|            0.62|               10.97|      15.28|5.75|


In [7]:
# Task 3: Multi-Column Sorting & Filtering.
print("Task 3: Multi-Column Sorting & Filtering")

# Order a seeded 0.01% sample by timestamp, then soil_moisture, then temperature.
sort_keys = ["timestamp", "soil_moisture", "temperature"]
sorted_sample = df_clean.sample(fraction=0.0001, seed=42).orderBy(*sort_keys)
print("Sorted sample:")
sorted_sample.show(5)

# Rows that have both soil_moisture above 80 and pH below 5.
high_moisture = col("soil_moisture") > 80
low_ph = col("pH") < 5
filtered_df = df_clean.filter(high_moisture & low_ph)
print("Filtered sample:")
filtered_df.show(5)



Task 3: Multi-Column Sorting & Filtering
Sorted sample:
+---------+-------------+------------------+--------------+----------------+--------------------+-----------+----+
|timestamp|soil_moisture|soil_water_content|carbon_percent|nitrogen_percent|atmospheric_humidity|temperature|  pH|
+---------+-------------+------------------+--------------+----------------+--------------------+-----------+----+
|    61166|        66.19|              26.1|          7.78|            0.41|               90.57|      37.13|4.57|
|    68508|        38.12|              27.2|          3.44|            3.71|               57.62|       6.94|7.44|
|    74499|        53.46|              7.67|          3.25|            0.69|               76.61|      38.44| 4.7|
|    75698|        29.86|              57.8|          6.87|            2.67|               47.53|      37.49|8.18|
|    82680|        44.08|             75.92|          9.57|            1.43|               91.44|      24.54| 7.9|
+---------+-------------

In [8]:
# Task 4: Statistical Analysis.
print("Task 4: Statistical Analysis")


def _summary_columns(source_column, prefix):
    """Build the mean / median / stddev / quartile aggregates for one column.

    The result columns are named ``<prefix>_mean``, ``<prefix>_median``,
    ``<prefix>_stddev`` and ``<prefix>_quartiles`` (a two-element [Q1, Q3] array).
    """
    return [
        mean(source_column).alias(f"{prefix}_mean"),
        percentile_approx(source_column, 0.5, 1000).alias(f"{prefix}_median"),
        stddev(source_column).alias(f"{prefix}_stddev"),
        percentile_approx(source_column, [0.25, 0.75], 1000).alias(f"{prefix}_quartiles"),
    ]


# One pass over the cleaned data producing a single Row with all eight aggregates.
stats = df_clean.select(
    *_summary_columns("carbon_percent", "carbon"),
    *_summary_columns("nitrogen_percent", "nitrogen"),
).collect()[0]

# One summary line per nutrient, carbon first.
for _label, _prefix in (("Carbon Percent", "carbon"), ("Nitrogen Percent", "nitrogen")):
    _q1, _q3 = stats[_prefix + "_quartiles"][0], stats[_prefix + "_quartiles"][1]
    print(
        f"{_label} -> Mean: {stats[_prefix + '_mean']:.2f}, "
        f"Median: {stats[_prefix + '_median']:.2f}, "
        f"StdDev: {stats[_prefix + '_stddev']:.2f}, "
        f"Q1: {_q1:.2f}, Q3: {_q3:.2f}"
    )



Task 4: Statistical Analysis
Carbon Percent -> Mean: 5.00, Median: 5.00, StdDev: 2.89, Q1: 2.50, Q3: 7.50
Nitrogen Percent -> Mean: 2.50, Median: 2.50, StdDev: 1.44, Q1: 1.25, Q3: 3.75


In [1]:
# Task 5: Interpolation & Prediction.
print("Task 5: Interpolation & Prediction")

# Where carbon_percent is null use soil_moisture * 0.05; otherwise keep the measured value.
carbon_missing = col("carbon_percent").isNull()
carbon_filled = when(carbon_missing, col("soil_moisture") * 0.05).otherwise(col("carbon_percent"))
df_interp = df_clean.withColumn("carbon_percent_interp", carbon_filled)

# Work on a seeded 0.1% sample and pack the two predictors into a feature vector.
sample_pred = df_interp.sample(fraction=0.001, seed=42)
assembler = VectorAssembler(
    inputCols=["soil_moisture", "temperature"],
    outputCol="features",
)
df_vector = assembler.transform(sample_pred)

# 80/20 train/test split, fit the linear model, then score the held-out rows.
train_df, test_df = df_vector.randomSplit([0.8, 0.2], seed=42)
lr = LinearRegression(featuresCol="features", labelCol="carbon_percent_interp")
lr_model = lr.fit(train_df)
predictions = lr_model.transform(test_df)

print("Prediction sample:")
preview_columns = ["timestamp", "soil_moisture", "temperature", "carbon_percent_interp", "prediction"]
predictions.select(*preview_columns).show(5)



Task 5: Interpolation & Prediction


NameError: name 'df_clean' is not defined

In [11]:
def plot_stats(stats, output_path):
    """Save a grouped bar chart of the carbon and nitrogen summary statistics.

    Args:
        stats: Object indexable by the keys "carbon_mean", "carbon_median",
            "carbon_stddev", "carbon_quartiles" and the matching "nitrogen_*"
            keys. Each "*_quartiles" value is indexed with [0] for Q1 and [1]
            for Q3.
        output_path: Destination passed to ``Figure.savefig``.

    Raises:
        Whatever pandas / matplotlib raise while building or saving the chart;
        the figure is closed before the exception propagates.
    """
    import pandas as pd
    import matplotlib.pyplot as plt

    def _series(prefix):
        # Mean, Median, StdDev, Q1, Q3 — same order as the "Stat" labels below.
        quartiles = stats[prefix + "_quartiles"]
        return [
            stats[prefix + "_mean"],
            stats[prefix + "_median"],
            stats[prefix + "_stddev"],
            quartiles[0],
            quartiles[1],
        ]

    # Create a DataFrame with the statistics.
    stats_pd = pd.DataFrame({
        "Stat": ["Mean", "Median", "StdDev", "Q1", "Q3"],
        "Carbon": _series("carbon"),
        "Nitrogen": _series("nitrogen"),
    })

    # Plot the DataFrame as a bar chart.
    ax = stats_pd.plot(x="Stat", y=["Carbon", "Nitrogen"], kind="bar", figsize=(10, 6),
                       title="Soil Nutrient Statistics")
    ax.set_ylabel("Value")

    # Work on the figure that owns these axes instead of pyplot's implicit
    # "current figure", and always release it — even when saving fails.
    fig = ax.figure
    try:
        fig.tight_layout()
        fig.savefig(output_path)
    finally:
        plt.close(fig)

# Task 6: Visualization (using a separate thread).
print("Task 6: Generating Visualization")

# An exception raised inside a Thread target is not propagated to the code that
# calls join(), so it is collected here and re-raised on the main thread.
plot_errors = []


def _plot_worker():
    """Thread target: draw the chart and record any exception it raises."""
    try:
        plot_stats(stats, "stats_plot.png")
    except Exception as exc:
        plot_errors.append(exc)


plot_thread = Thread(target=_plot_worker)
plot_thread.start()
plot_thread.join()

if plot_errors:
    # Do not report success when the worker failed.
    raise plot_errors[0]
print("Bar chart saved as stats_plot.png")



Task 6: Generating Visualization
Bar chart saved as stats_plot.png


In [14]:
# End Timing & Report Performance.
# `start_time` is set in the Spark-session cell, so the figure is only
# meaningful when the cells were run top to bottom in one kernel session.
runtime = time.time() - start_time
print(f"Total Runtime: {runtime:.2f} seconds")

# Release the Spark session and its resources.
spark.stop()

# The former `if __name__ == "__main__": main()` guard was removed: this
# notebook defines no `main`, so the call could only end in a NameError.

Total Runtime: 2366.05 seconds
Total Runtime: 0.17 seconds
