# PySpark + CockroachDB Analytical Notebook

In [None]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, countDistinct, max as spark_max, min as spark_min, explode

# Load environment variables from .env
load_dotenv()

print(" COCKROACH_USER:", os.getenv("COCKROACH_USER"))
print(" COCKROACH_PASS:", os.getenv("COCKROACH_PASS"))
print(" COCKROACH_HOST:", os.getenv("COCKROACH_HOST"))
print(" COCKROACH_PORT:", os.getenv("COCKROACH_PORT"))
print(" MONGO URI:", os.getenv("MONGO_ATLAS_URI"))

## 1. Import Required Libraries
We begin by importing PySpark libraries and loading the `.env` variables containing CockroachDB and MongoDB credentials.

In [None]:
spark = SparkSession.builder \
    .appName("CockroachDB_PySpark_Project") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
    .getOrCreate()

## 2. Initialize Spark Session
We initialize the Spark session and configure the required JDBC driver for CockroachDB.

In [None]:
COCKROACH_USER = os.getenv("COCKROACH_USER")
COCKROACH_PASS = os.getenv("COCKROACH_PASS")
COCKROACH_HOST = os.getenv("COCKROACH_HOST")
COCKROACH_PORT = os.getenv("COCKROACH_PORT")
DATABASE_NAME = "defaultdb"

jdbc_url = f"jdbc:postgresql://{COCKROACH_HOST}:{COCKROACH_PORT}/{DATABASE_NAME}?sslmode=require"

connection_properties = {
    "user": COCKROACH_USER,
    "password": COCKROACH_PASS,
    "driver": "org.postgresql.Driver"
}

## 3. CockroachDB Connection Properties
We read the CockroachDB credentials from environment variables and build the JDBC connection string.

In [None]:
def load_table(table_name):
    return spark.read.jdbc(
        url=jdbc_url,
        table=table_name,
        properties=connection_properties
    )

track_link_df = load_table("track_link")
audio_features_df = load_table("audio_features")
track_reference_df = load_table("track_reference")
lyrics_df = load_table("lyrics")

## 4. Load Tables into DataFrames
We load all four tables into Spark DataFrames for processing: `track_link`, `audio_features`, `track_reference`, and `lyrics`.

In [None]:
track_link_df.show(5)
audio_features_df.show(5)
track_reference_df.show(5)
lyrics_df.show(5)

## 5. Show Sample Records
Let's display a few records from each table to get an understanding of their structure.

In [None]:
track_link_df.join(track_reference_df, on="musicbrainz_id").select("track_title", "artist", "album").show(5)

## Join track metadata with artist and album
Join `track_link` with `track_reference` to retrieve track title, artist, and album info.

In [None]:
audio_features_df.orderBy(col("energy").desc()).select("musicbrainz_id", "energy").show(5)

## Top 5 most energetic tracks
Retrieve tracks ordered by energy score in descending order.

In [None]:
track_reference_df.join(audio_features_df, on="musicbrainz_id").join(lyrics_df, on="musicbrainz_id").select("title", "artist", "energy", "valence").show(5)

## Combine track metadata with energy and valence scores
Join track reference, audio features, and lyrics to get combined metadata with energy and valence.

In [None]:
track_reference_df.join(audio_features_df, on="musicbrainz_id").groupBy("country").agg(avg("tempo").alias("avg_tempo")).orderBy(desc("avg_tempo")).show(5)

## Average tempo grouped by country
Compute the average tempo of tracks for each country.

In [None]:
audio_features_df.filter((col("valence") > 0.7) & (col("danceability") > 0.7)).select("musicbrainz_id").show(5)

## Happy and danceable tracks
Filter tracks with valence > 0.7 and danceability > 0.7.

In [None]:
track_reference_df.groupBy("artist").count().orderBy(desc("count")).show(5)

## Count of tracks per artist
Get the count of tracks available for each artist, ordered by count.

In [None]:
lyrics_df.filter(col("genius_lyrics").isNotNull()).select("musicbrainz_id", "genius_url").show(5)

## Tracks with available Genius lyrics
Filter tracks that have available lyrics from Genius platform.

In [None]:
track_reference_df.groupBy("country").agg(countDistinct("artist").alias("unique_artists")).orderBy(desc("unique_artists")).show(5)

## Unique artist count per country
Count the distinct artists available per country.

In [None]:
track_reference_df.groupBy("release_date").count().orderBy(desc("count")).show(5)

## Count of tracks per release date
Group tracks by their release date and count them.

In [None]:
audio_features_df.orderBy(desc("danceability")).select("musicbrainz_id", "danceability").show(5)

## Top 5 most danceable tracks
Retrieve the most danceable tracks ordered by danceability.

In [None]:
audio_features_df.groupBy("sample_rate").count().orderBy(desc("count")).show(5)

## Count of tracks grouped by sample rate
Group tracks by their sample rate and count occurrences.

In [None]:
track_reference_df.filter(col("country") == "US").select("title", "artist").show(5)

## Tracks released in the US
Filter tracks that are marked with country as US.

In [None]:
audio_features_df.groupBy("musicbrainz_id").agg(avg("mfcc_1").alias("avg_mfcc_1")).orderBy(desc("avg_mfcc_1")).show(5)

## Average MFCC 1 feature per track
Compute the average MFCC 1 value grouped by track.

In [None]:
audio_features_df.groupBy("musicbrainz_id").agg(spark_max("tempo").alias("max_tempo"), spark_min("tempo").alias("min_tempo")).show(5)

## Max and min tempo per track
Get max and min tempo for each track.

In [None]:
track_reference_df.filter(col("release_date") > "2015-01-01").select("title", "artist").show(5)

## Tracks released after 2015
Filter tracks released after Jan 1, 2015.

In [None]:
track_reference_df.filter(col("length") > 300).select("title", "length").show(5)

## Long tracks with duration > 300 seconds
Filter tracks longer than 300 seconds.

In [None]:
audio_features_df.filter(col("speechiness") > 0.5).select("musicbrainz_id", "speechiness").show(5)

## Tracks with high speechiness (> 0.5)
Filter tracks where speechiness is greater than 0.5.

In [None]:
audio_features_df.filter(col("instrumentalness") > 0.8).select("musicbrainz_id", "instrumentalness").show(5)

## Instrumental tracks (instrumentalness > 0.8)
Filter tracks where instrumentalness is greater than 0.8.

In [None]:
lyrics_df.filter(col("lastfm_wiki_summary").isNotNull()).select("musicbrainz_id", "lastfm_wiki_summary").show(5)

## Tracks with available Last.fm wiki summary
Filter tracks with a non-null wiki summary from Last.fm.

In [None]:
audio_features_df.filter(col("zero_crossing_rate") > 0.1).select("musicbrainz_id", "zero_crossing_rate").show(5)