In [0]:
"./utils/configuration"

In [0]:
# Load the curated silver-layer tables that this notebook aggregates.
# NOTE(review): `collaboations` is misspelled, but the collaboration cell further
# down reads it under exactly this name — rename both places together if fixing.
artists = spark.read.format("parquet").load("/mnt/musicstg/silver/artists/")
albums = spark.read.format("parquet").load("/mnt/musicstg/silver/albums/")
tracks = spark.read.format("parquet").load("/mnt/musicstg/silver/tracks/")
collaboations = spark.read.format("parquet").load("/mnt/musicstg/silver/collaborations/")

### Album Aggregations

In [0]:
from pyspark.sql import functions as f

# Per-artist album counts, one column per distinct album_type value.
# pivot() gets no explicit value list, so Spark first runs an extra job to
# collect the distinct album_type values (a null album_type presumably lands in
# a column named "null" — confirm). An artist with no albums of a given type
# gets null from the pivot, which fillna(0) turns into a 0 count.
album_type_counts = (
    albums.groupBy("artist_id")
    .pivot("album_type")
    .agg(f.countDistinct("album_id"))
    .fillna(0)
)

# First and last album release per artist.
# NOTE(review): min/max compare release_date in its stored type; if it is a
# string with mixed precision (e.g. "1999" vs "1999-05-01") the ordering is
# lexicographic — confirm the silver schema.
album_type_dates = albums.groupBy("artist_id").agg(
    f.min("release_date").alias("earliest_release_date"),
    f.max("release_date").alias("latest_release_date"),
)

# Both sides come from albums.groupBy("artist_id"), so they share one key set;
# the left join attaches the date range without dropping or adding artists.
album_type_aggs = album_type_counts.join(album_type_dates, "artist_id", "left")

display(album_type_aggs)

In [0]:
from pyspark.sql import functions as f

# Overall album summary per artist: distinct album count plus release range.
# NOTE(review): album_aggs is only displayed; it is not joined into
# artists_final, so total_albums never reaches the gold table — confirm intent.
album_summary_cols = [
    f.countDistinct("album_id").alias("total_albums"),
    f.min("release_date").alias("earliest_release_date"),
    f.max("release_date").alias("latest_release_date"),
]
album_aggs = albums.groupBy("artist_id").agg(*album_summary_cols)

display(album_aggs)

### Artists Aggregation

In [0]:
from pyspark.sql import functions as f

# Follower and popularity statistics per artist.
# NOTE(review): if silver `artists` has one row per artist_id these simply echo
# that row; if it has several (e.g. snapshots), note that artists_final starts
# from the unaggregated `artists` and would repeat such artists — confirm.
artists_aggs = artists.groupBy("artist_id").agg(
    f.max("followers").alias("max_followers"),
    f.avg("popularity").alias("avg_popularity"),
)

In [0]:
# display(artists_aggs)

### Explicit Content

In [0]:
from pyspark.sql import functions as F

# Explicit-content share per artist.
# Assumes `explicit` is boolean (TODO confirm silver schema): the int cast turns
# it into 1/0, and sum() skips nulls, so an artist whose rows all carry a null
# flag ends up with a null explicit_track_count.
explicit_counts = tracks.groupBy("artist_id").agg(
    F.sum(F.col("explicit").cast("int")).alias("explicit_track_count"),
    F.count("*").alias("total_tracks"),  # every row in the group, nulls included
)

# count("*") is at least 1 for any group, so the zero guard is purely defensive.
# F.when without .otherwise() already yields null when the condition fails.
explicit_aggs = explicit_counts.withColumn(
    "explicit_ratio",
    F.when(
        F.col("total_tracks") != 0,
        F.col("explicit_track_count") / F.col("total_tracks"),
    ),
)

In [0]:
# display(explicit_aggs)

### Track Duration Statistics (average, min, max, total)


In [0]:
from pyspark.sql import functions as F

# Track-length statistics per artist, in milliseconds.
# avg/min/max/sum skip null durations, whereas count("*") counts every row, so
# track_count_duration may exceed the number of rows behind the average.
duration_stats = [
    ("avg_track_duration_ms", F.avg),
    ("min_track_duration_ms", F.min),
    ("max_track_duration_ms", F.max),
    ("total_duration_ms", F.sum),
]
track_duration_aggs = tracks.groupBy("artist_id").agg(
    *[agg_fn("duration_ms").alias(col_name) for col_name, agg_fn in duration_stats],
    F.count("*").alias("track_count_duration"),
)

In [0]:
# display(track_duration_aggs)

### Market Availability

In [0]:
# Raw bronze artist payload; the market cell reads each track's
# available_markets array from it (presumably not carried into silver — confirm).
bronze_df = spark.read.format("json").load("/mnt/musicstg/bronze/artists_full/")

In [0]:
from pyspark.sql.functions import size
from pyspark.sql.functions import explode
from pyspark.sql import functions as F

# Market availability statistics per artist, computed from the bronze payload.
market_aggs = (
    bronze_df
    # One row per (artist, track). explode() drops rows whose tracks array is
    # null or empty, so those artists get null market stats after the final
    # left join rather than a row here.
    .select("artist_id", explode("tracks").alias("track"))
    .select(
        "artist_id",
        # size() returns -1 (not null) for a null array unless
        # spark.sql.legacy.sizeOfNull=false or ANSI mode is enabled. Map missing
        # arrays to null so avg/min/max skip them instead of averaging in -1.
        # An empty array is a genuine 0 markets and is kept.
        F.when(
            F.col("track.available_markets").isNotNull(),
            size("track.available_markets"),
        ).alias("market_count"),
    )
    .groupBy("artist_id")
    .agg(
        F.avg("market_count").alias("avg_market_count"),  # mean markets per track
        F.max("market_count").alias("max_market_count"),  # most widely available track
        F.min("market_count").alias("min_market_count"),  # least widely available track
    )
)

In [0]:
# display(market_aggs)

### Collaboration Aggregation

In [0]:
from pyspark.sql import functions as F

# Number of distinct collaborators per artist.
# NOTE(review): despite its name, `top_collab` is not a ranking; it holds one
# total_collaborations count per artist_id. The name (and the misspelled
# `collaboations` input) are kept because other cells reference them.
collaborator_count = F.countDistinct("collaborator_id").alias("total_collaborations")
top_collab = collaboations.groupBy("artist_id").agg(collaborator_count)

In [0]:
# display(top_collab)

### Joining All Aggregations

In [0]:
# Left-join each per-artist aggregate onto the artist table, in this order:
# album types + release range, followers/popularity, explicit content,
# track durations, market availability, collaborator counts.
# Artists absent from an aggregate are kept with null columns for it.
# NOTE(review): every row of `artists` is kept, so duplicate artist_ids there
# are carried through; and any non-key column name shared by two inputs (watch
# the dynamic pivot columns of album_type_aggs) would produce duplicate columns.
artists_final = artists
for per_artist_agg in (
    album_type_aggs,
    artists_aggs,
    explicit_aggs,
    track_duration_aggs,
    market_aggs,
    top_collab,
):
    artists_final = artists_final.join(per_artist_agg, "artist_id", "left")

In [0]:
# Persist the gold artist table as Parquet, replacing whatever is at the path.
(
    artists_final.write
    .format("parquet")
    .mode("overwrite")
    .save("/mnt/musicstg/gold/artists_final")
)

In [0]:
# display(artists_final)