In [None]:
# Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
# imported after the star imports above so they cannot shadow it
from urllib.parse import quote_plus

In [None]:
# Create the "spotify" Spark session (or reuse a live one) with the MongoDB
# Spark connector package and the default MongoDB input/output URIs configured.
spark = (
    SparkSession.builder
    .appName("spotify")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.mongodb.input.uri", "mongodb+srv://**********")
    .config("spark.mongodb.output.uri", "mongodb+srv://**********")
    # network timeout (7200s) is kept larger than the executor heartbeat interval (1200s)
    .config("spark.network.timeout", "7200s")
    .config("spark.executor.heartbeatInterval", "1200s")
    .getOrCreate()
)

In [None]:
# Bare expression as the last line of the cell: displays the session object.
spark

Configuration for accessing S3

In [None]:
# S3 access credentials (placeholders here).
# NOTE(review): avoid saving real keys in the notebook; prefer an environment
# variable or a secret store and paste nothing sensitive into this cell.
ACCESS_KEY = "****"
SECRET_KEY = "****"

# Hand the credentials to the S3A filesystem through the live Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)

# Two settings that used to be written here were removed because they had no effect:
#   * "spark.jars.packages" is a Spark option that is resolved when the session is
#     launched; Hadoop never reads it. If the hadoop-aws package is needed, add it
#     to "spark.jars.packages" on SparkSession.builder (or as a cluster library).
#   * "spark.hadoop.fs.s3a.impl" uses the "spark.hadoop." prefix, which is only
#     interpreted on Spark configuration keys. On the Hadoop configuration itself
#     the filesystem class is looked up under "fs.s3a.impl", so the prefixed key
#     was never consulted.

Reading Data from S3

#### Different Cluster settings
We tried to read data with two cluster settings:
- Cluster Setting 1: <br>
  9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12) <br> 
  m5.large: 8 GB memory, 2 cores <br>
  Time: 7-8 mins (for reading data) <br>
  
- Cluster Setting 2: <br>
  10.3 (includes Apache Spark 3.2.1, Scala 2.12) <br>
  i3.2xlarge: 61 GB memory 8 cores <br>
  Time: 2.17 minutes

Reading the playlists file, which is around 33 GB and in JSON format

In [None]:
# S3 locations of the two inputs (playlists and lyrics)
playlist_file = 's3a://spotify-recommendation/playlists'
lyrics_file = 's3a://spotify-recommendation/lyrics'

# Load the playlists JSON with the reader's "multiline" option switched on.
play_df = (
    spark.read
    .format("json")
    .option("multiline", "true")
    .load(playlist_file)
)

Reading the lyrics file, which has lyrics for 130k songs

In [None]:
# Load the lyrics JSON and keep only the join key and the lyrics text.
lyrics_df = (
    spark.read
    .format("json")
    .load(lyrics_file)
    .select('track_uri', 'lyrics')
)

We have a nested JSON structure in the playlists file, so we're exploding it into columns to make the join easier

In [None]:
# Flatten the nested playlists JSON into one row per (playlist, track).
#
# Selecting a nested field such as 'playlist.name' yields a column named after
# the field itself ('name'), so the playlist-level columns already come out as
# name, pid, num_followers, num_tracks, duration_ms and num_artists. The former
# withColumnRenamed('col.<field>', ...) calls looked for columns literally named
# 'col.<field>', which never exist, and therefore did nothing; they were removed.
# The output schema is unchanged (in particular the column is still `num_tracks`).
#
# NOTE(review): if the track struct also contains a `duration_ms` field, the
# result holds two columns with that name (track-level and playlist-level) —
# confirm against the data and alias one of them if so.
playlist_df = (
    play_df
    # one row per playlist struct
    .select(explode('playlists').alias('playlist'))
    # one row per track, with the playlist-level fields carried alongside
    .select(
        explode('playlist.tracks').alias('track'),
        'playlist.name',
        'playlist.pid',
        'playlist.num_followers',
        'playlist.num_tracks',
        'playlist.duration_ms',
        'playlist.num_artists',
    )
    # promote the track struct's fields to top-level columns, then drop the struct
    .select('track.*', '*')
    .drop('track')
)

We are converting the lyrics string into a list of tokenized words

In [None]:
# Turn the `lyrics` string column into a `lyrics_words` list-of-words column.
tokenizer = Tokenizer().setInputCol("lyrics").setOutputCol("lyrics_words")
wordsData = tokenizer.transform(lyrics_df)

We are doing a left join of playlists data with lyrics data

In [None]:
# Left join on track_uri: every playlist/track row is kept, with lyrics columns
# filled in only where the lyrics dataset has a matching track.
master_data = playlist_df.join(wordsData, on='track_uri', how='left')

Data Quality Checks

In [None]:
# Data-quality check: number of distinct tracks that have lyrics after the join.
(
    master_data
    .filter(master_data.lyrics.isNotNull())
    .select('track_uri')
    .distinct()
    .count()
)

In [None]:
# Data-quality check: number of distinct tracks left without lyrics after the join.
(
    master_data
    .filter(master_data.lyrics.isNull())
    .select('track_uri')
    .distinct()
    .count()
)

In [None]:
# Peek at the first three rows of the joined dataset.
master_data.show(n=3)

### Setting up the connection string

In [None]:
def build_mongo_uri(user_name, password, address, database, collection):
    """Return a ``mongodb+srv://`` URI that targets ``database.collection``.

    The user name and password are percent-encoded before being placed in the
    URI. Without this, a credential containing characters such as ``@``, ``:``
    or ``/`` produces a URI that is split at the wrong place or rejected.
    Pass the raw (not already encoded) credentials.

    Parameters:
        user_name: raw MongoDB user name.
        password: raw MongoDB password.
        address: cluster host name, e.g. ``"xyz.mongodb.net"``.
        database: target database name.
        collection: target collection name.

    Returns:
        The connection string as a ``str``.
    """
    return (
        f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
        f"@{address}/{database}.{collection}"
    )


# Connection details (placeholders here).
# NOTE(review): avoid saving real credentials in the notebook.
database = '*****'
collection = '*****'
user_name = '*****'
password = '*****'
address = '*****.mongodb.net'
connection_string = build_mongo_uri(user_name, password, address, database, collection)

Writing the files onto MongoDB: <br>
Took 37 minutes with Cluster Setting 2

In [None]:
# Append the joined dataset to the MongoDB collection named in connection_string.
# Append mode adds to whatever the collection already holds.
# (The write took 37 mins when the notebook worked properly.)
(
    master_data.write
    .format("mongo")
    .mode("append")
    .option("uri", connection_string)
    .save()
)

Reading the data back from MongoDB

In [None]:
# Load the collection back from MongoDB so the upload can be verified.
df = (
    spark.read
    .format('mongo')
    .options(uri=connection_string)
    .load()
)

Data Upload Checks

In [None]:
# Upload check: distinct tracks with lyrics in the data read back from MongoDB;
# compare with the same count taken on master_data before the write.
(
    df
    .filter(df.lyrics.isNotNull())
    .select('track_uri')
    .distinct()
    .count()
)

In [None]:
# See how the lyrics look in the database.
# A handful of rows is enough for a visual check; printing 1000 rows floods the
# notebook output, and the default 20-character truncation hides the lyrics.
(
    df
    .where(df['lyrics'].isNotNull())
    .select('track_uri', 'lyrics')
    .show(5, truncate=100)
)