## **Predicting User Word Recall on Duolingo Using SparkML (~13M Records)**

_A Scalable Machine Learning Pipeline for Spaced Repetition Analysis_

##### Section 1

#### **Data Ingestion & Cleaning**
##### _(Understanding the Data)_

In [0]:
# Loading the dataset
df = spark.read.csv("dbfs:/FileStore/tables/learning_traces_13m.csv", header=True, inferSchema=True)

In [0]:
# Checking schema
df.printSchema()

# Counting total rows
print("Initial row count:", df.count())

root
 |-- p_recall: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- delta: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- learning_language: string (nullable = true)
 |-- ui_language: string (nullable = true)
 |-- lexeme_id: string (nullable = true)
 |-- lexeme_string: string (nullable = true)
 |-- history_seen: integer (nullable = true)
 |-- history_correct: integer (nullable = true)
 |-- session_seen: integer (nullable = true)
 |-- session_correct: integer (nullable = true)

Initial row count: 12854226


In [0]:
# Showing a few rows
df.show(5, truncate=False)

+--------+----------+--------+-------+-----------------+-----------+--------------------------------+--------------------------------+------------+---------------+------------+---------------+
|p_recall|timestamp |delta   |user_id|learning_language|ui_language|lexeme_id                       |lexeme_string                   |history_seen|history_correct|session_seen|session_correct|
+--------+----------+--------+-------+-----------------+-----------+--------------------------------+--------------------------------+------------+---------------+------------+---------------+
|1.0     |1362076081|27649635|u:FO   |de               |en         |76390c1350a8dac31186187e2fe1e178|lernt/lernen<vblex><pri><p3><sg>|6           |4              |2           |2              |
|0.5     |1362076081|27649635|u:FO   |de               |en         |7dfd7086f3671685e2cf1c1da72796d7|die/die<det><def><f><sg><nom>   |4           |4              |2           |1              |
|1.0     |1362076081|27649635|u:FO 

In [0]:
# Checking column data types
df.dtypes

Out[6]: [('p_recall', 'double'),
 ('timestamp', 'int'),
 ('delta', 'int'),
 ('user_id', 'string'),
 ('learning_language', 'string'),
 ('ui_language', 'string'),
 ('lexeme_id', 'string'),
 ('lexeme_string', 'string'),
 ('history_seen', 'int'),
 ('history_correct', 'int'),
 ('session_seen', 'int'),
 ('session_correct', 'int')]

In [0]:
# Checking for nulls or NaNs
from pyspark.sql.functions import col, isnan, when, count
df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()

+--------+---------+-----+-------+-----------------+-----------+---------+-------------+------------+---------------+------------+---------------+
|p_recall|timestamp|delta|user_id|learning_language|ui_language|lexeme_id|lexeme_string|history_seen|history_correct|session_seen|session_correct|
+--------+---------+-----+-------+-----------------+-----------+---------+-------------+------------+---------------+------------+---------------+
|       0|        0|    0|      0|                0|          0|        0|            0|           0|              0|           0|              0|
+--------+---------+-----+-------+-----------------+-----------+---------+-------------+------------+---------------+------------+---------------+



In [0]:
# Getting summary statistics
df.describe().show()

# Checking duplicate rows
print("Duplicate rows:", df.count() - df.dropDuplicates().count())

+-------+-------------------+-------------------+------------------+--------+-----------------+-----------+--------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|           p_recall|          timestamp|             delta| user_id|learning_language|ui_language|           lexeme_id|     lexeme_string|     history_seen|   history_correct|      session_seen|   session_correct|
+-------+-------------------+-------------------+------------------+--------+-----------------+-----------+--------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|           12854226|           12854226|          12854226|12854226|         12854226|   12854226|            12854226|          12854226|         12854226|          12854226|          12854226|          12854226|
|   mean| 0.8961055619883076|1.362589311570285E9| 729581.0618038768|    null|             null|       null| 

In [0]:
# Checking for empty string values in string columns
from pyspark.sql.functions import trim
df.select([count(when(trim(col(c)) == "", c)).alias(c) 
           for c in df.columns if df.schema[c].dataType.simpleString() == 'string']).show()

+-------+-----------------+-----------+---------+-------------+
|user_id|learning_language|ui_language|lexeme_id|lexeme_string|
+-------+-----------------+-----------+---------+-------------+
|      0|                0|          0|        0|            0|
+-------+-----------------+-----------+---------+-------------+



In [0]:
# Removing duplicate rows
df = df.dropDuplicates()

# Double-checking for remaining duplicates
print("Remaining duplicates after drop:", df.count() - df.dropDuplicates().count())

Remaining duplicates after drop: 0


In [0]:
# Removing rows where delta is not positive (keeping delta > 0)
df = df.filter(col("delta") > 0)

In [0]:
# Filtering out rows where session or history values are invalid
df = df.filter((col("history_seen") > 0) & (col("session_seen") > 0))
df = df.filter((col("history_correct") <= col("history_seen")) & 
               (col("session_correct") <= col("session_seen")))
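The three filters above combine into a single row-level validity rule. Below is a minimal plain-Python sketch of that rule. It assumes each trace is a dict keyed by the dataset's column names, and `is_valid_trace` is a hypothetical helper that is not part of the notebook.

```python
def is_valid_trace(row):
    """Hypothetical helper mirroring the notebook's Spark filters on one trace."""
    return (
        row["delta"] > 0                                   # time since last practice must be positive
        and row["history_seen"] > 0                        # word must have been seen before
        and row["session_seen"] > 0                        # and seen at least once in this session
        and row["history_correct"] <= row["history_seen"]  # cannot be correct more often than seen
        and row["session_correct"] <= row["session_seen"]
    )


# First trace printed by df.show() earlier in this section
example = {"delta": 27649635, "history_seen": 6, "history_correct": 4,
           "session_seen": 2, "session_correct": 2}
print(is_valid_trace(example))                            # True
print(is_valid_trace({**example, "session_correct": 3}))  # False: 3 correct out of 2 seen
```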

In [0]:
# Showing final cleaned row count
print("Cleaned row count:", df.count())

Cleaned row count: 12854145


#### **Sampling ~1M Rows for Efficiency**

In [0]:
from pyspark.sql.functions import col
# Used ChatGPT to understand how to sample ~1M rows

# Counting total rows and unique users
total_rows = df.count()
total_users = df.select("user_id").distinct().count()

# Estimating fraction to get around 1M rows
target_sample_size = 1000000
estimated_fraction = target_sample_size / total_rows

# Sampling users, oversampling by 1.4x so the sample does not fall below the ~1M-row target
user_sample = df.select("user_id").distinct().sample(fraction=estimated_fraction * 1.4, seed=42)

# Joining sampled users with original data
df_sample = df.join(user_sample, on="user_id", how="inner")

# Caching and checking final sample size
df_sample.cache()
print("Sample row count:", df_sample.count())

# Registering sample as temporary SQL view
df_sample.createOrReplaceTempView("duolingo")


Sample row count: 1413476
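Sampling whole users instead of individual rows keeps each learner's complete practice history together. Suppose users are picked uniformly at random with probability `p`. Each row is then kept with the same probability `p`, so the expected sample size is `p × total_rows`. With the 1.4x factor that works out to about 1.4M rows rather than 1M, consistent with the count above.

The plain-Python sketch below reproduces that arithmetic using the cleaned row count, then applies the same idea to toy data. `sample_users` is a hypothetical helper that imitates the idea of the Spark code but not Spark's own random number generator.

```python
import random

# Expected size of the user-level sample
total_rows = 12_854_145                  # cleaned row count from above
target_sample_size = 1_000_000
fraction = target_sample_size / total_rows * 1.4   # oversampled user fraction
expected_rows = fraction * total_rows              # each row kept with probability `fraction`
print(f"fraction = {fraction:.4f}, expected rows = {expected_rows:,.0f}")


def sample_users(rows, fraction, seed=42):
    """Hypothetical helper: keep every row of each user chosen with probability `fraction`."""
    rng = random.Random(seed)
    users = sorted({user for user, _ in rows})
    chosen = {u for u in users if rng.random() < fraction}
    return [row for row in rows if row[0] in chosen]


toy_rows = [("u:A", "w1"), ("u:A", "w2"), ("u:B", "w1"), ("u:C", "w3"), ("u:C", "w4")]
sampled = sample_users(toy_rows, fraction=0.5)
sampled_users = {u for u, _ in sampled}
```

The observed 1,413,476 rows differ somewhat from the expectation. This is likely because users contribute very different numbers of rows, so the realized count varies more than it would under row-level sampling.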


##### Section 2

#### **Exploratory Data Analysis (EDA)**

In [0]:
%sql
-- Summary stats for p_recall and delta
SELECT 
  MIN(p_recall), MAX(p_recall), AVG(p_recall), STDDEV(p_recall),
  MIN(delta), MAX(delta), AVG(delta), STDDEV(delta)
FROM duolingo


min(p_recall),max(p_recall),avg(p_recall),stddev(p_recall),min(delta),max(delta),avg(delta),stddev(delta)
0.0,1.0,0.8972467686061687,0.2696632277350794,1,40328362,735517.8692825347,2284478.0440377155


In [0]:
# Same summary stats using Spark SQL (Python)
spark.sql("""
SELECT 
  MIN(p_recall), MAX(p_recall), AVG(p_recall), STDDEV(p_recall),
  MIN(delta), MAX(delta), AVG(delta), STDDEV(delta)
FROM duolingo
""").show()

# Average recall grouped by learning language
spark.sql("""
SELECT learning_language, COUNT(*) AS total, AVG(p_recall) AS avg_recall
FROM duolingo
GROUP BY learning_language
ORDER BY avg_recall DESC
""").show()

# Grouping delta into time-based categories and averaging recall
spark.sql("""
SELECT 
  CASE 
    WHEN delta < 3600 THEN 'under_1_hr'   
    WHEN delta < 86400 THEN '1_hr_to_1_day'
    WHEN delta < 604800 THEN '1_day_to_1_week'
    WHEN delta < 2419200 THEN '1_week_to_1_month'
    ELSE 'over_1_month'
  END AS delta_group,
  COUNT(*) AS total,
  AVG(p_recall) AS avg_recall
FROM duolingo
GROUP BY delta_group
ORDER BY avg_recall DESC
""").show()


+-------------+-------------+------------------+------------------+----------+----------+-----------------+------------------+
|min(p_recall)|max(p_recall)|     avg(p_recall)|  stddev(p_recall)|min(delta)|max(delta)|       avg(delta)|     stddev(delta)|
+-------------+-------------+------------------+------------------+----------+----------+-----------------+------------------+
|          0.0|          1.0|0.8972467686061687|0.2696632277350794|         1|  40328362|735517.8692825347|2284478.0440377155|
+-------------+-------------+------------------+------------------+----------+----------+-----------------+------------------+

+-----------------+------+------------------+
|learning_language| total|        avg_recall|
+-----------------+------+------------------+
|               pt| 35315|0.9099329569772772|
|               it| 85326|0.9075119063639625|
|               es|369781|0.9000178752481279|
|               en|565089|0.8983397422173838|
|               de|149929|0.89586561594938

**Visualizations**

In [0]:
# Display distribution of p_recall
display(df_sample.select("p_recall"))

p_recall
1.0
1.0
0.0
1.0
1.0
1.0
1.0
1.0
1.0
1.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Creating time group for delta
from pyspark.sql.functions import when
# Used ChatGPT to help choose the bucket boundaries

df_viz = df_sample.withColumn("delta_group", when(col("delta") < 3600, "under_1_hr")
    .when(col("delta") < 86400, "1_hr_to_1_day")
    .when(col("delta") < 604800, "1_day_to_1_week")
    .when(col("delta") < 2419200, "1_week_to_1_month")
    .otherwise("over_1_month")
)

# Avg recall by delta group
display(df_viz.groupBy("delta_group").avg("p_recall"))

delta_group,avg(p_recall)
under_1_hr,0.9102995163274118
1_week_to_1_month,0.8859884519688876
1_hr_to_1_day,0.8974853095733323
over_1_month,0.8692742874923785
1_day_to_1_week,0.8911675107038912


Databricks visualization. Run in Databricks to view.
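The bucket boundaries in both the SQL `CASE` and the `when` chain are plain second counts:

- 3,600 s is one hour.
- 86,400 s is one day.
- 604,800 s is one week.
- 2,419,200 s is 28 days, so the "month" bucket is really four weeks.

Below is a plain-Python sketch of the same bucketing. `delta_group` is a hypothetical helper name.

```python
HOUR = 3600
DAY = 24 * HOUR          # 86,400 s
WEEK = 7 * DAY           # 604,800 s
FOUR_WEEKS = 4 * WEEK    # 2,419,200 s, labelled "1 month" in the notebook


def delta_group(delta_seconds):
    """Mirror of the Spark `when` chain: the first matching upper bound wins."""
    if delta_seconds < HOUR:
        return "under_1_hr"
    if delta_seconds < DAY:
        return "1_hr_to_1_day"
    if delta_seconds < WEEK:
        return "1_day_to_1_week"
    if delta_seconds < FOUR_WEEKS:
        return "1_week_to_1_month"
    return "over_1_month"


print(delta_group(27649635))  # over_1_month (about 320 days)
```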

In [0]:
# Avg recall by learning language
display(df_sample.groupBy("learning_language").avg("p_recall"))

learning_language,avg(p_recall)
en,0.8983397422173838
pt,0.9099329569772772
de,0.8958656159493823
es,0.9000178752481279
it,0.9075119063639624
fr,0.8839839150609731


Databricks visualization. Run in Databricks to view.

In [0]:
# Creating history accuracy
df_sample = df_sample.withColumn("history_accuracy", col("history_correct") / col("history_seen"))
display(df_sample.select("history_accuracy", "p_recall"))

history_accuracy,p_recall
0.9,1.0
0.8888888888888888,1.0
0.6,0.0
1.0,1.0
0.75,1.0
0.9285714285714286,1.0
0.8571428571428571,1.0
0.8888888888888888,1.0
0.8846153846153846,1.0
0.9074074074074074,1.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Relation between history_seen and p_recall
display(df_sample.select("history_seen", "p_recall"))

history_seen,p_recall
90,1.0
27,1.0
5,0.0
6,1.0
4,1.0
42,1.0
7,1.0
27,1.0
26,1.0
54,1.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Creating session accuracy
df_sample = df_sample.withColumn("session_accuracy", col("session_correct") / col("session_seen"))
display(df_sample.select("session_accuracy", "p_recall"))

session_accuracy,p_recall
1.0,1.0
1.0,1.0
0.0,0.0
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Count by UI language
display(df_sample.groupBy("ui_language").count())

ui_language,count
en,848387
es,422227
it,42611
pt,100251


Databricks visualization. Run in Databricks to view.

##### Section 3
#### **Feature Engineering & ML Pipeline Setup**

In [0]:
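# Aliasing round avoids shadowing Python's built-in round()
from pyspark.sql.functions import col, round as spark_round

# Recomputing accuracy metrics rounded to 4 decimals (overwrites the unrounded EDA columns)
df_sample = df_sample.withColumn("history_accuracy", spark_round(col("history_correct") / col("history_seen"), 4)) \
                     .withColumn("session_accuracy", spark_round(col("session_correct") / col("session_seen"), 4))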


#### **Correlation Analysis**

In [0]:
# Selecting numeric features
numeric_cols = ["p_recall", "delta", "history_seen", "history_correct", 
                "session_seen", "session_correct", "history_accuracy", "session_accuracy"]

df_numeric = df_sample.select(*numeric_cols)

# Assembling vector for correlation
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features_vector")
df_vector = assembler.transform(df_numeric).select("features_vector")

# Getting Pearson correlation matrix
from pyspark.ml.stat import Correlation
correlation_matrix = Correlation.corr(df_vector, "features_vector", "pearson").head()[0]


In [0]:
# Displaying using Pandas for readability
import pandas as pd
corr_array = correlation_matrix.toArray()
corr_df = pd.DataFrame(corr_array, index=numeric_cols, columns=numeric_cols)

# Converting to Spark DF for interactive display
spark_corr_df = spark.createDataFrame(corr_df.reset_index().rename(columns={'index': 'Feature'}))
display(spark_corr_df)




Feature,p_recall,delta,history_seen,history_correct,session_seen,session_correct,history_accuracy,session_accuracy
p_recall,1.0,-0.0304075029880846,-0.0225720105957762,-0.0109021500256581,0.0405644717911731,0.3000932911005142,0.1019624662064468,0.9999999997592248
delta,-0.0304075029880846,1.0,-0.0432274078646378,-0.0443887538189714,0.002372153174309,-0.0064804757152388,0.021391224762247,-0.0304074884995842
history_seen,-0.0225720105957762,-0.0432274078646378,1.0,0.98219282856533,0.0074778041642591,-0.0017849946518779,-0.0417212256407685,-0.0225720182998422
history_correct,-0.0109021500256581,-0.0443887538189714,0.98219282856533,1.0,0.0070791115561597,0.0022504128894043,-0.0121871220275914,-0.010902158566838
session_seen,0.0405644717911731,0.002372153174309,0.0074778041642591,0.0070791115561597,1.0,0.9528981014704913,-0.057716694018157,0.0405651972283169
session_correct,0.3000932911005142,-0.0064804757152388,-0.0017849946518779,0.0022504128894043,0.9528981014704913,1.0,-0.0209215629459704,0.3000933480653667
history_accuracy,0.1019624662064468,0.021391224762247,-0.0417212256407685,-0.0121871220275914,-0.057716694018157,-0.0209215629459704,1.0,0.1019622539562554
session_accuracy,0.9999999997592248,-0.0304074884995842,-0.0225720182998422,-0.010902158566838,0.0405651972283169,0.3000933480653667,0.1019622539562554,1.0
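The near-perfect correlation (0.99999999976) between `p_recall` and `session_accuracy` is not a coincidence. In this dataset, `p_recall` is the fraction of the session's exercises on that word that were answered correctly, i.e. `session_correct / session_seen`. The tiny gap from 1.0 most likely comes from rounding `session_accuracy` to four decimals.

The two complete rows printed by `df.show()` earlier are enough to check this identity in plain Python.

```python
# (p_recall, session_seen, session_correct) from the first two rows of df.show()
rows = [(1.0, 2, 2), (0.5, 2, 1)]

for p_recall, session_seen, session_correct in rows:
    session_accuracy = session_correct / session_seen
    print(p_recall, session_accuracy, p_recall == session_accuracy)
```

This also means `session_correct` is one of the two quantities the target is computed from, even though it is not identical to the target.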


#### **Dropping Highly Correlated or Redundant Columns**

In [0]:
columns_to_drop = [
    "session_accuracy",       # Leaks the target: p_recall = session_correct / session_seen
    "history_seen",           # Summarized by history_accuracy; r = 0.98 with history_correct
    "history_correct",        # Same
    "session_seen"            # Highly correlated with session_correct (r = 0.95)
]

df_sample = df_sample.drop(*columns_to_drop)


#### **Building the SparkML Pipeline**

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Step 1: Encode categorical columns
lang_indexer = StringIndexer(inputCol="learning_language", outputCol="learning_language_index")
ui_indexer = StringIndexer(inputCol="ui_language", outputCol="ui_language_index")

lang_encoder = OneHotEncoder(inputCol="learning_language_index", outputCol="learning_language_vec")
ui_encoder = OneHotEncoder(inputCol="ui_language_index", outputCol="ui_language_vec")

In [0]:
# Step 2: Assemble and scale numerical columns
num_cols = ["delta", "history_accuracy", "session_correct"]
num_assembler = VectorAssembler(inputCols=num_cols, outputCol="numerical_vector")

scaler = StandardScaler(inputCol="numerical_vector", outputCol="numerical_scaled", withMean=True, withStd=True)
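With `withMean=True, withStd=True`, `StandardScaler` turns each numeric column into a z-score. Spark's documentation states that the scale is the corrected (n - 1) sample standard deviation. Below is a plain-Python sketch of the per-column transform on toy values that do not come from the dataset. `standardize` is a hypothetical helper.

```python
import statistics


def standardize(values):
    """Center to mean 0 and scale by the corrected (n - 1) sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)   # sample standard deviation, as StandardScaler uses
    return [(v - mean) / std for v in values]


session_correct = [2, 1, 2, 5, 4]   # toy values
z = standardize(session_correct)
print([round(v, 3) for v in z])
```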

In [0]:
# Step 3: Combine everything into one feature vector
final_assembler = VectorAssembler(
    inputCols=["numerical_scaled", "learning_language_vec", "ui_language_vec"],
    outputCol="features"
)

# Step 4: Full pipeline
pipeline = Pipeline(stages=[
    lang_indexer, ui_indexer,
    lang_encoder, ui_encoder,
    num_assembler, scaler,
    final_assembler
])

In [0]:
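# Step 5: Fit and transform the data
# Note: the pipeline is fit on the full sample before the train/test split,
# so the StandardScaler mean/std also reflect rows that end up in the test set
model_data = pipeline.fit(df_sample).transform(df_sample).select("features", "p_recall")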

#### **Sanity Checks on Transformed Data**

In [0]:
model_data.printSchema()
model_data.show(5, truncate=False)
print("Total model rows:", model_data.count())

# Check number of features in the vector (sparse and dense vectors both expose .size)
print("Feature vector size:", model_data.select("features").first()[0].size)


root
 |-- features: vector (nullable = true)
 |-- p_recall: double (nullable = true)

+----------------------------------------------------------------------------------------+--------+
|features                                                                                |p_recall|
+----------------------------------------------------------------------------------------+--------+
|(11,[0,1,2,5,8],[0.2081907208339154,-0.0050313717084598635,-0.4944131890533477,1.0,1.0])|1.0     |
|(11,[0,1,2,5,8],[0.2081907208339154,-0.08693404965672148,-0.4944131890533477,1.0,1.0])  |1.0     |
|(11,[0,1,2,5,8],[0.20734501342821618,-2.2186172622020175,-1.2509738952617708,1.0,1.0])  |0.0     |
|(11,[0,1,2,5,8],[-0.2520728403520729,0.7328305917893925,4.04495104819719,1.0,1.0])      |1.0     |
|(11,[0,1,2,5,8],[-0.2520728403520729,-1.1118243169552386,0.2621475171550754,1.0,1.0])   |1.0     |
+----------------------------------------------------------------------------------------+--------+
only showing top 5 rows
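The `features` column is printed in Spark's sparse-vector notation `(size, [indices], [values])`. Only non-zero entries are stored, which is why each one-hot language block shows a single `1.0`. The plain-Python sketch below expands the first printed row to a dense list. `to_dense` is a hypothetical helper; in PySpark, `SparseVector.toArray()` does this directly.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse representation into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# First row of model_data.show() above
row = to_dense(11, [0, 1, 2, 5, 8],
               [0.2081907208339154, -0.0050313717084598635, -0.4944131890533477, 1.0, 1.0])
print(len(row))    # 11
print(row[3:8])    # learning-language one-hot slots: [0.0, 0.0, 1.0, 0.0, 0.0]
```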

#### **Categorical Labels**

In [0]:
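# Number of unique languages (printed, since only a cell's last expression is displayed)
print("Distinct learning languages:", df_sample.select("learning_language").distinct().count())
print("Distinct UI languages:", df_sample.select("ui_language").distinct().count())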

# Print label order from indexer
lang_indexer_model = lang_indexer.fit(df_sample)
print(lang_indexer_model.labels)

ui_indexer_model = ui_indexer.fit(df_sample)
print(ui_indexer_model.labels)


['en', 'es', 'fr', 'de', 'it', 'pt']
['en', 'es', 'pt', 'it']


#### **Train/Test Split**

In [0]:
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

print("Train size:", train_data.count())
print("Test size:", test_data.count())


Train size: 1130839
Test size: 282637


##### Section 4
#### **Model Training & Evaluation**

_Linear Regression (Baseline)_

In [0]:
from pyspark.ml.regression import LinearRegression

# Train a basic linear regression model
lr = LinearRegression(featuresCol="features", labelCol="p_recall")
lr_model = lr.fit(train_data)

In [0]:
# Evaluate on test data
test_results = lr_model.evaluate(test_data)

In [0]:
print("Linear Regression Evaluation Metrics:")
print("RMSE (Root Mean Squared Error):", test_results.rootMeanSquaredError)
print("MAE (Mean Absolute Error):", test_results.meanAbsoluteError)
print("R² (Coefficient of Determination):", test_results.r2)

# Model coefficients
print("Intercept:", lr_model.intercept)
print("Coefficients:", lr_model.coefficients)

Linear Regression Evaluation Metrics:
RMSE (Root Mean Squared Error): 0.25441094554280064
MAE (Mean Absolute Error): 0.17665984815577324
R² (Coefficient of Determination): 0.10236793072266948
Intercept: 0.9102033115318815
Coefficients: [-0.008092343699874365,0.0292601192762305,0.08179663625054047,-0.00011602233451057806,-0.007685611233261963,-0.024521273933113905,-0.014496295279496838,-0.0010430839622307374,0.00011602233421230905,-0.01521164424042775,-0.01870299240946138]
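The three metrics reported by `lr_model.evaluate` (and later by `RegressionEvaluator`) follow the standard definitions:

- RMSE and MAE are in the units of `p_recall`.
- R² compares the model's squared error with that of always predicting the mean.

An R² of about 0.10 therefore means the linear baseline explains roughly a tenth of the variance in `p_recall`. Below is a plain-Python sketch on toy numbers. `regression_metrics` is a hypothetical helper.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return (RMSE, MAE, R²) for two equal-length sequences."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)                  # residual sum of squares
    mean = sum(y_true) / n
    ss_tot = sum((t - mean) ** 2 for t in y_true)        # total sum of squares
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n
    return rmse, mae, 1 - ss_res / ss_tot


y_true = [1.0, 0.5, 1.0, 0.0]   # toy p_recall values
y_pred = [0.9, 0.6, 1.0, 0.2]
rmse, mae, r2 = regression_metrics(y_true, y_pred)
print(rmse, mae, r2)
```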


_Random Forest Regression (Improved Model)_

In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Configure random forest model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="p_recall",
    numTrees=50,
    maxDepth=10,
    seed=42
)

# Train the model
rf_model = rf.fit(train_data)

In [0]:
# Predict on test data
rf_predictions = rf_model.transform(test_data)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate performance
evaluator = RegressionEvaluator(labelCol="p_recall", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(rf_predictions)

# Override metricName per call so the evaluator itself stays on RMSE
mae = evaluator.evaluate(rf_predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})

print("Random Forest Evaluation Metrics:")
print("RMSE:", rmse)
print("MAE:", mae)
print("R²:", r2)


Random Forest Evaluation Metrics:
RMSE: 0.1132327314123569
MAE: 0.06408201835271668
R²: 0.8221840382909643


**Feature Importance from Random Forest**

In [0]:
# Display feature importances
for idx, importance in enumerate(rf_model.featureImportances):
    print(f"Feature {idx}: Importance = {importance}")

Feature 0: Importance = 0.0015380793136415608
Feature 1: Importance = 0.00641344988244703
Feature 2: Importance = 0.9914880715246358
Feature 3: Importance = 4.392234366491882e-05
Feature 4: Importance = 5.023363290887257e-05
Feature 5: Importance = 0.0002195451626455927
Feature 6: Importance = 2.1132755125715064e-05
Feature 7: Importance = 4.2929383528659605e-05
Feature 8: Importance = 7.67958224000168e-05
Feature 9: Importance = 7.869407556065516e-05
Feature 10: Importance = 2.714610344118726e-05
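The importances are indexed by position in the assembled `features` vector. This mapping assumes the default `dropLast=True` in `OneHotEncoder`, which gives the last label of each indexer no slot. Under that assumption, the 11 slots are:

- the three scaled numeric columns;
- five learning-language slots (`en`, `es`, `fr`, `de`, `it`);
- three UI-language slots (`en`, `es`, `pt`).

The plain-Python sketch below attaches those names to the printed importances.

```python
num_cols = ["delta", "history_accuracy", "session_correct"]
lang_labels = ["en", "es", "fr", "de", "it", "pt"]   # printed lang_indexer_model.labels
ui_labels = ["en", "es", "pt", "it"]                 # printed ui_indexer_model.labels

# OneHotEncoder(dropLast=True) gives no slot to the last label of each indexer
feature_names = (
    num_cols
    + [f"learning_language={lab}" for lab in lang_labels[:-1]]
    + [f"ui_language={lab}" for lab in ui_labels[:-1]]
)

importances = [0.0015380793136415608, 0.00641344988244703, 0.9914880715246358,
               4.392234366491882e-05, 5.023363290887257e-05, 0.0002195451626455927,
               2.1132755125715064e-05, 4.2929383528659605e-05, 7.67958224000168e-05,
               7.869407556065516e-05, 2.714610344118726e-05]

ranked = sorted(zip(feature_names, importances), key=lambda pair: pair[1], reverse=True)
for name, importance in ranked[:3]:
    print(f"{name:30s} {importance:.4f}")
```

In the notebook, `rf_model.featureImportances.toArray()` and the fitted indexers' `labels` can supply these lists instead of hard-coding them. The ranking shows that about 99% of the forest's importance falls on `session_correct`, one of the two quantities `p_recall` is computed from. The R² gain over the linear baseline is best read with that in mind.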


##### Section 5
#### **Model Tuning with Cross-Validation**

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
# Used ChatGPT to understand ParamGridBuilder and CrossValidator

# Define evaluator (RMSE as base metric)
evaluator = RegressionEvaluator(
    labelCol="p_recall",
    predictionCol="prediction",
    metricName="rmse"
)

# Parameter grid for tuning
paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [20, 50])\
    .addGrid(rf.maxDepth, [5, 10])\
    .addGrid(rf.minInstancesPerNode, [1, 5])\
    .build()

# Setup CrossValidator
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
    parallelism=2  
)
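The grid has 2 × 2 × 2 = 8 parameter combinations. With `numFolds=3`, `CrossValidator` trains 8 × 3 = 24 forests. It then refits the best combination once on all of `train_data`, for 25 fits in total. Below is a small sketch of that count with `itertools.product`, using the values from the grid above.

```python
from itertools import product

grid = {
    "numTrees": [20, 50],
    "maxDepth": [5, 10],
    "minInstancesPerNode": [1, 5],
}
combinations = list(product(*grid.values()))
num_folds = 3

fits = len(combinations) * num_folds + 1   # +1 for the final refit on all training data
print(len(combinations), fits)             # 8 25
```

`parallelism=2` lets two of these fits run at the same time. It does not reduce the total number of fits.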


In [0]:
# Train with tuning
print("Fitting model, please wait...")
cv_model = crossval.fit(train_data)
print("Model training completed.")

In [0]:
# Predictions from best model
cv_predictions = cv_model.transform(test_data)

In [0]:
# Evaluate
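rmse = evaluator.evaluate(cv_predictions)
# Override metricName per call; setMetricName() would also change the evaluator held by crossval
mae = evaluator.evaluate(cv_predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(cv_predictions, {evaluator.metricName: "r2"})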

print("Tuned Random Forest Results:")
print("RMSE:", rmse)
print("MAE:", mae)
print("R²:", r2)