#Elections

##Settings

In [2]:
!pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [15]:
!pip install plotly -q

###Kaggle

Cargamos archivo con API de Kaggle

In [None]:
from google.colab import files
files.upload() # Subir API de Kaggle

Creación de directorio

In [None]:
!ls -lha kaggle.json # print
!pip install -q kaggle # installing the kaggle package
!mkdir -p ~/.kaggle # creating .kaggle folder where the key should be placed
!cp kaggle.json ~/.kaggle/ # move the key to the folder
!pwd # checking the present working directory
!chmod 600 ~/.kaggle/kaggle.json # Change file mode bits

Obtenemos el conjunto de datos de Kaggle y lo descomprimimos

In [None]:
!kaggle datasets download -d tunguz/us-elections-dataset

In [None]:
!unzip us-elections-dataset.zip

###Google Drive

Montamos una conexión con Google Drive

In [5]:
from google.colab import drive
drive.mount('drive')

Mounted at drive


##Librerías

In [19]:
import random
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

##Datos

###1976-2020

Abrimos el conjunto de datos en un primer acercamiento

In [None]:
df = pd.read_csv('/content/1976-2020-president.csv')

####Primer Análisis

Observamos las características y contenido del conjunto de datos

In [None]:
df.head()

In [None]:
df.sample()

In [None]:
df.tail()

In [None]:
df.info()

In [None]:
df.isna().sum()

In [None]:
df.corr()

###Elecciones 2020

####Google Drive

#####Exportación

Extraemos los datos de las elecciones presidenciales del 2020

In [None]:
df_2020 = df[df['year'] == 2020]

Guardamos el archivo en Google Drive

In [None]:
df_2020.to_csv('2020-president.csv',  encoding='utf-8', index = False, header=True)

In [None]:
!cp 2020-president.csv '/content/drive/MyDrive/Data'

#####Carga

Realizamos la carga del conjunto de datos previamente acotado

In [None]:
df = pd.read_csv('/content/drive/MyDrive/Data/2020-president.csv')

####Segundo Análisis

In [None]:
df

In [None]:
df.notes.value_counts()

In [None]:
df.party_simplified.value_counts()

In [None]:
df.party_detailed.value_counts()

In [None]:
df.candidatevotes.sum()

In [None]:
df.party_simplified.unique()

##Simulación de Datos

###Limpieza

Determinamos el estado que más votantes tuvo para simular sus datos

In [None]:
df.loc[df.totalvotes.idxmax(), 'state']

Extramos el estado en un nuevo dataframe

In [None]:
state = df[df['state'] == 'CALIFORNIA'].reset_index(drop=True)

Removemos algunas columnas para simplificar el problema

In [None]:
ls_drop = ['year', 'state', 'state_po', 'state_cen', 'state_ic', 'party_detailed', 'writein', 'totalvotes', 'version', 'notes']

In [None]:
state.drop(ls_drop, axis=1, inplace=True)

Reduciremos las categorías de candidato a 3: 'Biden', 'Trump' y 'Others'

In [None]:
candidates = ['BIDEN, JOSEPH R. JR', 'TRUMP, DONALD J.']
filt = state[state['candidate'].isin(candidates)]

In [None]:
votos_other = state[~state['candidate'].isin(candidates)]['candidatevotes'].sum()

In [None]:
other = pd.DataFrame({
    'state_fips':[6],
    'office': ['US PRESIDENT'],
    'candidate': ['OTHER'],
    'candidatevotes':[votos_other],
    'party_simplified':['OTHER']
})

Concatenamos los conjuntos creados sobrescribiendo el anterior 

In [None]:
state = pd.concat([filt, other]).reset_index(drop=True)
state

Eliminamos las variables que ya no serán ocupadas

In [None]:
del df, ls_drop, votos_other, candidates, filt, other

Guardamos el dataframe en un archivo csv para facilitar su carga posterior

In [None]:
state.to_csv('californiaVotes.csv', encoding='utf-8', index=False, header=True)

In [None]:
!cp californiaVotes.csv '/content/drive/MyDrive/Data'

###Generando Registros (Apache Spark)

Cargamos el conjunto previamente generado

In [5]:
state = pd.read_csv('/content/drive/MyDrive/Data/californiaVotes.csv')
state

Unnamed: 0,state_fips,office,candidate,candidatevotes,party_simplified
0,6,US PRESIDENT,"BIDEN, JOSEPH R. JR",11110250,DEMOCRAT
1,6,US PRESIDENT,"TRUMP, DONALD J.",6006429,REPUBLICAN
2,6,US PRESIDENT,OTHER,384202,OTHER


Observamos los datos con una herramienta de visualización 

In [21]:
fig = go.Figure()

# Agregar las barras
fig.add_trace(go.Bar(
    x=state['candidate'],
    y=state['candidatevotes'],
    text=state['party_simplified'],
    hovertemplate='Candidate: %{text}<br>Votes: %{y}',
    marker_color=['blue', 'red', 'green'],  # Color de las barras
))

# Diseño gráfico
fig.update_layout(
    title='US Elections 2020',
    xaxis_title='Candidate',
    yaxis_title='Votes',
)

fig.show()

Creamos una sesión de Spark

In [6]:
spark = SparkSession.builder.getOrCreate()

Utilizaremos Apache Spark para convertir el conjunto en un RDD

In [7]:
rdd = spark.sparkContext.parallelize(state.values.tolist())

In [None]:
for r in rdd.collect():
  print(r)

[6, 'US PRESIDENT', 'BIDEN, JOSEPH R. JR', 11110250, 'DEMOCRAT']
[6, 'US PRESIDENT', 'TRUMP, DONALD J.', 6006429, 'REPUBLICAN']
[6, 'US PRESIDENT', 'OTHER', 384202, 'OTHER']


Duplicamos los registros según el número de votos

In [8]:
rdd = rdd.flatMap(lambda row: [row] * row[3])

Implementamos una función que genera un número SSN (Social Security Number) aleatorio que cumpla las normas de la región de California

In [9]:
def ssn_california():
  return '0' + str(random.randint(60_100_000, 69_999_999))

Agregamos un snn a cada registro en el rdd

In [10]:
rdd = spark.sparkContext.parallelize([(ssn_california(),) + tuple(row) for row in rdd.collect()])

Una vez obtenidos los snn eliminamos la columna con el total de votos 

In [11]:
rdd = rdd.map(lambda row: row[:4] + row[5:])

Observamos una muestra de nuestro rdd

In [None]:
rdd.take(5)

[('061443138', 6, 'US PRESIDENT', 'BIDEN, JOSEPH R. JR', 'DEMOCRAT'),
 ('062503114', 6, 'US PRESIDENT', 'BIDEN, JOSEPH R. JR', 'DEMOCRAT'),
 ('065959610', 6, 'US PRESIDENT', 'BIDEN, JOSEPH R. JR', 'DEMOCRAT'),
 ('065356903', 6, 'US PRESIDENT', 'BIDEN, JOSEPH R. JR', 'DEMOCRAT'),
 ('061278064', 6, 'US PRESIDENT', 'BIDEN, JOSEPH R. JR', 'DEMOCRAT')]

##Flujo de Datos

Creamos un streamig con intervalo de un 1

In [12]:
ssc = StreamingContext(spark.sparkContext, 1)



Creamos una lista que almacene cada batch

In [13]:
cola_rdds = []

Definimos una función que procese el RDD, agregando cada batch en el flujo de datos a un DataFrame que actualiza el gráfico

In [18]:
def process_rdd(_, rdd):
    global cola_rdds
    cola_rdds.extend(rdd.collect())

    # Guardamos los datos acumulados en DataFrame
    batch_df = spark.createDataFrame(cola_rdds, ["ssn", "state_fips", "office", "candidate", "candidatevotes", "party_simplified"])

    # Gráfico de barras
    fig = px.bar(batch_df, x="candidate", y="candidatevotes", text='party_simplified',color="candidate",
                 animation_frame="ssn", range_y=[0, max(cola_rdds, key=lambda x: x[4])[4]])

    # Título y etiquetas
    fig.update_layout(title='US Elections 2020',
                      xaxis_title='Candidato', yaxis_title='Cantidad de votos')

    fig.show()

Creamos un flujo de datos a partir del RDD

In [15]:
flujo_datos = ssc.queueStream([rdd])

Procesamos los datos en el flujo

In [16]:
flujo_datos.foreachRDD(process_rdd)

Iniciamos el StreamingContext y mantentemos en ejecución

In [None]:
ssc.start()
ssc.awaitTermination()

#Dataset

[US Elections](https://www.kaggle.com/datasets/tunguz/us-elections-dataset?select=1976-2020-president.csv)

##Diccionario de Datos

**ssn**: Social Security Number

**state_fips**: Id estado

**office**: Tipo elecciones

**candidate**: Nombre del candidato

**candidatevotes**: Número de votos totales obtenidos por estado

**party_simplified**: Partido del candidato