In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [3]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("Spotify Million Playlist Project") \
    .getOrCreate()

## Spotify Million Playlist Project: PySpark Analysis

This notebook demonstrates a comprehensive analysis of the Spotify Million Playlist Dataset using Apache Spark, leveraging PySpark for efficient data processing and analysis on a large scale.

**Goal:** To explore the dataset, identify popular tracks, artists, and albums, analyze playlist characteristics, and visualize key insights.

**Dataset:** Spotify Million Playlist Dataset (subset)

### Step 0: Install & Import Libraries

Before starting the analysis, we need to install the necessary libraries and import them into our environment.

In [4]:
!pip install pyspark

from google.colab import drive
drive.mount('/content/drive')

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, count, desc

import matplotlib.pyplot as plt
import seaborn as sns
import os

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Insight:** This step ensures all required libraries are available and connected to Google Drive to access the dataset.

### Step 1: Initialize Spark Session

We initialize a Spark session, which is the entry point for any Spark functionality. We name the application for better identification.

In [5]:
spark = SparkSession.builder.appName("Spotify Big Data Analysis").getOrCreate()
print("Spark session initialized.")

Spark session initialized.


**Insight:** A Spark session is successfully initialized, providing the foundation for distributed data processing.

### Step 2: Load Dataset

The dataset, which is in JSON format, is loaded into a Spark DataFrame. We then display the schema and a few rows to understand the data structure.

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Define schema for Spotify Million Playlist Dataset
schema = StructType([
    StructField("info", StructType([
        StructField("num_tracks", IntegerType(), True),
        StructField("num_albums", IntegerType(), True),
        StructField("num_followers", IntegerType(), True),
        StructField("num_edits", IntegerType(), True),
        StructField("playlist_name", StringType(), True),
        StructField("playlist_id", IntegerType(), True),
    ])),
    StructField("playlists", ArrayType(
        StructType([
            StructField("pid", IntegerType(), True),
            StructField("name", StringType(), True),
            StructField("num_tracks", IntegerType(), True),
            StructField("num_albums", IntegerType(), True),
            StructField("num_followers", IntegerType(), True),
            StructField("tracks", ArrayType(
                StructType([
                    StructField("track_name", StringType(), True),
                    StructField("artist_name", StringType(), True),
                    StructField("album_name", StringType(), True),
                    StructField("duration_ms", IntegerType(), True),
                    StructField("pos", IntegerType(), True),
                ])
            ))
        ])
    ))
])

# Load dataset using explicit schema
data_path = "/content/drive/MyDrive/data/mpd.slice.*.json"

df = spark.read.schema(schema).json(data_path)

print("DataFrame schema:")
df.printSchema()
print("Sample rows:")
df.show(2, truncate=False)

DataFrame schema:
root
 |-- info: struct (nullable = true)
 |    |-- num_tracks: integer (nullable = true)
 |    |-- num_albums: integer (nullable = true)
 |    |-- num_followers: integer (nullable = true)
 |    |-- num_edits: integer (nullable = true)
 |    |-- playlist_name: string (nullable = true)
 |    |-- playlist_id: integer (nullable = true)
 |-- playlists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pid: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_tracks: integer (nullable = true)
 |    |    |-- num_albums: integer (nullable = true)
 |    |    |-- num_followers: integer (nullable = true)
 |    |    |-- tracks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- track_name: string (nullable = true)
 |    |    |    |    |-- artist_name: string (nullable = true)
 |    |    |    |    |-- album_name: string (nullable = true)
 |    |    |  

**Insight:** The JSON dataset containing playlist information is successfully loaded into a Spark DataFrame, showing a nested structure with playlist details.

### Step 3: Explore Playlists

We extract the playlist information from the nested structure using the `explode` function. This creates a new DataFrame where each row represents a single playlist.

In [7]:
playlists = df.select(explode(col("playlists")).alias("playlist"))
print("Playlists DataFrame schema:")
playlists.printSchema()
print("Sample playlist rows:")
playlists.show(2, truncate=False)

Playlists DataFrame schema:
root
 |-- playlist: struct (nullable = true)
 |    |-- pid: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- num_tracks: integer (nullable = true)
 |    |-- num_albums: integer (nullable = true)
 |    |-- num_followers: integer (nullable = true)
 |    |-- tracks: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- track_name: string (nullable = true)
 |    |    |    |-- artist_name: string (nullable = true)
 |    |    |    |-- album_name: string (nullable = true)
 |    |    |    |-- duration_ms: integer (nullable = true)
 |    |    |    |-- pos: integer (nullable = true)

Sample playlist rows:


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

**Insight:** The playlists are successfully extracted into a separate DataFrame, which is easier to work with for playlist-level analysis.

### Step 4: Flatten Tracks

We further flatten the structure to extract individual track details from each playlist. This results in a DataFrame where each row represents a track within a playlist, along with its associated playlist ID.

In [None]:
from pyspark.sql.functions import col, explode

# Explode playlists to get individual tracks
playlists = df.select(explode(col("playlists")).alias("playlist"))

tracks = playlists.select(
    col("playlist.pid").alias("pid"),
    explode(col("playlist.tracks")).alias("track")
)

# Select track-level details
tracks = tracks.select(
    "pid",
    col("track.track_name").alias("track_name"),
    col("track.artist_name").alias("artist_name"),
    col("track.album_name").alias("album_name"),
    col("track.pos").alias("position")
)

print("Sample tracks rows:")
tracks.show(5, truncate=False)

**Insight:** Track details are successfully flattened, creating a dataset ready for analyzing individual tracks and their properties across playlists.

### Step 5: Track Counts

We count the occurrences of each track across all playlists to identify the most frequent tracks.

In [None]:
from pyspark.sql.functions import col, explode

# Step 5: Flatten playlists into track-level DataFrame
playlists = df.select(explode(col("playlists")).alias("playlist"))

tracks = playlists.select(
    col("playlist.pid").alias("pid"),
    explode(col("playlist.tracks")).alias("track")
)

# Extract relevant track details
tracks = tracks.select(
    "pid",
    col("track.track_name").alias("track_name"),
    col("track.artist_name").alias("artist_name"),
    col("track.album_name").alias("album_name"),
    col("track.pos").alias("position")
)

print("Sample tracks rows:")
tracks.show(5, truncate=False)

**Insight:** The top tracks are identified, revealing the most popular songs across the sampled playlists.

### Step 6: Artist Counts

Similarly, we count the occurrences of each artist across all playlists to identify the most frequent artists.

In [None]:
top_artists = tracks.groupBy("artist_name") \
    .agg(count("*").alias("count")) \
    .orderBy(desc("count"))

print("Top 10 most frequent artists:")
top_artists.show(10, truncate=False)

**Insight:** The top artists are determined, highlighting the most frequently featured artists in the playlists.

### Step 7: Visualization – Top Tracks

We visualize the top 10 most frequent tracks using a bar plot to easily compare their popularity.

In [None]:
pdf_tracks = top_tracks.limit(10).toPandas()

plt.figure(figsize=(12,5))
sns.barplot(x="count", y="track_name", data=pdf_tracks, palette="viridis")
plt.title("Top 10 Most Frequent Tracks", fontsize=14)
plt.show()

**Insight:** The bar plot clearly shows the relative popularity of the top 10 tracks, providing a visual summary of the most frequent songs.

### Step 8: Visualization – Top Artists

We visualize the top 10 most frequent artists using a bar plot to compare their frequency in the playlists.

In [None]:
pdf_artists = top_artists.limit(10).toPandas()

plt.figure(figsize=(12,5))
sns.barplot(x="count", y="artist_name", data=pdf_artists, palette="magma")
plt.title("Top 10 Most Frequent Artists", fontsize=14)
plt.show()

**Insight:** The visualization of top artists quickly reveals which artists have the strongest presence in the dataset's playlists.

### Step 9: Playlist Size Distribution

We analyze the distribution of playlist sizes (number of tracks per playlist) by calculating descriptive statistics.

In [None]:
playlist_sizes = playlists.selectExpr("size(playlist.tracks) as num_tracks")

print("Playlist size distribution statistics:")
playlist_sizes.describe().show()

**Insight:** Descriptive statistics of playlist sizes provide an overview of how many tracks playlists typically contain, showing minimum, maximum, average, and standard deviation.

### Step 10: Visualization – Playlist Size Distribution

We visualize the distribution of playlist sizes using a histogram to understand the frequency of different playlist lengths.

In [None]:
pdf_sizes = playlist_sizes.toPandas()

plt.figure(figsize=(10,5))
sns.histplot(pdf_sizes["num_tracks"], bins=50, kde=False, color="blue")
plt.title("Distribution of Playlist Sizes", fontsize=14)
plt.xlabel("Number of Tracks")
plt.ylabel("Count")
plt.show()

**Insight:** The histogram illustrates the distribution of playlist lengths, showing that most playlists are relatively short, with fewer very long playlists.

### Step 11: Most Popular Albums

We identify the most frequent albums across all playlists by counting the occurrences of each album.

In [None]:
top_albums = tracks.groupBy("album_name") \
    .agg(count("*").alias("count")) \
    .orderBy(desc("count"))

print("Top 10 most frequent albums:")
top_albums.show(10, truncate=False)

**Insight:** The most popular albums are listed, indicating which albums are most frequently included in playlists.

### Step 12: Visualization – Top Albums

We visualize the top 10 most frequent albums using a bar plot for easy comparison.

In [None]:
pdf_albums = top_albums.limit(10).toPandas()

plt.figure(figsize=(12,5))
sns.barplot(x="count", y="album_name", data=pdf_albums, palette="cubehelix")
plt.title("Top 10 Most Frequent Albums", fontsize=14)
plt.show()

**Insight:** The bar plot of top albums provides a visual ranking of the albums that appear most often in the dataset's playlists.

### Step 13: Unique Tracks & Artists

We calculate the total number of unique tracks and artists in the dataset to understand the diversity of the content.

In [None]:
unique_tracks = tracks.select("track_name").distinct().count()
unique_artists = tracks.select("artist_name").distinct().count()

print("Unique Tracks:", unique_tracks)
print("Unique Artists:", unique_artists)

**Insight:** The counts of unique tracks and artists reveal the breadth of musical content within the dataset.

### Step 14: Save Results

Finally, we save the analysis results (top tracks, artists, albums, and playlist sizes) as CSV files to a specified output path for future use or sharing.

In [None]:
output_path = "/content/drive/MyDrive/spotify_project/results"

top_tracks.write.mode("overwrite").csv(output_path + "/top_tracks")
top_artists.write.mode("overwrite").csv(output_path + "/top_artists")
top_albums.write.mode("overwrite").csv(output_path + "/top_albums")
playlist_sizes.write.mode("overwrite").csv(output_path + "/playlist_sizes")

print(" Results saved successfully at:", output_path)

In [None]:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType

# Define the schema for the Spotify Million Playlist Project data
schema = StructType([
    StructField("info", StructType([
        StructField("generated_on", IntegerType(), True),
        StructField("slice", IntegerType(), True),
        StructField("version", IntegerType(), True)
    ]), True),
    StructField("playlists", ArrayType(StructType([
        StructField("description", StringType(), True),
        StructField("name", StringType(), True),
        StructField("num_followers", IntegerType(), True),
        StructField("num_edits", IntegerType(), True),
        StructField("num_tracks", IntegerType(), True),
        StructField("num_albums", IntegerType(), True),
        StructField("num_artists", IntegerType(), True),
        StructField("duration_ms", IntegerType(), True),
        StructField("collaborative", StringType(), True),
        StructField("pid", IntegerType(), True),
        StructField("tracks", ArrayType(StructType([
            StructField("pos", IntegerType(), True),
            StructField("artist_name", StringType(), True),
            StructField("track_uri", StringType(), True),
            StructField("artist_uri", StringType(), True),
            StructField("album_uri", StringType(), True),
            StructField("track_name", StringType(), True),
            StructField("album_name", StringType(), True),
            StructField("duration_ms", IntegerType(), True)
        ]), True), True)
    ]), True), True)
])

print("Schema defined successfully.")