<a href="https://colab.research.google.com/github/hxiufan/709A2/blob/main/DA709A2_Task2_MapReduce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Set up PySpark for this notebook. Spark must already be installed and
# configured in the runtime.
from pyspark import SparkContext, SparkConf
import csv

# Create (or reuse) the SparkContext.
# getOrCreate() hands back the already-running context when this cell is
# executed again in the same kernel, whereas calling SparkContext(conf=conf)
# a second time fails with "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName("SpotifyTracksAnalysis")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Mount my Google Drive so that files under /content/drive are readable.
# Importing the module alone does not mount anything; drive.mount() performs
# the actual mount (and prompts for authorisation on a fresh runtime).
from google.colab import drive

drive.mount("/content/drive")


In [None]:
# Load the data into an RDD (one element per text line of the CSV file).
# The path is defined once so the location only has to be changed in one place.
DATA_PATH = "/content/drive/My Drive/DA709/cleaned_spotify_tracks.csv"
raw_data = sc.textFile(DATA_PATH)

# `lines` used to be a second, identical textFile() RDD over the same path;
# it is kept as an alias so any cell that still refers to it keeps working.
lines = raw_data

In [None]:
# Parse CSV lines and split fields
def parse_line(line):
    """Split a single CSV-formatted text line into its list of field strings.

    The line is handed to ``csv.reader`` so that quoted fields (including
    fields containing commas or doubled quotes) are handled by the csv module.

    Args:
        line: one line of CSV text, without a trailing newline.

    Returns:
        The list of field values for that line (an empty list for an empty line).
    """
    # csv.reader wants an iterable of lines; wrap the one line and take its only row.
    return next(iter(csv.reader((line,))))

In [None]:
# Separate the header from the data rows, then parse each remaining row.
# first() returns the first line of the file; every line equal to it is dropped
# before the surviving lines are split into field lists by parse_line.
header = raw_data.first()
body_lines = raw_data.filter(lambda text: text != header)
data = body_lines.map(parse_line)

In [None]:
# Extract selected fields and convert to a tuple:
#   (artists, genre, duration_ms as int, explicit as 0/1 int)
# Expected CSV columns include: name, genre, artists, album, popularity,
# duration_ms, explicit.
#
# Column positions are looked up in the header row instead of being hard-coded.
# The output recorded in this notebook shows genre names (e.g. "alt-rock")
# printed as the artist and "explicit" totals in the millions for 50 tracks,
# i.e. the fixed positions 2 / 5 / 6 were reading the wrong columns.
# NOTE(review): presumably the file carries an extra leading id/index column,
# and the header uses the names listed above -- confirm against the CSV.
column_names = [name.strip().lower() for name in parse_line(header)]


def column_index(name):
    """Return the position of column `name` in the CSV header.

    Raises:
        ValueError: if the header has no column with that name, so that a
            schema mismatch fails loudly instead of aggregating wrong fields.
    """
    if name not in column_names:
        raise ValueError(
            f"Column {name!r} not found in CSV header; available columns: {column_names}"
        )
    return column_names.index(name)


def parse_explicit(value):
    """Convert an explicit flag ('True'/'False' or '1'/'0') to the int 1 or 0.

    Raises:
        ValueError: if the value is not one of the recognised spellings.
    """
    text = value.strip().lower()
    if text in ("true", "1"):
        return 1
    if text in ("false", "0"):
        return 0
    raise ValueError(f"Unrecognised explicit flag: {value!r}")


ARTISTS_COL = column_index("artists")
GENRE_COL = column_index("genre")
DURATION_COL = column_index("duration_ms")
EXPLICIT_COL = column_index("explicit")

rdd = data.map(lambda row: (
    row[ARTISTS_COL],
    row[GENRE_COL],
    int(row[DURATION_COL]),
    parse_explicit(row[EXPLICIT_COL]),
))

In [None]:
# Perform MapReduce operations
# Map: key every track tuple by its first field and emit
#      (third field, 1, fourth field) as the value.
# Reduce: add the values of each key element-wise, giving per-key running
#      totals (sum of third field, number of tracks, sum of fourth field).
# The third and fourth fields are intended to be duration_ms and explicit.
def to_keyed_totals(track):
    """Map one track tuple to (key, (third field, 1, fourth field))."""
    return (track[0], (track[2], 1, track[3]))


def add_totals(left, right):
    """Add two (sum, count, sum) triples position by position."""
    return (left[0] + right[0], left[1] + right[1], left[2] + right[2])


keyed_totals = rdd.map(to_keyed_totals)
artist_stats = keyed_totals.reduceByKey(add_totals)

In [None]:
# Calculate statistics
def summarise_totals(entry):
    """Turn (key, (duration_sum, track_count, explicit_sum)) into (key, stats dict).

    The average is the duration sum divided by the track count; the count is
    at least 1 because every key comes from at least one mapped record.
    """
    key, (duration_sum, track_count, explicit_sum) = entry
    return (key, {
        "total_duration_ms": duration_sum,
        "average_duration_ms": duration_sum / track_count,
        "count_tracks": track_count,
        "total_explicit_count": explicit_sum,
    })


artist_result = artist_stats.map(summarise_totals)

In [None]:
# Collect the results on the driver.
# collect() is the action that runs the RDD pipeline built above and returns
# every (key, stats dict) pair as a Python list; printing happens in the next cell.
results = artist_result.collect()

In [None]:
# Print the results: one five-line report per collected entry,
# followed by a 40-character dashed separator.
separator = "-" * 40
for artist, stats in results:
    report_lines = [
        f"Artist: {artist}",
        f"  Total Duration (ms): {stats['total_duration_ms']}",
        f"  Average Duration (ms): {stats['average_duration_ms']:.2f}",
        f"  Track Count: {stats['count_tracks']}",
        f"  Total Explicit Count: {stats['total_explicit_count']}",
        separator,
    ]
    print("\n".join(report_lines))

Artist: alt-rock
  Total Duration (ms): 1509
  Average Duration (ms): 30.18
  Track Count: 50
  Total Explicit Count: 9867802
----------------------------------------
Artist: ambient
  Total Duration (ms): 1630
  Average Duration (ms): 32.60
  Track Count: 50
  Total Explicit Count: 7062878
----------------------------------------
Artist: black-metal
  Total Duration (ms): 1284
  Average Duration (ms): 25.68
  Track Count: 50
  Total Explicit Count: 13926933
----------------------------------------
Artist: blues
  Total Duration (ms): 1878
  Average Duration (ms): 37.56
  Track Count: 50
  Total Explicit Count: 9887225
----------------------------------------
Artist: bossanova
  Total Duration (ms): 1276
  Average Duration (ms): 25.52
  Track Count: 50
  Total Explicit Count: 9651849
----------------------------------------
Artist: brazil
  Total Duration (ms): 2089
  Average Duration (ms): 41.78
  Track Count: 50
  Total Explicit Count: 9296537
----------------------------------------

In [None]:
# Wrap up: stop the SparkContext (`sc`) created in the setup cell.
# After this call `sc` can no longer be used; re-run the setup cell to get a new context.
sc.stop()