In [1]:
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("HitSongPrediction").getOrCreate()

25/05/16 02:34:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Load CSV into Spark DataFrame
data_set = "gs://dataproc-staging-us-central1-678565301111-3rjy5jav/google-cloud-dataproc-metainfo/dataset.csv"

df = spark.read.csv(data_set, header=True, inferSchema=True)

                                                                                

In [3]:
# Show metadata
df.printSchema()

df.show(5)

print(f"Total rows: {df.count()}")

root
 |-- _c0: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- track_genre: string (nullable = true)



                                                                                

+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|_c0|            track_id|             artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|
+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|  0|5SuOikwiRyPMVoIQD...|         Gen Hoshino|              Comedy|              Comedy|        73|     230666|   False|       0.676| 0.461|  1|  -6.746|   0|      0.143|      0.0322|         1.01E-6|   0.358|  0.715| 87.917|           4



Total rows: 114000


                                                                                

In [4]:
from pyspark.sql.functions import col

df = df.withColumn("energy", col("energy").cast("double"))
df = df.withColumn("explicit", col("explicit").cast("integer"))
df = df.withColumn("danceability", col("danceability").cast("double"))
df = df.withColumn("loudness", col("loudness").cast("double"))
df = df.withColumn("liveness", col("liveness").cast("double"))
df = df.withColumn("valence", col("valence").cast("double"))
df = df.withColumn("speechiness", col("speechiness").cast("double"))


# Define hit songs based on popularity threshold
df = df.withColumn("hit", (col("popularity") >= 75).cast("integer"))

In [5]:
# Balance dataset
hits = df.filter(col("hit") == 1)
non_hits = df.filter(col("hit") == 0).sample(withReplacement=False, fraction=hits.count() / df.count())

df_balanced = hits.union(non_hits)

                                                                                

In [6]:
# Check dataset balance
print(f"Balanced dataset size: {df_balanced.count()}")



Balanced dataset size: 5629




In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Convert categorical track genre into numerical indices
indexer = StringIndexer(inputCol="track_genre", outputCol="track_genre_index")

encoder = OneHotEncoder(inputCol="track_genre_index", outputCol="track_genre_encoded")

In [8]:
# Assemble features into a vector for ML model
features = ['energy', 'explicit', 'danceability', 'loudness', 'liveness', 'tempo', 'instrumentalness', 'valence', 'speechiness']

assembler = VectorAssembler(inputCols=features + ['track_genre_encoded'], outputCol="features")

In [9]:
trainDF, testDF = df_balanced.randomSplit([0.75, 0.25], seed=42)

In [10]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Initialize model
model = LogisticRegression(featuresCol="features", labelCol="hit", maxIter=1000)

In [11]:
# Create pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler, model])

In [None]:
# Train model
model_fit = pipeline.fit(trainDF)

In [None]:
# Predict hit songs on test data
predictions = model_fit.transform(testDF)

In [None]:
# Evaluate model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="hit", predictionCol="prediction", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy:.3f}")