In [15]:
#!/usr/bin/env python
# train_lyrics_classifier.py
#
# Spark-ML pipeline for 7-genre lyric classification
# -- requires Spark 3.x + Python 3.8+

| Task                                                                             | Notes                                                                                                      |
| -------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------- |
| Install **Java 11**, **Spark 3.5+**, **Hadoop winutils** (Windows only), and **Python 3.10+** | Verify `spark-shell` works from **any** directory (add `%SPARK_HOME%\bin` to `PATH`).                      |
| Create a fresh virtual-env                                                       | `python -m venv mlenv && mlenv\Scripts\activate && pip install pyspark==3.5.0 pandas matplotlib streamlit` |
| Create a Git repo just for the homework                                          | Makes ZIP assembly painless.                                                                               |


In [16]:
# 1. Imports & Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, regexp_replace, length
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pathlib, os

In [17]:
# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Lyrics7Genres")
    .getOrCreate()
)
print(f"Spark version: {spark.version}")

Spark version: 3.5.5


In [18]:
# 2. Paths & config  ───────────────────────────────────────────────────────────
SEED      = 42                          # random seed for the train/test split
MEND_CSV  = "data/Merged_dataset.csv"   # change if needed
MODEL_DIR = "model_stage3_merged"       # where the fitted pipeline is written
TRAIN_DIR = "data/train80"              # parquet copy of the 80 % split
TEST_DIR  = "data/test20"               # parquet copy of the 20 % split
# Make sure the parent folder of the split outputs ("data") exists.
os.makedirs(os.path.dirname(TRAIN_DIR), exist_ok=True)


In [19]:
# 3. Load & preview dataset  ───────────────────────────────────────────────────
# Reads the CSV into a Spark DataFrame:
#   header=True    -> the first row supplies the column names
#   multiLine=True -> quoted lyric fields may span several physical lines
df = spark.read.csv(MEND_CSV, header=True, multiLine=True)
df.show(5, truncate=80)


+--------------------+--------------------+------------+-----+--------------------------------------------------------------------------------+
|         artist_name|          track_name|release_date|genre|                                                                          lyrics|
+--------------------+--------------------+------------+-----+--------------------------------------------------------------------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel break feel untrue convince speak voice tear try hold hurt try ...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain fall grow believe darkest night candle glow believe go astr...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send letter goodbye secret feel better wake dream think real false...|
|         pérez prado|            patricia|        1950|  pop|kiss lips want stroll charm mambo chacha meringue heaven arm japan brag ge

lyrics ─▶ tokens ─▶ nostop ─▶ HashingTF ─▶ tf  (term-frequency vector)
                                                   │
                                                   ▼
                                                 IDF
                                                   │
                                                   ▼
                                               features


In [20]:
# 4. Minimal cleaning  ─────────────────────────────────────────────────────────
df = (
    df.select("artist_name", "track_name", "release_date", "genre", "lyrics")  # keep only the five columns required in the assignment
      .withColumn("genre", trim(lower(col("genre"))))
      # Collapse every whitespace run (incl. line breaks kept by multiLine) into a
      # single space, then trim, so whitespace-only lyrics become "" and are
      # removed by the length filter below instead of surviving as " ".
      .withColumn("lyrics", trim(regexp_replace(col("lyrics"), r"\s+", " ")))
      .filter(length(col("lyrics")) > 0)
      # length(NULL) is NULL, so this drops both missing and empty genres;
      # StringIndexer (default handleInvalid="error") would fail on NULL labels.
      .filter(length(col("genre")) > 0)
      .filter(col("release_date").rlike(r"^\d{4}$"))     # keep 4-digit year
)
print("Clean rows:", df.count())

Clean rows: 28522


In [21]:
# 5. 80 / 20 train-test split  ────────────────────────────────────────────────
# A fixed SEED keeps the split repeatable for the same input data.
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=SEED)
# Persist both halves as parquet (replacing any earlier run).
for split_frame, split_dir in ((train_df, TRAIN_DIR), (test_df, TEST_DIR)):
    split_frame.write.mode("overwrite").parquet(split_dir)
n_train, n_test = train_df.count(), test_df.count()
print("Train:", n_train, " Test:", n_test)

Train: 22846  Test: 5676


In [22]:
# 6. Feature + label + classifier stages  ─────────────────────────────────────
# lyrics -> tokens: split the text on every non-word character.
tokenizer = RegexTokenizer(pattern="\\W", inputCol="lyrics", outputCol="tokens")
# tokens -> nostop: drop stop words using StopWordsRemover's default list.
stop_rm = StopWordsRemover(outputCol="nostop", inputCol="tokens")
# nostop -> tf: hashed term-frequency vectors with 2**18 = 262,144 buckets.
tf = HashingTF(numFeatures=2 ** 18, inputCol="nostop", outputCol="tf")
# tf -> features: IDF re-weighting; the classifier reads the "features" column.
idf = IDF(outputCol="features", inputCol="tf")
# genre -> label: assign each distinct genre string a 0-based integer index.
lab = StringIndexer(outputCol="label", inputCol="genre")
# Elastic-net logistic regression: regParam=0.3 strength, 10 % L1 / 90 % L2 mix.
clf = LogisticRegression(elasticNetParam=0.1, regParam=0.3, maxIter=30)

| Stage                  | What it does                                             | Key params                                                   |
| ---------------------- | -------------------------------------------------------- | ------------------------------------------------------------ |
| **RegexTokenizer**     | Splits text on non-word chars to produce tokens.         | `pattern="\\W"` (= any char that is *not* a letter, digit, or underscore). |
| **StopWordsRemover**   | Drops English stop words (“the”, “and”…).                | Default list; you can tweak for music jargon.                |
| **HashingTF**          | Maps tokens → sparse term-frequency vectors via hashing. | `numFeatures=2¹⁸` (262,144 dims) keeps hash collisions rare. |
| **IDF**                | Re-weights TF vectors by inverse-document frequency.     | `outputCol="features"` so the classifier can find it.        |
| **StringIndexer**      | Converts genre strings → numeric labels 0–6 (most frequent genre → 0 by default). | Needed for any Spark classifier.                             |
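| **LogisticRegression** | Multinomial (softmax) linear model over all 7 classes — Spark picks this automatically for >2 classes; it is not One-Vs-Rest. | `regParam` sets the regularisation strength; `elasticNetParam` mixes L2 (0) and L1 (1). |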
| **Pipeline**           | Chains everything so you fit+transform in one shot.      | —                                                            |

In [23]:
# 7. Train model  ──────────────────────────────────────────────────────────────
# Chain all stages, then fit them in order on the training split.
pipe = Pipeline().setStages([tokenizer, stop_rm, tf, idf, lab, clf])
model = pipe.fit(train_df)

In [24]:
# 8. Evaluate  ─────────────────────────────────────────────────────────────────
# Score the held-out split and report plain accuracy.
preds = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
acc = evaluator.evaluate(preds)
print("Test accuracy: {:.4f}".format(acc))

Test accuracy: 0.2419


In [25]:
# 9. Save artefacts  ───────────────────────────────────────────────────────────
# Write the fitted pipeline, replacing any previous copy at MODEL_DIR.
model.write().overwrite().save(MODEL_DIR)
resolved_model_dir = pathlib.Path(MODEL_DIR).resolve()
print("Model saved to", resolved_model_dir)

Model saved to C:\Users\DELL\Desktop\Sem 8\1. Big Data Analytics\MLlibAssignment\submission1\model_stage3_merged


In [26]:
# 10. Stop Spark  ──────────────────────────────────────────────────────────────
# Shut down the session used by the baseline run; the next section re-imports
# and obtains a session again via SparkSession.builder...getOrCreate().
spark.stop()

# Taras’s method in Python

In [2]:
# 1. Imports & Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, regexp_replace, length
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pathlib, os

In [3]:
# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Lyrics7Genres")
    .getOrCreate()
)
print(f"Spark version: {spark.version}")

Spark version: 3.5.5


In [4]:
# 2. Paths & config  ───────────────────────────────────────────────────────────
SEED      = 42                          # seed for the split and the tuner below
MEND_CSV  = "data/Merged_dataset.csv"   # change if needed
TRAIN_DIR = "data/train80"              # parquet copy of the 80 % split
TEST_DIR  = "data/test20"               # parquet copy of the 20 % split
# Make sure the parent folder of the split outputs ("data") exists.
os.makedirs(os.path.dirname(TRAIN_DIR), exist_ok=True)


In [5]:
# 3. Load & preview dataset  ───────────────────────────────────────────────────
# Reads the CSV into a Spark DataFrame:
#   header=True    -> the first row supplies the column names
#   multiLine=True -> quoted lyric fields may span several physical lines
df = spark.read.csv(MEND_CSV, header=True, multiLine=True)
df.show(5, truncate=80)


+--------------------+--------------------+------------+-----+--------------------------------------------------------------------------------+
|         artist_name|          track_name|release_date|genre|                                                                          lyrics|
+--------------------+--------------------+------------+-----+--------------------------------------------------------------------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel break feel untrue convince speak voice tear try hold hurt try ...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain fall grow believe darkest night candle glow believe go astr...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send letter goodbye secret feel better wake dream think real false...|
|         pérez prado|            patricia|        1950|  pop|kiss lips want stroll charm mambo chacha meringue heaven arm japan brag ge

In [6]:
# 4. Minimal cleaning  ─────────────────────────────────────────────────────────
df = (
    df.select("artist_name", "track_name", "release_date", "genre", "lyrics")  # keep only the five columns required in the assignment
      .withColumn("genre", trim(lower(col("genre"))))
      # Collapse every whitespace run (incl. line breaks kept by multiLine) into a
      # single space, then trim, so whitespace-only lyrics become "" and are
      # removed by the length filter below instead of surviving as " ".
      .withColumn("lyrics", trim(regexp_replace(col("lyrics"), r"\s+", " ")))
      .filter(length(col("lyrics")) > 0)
      # length(NULL) is NULL, so this drops both missing and empty genres;
      # StringIndexer (default handleInvalid="error") would fail on NULL labels.
      .filter(length(col("genre")) > 0)
      .filter(col("release_date").rlike(r"^\d{4}$"))     # keep 4-digit year
)
print("Clean rows:", df.count())

Clean rows: 28522


In [7]:
# 5. 80 / 20 train-test split  ────────────────────────────────────────────────
# A fixed SEED keeps the split repeatable for the same input data.
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=SEED)
# Persist both halves as parquet (replacing any earlier run).
for split_frame, split_dir in ((train_df, TRAIN_DIR), (test_df, TEST_DIR)):
    split_frame.write.mode("overwrite").parquet(split_dir)
n_train, n_test = train_df.count(), test_df.count()
print("Train:", n_train, " Test:", n_test)

Train: 22846  Test: 5676


In [8]:
# 6. Build feature pipeline  (same as before, up to IDF) -----------------------
tokenizer = RegexTokenizer(inputCol="lyrics", outputCol="tokens", pattern="\\W")
stop_rm   = StopWordsRemover(inputCol="tokens", outputCol="nostop")
tf        = HashingTF(inputCol="nostop", outputCol="tf", numFeatures=1 << 18)
idf       = IDF(inputCol="tf", outputCol="features")
lab       = StringIndexer(inputCol="genre", outputCol="label")

# 6b. Classifier + param grid --------------------------------------------------
# LogisticRegression and MulticlassClassificationEvaluator come from the imports
# cell at the top of this section; only the tuning classes are new here.
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

lr = LogisticRegression(
        maxIter=100,
        elasticNetParam=1.0,      # pure L1 like Taras
        featuresCol="features",
        labelCol="label"
)

# Candidate L1 strengths; the tuner keeps the one with the best validation f1.
paramGrid = (
    ParamGridBuilder()
      .addGrid(lr.regParam, [0.01, 0.05, 0.1, 0.3])
      .build()
)

# `evaluator` is reused later for reporting several metrics. The tuner gets its
# own copy so that changing `evaluator`'s metric afterwards cannot silently
# change the metric used for model selection on a re-fit.
evaluator = MulticlassClassificationEvaluator(metricName="f1")

tvs = TrainValidationSplit(
        estimator=lr,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator.copy(),  # private copy: always selects on f1
        trainRatio=0.8,           # 80 % of train_df used for fitting,
        seed=SEED                 # 20 % for validation (like his 70/30)
)

# NOTE: tvs is the last stage, so tokenizer..IDF are fitted on all of train_df
# before TrainValidationSplit carves out its validation portion (IDF weights
# therefore also see the validation rows).
pipe = Pipeline(stages=[tokenizer, stop_rm, tf, idf, lab, tvs])


In [9]:
# 7. Train model (unchanged call, but now includes tuning) ---------------------
model = pipe.fit(train_df)

# The last pipeline stage is the fitted TrainValidationSplitModel; bestModel is
# the LogisticRegressionModel chosen by the grid search.
best_lr = model.stages[-1].bestModel
# Use the public param getter rather than the private _java_obj handle.
# NOTE: if this prints an edge of the grid (0.01 or 0.3), consider widening it.
print("Best regParam:", best_lr.getRegParam())

Best regParam: 0.01


In [10]:
# 8. Evaluate on the held-out test set -----------------------------------------
preds = model.transform(test_df)
for m in ["accuracy", "f1", "weightedPrecision", "weightedRecall"]:
    # Override the metric for this call only. evaluator.setMetricName() would
    # mutate the same evaluator object passed to TrainValidationSplit, so a later
    # re-fit of `pipe` would tune on whatever metric this loop ended on.
    val = evaluator.evaluate(preds, {evaluator.metricName: m})
    print(f"{m:>18}: {val:.4f}")

          accuracy: 0.3129
                f1: 0.2490
 weightedPrecision: 0.4631
    weightedRecall: 0.3129


In [11]:
import os, sys
import warnings

from pyspark.sql import SparkSession

# absolute path of the conda-env python that Jupyter is using now
PY = sys.executable                        # e.g. C:\Users\DELL\anaconda3\envs\music-genre\python.exe

# The driver/worker interpreter is fixed when the SparkContext starts. If a
# session is already running (e.g. one created earlier in this notebook),
# getOrCreate() below just returns it and these settings do not take effect
# until that session is stopped — warn instead of implying they were applied.
if SparkSession.getActiveSession() is not None:
    warnings.warn(
        "A Spark session is already active: PYSPARK_PYTHON / spark.pyspark.python "
        "only apply to a newly started session. Run this cell before creating "
        "the session (or after spark.stop()) for them to take effect."
    )

os.environ["PYSPARK_PYTHON"]        = PY   # workers
os.environ["PYSPARK_DRIVER_PYTHON"] = PY   # driver  (needed mainly when you use spark-submit)

spark = (SparkSession.builder
         .appName("Lyrics7Genres")
         .config("spark.pyspark.python", PY)
         .config("spark.pyspark.driver.python", PY)
         .getOrCreate())

print("Driver python :", sys.executable)
# Matches the driver only if the settings above were in place when the context started.
print("Worker python :", spark.sparkContext.pythonExec)


Driver python : c:\Users\DELL\anaconda3\envs\music-genre\python.exe
Worker python : c:\Users\DELL\anaconda3\envs\music-genre\python.exe


In [13]:
import pathlib, shutil

# 1️⃣  Path:  `<notebook directory>/model_stage4_merged_Trans_way_new`
save_dir = pathlib.Path.cwd() / "model_stage4_merged_Trans_way_new"

# 2️⃣  Clear any previous artefact at that path so the save starts clean.
#     shutil.rmtree() only accepts directories, so a stray *file* with the
#     same name is unlinked instead of making this cell crash.
if save_dir.is_dir():
    shutil.rmtree(save_dir)
elif save_dir.exists():
    save_dir.unlink()

# 3️⃣  Persist the fitted pipeline
# Spark is happy with either a plain absolute path or a file:// URI.
# We'll use the plain path to keep it readable.
model.write().overwrite().save(str(save_dir))

print("✅  Model saved to:", save_dir.resolve())


✅  Model saved to: C:\Users\DELL\Desktop\Sem 8\1. Big Data Analytics\MLlibAssignment\submission1\model_stage4_merged_Trans_way_new


In [14]:
from pyspark.ml import PipelineModel

# Reload the pipeline written by the previous cell (same `save_dir`) so this
# check verifies the artefact that was just saved, not an older directory.
reloaded = PipelineModel.load(str(save_dir))
assert len(reloaded.stages) == len(model.stages), "reloaded pipeline has a different number of stages"
print("Reload OK, stages:", len(reloaded.stages))


Reload OK, stages: 6
