In [1]:
#IMPORTANT:
#The following guide was used to install Spark (PySpark) on Windows:
#https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c
#Then download the mysql-connector-java jar file separately

#In order to get this IPYNB script to work, we have to initiate Jupyter Notebook by running the following in the command line:
# pyspark --driver-class-path mysql-connector-java-8.0.23.jar --jars mysql-connector-java-8.0.23.jar 
#However, first change the directory to where the mysql-connector-java jar file is located (i.e. C:\opt\spark)

In [2]:
#import packages
from pyspark.sql import SparkSession #don't actually need to include this when command above is used
import pyspark.sql.functions as fc

In [3]:
spark

In [4]:
#load in the 4 tables
album = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/mydatabase").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","album").option("user","root").option("password","dsci551").load()
artist = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/mydatabase").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","artist").option("user","root").option("password","dsci551").load()
track = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/mydatabase").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","track").option("user","root").option("password","dsci551").load()
audio = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/mydatabase").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","audio").option("user","root").option("password","dsci551").load()

In [5]:
#view column types
album.printSchema()
artist.printSchema()
track.printSchema()
audio.printSchema()

root
 |-- album_id: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: string (nullable = true)
 |-- album_total_tracks: long (nullable = true)
 |-- album_artist_id: string (nullable = true)
 |-- album_image: string (nullable = true)
 |-- album_homepage: string (nullable = true)

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_genre: string (nullable = true)
 |-- artist_popularity: long (nullable = true)
 |-- artist_followers: long (nullable = true)
 |-- artist_image: string (nullable = true)
 |-- artist_homepage: string (nullable = true)

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist_id: string (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_number: long (nullable = true)
 |-- track_popularity: long (nullable = true)
 |-- track_explicit: boolean (nullable = true)
 |-- track_homepage: string (nullable = true)

In [6]:
#if we want to use spark.sql...
#create temp tables to query them
album.registerTempTable("temp_album")
artist.registerTempTable("temp_artist")
track.registerTempTable("temp_track")
audio.registerTempTable("temp_audio")

#we can then query like this
spark.sql("""select art.artist_name, count(t.track_id)
    from temp_track t
    left join temp_artist art on art.artist_id = t.track_artist_id
    where art.artist_name is not null
    group by art.artist_name
    order by count(t.track_id) desc
    limit 10;""").show()

+---------------+---------------+
|    artist_name|count(track_id)|
+---------------+---------------+
|Steven Universe|            300|
|    The Beatles|            275|
|  Harris Heller|            243|
|  Frank Sinatra|            232|
|     Elton John|            207|
|  Fleetwood Mac|            205|
|     Billy Joel|            161|
|  George Strait|            154|
|   Led Zeppelin|            153|
|    Linkin Park|            153|
+---------------+---------------+



In [7]:
type(album)

pyspark.sql.dataframe.DataFrame

In [8]:
###summary statistics we want to show
# top 10 artists by number of tracks:
# select art.artist_name, count(t.track_id)
# from track t
# left join artist art on art.artist_id = t.track_artist_id
# where art.artist_name is not null
# group by art.artist_name
# order by count(t.track_id) desc
# limit 10
most_tracks = track.join(artist, artist.artist_id == track.track_artist_id, how='left').filter("artist_name is not null") \
    .groupBy('artist_name').agg(fc.count("*").alias("cnt")).orderBy(fc.desc('cnt'))#.limit(10)
most_tracks_by_artist = most_tracks #.toPandas()
most_tracks_by_artist.coalesce(1).write.mode('overwrite').options(header='True', delimiter=',').csv("spark_df/most_tracks_by_artist")

In [9]:
# top 10 artists by popularity / followers... popularity based on music being streamed -- make scatterplot?
# ^^make scatterplot of popularity and followers?
# #select artist_name, artist_popularity, artist_followers
# from artist
# order by artist_popularity desc
# limit 10
# select artist_name, artist_popularity, artist_followers
# from artist
# order by artist_followers desc
# limit 10
most_artists_fol = artist.select('artist_name','artist_popularity','artist_followers').orderBy(fc.desc('artist_followers'))#.limit(10)
top_followers_by_artist = most_artists_fol #.toPandas()
top_followers_by_artist.coalesce(1).write.mode('overwrite').options(header='True', delimiter=',').csv("spark_df/top_followers_by_artist")

most_artists_pop = artist.select('artist_name','artist_popularity','artist_followers').orderBy(fc.desc('artist_popularity'))#.limit(10)
top_popularity_by_artist = most_artists_pop #.toPandas()
top_popularity_by_artist.coalesce(1).write.mode('overwrite').options(header='True', delimiter=',').csv("spark_df/top_popularity_by_artist")

In [10]:
# top 10 oldest and newest albums... can create bar chart of release decade?
# (select alb.album_name, art.artist_name, alb.album_release_date, art.artist_genre, alb.album_image, alb.album_homepage
# from album alb
# left join artist art on art.artist_id = alb.album_artist_id
# order by alb.album_release_date asc
# limit 10)
# union all
# (select alb.album_name, art.artist_name, alb.album_release_date, art.artist_genre, alb.album_image, alb.album_homepage
# from album alb
# left join artist art on art.artist_id = alb.album_artist_id
# where art.artist_name is not null
# order by alb.album_release_date desc
# limit 10)
oldest_album_date = album.join(artist, artist.artist_id == album.album_artist_id, how='left').filter('artist_name is not null')
oldest_album_date2 = oldest_album_date.select('album_name', 'artist_name', 'album_release_date').orderBy(fc.asc('album_release_date')).limit(10)
#oldest_album_date = album_release2.toPandas()

newest_album_date = album.join(artist, artist.artist_id == album.album_artist_id, how='left').filter('artist_name is not null')
newest_album_date2 = newest_album_date.select('album_name', 'artist_name', 'album_release_date').orderBy(fc.desc('album_release_date')).limit(10)
#newest_album_date = album_release4.toPandas()

oldest_newest_union = oldest_album_date2.union(newest_album_date2)
oldest_newest_albums = oldest_newest_union #.toPandas()
oldest_newest_albums.coalesce(1).write.mode('overwrite').options(header='True', delimiter=',').csv("spark_df/oldest_newest_albums")

In [11]:
#top 10 artists by average popularity and their average audio feature scores
# select art.artist_name, avg(t.track_popularity), avg(t.track_explicit), avg(aud.track_duration)/1000 as 'track_duration_sec', 
# 	avg(aud.danceability) as 'danceability', avg(aud.energy) as 'energy', avg(aud.loudness) as 'loudness', 
#     avg(aud.speechiness) as 'speechiness', avg(aud.acousticness) as 'acousticness', avg(aud.instrumentalness) as 'instrumentalness', 
#     avg(aud.liveness) as 'liveness', avg(aud.valence) as 'valence', avg(aud.tempo) as 'tempo', art.artist_homepage
# from track t
# left join audio aud on aud.track_id = t.track_id
# left join album alb on alb.album_id = t.track_album_id
# left join artist art on art.artist_id = t.track_artist_id
# group by art.artist_name
# order by avg(t.track_popularity) desc
# limit 10
audio_stats = track.join(audio, audio.track_id == track.track_id, how='left') \
    .join(album, album.album_id == track.track_album_id, how='left') \
    .join(artist, artist.artist_id == track.track_artist_id, how='left') \
    .groupBy('artist_name').agg(fc.mean('track_popularity').alias("avg_popularity"),(fc.mean('track_duration')/100).alias("avg_duration_sec") \
    ,fc.mean('danceability').alias("avg_danceability") \
    ,fc.mean('energy').alias("avg_energy") \
    ,fc.mean('loudness').alias("avg_loudness") \
    ,fc.mean('speechiness').alias("avg_speechiness") \
    ,fc.mean('acousticness').alias("avg_acousticness") \
    ,fc.mean('liveness').alias("avg_liveness") \
    ,fc.mean('valence').alias("avg_valence") \
    ,fc.mean('tempo').alias("avg_tempo")) \
    .orderBy(fc.desc('avg_popularity'))#.limit(15) #.toPandas()
audio_stats.coalesce(1).write.mode('overwrite').options(header='True', delimiter=',').csv("spark_df/audio_stats")