In [2]:
#Setting up the environment

!java -version

#Install Spark
#download file
!wget -q http://apache.osuosl.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
#extract the file
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
#install findspark package
!pip install -q findspark

openjdk version "11.0.15" 2022-04-19
OpenJDK Runtime Environment (build 11.0.15+10-Ubuntu-0ubuntu0.20.04.1)
OpenJDK 64-Bit Server VM (build 11.0.15+10-Ubuntu-0ubuntu0.20.04.1, mixed mode, sharing)


In [3]:
# 

import os

# Directory of the unpacked Spark distribution, exported as SPARK_HOME.
# NOTE: this is a machine-specific absolute path; adjust it when running elsewhere.
pathToSpark = "/mnt/c/Users/walid/Desktop/Big Data/Final/spark-3.2.1-bin-hadoop3.2"
os.environ.update({"SPARK_HOME": pathToSpark})

In [4]:
import findspark
findspark.init()  # must run before the pyspark imports below

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# create entry points to spark
# Stop a SparkContext left over from an earlier run of this cell so a new one
# can be created below. On a fresh kernel `sc` is not defined yet; that
# NameError is the only failure that is expected and safe to ignore. Any
# other error from sc.stop() is allowed to surface.
try:
    sc.stop()
except NameError:
    pass

conf = SparkConf().setAppName("FinalProject").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sparkContext=sc)

In [13]:
# Load every JSON file in the data directory into one Spark DataFrame
# and report how long the read took.

import time

Filepath = "/mnt/c/Users/walid/Desktop/Big Data/Final/mpdata/*.json"

start = time.time()

# "multiline" lets a single JSON record span several lines of a file
df = (
    spark.read
    .option("multiline", "true")
    .json(Filepath)
)

end = time.time()
total_time = end - start

print(f"took {total_time} seconds")

took 35.14468455314636 seconds


In [21]:
# Inspect the schema: all of the headers were imported successfully, so we
# have the data we need to begin analyzing.
#
# The JSON files are deeply nested, so there are only two top-level columns:
# (1) "info": metadata about the slice/part of the JSON file and when it was
#     generated, which is of no use to this analysis
# (2) "playlists": an array per row, i.e. each row holds multiple playlists
#
# The dataframe has to be flattened and normalized before it can be analyzed.

df.show(n=5)
df.printSchema()

+--------------------+--------------------+
|                info|           playlists|
+--------------------+--------------------+
|{2017-12-04 03:05...|[{false, null, 82...|
|{2017-12-03 08:41...|[{false, null, 10...|
|{2017-12-04 03:05...|[{false, null, 16...|
|{2017-12-04 03:05...|[{false, null, 39...|
|{2017-12-04 03:05...|[{false, null, 82...|
+--------------------+--------------------+
only showing top 5 rows

root
 |-- info: struct (nullable = true)
 |    |-- generated_on: string (nullable = true)
 |    |-- slice: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- playlists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- collaborative: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- duration_ms: long (nullable = true)
 |    |    |-- modified_at: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_albums: long (nullable = true)
 |    |  

In [22]:
from pyspark.sql import functions as F

In [49]:
# Drop the unneeded info column and explode the array of playlists so that
# each playlist is its own row.
#
# As a sanity check, count the rows (expected: 1,000,000 playlists).
#
# NOTE: this cell overwrites `df`, so it is meant to be run once per load.

df = df.select(F.explode("playlists").alias("playlists"))
df.show()
print("counting # of playlists...")
df.count()

+--------------------+
|           playlists|
+--------------------+
|{false, null, 824...|
|{false, null, 278...|
|{false, null, 275...|
|{false, null, 178...|
|{false, null, 243...|
|{false, null, 134...|
|{false, null, 523...|
|{false, null, 238...|
|{false, null, 805...|
|{false, null, 279...|
|{false, null, 872...|
|{false, null, 977...|
|{false, really gr...|
|{false, null, 293...|
|{false, null, 200...|
|{false, null, 797...|
|{false, null, 865...|
|{false, null, 939...|
|{false, null, 667...|
|{false, null, 434...|
+--------------------+
only showing top 20 rows

counting # of playlists...


1000000

In [50]:
# Expand the exploded "playlists" struct so that every field becomes a
# top-level column under its own header.
#
# From here on the playlist data can be analyzed directly.

# TODO: change * to only columns that we need to speed up operation times
df = df.select("playlists.*")
df.show(n=3)
df.printSchema()

+-------------+-----------+-----------+-----------+----------+----------+-----------+---------+-------------+----------+------+--------------------+
|collaborative|description|duration_ms|modified_at|      name|num_albums|num_artists|num_edits|num_followers|num_tracks|   pid|              tracks|
+-------------+-----------+-----------+-----------+----------+----------+-----------+---------+-------------+----------+------+--------------------+
|        false|       null|    8247098| 1502928000|      pump|        32|         27|       23|            2|        36|834000|[{What's up (feat...|
|        false|       null|   27861520| 1472860800| Summer 16|        89|         75|        5|            2|       117|834001|[{And Star Power,...|
|        false|       null|   27577113| 1507075200|old school|        95|         65|       18|            2|       126|834002|[{Up All Night, s...|
+-------------+-----------+-----------+-----------+----------+----------+-----------+---------+-----------

In [61]:
# Isolate the tracks column, whose arrays nest the per-track details.
# Explode it to one row per track and keep only the fields of interest.

df_tracks = (
    df.select(F.explode("tracks").alias("tracks"))
    .select(
        "tracks.album_name",
        "tracks.artist_name",
        "tracks.track_name",
        "tracks.duration_ms",
    )
)
df_tracks.show(10)
df_tracks.count()

+--------------------+--------------+--------------------+-----------+
|          album_name|   artist_name|          track_name|duration_ms|
+--------------------+--------------+--------------------+-----------+
|What's up (feat. ...|   Post Malone|What's up (feat. ...|     290533|
|     T R A P S O U L| Bryson Tiller|               Don't|     198293|
|    Remember My Name|      Lil Durk|             Like Me|     238439|
|            Barter 6|    Young Thug|               Check|     230693|
|            Barter 6|    Young Thug|With That (feat. ...|     202533|
|         Best Friend|    Young Thug|         Best Friend|     213000|
|good kid, m.A.A.d...|Kendrick Lamar|  Backseat Freestyle|     212653|
|Dreams Worth More...|     Meek Mill|R.I.C.O. (feat. D...|     197133|
|           SremmLife|  Rae Sremmurd|             No Type|     200080|
|Quarterback (feat...|          Thug|Quarterback (feat...|     301767|
+--------------------+--------------+--------------------+-----------+
only s

66346428