In [10]:
# Подключение Google Диска (для доступа к файлу с датасетом)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
# Импорт необходимых библиотек из PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_extract
from pyspark.sql.types import DoubleType

In [14]:
# Создание сессии Spark
spark = SparkSession.builder.appName("SpotifyTrends").getOrCreate()

# Загрузка CSV-файла с треками из Google Drive
df = spark.read.option("header", True).option("inferSchema", True).csv(
    "/content/drive/MyDrive/tracks.csv"
)

In [15]:
# Извлечение года из поля release_date, поддерживаются разные форматы:
# yyyy, yyyy-MM, yyyy-MM-dd, dd/MM/yyyy
df = df.withColumn(
    "release_year",
    when(col("release_date").rlike(r"^\d{4}$"),
         col("release_date").cast("int"))
    .when(col("release_date").rlike(r"^\d{4}-\d{2}$"),
          regexp_extract(col("release_date"), r"^(\d{4})", 1).cast("int"))
    .when(col("release_date").rlike(r"^\d{2}/\d{2}/\d{4}$"),
          regexp_extract(col("release_date"), r"(\d{4})$", 1).cast("int"))
    .when(col("release_date").rlike(r"^\d{4}-\d{2}-\d{2}$"),
          regexp_extract(col("release_date"), r"^(\d{4})", 1).cast("int"))
    .otherwise(None)
)

In [16]:
# Фильтрация строк с корректным годом выпуска
df = df.filter(col("release_year").isNotNull())

In [17]:
# Оставляем только треки, выпущенные с 1920 года
df = df.filter(col("release_year") >= 1920)

In [18]:
# Вычисление десятилетия для каждого трека (например, 1987 → 1980)
df = df.withColumn("decade", (col("release_year") / 10).cast("int") * 10)

In [19]:
# Список аудиопараметров, которые будут анализироваться
features = [
    "danceability", "energy", "loudness", "speechiness",
    "acousticness", "instrumentalness", "liveness", "valence", "tempo"
]

In [20]:
# Приведение каждого параметра к типу Double (для последующей агрегации)
for feature in features:
    df = df.withColumn(feature, col(feature).cast(DoubleType()))

In [21]:
# Группировка по десятилетиям и вычисление среднего значения для каждого параметра
agg_df = df.groupBy("decade").avg(*features).orderBy("decade")

In [22]:
# Сохранение результата в CSV-файл (в одну партицию, с заголовками)
agg_df.coalesce(1).write.option("header", True).mode("overwrite").csv("/content/drive/MyDrive/output/spotify_trends")

In [23]:
# Вывод полученной таблицы в консоль
agg_df.show()

+------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------------+-------------------+------------------+------------------+
|decade|  avg(danceability)|        avg(energy)|      avg(loudness)|   avg(speechiness)|  avg(acousticness)|avg(instrumentalness)|      avg(liveness)|      avg(valence)|        avg(tempo)|
+------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------------+-------------------+------------------+------------------+
|  1920| 0.6044980792001055|0.28125600052624633|-14.789149190895909|0.29171081436653223| 0.8901129887383201|  0.32611214068148864| 0.2046233258781741|0.5999094605972903|113.12064886199191|
|  1930| 0.5522532350216158|0.30676277509265004|-13.522755867819649|0.19452228227300855| 0.8693909407273038|  0.27482508944101464|0.21714564546016088| 0.572560382952439|112.83435013897511|
|  1940|0.47718324893090214| 0.2685315676344811|-14.196

In [24]:
# Завершение сессии Spark
spark.stop()