In [1]:
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# --- ключевая конфигурация ---
spark = (
    SparkSession.builder
    .appName("MusicProject")
    .getOrCreate()
)

print("✅ SparkSession активна:", spark.version)


✅ SparkSession активна: 4.0.1


In [2]:
df = spark.read.json("C:\\Users\\Asus\\Desktop\\music_classifier\\data\\raw_data\\900k Definitive Spotify Dataset.json")  # inferSchema - авто присвоение типов данных


In [3]:
df.printSchema()

root
 |-- Acousticness: string (nullable = true)
 |-- Album: string (nullable = true)
 |-- Artist(s): string (nullable = true)
 |-- Danceability: string (nullable = true)
 |-- Energy: string (nullable = true)
 |-- Explicit: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Good for Driving: long (nullable = true)
 |-- Good for Exercise: long (nullable = true)
 |-- Good for Morning Routine: long (nullable = true)
 |-- Good for Party: long (nullable = true)
 |-- Good for Relaxation/Meditation: long (nullable = true)
 |-- Good for Running: long (nullable = true)
 |-- Good for Social Gatherings: long (nullable = true)
 |-- Good for Work/Study: long (nullable = true)
 |-- Good for Yoga/Stretching: long (nullable = true)
 |-- Instrumentalness: string (nullable = true)
 |-- Key: string (nullable = true)
 |-- Length: string (nullable = true)
 |-- Liveness: string (nullable = true)
 |-- Loudness (db): double (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Posit

In [4]:
df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).show(1, False, True)

-RECORD 0--------------------------------
 Acousticness                   | 0      
 Album                          | 0      
 Artist(s)                      | 0      
 Danceability                   | 0      
 Energy                         | 0      
 Explicit                       | 0      
 Genre                          | 0      
 Good for Driving               | 0      
 Good for Exercise              | 0      
 Good for Morning Routine       | 0      
 Good for Party                 | 0      
 Good for Relaxation/Meditation | 0      
 Good for Running               | 0      
 Good for Social Gatherings     | 0      
 Good for Work/Study            | 0      
 Good for Yoga/Stretching       | 0      
 Instrumentalness               | 0      
 Key                            | 0      
 Length                         | 0      
 Liveness                       | 0      
 Loudness (db)                  | 0      
 Popularity                     | 0      
 Positiveness                   | 

In [5]:
df.columns
cols_for_train = ["text", "Genre", "Acousticness", "Danceability", "Energy",
 "Instrumentalness", "Liveness", "Loudness (db)", "Positiveness",
 "Speechiness", "Tempo", "emotion"]
df.select(*cols_for_train).show(1, False, True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
for col_name in cols_for_train:
    count_unique = df.select(col_name).distinct().count()
    print(f"{col_name}: {count_unique}")


text: 497448
Genre: 3097
Acousticness: 101
Danceability: 94
Energy: 101
Instrumentalness: 101
Liveness: 100
Loudness (db): 3745
Positiveness: 101
Speechiness: 96
Tempo: 170
emotion: 13


In [7]:
df.select("Genre").show(10)

+-------+
|  Genre|
+-------+
|hip hop|
|hip hop|
|hip hop|
|hip hop|
|hip hop|
|hip hop|
|hip hop|
|hip hop|
|hip hop|
|hip hop|
+-------+


In [8]:
# 3097 очистка жанров от лишнего (пробелы, пропуски, лишние символы)
print("До очистки:", df.select("Genre").distinct().count())
df = df.withColumn(
    "Genre",
    F.split(F.lower(F.col("Genre")), "[,|/]").getItem(0)
)

До очистки: 3097


In [9]:
print("После очистки:", df.select("Genre").distinct().count())

После очистки: 88


In [10]:
df = df.withColumnRenamed("Loudness (db)", "Loudness_(db)")

In [11]:
from pyspark.sql.types import DoubleType
numeric_cols = [
    "Acousticness", "Danceability", "Energy", "Instrumentalness",
    "Liveness", "Loudness_(db)", "Positiveness", "Speechiness", "Tempo"
]
for c in numeric_cols:
    df = df.withColumn(c, F.col(c).cast(DoubleType()))

In [12]:
df.select(*numeric_cols).printSchema()


root
 |-- Acousticness: double (nullable = true)
 |-- Danceability: double (nullable = true)
 |-- Energy: double (nullable = true)
 |-- Instrumentalness: double (nullable = true)
 |-- Liveness: double (nullable = true)
 |-- Loudness_(db): double (nullable = true)
 |-- Positiveness: double (nullable = true)
 |-- Speechiness: double (nullable = true)
 |-- Tempo: double (nullable = true)


In [13]:
df.select([
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in numeric_cols
]).show()

+------------+------------+------+----------------+--------+-------------+------------+-----------+-----+
|Acousticness|Danceability|Energy|Instrumentalness|Liveness|Loudness_(db)|Positiveness|Speechiness|Tempo|
+------------+------------+------+----------------+--------+-------------+------------+-----------+-----+
|           0|           0|     0|               0|       0|            0|           0|          0|    0|
+------------+------------+------+----------------+--------+-------------+------------+-----------+-----+


In [14]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
assembler = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="numeric_features"
)
df_vector = assembler.transform(df)

In [15]:
scaler = MinMaxScaler(
    inputCol="numeric_features",
    outputCol="scaled_features"
)

scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

In [None]:
# 3. Разворачиваем scaled_features в отдельные колонки
for i, col_name in enumerate(numeric_cols):
    df_scaled = df_scaled.withColumn(col_name + "_scaled", F.col("scaled_features")[i])

In [16]:
df_scaled.select("scaled_features").show(5, truncate=False)


+------------------------------------------------------------------------------------------------------------------------+
|scaled_features                                                                                                         |
+------------------------------------------------------------------------------------------------------------------------+
|[0.11,0.6989247311827957,0.8300000000000001,0.0,0.15151515151515152,0.785065407,0.87,0.021052631578947368,0.4378698225] |
|[0.0,0.6881720430107527,0.85,0.0,0.31313131313131315,0.8050508721,0.87,0.021052631578947368,0.5088757396]               |
|[0.0,0.6989247311827957,0.89,0.2,0.6363636363636365,0.7994186047,0.63,0.06315789473684211,0.5325443787]                 |
|[0.12,0.7741935483870969,0.84,0.0,0.11111111111111112,0.8110465116,0.97,0.021052631578947368,0.5384615385]              |
|[0.04,0.7634408602150539,0.71,0.01,0.09090909090909091,0.8083212209,0.7000000000000001,0.05263157894736842,0.5443786982]|
+---------------

In [17]:
df_scaled.printSchema()

root
 |-- Acousticness: double (nullable = true)
 |-- Album: string (nullable = true)
 |-- Artist(s): string (nullable = true)
 |-- Danceability: double (nullable = true)
 |-- Energy: double (nullable = true)
 |-- Explicit: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Good for Driving: long (nullable = true)
 |-- Good for Exercise: long (nullable = true)
 |-- Good for Morning Routine: long (nullable = true)
 |-- Good for Party: long (nullable = true)
 |-- Good for Relaxation/Meditation: long (nullable = true)
 |-- Good for Running: long (nullable = true)
 |-- Good for Social Gatherings: long (nullable = true)
 |-- Good for Work/Study: long (nullable = true)
 |-- Good for Yoga/Stretching: long (nullable = true)
 |-- Instrumentalness: double (nullable = true)
 |-- Key: string (nullable = true)
 |-- Length: string (nullable = true)
 |-- Liveness: double (nullable = true)
 |-- Loudness_(db): double (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Posit

In [18]:
df_scaled.select("emotion").distinct().show(truncate=False)


+---------+
|emotion  |
+---------+
|joy      |
|love     |
|angry    |
|anger    |
|fear     |
|surprise |
|sadness  |
|True     |
|confusion|
|thirst   |
|pink     |
|interest |
|Love     |
+---------+


In [19]:
df_scaled.groupBy("emotion").count().orderBy("count", ascending=False).show(truncate=False)


+---------+------+
|emotion  |count |
+---------+------+
|joy      |189371|
|sadness  |156813|
|anger    |95489 |
|fear     |26001 |
|love     |25379 |
|surprise |4974  |
|True     |17    |
|pink     |2     |
|Love     |2     |
|angry    |1     |
|confusion|1     |
|thirst   |1     |
|interest |1     |
+---------+------+


In [20]:
valid_emotions = ["joy", "sadness", "anger", "fear", "love", "surprise"]

df_scaled = df_scaled.withColumn("emotion", F.lower(F.trim(F.col("emotion"))))  # на всякий случай к нижнему регистру
df_scaled = df_scaled.filter(F.col("emotion").isin(valid_emotions))

In [21]:
df_scaled.groupBy("emotion").count().orderBy("count", ascending=False).show(truncate=False)


+--------+------+
|emotion |count |
+--------+------+
|joy     |189371|
|sadness |156813|
|anger   |95489 |
|fear    |26001 |
|love    |25381 |
|surprise|4974  |
+--------+------+


In [21]:
# 4. Выбираем нужные столбцы (включая новые)
cols_for_train_clear = [
    "text", "Genre", "emotion"
] + numeric_cols + [col + "_scaled" for col in numeric_cols]

df_pd = df_scaled.select(*cols_for_train_clear).toPandas()
df_pd.to_csv(r"C:\Users\Asus\Desktop\music_classifier\data\processed_data\songs_clean_full.csv", index=False)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Asus\Desktop\music_classifier\music\Lib\site-packages\py4j\clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Asus\AppData\Local\Programs\Python\Python312\Lib\socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] Удаленный хост принудительно разорвал существующее подключение

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Asus\Desktop\music_classifier\music\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Asus\Desktop\music_classifier\music\Lib\site-packages\py4j\clientserver.py", line 566,

ConnectionRefusedError: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение

ConnectionRefusedError: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение

In [None]:
#df.show(1, truncate=False)  # Если длинно
# df.show(2, False, True)  # Если очень длинно

In [None]:
# df.select("song").show(10)

In [None]:
# df.printSchema()

In [None]:
# df.describe().show(1, False, True)

In [None]:
# df.select(
#     F.sum(
#         F.when((F.col("Popularity").isNull()) | (F.col("Popularity") == "") | (F.col("Popularity") == " "), 1)
#         .otherwise(0)
#     ).alias("missing_count")
# ).show()

In [None]:
# df.groupBy(F.trim(F.col("Popularity")).alias("Popularity_clean")) \
#   .count() \
#   .orderBy("Popularity_clean") \
#   .show(100, truncate=False)

In [None]:
# df = df.withColumn("Popularity", F.col("Popularity").cast("int"))
# df.printSchema()

In [None]:
# df.filter(F.col("Popularity") == 100).select("song", "Artist(s)", "Genre", "Popularity").show(100, truncate=False)


In [None]:
# df.filter(F.col("Artist(s)").rlike("(?i)ed sheeran")).show(10, False, True)