In [None]:
from pyspark.sql import functions as F

# path to the data relative to the notebook
path_to_data = './path/to/data'

# add/remove files as needed
data_files = [
    'endsong_0.json',
    'endsong_1.json',
    'endsong_2.json',
    'endsong_3.json',
    'endsong_4.json',
    'endsong_5.json',
    'endsong_6.json',
    'endsong_7.json',
]

In [None]:
df_data = None
for file in data_files:
    df_this_data_file = spark.read.json(path_to_data + file)
    if not df_data:
        df_data = df_this_data_file
    else:
        df_data = df_data.union(df_this_data_file)

In [None]:
from datetime import datetime

season_start_dates = {
    'SZNZ: Spring': datetime.strptime('2022-03-21', '%Y-%m-%d'),
    'SZNZ: Summer': datetime.strptime('2022-06-21', '%Y-%m-%d'),
    'SZNZ: Autumn': datetime.strptime('2022-09-21', '%Y-%m-%d'),
    'SZNZ: Winter': datetime.strptime('2022-12-21', '%Y-%m-%d'),
}

start_date_data = [
    ('SZNZ: Spring', '2022-03-21'),
    ('SZNZ: Summer', '2022-06-21'),
    ('SZNZ: Autumn', '2022-09-21'),
    ('SZNZ: Winter', '2022-12-21')
]
df_start_dates = spark.createDataFrame(start_date_data, ['album', 'start_date'])

In [None]:

df_weezer = (df_data
    .filter(F.col('master_metadata_album_artist_name') == 'Weezer')
    .filter(F.col('master_metadata_album_album_name').like('SZNZ%'))
    .orderBy('ts')
    .select(
        F.col('master_metadata_track_name').alias('track'),
        F.col('master_metadata_album_album_name').alias('album'),
        'ms_played',
        'ts',
        F.expr('sum(ms_played) over (partition by master_metadata_album_album_name order by ts)').alias('cumsum_ms'),
        F.expr('row_number() over (partition by master_metadata_album_album_name order by ts)').alias('rn'),
    )         
)

df_weezer = (df_weezer
    .orderBy('ts')
    .select(
        '*', 
        F.lit(F.col('cumsum_ms') / 60000 / 60).alias('cumsum_hrs')
    )
)

In [None]:
from pyspark.sql import Window
import datetime

album_dfs = {}
seasons = ['SZNZ: Summer', 'SZNZ: Spring', 'SZNZ: Autumn', 'SZNZ: Winter']
for season in seasons:
    
    season_dates = [ ( (season_start_dates[season] + datetime.timedelta(days=x)).strftime('%Y-%m-%d'),) for x in range(90)]
    album_dfs[season] = spark.createDataFrame( season_dates, ['ts'])
    
    df_this_season_data = (df_weezer
        .filter(F.col('album') == season)
        .groupBy(F.substring(F.col('ts'), 1,10).alias('ts'))
        .agg(F.sum('ms_played').alias('ms_played'))
        .select(
            F.substring(F.col('ts'), 1, 10).alias('ts'),
            F.expr('sum(ms_played) over (order by ts)').alias(season),
        )
    )
    
    album_dfs[season] = (album_dfs[season]
        .join(df_this_season_data, 'ts', 'left')
        .select(
            '*',
            F.expr('row_number() over (order by ts)').alias('rn'),
        )
    )
    
    
df_albums_pivot = spark.range(1,90).withColumnRenamed("id","rn")
for album_df in album_dfs:
    df_albums_pivot = df_albums_pivot.join(album_dfs[album_df], 'rn', 'left' )
 
for season in seasons:
    df_albums_pivot = (df_albums_pivot
        .withColumn(season + '_hrs', F.lit(F.col(season) / 60000 / 60))
    )

window_album = Window.partitionBy().orderBy('rn').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_albums_pivot = (df_albums_pivot
    .select(
        '*',
        *[F.last(F.col(c + '_hrs'), ignorenulls=True).over(window_album).alias(c + '_hrs_fill') for c in seasons]
    )               
    .orderBy('rn')
)

In [None]:
import matplotlib.pyplot as plt

pd_albums_pivot = df_albums_pivot.toPandas()

season_colours = {
    'SZNZ: Summer': 'orange', 
    'SZNZ: Spring': 'green',
    'SZNZ: Autumn': 'purple',
    'SZNZ: Winter': 'white'
}
ax = plt.axes()
ax.set_facecolor("black")

for season in seasons:
    plt.plot(pd_albums_pivot['rn'], pd_albums_pivot[season + '_hrs_fill'], color=season_colours[season])
plt.xlabel('Day', fontsize=12)
plt.ylabel('hrs_played', fontsize=12)
plt.grid(False)


plt.show()