In [6]:
!pip install mysql-connector-python pandas matplotlib ipython python-dotenv 

Collecting python-dotenv
  Using cached python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Using cached python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


In [7]:
import mysql.connector
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import time
from dotenv import load_dotenv
import seaborn as sns
from IPython.display import display, clear_output
import dash
import plotly.graph_objs as go
from dash import dcc, html, Input, Output, State, callback, ctx
import dash_bootstrap_components as dbc
from dash.dependencies import Input, Output
from sqlalchemy import create_engine
import traceback  # To print full error stack trace

# Run Event Sim

In [9]:
# Function to create a connection
def create_connection():
    """Create a SQLAlchemy engine for the MySQL ``starmeter`` database.

    Connection settings are read from the environment (after loading a local
    ``.env`` file, if one exists) so credentials do not have to be edited into
    the notebook:

    * ``STARMETER_DB_USER``
    * ``STARMETER_DB_PASSWORD``
    * ``STARMETER_DB_HOST``
    * ``STARMETER_DB_NAME``

    Every setting falls back to the value that was previously hard-coded here,
    so existing setups keep working unchanged.

    Returns:
        The engine, or ``None`` if it could not be created (the error is
        printed). ``create_engine`` does not open a connection itself, so an
        unreachable server only surfaces when the engine is first used.
    """
    import os
    from urllib.parse import quote_plus

    try:
        load_dotenv()  # no-op when there is no .env file; never overrides set variables

        user = os.environ.get("STARMETER_DB_USER", "timlinkous")
        # TODO(security): drop this literal fallback once the password is
        # supplied through the environment / .env everywhere.
        password = os.environ.get("STARMETER_DB_PASSWORD", "zipcode1")
        host = os.environ.get("STARMETER_DB_HOST", "localhost")
        database = os.environ.get("STARMETER_DB_NAME", "starmeter")

        # Escape user and password so characters such as '@' or ':' cannot
        # corrupt the connection URL.
        engine = create_engine(
            f"mysql+mysqlconnector://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"
        )
        return engine
    except Exception as e:
        print(f"Error connecting to the database: {e}")
        return None

In [10]:
import pandas as pd
import time
import dash
import dash_bootstrap_components as dbc
import plotly.graph_objects as go
from dash import dcc, html, Output, Input, State
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Build (or reuse) the Spark session; the MySQL JDBC connector is requested
# as a Spark package so the "jdbc" data source can reach the database.
spark = (
    SparkSession.builder
    .appName("MySQL-Spark-Connection")
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.29")
    .getOrCreate()
)

# Function to retrieve fan counts with error handling
def get_fan_counts():
    """Return the number of fans per celebrity as a pandas DataFrame.

    Loads ``user_dynamic_preferences`` from the ``starmeter`` MySQL database
    through the module-level ``spark`` session, counts rows per
    ``current_favorite`` in Spark, and converts the result to pandas.

    The database user and password are taken from ``STARMETER_DB_USER`` and
    ``STARMETER_DB_PASSWORD`` when set, falling back to the values that were
    previously hard-coded here.

    Returns:
        DataFrame with columns ``current_favorite`` and ``fan_count``. On any
        error the problem is printed and an empty frame with those two columns
        is returned instead of raising.
    """
    import os

    try:
        db_user = os.environ.get("STARMETER_DB_USER", "timlinkous")
        # TODO(security): drop this literal fallback once the password is
        # supplied through the environment everywhere.
        db_password = os.environ.get("STARMETER_DB_PASSWORD", "zipcode1")

        # Fetch data using Spark
        df = (
            spark.read.format("jdbc")
            .option("url", "jdbc:mysql://localhost:3306/starmeter")
            .option("dbtable", "user_dynamic_preferences")
            .option("user", db_user)
            .option("password", db_password)
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .load()
        )

        # Perform the aggregation using Spark
        fan_counts_df = df.groupBy("current_favorite").count().withColumnRenamed("count", "fan_count")

        # Convert to Pandas DataFrame for Dash processing
        fan_counts_pd = fan_counts_df.toPandas()

        if fan_counts_pd.empty:
            print("Warning: The retrieved DataFrame is empty.")
        return fan_counts_pd

    except Exception as e:
        print(f"Error retrieving data: {e}")
        return pd.DataFrame(columns=['current_favorite', 'fan_count'])

# Create the Dash application, styled with the Bootstrap theme
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Page layout: heading, two KPI cards, the live chart, the refresh timer
# (paused until the user presses Start) and the Start/Stop button.
app.layout = html.Div(children=[
    html.H1("Real-Time Fan Count of Celebrities"),
    html.Div(
        id='kpi-cards',
        children=[
            # One card per KPI: a fixed title plus a value element that the
            # callback fills in by id.
            dbc.Card([
                dbc.CardBody([
                    html.H4(card_title, className="card-title"),
                    html.H5(id=value_id, className="card-text"),
                ])
            ])
            for card_title, value_id in (
                ("Total Fans", 'total-fans'),
                ("Most Popular Celebrity", 'most-popular-celebrity'),
            )
        ],
    ),
    dcc.Graph(id='live-update-graph'),
    dcc.Interval(
        id='interval-component',
        n_intervals=0,
        interval=1000,   # milliseconds between refreshes
        disabled=True,   # stays idle until the button enables it
    ),
    html.Button('Start', id='start-stop-button', n_clicks=0, className="btn btn-primary"),
])

# Initialize data storage
fan_counts_data = {celebrity: [] for celebrity in ['Sabrina Carpenter', 'Snoop Dogg', 'Tony Stark', 'LeBron James']}
time_data = []  # seconds elapsed since the first recorded sample, one entry per sample
stream_start_time = None  # monotonic clock reading of the first sample; set on first data


def _tally_tracked_fans(df, celebrities):
    """Return ({celebrity: fan_count}, total_fans, most_popular) for the tracked names.

    ``most_popular`` is the first celebrity holding the strictly highest
    non-zero count, or ``None`` when every tracked celebrity has zero fans.
    """
    counts = {}
    total_fans = 0
    most_popular, best = None, 0
    for celebrity in celebrities:
        # Summing an empty selection yields 0, so a celebrity that is absent
        # from df needs no special case.
        fan_count = int(df.loc[df['current_favorite'] == celebrity, 'fan_count'].sum())
        counts[celebrity] = fan_count
        total_fans += fan_count
        if fan_count > best:
            most_popular, best = celebrity, fan_count
    return counts, total_fans, most_popular


@app.callback(
    [Output('live-update-graph', 'figure'),
     Output('total-fans', 'children'),
     Output('most-popular-celebrity', 'children')],
    Input('interval-component', 'n_intervals')
)
def update_graph_and_kpis(n):
    """Refresh the live chart and the two KPI cards on every timer tick.

    Args:
        n: Tick counter supplied by the interval component (not used).

    Returns:
        ``(figure, total_fans_text, most_popular_text)``. When no data is
        available the figure is an empty placeholder and the KPIs read ``'0'``
        and ``'No data'``; on an unexpected error both KPIs read ``'Error'``.
    """
    global stream_start_time

    try:
        # Fetch latest fan counts using Spark
        df = get_fan_counts()

        if df.empty:
            placeholder = go.Figure().update_layout(
                title='No data available',
                xaxis_title='Time (seconds)',
                yaxis_title='Number of Fans'
            )
            return placeholder, '0', 'No data'

        # Record elapsed seconds relative to the first sample. The start time
        # is kept separately: time_data[0] is always 0, so subtracting it (as
        # the previous code did) produced raw clock values, not elapsed time.
        now = time.monotonic()
        if stream_start_time is None:
            stream_start_time = now
        time_data.append(now - stream_start_time)

        counts, total_fans, most_popular_celeb = _tally_tracked_fans(df, fan_counts_data.keys())
        for celebrity, fan_count in counts.items():
            fan_counts_data[celebrity].append(fan_count)

        # Create traces for each celebrity
        fig = go.Figure()
        for celebrity, fan_counts in fan_counts_data.items():
            fig.add_trace(go.Scatter(
                x=time_data,
                y=fan_counts,
                mode='lines+markers',
                name=celebrity,
                marker=dict(size=5),
                text=[f'Followers {count}' for count in fan_counts],
                hoverinfo='text'
            ))

        # Once there is more than one point, show a sliding 100-second window
        # ending at the newest sample (time_data is non-decreasing).
        if len(time_data) > 1:
            latest = time_data[-1]
            fig.update_layout(
                xaxis=dict(range=[latest - 100, latest]),
            )

        fig.update_layout(
            xaxis_title='Time (seconds)',
            yaxis_title='Number of Fans',
            title='Real-Time Fan Count of Celebrities',
        )

        return fig, f'{total_fans}', most_popular_celeb or 'No data'

    except Exception as e:
        print(f"Error in updating graph and KPIs: {e}")
        return go.Figure().update_layout(title="Error updating graph"), 'Error', 'Error'

# Callback control start stop functionality
@app.callback(
    Output('interval-component', 'disabled'),
    Output('start-stop-button', 'children'),
    Input('start-stop-button', 'n_clicks'),
    State('interval-component', 'disabled')
)
def toggle_interval(n_clicks, is_disabled):
    """Decide whether the refresh timer is paused and what the button should say.

    An even click count (including the initial 0) pauses updates and labels
    the button 'Start'; an odd count resumes them and labels it 'Stop'.

    Args:
        n_clicks: Number of times the Start/Stop button has been pressed.
        is_disabled: Current ``disabled`` state of the timer; accepted from
            the callback State but not consulted.

    Returns:
        ``(disabled, button_label)``.
    """
    paused = (n_clicks % 2 == 0)
    button_label = 'Start' if paused else 'Stop'
    return paused, button_label

if __name__ == '__main__':
    # `run_server` is only a legacy alias of `run`; `run` is the supported
    # entry point and keeps working on newer Dash releases.
    app.run(debug=True)


24/09/07 09:48:50 WARN Utils: Your hostname, Zipcoders-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on interface en0)
24/09/07 09:48:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/timl/Library/jupyterlab-desktop/jlab_server/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/timl/.ivy2/cache
The jars for the packages stored in: /Users/timl/.ivy2/jars
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-08c297f5-18a7-435d-a845-d6a6adbf7e5d;1.0
	confs: [default]
	found mysql#mysql-connector-java;8.0.29 in central
	found com.google.protobuf#protobuf-java;3.19.4 in central
:: resolution report :: resolve 99ms :: artifacts dl 2ms
	:: modules in use:
	com.google.protobuf#protobuf-java;3.19.4 from central in [default]
	mysql#mysql-connector-java;8.0.29 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------------------

24/09/07 09:49:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [None]:
# Function to retrieve the fan count for each celebrity
def get_fan_counts():
    """Return the number of fans per celebrity, queried directly from MySQL.

    NOTE(review): this redefines ``get_fan_counts`` from the Spark-based cell
    earlier in the notebook; whichever cell ran last wins.

    Returns:
        DataFrame with columns ``current_favorite`` and ``fan_count``; an
        empty frame with those columns if no engine could be created.
    """
    query = """
    SELECT current_favorite, COUNT(*) as fan_count
    FROM user_dynamic_preferences
    GROUP BY current_favorite;
    """
    engine = create_connection()  # defined in an earlier cell; returns an Engine or None
    if engine is None:
        return pd.DataFrame(columns=['current_favorite', 'fan_count'])

    try:
        return pd.read_sql(query, con=engine)
    finally:
        # A SQLAlchemy Engine has no close(); dispose() releases its pooled
        # connections. Needed because a new engine is created on every call.
        engine.dispose()

# Create the Dash application, styled with the Bootstrap theme
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Layout: a heading, the (initially empty) live graph, and the timer whose
# ticks trigger each refresh.
app.layout = html.Div(
    children=[
        html.H1("Real-Time Fan Count of Celebrities"),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            n_intervals=0,
            interval=300,  # milliseconds between ticks (0.3 seconds)
        ),
    ]
)

# Initialize data storage
fan_counts_data = {celebrity: [] for celebrity in ['Sabrina Carpenter', 'Snoop Dogg', 'Tony Stark', 'LeBron James']}
time_data = []  # seconds elapsed since the first recorded sample, one entry per sample
stream_start_time = None  # monotonic clock reading of the first sample; set on first tick

# Callback to update the graph
@app.callback(
    Output('live-update-graph', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_graph(n):
    """Append the latest fan counts and redraw the live chart.

    Args:
        n: Tick counter supplied by the interval component (not used).

    Returns:
        A figure with one line per tracked celebrity, showing the most recent
        100 seconds on the x-axis and a fixed 0-1000 range on the y-axis.
    """
    global stream_start_time

    # Fetch the latest fan counts
    df = get_fan_counts()

    # Record elapsed seconds relative to the first sample. The start time is
    # kept separately: time_data[0] is always 0, so subtracting it (as the
    # previous code did) produced raw clock values, not elapsed time.
    now = time.monotonic()
    if stream_start_time is None:
        stream_start_time = now
    time_data.append(now - stream_start_time)

    # Update the fan counts data; summing an empty selection yields 0, so a
    # celebrity missing from df needs no special case.
    for celebrity, history in fan_counts_data.items():
        fan_count = int(df.loc[df['current_favorite'] == celebrity, 'fan_count'].sum())
        history.append(fan_count)

    # Create traces for each celebrity
    fig = go.Figure()
    for celebrity, fan_counts in fan_counts_data.items():
        fig.add_trace(go.Scatter(
            x=time_data,
            y=fan_counts,
            mode='lines+markers',
            name=celebrity
        ))

    # Set graph layout properties; time_data is non-decreasing, so its last
    # entry is the newest timestamp.
    latest = time_data[-1]
    fig.update_layout(
        xaxis_title='Time (seconds)',
        yaxis_title='Number of Fans',
        title='Real-Time Fan Count of Celebrities',
        xaxis=dict(range=[latest - 100, latest]),
        yaxis=dict(range=[0, 1000])  # NOTE(review): counts above 1000 are clipped from view
    )

    return fig

if __name__ == '__main__':
    # `run_server` is only a legacy alias of `run`; `run` is the supported
    # entry point and keeps working on newer Dash releases.
    app.run(debug=True)


In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, lag, sum, col, desc, when
from pyspark.sql.window import Window

def calculate_fan_count_changes(spark):
    """Load ``event_log`` and attach a ``fan_count_change`` column per event.

    The database user and password are taken from ``STARMETER_DB_USER`` and
    ``STARMETER_DB_PASSWORD`` when set, falling back to the values that were
    previously hard-coded here.

    Args:
        spark: Active SparkSession used to read the table over JDBC.

    Returns:
        Spark DataFrame with ``event_date``, ``celebrity``,
        ``event_description``, ``current_fan_count`` and ``fan_count_change``,
        ordered from newest to oldest event.
    """
    import os

    db_user = os.environ.get("STARMETER_DB_USER", "timlinkous")
    # TODO(security): drop this literal fallback once the password is supplied
    # through the environment everywhere.
    db_password = os.environ.get("STARMETER_DB_PASSWORD", "zipcode1")

    # Read the event_log table
    df = (
        spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/starmeter")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "event_log")
        .option("user", db_user)
        .option("password", db_password)
        .load()
    )

    # Per-celebrity window, newest event first
    window_spec = Window.partitionBy("celebrity").orderBy(desc("event_date"))

    # `sum` here is pyspark.sql.functions.sum (imported at the top of this
    # cell), not the Python builtin.
    # NOTE(review): an ordered window without an explicit frame makes this a
    # running total of (fans_gained - fans_lost) from the newest event down to
    # the current row, not the single-event delta — confirm that is intended.
    # The former row_number / previous_fan_count columns were never selected
    # and have been removed.
    df_with_changes = df.withColumn(
        "fan_count_change",
        sum(col("fans_gained") - col("fans_lost")).over(window_spec),
    )

    return df_with_changes.select(
        "event_date",
        "celebrity",
        "event_description",
        "current_fan_count",
        "fan_count_change"
    ).orderBy(desc("event_date"))

In [51]:
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import dash_bootstrap_components as dbc
from dash import dash_table
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

# Build (or reuse) the Spark session for the dashboard; the MySQL JDBC
# connector is requested as a Spark package.
spark = (
    SparkSession.builder
    .appName("CelebrityFanCountDashboard")
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.29")
    .getOrCreate()
)

def fetch_and_calculate_changes():
    """Return the 10 most recent events with a per-celebrity fan-count change.

    Reads ``event_log`` through the module-level ``spark`` session, keeps the
    20 newest events, and computes the change in ``current_fan_count`` between
    consecutive events of the same celebrity. The database user and password
    are taken from ``STARMETER_DB_USER`` and ``STARMETER_DB_PASSWORD`` when
    set, falling back to the values that were previously hard-coded here.

    Returns:
        pandas DataFrame (newest first, at most 10 rows) with ``event_date``,
        ``celebrity``, ``event_description``, ``current_fan_count`` and an
        integer ``fan_count_change``.
    """
    import os

    db_user = os.environ.get("STARMETER_DB_USER", "timlinkous")
    # TODO(security): drop this literal fallback once the password is supplied
    # through the environment everywhere.
    db_password = os.environ.get("STARMETER_DB_PASSWORD", "zipcode1")

    # Fetch data from MySQL
    events = (
        spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/starmeter")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "event_log")
        .option("user", db_user)
        .option("password", db_password)
        .load()
    )

    # Only the 20 newest events are pulled into pandas
    recent = (
        events.select("event_date", "celebrity", "event_description", "current_fan_count")
        .orderBy(desc("event_date"))
        .limit(20)
        .toPandas()
    )

    # Chronological order within each celebrity so diff() compares neighbours
    recent = recent.sort_values(['celebrity', 'event_date'])

    # NOTE(review): the sign of the difference is deliberately inverted, as in
    # the original code — confirm this matches how current_fan_count is logged.
    change = recent.groupby('celebrity')['current_fan_count'].diff() * -1

    # A celebrity's oldest event inside the 20-row window has no predecessor;
    # its change is reported as 0. Filling before slicing avoids assigning a
    # column on a head() slice of the frame.
    recent = recent.assign(fan_count_change=change.fillna(0).astype(int))

    # Keep only the 10 most recent events, newest first
    return recent.sort_values('event_date', ascending=False).head(10)

# Create the Dash application, styled with the Bootstrap theme
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Layout: heading, a 5-second refresh timer, and the live events table
app.layout = html.Div(children=[
    html.H1("Celebrity Fan Count Real-Time Dashboard"),
    dcc.Interval(
        id='interval-component',
        n_intervals=0,
        interval=5000,  # milliseconds between refreshes (5 seconds)
    ),
    dash_table.DataTable(
        id='live-update-table',
        # Column header -> record key, in display order
        columns=[
            {"name": header, "id": field}
            for header, field in (
                ("Event Date", "event_date"),
                ("Celebrity", "celebrity"),
                ("Event Description", "event_description"),
                ("Current Fan Count", "current_fan_count"),
                ("Fan Count Change", "fan_count_change"),
            )
        ],
        style_table={'height': 'auto', 'overflowY': 'auto'},
        style_cell={'textAlign': 'left'},
        # Colour the change column: positive values green, negative values red
        style_data_conditional=[
            {
                'if': {'column_id': 'fan_count_change', 'filter_query': '{fan_count_change} > 0'},
                'color': 'green',
            },
            {
                'if': {'column_id': 'fan_count_change', 'filter_query': '{fan_count_change} < 0'},
                'color': 'red',
            },
        ],
        sort_action='native',
        sort_mode='multi',
    ),
])

# Callback to update the table
@app.callback(
    Output('live-update-table', 'data'),
    Input('interval-component', 'n_intervals')
)
def update_table(n):
    """Return the table rows for the current timer tick.

    Args:
        n: Tick counter supplied by the interval component (not used).

    Returns:
        A list of row dicts built from ``fetch_and_calculate_changes()``, or
        an empty list (after printing the error) if anything goes wrong.
    """
    try:
        rows = fetch_and_calculate_changes().to_dict('records')
    except Exception as e:
        print(f"Error in update_table: {str(e)}")
        rows = []  # show an empty table rather than failing the callback
    return rows

if __name__ == '__main__':
    # `run_server` is only a legacy alias of `run`; `run` is the supported
    # entry point and keeps working on newer Dash releases.
    app.run(debug=True)

24/09/07 11:18:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/07 11:18:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [49]:
# Stop the Spark session used by the cells above.
spark.stop()
