In [1]:
# Install the Hopsworks client library into the notebook environment.
# NOTE(review): the version is not pinned; the captured output below shows
# this run resolving hopsworks 3.0.5 — consider pinning for reproducibility.
!pip install hopsworks

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting hopsworks
  Downloading hopsworks-3.0.5.tar.gz (35 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting hsfs[python]<3.1.0,>=3.0.0 (from hopsworks)
  Downloading hsfs-3.0.7.tar.gz (120 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m120.7/120.7 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting hsml<3.1.0,>=3.0.0 (from hopsworks)
  Downloading hsml-3.0.3.tar.gz (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.9/50.9 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyhumps==1.6.1 (from hopsworks)
  Downloading pyhumps-1.6.1-py3-none-any.whl (5.0 kB)
Collecting furl (from hopsworks)
  Downloading furl-2.1.3-py2.py3-none-any.whl (20 kB)
Collecting boto3 (from hopsworks)
  Downloading boto3-1.

In [1]:
import pandas as pd
import numpy as np
import hopsworks
import os
import requests
import zipfile

In [2]:
# Hosted notebook environments may not have the local features package
def need_download_modules():
    """Tell whether the helper ``features`` modules have to be downloaded.

    Returns:
        bool: True when the string form of the active IPython shell mentions
        ``google.colab`` or when the ``HOPSWORKS_PROJECT_ID`` environment
        variable is set; False otherwise.
    """
    running_in_colab = 'google.colab' in str(get_ipython())
    return running_in_colab or 'HOPSWORKS_PROJECT_ID' in os.environ

if need_download_modules():
    print("Downloading modules")
    # NOTE(review): nothing visible in this notebook shells out to `unzip`
    # (archives are read with the `zipfile` module) — confirm this is still needed.
    os.system('sudo apt-get install unzip')
    os.makedirs('features', exist_ok=True)

    # Fetch the helper module with `requests` instead of `wget` so that a failed
    # download raises here (rather than surfacing later as an ImportError) and
    # so that re-running the cell overwrites voters.py instead of leaving a
    # stale copy next to a new `voters.py.1`.
    voters_module_url = 'https://raw.githubusercontent.com/Prithivee7/voter_categorical_binning/main/features/voters.py'
    module_response = requests.get(voters_module_url, timeout=60)
    module_response.raise_for_status()
    with open(os.path.join('features', 'voters.py'), 'wb') as module_file:
        module_file.write(module_response.content)
else:
    print("Local environment")

Downloading modules


In [3]:
from features import voters

In [4]:
# Log in to Hopsworks and get a handle on the project's feature store.
# login() is called without arguments, so no credentials are written in this cell.
hopsworks_project = hopsworks.login()
# `fs` is the feature-store handle used further down to get or create the feature group.
fs = hopsworks_project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/44182
Connected. Call `.close()` to terminate connection gracefully.


In [5]:
# This function accepts the Registered Voter Stats file by Election Date link as parameter and outputs the data as a dataframe
def get_voter_details_df(voter_stats_link):
    """Download a zipped voter-stats file and load it into a DataFrame.

    The archive is downloaded and extracted inside a private temporary
    directory that is removed when the function exits, even on failure, so
    nothing is left behind in (or picked up from) the working directory.

    Args:
        voter_stats_link: URL of the zip archive to download.

    Returns:
        pandas.DataFrame: contents of the first file in the archive, parsed
        with runs of tab characters as the column separator.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        zipfile.BadZipFile: if the downloaded payload is not a zip archive.
        ValueError: if the archive contains no files.
    """
    import tempfile  # local import keeps this cell self-contained

    with tempfile.TemporaryDirectory() as work_dir:
        path_to_zip_file = os.path.join(work_dir, "voter_dataset.zip")
        extract_dir = os.path.join(work_dir, "my_data")

        # Stream the archive to disk; the context manager releases the connection.
        with requests.get(voter_stats_link, stream=True, timeout=60) as response:
            # Fail on HTTP errors instead of saving an error page as a "zip".
            response.raise_for_status()
            with open(path_to_zip_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)

        # Pick the data file from the archive listing rather than from an
        # arbitrary directory listing.
        with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
            member_names = [
                info.filename for info in zip_ref.infolist() if not info.is_dir()
            ]
            if not member_names:
                raise ValueError(f"No files found in archive: {voter_stats_link}")
            file_path = zip_ref.extract(member_names[0], extract_dir)

        # A regex separator is only supported by the python parser engine;
        # naming it explicitly avoids pandas' fallback warning.
        # NOTE(review): r"\t+" collapses consecutive tabs, so an empty field
        # would shift the remaining columns — confirm the file has no empty fields.
        voters_df = pd.read_csv(file_path, delimiter=r"\t+", engine="python")

    return voters_df

In [6]:
# This function takes in the voters dataframe and gives back the features 
def get_final_dataframe(df):
    """Derive the binned feature columns from the raw voters dataframe.

    The input frame is left untouched: the derived columns are added to a new
    frame, so re-running this step (or inspecting the raw data afterwards)
    always sees the original columns only.

    Args:
        df: dataframe providing the columns ``party_cd``, ``county_desc``,
            ``race_code``, ``age``, ``sex_code``, ``ethnic_code`` and
            ``total_voters``.

    Returns:
        pandas.DataFrame: new frame with the columns ``Political Party``,
        ``County ID``, ``Race``, ``Age Bracket``, ``Sex``, ``Ethnicity`` and
        ``total_voters``, in that order.
    """
    # Each derived column applies the matching helper from the `voters` module
    # element-wise to one raw column.
    features_df = df.assign(**{
        'Political Party': df['party_cd'].apply(voters.perform_binning_political_parties),
        'County ID': df['county_desc'].apply(voters.get_county_id),
        'Race': df['race_code'].apply(voters.perform_binning_races),
        'Age Bracket': df['age'].apply(voters.get_age_bracket),
        'Sex': df['sex_code'].apply(voters.perform_binning_sex),
        'Ethnicity': df['ethnic_code'].apply(voters.perform_binning_ethnicity),
    })
    return features_df[['Political Party', "County ID", "Race",
                        "Age Bracket", "Sex", "Ethnicity", "total_voters"]]

In [7]:
def perform_aggregation(df):
    """Collapse the feature rows into one row per category combination.

    Args:
        df: dataframe with the columns ``Political Party``, ``Race``,
            ``Age Bracket``, ``Sex``, ``Ethnicity``, ``County ID`` and
            ``total_voters``.

    Returns:
        pandas.DataFrame: one row per distinct combination of the six
        category columns, with ``Voter_Count`` holding the number of input
        rows in that group and ``p_key`` numbering the rows from 1.
    """
    group_keys = ['Political Party', 'Race', 'Age Bracket',
                  'Sex', "Ethnicity", "County ID"]
    grouped = df.groupby(group_keys)
    # NOTE(review): this counts the rows in each group; it does not sum the
    # values of `total_voters` — confirm that a row count is what is intended.
    agg_df = grouped.agg(Voter_Count=('total_voters', 'size')).reset_index()
    # Sequential surrogate key starting at 1.
    agg_df['p_key'] = range(1, len(agg_df) + 1)
    return agg_df

In [8]:
def update_column_names(df):
    """Relabel the aggregated frame's columns with snake_case names.

    The new labels are assigned by position, directly on the frame that was
    passed in, and that same frame is returned.

    Args:
        df: dataframe with exactly eight columns.

    Returns:
        pandas.DataFrame: the input object, now with the columns
        ``political_party``, ``race``, ``age_bracket``, ``sex``,
        ``ethnicity``, ``county_id``, ``voter_count`` and ``p_key``.
    """
    snake_case_names = [
        'political_party', 'race', 'age_bracket', 'sex',
        'ethnicity', 'county_id', 'voter_count', 'p_key',
    ]
    df.columns = snake_case_names
    return df

In [None]:
def main(voter_stats_link):
    """Run the pipeline: download, derive features, aggregate, rename columns.

    Args:
        voter_stats_link: URL of the zipped voter-stats file to process.

    Returns:
        The aggregated dataframe produced by ``update_column_names``.
    """
    voters_df = get_voter_details_df(voter_stats_link)
    features_df = get_final_dataframe(voters_df)
    agg_df = perform_aggregation(features_df)
    agg_df = update_column_names(agg_df)
    # Hand the result back: the caller binds it (`agg_df = main(...)`) and
    # without this return it would receive None.
    return agg_df


In [9]:
# Provide the URL of the zip file for the election date whose data is required.
# The data can be found here -> https://www.ncsbe.gov/results-data/voter-registration-data
# The file layout description can be found here -> https://s3.amazonaws.com/dl.ncsbe.gov/ENRS/layout_voter_stats.txt
# The county_name and county_id can be found here -> https://s3.amazonaws.com/dl.ncsbe.gov/data/layout_ncvoter.txt

voter_stats_link = "https://s3.amazonaws.com/dl.ncsbe.gov/ENRS/2022_12_06/voter_stats_20221206.zip"

# Bind whatever main() returns to agg_df; a later cell passes agg_df to voters_fg.insert().
agg_df = main(voter_stats_link)



### Creating the Feature Group and Inserting Records

In [12]:
# Get the "voters" feature group (version 1) from the feature store, or create it.
# Provide a name, a description and the primary key column (`p_key`);
# `online_enabled=True` is passed so the group is flagged for online use.
voters_fg = fs.get_or_create_feature_group(
    name="voters",
    version=1,
    description="Voter data with categorical variables and aggregation",
    primary_key=['p_key'],
    online_enabled=True
)

In [13]:
# Insert the aggregated dataframe into the `voters` feature group
voters_fg.insert(agg_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/44182/fs/44101/fg/52912


Uploading Dataframe: 0.00% |          | Rows 0/20713 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/44182/jobs/named/voters_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f080461bd60>, None)

In [17]:
# Updating the Feature Descriptions
# Each entry pairs a feature (column) name of the `voters` feature group with
# the human-readable description to attach to it.

feature_descriptions = [
    {"name": "political_party", "description": "It bins the political party into the Democratic, Republican and Others"},
    {"name": "race", "description": "Contains information about the Race of the voter"},
    {"name": "age_bracket", "description": "Contains information about the age bracket to which the voter belongs"},
    {"name": "sex", "description": "Contains information regarding the sex of the voter"},
    {"name": "ethnicity", "description": "Contains information about the ethnicity of the voter"},
    {"name": "county_id", "description": "Contains information about the county id of the voter"},
    {"name": "voter_count", "description": "Contains information regarding the number of voters"},
    {"name": "p_key", "description": "This feature is used as a primary key"},
]

# Push every description to the feature group, one feature at a time.
for feature in feature_descriptions:
    voters_fg.update_feature_description(feature["name"], feature["description"])