# Desafio 1
## Install pyspark

In [2]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


## Initialize spark session

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Desafio_1').getOrCreate()

23/12/11 13:18:09 WARN Utils: Your hostname, ghd8 resolves to a loopback address: 127.0.1.1; using 192.168.0.115 instead (on interface wlp4s0)
23/12/11 13:18:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 13:18:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load dataframe

In [4]:
df = spark.read.csv("content/spotify.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- Unnamed: 0: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- track_genre: string (nullable = true)



## Select interest columns

In [5]:
selected_columns = [
    'track_id', 
    'artists', 
    'track_name', 
    'album_name',
    'popularity', 
    'duration_ms',
    'explicit', 
    'danceability', 
    'energy', 
    'loudness', 
    'speechiness',
    'acousticness',
    'liveness',
    'valence',
    'track_genre'
    ]

f_df = df.select(*selected_columns)
f_df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- track_genre: string (nullable = true)



## Clean dataframe

### Inconsistent values

#### Popularity

In [6]:
from pyspark.sql.functions import col, asc

f_df.select('popularity').where(~col('popularity').rlike('^[0-9]+$')).show(df.count(), truncate=False) # Non number popularity


+------------------------------------------------------+
|popularity                                            |
+------------------------------------------------------+
| Pt. 1) [Music from the Original TV Series]"          |
| Adagio ""Nimrod"" (Arr. J. Meisl for String Quartet)"|
| Op. 46"                                              |
| Op. 310"                                             |
| Niedermeier & Whitehead                              |
| wann wird die Zeit erscheinen?"""                    |
| frohlocket"""                                        |
| wann wird die Zeit erscheinen?"""                    |
| Op. 2: Coda. Alla Polacca - Live"                    |
| o starker König"""                                   |
| frohlocket"""                                        |
| o starker König"""                                   |
| frohlocket"""                                        |
| Op.52 No.6                                           |
| Op.52 No.6                   

#### Track genre

In [7]:

f_df.select('track_genre').where(col('track_genre').rlike('^[a-zA-Z\- ]+$')).show(df.count(), truncate=False) # Number genre


+-----------------+
|track_genre      |
+-----------------+
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |
|acoustic         |


#### Acousticness

In [8]:
f_df.select('acousticness').where(~col('acousticness').rlike('^[0-9.\-e]+$')).show()

+------------+
|acousticness|
+------------+
|  Amonasro)"|
|       False|
|    Popolo)"|
+------------+



#### Energy

In [9]:
f_df.select('energy').where(~col('energy').rlike('^[0-9.\-e]+$')).show()

+--------------------+
|              energy|
+--------------------+
|Hearts And Soul -...|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|               False|
|                Aida|
|              Coro)"|
|               False|
|               False|
|               False|
+--------------------+
only showing top 20 rows



### Filter inconsistent values

In [10]:

f_df = (
    f_df
    .withColumn('popularity', col('popularity').cast('int'))
    .withColumn('duration_ms', col('duration_ms').cast('int'))
    .withColumn('danceability', col('danceability').cast('double'))
    .withColumn('energy', col('energy').cast('double'))
    .withColumn('loudness', col('loudness').cast('double'))
    .withColumn('speechiness', col('speechiness').cast('double'))
    .withColumn('acousticness', col('acousticness').cast('double'))
    .withColumn('liveness', col('liveness').cast('double'))
    .withColumn('valence', col('valence').cast('double'))
)

f_df = f_df.where(col('track_genre').rlike('^[a-zA-Z\- ]+$'))

f_df.printSchema()

f_df.dropna() # remove all rows with null values

root
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- track_genre: string (nullable = true)



DataFrame[track_id: string, artists: string, track_name: string, popularity: int, duration_ms: int, explicit: string, danceability: double, energy: double, loudness: double, speechiness: double, acousticness: double, liveness: double, valence: double, track_genre: string]

## Write filtered dataframe to file

In [11]:
f_df.coalesce(1).write.csv('content/.f_spotifyzipcodes', header=True, mode='overwrite')

                                                                                

In [18]:
from pyspark.sql.functions import avg, col

genre_avg_popularity = f_df \
                            .groupBy('track_genre') \
                            .agg(avg(col('popularity')).alias('avg_popularity'))\
                            .orderBy(col('avg_popularity'))

genres = [row['track_genre'] for row in genre_avg_popularity.limit(5).collect()]

print(genres)
result = []

for genre in genres:
    result.append(f_df.select('artists', 'track_name', 'popularity', 'track_genre').orderBy(col('popularity').desc()).where(col('track_genre') == genre).collect()[0])

for genre in result:
    print(genre['popularity'])
    
print(result)

['iranian', 'romance', 'latin', 'detroit-techno', 'chicago-house']
33
35
98
58
78
[Row(artists='Porya Hatami', track_name='Kani (Day)', popularity=33, track_genre='iranian'), Row(artists='Valery Obodzinsky', track_name='Эти глаза напротив', popularity=35, track_genre='romance'), Row(artists='Manuel Turizo', track_name='La Bachata', popularity=98, track_genre='latin'), Row(artists='Inner City', track_name='Good Life', popularity=58, track_genre='detroit-techno'), Row(artists='Aaron Smith;Krono;Luvli', track_name='Dancin (feat. Luvli) - Krono Remix', popularity=78, track_genre='chicago-house')]


In [54]:
genre_avg_popularity = f_df.groupBy('artists') \
    .agg(avg(col('popularity')).alias('avg_popularity')) \
    .orderBy(col('avg_popularity').desc())

artists = [row['artists'] for row in genre_avg_popularity.limit(5).collect()]
avg_popularities = [row['avg_popularity'] for row in genre_avg_popularity.limit(5).collect()]

top_songs_per_artist = []

f_df.select('track_name').where(col('artists').contains('Eminem')).orderBy(col('popularity')).show()

+--------------------+
|          track_name|
+--------------------+
|             Twisted|
|"Love Me - From "...|
|        Kill For You|
|        Drama Setter|
|Last One Standing...|
|If I Get Locked U...|
|Dead Wrong (feat....|
|Outro (Obie Trice...|
|EPMD 2 (feat. Emi...|
|Caterpillar (feat...|
|Dead Wrong (feat....|
|Don't Aproach Me ...|
|Lean Back (feat. ...|
|My Name (feat. Em...|
|Gospel (with Eminem)|
|Last One Standing...|
|     I Need A Doctor|
|Last One Standing...|
|Venom - Music Fro...|
|What's The Differ...|
+--------------------+
only showing top 20 rows

