In [1]:
import pandas as pd
import numpy as np
from math import erf
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from datetime import datetime as dt
import os,sys
import plotly.graph_objects as go
from plotly.subplots import make_subplots
%matplotlib inline

# Reference "now" for the whole run: polls whose end_date is later than this are
# dropped when the data is loaded, and the daily date range ends here.
# Alternative kept for reference (read the date from a file instead of the clock):
#   dt.strptime(open("datefile.txt",'r').read()[:-1], '%d-%m-%Y')
DAY_OF_SIM = dt.today()

In [2]:
# Start from an empty output folder: remove every previously exported
# per-state averages file before new ones are written.
stale_exports = os.listdir("../polling_averages")
for stale_name in stale_exports:
    os.remove(f"../polling_averages/{stale_name}")

In [3]:
def parse_date(date_str):
    """Turn a month/day/two-digit-year string (e.g. ``7/21/24``) into a datetime."""
    return dt.strptime(date_str, '%m/%d/%y')


# Download the raw presidential polls table.
df = pd.read_csv("https://projects.fivethirtyeight.com/polls/data/president_polls.csv")

# Parse the poll end dates and order the rows chronologically.
df = df.assign(end_date=df["end_date"].apply(parse_date)).sort_values(by="end_date")

# Keep only polls that had finished by the simulation day.
df = df[df["end_date"]<=DAY_OF_SIM].copy()

# Rows without a state are treated as national polls; percentages become fractions.
df = df.assign(
    state=df["state"].fillna("National"),
    pct=df["pct"]/100,
)

In [4]:
# Flatten the long-format poll table (one row per candidate per question) into one
# record per question, with one column per candidate plus an aggregated "Others".

question_ids = df['question_id'].unique()

# Columns expected to hold a single value within one question, paired with the
# label used in the warning that is printed when they do not.
single_valued_columns = {
    'state': "STATES",
    'end_date': "DATES",
    'sample_size': "SAMPLE_SIZES",
    'pollster_id': "POLLSTERS",
}
# Candidates that keep their own column semantics; everyone else is summed into "Others".
major_candidates = {"Donald Trump", "Kamala Harris", "Joe Biden"}

flattened_polls = []
# One pass over the frame: groupby(sort=False) yields the questions in order of
# first appearance (the same order as .unique()) without re-scanning the whole
# table for every question id.
for question_id, the_poll in df.groupby("question_id", sort=False):

    for column, label in single_valued_columns.items():
        if the_poll[column].unique().shape[0] > 1:
            print(f"WARNING TOO MANY {label} IN POLL: QUESTION_ID = {question_id}")

    # First reported share per candidate (duplicates within a question are ignored).
    first_pct = the_poll.drop_duplicates(subset="candidate_name").set_index("candidate_name")["pct"]
    candidates = sorted(first_pct.index)

    trump_and_biden = ("Donald Trump" in candidates) and ("Joe Biden" in candidates)
    trump_and_harris = ("Donald Trump" in candidates) and ("Kamala Harris" in candidates)

    poll_flattened = {
        'question_id':question_id,
        'pollster_id':the_poll['pollster_id'].iloc[0],
        'end_date':the_poll['end_date'].iloc[0],
        'state':the_poll['state'].iloc[0],
        # Alphabetical "A v. B v. C" label describing who was on the question.
        'race':" v. ".join(candidates),
        'trump_and_biden':trump_and_biden,
        'trump_and_harris':trump_and_harris,
        'sample_size':the_poll['sample_size'].iloc[0],
    }
    for candidate in candidates:
        poll_flattened[candidate] = first_pct[candidate]

    # Combined share of everyone who is not Trump, Harris or Biden (0 if nobody else).
    poll_flattened["Others"] = sum(
        first_pct[candidate] for candidate in candidates if candidate not in major_candidates
    )

    flattened_polls.append(poll_flattened)

# Build the frame once from plain records so columns keep real dtypes
# (floats, booleans, datetimes) instead of all collapsing to object.
flattened_polls = pd.DataFrame(flattened_polls)

In [5]:
# Keep only the questions that include both Trump and Harris, restricted to the
# columns the averaging needs, ordered by poll end date.
harris_race_columns = [
    "pollster_id",
    "end_date",
    "state",
    "sample_size",
    "race",
    "Donald Trump",
    "Kamala Harris",
    "Others",
]
trump_harris_polls = flattened_polls[flattened_polls["trump_and_harris"]]
extracted_polls = trump_harris_polls[harris_race_columns].sort_values(by='end_date').copy()

In [6]:
# Sanity check: does any extracted poll lack a reported sample size?
# (The averaging step substitutes 500 for missing sample sizes.)
extracted_polls['sample_size'].isna().any()

True

In [7]:
# Variance added per day of poll age (standard deviation of 0.005, squared).
random_walk_var = 0.005 ** 2

# First day for which a polling average is produced.
start_date = dt(2024, 7, 1)

# One entry per calendar day from the start date up to the simulation day.
date_range = pd.date_range(start=start_date, end=DAY_OF_SIM)

In [8]:
def row_handler(row):
    """Rescale one day's averages so the three vote shares never sum above 1.

    Parameters
    ----------
    row : pd.Series
        Must carry the labels ``harris_mean``, ``trump_mean``, ``other_mean`` and
        the matching ``*_var`` entries. NaN entries are allowed.

    Returns
    -------
    pd.Series
        A copy of ``row``. If the row is entirely NaN, or the (NaN-skipping) sum of
        the three means is at most 1, the copy is unchanged. Otherwise each mean is
        divided by that sum and each variance is multiplied by the square of the
        same factor; NaN or zero means leave their variance untouched.
    """
    mean_cols = ["harris_mean","trump_mean","other_mean"]

    outrow=row.copy()
    if row.isna().all():
        return outrow

    total = row[mean_cols].sum()
    if total <= 1:
        return outrow

    # Divide only the three mean entries. Assigning the whole divided row (six
    # values) to three labels relies on implicit index alignment and can fail
    # with a length-mismatch error, so pass exactly three positional values.
    outrow.loc[mean_cols] = (row[mean_cols] / total).to_numpy()

    for prefix in ("harris","trump","other"):
        old_mean = row[f"{prefix}_mean"]
        # Skip NaN/zero means: the scale factor new/old is undefined for them.
        if old_mean != 0 and not pd.isna(old_mean):
            outrow[f"{prefix}_var"] = row[f"{prefix}_var"] * (outrow[f"{prefix}_mean"] / old_mean)**2
    return outrow

In [9]:
def _pooled_estimate(polls, candidate, as_of, daily_var, bias_var=0.03**2):
    """Inverse-variance weighted mean and variance of one candidate's share.

    Parameters
    ----------
    polls : pd.DataFrame
        Non-empty frame with columns ``candidate``, ``sample_size`` (no NaN) and
        ``end_date``.
    candidate : str
        Column holding the candidate's share as a fraction.
    as_of : datetime-like
        Day the estimate is for; each poll's age is measured in whole days from
        its ``end_date`` to this day.
    daily_var : float
        Per-day variance term applied to poll age.
    bias_var : float
        Variance due to unknown bias (assumed mean zero), added to every poll.

    Returns
    -------
    (mean, variance) of the pooled estimate. The frame is not modified.
    """
    share = polls[candidate]
    sample_var = share * (1 - share) / polls["sample_size"]  # variance due to the sample size
    days_old = (as_of - polls["end_date"]).apply(lambda delta: delta.days)
    # NOTE(review): the age term grows with days squared and is scaled by the share;
    # daily_var is labelled a daily variance, and a plain random walk would grow
    # linearly with days — TODO confirm this is intended.
    time_since_var = days_old**2 * daily_var * share
    total_var = sample_var + bias_var + time_since_var

    precision = sum(1 / total_var)
    new_mean = sum(share / total_var) / precision
    new_var = 1 / precision

    # dummy fix for uncertainty intervals in the negative: when the 95% interval
    # dips below zero and its top is at most 0.1, re-centre it on [0, top].
    upper = new_mean + 1.96*np.sqrt(new_var)
    if (new_mean - 1.96*np.sqrt(new_var) <= 0) and (upper <= 0.1):
        new_mean = 0.5 * upper
        new_var = (upper / (2*1.96))**2

    return new_mean, new_var


polled_states = extracted_polls['state'].unique()

# Column prefix used in the averages table for each plotted series.
mapper = {"Kamala Harris":"harris",
          "Donald Trump":"trump",
          "Others":"other"}


polling_dfs = {}
polls_dfs = {}
for state in polled_states:
    polling_averages = pd.DataFrame({"date":date_range,"harris_mean":np.nan,"harris_var":np.nan,"trump_mean":np.nan,"trump_var":np.nan,"other_mean":np.nan,"other_var":np.nan})
    polling_averages.set_index("date",inplace=True)

    # The state filter does not depend on the date, so do it once per state.
    state_polls = extracted_polls[extracted_polls["state"]==state].copy()
    # Assign the filled column back: a chained `.fillna(..., inplace=True)` on a
    # column selection is deprecated and does nothing under pandas Copy-on-Write,
    # which would leave NaN weights and NaN averages.
    state_polls["sample_size"] = state_polls["sample_size"].fillna(500)
    # Polls with more than two candidates, i.e. more than one " v. " separator in the
    # race label. The dot is escaped so it only matches a literal ".".
    multiway_polls = state_polls[state_polls['race'].str.count(r" v\. ")>1]

    for date in date_range:
        up_to_date_polls = state_polls[state_polls["end_date"]<=date]
        if up_to_date_polls.shape[0]==0:
            continue

        for candidate in ["Donald Trump","Kamala Harris"]:
            new_mean, new_var = _pooled_estimate(up_to_date_polls, candidate, date, random_walk_var)
            polling_averages.loc[date,f"{mapper[candidate]}_mean"] = new_mean
            polling_averages.loc[date,f"{mapper[candidate]}_var"] = new_var

        # "Others" is only estimated from polls that offered more than two candidates.
        candidate="Others"
        up_to_date_multiway = multiway_polls[multiway_polls["end_date"]<=date]
        if up_to_date_multiway.shape[0]==0:
            continue
        new_mean, new_var = _pooled_estimate(up_to_date_multiway, candidate, date, random_walk_var)
        polling_averages.loc[date,f"{mapper[candidate]}_mean"] = new_mean
        polling_averages.loc[date,f"{mapper[candidate]}_var"] = new_var

    # Rescale any day whose three shares sum to more than 1.
    polling_averages=polling_averages.apply(row_handler,axis=1)

    # Only days with all six values present are exported.
    complete_days = polling_averages.dropna()
    if complete_days.shape[0]>0:
        complete_days.to_csv(f"../polling_averages/{state}.csv")


    polling_dfs[state] = polling_averages.copy()


In [10]:
# Solid colour and 20%-opacity variant for each plotted series.
republican_red, republican_red_alphaed = "#cf1313", "rgba(207, 19, 19, 0.2)"
democratic_blue, democratic_blue_alphaed = "#116dc2", "rgba(17, 109, 194, 0.2)"
other_grey, other_grey_alphaed = "#8a8a8a", "rgba(138, 138, 138,0.2)"

# Series name -> line colour.
color_mapper = dict([
    ("Kamala Harris", democratic_blue),
    ("Donald Trump", republican_red),
    ("Others", other_grey),
])
# Series name -> translucent colour for poll markers and interval bands.
color_mapper_alphaed = dict([
    ("Kamala Harris", democratic_blue_alphaed),
    ("Donald Trump", republican_red_alphaed),
    ("Others", other_grey_alphaed),
])


In [11]:
# For every polled state, write a standalone HTML chart with the raw polls (markers),
# the polling average (line) and its 95% band (filled area) for each series.
for state in polled_states:
    the_scatters = []
    the_lines = []
    the_cis = []
    the_max = 0

    # Polls shown as markers: those since start_date, or every poll for the state
    # when none is that recent. This does not depend on the candidate.
    state_polls = extracted_polls[(extracted_polls["state"]==state)]
    polls_for_scatter = state_polls[state_polls["end_date"]>=start_date]
    if polls_for_scatter.shape[0]==0:
        polls_for_scatter = state_polls
    # Highest single poll value among the markers that will be drawn.
    poll_peak = polls_for_scatter[["Donald Trump","Kamala Harris","Others"]].max().max()

    for candidate in ["Kamala Harris","Donald Trump","Others"]:

        pi =polling_dfs[state][f"{mapper[candidate]}_mean"]
        if pi.isna().all():
            continue
        sigmasqd = polling_dfs[state][f"{mapper[candidate]}_var"]
        # Half-width of the 95% band, capped at 0.2 so a very uncertain series
        # does not dominate the chart.
        half_width = (1.96*np.sqrt(sigmasqd)).clip(upper=0.2)

        # Running maximum used to size the y-axis. It must never decrease between
        # candidates, and it uses the same capped band that is drawn. Keeping
        # the_max first makes a NaN peak get ignored rather than propagate.
        the_max = max(the_max, poll_peak, pi.max(), (pi+half_width).max())

        scatter = go.Scatter(
            x=polls_for_scatter["end_date"],
            y=polls_for_scatter[candidate].values,
            mode='markers',
            marker=dict(
                # Marker size grows slowly with sample size; missing sizes count as 500.
                size=(polls_for_scatter["sample_size"].fillna(500)**0.3).values,
                color=color_mapper_alphaed[candidate],
                colorscale='Viridis',
                showscale=False,
                opacity=0.8,
                line=dict(width=0, color='white')
            ),
            hoverinfo="skip",
            showlegend=False,
            name='Scatter Data'
        )

        # Hover label per day, e.g. "<candidate>: 48.3%".
        text = pi.apply(lambda x: f"{candidate}: {np.round(x*100,1)}%").values

        line = go.Scatter(
            x=date_range,
            y=pi,
            mode='lines',
            line=dict(color=color_mapper[candidate], width=2),
            name='Line Data',
            hoverinfo='text',
            text=text,
            hovertemplate='%{text}<extra></extra>',
        )

        # Closed polygon: upper edge left-to-right, then lower edge right-to-left.
        confidence_interval = go.Scatter(
            x=np.concatenate([date_range, date_range[::-1]]),
            y=np.concatenate([pi+half_width, (pi-half_width)[::-1]]),
            fill='toself',
            fillcolor=color_mapper_alphaed[candidate],
            line=dict(color='rgba(255, 255, 255, 0)'),  # No outline
            hoverinfo="skip",
            showlegend=False,
            name='Confidence Interval'
        )

        the_scatters.append(scatter)
        the_lines.append(line)
        the_cis.append(confidence_interval)

    # Draw order: markers at the back, then bands, then lines on top.
    fig = go.Figure(data=(the_scatters + the_cis + the_lines))

    # Customize layout
    fig.update_layout(
        template="plotly_white",
        showlegend=False,
        dragmode=False,
        hovermode="x unified",
        width=800, height=400,
        margin=dict(l=0, r=0, t=0, b=0),
    )

    # The y-axis always reaches at least 60% and never goes past 100% (plus 5% headroom).
    y_top = max(0.6,min(the_max,1))
    tick_positions = np.arange(0,np.ceil(max(0.6,min(the_max,1)+0.1)*10)/10,0.1)
    fig.update_yaxes(
        range=[0, y_top*1.05],
        tickvals=tick_positions,
        ticktext=[f"{np.round(i*100,1)}%" for i in tick_positions]
    )
    # fig.show()
    fig.write_html(f"../docs/assets/poll_plots/{state}.html",config={'displayModeBar': False,"responsive": True,"scrollZoom":False})

In [12]:
# Write one front-matter-only markdown page per exported poll plot.
# Nothing is written when the pages folder does not exist.
pages_dir_exists = os.path.isdir("../docs/_polls")
for file in os.listdir("../docs/assets/poll_plots"):
    # Only exported plots count: slicing off five characters from any other entry
    # (hidden files, other extensions) would produce a page with a mangled name.
    state_name, extension = os.path.splitext(file)
    if extension != ".html":
        continue
    if pages_dir_exists:
        # Use a separate name for the handle so the loop variable is not shadowed.
        with open(f"../docs/_polls/{state_name}.markdown","w") as page:
            page.write(
                
f"""---
layout: polling_page
title:  "{state_name}"
head_title: "{state_name} Polling"
---
                """)
