In [2]:
from pyspark.sql import SparkSession
import os 
import json 
import re
import pandas as pd 
import numpy as np
import time 
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, BooleanType

The following is the code to load data and convert the data from json format to spark dataframe format. We then save the dataframes into multiple files in parquet format. This makes the upload size smaller and faster.
The whole loading process was done on local machine.  

In [3]:
# Create a Spark session
spark = (
    SparkSession.builder.master("local[*]")
    .appName("PySpark JSON Nested Example")
    .getOrCreate()
)

In [10]:
# Define the schema
schema = StructType([
    StructField("pid", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("num_tracks", IntegerType(), True),
    StructField("num_albums", IntegerType(), True),
    StructField("num_followers", IntegerType(), True),
    # StructField("num_edits", IntegerType(), True),
    # StructField("duration_ms", IntegerType(), True),
    # StructField("num_artists", IntegerType(), True),
    # StructField("collaborative", BooleanType(), True),
    StructField("tracks", ArrayType(
        StructType([
            StructField("pos", StringType(), True), 
            StructField("track_name", StringType(), True),
            StructField("track_artist", StringType(), True),
            StructField("track_uri", StringType(), True),
            StructField("album_name", StringType(), True),
            StructField("album_uri", StringType(), True),
            StructField("artist_name", StringType(), True),
            StructField("artist_uri", StringType(), True)
        ])
    ), True)
])

In [4]:
schema_playlist = StructType([
    StructField("playlist_pid", IntegerType(), True),
    StructField("pos", StringType(), True),
    StructField("track_name", StringType(), True),
    StructField("track_artist", StringType(), True),
    StructField("track_uri", StringType(), True),
    StructField("album_uri", StringType(), True),
    StructField("album_name", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("artist_uri", StringType(), True)
])

every json file has following structrue:  
the file is made of dict object: with two main keys "info" and "playlist". playlist is the object of interest  
playlist is a list of dict objects with each dict object as: with "name", "collaborative", "pid", "modified_at", "num_tracks", "num_albums", "num_followers", "tracks"  
tracks is list of dict objects: with "pos", "artist_name", "track_uri", "artist_uri", "track_name", "album_uri", "duration_ms", "album_name"    

This function is just to read and print the dataframe. 

In [5]:
path = "C:/Users/Tiam Tee/Documents/Spotify100M_data_project/spotify_million_playlist_dataset/data"
save_path = "C:/Users/Tiam Tee/Documents/Spotify100M_data_project/rdd_data_metadata/batch_10"
save_path_tracks_only = "C:/Users/Tiam Tee/Documents/Spotify100M_data_project/rdd_data_tracksdata/batch_10" 

def read_multiple_files(path=path, schema=schema):

    filenames = os.listdir(path) 
    merge_data = []
    merge_track_data = []

    for index, filename in enumerate(filenames):

        path_direc = os.path.join(path, filename) # complete path to file
        with open(path_direc, 'r') as f: #open file
            data = json.load(f) #load json file

        playlists = data['playlists'] #value of playlist is a list
        if index < 10:
            merge_data.extend(playlists) #merge all the data into one list
            for playlist in playlists:
                for track in playlist['tracks']:
                    track['playlist_pid'] = playlist['pid']
                merge_track_data.extend(playlist['tracks']) #merge all the tracks into one list
        
    merge_data_rdd = spark.sparkContext.parallelize(merge_data) #convert to rdd
    merge_track_data_rdd = spark.sparkContext.parallelize(merge_track_data) #convert to rdd
    merge_sdf = spark.createDataFrame(merge_data_rdd, schema)
    merge_track_sdf = spark.createDataFrame(merge_track_data_rdd, schema_playlist) 

    return merge_sdf, merge_track_sdf

# start = time.localtime()
# print(f'start time: {start}')
# merge_metadata, tracks_only_frame = read_multiple_files()
# merge_metadata.write.parquet(save_path, mode='overwrite')
# tracks_only_frame.write.parquet(save_path_tracks_only, mode='overwrite')
# end = time.localtime()
# print(f'end time: {end}')
# merge_metadata.show(5)
# tracks_only_frame.show(5)

Here, we read and load the data, merge data from 10 json files at a time. Then, save it in parquet format. 

In [6]:
path = "C:/Users/Tiam Tee/Documents/Spotify100M_data_project/spotify_million_playlist_dataset/data" 
save_path = "C:/Users/Tiam Tee/Documents/Spotify100M_data_project/rdd_data_metadata/batch_{}" #batch_{}
save_path_tracks_only = "C:/Users/Tiam Tee/Documents/Spotify100M_data_project/rdd_datatracksdata/batch_{}_tracks_only" #/batch_1_tracks_only

def read_100_files(path=path, save_path=save_path, save_path_tracks_only=save_path_tracks_only,n=100):
    
    filenames = os.listdir(path)[:n]

    for batch_num in range(0, int(n/10), 1): 
        merge_data = []
        merge_track_data = []
        for filename in filenames[batch_num*10:batch_num*10+10]: 
            path_direc = os.path.join(path, filename)
            with open(path_direc, 'r') as f:
                data = json.load(f)
                playlists = data['playlists']
                merge_data.extend(playlists)
                for playlist in playlists:
                    for track in playlist['tracks']:
                        track['playlist_pid'] = playlist['pid']
                        merge_track_data.extend(playlist['tracks'])
        
        merge_data_rdd = spark.sparkContext.parallelize(merge_data)
        merge_track_data_rdd = spark.sparkContext.parallelize(merge_track_data)
        merge_sdf = spark.createDataFrame(merge_data_rdd, schema)
        merge_track_sdf = spark.createDataFrame(merge_track_data_rdd, schema_playlist)
        merge_sdf.write.parquet(save_path.format(batch_num), mode='overwrite')
        merge_track_sdf.write.parquet(save_path_tracks_only.format(batch_num), mode='overwrite')
        print(f'batch {batch_num} done')
        spark.catalog.clearCache()
    
        
# read_100_files(n=10)

In [7]:
sdf_from_parquet = spark.read.parquet("C:/Users/Tiam Tee/Documents/Spotify100M_data_project/rdd_data_metadata/batch_1")
sdf_from_parquet.show(truncate=False) # an example

+-----+---------------+----------+----------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------