In [None]:
spark.conf.set(
"fs.azure.account.key.datastorageaccount1124.dfs.core.windows.net",
"AccessKey"
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder\
    .appName("Most streamed songs")\
    .getOrCreate()

spark

In [None]:
df = spark.read\
    .option("header",True)\
    .option("inferSchema",True)\
    .option("mode","FAILFAST")\
    .csv("abfss://datacontainer@datastorageaccount1124.dfs.core.windows.net/Most Streamed Spotify Songs 2024.csv")

df.printSchema()
df.show(5)

df.createOrReplaceTempView("mostStreamedSongsTable")

In [None]:
#Changing the date format and filtering columns

dateFormat = "MM-dd-yyyy"
filterDF = df.select(
    col("Artist"),
    col("Track"),
    col("Spotify Streams"),
    col("Spotify Popularity"),
    to_date(col("Release Date"),dateFormat).alias("Release Date"))\
    .where(col("Release Date") > "2024-01-01")\
    .filter(col("Spotify Streams").isNotNull())\
    .filter(col("Spotify Popularity").isNotNull())\
    .orderBy(col("Release Date").desc())

filterDF.show()

In [None]:
yeardf = df.withColumn("Year", year(col("Release Date")))

maxPopularityByYear = yeardf.groupBy("Year")\
    .agg(max(col("Spotify Popularity")).alias("Max Popularity"))\
    .orderBy(col("Year"))

maxPopularityByYear.show()

In [None]:
#Grouping the Artist based on popularity

popularityDF = filterDF.groupBy("Artist")\
    .agg(max("Spotify Popularity").alias("Maximum Popularity Based on Artist"))\
    .orderBy(col("Maximum Popularity Based on Artist").desc())
popularityDF.show()

In [None]:
# Define a window specification to rank by maximum popularity
windowSpec = Window\
    .orderBy(col("Maximum Popularity Based on Artist").desc())

# Add a rank column based on maximum popularity
rankedPopularityDF = popularityDF\
    .withColumn("Rank", dense_rank().over(windowSpec))
print("Ranking Artist based on Spotify popularity based on current year:\n")
rankedPopularityDF.show()

In [None]:
output_path = "abfss://outputcontainer@datastorageaccount1124.dfs.core.windows.net/filtered.csv"
output_path1 = "abfss://outputcontainer@datastorageaccount1124.dfs.core.windows.net/popularity.csv"
output_path2 = "abfss://outputcontainer@datastorageaccount1124.dfs.core.windows.net/rankedPopularity.csv"
output_path3 = "abfss://outputcontainer@datastorageaccount1124.dfs.core.windows.net/maxPopularityByYear.csv"

filterDF.write\
    .csv(output_path, mode="overwrite")

popularityDF.write\
    .csv(output_path1,mode="overwrite")

rankedPopularityDF.write\
    .csv(output_path2,mode="overwrite")

maxPopularityByYear.write\
    .csv(output_path3,mode="overwrite")