In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [5]:
# Start (or reuse) the Spark session for this notebook: application name
# "Codeway", master "local[*]" and the driver memory setting "4g".
session_builder = SparkSession.builder.appName("Codeway").master("local[*]")
session_builder = session_builder.config("spark.driver.memory", "4g")
spark = session_builder.getOrCreate()

Exception ignored in PyObject_HasAttr(); consider using PyObject_HasAttrWithError(), PyObject_GetOptionalAttr() or PyObject_GetAttr():
Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
AttributeError: partially initialized module 'pandas' from '/Users/macbookpro/PyCharmMiscProject/.venv/lib/python3.13/site-packages/pandas/__init__.py' has no attribute '_pandas_parser_CAPI' (most likely due to a circular import)
25/07/27 15:33:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/27 15:33:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
# Read the JSON dataset at this location into a DataFrame; later cells rely on
# its cohort_index column for the train/test split.
cohort_json_path = "/Users/macbookpro/PyCharmMiscProject/df_with_cohort_json"
df_with_cohort = spark.read.json(cohort_json_path)

                                                                                

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

1) Time-aware train/test split: cohorts whose cohort_index is within the first 80% of the maximum index go to train, later cohorts go to test

In [8]:
# Time-aware split: rows whose cohort_index lies within the first 80% of the
# largest observed index go to train, rows with a later index go to test.
max_idx = df_with_cohort.agg(F.max("cohort_index")).first()[0]
if max_idx is None:
    # F.max yields NULL for an empty frame or an all-NULL column; fail with a
    # clear message instead of the TypeError that `None * 0.8` would raise.
    raise ValueError(
        "cohort_index has no non-null values; cannot build the train/test split"
    )
split_idx = int(max_idx * 0.8)

# Shuffle the training rows by sorting on a seeded random key, then drop the
# key immediately so the helper column never leaks into later cells.
train = (
    df_with_cohort
    .filter(F.col("cohort_index") <= split_idx)
    .withColumn("rand", F.rand(seed=12345))
    .orderBy("rand")
    .drop("rand")
)
test = df_with_cohort.filter(F.col("cohort_index") > split_idx)

                                                                                

In [82]:
# Export the held-out split as gzip-compressed JSON, replacing any previous
# export at the same location.
test_set_path = "/Users/macbookpro/PyCharmMiscProject/test_set/cohort_test_set"
(
    test.write
    .mode("overwrite")
    .option("compression", "gzip")
    .json(test_set_path)
)

 2) Preprocessing pipeline (fit once)

In [9]:
from pyspark import StorageLevel

In [10]:
# Encode the categorical season: string -> index -> one-hot vector.
# handleInvalid="keep" assigns labels unseen at fit time to an extra index
# instead of raising at transform time.
si_season = StringIndexer(
    inputCol="cohort_season",
    outputCol="season_idx",
    handleInvalid="keep",
)
oh_season = OneHotEncoder(inputCols=["season_idx"], outputCols=["season_vec"])

# Model inputs, in the order they are packed into the "features" vector.
feature_cols = [
    "cohort_size",
    "iOS",
    "iPadOS",
    "avg_auto_renew_off",
    "std_auto_renew_off",
    "avg_free_trial",
    "std_free_trial",
    "avg_paywall",
    "std_paywall",
    "avg_refund",
    "std_refund",
    "avg_renewal",
    "std_renewal",
    "avg_subscribe",
    "std_subscribe",
    "mean_event_hour",
    "std_event_hour",
    "mean_revenue_1y",
    "std_revenue_1y",
    "season_vec",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
preproc = Pipeline(stages=[si_season, oh_season, assembler])

# Drop the shuffle helper column if it is present, then fit the preprocessing
# on the training rows only so nothing is learned from the test split.
train = train.drop("rand")
pp_model = preproc.fit(train)

# Preprocessed views of both splits, marked for caching in memory with spill
# to disk (persist is lazy; nothing is computed until an action runs).
cache_level = StorageLevel.MEMORY_AND_DISK
train_pp = pp_model.transform(train).persist(cache_level)
test_pp = pp_model.transform(test).persist(cache_level)

25/07/27 15:34:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


3) Define two GBT regressors with explicit prediction columns

In [11]:
# Two gradient-boosted tree regressors over the same feature vector, one per
# label column (mean_revenue_15d and std_revenue_15d). Distinct prediction
# columns let both outputs coexist in a single DataFrame.
gbt_common = {"featuresCol": "features", "seed": 12345}

gbt_mean = GBTRegressor(
    labelCol="mean_revenue_15d",
    predictionCol="pred15_mean",
    **gbt_common,
)
gbt_std = GBTRegressor(
    labelCol="std_revenue_15d",
    predictionCol="pred15_std",
    **gbt_common,
)

 4) Evaluators

In [12]:
# One mean-absolute-error evaluator per target; each compares a label column
# with the prediction column written by the matching regressor.
mae_metric = {"metricName": "mae"}

evaluator_mean = RegressionEvaluator(
    labelCol="mean_revenue_15d",
    predictionCol="pred15_mean",
    **mae_metric,
)
evaluator_std = RegressionEvaluator(
    labelCol="std_revenue_15d",
    predictionCol="pred15_std",
    **mae_metric,
)

5) Hyperparameter grids

In [13]:
# Search space per model: only maxDepth varies (3 vs. 5); maxIter and stepSize
# are pinned to a single low-cost value each, so every grid holds two candidates.
mean_grid = ParamGridBuilder()
mean_grid = mean_grid.addGrid(gbt_mean.maxDepth, [3, 5])
mean_grid = mean_grid.addGrid(gbt_mean.maxIter, [20])
mean_grid = mean_grid.addGrid(gbt_mean.stepSize, [0.1])
paramGrid_mean = mean_grid.build()

std_grid = ParamGridBuilder()
std_grid = std_grid.addGrid(gbt_std.maxDepth, [3, 5])
std_grid = std_grid.addGrid(gbt_std.maxIter, [20])
std_grid = std_grid.addGrid(gbt_std.stepSize, [0.1])
paramGrid_std = std_grid.build()

Define one Pipeline per target (preprocessing + regressor) so that each saved model can be loaded and applied to raw cohort data in other notebooks

In [26]:
from pyspark.ml import Pipeline

# Chain the already-fitted preprocessing model (pp_model) with each regressor,
# so a single pipeline goes from raw cohort columns to a prediction.
mean_stages = [pp_model, gbt_mean]
std_stages = [pp_model, gbt_std]

pipeline_mean = Pipeline(stages=mean_stages)
pipeline_std = Pipeline(stages=std_stages)

6) Cross-validators

In [27]:
# 3-fold cross-validation over each parameter grid, scored by the matching
# evaluator. The explicit seed fixes the random fold assignment so the model
# selection can be reproduced; it matches the seed used by the regressors and
# the training shuffle. Without it the fold assignment is not guaranteed to be
# the same from one kernel session to the next.
cv_mean = CrossValidator(
    estimator=pipeline_mean,
    estimatorParamMaps=paramGrid_mean,
    evaluator=evaluator_mean,
    numFolds=3,
    seed=12345,
)

cv_std = CrossValidator(
    estimator=pipeline_std,
    estimatorParamMaps=paramGrid_std,
    evaluator=evaluator_std,
    numFolds=3,
    seed=12345,
)

In [28]:
# Allow up to four candidate models to be fitted/evaluated in parallel during
# each cross-validation run.
cv_parallelism = 4
cv_mean.setParallelism(cv_parallelism)
cv_std.setParallelism(cv_parallelism)

CrossValidator_2f5b94b6d636

7) Fit both cross-validated pipelines on the training split (preprocessing runs as a pipeline stage) & save the two best models to disk

In [62]:
# Run the cross-validated search for the mean label on the training split
# (preprocessing is a stage of the pipeline), then save the winning pipeline,
# overwriting any model already stored at that path.
mean_model_path = "/Users/macbookpro/PyCharmMiscProject/models/cohort_mean_model"

cv_model_mean = cv_mean.fit(train)
bestModelMean = cv_model_mean.bestModel  # best PipelineModel for the mean label
bestModelMean.write().overwrite().save(mean_model_path)

                                                                                

In [63]:
# Run the cross-validated search for the std label on the training split
# (preprocessing is a stage of the pipeline), then save the winning pipeline,
# overwriting any model already stored at that path.
std_model_path = "/Users/macbookpro/PyCharmMiscProject/models/cohort_std_model"

cv_model_std = cv_std.fit(train)
bestModelStd = cv_model_std.bestModel  # best PipelineModel for the std label
bestModelStd.write().overwrite().save(std_model_path)

8) Generate predictions on the test split (each fitted pipeline applies the preprocessing itself)

In [64]:
# Score the held-out split with the cross-validated mean-label model; this adds
# pred15_mean along with the pipeline's intermediate columns.
preds = cv_model_mean.transform(test)

In [65]:
# Columns created by the preprocessing stages of the first pipeline. They must
# be removed before the second pipeline runs, because its own preprocessing
# stages would try to create the same output columns again.
# NOTE(review): "country_vec" is not produced by any stage visible in this
# notebook; DataFrame.drop ignores names that are absent from the schema, so it
# is harmless here, but confirm whether it is still needed.
intermediate_cols = ("season_idx", "season_vec", "country_vec", "features")

# Rebuild from `test` instead of from the previous value of `preds`, so this
# cell produces the same result every time it is run. Re-running the old
# `preds = f(preds)` form on an already-scored frame would fail because the
# pred15_std column already exists. Spark evaluates lazily, so repeating the
# first transform here only rebuilds the same query plan.
preds = cv_model_std.transform(
    cv_model_mean.transform(test).drop(*intermediate_cols)
)

9) Annualize the 15-day forecasts if desired

In [66]:
# Scale the 15-day predictions to a one-year horizon: the mean is multiplied by
# the number of 15-day periods in a year, the std by the square root of it.
# NOTE(review): square-root scaling presumes the periods are independent — confirm.
factor = 365.0 / 15.0
std_factor = factor ** 0.5

preds = (
    preds
    .withColumn("forecast_mean_1y", F.col("pred15_mean") * factor)
    .withColumn("forecast_std_1y", F.col("pred15_std") * std_factor)
)

In [67]:
# Persist the scored frame once as Parquet (replacing any earlier output) so
# downstream analysis can work from the file without Spark ML.
preds_path = "/Users/macbookpro/PyCharmMiscProject/Output_Dataframes/cohort_preds.parquet"
preds_writer = preds.write.mode("overwrite")
preds_writer.parquet(preds_path)

                                                                                

 preds now contains:
   - pred15_mean, pred15_std
   - forecast_mean_1y, forecast_std_1y
   - plus all original cohort features for analysis

# Evaluate

In [68]:
# Mean absolute error of each 15-day prediction on the scored test split,
# evaluated in the same order as before: mean first, then std.
mae_mean, mae_std = (
    metric.evaluate(preds) for metric in (evaluator_mean, evaluator_std)
)

In [69]:
# Report the absolute errors of both 15-day predictions on the test split.
print("15-day MAE (mean):", mae_mean)
print("15-day MAE (std) :", mae_std)

15-day MAE (mean): 0.020836992115998255
15-day MAE (std) : 0.09661682129958578


1) Compute both averages together

In [70]:
# Average of each true 15-day target over the held-out rows, computed in one
# aggregation; these averages are the denominators of the relative MAE.
true_target_means = [
    F.avg("mean_revenue_15d").alias("mean_true_mean"),
    F.avg("std_revenue_15d").alias("mean_true_std"),
]
mean_vals = test_pp.agg(*true_target_means).first()

# The Row's fields unpack in the order of the aggregation expressions above.
mean_true_mean, mean_true_std = mean_vals

In [71]:
# Express each MAE as a fraction of the average true value of its target.
relative_mae_mean, relative_mae_std = (
    mae_mean / mean_true_mean,
    mae_std / mean_true_std,
)

2) Print as percentages

In [72]:
# Show the relative errors as percentages with two decimals.
print(f"Relative MAE (mean): {relative_mae_mean:.2%}")
print(f"Relative MAE (std) : {relative_mae_std:.2%}")

Relative MAE (mean): 4.12%
Relative MAE (std) : 3.41%


# Write the Output

preds is the DataFrame obtained after scoring and annualizing:
it contains all cohort features plus forecast_mean_1y & forecast_std_1y.

Select exactly the columns wanted in the output

In [73]:
# Columns of the final export, grouped by role; the select keeps this order.
output_id_cols = [
    "cohort_index",
    "first_event_date",
    "cohort_season",
    "cohort_size",
]

# Original cohort features carried through to the output.
output_feature_cols = [
    "iOS", "iPadOS", "avg_stickiness_ratio",
    "avg_auto_renew_off", "std_auto_renew_off",
    "avg_free_trial", "std_free_trial",
    "avg_paywall", "std_paywall",
    "avg_refund", "std_refund",
    "avg_renewal", "std_renewal",
    "avg_subscribe", "std_subscribe",
    "mean_event_hour", "std_event_hour",
    "mean_revenue_1y", "std_revenue_1y",
]

# One-year forecasts derived from the 15-day predictions.
output_forecast_cols = ["forecast_mean_1y", "forecast_std_1y"]

output_df = preds.select(
    *output_id_cols,
    *output_feature_cols,
    *output_forecast_cols,
)

In [74]:
# Sanity check: count the negative values in every output column except the
# three identifier/date/season columns listed in `skip`.
skip = {"cohort_index", "first_event_date", "cohort_season"}
cols = [name for name in output_df.columns if name not in skip]


def _negative_count(name):
    """Return an aggregate that counts the rows where column `name` is below zero."""
    is_negative = F.col(name) < 0
    return F.sum(F.when(is_negative, 1).otherwise(0)).alias(name)


agg_exprs = [_negative_count(name) for name in cols]

# Single-row result with one count per checked column.
neg_counts_df = output_df.agg(*agg_exprs)
neg_counts_df.show(truncate=False)

+-----------+---+------+--------------------+------------------+------------------+--------------+--------------+-----------+-----------+----------+----------+-----------+-----------+-------------+-------------+---------------+--------------+---------------+--------------+----------------+---------------+
|cohort_size|iOS|iPadOS|avg_stickiness_ratio|avg_auto_renew_off|std_auto_renew_off|avg_free_trial|std_free_trial|avg_paywall|std_paywall|avg_refund|std_refund|avg_renewal|std_renewal|avg_subscribe|std_subscribe|mean_event_hour|std_event_hour|mean_revenue_1y|std_revenue_1y|forecast_mean_1y|forecast_std_1y|
+-----------+---+------+--------------------+------------------+------------------+--------------+--------------+-----------+-----------+----------+----------+-----------+-----------+-------------+-------------+---------------+--------------+---------------+--------------+----------------+---------------+
|0          |0  |0     |0                   |0                 |0              

In [75]:
def _null_count(name):
    """Return an aggregate that counts the rows where column `name` is NULL."""
    is_missing = F.col(name).isNull()
    return F.sum(F.when(is_missing, 1).otherwise(0)).alias(name)


# Single-row frame holding the number of NULLs in every output column.
null_counts = output_df.select([_null_count(name) for name in output_df.columns])

In [76]:
# Display the per-column NULL counts computed for output_df.
null_counts.show()

+------------+----------------+-------------+-----------+---+------+--------------------+------------------+------------------+--------------+--------------+-----------+-----------+----------+----------+-----------+-----------+-------------+-------------+---------------+--------------+---------------+--------------+----------------+---------------+
|cohort_index|first_event_date|cohort_season|cohort_size|iOS|iPadOS|avg_stickiness_ratio|avg_auto_renew_off|std_auto_renew_off|avg_free_trial|std_free_trial|avg_paywall|std_paywall|avg_refund|std_refund|avg_renewal|std_renewal|avg_subscribe|std_subscribe|mean_event_hour|std_event_hour|mean_revenue_1y|std_revenue_1y|forecast_mean_1y|forecast_std_1y|
+------------+----------------+-------------+-----------+---+------+--------------------+------------------+------------------+--------------+--------------+-----------+-----------+----------+----------+-----------+-----------+-------------+-------------+---------------+--------------+------------

In [77]:
# Preview the first 10 output rows without truncating cell contents.
output_df.show(10, truncate=False)

+------------+----------------+-------------+-----------+------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----------+----------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|cohort_index|first_event_date|cohort_season|cohort_size|iOS               |iPadOS              |avg_stickiness_ratio|avg_auto_renew_off |std_auto_renew_off |avg_free_trial     |std_free_trial     |avg_paywall       |std_paywall       |avg_refund|std_refund|avg_renewal         |std_renewal        |avg_subscribe       |std_subscribe      |mean_event_hour   |std_event_hour    |mean_revenue_1y   |std_revenue_1y    |forecast_mean_1y  |forecast_std_1y   |
+------------+----------------+-------------+-----------+------------------+--------------

In [78]:
# 1) Redistribute the output rows over as many partitions as the Spark
#    context's default parallelism, then report the resulting partition count.
num_parts = spark.sparkContext.defaultParallelism
df_out = output_df.repartition(num_parts)

partition_count = df_out.rdd.getNumPartitions()
print("After:", partition_count)

After: 8


In [79]:
# 2) Export the repartitioned output as gzip-compressed JSON, replacing any
#    previous export at the same location.
output_json_path = "/Users/macbookpro/PyCharmMiscProject/output_cohort_json"
(
    df_out.write
    .mode("overwrite")
    .option("compression", "gzip")
    .json(output_json_path)
)