In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder.appName("spark") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.network.timeout", "600s") \
    .getOrCreate()

spark

24/12/11 10:24:53 WARN Utils: Your hostname, codespaces-f8c460 resolves to a loopback address: 127.0.0.1; using 10.0.0.182 instead (on interface eth0)
24/12/11 10:24:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/11 10:24:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
country_aliases = spark.read.option("header", True).csv("csv/country_aliases.csv")
data = spark.read.option("header", True).csv("csv/charts.csv")

In [4]:
data.show(5)

+----------+-------+--------+-------+--------------------+----------+------------------+--------+--------+------+
|      date|country|position|streams|            track_id|   artists|     artist_genres|duration|explicit|  name|
+----------+-------+--------+-------+--------------------+----------+------------------+--------+--------+------+
|2019-03-14|     ro|      26|  23430|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|  163973|   False|My Bad|
|2019-03-21|     ro|      26|  23264|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|  163973|   False|My Bad|
|2019-03-28|     ro|      21|  24878|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|  163973|   False|My Bad|
|2019-04-04|     ro|      42|  22087|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|  163973|   False|My Bad|
|2019-04-11|     ro|      46|  19601|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|  163973|   False|My Bad|
+----------+-------+--------+-------+--------------------+----------+------------------+

In [5]:
data = data.dropna(subset=["name"])
data = data.withColumn("duration_minutes", col("duration") / (1000 * 60))

In [6]:
country_aliases = country_aliases.select(col("alias"), col("country").alias("country_name"))

In [7]:
data.createOrReplaceTempView("data")
country_aliases.createOrReplaceTempView("country_aliases")

In [8]:
data = spark.sql(
"""
select *
from data
join country_aliases on data.country = country_aliases.alias
"""
)

In [9]:
columns_to_drop = ['country', 'alias', 'duration']
data = data.drop(*columns_to_drop)

In [10]:
data.show(5)

+----------+--------+-------+--------------------+----------+------------------+--------+------+------------------+------------+
|      date|position|streams|            track_id|   artists|     artist_genres|explicit|  name|  duration_minutes|country_name|
+----------+--------+-------+--------------------+----------+------------------+--------+------+------------------+------------+
|2019-03-14|      26|  23430|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|   False|My Bad|2.7328833333333336|     Romania|
|2019-03-21|      26|  23264|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|   False|My Bad|2.7328833333333336|     Romania|
|2019-03-28|      21|  24878|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|   False|My Bad|2.7328833333333336|     Romania|
|2019-04-04|      42|  22087|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|   False|My Bad|2.7328833333333336|     Romania|
|2019-04-11|      46|  19601|2XKFnwB6djxrJCjR3...|['Khalid']|['pop r&b', 'pop']|   False|My Bad|2

In [11]:
data.count()

                                                                                

2009631

In [12]:
data.createOrReplaceTempView("data")

24/12/11 10:25:09 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [26]:
data.s

+----------------+
|duration_minutes|
+----------------+
+----------------+



## Create tables

In [13]:
total_and_max_streams_by_country = spark.sql(
"""
select 
    country_name, 
    sum(streams) as total_streams, 
    max(streams) as max_stream, 
    count(*) as count_songs 
from data
group by country_name 
order by total_streams desc
"""
)

In [14]:
total_and_max_streams_by_country.show(5)

[Stage 10:>                                                         (0 + 4) / 4]

+-------------+----------------+----------+-----------+
| country_name|   total_streams|max_stream|count_songs|
+-------------+----------------+----------+-----------+
|       Global|2.44282798692E11|   9997563|      32737|
|United States| 7.3543418344E10|    999931|      32345|
|       Brazil| 2.7839103326E10|    999907|      32333|
|       Mexico| 2.6328201557E10|    999951|      34304|
|      Germany| 2.2971087595E10|     99999|      34720|
+-------------+----------------+----------+-----------+
only showing top 5 rows



                                                                                

In [16]:
top_genres_by_country = spark.sql(
"""
select 
    country_name, 
    genre, 
    count(*) as genre_count
from (
    select 
        country_name, 
        explode(
            split(replace(replace(replace(artist_genres, "]", ""), "[", ""), "'", ""), ',\\s*')
        ) as genre
    from data
) exploded_genres
group by country_name, genre
order by genre_count desc
"""
)

In [17]:
top_genres_by_country.show(5)



+-------------+------------+-----------+
| country_name|       genre|genre_count|
+-------------+------------+-----------+
|     Malaysia|         pop|      14511|
|    Singapore|         pop|      14435|
|        Chile| trap latino|      13088|
|    Australia|         pop|      13032|
|United States|         rap|      12841|
+-------------+------------+-----------+
only showing top 5 rows



                                                                                

In [18]:
top_songs_by_countries = spark.sql(
"""
select country_name, name
from (
    select 
        name, 
        country_name, 
        row_number() over(partition by country_name order by position) as rank
    from data
) as ranked_songs
where rank = 1
"""
)

In [19]:
top_songs_by_countries.show(5)



+------------+--------------------+
|country_name|                name|
+------------+--------------------+
|     Andorra|I Took A Pill In ...|
|   Argentina|             Corazón|
|   Australia|Roses - Imanbek R...|
|     Austria|                 110|
|     Belarus|          HOODAK MP3|
+------------+--------------------+
only showing top 5 rows



                                                                                

In [20]:
top_artists_by_country = spark.sql(
"""
with artists_streams as
(
    select 
        artist,
        sum(streams) as total_streams,
        country_name
    from
    (
        select 
            explode(split(replace(replace(replace(artists, "]", ""), "[", ""), "'", ""), ',\\s*')) as artist,
            streams, 
            country_name
        from data
    ) as artist_data
    group by artist, country_name
)

select 
    country_name,
    artist,
    total_streams
from (
    select 
        country_name,
        artist,
        total_streams,
        row_number() over (partition by country_name order by total_streams desc) as rank
    from artists_streams
) as ranked_artists
where rank = 1
order by country_name
"""
)

In [21]:
top_artists_by_country.show(5)



+------------+-----------+-------------+
|country_name|     artist|total_streams|
+------------+-----------+-------------+
|     Andorra|       Sech|     137853.0|
|   Argentina|   J Balvin| 3.42262394E8|
|   Australia| Ed Sheeran| 4.61058785E8|
|     Austria|Capital Bra|  5.7124997E7|
|     Belarus| INSTASAMKA|    1344977.0|
+------------+-----------+-------------+
only showing top 5 rows



                                                                                

In [22]:
top_songs_more_than_1_time = spark.sql(
"""
with song_position_1 as
(
    select 
        name, 
        artists,
        row_number() over(partition by name order by name) as rank
    from (
        select
            name, 
            artists
        from data
        where position = 1
    ) 
)

select
    name,
    artists
from
(
    select 
        max(rank) as max_rank,
        name,
        artists
    from song_position_1
    group by name, artists
)
where max_rank > 1
order by name
"""
)

In [23]:
top_songs_more_than_1_time.show(5)



+--------------------+--------------------+
|                name|             artists|
+--------------------+--------------------+
|"Bekhayali (From ...|['Sachet Tandon',...|
|"Let It Go - From...|    ['Idina Menzel']|
|                 1-2|           ['nublu']|
|   100 dni do matury|['Mata', 'Ezra Wi...|
|                 110|['Capital Bra', '...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [48]:
top_songs_in_more_than_1_country = spark.sql(
"""
select distinct
    name, 
    artists,
    count(country_name) as total_countries
from data
where position = 1
group by name, artists
having total_countries > 1
order by name
"""
)

In [49]:
top_songs_in_more_than_1_country.show(5)



+--------------------+--------------------+---------------+
|                name|             artists|total_countries|
+--------------------+--------------------+---------------+
|"Bekhayali (From ...|['Sachet Tandon',...|              8|
|"Let It Go - From...|    ['Idina Menzel']|              2|
|                 1-2|           ['nublu']|              6|
|   100 dni do matury|['Mata', 'Ezra Wi...|              3|
|                 110|['Capital Bra', '...|             10|
+--------------------+--------------------+---------------+
only showing top 5 rows



                                                                                