In [1]:
!pip install -U pyspark==3.2.2
!pip install -U delta-spark

Collecting pyspark==3.2.2
  Downloading pyspark-3.2.2.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.2.2)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.2-py2.py3-none-any.whl size=281969433 sha256=418df5a3dfc0386ec70c758ecc380fe10f9462039e17f26096cfd78f9a310459
  Stored in directory: /root/.cache/pip/wheels/84/42/fa/5339cf0197ee3f87cf713e440a581889f343da6d24e04e866a
Successfully built pyspark
Installing collected packages: py4j, pyspark
 

In [2]:
from delta import configure_spark_with_delta_pip
from tempfile import TemporaryDirectory
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.functions import vector_to_array

In [3]:
def config_spark():
    tmpdir = TemporaryDirectory()
    builder = (
        SparkSession.builder.master("local[*]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.warehouse.dir", f"file:///{tmpdir.name}")
        .config("spark.executor.memory", "4g")  # Increase executor memory
        .config("spark.driver.memory", "4g")    # Increase driver memory
    )

    return configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
spark = config_spark()

## count the duplicates

In [5]:
songs_df = spark.read.csv('spotify_data.csv', header=True, inferSchema=True)


In [None]:
songs_df.show(truncate=False)

+---+---------------------+--------------------------------------------------------------+----------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------------+
|_c0|artist_name          |track_name                                                    |track_id              |popularity|year|genre   |danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |duration_ms|time_signature|
+---+---------------------+--------------------------------------------------------------+----------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------------+
|0  |Jason Mraz           |I Won't Give Up                                               |53QF56cjZA9RTuuMZDrSA6|68        |2012|acoustic|0.483       |0.303 |4  |-10.058 |1   |0.0429     

In [None]:
filtered_df = songs_df.groupBy("track_id").count()\
.filter("count > 1")

In [None]:
filtered_df.show(truncate=False)

+------------------------------+-----+
|track_id                      |count|
+------------------------------+-----+
| Mimì)"                       |2    |
| Woglinde                     |2    |
| Vol. 2"""                    |7    |
| 1964"                        |9    |
| Chorus)"                     |20   |
| still                        |2    |
| 1965"""                      |2    |
| Jesus)"                      |3    |
| Violetta                     |3    |
| mein Hammer                  |2    |
| Alphise                      |2    |
| le jasmin"" (Lakmé           |3    |
| Op. 55                       |2    |
| Gemahl                       |2    |
| Suzuki                       |3    |
| je veux vivre dans ce reve"""|3    |
| dit-elle                     |2    |
| la paterna mano"""           |2    |
| o cara                       |2    |
| Suzuki)"                     |3    |
+------------------------------+-----+
only showing top 20 rows



In [None]:
 filtered_df.write.mode("overwrite").csv('dup_track_id.csv', header=True)

## delete the instances of the duplicates

> Add blockquote




In [None]:
songs_df = spark.read.csv('spotify_data.csv', header=True, inferSchema=True)

In [None]:
duplicate_track_id_df = spark.read.csv('dup_track_id.csv', header=True, inferSchema=True)

In [None]:
unique_tracks_df = songs_df.join(duplicate_track_id_df, ["track_id"], "left_anti")

NameError: name 'songs_df' is not defined

In [None]:
unique_tracks_df.show(truncate=False)

In [None]:
 unique_tracks_df.write.mode("overwrite").csv('./data_no_duplicates.csv', header=True)

### add year bucketing features

In [None]:
no_duplicate_df = spark.read.csv('data_no_duplicates.csv', header=True, inferSchema=True)

In [None]:
df_with_year_bucketing_features = no_duplicate_df.fillna(0, subset='year') \
            .withColumn('year_2000_2004', F.when(F.col('year').between(2000, 2004), 1).otherwise(0)) \
            .withColumn('year_2005_2009', F.when(F.col('year').between(2005, 2009), 1).otherwise(0)) \
            .withColumn('year_2010_2014', F.when(F.col('year').between(2010, 2014), 1).otherwise(0)) \
            .withColumn('year_2015_2019', F.when(F.col('year').between(2015, 2019), 1).otherwise(0)) \
            .withColumn('year_2020_2024', F.when(F.col('year').between(2020, 2024), 1).otherwise(0))

In [None]:
df_with_year_bucketing_features.select("track_id", "year", "year_2000_2004", "year_2005_2009", "year_2010_2014", "year_2015_2019", "year_2020_2024").show(truncate=False)

+----------------------+----+--------------+--------------+--------------+--------------+--------------+
|track_id              |year|year_2000_2004|year_2005_2009|year_2010_2014|year_2015_2019|year_2020_2024|
+----------------------+----+--------------+--------------+--------------+--------------+--------------+
|53QF56cjZA9RTuuMZDrSA6|2012|0             |0             |1             |0             |0             |
|1s8tP3jP4GZcyHDsjvw218|2012|0             |0             |1             |0             |0             |
|7BRCa8MPiyuvr2VU3O9W0F|2012|0             |0             |1             |0             |0             |
|63wsZUhUZLlh1OsyrZq7sz|2012|0             |0             |1             |0             |0             |
|6nXIYClvJAfi6ujLiKqEq8|2012|0             |0             |1             |0             |0             |
|24NvptbNKGs6sPy1Vh1O0v|2012|0             |0             |1             |0             |0             |
|0BP7hSvLAG3URGrEvNNbGM|2012|0             |0          

In [None]:
df_with_year_bucketing_features.write.mode("overwrite").csv('./data_with_year_bucketing_features.csv', header=True)

### adding new scaling for the following features "popularity", "loudness", "tempo"

In [None]:
df_with_year_bucketing_features = spark.read.csv('data_with_year_bucketing_features.csv', header=True, inferSchema=True)

In [None]:
# Define min row and max orox for the following features:  ["popularity", "loudness", "tempo"]
data = [
    {"popularity": 0, "loudness": -60, "tempo": 0},
    {"popularity": 100, "loudness": 0, "tempo": 250},
]

min_max_features = spark.createDataFrame(data=data)

In [None]:
min_max_features.show()

+--------+----------+-----+
|loudness|popularity|tempo|
+--------+----------+-----+
|     -60|         0|    0|
|       0|       100|  250|
+--------+----------+-----+



In [None]:
df_with_min_max = df_with_year_bucketing_features.unionByName(min_max_features, allowMissingColumns=True)

In [None]:
df_with_min_max.show(truncate=False)

+----------------------+---+---------------------+--------------------------------------------------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+
|track_id              |_c0|artist_name          |track_name                                                    |popularity|year|genre   |danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |duration_ms|time_signature|year_2000_2004|year_2005_2009|year_2010_2014|year_2015_2019|year_2020_2024|
+----------------------+---+---------------------+--------------------------------------------------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------------+--------------+--------------+--------

In [None]:
df_with_min_max.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- _c0: integer (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- year: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- year_2000_2004: integer (nullable = true)
 |-- year_2005_2009: integer (nullable = true)
 |-- year_2010_2014: integer (nullable = true)
 |-- year_2015_2019: integer (nullable = true

In [None]:
df_with_min_max = (
    df_with_min_max
    .withColumn("popularity", F.col("popularity").cast("int"))
    .withColumn("loudness", F.col("loudness").cast("int"))
    .withColumn("tempo", F.col("tempo").cast("int"))
)

In [None]:
df_with_min_max = df_with_min_max.where(F.col('popularity').isNotNull() &
                                  F.col('loudness').isNotNull() &
                                  F.col('tempo').isNotNull())

In [None]:
scale_columns = ["popularity", "loudness", "tempo"]

# Assemble all columns into a single feature vector
assembler = VectorAssembler(inputCols=scale_columns, outputCol="features_vector")
df_with_feature_vector = assembler.transform(df_with_min_max)

In [None]:
df_with_feature_vector.select("popularity", "loudness", "tempo", "features_vector").show(truncate=False)

+----------+--------+-----+------------------+
|popularity|loudness|tempo|features_vector   |
+----------+--------+-----+------------------+
|68        |-10     |133  |[68.0,-10.0,133.0]|
|50        |-10     |140  |[50.0,-10.0,140.0]|
|57        |-13     |139  |[57.0,-13.0,139.0]|
|58        |-9      |204  |[58.0,-9.0,204.0] |
|54        |-5      |171  |[54.0,-5.0,171.0] |
|48        |-6      |83   |[48.0,-6.0,83.0]  |
|48        |-8      |121  |[48.0,-8.0,121.0] |
|45        |-7      |138  |[45.0,-7.0,138.0] |
|44        |-7      |84   |[44.0,-7.0,84.0]  |
|58        |-10     |129  |[58.0,-10.0,129.0]|
|45        |-10     |91   |[45.0,-10.0,91.0] |
|45        |-6      |121  |[45.0,-6.0,121.0] |
|55        |-9      |136  |[55.0,-9.0,136.0] |
|40        |-8      |79   |[40.0,-8.0,79.0]  |
|44        |-7      |74   |[44.0,-7.0,74.0]  |
|41        |-11     |81   |[41.0,-11.0,81.0] |
|57        |-10     |140  |[57.0,-10.0,140.0]|
|39        |-8      |89   |[39.0,-8.0,89.0]  |
|49        |-

In [None]:
# Scale the features
scaler = MinMaxScaler(inputCol="features_vector", outputCol="normalized_vector")
scaler_model = scaler.fit(df_with_feature_vector)
df_with_model_results = scaler_model.transform(df_with_feature_vector)

In [None]:
df_with_model_results.select("popularity", "loudness", "tempo", "features_vector", "normalized_vector").show(truncate=False)

+----------+--------+-----+------------------+----------------------------------------------+
|popularity|loudness|tempo|features_vector   |normalized_vector                             |
+----------+--------+-----+------------------+----------------------------------------------+
|68        |-10     |133  |[68.0,-10.0,133.0]|[0.68,0.7575757575757576,0.532]               |
|50        |-10     |140  |[50.0,-10.0,140.0]|[0.5,0.7575757575757576,0.56]                 |
|57        |-13     |139  |[57.0,-13.0,139.0]|[0.5700000000000001,0.7121212121212122,0.556] |
|58        |-9      |204  |[58.0,-9.0,204.0] |[0.58,0.7727272727272727,0.8160000000000001]  |
|54        |-5      |171  |[54.0,-5.0,171.0] |[0.54,0.8333333333333334,0.684]               |
|48        |-6      |83   |[48.0,-6.0,83.0]  |[0.48,0.8181818181818182,0.332]               |
|48        |-8      |121  |[48.0,-8.0,121.0] |[0.48,0.7878787878787878,0.484]               |
|45        |-7      |138  |[45.0,-7.0,138.0] |[0.45,0.803030

In [None]:
df_with_model_results=df_with_model_results.filter(F.col('track_id').isNotNull())

In [None]:
df_with_model_results.show(truncate=False)

+----------------------+---+---------------------+--------------------------------------------------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+-----------+--------------+--------------+--------------+--------------+--------------+--------------+------------------+----------------------------------------------+
|track_id              |_c0|artist_name          |track_name                                                    |popularity|year|genre   |danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|duration_ms|time_signature|year_2000_2004|year_2005_2009|year_2010_2014|year_2015_2019|year_2020_2024|features_vector   |normalized_vector                             |
+----------------------+---+---------------------+--------------------------------------------------------------+----------+----+--------+------------+------+---+--------+----+----

In [None]:
df_with_model_results = df_with_model_results.withColumn("normalized_vector_as_array", vector_to_array("normalized_vector"))
df_with_model_results.select("normalized_vector", "normalized_vector_as_array").show(truncate=False)


+----------------------------------------------+------------------------------------------------+
|normalized_vector                             |normalized_vector_as_array                      |
+----------------------------------------------+------------------------------------------------+
|[0.68,0.7575757575757576,0.532]               |[0.68, 0.7575757575757576, 0.532]               |
|[0.5,0.7575757575757576,0.56]                 |[0.5, 0.7575757575757576, 0.56]                 |
|[0.5700000000000001,0.7121212121212122,0.556] |[0.5700000000000001, 0.7121212121212122, 0.556] |
|[0.58,0.7727272727272727,0.8160000000000001]  |[0.58, 0.7727272727272727, 0.8160000000000001]  |
|[0.54,0.8333333333333334,0.684]               |[0.54, 0.8333333333333334, 0.684]               |
|[0.48,0.8181818181818182,0.332]               |[0.48, 0.8181818181818182, 0.332]               |
|[0.48,0.7878787878787878,0.484]               |[0.48, 0.7878787878787878, 0.484]               |
|[0.45,0.80303030303

In [None]:
# Extract the scaled features back into separate columns
for i, col_name in enumerate(scale_columns):
    df_with_model_results = df_with_model_results.withColumn(col_name, F.col("normalized_vector_as_array").getItem(i))



In [None]:
df_with_model_results.select("popularity", "loudness", "tempo", "features_vector", "normalized_vector_as_array").show(truncate=False)

+-------------------+------------------+------------------+------------------+------------------------------------------------+
|popularity         |loudness          |tempo             |features_vector   |normalized_vector_as_array                      |
+-------------------+------------------+------------------+------------------+------------------------------------------------+
|0.68               |0.7575757575757576|0.532             |[68.0,-10.0,133.0]|[0.68, 0.7575757575757576, 0.532]               |
|0.5                |0.7575757575757576|0.56              |[50.0,-10.0,140.0]|[0.5, 0.7575757575757576, 0.56]                 |
|0.5700000000000001 |0.7121212121212122|0.556             |[57.0,-13.0,139.0]|[0.5700000000000001, 0.7121212121212122, 0.556] |
|0.58               |0.7727272727272727|0.8160000000000001|[58.0,-9.0,204.0] |[0.58, 0.7727272727272727, 0.8160000000000001]  |
|0.54               |0.8333333333333334|0.684             |[54.0,-5.0,171.0] |[0.54, 0.8333333333333334,

In [None]:
final_df = df_with_model_results.drop('features_vector', 'normalized_vector', 'normalized_vector_as_array')

In [None]:
final_df.write.mode("overwrite").csv('./final_dataset.csv', header=True)

In [None]:
final_df.show(truncate=False)

+----------------------+---+---------------------+--------------------------------------------------------------+-------------------+----+--------+------------+------+---+------------------+----+-----------+------------+----------------+--------+-------+------------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+
|track_id              |_c0|artist_name          |track_name                                                    |popularity         |year|genre   |danceability|energy|key|loudness          |mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo             |duration_ms|time_signature|year_2000_2004|year_2005_2009|year_2010_2014|year_2015_2019|year_2020_2024|
+----------------------+---+---------------------+--------------------------------------------------------------+-------------------+----+--------+------------+------+---+------------------+----+-----------+------------+----------------+--------+--

In [None]:
df = spark.read.csv('./final_dataset.csv', header=True, inferSchema=True)

In [None]:
df = df.select(
    "track_id",
    F.array(
        "acousticness",
        "danceability",
        "energy",
        "instrumentalness",
        "liveness",
        "loudness",
        "mode",
        "popularity",
        "speechiness",
        "tempo",
        "valence",
        "year_2000_2004",
        "year_2005_2009",
        "year_2010_2014",
        "year_2015_2019",
        "year_2020_2024",
    ).alias("features"),
)


In [None]:
df.show(truncate=False)

+----------------------+----------------------------------------------------------------------------------------------------------------------------------+
|track_id              |features                                                                                                                          |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------+
|53QF56cjZA9RTuuMZDrSA6|[0.694, 0.483, 0.303, 0.0, 0.115, 0.7575757575757576, 1.0, 0.68, 0.0429, 0.532, 0.139, 0.0, 0.0, 1.0, 0.0, 0.0]                   |
|1s8tP3jP4GZcyHDsjvw218|[0.477, 0.572, 0.454, 1.37E-5, 0.0974, 0.7575757575757576, 1.0, 0.5, 0.0258, 0.56, 0.515, 0.0, 0.0, 1.0, 0.0, 0.0]                |
|7BRCa8MPiyuvr2VU3O9W0F|[0.338, 0.409, 0.234, 5.0E-5, 0.0895, 0.7121212121212122, 1.0, 0.5700000000000001, 0.0323, 0.556, 0.145, 0.0, 0.0, 1.0, 0.0, 0.0] |
|63wsZUhUZLlh1OsyrZq7sz|[0.807, 0.392, 0.251, 0.0, 0.0797, 0.772

In [None]:
["track_id",
"acousticness",
"danceability",
"energy",
"instrumentalness",
"liveness",
"loudness",
"mode",
"popularity",
"speechiness",
"tempo",
"valence",
"year_2000_2004",
"year_2005_2009",
"year_2010_2014",
"year_2015_2019",
"year_2020_2024"]

In [None]:
col="update_timestamp"

In [None]:
date_col = df[col]
quantiles_values = date_col.quantile([0.25, 0.5, 0.75], interpolation="nearest")
print(quantiles_values)

PySparkTypeError: [NOT_COLUMN_OR_FLOAT_OR_INT_OR_LIST_OR_STR] Argument `item` should be a column, float, integer, list or string, got function.