In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col,when
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.classification import GBTClassifier
from pyspark.sql import functions as F


# Start (or reuse) the Spark session for this notebook; the driver is asked
# for 15 GB of memory.
spark = (
    SparkSession.builder
    .appName("Spotify Regression")
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

In [2]:
# Load the pre-cleaned dataset (Parquet) and show its columns and types.
clean_data_path = "data/df_clean_parquet"
df = spark.read.parquet(clean_data_path)
df.printSchema()


root
 |-- spotify_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- daily_rank: float (nullable = true)
 |-- daily_movement: float (nullable = true)
 |-- weekly_movement: float (nullable = true)
 |-- country: string (nullable = true)
 |-- snapshot_date: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- is_explicit: string (nullable = true)
 |-- duration_ms: float (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- time

In [3]:
# Numeric columns used as model inputs.
numeric_cols = [
    "daily_rank",
    "energy",
    "loudness",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
]
label_col = "popularity_label"
popularity_threshold = 70

# Binary target: 1 when popularity >= threshold, otherwise 0. A null
# popularity makes the comparison null, so such rows also fall through to 0.
is_popular = F.col("popularity") >= F.lit(popularity_threshold)
label_expr = F.when(is_popular, F.lit(1)).otherwise(F.lit(0)).cast(IntegerType())
df = df.withColumn(label_col, label_expr)

In [4]:
# Guard against the raw target column ever ending up among the model inputs.
feature_cols = list(filter(lambda column_name: column_name != "popularity", numeric_cols))

In [5]:
def create_spotify_gbt_pipeline(
    numeric_features: list[str],
    label_col: str,
    max_iter: int
) -> Pipeline:
    """Build an unfitted assemble -> min-max scale -> GBT classifier pipeline.

    Args:
        numeric_features: Names of the input columns packed into the feature vector.
        label_col: Name of the label column the classifier is trained on.
        max_iter: Value passed to the classifier's ``maxIter`` parameter.

    Returns:
        A ``Pipeline`` with three stages. The assembled vector is written to
        ``num_features``, the scaled vector to ``features`` and the class
        prediction to ``prediction``. The classifier seed is fixed at 42.
    """
    return Pipeline(stages=[
        # Pack the individual numeric columns into a single vector column.
        VectorAssembler(inputCols=numeric_features, outputCol="num_features"),
        # Rescale the assembled vector with MinMaxScaler's default range.
        MinMaxScaler(inputCol="num_features", outputCol="features"),
        # Gradient-boosted trees classifier trained on the scaled vector.
        GBTClassifier(
            labelCol=label_col,
            featuresCol="features",
            predictionCol="prediction",
            maxIter=max_iter,
            seed=42,
        ),
    ])


# Unfitted pipeline over the selected features, with 30 boosting iterations.
pipeline = create_spotify_gbt_pipeline(feature_cols, label_col, max_iter=30)

In [6]:

# 80/20 train/test split; the seed is fixed for the random sampling.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Each count() is a Spark action; report the row count of both splits.
train_rows = train_df.count()
print(f"Train dataset size: {train_rows}")
test_rows = test_df.count()
print(f"Test  dataset size: {test_rows}")

Train dataset size: 795388
Test  dataset size: 198883


In [8]:
# The classifier is the last pipeline stage; look it up once so the grid
# below can reference its Param objects without repeating the lookup.
gbt_stage = pipeline.getStages()[-1]

# 2 tree depths x 2 step sizes x 1 iteration count = 4 candidate settings.
param_grid = (
    ParamGridBuilder()
    .addGrid(gbt_stage.maxDepth, [3, 5])
    .addGrid(gbt_stage.stepSize, [0.05, 0.1])
    .addGrid(gbt_stage.maxIter, [30])
    .build()
)

# Candidates are ranked by area under the ROC curve.
cv_evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    metricName="areaUnderROC"
)

# seed pins the random assignment of rows to folds. Without it the fold split
# (and therefore the selected model and reported metrics) can differ between
# runs, even though the data split and the classifier are already seeded.
cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=cv_evaluator,
    numFolds=5,
    seed=42,
)

In [9]:
# Expensive step: 4 grid points x 5 folds = 20 pipeline fits on train_df.
cv_model = cross_validator.fit(train_df)




In [10]:
def get_best_model_params(cv_model: CrossValidatorModel) -> dict[str, float]:
    """Return the tuned hyper-parameters of the best cross-validated model.

    Args:
        cv_model: Fitted cross-validator whose ``bestModel`` is a pipeline
            model with the GBT classifier model as its last stage.

    Returns:
        Dict with keys ``maxDepth``, ``stepSize`` and ``maxIter`` read from
        that last stage.
    """
    gbt_model = cv_model.bestModel.stages[-1]  # GBTClassifierModel
    getters = {
        "maxDepth": gbt_model.getMaxDepth,
        "stepSize": gbt_model.getStepSize,
        "maxIter": gbt_model.getMaxIter,
    }
    return {param_name: getter() for param_name, getter in getters.items()}

# Header (Russian for "Best model parameters:") followed by one line per parameter.
print("Лучшие параметры модели:")
best_params = get_best_model_params(cv_model)
for param_name, param_value in best_params.items():
    print(f"{param_name}: {param_value}")


Лучшие параметры модели:
maxDepth: 5
stepSize: 0.1
maxIter: 30


In [11]:
# Score the hold-out set with the model selected by cross-validation.
test_df_predictions = cv_model.transform(test_df)

# Bring a 100-row preview of labels and predictions to the driver for display.
preview_sdf = test_df_predictions.select(label_col, "prediction", "rawPrediction").limit(100)
pred_pd = preview_sdf.toPandas()
print(pred_pd)

    popularity_label  prediction                                rawPrediction
0                  1         1.0    [-1.0965660276393883, 1.0965660276393883]
1                  1         1.0    [-1.0965660276393883, 1.0965660276393883]
2                  1         1.0    [-1.0965660276393883, 1.0965660276393883]
3                  1         1.0    [-1.0965660276393883, 1.0965660276393883]
4                  1         1.0    [-1.0965660276393883, 1.0965660276393883]
..               ...         ...                                          ...
95                 0         0.0      [0.710418835646897, -0.710418835646897]
96                 0         0.0    [0.6102443757214819, -0.6102443757214819]
97                 0         1.0  [-0.34375779585723115, 0.34375779585723115]
98                 1         1.0    [-0.6598621365216315, 0.6598621365216315]
99                 1         1.0    [-0.6598621365216315, 0.6598621365216315]

[100 rows x 3 columns]


In [12]:
# Hold-out AUC, computed with the same evaluator that was given to the cross-validator.
auc = cv_evaluator.evaluate(test_df_predictions)
print(f"AUC (areaUnderROC): {auc:.4f}")

def evaluate_model_binary(data: "DataFrame", label_col: str) -> dict[str, float]:
    """Compute precision, recall and F1 for the positive class (label 1).

    The confusion-matrix counts are gathered in one aggregation, so ``data``
    is evaluated once rather than once per count.

    Args:
        data: Spark DataFrame with a ``prediction`` column and the
            ground-truth column named by ``label_col``.
        label_col: Name of the ground-truth label column (values 0 / 1).

    Returns:
        Dict with keys ``precision``, ``recall`` and ``f1``. Any metric whose
        denominator is zero is reported as 0.0.
    """
    actual_pos = F.col(label_col) == 1
    actual_neg = F.col(label_col) == 0
    predicted_pos = F.col("prediction") == 1
    predicted_neg = F.col("prediction") == 0

    def _count_where(condition, alias):
        # Conditional count: rows where the condition is false or null add 0,
        # which matches what filter(condition).count() would count.
        return F.sum(F.when(condition, 1).otherwise(0)).alias(alias)

    counts = data.agg(
        _count_where(actual_pos & predicted_pos, "tp"),
        _count_where(actual_neg & predicted_pos, "fp"),
        _count_where(actual_pos & predicted_neg, "fn"),
    ).first()

    # sum() over an empty DataFrame yields null; treat that as a zero count.
    tp, fp, fn = (counts[name] or 0 for name in ("tp", "fp", "fn"))

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0

    return {"precision": precision, "recall": recall, "f1": f1}

# Threshold-based metrics for the positive class on the hold-out predictions.
metrics = evaluate_model_binary(data=test_df_predictions, label_col=label_col)
print(f"Metrics: {metrics}")

AUC (areaUnderROC): 0.8631
Metrics: {'precision': 0.8178026832076314, 'recall': 0.9545900211862954, 'f1': 0.8809179576896674}
