In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [2]:
spark = SparkSession.builder \
    .appName("CO2 Emissions Analysis") \
    .getOrCreate()


In [3]:
data = spark.read.csv("/content/CO2 Emissions_Canada.csv", header=True, inferSchema=True)


In [4]:
data.show(5)
data.printSchema()


+-----+----------+-------------+--------------+---------+------------+---------+--------------------------------+-------------------------------+--------------------------------+---------------------------+-------------------+
| Make|     Model|Vehicle Class|Engine Size(L)|Cylinders|Transmission|Fuel Type|Fuel Consumption City (L/100 km)|Fuel Consumption Hwy (L/100 km)|Fuel Consumption Comb (L/100 km)|Fuel Consumption Comb (mpg)|CO2 Emissions(g/km)|
+-----+----------+-------------+--------------+---------+------------+---------+--------------------------------+-------------------------------+--------------------------------+---------------------------+-------------------+
|ACURA|       ILX|      COMPACT|           2.0|        4|         AS5|        Z|                             9.9|                            6.7|                             8.5|                         33|                196|
|ACURA|       ILX|      COMPACT|           2.4|        4|          M6|        Z|            

In [5]:
print(f"Number of Rows: {data.count()}")
print(f"Number of Columns: {len(data.columns)}")


Number of Rows: 7385
Number of Columns: 12


In [6]:
data.describe(["Engine Size(L)", "Fuel Consumption City (L/100 km)",
               "Fuel Consumption Hwy (L/100 km)", "CO2 Emissions(g/km)"]).show()


+-------+------------------+--------------------------------+-------------------------------+-------------------+
|summary|    Engine Size(L)|Fuel Consumption City (L/100 km)|Fuel Consumption Hwy (L/100 km)|CO2 Emissions(g/km)|
+-------+------------------+--------------------------------+-------------------------------+-------------------+
|  count|              7385|                            7385|                           7385|               7385|
|   mean|3.1600677048070125|              12.556533513879508|               9.04170616113748| 250.58469871360867|
| stddev|1.3541704555622625|              3.5002740810878024|             2.2244563806418762|  58.51267939440643|
|    min|               0.9|                             4.2|                            4.0|                 96|
|    max|               8.4|                            30.6|                           20.6|                522|
+-------+------------------+--------------------------------+---------------------------

In [7]:
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()


+----+-----+-------------+--------------+---------+------------+---------+--------------------------------+-------------------------------+--------------------------------+---------------------------+-------------------+
|Make|Model|Vehicle Class|Engine Size(L)|Cylinders|Transmission|Fuel Type|Fuel Consumption City (L/100 km)|Fuel Consumption Hwy (L/100 km)|Fuel Consumption Comb (L/100 km)|Fuel Consumption Comb (mpg)|CO2 Emissions(g/km)|
+----+-----+-------------+--------------+---------+------------+---------+--------------------------------+-------------------------------+--------------------------------+---------------------------+-------------------+
|   0|    0|            0|             0|        0|           0|        0|                               0|                              0|                               0|                          0|                  0|
+----+-----+-------------+--------------+---------+------------+---------+--------------------------------+---------

In [8]:
data.groupBy("Fuel Type").count().show()


+---------+-----+
|Fuel Type|count|
+---------+-----+
|        E|  370|
|        D|  175|
|        Z| 3202|
|        N|    1|
|        X| 3637|
+---------+-----+



In [9]:
from pyspark.ml.feature import VectorAssembler

feature_columns = ["Engine Size(L)", "Fuel Consumption City (L/100 km)",
                   "Fuel Consumption Hwy (L/100 km)", "Fuel Consumption Comb (L/100 km)",
                   "Cylinders"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data).select("features", "CO2 Emissions(g/km)")


In [10]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)


In [11]:
from pyspark.ml.regression import LinearRegression


In [12]:
lr = LinearRegression(featuresCol="features", labelCol="CO2 Emissions(g/km)")
lr_model = lr.fit(train_data)


In [13]:
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)


Coefficients:  [5.871117106398591,-0.5984585754341162,0.24995741240696912,13.458620283543862,6.678568623212983]
Intercept:  51.99946500270319


In [14]:
predictions = lr_model.transform(test_data)
predictions.select("features", "CO2 Emissions(g/km)", "prediction").show(5)


+--------------------+-------------------+------------------+
|            features|CO2 Emissions(g/km)|        prediction|
+--------------------+-------------------+------------------+
|[0.9,7.5,6.1,6.9,...|                157|167.21995712448017|
|[1.0,6.9,5.7,6.4,...|                147|161.33685087364577|
|[1.0,7.5,5.5,6.6,...|                153|163.61950830261267|
|[1.0,7.9,5.9,7.0,...|                164|168.86355595081938|
|[1.0,8.6,8.1,8.4,...|                196|187.83660965227222|
+--------------------+-------------------+------------------+
only showing top 5 rows



In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="CO2 Emissions(g/km)", predictionCol="prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R-Squared (R²): {r2}")


Root Mean Squared Error (RMSE): 20.596194419289475
R-Squared (R²): 0.87640747409551


In [16]:
# Reload or use the original dataset
original_data = spark.read.csv("/content/CO2 Emissions_Canada.csv", header=True, inferSchema=True)



In [17]:
original_data.printSchema()


root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle Class: string (nullable = true)
 |-- Engine Size(L): double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Consumption City (L/100 km): double (nullable = true)
 |-- Fuel Consumption Hwy (L/100 km): double (nullable = true)
 |-- Fuel Consumption Comb (L/100 km): double (nullable = true)
 |-- Fuel Consumption Comb (mpg): integer (nullable = true)
 |-- CO2 Emissions(g/km): integer (nullable = true)



In [18]:
from pyspark.ml.feature import VectorAssembler

clustering_features = ["Fuel Consumption City (L/100 km)",
                       "Fuel Consumption Hwy (L/100 km)",
                       "Fuel Consumption Comb (L/100 km)",
                       "CO2 Emissions(g/km)"]

assembler = VectorAssembler(inputCols=clustering_features, outputCol="features")
clustering_data = assembler.transform(original_data).select("features")


In [19]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(clustering_data)
scaled_data = scaler_model.transform(clustering_data).select("scaled_features")


In [20]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol="scaled_features", k=2, seed=42)
model = kmeans.fit(scaled_data)


In [21]:
clustered_data = model.transform(scaled_data)
clustered_data.show(5)  # Displays the cluster assignment for each record


+--------------------+----------+
|     scaled_features|prediction|
+--------------------+----------+
|[-0.7589501428567...|         0|
|[-0.3875506553069...|         0|
|[-1.8731486055063...|         0|
|[0.04098721494287...|         0|
|[-0.1304279331570...|         0|
+--------------------+----------+
only showing top 5 rows



In [22]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="scaled_features", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette_score = evaluator.evaluate(clustered_data)

print(f"Silhouette Score: {silhouette_score}")


Silhouette Score: 0.7177506170009392


In [23]:
print("Cluster Centers: ")
for center in model.clusterCenters():
    print(center)


Cluster Centers: 
[-0.58498032 -0.57214407 -0.58771389 -0.57809133]
[1.06327594 1.03994442 1.06824456 1.05075433]


In [24]:
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

# Convert vector column to array
vector_to_array_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(FloatType()))

# Apply UDF on 'scaled_features' instead of 'features'
clustered_data = clustered_data.withColumn("features_array", vector_to_array_udf("scaled_features"))

# Split array into separate columns
for i, col_name in enumerate(clustering_features):
    clustered_data = clustered_data.withColumn(col_name, F.col("features_array")[i])

# Drop unnecessary columns before saving
clustered_data = clustered_data.drop("scaled_features", "features_array")

# Save as CSV
clustered_data.write.csv("clustered_data.csv", header=True, mode="overwrite")



In [25]:
classification_data = spark.read.csv("/content/CO2 Emissions_Canada.csv", header=True, inferSchema=True)

classification_data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle Class: string (nullable = true)
 |-- Engine Size(L): double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Consumption City (L/100 km): double (nullable = true)
 |-- Fuel Consumption Hwy (L/100 km): double (nullable = true)
 |-- Fuel Consumption Comb (L/100 km): double (nullable = true)
 |-- Fuel Consumption Comb (mpg): integer (nullable = true)
 |-- CO2 Emissions(g/km): integer (nullable = true)



In [26]:
from pyspark.sql.functions import when, col

classification_data = classification_data.withColumn(
    "emission_category",
    when(col("CO2 Emissions(g/km)") <= 150, "Low")
    .when((col("CO2 Emissions(g/km)") > 150) & (col("CO2 Emissions(g/km)") <= 250), "Medium")
    .otherwise("High")
)


In [27]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="emission_category", outputCol="label")
classification_data = indexer.fit(classification_data).transform(classification_data)


In [28]:
from pyspark.ml.feature import VectorAssembler

features = ["Engine Size(L)", "Cylinders", "Fuel Consumption Comb (L/100 km)"]

assembler = VectorAssembler(inputCols=features, outputCol="features")
classification_data = assembler.transform(classification_data).select("features", "label")


In [29]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the dataset into training (80%) and test (20%)
train_data, test_data = classification_data.randomSplit([0.8, 0.2], seed=42)

# Initialize the Decision Tree model
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Train the model
dt_model = dt.fit(train_data)

# Make predictions on the test set
predictions = dt_model.transform(test_data)


In [30]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Decision Tree Accuracy: {accuracy:.4f}")


Decision Tree Accuracy: 0.9638


In [31]:
print("Feature Importances: ", dt_model.featureImportances)


Feature Importances:  (3,[0,1,2],[0.02290557016504808,0.0004954835758582166,0.9765989462590937])


In [32]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import Row

# Convert predictions to RDD format
prediction_rdd = predictions.select("label", "prediction").rdd.map(lambda row: (row["prediction"], row["label"]))

# Compute confusion matrix
metrics = MulticlassMetrics(prediction_rdd)
print("Confusion Matrix:\n", metrics.confusionMatrix().toArray())




Confusion Matrix:
 [[662.  18.  11.]
 [ 18. 664.   0.]
 [  4.   0.  33.]]


In [33]:
tableau_data = predictions.select("features", "label", "prediction")


In [34]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define mapping from numerical labels back to category names
def label_to_category(label):
    mapping = {0.0: "Low", 1.0: "Medium", 2.0: "High"}
    return mapping.get(label, "Unknown")

# Create UDF
label_to_category_udf = udf(label_to_category, StringType())

# Apply mapping
tableau_data = tableau_data.withColumn("Predicted Category", label_to_category_udf(col("prediction")))
tableau_data = tableau_data.withColumn("Actual Category", label_to_category_udf(col("label")))


In [35]:
tableau_data.write.csv("tableau_classification_results.csv", header=True, mode="overwrite")


AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support the column `features` of the type "STRUCT<type: TINYINT, size: INT, indices: ARRAY<INT>, values: ARRAY<DOUBLE>>".