In [79]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [52]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, sum as spark_sum
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# 1. Initialize SparkSession
spark = SparkSession.builder.appName("StudentsAnalysis").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/14 22:48:28 WARN Utils: Your hostname, TinkuAcchus-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.133.144.3 instead (on interface en0)
25/08/14 22:48:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/14 22:48:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# 2. Load the dataset
# 'inferSchema=True' automatically detects data types, which is helpful.
studentsDF = spark.read.csv("Students.csv", header=True, inferSchema=True)

In [4]:
# 3. Display the schema and first 5 rows to verify the data is loaded correctly
print("DataFrame Schema:")
studentsDF.printSchema()

DataFrame Schema:
root
 |-- Student_Name: string (nullable = true)
 |-- College_Name: string (nullable = true)
 |-- Stream: string (nullable = true)
 |-- Year_of_Study: integer (nullable = true)
 |-- AI_Tools_Used: string (nullable = true)
 |-- Daily_Usage_Hours: double (nullable = true)
 |-- Use_Cases: string (nullable = true)
 |-- Trust_in_AI_Tools: integer (nullable = true)
 |-- Impact_on_Grades: integer (nullable = true)
 |-- Do_Professors_Allow_Use: string (nullable = true)
 |-- Preferred_AI_Tool: string (nullable = true)
 |-- Awareness_Level: integer (nullable = true)
 |-- Willing_to_Pay_for_Access: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Device_Used: string (nullable = true)
 |-- Internet_Access: string (nullable = true)



In [5]:
print("\nFirst 5 rows of the DataFrame:")
studentsDF.show(5)


First 5 rows of the DataFrame:
+------------+--------------------+-----------+-------------+-------------+-----------------+--------------------+-----------------+----------------+-----------------------+-----------------+---------------+-------------------------+-------------+-----------+---------------+
|Student_Name|        College_Name|     Stream|Year_of_Study|AI_Tools_Used|Daily_Usage_Hours|           Use_Cases|Trust_in_AI_Tools|Impact_on_Grades|Do_Professors_Allow_Use|Preferred_AI_Tool|Awareness_Level|Willing_to_Pay_for_Access|        State|Device_Used|Internet_Access|
+------------+--------------------+-----------+-------------+-------------+-----------------+--------------------+-----------------+----------------+-----------------------+-----------------+---------------+-------------------------+-------------+-----------+---------------+
|       Aarav|Indian Institute ...|Engineering|            4|       Gemini|              0.9|Assignments, Codi...|                2|        

In [6]:
# Summary statistics for numeric columns
studentsDF.describe(["Daily_Usage_Hours", "Trust_in_AI_Tools", "Impact_on_Grades", "Awareness_Level"]).show()

25/08/14 22:48:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+--------------------+-----------------+
|summary| Daily_Usage_Hours| Trust_in_AI_Tools|    Impact_on_Grades|  Awareness_Level|
+-------+------------------+------------------+--------------------+-----------------+
|  count|              3614|              3614|                3614|             3614|
|   mean|2.5596845600442664|3.0232429441062534|0.003320420586607637|5.828444936358605|
| stddev|1.2133193874234915|1.4369339911635717|  2.3707064650348753|2.925480785216794|
|    min|               0.5|                 1|                  -5|                1|
|    max|               5.0|                 5|                   5|               10|
+-------+------------------+------------------+--------------------+-----------------+



In [7]:
# Count of unique values in a categorical column
studentsDF.select("AI_Tools_Used").distinct().show()

+--------------------+
|       AI_Tools_Used|
+--------------------+
|                Bard|
|    ChatGPT, Copilot|
|             ChatGPT|
|               Other|
|              Gemini|
|             Copilot|
|              Claude|
|ChatGPT, Gemini, ...|
|  Gemini, Midjourney|
+--------------------+



In [8]:
studentsDF.count()

3614

In [9]:
print("Checking for missing values in each column:")
# Get a list of all column names
columns_to_check = studentsDF.columns

Checking for missing values in each column:


In [10]:
# Loop through each column and print the count of nulls
for column_name in columns_to_check:
    missing_count = studentsDF.filter(col(column_name).isNull()).count()
    print(f"Column '{column_name}': {missing_count} missing values")

Column 'Student_Name': 0 missing values
Column 'College_Name': 0 missing values
Column 'Stream': 0 missing values
Column 'Year_of_Study': 0 missing values
Column 'AI_Tools_Used': 0 missing values
Column 'Daily_Usage_Hours': 0 missing values
Column 'Use_Cases': 0 missing values
Column 'Trust_in_AI_Tools': 0 missing values
Column 'Impact_on_Grades': 0 missing values
Column 'Do_Professors_Allow_Use': 0 missing values
Column 'Preferred_AI_Tool': 0 missing values
Column 'Awareness_Level': 0 missing values
Column 'Willing_to_Pay_for_Access': 0 missing values
Column 'State': 1614 missing values
Column 'Device_Used': 0 missing values
Column 'Internet_Access': 0 missing values


In [11]:
# Filter out rows where 'State' is null, then group by state and count
state_counts = studentsDF.dropna(subset=['State']).groupBy("State").count()

# Order the counts in descending order and get the first row
mode_state = state_counts.orderBy(desc("count")).first()["State"]

In [12]:
print(f"The mode (most frequent value) of the 'State' column is: {mode_state}")

The mode (most frequent value) of the 'State' column is: Maharashtra


In [18]:
# Fill the missing 'State' values with the calculated mode
studentsDF_imputed = studentsDF.fillna(mode_state, subset=['State'])

In [19]:
# Verify that there are no longer any null values in the 'State' column
print("Count of rows with a missing 'State' after imputation:", df_imputed.filter(col("State").isNull()).count())

Count of rows with a missing 'State' after imputation: 0


In [20]:
studentsDF_imputed.show(5)

+------------+--------------------+-----------+-------------+-------------+-----------------+--------------------+-----------------+----------------+-----------------------+-----------------+---------------+-------------------------+-------------+-----------+---------------+
|Student_Name|        College_Name|     Stream|Year_of_Study|AI_Tools_Used|Daily_Usage_Hours|           Use_Cases|Trust_in_AI_Tools|Impact_on_Grades|Do_Professors_Allow_Use|Preferred_AI_Tool|Awareness_Level|Willing_to_Pay_for_Access|        State|Device_Used|Internet_Access|
+------------+--------------------+-----------+-------------+-------------+-----------------+--------------------+-----------------+----------------+-----------------------+-----------------+---------------+-------------------------+-------------+-----------+---------------+
|       Aarav|Indian Institute ...|Engineering|            4|       Gemini|              0.9|Assignments, Codi...|                2|               2|                     No

In [21]:


# Create a new column to store the numeric representation
studentsDF_encoded = studentsDF_imputed.withColumn('Willing_to_Pay_Numeric',
                                   when(col('Willing_to_Pay_for_Access') == 'Yes', 1).otherwise(0))

# Show the new column alongside the original
print("\n Student dataFrame with additional numeric column:")
studentsDF_encoded.select("Willing_to_Pay_for_Access", "Willing_to_Pay_Numeric").show(5)


 Student dataFrame with additional numeric column:
+-------------------------+----------------------+
|Willing_to_Pay_for_Access|Willing_to_Pay_Numeric|
+-------------------------+----------------------+
|                      Yes|                     1|
|                       No|                     0|
|                       No|                     0|
|                       No|                     0|
|                      Yes|                     1|
+-------------------------+----------------------+
only showing top 5 rows


In [28]:


# Index the 'Stream' column to a numeric representation
indexer = StringIndexer(inputCol="Stream", outputCol="Stream_Index")
studentDF_indexed = indexer.fit(studentsDF_encoded).transform(studentsDF_encoded)

In [29]:
# One-hot encode the indexed column
encoder = OneHotEncoder(inputCol="Stream_Index", outputCol="Stream_Vector")
studentDF_encoded_final = encoder.fit(studentDF_indexed).transform(studentDF_indexed)

In [30]:
# Show the results to see the indexed and vectorized columns
print("\nStudent dataFrame with'Stream' column after encoding:")
studentDF_encoded_final.select("Stream", "Stream_Index", "Stream_Vector").show(5, truncate=False)


Student dataFrame with'Stream' column after encoding:
+-----------+------------+-------------+
|Stream     |Stream_Index|Stream_Vector|
+-----------+------------+-------------+
|Engineering|1.0         |(9,[1],[1.0])|
|Commerce   |4.0         |(9,[4],[1.0])|
|Science    |0.0         |(9,[0],[1.0])|
|Arts       |2.0         |(9,[2],[1.0])|
|Science    |0.0         |(9,[0],[1.0])|
+-----------+------------+-------------+
only showing top 5 rows


In [32]:
# Define the columns you want to use as features.
# Make sure to include both your original numerical columns and your new vectorized columns.
feature_columns = ['Trust_in_AI_Tools', 'Impact_on_Grades', 'Awareness_Level', 'Willing_to_Pay_Numeric', 'Stream_Vector']

# Assemble the features into a single vector column named 'features'
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
studentDF_final = assembler.transform(studentDF_encoded_final)

# Display the final schema and a sample of the new 'features' vector
print("\nFinal DataFrame Schema with features column:")
studentDF_final.printSchema()

print("\nFinal DataFrame with assembled features:")
studentDF_final.select("features", "Daily_Usage_Hours", "Stream_Vector").show(5, truncate=False)




Final DataFrame Schema with features column:
root
 |-- Student_Name: string (nullable = true)
 |-- College_Name: string (nullable = true)
 |-- Stream: string (nullable = true)
 |-- Year_of_Study: integer (nullable = true)
 |-- AI_Tools_Used: string (nullable = true)
 |-- Daily_Usage_Hours: double (nullable = true)
 |-- Use_Cases: string (nullable = true)
 |-- Trust_in_AI_Tools: integer (nullable = true)
 |-- Impact_on_Grades: integer (nullable = true)
 |-- Do_Professors_Allow_Use: string (nullable = true)
 |-- Preferred_AI_Tool: string (nullable = true)
 |-- Awareness_Level: integer (nullable = true)
 |-- Willing_to_Pay_for_Access: string (nullable = true)
 |-- State: string (nullable = false)
 |-- Device_Used: string (nullable = true)
 |-- Internet_Access: string (nullable = true)
 |-- Willing_to_Pay_Numeric: integer (nullable = false)
 |-- Stream_Index: double (nullable = false)
 |-- Stream_Vector: vector (nullable = true)
 |-- features: vector (nullable = true)


Final DataFrame wi

In [36]:

# Split the data into training and test sets
# 70% for training, 30% for testing. 'seed' ensures reproducibility.
(trainingData, testData) = studentDF_final.randomSplit([0.7, 0.3], seed=42)

print(f"Training Data Count: {trainingData.count()}")
print(f"Test Data Count: {testData.count()}")

Training Data Count: 2586
Test Data Count: 1028


In [37]:
# Initialize the Linear Regression model
# 'featuresCol' = input vector of features
# 'labelCol' = target variable 
linearModel = LinearRegression(featuresCol="features", labelCol="Daily_Usage_Hours")

# Train the model on the training data
print("\nTraining the Linear Regression model...")
linear_model = linearModel.fit(trainingData)
print("Model training complete.")


Training the Linear Regression model...


25/08/14 22:56:10 WARN Instrumentation: [1ebfd524] regParam is zero, which might cause numerical instability and overfitting.
25/08/14 22:56:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/08/14 22:56:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Model training complete.


In [40]:
# Make predictions on the test data
predictions = linear_model.transform(testData)

# Show a few predictions alongside the actual values and features
print("\nSample Predictions (Actual vs. Predicted Daily_Usage_Hours):")
predictions.select("Daily_Usage_Hours", "prediction", "features").show(5, truncate=False)


Sample Predictions (Actual vs. Predicted Daily_Usage_Hours):
+-----------------+------------------+----------------------------------------+
|Daily_Usage_Hours|prediction        |features                                |
+-----------------+------------------+----------------------------------------+
|4.8              |2.541356682907782 |(13,[0,1,2,3,7],[4.0,3.0,9.0,1.0,1.0])  |
|4.2              |2.6884280545267325|(13,[0,1,2,3,8],[4.0,1.0,5.0,1.0,1.0])  |
|1.7              |2.555942472226904 |(13,[0,1,2,3,11],[5.0,-2.0,5.0,1.0,1.0])|
|2.3              |2.3600599881142275|(13,[0,1,2,7],[1.0,-3.0,3.0,1.0])       |
|2.6              |2.7573612648205374|(13,[0,1,2,3],[2.0,2.0,3.0,1.0])        |
+-----------------+------------------+----------------------------------------+
only showing top 5 rows


In [42]:
# Get model summary (for evaluation metrics)
trainingSummary = linear_model.summary

# Print the evaluation metrics
print(f"\nRoot Mean Squared Error (RMSE) on test data: {trainingSummary.rootMeanSquaredError}")
print(f"R-squared (R2) on test data: {trainingSummary.r2}")

# You can also print the coefficients and intercept
print("\nModel Coefficients:", linear_model.coefficients)
print("Model Intercept:", linear_model.intercept)




Root Mean Squared Error (RMSE) on test data: 1.1903326191173762
R-squared (R2) on test data: 0.02065071112641026

Model Coefficients: [-0.01945369532242912,0.02121758925078574,0.006858351064513779,0.07120213886904463,0.019322244176239373,-0.3968857565942115,-0.04778160915454097,-0.23946488690576576,-0.02252493252718884,-0.22124195758726567,-0.30684406807703835,-0.0719040517522307,0.08615591020184102]
Model Intercept: 2.6620562849012384


In [43]:
# Initialize the GBT Regressor model.
gbtModel = GBTRegressor(featuresCol="features", labelCol="Daily_Usage_Hours", maxIter=10)

# Train the model on the training data.
gbt_model = gbtModel.fit(trainingData)

# Make predictions on the test data.
gbt_predictions = gbt_model.transform(testData)

# Evaluate the model using a RegressionEvaluator.
evaluator = RegressionEvaluator(labelCol="Daily_Usage_Hours", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(gbt_predictions)
r2 = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "r2"})

print(f"Gradient-Boosted Tree Regression (GBTRegressor) Results:")
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R-squared (R2): {r2}")

Gradient-Boosted Tree Regression (GBTRegressor) Results:
Root Mean Squared Error (RMSE): 1.1906205709140807
R-squared (R2): 0.07564059315875016


In [47]:


# Initialize the K-Means model. You must specify the number of clusters (e.g., k=3).
kmeans = KMeans(featuresCol="features", k=3)

# Train the model. Note that there is no label column.
kmeans_model = kmeans.fit(studentDF_final)

# Make predictions to see which cluster each student belongs to.
kmeans_predictions = kmeans_model.transform(studentDF_final)

# Evaluate the clustering quality using the Silhouette Score.
evaluator = ClusteringEvaluator()
silhouette_score = evaluator.evaluate(kmeans_predictions)

print(f"K-Means (Clustering) Results:")
print(f"Silhouette Score: {silhouette_score}")

K-Means (Clustering) Results:
Silhouette Score: 0.4694151247218369


In [48]:


# Initialize the Random Forest Regressor model
rf = RandomForestRegressor(featuresCol="features", labelCol="Daily_Usage_Hours", numTrees=10)

# Train the model on the training data
print("Training the Random Forest model...")
rf_model = rf.fit(trainingData)
print("Model training complete.")

# Make predictions on the test data
rf_predictions = rf_model.transform(testData)

# Evaluate the model using a RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="Daily_Usage_Hours", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(rf_predictions)
r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})

print(f"\nRandom Forest Regressor Results:")
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R-squared (R2): {r2}")

Training the Random Forest model...
Model training complete.

Random Forest Regressor Results:
Root Mean Squared Error (RMSE): 1.2131915421409605
R-squared (R2): 0.0402616489762303


In [49]:


# ======================
# Collect metrics from all models
# ======================
results = []

# Linear Regression
results.append({
    "Model": "Linear Regression",
    "RMSE": trainingSummary.rootMeanSquaredError,
    "R2": trainingSummary.r2
})

# Gradient Boosted Trees
results.append({
    "Model": "Gradient-Boosted Trees",
    "RMSE": evaluator.evaluate(gbt_predictions),
    "R2": evaluator.evaluate(gbt_predictions, {evaluator.metricName: "r2"})
})

# Random Forest
results.append({
    "Model": "Random Forest",
    "RMSE": evaluator.evaluate(rf_predictions),
    "R2": evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})
})

In [50]:

# ======================
# Convert to Spark DataFrame
# ======================
results_df = spark.createDataFrame(results)

# Create output folder if it doesn't exist
output_dir = os.path.join(os.getcwd(), "model_metrics")
os.makedirs(output_dir, exist_ok=True)

# Save as CSV for Tableau
results_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", True) \
    .csv(output_dir)

print(f"✅ Model metrics saved to: {output_dir}")
results_df.show()

[Stage 252:>                                                        (0 + 1) / 1]

✅ Model metrics saved to: /Users/tinkuacchu/BigData/model_metrics
+--------------------+-------------------+------------------+
|               Model|                 R2|              RMSE|
+--------------------+-------------------+------------------+
|   Linear Regression|0.02065071112641026|1.1903326191173762|
|Gradient-Boosted ...|0.07564059315875016|1.1906205709140807|
|       Random Forest| 0.0402616489762303|1.2131915421409605|
+--------------------+-------------------+------------------+



                                                                                