In [12]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Charts Pipeline")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

#  Google Storage File Path
gsc_file_path = 'gs://spotify_data_de/charts.csv'

# Create data frame
df = spark.read.format("csv").option("header", "true") \
       .load(gsc_file_path)

df.printSchema()

root
 |-- title: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- date: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- chart: string (nullable = true)
 |-- trend: string (nullable = true)
 |-- streams: string (nullable = true)



In [13]:
#Change datatypings to the correct type hints
from pyspark.sql.functions import *
from pyspark.sql.types import NumericType, IntegerType

df = df.withColumn("streams",col("streams").cast(IntegerType()))
df = df.withColumn("rank",col("rank").cast(IntegerType()))
df = df.select("*", col("date"), to_date(col("date"),"yyyy-MM-dd").alias("conv_date")).drop("date")

df.printSchema()

root
 |-- title: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- artist: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- chart: string (nullable = true)
 |-- trend: string (nullable = true)
 |-- streams: integer (nullable = true)
 |-- conv_date: date (nullable = true)



In [14]:
top200 = df.filter(df.chart == 'top200')

df_grouped = top200.groupby(col('title')).max("streams")

In [15]:
df_grouped.show(20)

+--------------------+------------+
|               title|max(streams)|
+--------------------+------------+
|We Don't Talk Any...|     1190610|
|  Persiana Americana|       65346|
|        Sunset Lover|     1068294|
|Cheerleader - Fel...|      636702|
|          Soits lebn|        1354|
|          Don't Mind|       36905|
|Materialista (fea...|       11938|
|        La Gota Fría|        8128|
|When You Love Som...|       98264|
|Let Me Love You -...|       13533|
|       Till It Hurts|       24147|
|Keep It Mello (fe...|        4818|
|              Dahulu|        6440|
|       The Beginning|       29625|
|              Remedy|       81329|
|              Heaven|     3339581|
|No Promises (feat...|     2176314|
|            Senorita|      387984|
|      Alles probiert|      195448|
|Rapariga Não - Ao...|      149839|
+--------------------+------------+
only showing top 20 rows



In [16]:
joinExpression = ['title']
merged_df = df_grouped.join(df, joinExpression,"left")
final_df = merged_df.dropDuplicates(['title'])
final_df.orderBy(col('max(streams)').desc()).show(20)

+--------------------+------------+----+--------------------+--------------------+-----------+-------+-------------+-------+----------+
|               title|max(streams)|rank|              artist|                 url|     region|  chart|        trend|streams| conv_date|
+--------------------+------------+----+--------------------+--------------------+-----------+-------+-------------+-------+----------+
|          Easy On Me|    19749704|  50|               Adele|https://open.spot...|  Argentina| top200|      MOVE_UP|  71132|2021-11-01|
|All I Want for Ch...|    17223237| 109|        Mariah Carey|https://open.spot...|    Belgium| top200|      MOVE_UP|   3246|2017-01-01|
|      Last Christmas|    15813799| 104|               Wham!|https://open.spot...|    Belgium| top200|      MOVE_UP|   3323|2017-01-01|
|     drivers license|    13714177|  72|      Olivia Rodrigo|https://open.spot...|  Argentina| top200|    MOVE_DOWN|  50033|2021-07-01|
|            good 4 u|    12586645|  27|      Ol

In [17]:
#Best and worst performing song per artist that reached the top200
from pyspark.sql import Row, Window

windowdesc = Window.partitionBy(col('artist')).orderBy(col('max(streams)').desc())
windowasc = Window.partitionBy(col('artist')).orderBy(col('max(streams)').asc())

songs_merged_windowed = final_df.withColumn('rank_desc', dense_rank().over(windowdesc)).withColumn('rank_asc', dense_rank().over(windowasc))

worst_best_song_table = songs_merged_windowed.where((col('rank_desc') == 1) | (col('rank_asc') == 1) ).select('*')

worst_best_song_table2 = worst_best_song_table.withColumn('performance_category', when(col('rank_desc') == 1, 'Most streams')
                                                          .otherwise('Least streams')).select('artist', 'performance_category', 'title', 'max(streams)')
worst_best_song_table2.show(10)

+--------------------+--------------------+--------------------+------------+
|              artist|performance_category|               title|max(streams)|
+--------------------+--------------------+--------------------+------------+
|"@LMK ~ LoveMusic...|        Most streams|            Emotionz|        4418|
|"Arturo ""Zambo""...|       Least streams|Popurri de Valses...|       10046|
|"Arturo ""Zambo""...|        Most streams|    Se Acabó y Punto|       22303|
|"Boy Wonder CF, T...|        Most streams|Siente el Boom (f...|       25858|
|"Fruko Y Sus Teso...|       Least streams|         Los Charcos|       18883|
|"Fruko Y Sus Teso...|        Most streams|            El Preso|       27051|
|"Gotay ""El Auten...|        Most streams|           Más De Ti|       13141|
|"Héctor Acosta ""...|        Most streams|  Amorcito Enfermito|        2216|
|"Interpreti dello...|        Most streams|          Bella ciao|       74922|
|"Musicologo The L...|        Most streams| Bum Bam Ven - Remix|

In [18]:
bucket = "spotify_data_de"
spark.conf.set('temporaryGcsBucket', bucket)
# Saving the data to BigQuery
worst_best_song_table2.write.format('bigquery') \
  .option('table', 'de2022-362617.spotify.charts') \
  .mode("overwrite") \
  .save()

In [11]:
# Stop the spark context
spark.stop()