# Spark SQL: funciones

#### Máster en Data Science y Big Data, AFI Escuela de Finanzas
#### Claudia Quintana Wong

## Descripción de ficheros

Fichero albums.tsv:
* id: Identificador único del disco.
* title: Título del disco.

Fichero artists.tsv:

* id: Identificador único del artista.
* name: Nombre del artista.
* hotness: Nivel de popularidad del artista.
* familiarity: Reconocimiento del artista.
* location: Ubicación del artista.


Fichero songs.tsv:

* id: Identificador único de la canción.
* title: Título de la canción.
* year: Año de publicación de la canción.
* hotness: Popularidad de la canción.
* id_artist: Identificador único del artista de la canción.
* id_album: Identificador único del álbum de la canción.
* duration: Duración en segundos de la canción.
* end_of_fade_in: Segundo de la canción en el que termina el fade in.
* start_of_fade_out: Segundo de la canción en el que empieza el fade out.
* tempo: Tempo de la canción.
* time_signature: Número de tiempos por compás de la canción.
* key: Escala de la canción (de 0 a 11).
* loudness: Volumen de la canción.
* mode: Tipo de escala de la canción (mayor = 0 o menor = 1)
* style: Estilo de la canción.

## Carga de datos

In [1]:
from pyspark.sql.types import Row
from pyspark.sql import functions as F

In [2]:
def albumParser(line):
    fields = line.split('\t')
    return Row(id = fields[0], title = fields[1])

def artistParser(line):
    fields = line.split('\t')
    return Row(id=fields[0], 
               name = fields[1], 
               hotness = float(fields[2]), 
               familiarity = fields[3], 
               location = fields[4])

def songParser(line):
    fields = line.split('\t')
    return Row(id = fields[0],
               title = fields[1],
               year = int(fields[2]),
               hotness = float(fields[3]) if fields[3] != 'NA' else 0,
               id_artist = fields[4],
               id_album = fields[5],
               duration = float(fields[6]),
               end_of_fade_in = float(fields[7]),
               start_of_fade_out = float(fields[8]),
               tempo = float(fields[9]),
               time_signature = fields[10],
               key = fields[11],
               loudness = float(fields[12]),
               mode = int(fields[13]),
               style = fields[14])

In [3]:
data_folder = './data'

albums = sc.textFile(f'{data_folder}/albums.tsv')
artists = sc.textFile(f'{data_folder}/artists.tsv')
songs = sc.textFile(f'{data_folder}/songs.tsv')

In [4]:
albums_rows = albums.map(albumParser)
artists_rows = artists.map(artistParser)
songs_rows = songs.map(songParser)

In [5]:
album_df = spark.createDataFrame(albums_rows)
artist_df = spark.createDataFrame(artists_rows)
song_df = spark.createDataFrame(songs_rows)

### Inspección de los dataframes

In [6]:
album_df.dtypes

[('id', 'string'), ('title', 'string')]

In [7]:
artist_df.dtypes

[('familiarity', 'string'),
 ('hotness', 'double'),
 ('id', 'string'),
 ('location', 'string'),
 ('name', 'string')]

In [8]:
song_df.dtypes

[('duration', 'double'),
 ('end_of_fade_in', 'double'),
 ('hotness', 'double'),
 ('id', 'string'),
 ('id_album', 'string'),
 ('id_artist', 'string'),
 ('key', 'string'),
 ('loudness', 'double'),
 ('mode', 'bigint'),
 ('start_of_fade_out', 'double'),
 ('style', 'string'),
 ('tempo', 'double'),
 ('time_signature', 'string'),
 ('title', 'string'),
 ('year', 'bigint')]

Es necesario aclarar que la interpretación dada a cada una de las preguntas en el ejercicio de Spark-Core es la misma aplicada en este ejercicio.

### 1. ¿Cuál es el estilo más rápido (tempo) en media?

Para dar respuesta a esta consulta seguiremos los siguientes pasos:

1. Agrupar las canciones por estilo
2. Hallar la media en cada grupo teniendo en cuenta la variable *tempo*
3. Ordenar de manera ascendente de acuerdo a la media calculada y tomar el primer elemento.

In [9]:
grouped_df = song_df.select('style', 'tempo').groupBy('style').mean()
grouped_df.orderBy('avg(tempo)').take(1)

[Row(style='rebetika', avg(tempo)=47.447)]

### 2. ¿Cuales son los 5 artistas, ubicados en UK (cualquier territorio de UK), con mayor número de canciones en escala menor (mode = 1)?

Para dar respuesta a esta interrogante seguiremo los siguientes pasos:

1. Obtener los artistas ubicados en UK (cualquier territorio de UK)
2. Seleccionar las canciones en escala menor
3. Mezclar los artistas de UK con las canciones seleccionadas
4. Hacer un conteo de canciones por artista
5. Ordenar descendientemente los artistas de acuerdo al conteo y seleccionar los 5 primeros

In [10]:
uk_artists = artist_df.where(artist_df.location.contains('UK') | 
                             artist_df.location.contains('United Kingdom') |
                             artist_df.location.contains('England')).select('name', 'location', (artist_df.id).alias('id_artist'))
minor_songs = song_df.where(song_df.mode == 1).select('id_artist', 'mode')
joined = uk_artists.join(minor_songs, on=['id_artist'], how='inner')
joined.groupBy(['id_artist', 'name']).sum().orderBy('sum(mode)', ascending=False).take(5)

[Row(id_artist='AR9W3X91187FB3994C', name='Phil Collins', sum(mode)=9),
 Row(id_artist='ARH6W4X1187B99274F', name='Radiohead', sum(mode)=7),
 Row(id_artist='ARFCUN31187B9AD578', name='The Rolling Stones', sum(mode)=7),
 Row(id_artist='ARAIABB1187B9AC6E2', name='Seal', sum(mode)=6),
 Row(id_artist='ARD8JVH1187FB4DA04', name='Bad Company', sum(mode)=6)]

### 3. Desde 1970 hasta hoy, ¿las canciones son más rápidas (tempo), altas (loudness) y cortas (duration) en media? Ordena los resultados por año ascendente.

Se seguirán los siguientes pasos:

1. Seleccionar las canciones de 1970 hasta hoy
2. Seleccionar las columnas: *tempo*, *loudness*, *duration*
3. Agrupar las canciones por año y calcular la media de las variables selecciondas
4. Ordenar las canciones ascendentemente según el año de lanzamiento

In [11]:
song_time = song_df.select('id', 'year', 'tempo', 'loudness', 'duration').where(song_df.year >= 1970)
grouped = song_time.groupBy('year').mean().select('year', 'avg(tempo)', 'avg(loudness)', 'avg(duration)')
grouped.orderBy('year').show()

+----+------------------+-------------------+------------------+
|year|        avg(tempo)|      avg(loudness)|     avg(duration)|
+----+------------------+-------------------+------------------+
|1970|121.34628571428571| -11.92847619047619|231.42578619047623|
|1971|         136.16196|-12.153000000000002|259.55428919999997|
|1972|129.17204166666667|-11.719291666666665|238.54539750000004|
|1973|        116.356125|-11.711541666666667|294.16444416666667|
|1974|125.08609090909088|-10.670681818181817|239.49134636363635|
|1975|125.41183333333333|-11.249541666666666| 277.4406354166667|
|1976|137.26139999999998|           -11.6584| 210.9940493333333|
|1977|139.33685714285713|-11.820114285714284|255.30692800000003|
|1978|         134.38385|           -10.1125|247.85456749999997|
|1979|137.51694444444445|-11.879083333333334| 226.0566886111111|
|1980|126.89337499999999|       -11.09853125|     210.438730625|
|1981|127.96074999999999|-11.570444444444446|211.69224499999999|
|1982|         125.14522|

### 4. ¿Cuál es el estilo que más abusa de los efectos de fade in y fade out (mayor número de segundos desde inicio al final del fade in más desde el inicio del fade out al final de la canción)?

Se utiliza la misma expresión de cálculo empleada para resolver la consulta en Spark Core.

$$end\_of\_fade\_in + duration - start\_of\_fade\_out$$

Para encontrar el estilo que más abusa de estos efectos se determina la canción que cumple este requisito y se devuelve el estilo asociado.

Los pasos a seguir son:

1. Calcular la expresión para cada canción de la base de datos
2. Ordenar de manera descendente según la métrica calculada
3. Devolver el estilo de la primera observación en el resultado anterio

In [12]:
fades_effect = song_df.select('id', 'title', 'style', 
               (song_df.end_of_fade_in + song_df.duration - song_df.start_of_fade_out)
               .alias('fades_effect')).orderBy('fades_effect', ascending=False).take(1)


### Intepretación 2
El estilo que más abusa de los efectos fades es aquel en que sus canciones abusan más de este efecto en media

In [13]:
fades_effect_1 = song_df.select('id', 'title', 'style', 
               (song_df.end_of_fade_in + song_df.duration - song_df.start_of_fade_out)
               .alias('fades_effect'))

fades_effect = fades_effect_1.groupBy('style').mean().orderBy('avg(fades_effect)', ascending=False)
fades_effect.take(1)

[Row(style='industrial dance', avg(fades_effect)=61.67226499999998)]

### 5. ¿Cual es la canción más popular (hotness) de los 5 artistas más populares (hotness)?

Para resolver esta consulta se siguieron los siguientes pasos:

1. Ordenar los artistas según su popularidad y tomar los 5 más populares.
2. Agrupar las canciones por artista y quedarnos con la que mayor popularidad tiene en cada grupo
3. Hacer un join entre la lista de los 5 artistas más populares y la lista de la canción más popular por artista

El resultado final se compone de tuplas de la forma:

$$id\_artist, artist\_name, artist\_hotness, song\_id, song\_title, song\_hotness$$

In [14]:
top_5_artists = artist_df.select((artist_df.id).alias('id_artist'), 'name', 'hotness').orderBy('hotness', ascending=False).take(5)
top_5_artists= spark.createDataFrame(top_5_artists)
top_songs_per_artist = song_df.filter(song_df.hotness.isNotNull()).select(
    'id_artist', 
    'title', 
    (song_df.hotness).alias('song_hotness')).join(
    top_5_artists, on='id_artist').groupBy('id_artist').max()

In [15]:
join = song_df.select('id', 
                      'id_artist', 
                      'title',
                      song_df.hotness.alias('max(song_hotness)')).join(
    top_songs_per_artist,
    on=['id_artist', 'max(song_hotness)']).join(
    artist_df.select(artist_df.id.alias('id_artist'), 'name', 'hotness'), 
    on=['id_artist']).select('id_artist', 
                             'name', 'hotness',
                             'id', 'title', 
                             'max(song_hotness)').orderBy('hotness', ascending=False).show()

+------------------+---------------+-----------+------------------+--------------------+-----------------+
|         id_artist|           name|    hotness|                id|               title|max(song_hotness)|
+------------------+---------------+-----------+------------------+--------------------+-----------------+
|ARRH63Y1187FB47783|     Kanye West|1.082502557|SOJMUAN12AB0183911|       Street Lights|      0.814517241|
|ARF8HTQ1187B9AE693|      Daft Punk|1.021255588|SONJBQX12A6D4F8382|             Da Funk|        0.8622545|
|ARTDQRC1187FB4EFD4|Black Eyed Peas|1.005941966|SOCHRXB12A8AE48069|Let's Get It Started|      0.624425493|
|ARS54I31187FB46721|   Taylor Swift|0.922412443|SOTNWCI12AAF3B2028| The Way I Loved You|      0.853828893|
|ARJ7KF01187B98D717|       Coldplay|0.916053228|SOEHTZE12A6310F0F2|          One I Love|      0.810263613|
+------------------+---------------+-----------+------------------+--------------------+-----------------+

