In [1]:
# DEFINE FUNCTIONS <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

def get_access_token(client_id:str, client_sc:str):
    import requests
    
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = f'grant_type=client_credentials&client_id={client_id}&client_secret={client_sc}'.encode()
    response = requests.post('https://accounts.spotify.com/api/token', headers=headers, data=data).json()
    access_token = response['access_token']

    return access_token

def get_response(access_token:str, endpoint:str, params:dict=None):
    import requests, json

    url = f"https://api.spotify.com/v1/{endpoint}"
    headers = {
        'Authorization': f'Bearer {access_token}',
    }

    if params != None:
        response = requests.get(url=url, params=params, headers=headers)
    else:
        response = requests.get(url=url, headers=headers)
    print(response)
    
    if response.status_code == 200:
        try:
            data = response.json()
            return data
        except json.decoder.JSONDecodeError:
            raise ValueError(f"API Server Error - {endpoint} - Invalid JSON content in response: {response.text}")
    else:
        raise ValueError(f"API Server Error - {endpoint} - Non-200 status code received: {response.status_code}")
    

def post_response(access_token:str, endpoint:str, data:dict=None):
    import requests

    url = f"https://api.spotify.com/v1/{endpoint}"
    headers = {
        'Authorization': f'Bearer {access_token}',
    }

    response = requests.post(url=url, headers=headers)
    print(response)
    
    if response.status_code == 200:
        pass
    else:
        raise ValueError(f"API Server Error - {endpoint} - Non-200 status code received: {response.status_code}")

In [2]:
# INFOS <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

from configparser import ConfigParser

config = ConfigParser()
config.read("/home/hooniegit/git/Spotify-DemoProject/recommendation/demo/config.ini")

client_id = config.get("spotify", "client_id")
client_sc = config.get("spotify", "client_sc")
user_id = config.get("spotify", "user_id")

In [3]:
# START CODE <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from math import ceil
import json

In [4]:
### Build Session
spark = SparkSession.builder \
    .master(config.get("spark", "master")) \
    .appName("pipeline_demo") \
    .getOrCreate()

### Create Access Token
access_token = get_access_token(client_id=client_id, client_sc=client_sc)

### Create Playlist Lists
endpoint = f"users/{user_id}/playlists"
params = {
    "limit": 50,
    "offset": 0
}

playlists = get_response(access_token=access_token, endpoint=endpoint, params=params)
json_string  = json.dumps(playlists)
json_rdd = spark.sparkContext.parallelize([json_string])
df_plinfo = spark.read.json(json_rdd, multiLine=True)

items = df_plinfo \
    .withColumn("items", explode("items")) \
    .select("items.id") \
    .rdd.flatMap(lambda x: x).collect()

### Create Playlist Item Lists
track_list = [] # <---------- "Need To Use"
for id in items:
    endpoint = f"playlists/{id}/tracks"
    playlist_spec = get_response(access_token=access_token, endpoint=endpoint)
    
    json_string  = json.dumps(playlist_spec)
    json_rdd = spark.sparkContext.parallelize([json_string])
    df_playlist_spec = spark.read.json(json_rdd, multiLine=True)
    
    ids = df_playlist_spec \
    .withColumn("items", explode("items")) \
    .select("items.track.id") \
    .rdd.flatMap(lambda x: x).collect()
    
    track_list += ids
    
    total = df_playlist_spec.select("total").first()[0]
    left = int(total)-100
    cnt = ceil(left/100)
    
    for i in range(cnt):
        offset = 100 + 100 * i
        params = {"offset":offset}
        
        playlist_spec = get_response(access_token=access_token, endpoint=endpoint, params=params)
        
        json_string  = json.dumps(playlist_spec)
        json_rdd = spark.sparkContext.parallelize([json_string])
        df_playlist_spec = spark.read.json(json_rdd, multiLine=True)
        
        ids = df_playlist_spec \
        .withColumn("items", explode("items")) \
        .select("items.track.id") \
        .rdd.flatMap(lambda x: x).collect()
        
        track_list += ids      

cnt = ceil(len(track_list)/50)

big_list = []
for j in range(cnt):
    big_list.append(track_list[j*50:(j+1)*50])


24/01/06 20:29:01 WARN Utils: Your hostname, workspace resolves to a loopback address: 127.0.0.1; using 220.118.158.128 instead (on interface eno1)
24/01/06 20:29:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/06 20:29:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<Response [200]>


                                                                                

<Response [200]>


                                                                                

<Response [200]>


                                                                                

In [5]:
# Create Dataframe : main_df
main_df = None
cnt = 0
for small_list in big_list:
    
    print(cnt)
    
    tracks = ""
    for id in small_list:
        tracks += f",{id}"
    tracks = tracks[1:]
    
    endpoint = "tracks"
    params = {"ids":tracks}
    track = get_response(access_token=access_token, endpoint=endpoint, params=params)
    
    json_string  = json.dumps(track)
    json_rdd = spark.sparkContext.parallelize([json_string])
    df_tracks = spark.read.json(json_rdd, multiLine=True)
    
    df_tracks = spark.read.json(json_rdd, multiLine=True) \
        .withColumn("tracks", explode("tracks")) \
        .selectExpr("tracks.id",
                    "tracks.popularity")
    
    endpoint = "audio-features"
    params = {"ids":tracks}
    audio_features = get_response(access_token=access_token, endpoint=endpoint, params=params)
    
    json_string  = json.dumps(audio_features)
    json_rdd = spark.sparkContext.parallelize([json_string])
    df_audio_features = spark.read.json(json_rdd, multiLine=True) \
        .withColumn("audio_features", explode("audio_features")) \
        .selectExpr("audio_features.id",
                    "audio_features.key",
                    "audio_features.mode",
                    "audio_features.time_signature",
                    "audio_features.tempo",
                    "audio_features.acousticness",
                    "audio_features.danceability",
                    "audio_features.energy",
                    "audio_features.instrumentalness",
                    "audio_features.liveness",
                    "audio_features.loudness",
                    "audio_features.speechiness",
                    "audio_features.valence")
    
    result_track_df = df_tracks.join(df_audio_features, "id", "left")
    if cnt == 0:
        main_df = result_track_df
    else:
        main_df = main_df.union(result_track_df)
    cnt += 1

0
<Response [200]>


                                                                                

<Response [200]>


                                                                                

1
<Response [200]>


                                                                                

<Response [200]>
2
<Response [200]>


                                                                                

<Response [200]>


                                                                                

In [8]:
### Load Dataframe : df_dw
dw_tracks = spark.read.parquet("file:///home/hooniegit/git/Spotify-DemoProject/spark/data/parquet/tracks/main/*")
dw_audioFeatures = spark.read.parquet("file:///home/hooniegit/git/Spotify-DemoProject/spark/data/parquet/tracks/audio_features/*")
df_dw = dw_tracks.join(dw_audioFeatures, "id", "left")

df_dw.show() # << TEST



+--------------------+----------+---+----+--------------+-------+------------+------------+------+----------------+--------+--------+-----------+-------+
|                  id|popularity|key|mode|time_signature|  tempo|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|valence|
+--------------------+----------+---+----+--------------+-------+------------+------------+------+----------------+--------+--------+-----------+-------+
|4wFnPjhS34uFvYTxG...|        37|  7|   1|             4|125.438|      0.0969|       0.316|  0.43|             0.0|   0.107|  -7.344|     0.0294|  0.112|
|7hZXichYpoQfFWbJa...|        22|  1|   1|             4|126.007|      0.0594|       0.638| 0.948|           0.702|   0.148|  -5.116|      0.036|  0.573|
|7jJdh0fTXNacFF2x4...|        33|  0|   1|             4|115.905|      0.0232|       0.395| 0.797|         9.51E-6|   0.161|  -6.944|     0.0372|  0.332|
|2aOvfAWnsmz8ezrcO...|         0|  5|   0|             4|150.071|       0.97

                                                                                

In [9]:
### Union Dataframe : df
df = df_dw.union(main_df)

df.show() # << TEST



+--------------------+----------+---+----+--------------+-------+------------+------------+------+----------------+--------+--------+-----------+-------+
|                  id|popularity|key|mode|time_signature|  tempo|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|valence|
+--------------------+----------+---+----+--------------+-------+------------+------------+------+----------------+--------+--------+-----------+-------+
|4wFnPjhS34uFvYTxG...|        37|  7|   1|             4|125.438|      0.0969|       0.316|  0.43|             0.0|   0.107|  -7.344|     0.0294|  0.112|
|7hZXichYpoQfFWbJa...|        22|  1|   1|             4|126.007|      0.0594|       0.638| 0.948|           0.702|   0.148|  -5.116|      0.036|  0.573|
|7jJdh0fTXNacFF2x4...|        33|  0|   1|             4|115.905|      0.0232|       0.395| 0.797|         9.51E-6|   0.161|  -6.944|     0.0372|  0.332|
|2aOvfAWnsmz8ezrcO...|         0|  5|   0|             4|150.071|       0.97

                                                                                

In [11]:
### Create Features
from pyspark.ml.feature import VectorAssembler

selected_features = ["popularity", "key", "mode", "time_signature", "tempo", "acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness", "speechiness", "valence"]
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
df_assembled = assembler.transform(df)

df_assembled.show()



+--------------------+----------+---+----+--------------+-------+------------+------------+------+----------------+--------+--------+-----------+-------+--------------------+
|                  id|popularity|key|mode|time_signature|  tempo|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|valence|            features|
+--------------------+----------+---+----+--------------+-------+------------+------------+------+----------------+--------+--------+-----------+-------+--------------------+
|000O8nXpAK5QAppKv...|         5|  1|   1|             4|128.018|     5.12E-4|       0.504|  0.93|          0.0107|   0.281|   -2.89|     0.0539|  0.355|[5.0,1.0,1.0,4.0,...|
|000kSCs9tKtH1VXI3...|        11|  2|   1|             4| 129.85|       0.863|       0.425| 0.266|             0.0|  0.0989|  -6.791|     0.0337|  0.253|[11.0,2.0,1.0,4.0...|
|0010mZpCCwlPwoBiB...|        38|  3|   1|             3|124.993|       0.108|       0.527| 0.793|         3.28E-6|   0.144| 

                                                                                

In [None]:
# 고려해야 할 사항들
"""
    0. 스케일링 가능 항목 : acousticness, danceability, energy, instrumentalness, liveness, speechiness, valence
    1. loudness 항목 : 절댓값 반환 필요 / 계산식 확인 필요
    1. mode 항목 : 0 또는 1만 가짐
    2. key, tempo 항목 : 이상치가 큰 영향을 끼칠 수 있음
"""

In [12]:
from pyspark.ml.feature import MinMaxScaler

minmax_scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
minmax_model = minmax_scaler.fit(df_assembled)
minmax_scaled_df = minmax_model.transform(df_assembled)

minmax_scaled_df.show()

24/01/06 20:42:13 WARN TaskSetManager: Lost task 3.0 in stage 127.0 (TID 1413) (220.118.158.128 executor 12): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$Lambda$3812/115049348`: (struct<popularity_double_VectorAssembler_6f8ce1e0a33d:double,key_double_VectorAssembler_6f8ce1e0a33d:double,mode_double_VectorAssembler_6f8ce1e0a33d:double,time_signature_double_VectorAssembler_6f8ce1e0a33d:double,tempo:double,acousticness:double,danceability:double,energy:double,instrumentalness:double,liveness:double,loudness:double,speechiness:double,valence:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$G

Py4JJavaError: An error occurred while calling o388.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 127.0 failed 4 times, most recent failure: Lost task 7.3 in stage 127.0 (TID 1480) (220.118.158.128 executor 1): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$3812/115049348`: (struct<popularity_double_VectorAssembler_6f8ce1e0a33d:double,key_double_VectorAssembler_6f8ce1e0a33d:double,mode_double_VectorAssembler_6f8ce1e0a33d:double,time_signature_double_VectorAssembler_6f8ce1e0a33d:double,tempo:double,acousticness:double,danceability:double,energy:double,instrumentalness:double,liveness:double,loudness:double,speechiness:double,valence:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$3812/115049348`: (struct<popularity_double_VectorAssembler_6f8ce1e0a33d:double,key_double_VectorAssembler_6f8ce1e0a33d:double,mode_double_VectorAssembler_6f8ce1e0a33d:double,time_signature_double_VectorAssembler_6f8ce1e0a33d:double,tempo:double,acousticness:double,danceability:double,energy:double,instrumentalness:double,liveness:double,loudness:double,speechiness:double,valence:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 26 more
