In [38]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, hour, minute, date_format, when
from datetime import datetime
from pyspark.sql import functions as F
import pandas as pd
from dotenv import load_dotenv
import os

In [39]:
load_dotenv()

jpath = os.getenv("jpath")
topartisturl = os.getenv("topartisturl")
hours = os.getenv("hours")
topsongs = os.getenv("topsongs")
dates = os.getenv("dates")
user = os.getenv("user")
password = os.getenv("password")

In [40]:


jar_path = jpath

spark = SparkSession.builder \
    .appName("hi") \
    .master("local[*]") \
    .config("spark.jars", jar_path) \
    .config("spark.driver.extraClassPath", jar_path) \
    .config("spark.executor.extraClassPath", jar_path) \
    .getOrCreate()

print(spark.sparkContext._conf.getAll())
print(spark.sparkContext._conf.get("spark.jars"))





[('spark.hadoop.fs.s3a.vectored.read.min.seek.size', '128K'), ('spark.executor.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty

In [None]:
df = spark.read.json('StreamingHistory_music_2.json', multiLine=True)
df = df.na.drop(how="any")


clean_df = df.where((df.artistName != "Unknown Artist") & (df.trackName != "Unknown Track"))


top_artists = clean_df.groupby("artistName").agg({"msPlayed": "sum"}).orderBy("sum(msPlayed)", ascending=False)

top_songs = clean_df.groupby("trackName").agg({"msPlayed": "sum"}).orderBy("sum(msPlayed)", ascending=False)



In [46]:
clean_df = clean_df.withColumn(
    "endTime", to_timestamp(col("endTime"), "yyyy-MM-dd HH:mm")
)

clean_df = clean_df.withColumn("endHour", hour(col("endTime")))
clean_df = clean_df.withColumn("endMins", minute(col("endTime")))

time = []

clean_df = clean_df.withColumn("Hour", hour(col("endTime")))
clean_df = clean_df.withColumn("MonthAndDay", date_format(col("endTime"), "MM-dd-yyyy"))
clean_df = clean_df.withColumn("MonthName", date_format(col("endTime"), "MMMM"))
clean_df = clean_df.withColumn("TimeofDay", when(clean_df["Hour"] < 12, "Morning") .when((clean_df["Hour"] >= 12) & (clean_df["Hour"] <= 18), "Afternoon").otherwise("Evening"))



clean_df.show()

+-------------------+-------------------+--------+--------------------+-------+-------+----+-----------+---------+---------+
|         artistName|            endTime|msPlayed|           trackName|endHour|endMins|Hour|MonthAndDay|MonthName|TimeofDay|
+-------------------+-------------------+--------+--------------------+-------+-------+----+-----------+---------+---------+
|       Fall Out Boy|2025-02-27 17:36:00|  301438|This Ain't A Scen...|     17|     36|  17| 02-27-2025| February|Afternoon|
|         Dean Blunt|2025-02-27 17:36:00|    3111|                   5|     17|     36|  17| 02-27-2025| February|Afternoon|
|          JPEGMAFIA|2025-02-27 17:36:00|    1532|       Kenan Vs. Kel|     17|     36|  17| 02-27-2025| February|Afternoon|
|      Devon Hendryx|2025-02-27 17:36:00|    2182|        Neon Kitchen|     17|     36|  17| 02-27-2025| February|Afternoon|
|         Weatherday|2025-02-27 17:36:00|    2507|Painted Girl's Theme|     17|     36|  17| 02-27-2025| February|Afternoon|


In [47]:
Dates = clean_df.groupby("MonthAndDay").agg({"msPlayed": "sum"}).orderBy("sum(msPlayed)", ascending=False)
Dates = Dates.withColumn("MonthAndDay", to_timestamp(col("MonthAndDay"), "MM-dd-yyyy"))
Dates = Dates.withColumn("MonthName", date_format(col("MonthAndDay"), "MMMM"))

Dates.show()


+-------------------+-------------+---------+
|        MonthAndDay|sum(msPlayed)|MonthName|
+-------------------+-------------+---------+
|2025-06-15 00:00:00|     13111806|     June|
|2025-06-14 00:00:00|     12793369|     June|
|2025-06-24 00:00:00|     12367409|     June|
|2025-06-25 00:00:00|     11140346|     June|
|2025-04-01 00:00:00|     10386387|    April|
|2025-05-27 00:00:00|      9563203|      May|
|2025-07-05 00:00:00|      9435295|     July|
|2025-06-16 00:00:00|      9411195|     June|
|2025-05-31 00:00:00|      9386490|      May|
|2025-03-24 00:00:00|      8953259|    March|
|2025-06-18 00:00:00|      8507764|     June|
|2025-05-25 00:00:00|      8484902|      May|
|2025-05-13 00:00:00|      8212466|      May|
|2025-06-28 00:00:00|      8165252|     June|
|2025-02-28 00:00:00|      7957507| February|
|2025-04-12 00:00:00|      7936964|    April|
|2025-03-08 00:00:00|      7798891|    March|
|2025-06-21 00:00:00|      7754671|     June|
|2025-05-28 00:00:00|      7686246

In [48]:
db_url = topartisturl
db_url2 = hours 
db_url3 = topsongs
db_url4 = dates 
db_properties = {
    "user": user,
    "password": password,
    "driver": "org.postgresql.Driver"
}

top_artists.write.jdbc(
    url=db_url,
    table="top_artists",      
    mode="overwrite",               
    properties=db_properties
)

top_songs.write.jdbc(
    url=db_url3,
    table="top_songs",      
    mode="overwrite",               
    properties=db_properties
)

Dates.write.jdbc(
    url=db_url4,
    table="dates",      
    mode="overwrite",               
    properties=db_properties
)

clean_df.select(["msPlayed", "TimeofDay"]).write.jdbc(
    url=db_url2,
    table="hours",      
    mode="overwrite",               
    properties=db_properties
)
