In [2]:
!pip install -q pyspark
print("PySpark installed successfully!")

PySpark installed successfully!


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
import time
import pandas as pd
import matplotlib.pyplot as plt
import os

print("All libraries imported!")

All libraries imported!


In [4]:
spark = SparkSession.builder \
    .appName("CloudDataProcessing") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("Spark session created!")
print(f"Spark version: {spark.version}")

Spark session created!
Spark version: 4.0.1


In [5]:
!mkdir -p /content/results
print("Results directory created at /content/results/")

Results directory created at /content/results/


In [7]:
filename = "large_data.csv"

df = spark.read.csv(filename, header=True, inferSchema=True)

print(f"Data loaded successfully!")
print(f"Number of rows: {df.count():,}")
print(f"Number of columns: {len(df.columns)}")
print("\nColumn names:", df.columns)
print("\nFirst 5 rows:")
df.show(5)

Data loaded successfully!
Number of rows: 500,000
Number of columns: 7

Column names: ['order_id', 'customer_id', 'product_id', 'quantity', 'price', 'region', 'total_price']

First 5 rows:
+--------+-----------+----------+--------+------+---------+-----------+
|order_id|customer_id|product_id|quantity| price|   region|total_price|
+--------+-----------+----------+--------+------+---------+-----------+
|       1|      25795|       308|       1|799.12|Northeast|     799.12|
|       2|      10860|       695|       9|501.99|  Midwest|    4517.91|
|       3|      48158|       905|      12|577.14|Southeast|    6925.68|
|       4|      21284|       215|      12|156.09|    North|    1873.08|
|       5|      16265|       229|      17|798.67|     East|   13577.39|
+--------+-----------+----------+--------+------+---------+-----------+
only showing top 5 rows


In [8]:
num_rows = df.count()
num_cols = len(df.columns)

print(f"Total Rows: {num_rows:,}")
print(f"Total Columns: {num_cols}")
print("\nColumn Information:")
for col_name, col_type in df.dtypes:
    print(f"  {col_name}: {col_type}")

Total Rows: 500,000
Total Columns: 7

Column Information:
  order_id: int
  customer_id: int
  product_id: int
  quantity: int
  price: double
  region: string
  total_price: double


In [9]:
num_columns = [field.name for field in df.schema.fields
               if isinstance(field.dataType, (IntegerType, LongType, FloatType, DoubleType))]

print(f"Found {len(num_columns)} numerical columns\n")

for col in num_columns:
    stats = df.select(
        F.min(col).alias('min'),
        F.max(col).alias('max'),
        F.mean(col).alias('mean'),
        F.stddev(col).alias('std')
    ).collect()[0]

    print(f"{col}:")
    print(f"  Min: {stats['min']}")
    print(f"  Max: {stats['max']}")
    print(f"  Mean: {stats['mean']:.2f}")
    print(f"  Std Dev: {stats['std']:.2f}")
    print()

Found 6 numerical columns

order_id:
  Min: 1
  Max: 500000
  Mean: 250000.50
  Std Dev: 144337.71

customer_id:
  Min: 10000
  Max: 49999
  Mean: 29995.33
  Std Dev: 11541.41

product_id:
  Min: 100
  Max: 999
  Mean: 549.64
  Std Dev: 259.95

quantity:
  Min: 1
  Max: 19
  Mean: 10.01
  Std Dev: 5.48

price:
  Min: 10.0
  Max: 1000.0
  Mean: 505.28
  Std Dev: 286.00

total_price:
  Min: 10.01
  Max: 18999.62
  Mean: 5056.86
  Std Dev: 4282.43



In [10]:
total_rows = df.count()

for col in df.columns:
    null_count = df.filter(F.col(col).isNull()).count()
    percentage = (null_count / total_rows * 100) if total_rows > 0 else 0
    print(f"{col}: {null_count} missing ({percentage:.1f}%)")

order_id: 0 missing (0.0%)
customer_id: 0 missing (0.0%)
product_id: 0 missing (0.0%)
quantity: 0 missing (0.0%)
price: 0 missing (0.0%)
region: 0 missing (0.0%)
total_price: 0 missing (0.0%)


In [11]:
for col in df.columns:
    unique_count = df.select(col).distinct().count()
    print(f"{col}: {unique_count} unique values")

order_id: 500000 unique values
customer_id: 40000 unique values
product_id: 900 unique values
quantity: 19 unique values
price: 98353 unique values
region: 8 unique values
total_price: 341714 unique values


In [12]:

if len(num_columns) >= 2:
    print("\nCorrelations between numerical columns:\n")
    for i in range(len(num_columns)):
        for j in range(i+1, len(num_columns)):
            col1 = num_columns[i]
            col2 = num_columns[j]
            corr = df.stat.corr(col1, col2)
            print(f"{col1} <-> {col2}: {corr:.3f}")
else:
    print("Need at least 2 numerical columns for correlation")


Correlations between numerical columns:

order_id <-> customer_id: 0.001
order_id <-> product_id: 0.001
order_id <-> quantity: 0.000
order_id <-> price: -0.003
order_id <-> total_price: -0.002
customer_id <-> product_id: -0.001
customer_id <-> quantity: -0.001
customer_id <-> price: 0.002
customer_id <-> total_price: 0.001
product_id <-> quantity: 0.000
product_id <-> price: -0.001
product_id <-> total_price: -0.001
quantity <-> price: 0.000
quantity <-> total_price: 0.647
price <-> total_price: 0.669


In [13]:
if len(num_columns) >= 2:

    label_col = num_columns[-1]
    feature_cols = num_columns[:-1]

    print(f"Target column: {label_col}")
    print(f"Feature columns: {', '.join(feature_cols)}\n")


    df_clean = df.select(feature_cols + [label_col]).na.drop()


    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    df_assembled = assembler.transform(df_clean)


    train_data, test_data = df_assembled.randomSplit([0.8, 0.2], seed=42)

    print(f"Training samples: {train_data.count()}")
    print(f"Testing samples: {test_data.count()}\n")


    lr = LinearRegression(featuresCol="features", labelCol=label_col, maxIter=100)

    start_time = time.time()
    model = lr.fit(train_data)
    training_time = time.time() - start_time


    predictions = model.transform(test_data)


    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    r2 = evaluator.setMetricName("r2").evaluate(predictions)
    mae = evaluator.setMetricName("mae").evaluate(predictions)

    print("Results:")
    print(f"  RMSE (Root Mean Squared Error): {rmse:.4f}")
    print(f"  R² (R-squared): {r2:.4f}")
    print(f"  MAE (Mean Absolute Error): {mae:.4f}")
    print(f"  Training Time: {training_time:.2f} seconds\n")


    predictions.select(label_col, "prediction").limit(100) \
        .coalesce(1).write.mode("overwrite") \
        .csv("/content/results/linear_regression_predictions", header=True)

    print("Predictions saved to /content/results/linear_regression_predictions/")

else:
    print("Need at least 2 numerical columns for Linear Regression")

Target column: total_price
Feature columns: order_id, customer_id, product_id, quantity, price

Training samples: 399895
Testing samples: 100105

Results:
  RMSE (Root Mean Squared Error): 1571.8279
  R² (R-squared): 0.8658
  MAE (Mean Absolute Error): 1177.5824
  Training Time: 7.31 seconds

Predictions saved to /content/results/linear_regression_predictions/


In [14]:
if len(num_columns) >= 2:
    selected_cols = num_columns[:min(5, len(num_columns))]
    print(f"Using columns: {', '.join(selected_cols)}\n")


    df_clean = df.select(selected_cols).na.drop()


    assembler = VectorAssembler(inputCols=selected_cols, outputCol="features")
    df_assembled = assembler.transform(df_clean)


    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    scaler_model = scaler.fit(df_assembled)
    df_scaled = scaler_model.transform(df_assembled)


    k = 5
    kmeans = KMeans(featuresCol="scaled_features", k=k, seed=42, maxIter=20)

    start_time = time.time()
    model = kmeans.fit(df_scaled)
    training_time = time.time() - start_time


    predictions = model.transform(df_scaled)


    cluster_counts = predictions.groupBy("prediction").count().orderBy("prediction").collect()

    print(f"Number of clusters: {k}")
    print(f"Total samples: {df_clean.count()}\n")
    print("Cluster distribution:")
    for row in cluster_counts:
        print(f"  Cluster {row['prediction']}: {row['count']} samples")

    print(f"\nTraining Time: {training_time:.2f} seconds\n")


    predictions.select(selected_cols + ["prediction"]).limit(100) \
        .coalesce(1).write.mode("overwrite") \
        .csv("/content/results/kmeans_clusters", header=True)

    print("Cluster assignments saved to /content/results/kmeans_clusters/")

else:
    print("Need at least 2 numerical columns for K-Means")

Using columns: order_id, customer_id, product_id, quantity, price

Number of clusters: 5
Total samples: 500000

Cluster distribution:
  Cluster 0: 95698 samples
  Cluster 1: 103377 samples
  Cluster 2: 97164 samples
  Cluster 3: 100012 samples
  Cluster 4: 103749 samples

Training Time: 23.68 seconds

Cluster assignments saved to /content/results/kmeans_clusters/


In [16]:
if len(num_columns) >= 2:
    label_col = num_columns[0]
    feature_cols = num_columns[1:]

    print(f"Creating binary label from: {label_col}")
    print(f"Feature columns: {', '.join(feature_cols)}\n")


    df_clean = df.select(feature_cols + [label_col]).na.drop()


    median_value = df_clean.approxQuantile(label_col, [0.5], 0.01)[0]
    df_labeled = df_clean.withColumn("label",
                                     F.when(F.col(label_col) >= median_value, 1).otherwise(0))


    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    df_assembled = assembler.transform(df_labeled)


    train_data, test_data = df_assembled.randomSplit([0.8, 0.2], seed=42)

    print(f"Training samples: {train_data.count()}")
    print(f"Testing samples: {test_data.count()}\n")


    lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100)

    start_time = time.time()
    model = lr.fit(train_data)
    training_time = time.time() - start_time


    predictions = model.transform(test_data)


    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
    accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
    f1 = evaluator.setMetricName("f1").evaluate(predictions)

    print("Results:")
    print(f"  Accuracy: {accuracy:.4f}")
    print(f"  F1-Score: {f1:.4f}")
    print(f"  Training Time: {training_time:.2f} seconds\n")


    predictions.select("label", "prediction", F.col("probability").cast("string").alias("probability")) \
        .limit(100) \
        .coalesce(1).write.mode("overwrite") \
        .csv("/content/results/logistic_regression_predictions", header=True)

    print("Predictions saved to /content/results/logistic_regression_predictions/")

else:
    print("Need at least 2 numerical columns for Logistic Regression")

Creating binary label from: order_id
Feature columns: customer_id, product_id, quantity, price, total_price

Training samples: 399895
Testing samples: 100105

Results:
  Accuracy: 0.4998
  F1-Score: 0.4998
  Training Time: 9.05 seconds

Predictions saved to /content/results/logistic_regression_predictions/


In [22]:
string_cols = [field.name for field in df.schema.fields
               if isinstance(field.dataType, StringType)]

if len(string_cols) >= 1:
    selected_cols = string_cols[:min(3, len(string_cols))]
    print(f"Using columns: {', '.join(selected_cols)}\n")


    df_transactions = df.select(
        F.array(*[F.col(c) for c in selected_cols]).alias("items")
    ).filter(F.size("items") > 0)

    df_transactions = df_transactions.withColumn(
        "items",
        F.array_except("items", F.array(F.lit(None)))
    ).filter(F.size("items") > 0)

    print(f"Total transactions: {df_transactions.count()}\n")

    fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.5)

    start_time = time.time()
    model = fpGrowth.fit(df_transactions)
    training_time = time.time() - start_time

    frequent_itemsets = model.freqItemsets
    print("Top 10 Frequent Itemsets:")
    frequent_itemsets.orderBy(F.desc("freq")).show(10, truncate=False)

    association_rules = model.associationRules
    print("\nTop 10 Association Rules:")
    association_rules.orderBy(F.desc("confidence")).show(10, truncate=False)

    print(f"\nTraining Time: {training_time:.2f} seconds\n")

    frequent_itemsets.orderBy(F.desc("freq")) \
        .select(F.col("items").cast("string").alias("items"), "freq") \
        .limit(50) \
        .coalesce(1).write.mode("overwrite") \
        .csv("/content/results/frequent_itemsets", header=True)

    association_rules.orderBy(F.desc("confidence")) \
        .select(F.col("antecedent").cast("string"), F.col("consequent").cast("string"), "confidence", "lift") \
        .limit(50) \
        .coalesce(1).write.mode("overwrite") \
        .csv("/content/results/association_rules", header=True)

    print("Frequent itemsets saved to /content/results/frequent_itemsets/")
    print("Association rules saved to /content/results/association_rules/")

else:
    print("Need at least 1 string column for FP-Growth")
    print("Creating sample transactions for demonstration...\n")

    sample_data = [
        (["milk", "bread", "butter"],),
        (["milk", "bread"],),
        (["milk", "butter"],),
        (["bread", "butter"],),
        (["milk", "bread", "butter", "cheese"],)
    ] * 100

    df_sample = spark.createDataFrame(sample_data, ["items"])

    fpGrowth = FPGrowth(itemsCol="items", minSupport=0.3, minConfidence=0.6)
    model = fpGrowth.fit(df_sample)

    print("Sample Frequent Itemsets:")
    model.freqItemsets.show(10, truncate=False)

    print("\nSample Association Rules:")
    model.associationRules.show(10, truncate=False)

Using columns: region

Total transactions: 500000

Top 10 Frequent Itemsets:
+-----------+-----+
|items      |freq |
+-----------+-----+
|[West]     |62747|
|[East]     |62741|
|[South]    |62612|
|[North]    |62605|
|[Southeast]|62442|
|[Midwest]  |62368|
|[Southwest]|62321|
|[Northeast]|62164|
+-----------+-----+


Top 10 Association Rules:
+----------+----------+----------+----+-------+
|antecedent|consequent|confidence|lift|support|
+----------+----------+----------+----+-------+
+----------+----------+----------+----+-------+


Training Time: 2.22 seconds

Frequent itemsets saved to /content/results/frequent_itemsets/
Association rules saved to /content/results/association_rules/


In [25]:
if len(num_columns) >= 2:
    selected_cols = num_columns[:min(5, len(num_columns))]
    df_clean = df.select(selected_cols).na.drop()

    node_counts = [1, 2, 4, 8]
    execution_times = []

    for nodes in node_counts:
        print(f"Running with {nodes} node(s)...")

        df_partitioned = df_clean.repartition(nodes)

        spark.conf.set("spark.default.parallelism", nodes)
        spark.conf.set("spark.sql.shuffle.partitions", nodes * 2)

        assembler = VectorAssembler(inputCols=selected_cols, outputCol="features")
        df_assembled = assembler.transform(df_partitioned)

        scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
        df_scaled = scaler.fit(df_assembled).transform(df_assembled)

        kmeans = KMeans(featuresCol="scaled_features", k=3, seed=42, maxIter=10)

        start_time = time.time()
        model = kmeans.fit(df_scaled)
        predictions = model.transform(df_scaled)
        predictions.count()
        exec_time = time.time() - start_time

        execution_times.append(exec_time)
        print(f"  ✓ Completed in {exec_time:.2f} seconds\n")

    baseline_time = execution_times[0]

    results = []
    for i, nodes in enumerate(node_counts):
        speedup = baseline_time / execution_times[i]
        efficiency = (speedup / nodes) * 100

        results.append({
            'Nodes': nodes,
            'Time (s)': round(execution_times[i], 2),
            'Speedup': round(speedup, 3),
            'Efficiency (%)': round(efficiency, 2)
        })

    print("=" * 70)
    print("PERFORMANCE RESULTS")
    print("=" * 70)
    print(f"{'Nodes':<10} {'Time (s)':<15} {'Speedup':<15} {'Efficiency (%)':<15}")
    print("-" * 70)

    for r in results:
        print(f"{r['Nodes']:<10} {r['Time (s)']:<15} {r['Speedup']:<15} {r['Efficiency (%)']:<15}")

    print("\n" + "=" * 70)
    print("FORMULAS USED:")
    print("  Speedup(n) = T(1) / T(n)")
    print("  Efficiency(n) = Speedup(n) / n × 100%")
    print("=" * 70)

    results_df = pd.DataFrame(results)
    results_df.to_csv('/content/results/performance_benchmark.csv', index=False)
    print("\n✅ Performance results saved to /content/results/performance_benchmark.csv")



Running with 1 node(s)...
  ✓ Completed in 16.34 seconds

Running with 2 node(s)...
  ✓ Completed in 15.97 seconds

Running with 4 node(s)...
  ✓ Completed in 18.43 seconds

Running with 8 node(s)...
  ✓ Completed in 18.70 seconds

PERFORMANCE RESULTS
Nodes      Time (s)        Speedup         Efficiency (%) 
----------------------------------------------------------------------
1          16.34           1.0             100.0          
2          15.97           1.024           51.19          
4          18.43           0.887           22.17          
8          18.7            0.874           10.92          

FORMULAS USED:
  Speedup(n) = T(1) / T(n)
  Efficiency(n) = Speedup(n) / n × 100%

✅ Performance results saved to /content/results/performance_benchmark.csv
