# Início

In [157]:
from pyspark.sql import SparkSession

In [158]:
# Create (or reuse) the Spark session used throughout the notebook:

sessao_spark = (
    SparkSession.builder
    .appName('Recomendados PySpark')
    .getOrCreate()
)

sessao_spark

In [159]:
# Load the songs dataset (semicolon-separated CSV with a header row; column types are inferred):

dados = sessao_spark.read.csv(
    'dados_musicas.csv',
    sep = ';',
    header = True,
    inferSchema = True,
)

In [160]:
# Preview the first five rows as a pandas DataFrame:
dados.limit(5).toPandas()

Unnamed: 0,valence,year,acousticness,artists,danceability,duration_ms,energy,explicit,id,instrumentalness,key,liveness,loudness,mode,name,popularity,speechiness,tempo,artists_song
0,0.285,2000,0.00239,Coldplay,0.429,266773,0.661,0,3AJwUDP919kvQ9QcozQPxg,0.000121,11,0.234,-7.227,1,Yellow,84,0.0281,173.372,Coldplay - Yellow
1,0.613,2000,0.143,OutKast,0.843,270507,0.806,1,0I3q5fE6wg7LIfHGngUTnV,0.0,4,0.0771,-5.946,0,Ms. Jackson,80,0.269,94.948,OutKast - Ms. Jackson
2,0.4,2000,0.00958,Linkin Park,0.556,216880,0.864,0,60a0Rd6pjrkxjPbaKzXjfq,0.0,3,0.209,-5.87,0,In the End,84,0.0584,105.143,Linkin Park - In the End
3,0.543,2000,0.00664,3 Doors Down,0.545,233933,0.865,0,6ZOBP3NvffbU4SZcrnt1k6,1.1e-05,11,0.168,-5.708,0,Kryptonite,78,0.0286,99.009,3 Doors Down - Kryptonite
4,0.76,2000,0.0302,Eminem,0.949,284200,0.661,1,3yfqSUWxFvZELEM4PmlwIR,0.0,5,0.0454,-4.244,0,The Real Slim Shady,80,0.0572,104.504,Eminem - The Real Slim Shady


In [161]:
# Inspect the column types inferred by Spark:

dados.printSchema()

root
 |-- valence: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: double (nullable = true)
 |-- explicit: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- artists_song: string (nullable = true)



In [12]:
# Number of rows in the songs dataset:
dados.count()

20311

In [13]:
# Number of columns:
len(dados.columns)

19

In [162]:
# Podemos checar se temos dados nulos usando functions:

import pyspark.sql.functions as f

In [163]:
# Count the null values in each column (one output column per input column):
contagem_nulos = [
    f.count(f.when(f.col(c).isNull(), 1)).alias(c)
    for c in dados.columns
]
dados.select(contagem_nulos).toPandas()

Unnamed: 0,valence,year,acousticness,artists,danceability,duration_ms,energy,explicit,id,instrumentalness,key,liveness,loudness,mode,name,popularity,speechiness,tempo,artists_song
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [19]:
# The first rows only show songs from the year 2000 - check which years the dataset actually covers:

anos_distintos = dados.select('year').distinct().collect()
print(sorted(anos_distintos))

[Row(year=2000), Row(year=2001), Row(year=2002), Row(year=2003), Row(year=2004), Row(year=2005), Row(year=2006), Row(year=2007), Row(year=2008), Row(year=2009), Row(year=2010), Row(year=2011), Row(year=2012), Row(year=2013), Row(year=2014), Row(year=2015), Row(year=2016), Row(year=2017), Row(year=2018), Row(year=2019), Row(year=2020)]


In [166]:
# Load the per-year dataset (comma-separated CSV with a header row; column types are inferred):
dados_anos = sessao_spark.read.csv('dados_musicas_ano.csv', header = True, sep = ',', inferSchema = True)

In [167]:
# Preview the first five rows of the per-year data:
dados_anos.limit(5).toPandas()

Unnamed: 0,mode,year,acousticness,danceability,duration_ms,energy,instrumentalness,liveness,loudness,speechiness,tempo,valence,popularity,key
0,1,1921,0.886896,0.418597,260537.166667,0.231815,0.344878,0.20571,-17.048667,0.073662,101.531493,0.379327,0.653333,2
1,1,1922,0.938592,0.482042,165469.746479,0.237815,0.434195,0.24072,-19.275282,0.116655,100.884521,0.535549,0.140845,10
2,1,1923,0.957247,0.577341,177942.362162,0.262406,0.371733,0.227462,-14.129211,0.093949,114.01073,0.625492,5.389189,0
3,1,1924,0.9402,0.549894,191046.707627,0.344347,0.581701,0.235219,-14.231343,0.092089,120.689572,0.663725,0.661017,10
4,1,1925,0.962607,0.573863,184986.92446,0.278594,0.418297,0.237668,-14.146414,0.111918,115.521921,0.621929,2.604317,5


In [168]:
# The per-year dataset above covers years that are not present in `dados_musicas`.
# Keep only the years from 2000 onwards:

dados_anos = dados_anos.filter('year >= 2000')

dados_anos.limit(5).toPandas()

Unnamed: 0,mode,year,acousticness,danceability,duration_ms,energy,instrumentalness,liveness,loudness,speechiness,tempo,valence,popularity,key
0,1,2000,0.289323,0.590918,242724.642638,0.625413,0.101168,0.197686,-8.247766,0.089205,118.999323,0.559475,46.684049,7
1,1,2001,0.286842,0.583318,240307.79601,0.626986,0.107214,0.187026,-8.305095,0.089182,117.765399,0.541479,48.750125,7
2,1,2002,0.282624,0.57616,239503.283,0.64127,0.088048,0.193911,-7.68664,0.084308,119.239738,0.542397,48.6555,7
3,1,2003,0.256471,0.575763,244670.57523,0.660165,0.083049,0.196976,-7.485545,0.093926,120.914622,0.530504,48.626407,7
4,1,2004,0.280559,0.56768,237378.708037,0.648868,0.077934,0.202199,-7.601655,0.094239,121.290346,0.524489,49.273143,7


In [26]:
# Number of rows (one per year) left after the filter:
dados_anos.count()

21

In [28]:
# Number of columns in the per-year data:
len(dados_anos.columns)

14

In [170]:
# Study how loudness relates to year using an interactive line chart:

import plotly.express as px

fig = px.line(dados_anos.toPandas(), x = 'year', y = 'loudness', markers = True, title = 'Variação de loudness durante os anos')
fig.show()

# The chart rendered below turns out to be inconclusive.

In [171]:
# Plot several audio features over the years in a single figure:

import plotly.graph_objects as go

temp = dados_anos.toPandas()

fig = go.Figure()

# One line per feature, in a fixed order; the legend label is the capitalised column name.
for caracteristica in ['acousticness', 'valence', 'danceability', 'energy',
                       'instrumentalness', 'liveness', 'speechiness']:
    fig.add_trace(go.Scatter(x = temp['year'], y = temp[caracteristica], name = caracteristica.capitalize()))

fig.show()

# In this data the features do not seem to change over the years.

In [172]:
# Correlation heatmap across all columns of the per-year data, with the values printed in the cells:
fig = px.imshow(dados_anos.toPandas().corr(), text_auto = True)
fig.show()

In [173]:
# Same heatmap with the `mode` column dropped first.
# NOTE(review): `mode` is 1 in every previewed row, so it is presumably constant and has no
# defined correlation with the other columns - confirm.
fig = px.imshow(dados_anos.drop('mode').toPandas().corr(), text_auto = True)
fig.show()

## Clustering

#### StandardScaler

In [174]:
# Load the per-genre dataset (CSV with a header row, default separator; column types are inferred):
dados_generos = sessao_spark.read.csv('dados_musicas_genero.csv', header = True, inferSchema = True)

In [175]:
# Show the first rows without truncating the cell contents:
dados_generos.show(truncate = False)

+----+----------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+
|mode|genres                |acousticness       |danceability       |duration_ms       |energy             |instrumentalness    |liveness           |loudness           |speechiness         |tempo             |valence            |popularity        |key|
+----+----------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+
|1   |21st century classical|0.9793333333333332 |0.16288333333333335|160297.66666666663|0.07131666666666665|0.60683367          |0.3616             |-31.514333333333337|0.04056666666666667 |75.3365           |0.10378333333333334|27.833333333

In [44]:
# Number of rows in the genre dataset:
dados_generos.count()

2973

In [45]:
# Number of distinct genre names (compare with the row count to see whether each row is a unique genre):
dados_generos.select('genres').distinct().count()

2973

In [46]:
# Number of columns in the genre dataset:
len(dados_generos.columns)

14

In [176]:
# Para começarmos a usar as técnicas nos nossos dados, precisamos vetorizá-los:

from pyspark.ml.feature import VectorAssembler

In [177]:
# List the column names to decide which ones become features:
dados_generos.columns

['mode',
 'genres',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo',
 'valence',
 'popularity',
 'key']

In [178]:
# By convention, X holds the names of the feature columns.
# Use every column except the genre label:
X = [coluna for coluna in dados_generos.columns if coluna != 'genres']

In [179]:
# Confirm the resulting feature list:
X

['mode',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo',
 'valence',
 'popularity',
 'key']

In [180]:
# Assemble the feature columns into a single vector column, keeping only that vector and the genre:
montador = VectorAssembler(inputCols = X, outputCol = 'features')
dados_generos_vetor = montador.transform(dados_generos).select(['features', 'genres'])

In [181]:
# Show the first five assembled vectors without truncation:
dados_generos_vetor.show(truncate = False, n = 5)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|features                                                                                                                                                                                                                   |genres                |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|[1.0,0.9793333333333332,0.16288333333333335,160297.66666666663,0.07131666666666665,0.60683367,0.3616,-31.514333333333337,0.04056666666666667,75.3365,0.10378333333333334,27.83333333333333,6.0]                            |21st century classical|
|[1.0,0.49478,0.2993

In [182]:
# Clustering algorithms are sensitive to feature scale, so standardize the data first:

from pyspark.ml.feature import StandardScaler

# Fit the scaler on the assembled vectors, then add the scaled vectors as 'scaled_features'.
scaler = StandardScaler(inputCol = 'features', outputCol = 'scaled_features')
scaler_model = scaler.fit(dados_generos_vetor)
dados_generos_scaler = scaler_model.transform(dados_generos_vetor)

In [183]:
# Compare the raw and scaled feature vectors side by side:
dados_generos_scaler.show()

+--------------------+--------------------+--------------------+
|            features|              genres|     scaled_features|
+--------------------+--------------------+--------------------+
|[1.0,0.9793333333...|21st century clas...|[2.68174831000279...|
|[1.0,0.49478,0.29...|               432hz|[2.68174831000279...|
|[1.0,0.762,0.7120...|               8-bit|[2.68174831000279...|
|[1.0,0.6514170195...|                  []|[2.68174831000279...|
|[1.0,0.6765573049...|          a cappella|[2.68174831000279...|
|[1.0,0.45921,0.51...|            abstract|[2.68174831000279...|
|[1.0,0.3421466666...|      abstract beats|[2.68174831000279...|
|[1.0,0.2438540633...|    abstract hip hop|[2.68174831000279...|
|[0.0,0.3229999999...|           accordeon|[0.0,1.0101313736...|
|[1.0,0.446125,0.6...|           accordion|[2.68174831000279...|
|[0.0,0.0679505384...|          acid house|[0.0,0.2125045534...|
|[1.0,0.2569145079...|           acid rock|[2.68174831000279...|
|[1.0,0.00683,0.66...|   

#### PCA

In [184]:
from pyspark.ml.feature import PCA

In [185]:
# Project the scaled genre features onto 2 principal components (k=2),
# writing the result to the 'pca_features' column:
pca = PCA(k=2, inputCol = 'scaled_features', outputCol='pca_features')
model_pca = pca.fit(dados_generos_scaler)
dados_generos_pca = model_pca.transform(dados_generos_scaler)

In [186]:
# Each vector shown below has only two entries because `k = 2` was chosen:
dados_generos_pca.select('pca_features').show(truncate=False)

+-----------------------------------------+
|pca_features                             |
+-----------------------------------------+
|[2.507095366888567,0.4381691373769795]   |
|[-0.5969679056633482,4.981612052751348]  |
|[-4.158460276223561,-0.836652508107994]  |
|[-2.387344878512217,-0.4877989015663402] |
|[-2.6501218371679087,-0.5756819768820469]|
|[-1.4965091203367626,1.8644183183717793] |
|[-3.9235207721573238,0.2851835002352837] |
|[-4.611011109831115,-0.6783790472312371] |
|[-2.8376900630842297,-0.5712993716580516]|
|[-2.706690139892783,-1.2593788079708297] |
|[-4.698331383924287,1.2765569680619446]  |
|[-3.3759874966798677,0.7560741064307468] |
|[-5.608998877066022,1.0427311644393218]  |
|[0.2954946352117687,-0.2763864586236307] |
|[-2.5725591062870423,-1.3169815431109795]|
|[-3.4008228020493454,0.5073029625781903] |
|[-4.366720316263419,-0.336482705977109]  |
|[-2.7254698167724003,0.5058604987046363] |
|[-4.958112358381605,1.2627579957290729]  |
|[-3.693495184642272,1.382276206

In [187]:
# The preparation above took several manual steps, so bundle them into a single pipeline:

from pyspark.ml import Pipeline

# Stages, in order: assemble the feature vector -> standardize it -> reduce to 2 principal components.
pca_pipeline = Pipeline(stages = [VectorAssembler(inputCols = X, outputCol = 'features'),
                                  StandardScaler(inputCol = 'features', outputCol = 'scaled_features'),
                                  PCA(k=2, inputCol = 'scaled_features', outputCol='pca_features')])

In [188]:
# Fit the pipeline on the raw genre data:

pca_pipeline_model = pca_pipeline.fit(dados_generos)

In [189]:
# Apply the fitted pipeline; this replaces the `dados_generos_pca` built step by step earlier:
dados_generos_pca = pca_pipeline_model.transform(dados_generos)

In [190]:
# The result keeps the original columns plus the three columns added by the pipeline stages:
dados_generos_pca.show()

+----+--------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+
|mode|              genres|       acousticness|       danceability|       duration_ms|             energy|    instrumentalness|           liveness|           loudness|         speechiness|             tempo|            valence|        popularity|key|            features|     scaled_features|        pca_features|
+----+--------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+
|   1|21st century clas...| 0.9793333333333332|0.162883333

In [191]:
# Show the 2-D projection produced by the pipeline (compare with the step-by-step output shown earlier):
dados_generos_pca.select('pca_features').show(truncate=False)

+-----------------------------------------+
|pca_features                             |
+-----------------------------------------+
|[2.507095366888567,0.4381691373769795]   |
|[-0.5969679056633482,4.981612052751348]  |
|[-4.158460276223561,-0.836652508107994]  |
|[-2.387344878512217,-0.4877989015663402] |
|[-2.6501218371679087,-0.5756819768820469]|
|[-1.4965091203367626,1.8644183183717793] |
|[-3.9235207721573238,0.2851835002352837] |
|[-4.611011109831115,-0.6783790472312371] |
|[-2.8376900630842297,-0.5712993716580516]|
|[-2.706690139892783,-1.2593788079708297] |
|[-4.698331383924287,1.2765569680619446]  |
|[-3.3759874966798677,0.7560741064307468] |
|[-5.608998877066022,1.0427311644393218]  |
|[0.2954946352117687,-0.2763864586236307] |
|[-2.5725591062870423,-1.3169815431109795]|
|[-3.4008228020493454,0.5073029625781903] |
|[-4.366720316263419,-0.336482705977109]  |
|[-2.7254698167724003,0.5058604987046363] |
|[-4.958112358381605,1.2627579957290729]  |
|[-3.693495184642272,1.382276206

In [192]:
from pyspark.ml.clustering import KMeans

In [193]:
# Configure k-means to build 5 clusters on the PCA projection; SEED is passed as the estimator's seed:
SEED = 1250
kmeans = KMeans(k = 5, featuresCol = 'pca_features', predictionCol = 'cluster_pca', seed = SEED)

In [194]:
# Fit k-means on the 2-D PCA projection of the genres:
model_kmeans = kmeans.fit(dados_generos_pca)

In [195]:
# Assign each genre to a cluster (adds the 'cluster_pca' column):
predictions_kmeans = model_kmeans.transform(dados_generos_pca)

In [196]:
# Inspect the projected features next to their assigned cluster:
predictions_kmeans.select('pca_features','cluster_pca').show()

+--------------------+-----------+
|        pca_features|cluster_pca|
+--------------------+-----------+
|[2.50709536688856...|          4|
|[-0.5969679056633...|          4|
|[-4.1584602762235...|          3|
|[-2.3873448785122...|          3|
|[-2.6501218371679...|          3|
|[-1.4965091203367...|          1|
|[-3.9235207721573...|          3|
|[-4.6110111098311...|          0|
|[-2.8376900630842...|          3|
|[-2.7066901398927...|          3|
|[-4.6983313839242...|          2|
|[-3.3759874966798...|          3|
|[-5.6089988770660...|          0|
|[0.29549463521176...|          4|
|[-2.5725591062870...|          3|
|[-3.4008228020493...|          3|
|[-4.3667203162634...|          0|
|[-2.7254698167724...|          3|
|[-4.9581123583816...|          2|
|[-3.6934951846422...|          2|
+--------------------+-----------+
only showing top 20 rows



#### Plotando o Clustering

In [197]:
from pyspark.ml.functions import vector_to_array

In [198]:
# Split the 2-D PCA vector into plain x / y columns for plotting:
componentes = vector_to_array('pca_features')
pca_features_xy = predictions_kmeans.select(
    componentes[0].alias('x'),
    componentes[1].alias('y'),
    'cluster_pca',
    'genres',
)

In [199]:
# Check the plotting table: coordinates, cluster and genre name:
pca_features_xy.show()

+-------------------+-------------------+-----------+--------------------+
|                  x|                  y|cluster_pca|              genres|
+-------------------+-------------------+-----------+--------------------+
|  2.507095366888567| 0.4381691373769795|          4|21st century clas...|
|-0.5969679056633482|  4.981612052751348|          4|               432hz|
| -4.158460276223561| -0.836652508107994|          3|               8-bit|
| -2.387344878512217|-0.4877989015663402|          3|                  []|
|-2.6501218371679087|-0.5756819768820469|          3|          a cappella|
|-1.4965091203367626| 1.8644183183717793|          1|            abstract|
|-3.9235207721573238| 0.2851835002352837|          3|      abstract beats|
| -4.611011109831115|-0.6783790472312371|          0|    abstract hip hop|
|-2.8376900630842297|-0.5712993716580516|          3|           accordeon|
| -2.706690139892783|-1.2593788079708297|          3|           accordion|
| -4.698331383924287| 1.2

In [200]:
# Scatter plot of the genres on the two principal components, coloured by cluster.
# The cluster id is a label, not a quantity: cast it to string so Plotly assigns one discrete
# colour per cluster instead of mapping the integer onto a continuous colour scale.
generos_clusters = pca_features_xy.toPandas()
generos_clusters['cluster_pca'] = generos_clusters['cluster_pca'].astype(str)

fig = px.scatter(generos_clusters, x = 'x', y = 'y', color = 'cluster_pca', hover_data = ['x','y','genres'])
fig.show()

In [201]:
# Stage 2 of the fitted pipeline is its last stage (the PCA model); inspect its explained variance:
pca_pipeline_model.stages[2].explainedVariance

# Each entry is the proportion of the total variance captured by one principal component.
# The result below is roughly 0.2975 for the first component and 0.1212 for the second,
# i.e. about 42% together. Most of the variance is therefore lost in the 2-D projection,
# which may be why the clusters are not clearly separated in the plot.

DenseVector([0.2975, 0.1212])

## Clusterização por Música

In [202]:
# Back to the per-song dataset loaded at the start of the notebook:
dados.show()

+------------------+----+------------+--------------+------------------+-----------+------------------+--------+--------------------+--------------------+---+--------+-------------------+----+--------------------+----------+-----------+------------------+--------------------+
|           valence|year|acousticness|       artists|      danceability|duration_ms|            energy|explicit|                  id|    instrumentalness|key|liveness|           loudness|mode|                name|popularity|speechiness|             tempo|        artists_song|
+------------------+----+------------+--------------+------------------+-----------+------------------+--------+--------------------+--------------------+---+--------+-------------------+----+--------------------+----------+-----------+------------------+--------------------+
|             0.285|2000|     0.00239|      Coldplay|             0.429|     266773|0.6609999999999999|       0|3AJwUDP919kvQ9Qco...|             1.21E-4| 11|   0.234|  

In [203]:
# As before, start by choosing the columns to vectorize.
# Every column is used except the text / identifier ones:

colunas_texto = ('artists', 'id', 'name', 'artists_song')
X = [coluna for coluna in dados.columns if coluna not in colunas_texto]
X

['valence',
 'year',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'explicit',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'popularity',
 'speechiness',
 'tempo']

In [204]:
# Assemble the selected columns into a 'features' vector (all original columns are kept):
dados_encoded_vetor = VectorAssembler(inputCols=X, outputCol = 'features').transform(dados)

In [205]:
# Show the first five assembled vectors without truncation:
dados_encoded_vetor.select('features').show(truncate = False, n = 5)

+-------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                 |
+-------------------------------------------------------------------------------------------------------------------------+
|[0.285,2000.0,0.00239,0.429,266773.0,0.6609999999999999,0.0,1.21E-4,11.0,0.234,-7.227,1.0,84.0,0.0281,173.372]           |
|[0.613,2000.0,0.143,0.843,270507.0,0.8059999999999999,1.0,0.0,4.0,0.0771,-5.9460000000000015,0.0,80.0,0.269,94.948]      |
|[0.4,2000.0,0.00958,0.556,216880.0,0.8640000000000001,0.0,0.0,3.0,0.209,-5.87,0.0,84.0,0.0584,105.143]                   |
|[0.5429999999999999,2000.0,0.00664,0.545,233933.0,0.865,0.0,1.1E-5,11.0,0.168,-5.7079999999999975,0.0,78.0,0.0286,99.009]|
|[0.76,2000.0,0.0302,0.949,284200.0,0.6609999999999999,1.0,0.0,5.0,0.0454,-4.244,0.0,80.0,0.0572,104.504]                 |
+-------

In [206]:
# The last step before PCA is scaling the data:

# NOTE(review): the scaler is created with its default options; in the output below `year`
# scales to ~329.8, so the values appear to be divided by the standard deviation without
# being mean-centered - confirm this is intended.
scaler = StandardScaler(inputCol = 'features', outputCol = 'features_scaled')
model_scaler = scaler.fit(dados_encoded_vetor)
dados_musicas_scaler = model_scaler.transform(dados_encoded_vetor)

In [207]:
# Show the first five scaled vectors without truncation:
dados_musicas_scaler.select('features_scaled').show(truncate = False, n = 5)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_scaled                                                                                                                                                                                                                                                               |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.156596679221045,329.8170613984441,0.008403188822334736,2.5040545743834373,3.7858811737298526,2.980511298201045,0.0,6.070790766862789E-4,3.0696424994844875,1.4832857988766257,-1.

In [208]:
# Time for PCA. Earlier only 2 principal components were used so the result could be plotted,
# but that result was not satisfactory, so more components are used here.
# Start with 15 components, which equals the number of feature columns in X (no reduction yet):

pca = PCA(k=15, inputCol = 'features_scaled', outputCol = 'pca_features')

In [209]:
# Fit the 15-component PCA and project the scaled song features:
model_pca = pca.fit(dados_musicas_scaler)
dados_musicais_pca = model_pca.transform(dados_musicas_scaler)

In [210]:
# As expected with all 15 components kept, practically all of the variance is explained (in %):
sum(model_pca.explainedVariance)*100

99.99999999999999

In [211]:
# Compute the explained variance for every possible number of kept components:

# Entry i is the sum of the first i+1 explained-variance ratios (a cumulative total).
lista_valores = [sum(model_pca.explainedVariance[0:i+1]) for i in range(15)]
lista_valores

# Each value below is the cumulative explained variance after adding one more component.

[0.21297354127107343,
 0.34597741622955064,
 0.43989805678254446,
 0.5267820578269102,
 0.6001038103117928,
 0.6665719513652388,
 0.7282174213581727,
 0.7845103843321022,
 0.8350002357002613,
 0.8812446547406768,
 0.9151737562703046,
 0.9435370133056037,
 0.9699018569070159,
 0.9900947792885578,
 0.9999999999999999]

In [212]:
import numpy as np

In [213]:
# Count how many leading components have a cumulative explained variance of at most 70%:
k = sum(np.array(lista_valores) <= 0.7)
k
# The count is 6. Per the cumulative list above, 6 components explain about 66.7% of the
# variance, which is just under 70%; the 7th component is the one that crosses 70% (about 72.8%).

6

In [214]:
# Refit PCA keeping 6 components (the count found above) and project the scaled song features:
pca = PCA(k=6, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca_final = model_pca.transform(dados_musicas_scaler)

In [215]:
# Show the first five 6-component projections without truncation:
dados_musicas_pca_final.select('pca_features').show(truncate = False, n = 5)

+----------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------+
|[-34.71004464775703,-165.36848784906184,-11.163498729833321,-139.11374280854437,-14.152192300931375,6.254422223810392]|
|[-36.498857627749054,-167.85185045616558,-10.729023248907561,-135.904352656713,-13.388401890587229,3.972980727830861] |
|[-35.460300916940554,-165.99633344577887,-11.361290348241594,-138.24276881247,-13.866654919305782,4.342675920458719]  |
|[-35.56301325520583,-165.59515096480607,-11.46022600703138,-137.52095740570041,-15.56854280392272,4.542073725584272]  |
|[-36.54556974907567,-167.37512505802482,-11.881276527236555,-136.27798590243984,-14.056847990344538,3.583390145779156]|
+-------------------------------

In [216]:
# Check the total percentage of variance explained by the 6 kept components:

sum(model_pca.explainedVariance)*100

66.65719513652388

## Criação dos Clusters

In [217]:
# Bundle the song preparation into one pipeline:
# assemble the feature vector -> standardize it -> reduce to 6 principal components.
# This reassigns the `pca_pipeline` name used earlier for the genre pipeline.
pca_pipeline = Pipeline(stages=[VectorAssembler(inputCols=X, outputCol='features'),
                                StandardScaler(inputCol='features', outputCol='features_scaled'),
                                PCA(k=6, inputCol='features_scaled', outputCol='pca_features')])


In [218]:
# Fit the song pipeline on the raw per-song data:
model_pca_pipeline = pca_pipeline.fit(dados)

In [219]:
# Apply the fitted pipeline to get the PCA projection of every song:
projection = model_pca_pipeline.transform(dados)

In [220]:
# Show the first five projections (compare with the step-by-step PCA output shown earlier):
projection.select('pca_features').show(truncate=False, n = 5)

+----------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------+
|[-34.71004464775703,-165.36848784906184,-11.163498729833321,-139.11374280854437,-14.152192300931375,6.254422223810392]|
|[-36.498857627749054,-167.85185045616558,-10.729023248907561,-135.904352656713,-13.388401890587229,3.972980727830861] |
|[-35.460300916940554,-165.99633344577887,-11.361290348241594,-138.24276881247,-13.866654919305782,4.342675920458719]  |
|[-35.56301325520583,-165.59515096480607,-11.46022600703138,-137.52095740570041,-15.56854280392272,4.542073725584272]  |
|[-36.54556974907567,-167.37512505802482,-11.881276527236555,-136.27798590243984,-14.056847990344538,3.583390145779156]|
+-------------------------------

In [221]:
# Configure k-means with 50 clusters on the song projection, reusing the SEED defined earlier:
kmeans = KMeans(k=50, featuresCol = 'pca_features', predictionCol = 'cluster_pca', seed = SEED)

In [222]:
# Fit k-means on the projected songs:
modelo_kmeans = kmeans.fit(projection)

In [223]:
# Assign each song to a cluster (adds the 'cluster_pca' column):
projection_kmeans = modelo_kmeans.transform(projection)

In [224]:
# Inspect the projected features next to their assigned cluster:
projection_kmeans.select('pca_features','cluster_pca').show()

+--------------------+-----------+
|        pca_features|cluster_pca|
+--------------------+-----------+
|[-34.710044647757...|          8|
|[-36.498857627749...|         11|
|[-35.460300916940...|         34|
|[-35.563013255205...|         34|
|[-36.545569749075...|         36|
|[-36.713222290262...|         16|
|[-36.013246178822...|         18|
|[-36.542687712104...|         13|
|[-36.425249009784...|         15|
|[-35.872074915770...|         25|
|[-31.639065936568...|         28|
|[-35.661446890546...|          7|
|[-35.574542234850...|         14|
|[-36.022399748656...|         18|
|[-34.336941950985...|         42|
|[-35.096906055142...|         26|
|[-35.202423054032...|          6|
|[-34.942948671026...|          6|
|[-36.617404517517...|         11|
|[-34.296973387410...|         39|
+--------------------+-----------+
only showing top 20 rows



## Sistema de Recomendação

In [225]:
# Seed song for the recommendation; it is compared against the `artists_song` column below:
nome_musica = 'Taylor Swift - Blank Space'

In [226]:
# Find which cluster the seed song was assigned to:

# Only one matching row is needed, so fetch at most one instead of collecting every match.
linhas_musica = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica).select('cluster_pca').take(1)
if not linhas_musica:
    # Without this guard a missing or misspelled title surfaces as an opaque IndexError.
    raise ValueError(f'Song not found in the dataset: {nome_musica!r}')
cluster = linhas_musica[0][0]
cluster

29

In [227]:
# Candidate recommendations: every song assigned to the same cluster as the seed song.
# Keep only the display name, the id and the PCA vector needed for the distance step.
musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster)\
    .select('artists_song','id','pca_features')

musicas_recomendadas.show()

+--------------------+--------------------+--------------------+
|        artists_song|                  id|        pca_features|
+--------------------+--------------------+--------------------+
|Train - Drops of ...|2hKdd3qO7cWr2Jo0B...|[-34.321717368827...|
|John Mayer - Your...|7vFv0yFGMJW3qVXbA...|[-35.165989489466...|
|Maroon 5 - She Wi...|4llK75pXNWZz6KAho...|[-34.809059512962...|
|Avril Lavigne - C...|5xEM5hIgJ1jjgcEBf...|[-35.035756557524...|
|Maroon 5 - She Wi...|7sapKrjDij2fpDVj0...|[-35.295324738918...|
|Good Charlotte - ...|2g2a5kDeZexbUTD8a...|[-35.821614291214...|
|Red Hot Chili Pep...|1ndGB6rvxKYN9seCY...|[-35.458611626516...|
|Ricardo Arjona - ...|5UJsYyBi0CdSJl0ul...|[-34.355738031951...|
|Beyoncé - Crazy I...|5IVuqXILoxVWvWEPm...|[-35.828451654161...|
|     OutKast - Roses|6bUNEbXT7HovLW6Bg...|[-35.559526721657...|
|Usher - Yeah! (fe...|5rb9QrpfcKFHM1EUb...|[-36.050436933493...|
|Kelly Clarkson - ...|3xrn9i8zhNZsTtcoW...|[-35.191226411336...|
|Modest Mouse - Fl...|2lw

In [228]:
# Fetch the PCA vector of the seed song: first column of the first matching row.
componentes_musica = musicas_recomendadas.filter(musicas_recomendadas.artists_song == nome_musica)\
    .select('pca_features').collect()[0][0]

componentes_musica

DenseVector([-35.461, -166.9658, -12.7925, -137.9457, -13.1234, 4.1374])

In [229]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType

def calcula_distance(value):
    """Return the Euclidean distance from `value` to the reference song's PCA vector.

    Parameters
    ----------
    value
        One entry of the `pca_features` column. Assumed to be a pyspark.ml
        vector exposing `toArray()` (the reference value displayed by the
        notebook is a DenseVector).

    Returns
    -------
    float
        Distance between `value` and the notebook-level `componentes_musica`.
    """
    # scipy gets plain NumPy arrays instead of Spark vector objects, and its
    # NumPy scalar result is cast to a built-in float so the value handed back
    # to Spark matches the declared FloatType.
    # NOTE(review): the "Python worker exited unexpectedly" failure recorded in
    # this notebook cannot be diagnosed from the code alone; if it persists,
    # check the Python / PySpark version compatibility of the environment.
    return float(euclidean(componentes_musica.toArray(), value.toArray()))


# Wrap the Python function so it can be applied row by row to a DataFrame column.
udf_calcula_distance = f.udf(calcula_distance, FloatType())

# Add the distance of every candidate in the cluster to the reference song.
musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))

In [230]:
# Rank the candidates by distance, bring the ten nearest rows to the driver
# and rebuild a small DataFrame holding only the columns of interest.
recomendadas = sessao_spark.createDataFrame(
    musicas_recomendadas_dist.orderBy('Dist').take(10)
).select('artists_song', 'id', 'Dist')

Py4JJavaError: An error occurred while calling o2627.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 417.0 failed 1 times, most recent failure: Lost task 0.0 in stage 417.0 (TID 391) (Vinicius executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
	at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
	at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
	at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1556)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1555)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1136)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1118)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1565)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1552)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:291)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4160)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4157)
	at sun.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
	at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
	at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
	at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1556)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1555)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 34 more


In [231]:
# Wrapping the recommendation steps in a function:

def recomendador(nome_musica):
    """Print the ten songs closest to `nome_musica` within its own cluster.

    Parameters
    ----------
    nome_musica : str
        Value matched exactly against the `artists_song` column of the
        notebook-level `projection_kmeans` DataFrame.

    Returns
    -------
    None
        The ranked `artists_song` values are printed with `show()`. The
        queried song belongs to its own cluster, so it is listed as well
        (at distance 0).

    Raises
    ------
    IndexError
        If no row has `artists_song == nome_musica`. The exception type is the
        one the original `collect()[0]` lookup raised; only the message is new.
    """
    # A single lookup yields both the cluster id and the reference PCA vector.
    linhas_musica = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica)\
                                     .select('cluster_pca', 'pca_features').collect()
    if not linhas_musica:
        raise IndexError(f'Song not found in projection_kmeans.artists_song: {nome_musica!r}')
    cluster, componentes_musica = linhas_musica[0]

    # Plain NumPy array captured by the UDF closure. The previous version
    # stored the vector under a misspelled local name and the UDF silently
    # read the notebook-level `componentes_musica` instead, so distances were
    # always measured from whatever song that global happened to hold.
    # Assumes `pca_features` holds pyspark.ml vectors exposing `toArray()`.
    referencia = componentes_musica.toArray()

    def calcula_distance(value):
        # Cast scipy's NumPy scalar to a built-in float to match FloatType.
        return float(euclidean(referencia, value.toArray()))

    udf_calcula_distance = f.udf(calcula_distance, FloatType())

    musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster)\
                                            .select('artists_song', 'id', 'pca_features')

    musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))

    # Sort + limit stays inside Spark; no need to collect the rows to the
    # driver and rebuild a DataFrame from them just to print ten names.
    recomendadas = musicas_recomendadas_dist.orderBy('Dist').limit(10)\
                                            .select('artists_song', 'id', 'Dist')

    recomendadas.select('artists_song').show(truncate = False)
    

In [232]:
# Try the recommender on the same song used in the step-by-step walkthrough.
recomendador(nome_musica='Taylor Swift - Blank Space')

Py4JJavaError: An error occurred while calling o2677.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 420.0 failed 1 times, most recent failure: Lost task 0.0 in stage 420.0 (TID 394) (Vinicius executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
	at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
	at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
	at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1556)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1555)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1136)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1118)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1565)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1552)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:291)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4160)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4157)
	at sun.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:99)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
	at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
	at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
	at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1556)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1555)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	... 34 more
