In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col 


## Building the spark session:

In [None]:
# Reuse the running SparkContext / SparkSession if one exists, otherwise create them.
sc = pyspark.SparkContext.getOrCreate()
spark = (
    SparkSession.builder
    .appName("")
    .getOrCreate()
)

# AWS credentials for the S3A connector. They are blank in this copy of the
# notebook; fill them in locally and never commit real keys.
accessKeyId = ""
secretAccessKey = ""

# The S3A connector reads `fs.s3a.access.key` / `fs.s3a.secret.key`. The
# `awsAccessKeyId` / `awsSecretAccessKey` suffixes belong to the legacy s3n
# connector and are not recognised under the `fs.s3a.` prefix. Likewise
# `fs.s3a.impl` has to name the S3A filesystem class rather than the s3n one.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
if accessKeyId and secretAccessKey:
    # Only override credentials when both are supplied; with blank values the
    # connector is left to resolve credentials on its own.
    hadoop_conf.set("fs.s3a.access.key", accessKeyId)
    hadoop_conf.set("fs.s3a.secret.key", secretAccessKey)

Installing the psycopg2 package to connect to PostgreSQL on RDS:

In [None]:
# Install the psycopg2 binary distribution through the SparkContext so that it
# can be imported in the next cell.
# NOTE(review): install_pypi_package is not part of stock PySpark's SparkContext;
# presumably this notebook runs on a managed environment (e.g. EMR Notebooks)
# that provides it - confirm.
sc.install_pypi_package("psycopg2-binary")

In [None]:
import psycopg2

# Connection settings for the PostgreSQL instance on RDS. Database name, user
# and password are blank in this copy of the notebook.
rds_settings = dict(
    host="depgdb.c50u4bslwwg0.eu-west-2.rds.amazonaws.com",
    database="",
    user="",
    password="",
)
conn = psycopg2.connect(**rds_settings)

# One cursor, reused by the query cells below.
cursor = conn.cursor()

## 1. Get tables from RDS:

__Running queries to get the tables from our database on RDS and load them into Spark DataFrames:__

__Get artists table:__

In [None]:
# Column labels applied, in order, to the rows returned by the query below.
columns_artists = ['artist_id', 'artist_name', 'followers', 'popularity']

# Fetch the whole artists table through the shared cursor and build a Spark DataFrame from it.
query = "SELECT * FROM spotify.artists"
cursor.execute(query)
artists_rows = cursor.fetchall()
artists_df = spark.createDataFrame(artists_rows, schema=columns_artists)
# Uncomment to preview the result:
# artists_df.show()

__Get albums table:__

In [None]:
# Column labels applied, in order, to the rows returned by the query below.
columns_albums = [
    'album_id',
    'artist_id',
    'album_group',
    'album_type',
    'album_name',
    'release_date',
    'total_tracks',
]

# Fetch the whole albums table through the shared cursor and build a Spark DataFrame from it.
query = "SELECT * FROM spotify.albums"
cursor.execute(query)
albums_rows = cursor.fetchall()
albums_df = spark.createDataFrame(albums_rows, schema=columns_albums)
# Uncomment to preview the result:
# albums_df.show()

__Get albums images links table:__

In [None]:
# Column labels applied, in order, to the rows returned by the query below.
columns_images = ['album_id', 'image_number', 'height', 'width', 'url']

# Fetch the whole albums_images_links table and build a Spark DataFrame from it.
query = "SELECT * FROM spotify.albums_images_links"
cursor.execute(query)
image_links_rows = cursor.fetchall()
image_links_df = spark.createDataFrame(image_links_rows, schema=columns_images)
# Uncomment to preview the result:
# image_links_df.show()

__Get tracks table:__

In [None]:
# Column labels applied, in order, to the rows returned by the query below.
columns_tracks = [
    'track_id',
    'album_id',
    'disc_number',
    'duration_ms',
    'explicit',
    'is_local',
    'track_name',
    'preview_url',
    'track_number',
    'track_type',
]

# Fetch the whole tracks table through the shared cursor and build a Spark DataFrame from it.
query = "SELECT * FROM spotify.tracks"
cursor.execute(query)
tracks_rows = cursor.fetchall()
tracks_df = spark.createDataFrame(tracks_rows, schema=columns_tracks)
# Uncomment to preview the result:
# tracks_df.show()

__Get audio features table:__

In [None]:
# Column labels applied, in order, to the rows returned by the query below.
columns_audio = [
    'track_id',
    'danceability',
    'energy',
    'key',
    'loudness',
    'mode',
    'speechiness',
    'acousticness',
    'intrumentalness',
    'liveness',
    'valence',
    'tempo',
    'register_type',
    'duration_ms',
    'time_signature',
]

# Fetch the whole audio_features table and build a Spark DataFrame from it.
query = "SELECT * FROM spotify.audio_features"
cursor.execute(query)
audio_features_rows = cursor.fetchall()
audio_features_df = spark.createDataFrame(audio_features_rows, schema=columns_audio)
# Uncomment to preview the result:
# audio_features_df.show()

__Get lyrics table:__

In [None]:
# Column labels applied, in order, to the rows returned by the query below.
columns_lyrics = ['artist_name', 'track_name', 'track_id', 'lyrics']

# Fetch the whole lyrics table through the shared cursor and build a Spark DataFrame from it.
query = "SELECT * FROM spotify.lyrics"
cursor.execute(query)
lyrics_rows = cursor.fetchall()
lyrics_df = spark.createDataFrame(lyrics_rows, schema=columns_lyrics)

## 2. Removing duplicates:

__We remove duplicates where needed by comparing the row counts of each dataframe before and after performing a 'dropDuplicates' operation on it:__

__If there are more rows before applying the 'dropDuplicates' operation than after it, the table contains duplicates, so we keep the resulting dataframe.__

__Artists table:__

In [None]:
# Count rows before and after de-duplicating on artist_id; the de-duplicated
# frame replaces the original only when rows were actually removed.
prev_count = artists_df.count()
dropdup_artists = artists_df.dropDuplicates(subset=["artist_id"])
after_count = dropdup_artists.count()
print(f"{prev_count} > {after_count} ?")
if after_count < prev_count:
    artists_df = dropdup_artists

__Albums table:__

In [None]:
# Count rows before and after de-duplicating on album_id; the de-duplicated
# frame replaces the original only when rows were actually removed.
prev_count = albums_df.count()
dropdup_albums = albums_df.dropDuplicates(subset=["album_id"])
after_count = dropdup_albums.count()
print(f"{prev_count} > {after_count} ?")
if after_count < prev_count:
    albums_df = dropdup_albums

__Albums images links table:__

In [None]:
# Count rows before and after de-duplicating on (album_id, image_number); the
# de-duplicated frame replaces the original only when rows were actually removed.
prev_count = image_links_df.count()
dropdup_image_links = image_links_df.dropDuplicates(subset=["album_id", "image_number"])
after_count = dropdup_image_links.count()
print(f"{prev_count} > {after_count} ?")
if after_count < prev_count:
    image_links_df = dropdup_image_links

__Tracks table:__

In [None]:
# Count rows before and after de-duplicating on track_id; the de-duplicated
# frame replaces the original only when rows were actually removed.
prev_count = tracks_df.count()
dropdup_tracks = tracks_df.dropDuplicates(subset=["track_id"])
after_count = dropdup_tracks.count()
print(f"{prev_count} > {after_count} ?")
if after_count < prev_count:
    tracks_df = dropdup_tracks

__There are no duplicates in any of the tables.__

## 3. Null values imputation:

__We will show rows with null values for every column of each table.__

__Artists table:__

__Let's visualize the rows with null values for each column of the table:__

In [None]:
# Step through the artists columns one by one. The per-column null filter is
# left commented out, so as written this cell only prints the column names.
for column_name in columns_artists:
    print(column_name)
    # artists_df.filter(col(column_name).isNull()).show()

__We get empty results for each column, so there are no null values in the artists table.__

__Albums table:__

__Let's visualize the rows with null values for each column of the table:__

In [None]:
# Step through the albums columns one by one. The per-column null filter is
# left commented out, so as written this cell only prints the column names.
for column_name in columns_albums:
    print(column_name)
    # albums_df.filter(col(column_name).isNull()).show()

__We get empty results for each column, so there are no null values in the albums table.__

__Albums images links table:__

__Let's visualize the rows with null values for each column of the table:__

In [None]:
# Step through the image-link columns one by one. The per-column null filter is
# left commented out, so as written this cell only prints the column names.
for column_name in columns_images:
    print(column_name)
    # image_links_df.filter(col(column_name).isNull()).show()

__We get empty results for each column, so there are no null values in the albums_images_links table.__

__Tracks table:__

__Let's visualize the rows with null values for each column of the table:__

In [None]:
# Step through the tracks columns one by one. The per-column null filter is
# left commented out, so as written this cell only prints the column names.
for column_name in columns_tracks:
    print(column_name)
    # tracks_df.filter(col(column_name).isNull()).show()

__We see that there are several rows with null values in the column 'preview_url' in the tracks table.__

In [None]:
# Replace null preview URLs with an empty string; other columns are untouched.
tracks_df = tracks_df.na.fill("", subset=["preview_url"])

__So we filled the null cells in that column with an empty string.__

__Audio features table:__

__Let's visualize the rows with null values for each column of the table:__

In [None]:
# Step through the audio-feature columns one by one. The per-column null filter is
# left commented out, so as written this cell only prints the column names.
for column_name in columns_audio:
    print(column_name)
    # audio_features_df.filter(col(column_name).isNull()).show()

__We get empty results for each column, so there are no null values in the audio_features table.__

## 4. Database Normalization:

__We need to join tracks, audio features and lyrics dataframes to get a normalized database as these three tables have the same primary key 'track_id'.__

__As there can be many songs in the Spotify database whose lyrics are not in the Genius database, we simply drop the rows with null values from the lyrics table.__

In [None]:
# Discard lyrics rows whose 'lyrics' value is null.
lyrics_df = lyrics_df.na.drop(how="any", subset=["lyrics"])

__We join tracks and audio features dataframes first:__

In [None]:
# duration_ms exists in both tables, so remove it from the tracks side
# before joining to avoid ending up with the column twice.
tracks_df = tracks_df.drop("duration_ms")

# Keep only tracks that have a matching audio-features row.
tracks_audio_df = tracks_df.join(audio_features_df, on=["track_id"], how="inner")

# Report how many rows the join produced.
prev_count = tracks_audio_df.count()
print(f"{prev_count} registers")
# Uncomment to preview the result:
# tracks_audio_df.show()

__Then we join the resulting dataframe with the lyrics dataframe:__

In [None]:
# Only track_id (the join key) and the lyrics text are needed from the lyrics table.
lyrics_df = lyrics_df.drop("artist_name", "track_name")

# Left join keeps every track; tracks without a lyrics row get an empty string
# in place of the null produced by the join.
tracks_audio_lyrics_df = (
    tracks_audio_df
    .join(lyrics_df, on=["track_id"], how="left")
    .na.fill("", subset=["lyrics"])
)

__After joining these three tables, we filled the null values of the 'lyrics' column with an empty string. Those null values correspond to songs whose lyrics are not in the Genius database.__

## 5. Save tables in parquet format:

__Finally, we save the resulting dataframes in parquet format, each one partitioned into several files, to our S3 bucket:__

In [None]:
# Write each cleaned table to the project bucket in Parquet format.
artists_df.write.parquet("s3://ucl-msin0166-2021-spotify-project/parquet_files/spotify_artists.parquet")
albums_df.write.parquet("s3://ucl-msin0166-2021-spotify-project/parquet_files/spotify_albums.parquet")
image_links_df.write.parquet("s3://ucl-msin0166-2021-spotify-project/parquet_files/spotify_image_links.parquet")
# The tracks output is the normalized table built in section 4 (tracks joined
# with audio features and lyrics). The name used previously, tracks_cleaned_df,
# is not assigned anywhere in the visible notebook, so the write raised NameError.
tracks_audio_lyrics_df.write.parquet("s3://ucl-msin0166-2021-spotify-project/parquet_files/spotify_tracks.parquet")