
# Phase 3: Stream Processing


In [1]:

# Install PySpark
!pip install -q pyspark


In [2]:

# Create (or reuse) the Spark session for this lab; the defaults are sufficient,
# so only the application name is set.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SongsStreamLab")
    .getOrCreate()
)

# Smoke test: display a single-column DataFrame holding the ids 0 through 4.
spark.range(0, 5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



## Simulate a data stream (e.g., JSON or CSV)

In [3]:

# Column names, listed in the same order as the fields of each record below.
columns = [
    "track_ID",
    "artist_name",
    "genres",
    "followers",
    "artist_popularity",
    "artist_url",
    "track_name",
    "album_name",
    "release_date",
    "duration_ms",
    "explicit",
    "track_popularity",
    "energy",
    "mode",
]

# Hand-written sample track records that stand in for rows of a CSV feed.
data = [
    (
        "101", "Ed Sheeran", ["pop", "uk pop"], 93639154, 96,
        "https://open.spotify.com/artist/6eUKZXaKkcviH0Ku9w2n3V",
        "Shivers", "=", "2021-09-10", 207320, False, 83, 0.834, 1,
    ),
    (
        "102", "Nirvana", ["grunge", "rock"], 16802723, 87,
        "https://open.spotify.com/artist/6olE6TJLqED3rqDCT0FyPh",
        "Smells Like Teen Spirit", "Nevermind", "1991-09-10", 301933, True, 83, 0.587, 1,
    ),
    (
        "103", "The Weeknd", ["canadian pop", "r&b"], 51478288, 98,
        "https://open.spotify.com/artist/1Xyo4u8uXC1ZmMpatF05PJ",
        "Blinding Lights", "After Hours", "2019-11-29", 200040, False, 94, 0.73, 1,
    ),
    (
        "104", "Eminem", ["hip hop", "rap"], 50367072, 97,
        "https://open.spotify.com/artist/7dGJo4pcD2V6oG8kP0tJRR",
        "Lose Yourself", "8 Mile", "2002-10-28", 326693, True, 89, 0.846, 1,
    ),
]

# Build the DataFrame from the records (only names are supplied, so Spark
# infers the column types) and preview it.
df = spark.createDataFrame(data, schema=columns)
df.show()


+--------+-----------+-------------------+---------+-----------------+--------------------+--------------------+-----------+------------+-----------+--------+----------------+------+----+
|track_ID|artist_name|             genres|followers|artist_popularity|          artist_url|          track_name| album_name|release_date|duration_ms|explicit|track_popularity|energy|mode|
+--------+-----------+-------------------+---------+-----------------+--------------------+--------------------+-----------+------------+-----------+--------+----------------+------+----+
|     101| Ed Sheeran|      [pop, uk pop]| 93639154|               96|https://open.spot...|             Shivers|          =|  2021-09-10|     207320|   false|              83| 0.834|   1|
|     102|    Nirvana|     [grunge, rock]| 16802723|               87|https://open.spot...|Smells Like Teen ...|  Nevermind|  1991-09-10|     301933|    true|              83| 0.587|   1|
|     103| The Weeknd|[canadian pop, r&b]| 51478288|        

## Use PySpark to filter and process the data

In [4]:

# Keep only the tracks whose track_popularity is at least 85, then show them.
popular_tracks = df.where(df.track_popularity >= 85)
popular_tracks.show()


+--------+-----------+-------------------+---------+-----------------+--------------------+---------------+-----------+------------+-----------+--------+----------------+------+----+
|track_ID|artist_name|             genres|followers|artist_popularity|          artist_url|     track_name| album_name|release_date|duration_ms|explicit|track_popularity|energy|mode|
+--------+-----------+-------------------+---------+-----------------+--------------------+---------------+-----------+------------+-----------+--------+----------------+------+----+
|     103| The Weeknd|[canadian pop, r&b]| 51478288|               98|https://open.spot...|Blinding Lights|After Hours|  2019-11-29|     200040|   false|              94|  0.73|   1|
|     104|     Eminem|     [hip hop, rap]| 50367072|               97|https://open.spot...|  Lose Yourself|     8 Mile|  2002-10-28|     326693|    true|              89| 0.846|   1|
+--------+-----------+-------------------+---------+-----------------+---------------

In [5]:
# Keep only the tracks flagged as explicit; the boolean column is itself the predicate.
explicit_tracks = df.where(df.explicit)
explicit_tracks.show()

+--------+-----------+--------------+---------+-----------------+--------------------+--------------------+----------+------------+-----------+--------+----------------+------+----+
|track_ID|artist_name|        genres|followers|artist_popularity|          artist_url|          track_name|album_name|release_date|duration_ms|explicit|track_popularity|energy|mode|
+--------+-----------+--------------+---------+-----------------+--------------------+--------------------+----------+------------+-----------+--------+----------------+------+----+
|     102|    Nirvana|[grunge, rock]| 16802723|               87|https://open.spot...|Smells Like Teen ...| Nevermind|  1991-09-10|     301933|    true|              83| 0.587|   1|
|     104|     Eminem|[hip hop, rap]| 50367072|               97|https://open.spot...|       Lose Yourself|    8 Mile|  2002-10-28|     326693|    true|              89| 0.846|   1|
+--------+-----------+--------------+---------+-----------------+--------------------+----

In [6]:
# Keep only the tracks whose energy score is strictly above 0.8, then show them.
high_energy_tracks = df.where(df.energy > 0.8)
high_energy_tracks.show()

+--------+-----------+--------------+---------+-----------------+--------------------+-------------+----------+------------+-----------+--------+----------------+------+----+
|track_ID|artist_name|        genres|followers|artist_popularity|          artist_url|   track_name|album_name|release_date|duration_ms|explicit|track_popularity|energy|mode|
+--------+-----------+--------------+---------+-----------------+--------------------+-------------+----------+------------+-----------+--------+----------------+------+----+
|     101| Ed Sheeran| [pop, uk pop]| 93639154|               96|https://open.spot...|      Shivers|         =|  2021-09-10|     207320|   false|              83| 0.834|   1|
|     104|     Eminem|[hip hop, rap]| 50367072|               97|https://open.spot...|Lose Yourself|    8 Mile|  2002-10-28|     326693|    true|              89| 0.846|   1|
+--------+-----------+--------------+---------+-----------------+--------------------+-------------+----------+------------+-

## Save or display the processed output of the popular tracks

In [13]:
import os

from pyspark.sql.functions import concat_ws

# Spark's CSV writer does not support array columns, so flatten 'genres'
# into a single comma-separated string before writing.
popular_tracks = popular_tracks.withColumn("genres", concat_ws(",", "genres"))

# Write to a dedicated output directory. mode("overwrite") replaces the whole
# target directory, so pointing it at Colab's bundled /content/sample_data
# would delete the sample datasets that ship with the runtime.
output_path = "/content/popular_tracks_csv"

# header=True keeps the column names in each part file so the CSV is
# self-describing when read back.
popular_tracks.write.mode("overwrite").csv(output_path, header=True)

# Verify the write by listing what Spark produced (part files, _SUCCESS marker
# and their .crc checksum files).
print("Files saved to:", os.listdir(output_path))

Files saved to: ['part-00000-4c195de9-0654-45a9-8b75-a237b340e037-c000.csv', '_SUCCESS', '._SUCCESS.crc', 'part-00001-4c195de9-0654-45a9-8b75-a237b340e037-c000.csv', '.part-00000-4c195de9-0654-45a9-8b75-a237b340e037-c000.csv.crc', '.part-00001-4c195de9-0654-45a9-8b75-a237b340e037-c000.csv.crc']
