In [10]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import time

# Path to the MySQL JDBC driver JAR file
jdbc_driver_path = "/Users/teddy/Documents/GitHub/projects/spark-project/jars/mysql-connector-java-8.4.0.jar"

# Initialize Spark session with the MySQL JDBC driver
spark = SparkSession.builder \
    .appName("MySQLConnectionExample") \
    .config("spark.driver.extraClassPath", jdbc_driver_path) \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

# Define MySQL database connection properties
jdbc_url = "jdbc:mysql://localhost:3306/eventsim?unixSocket=/tmp/mysql.sock"
connection_properties = {
    "user": "root",  # Replace with your MySQL username
    "password": "new_password",  # Replace with your MySQL password
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Specify the table name
table_name = "events"

# Attempt to read the table from MySQL
try:
    df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    print("Connection successful! Here are the first few rows of the table:")
    df.show(5)  # Display the first 5 rows
except Exception as e:
    print("Connection failed:")
    print(e)


Connection failed:
An error occurred while calling o63.jdbc.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.

In [4]:
df.head()

NameError: name 'df' is not defined

In [None]:
df.columns

['id',
 'event_type',
 'timestamp',
 'user_id',
 'user_name',
 'user_age',
 'user_gender',
 'user_main_genre',
 'user_subscription_plan',
 'user_platform',
 'user_state',
 'user_os',
 'user_current_page',
 'track_id',
 'title',
 'song_id',
 'release',
 'artist_id',
 'artist_mbid',
 'artist_name',
 'duration',
 'artist_familiarity',
 'artist_hotttnesss',
 'year',
 'track_7digitalid']

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window
# Assuming your PySpark DataFrame is named 'df'
# Extract the hour from the timestamp if available, or generate a synthetic one
df = df.withColumn("hour_of_day", (F.rand() * 24).cast("integer"))  # Replace with actual timestamp column if available
# Aggregate the data by hour_of_day and user_main_genre
agg_df = df.groupBy("hour_of_day", "user_main_genre").agg(
    F.count("*").alias("play_count")
)
# Convert the aggregated PySpark DataFrame to Pandas
pandas_df = agg_df.toPandas()

# Main Dashboard

In [None]:
from dash import Dash, dcc, html
import dash_bootstrap_components as dbc
import plotly.express as px
from pyspark.sql import functions as F
import time

# Define charcoal gray color and white color
charcoal_gray = "#36454F"
white = "#FFFFFF"

# 1. Extract the hour from the timestamp
df = df.withColumn("hour_of_day", F.hour(F.col("timestamp")))

# 2. Aggregate the data by hour_of_day and user_main_genre
agg_df = df.groupBy("hour_of_day", "user_main_genre").agg(
    F.count("*").alias("play_count")
)

# 3. Convert the aggregated PySpark DataFrame to Pandas
pandas_df = agg_df.toPandas()

# 4. Sort the DataFrame by 'play_count' in ascending order
pandas_df = pandas_df.sort_values(by='play_count', ascending=False)

# 5. Calculate KPIs

# Top Song by Play Count
top_song = df.groupBy("title").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()

# Most Listened to Artist
top_artist = df.groupBy("artist_name").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()

# Total Play Count
total_play_count = df.count()

# Most Popular Genre
top_genre = df.groupBy("user_main_genre").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()

# Most Popular Subscription Plan
top_subscription_plan = df.groupBy("user_subscription_plan").agg(F.count("*").alias("user_count")).orderBy(F.desc("user_count")).first()

# Average Song Duration
average_song_duration = df.agg(F.avg("duration")).first()

# Most Active User
most_active_user = df.groupBy("user_name").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()

# 6. Create a bar chart using Plotly Express
fig = px.bar(pandas_df, x='hour_of_day', y='play_count', color='user_main_genre',
              title='Song Plays by Hour and Genre')

# Play Count by Age Group
age_agg_df = df.groupBy("user_age").agg(F.count("*").alias("play_count"))
age_pandas_df = age_agg_df.toPandas()
age_fig = px.bar(age_pandas_df, x='user_age', y='play_count', title='Play Count by Age Group')

# Play Count by Gender
gender_agg_df = df.groupBy("user_gender").agg(F.count("*").alias("play_count"))
gender_pandas_df = gender_agg_df.toPandas()
gender_fig = px.pie(gender_pandas_df, names='user_gender', values='play_count', title='Play Count by Gender')

# Play Count by Subscription Plan
subscription_agg_df = df.groupBy("user_subscription_plan").agg(F.count("*").alias("play_count"))
subscription_pandas_df = subscription_agg_df.toPandas()
subscription_fig = px.bar(subscription_pandas_df, x='user_subscription_plan', y='play_count', title='Play Count by Subscription Plan')

# Play Count by Platform
platform_agg_df = df.groupBy("user_platform").agg(F.count("*").alias("play_count"))
platform_pandas_df = platform_agg_df.toPandas()
platform_fig = px.bar(platform_pandas_df, x='user_platform', y='play_count', title='Play Count by Platform')

# Play Count by OS
os_agg_df = df.groupBy("user_os").agg(F.count("*").alias("play_count"))
os_pandas_df = os_agg_df.toPandas()
os_fig = px.bar(os_pandas_df, x='user_os', y='play_count', title='Play Count by OS')

# 7. Create the Dash app and use Bootstrap
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

app.layout = dbc.Container([
    dbc.Row([
        dbc.Col(html.H1("Music Listening Habits Dashboard"), className="mb-4")
    ]),

    # Display KPIs
dbc.Row([
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Top Song"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),
        dbc.CardBody(f"{top_song['title']} (Plays: {top_song['play_count']})")
    ])),
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Most Listened to Artist"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),    
        dbc.CardBody(f"{top_artist['artist_name']} (Plays: {top_artist['play_count']})")
    ])),
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Total Play Count"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),
        dbc.CardBody(f"{total_play_count}")
    ])),
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Most Popular Genre"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),
        dbc.CardBody(f"{top_genre['user_main_genre']} (Plays: {top_genre['play_count']})")
    ])),
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Most Popular Subscription Plan"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),
        dbc.CardBody(f"{top_subscription_plan['user_subscription_plan']} (Users: {top_subscription_plan['user_count']})")
    ])),
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Average Song Duration"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),
        dbc.CardBody(f"{round(average_song_duration['avg(duration)'] / 60, 2)} minutes")
    ])),
    dbc.Col(dbc.Card([
        dbc.CardHeader(
            html.Strong("Most Active User"),
            style={"backgroundColor": charcoal_gray, "color": white}
        ),
        dbc.CardBody(f"{most_active_user['user_name']} (Plays: {most_active_user['play_count']})")
    ])),
], className="mb-4"),

    # Existing Chart
    dbc.Row([
        dbc.Col(dcc.Graph(id='example-graph', figure=fig))
    ]),

    # New Charts
    dbc.Row([
        dbc.Col(dcc.Graph(id='age-play-count', figure=age_fig)),
        dbc.Col(dcc.Graph(id='gender-play-count', figure=gender_fig))
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='subscription-play-count', figure=subscription_fig)),
        dbc.Col(dcc.Graph(id='platform-play-count', figure=platform_fig))
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='os-play-count', figure=os_fig))
    ]),
], fluid=True)

# 8. Run the Dash app
if __name__ == '__main__':
    app.run_server(mode='external')


# Unused Dashboard

# Timer Updated Dashboard

In [None]:
from dash import Dash, dcc, html, Output, Input
import dash_bootstrap_components as dbc
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize SparkSession with JDBC driver
spark = SparkSession.builder \
    .appName("MusicDashboard") \
    .config("spark.jars", "/Users/timl/PythonWork/Spark/Spark Week/Archive/jars/mysql-connector-java-8.4.0.jar") \
    .getOrCreate()

# Define charcoal gray color and white color
charcoal_gray = "#36454F"
white = "#FFFFFF"


# Function to fetch and process data from MySQL
def fetch_data():
    global df
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/eventsim?unixSocket=/tmp/mysql.sock") \
        .option("dbtable", "events") \
        .option("user", "timlinkous") \
        .option("password", "zipcode1") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .load()
    df = df.withColumn("hour_of_day", F.hour(F.col("timestamp")))

# Initial data fetch
fetch_data()

# 8. Create the Dash app and use Bootstrap
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

app.layout = dbc.Container([
    dbc.Row([
        dbc.Col(html.H1("Music Listening Habits Dashboard"), className="mb-4")
    ]),

    # Display KPIs
    dbc.Row([
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Top Song"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="top-song")
        ])),
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Most Listened to Artist"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="top-artist")
        ])),
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Total Play Count"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="total-play-count")
        ])),
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Most Popular Genre"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="top-genre")
        ])),
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Most Popular Subscription Plan"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="top-subscription-plan")
        ])),
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Average Song Duration"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="avg-song-duration")
        ])),
        dbc.Col(dbc.Card([
            dbc.CardHeader(
                html.Strong("Most Active User"),
                style={"backgroundColor": charcoal_gray, "color": white}
            ),
            dbc.CardBody(id="most-active-user")
        ])),
    ], className="mb-4"),

    # Existing Chart
    dbc.Row([
        dbc.Col(dcc.Graph(id='example-graph'))
    ]),

    # New Charts
    dbc.Row([
        dbc.Col(dcc.Graph(id='age-play-count')),
        dbc.Col(dcc.Graph(id='gender-play-count'))
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='subscription-play-count')),
        dbc.Col(dcc.Graph(id='platform-play-count'))
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(id='os-play-count'))
    ]),

    # Interval component for periodic updates
    dcc.Interval(
        id='interval-component',
        interval=10*1000,  # in milliseconds (every 10 seconds)
        n_intervals=0,
        max_intervals=6  # Run only 6 times
    )
], fluid=True)

# Callbacks to update the dashboard
@app.callback(
    [Output('top-song', 'children'),
     Output('top-artist', 'children'),
     Output('total-play-count', 'children'),
     Output('top-genre', 'children'),
     Output('top-subscription-plan', 'children'),
     Output('avg-song-duration', 'children'),
     Output('most-active-user', 'children'),
     Output('example-graph', 'figure'),
     Output('age-play-count', 'figure'),
     Output('gender-play-count', 'figure'),
     Output('subscription-play-count', 'figure'),
     Output('platform-play-count', 'figure'),
     Output('os-play-count', 'figure')],
    [Input('interval-component', 'n_intervals')]
)
def update_dashboard(n):
    fetch_data()  # Fetch the latest data from MySQL

    # Aggregate the data
    agg_df = df.groupBy("hour_of_day", "user_main_genre").agg(
        F.count("*").alias("play_count")
    )
    pandas_df = agg_df.toPandas().sort_values(by='play_count', ascending=False)

    # Calculate KPIs
    top_song = df.groupBy("title").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()
    top_artist = df.groupBy("artist_name").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()
    total_play_count = df.count()
    top_genre = df.groupBy("user_main_genre").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()
    top_subscription_plan = df.groupBy("user_subscription_plan").agg(F.count("*").alias("user_count")).orderBy(F.desc("user_count")).first()
    average_song_duration = df.agg(F.avg("duration")).first()
    most_active_user = df.groupBy("user_name").agg(F.count("*").alias("play_count")).orderBy(F.desc("play_count")).first()

    # Create the figures
    fig = px.bar(pandas_df, x='hour_of_day', y='play_count', color='user_main_genre', title='Song Plays by Hour and Genre')

    age_agg_df = df.groupBy("user_age").agg(F.count("*").alias("play_count"))
    age_pandas_df = age_agg_df.toPandas()
    age_fig = px.bar(age_pandas_df, x='user_age', y='play_count', title='Play Count by Age Group')

    gender_agg_df = df.groupBy("user_gender").agg(F.count("*").alias("play_count"))
    gender_pandas_df = gender_agg_df.toPandas()
    gender_fig = px.pie(gender_pandas_df, names='user_gender', values='play_count', title='Play Count by Gender')

    subscription_agg_df = df.groupBy("user_subscription_plan").agg(F.count("*").alias("play_count"))
    subscription_pandas_df = subscription_agg_df.toPandas()
    subscription_fig = px.bar(subscription_pandas_df, x='user_subscription_plan', y='play_count', title='Play Count by Subscription Plan')

    platform_agg_df = df.groupBy("user_platform").agg(F.count("*").alias("play_count"))
    platform_pandas_df = platform_agg_df.toPandas()
    platform_fig = px.bar(platform_pandas_df, x='user_platform', y='play_count', title='Play Count by Platform')

    os_agg_df = df.groupBy("user_os").agg(F.count("*").alias("play_count"))
    os_pandas_df = os_agg_df.toPandas()
    os_fig = px.bar(os_pandas_df, x='user_os', y='play_count', title='Play Count by OS')

    return (f"{top_song['title']} (Plays: {top_song['play_count']})",
            f"{top_artist['artist_name']} (Plays: {top_artist['play_count']})",
            f"{total_play_count}",
            f"{top_genre['user_main_genre']} (Plays: {top_genre['play_count']})",
            f"{top_subscription_plan['user_subscription_plan']} (Users: {top_subscription_plan['user_count']})",
            f"{round(average_song_duration['avg(duration)'] / 60, 2)} minutes",
            f"{most_active_user['user_name']} (Plays: {most_active_user['play_count']})",
            fig, age_fig, gender_fig, subscription_fig, platform_fig, os_fig)

# 10. Run the Dash app
if __name__ == '__main__':
    app.run_server(debug=True)


24/08/24 16:10:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Py4JJavaError: An error occurred while calling o39.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
import pandas as pd

# Assuming your data is in a DataFrame called df
numeric_df = df.select_dtypes(include=['float64', 'int64'])
correlation_matrix = numeric_df.corr()
print(correlation_matrix)


AttributeError: 'DataFrame' object has no attribute 'select_dtypes'