## Vaccine Adverse Event Reporting System (VAERS) 

In [1]:
import warnings
# Suppress DeprecationWarning output so it does not clutter the cells below.
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [2]:
import os
from pprint import pprint
# Show only the environment variables whose names start with "FEAST_".
feast_env = {name: setting for name, setting in os.environ.items() if name.startswith("FEAST_")}
pprint(feast_env)

{'FEAST_CORE_URL': 'core:6565',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT': 'parquet',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION': 'file:///shared/historical_feature_output',
 'FEAST_REDIS_HOST': 'redis',
 'FEAST_SERVING_URL': 'online_serving:6566',
 'FEAST_SPARK_HOME': '/usr/local/spark',
 'FEAST_SPARK_LAUNCHER': 'standalone',
 'FEAST_SPARK_STAGING_LOCATION': 'file:///shared/staging',
 'FEAST_SPARK_STANDALONE_MASTER': 'local'}


### Data Preparation

VAERS data are from a passive surveillance system and represent unverified reports of health events that occur after vaccination. 

Such data are subject to limitations of under-reporting, simultaneous administration of multiple vaccine antigens, reporting bias, and lack of incidence rates in unvaccinated comparison groups. 

In [3]:
import pandas as pd
import numpy as np
from datetime import datetime

In [4]:
# Load the VAERS report table: the listed numeric columns are read as float32,
# the three flag columns are converted to the pandas "string" dtype, and all
# column names are lower-cased.
covid_data_df = (
    pd.read_csv(
        '2021VAERSDATA.csv',
        encoding='latin',
        dtype={col: np.float32 for col in ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS']},
    )
    .astype({'DIED': 'string', 'L_THREAT': 'string', 'RECOVD': 'string'})
    .rename(columns=str.lower)
)

# Give every row a random event timestamp in [2020-10-10, 2020-10-20), at
# one-second resolution. The sample size follows the frame length so the cell
# keeps working when the CSV has a different number of rows.
# NOTE(review): no RNG seed is set in this cell, so the values change per run.
covid_data_df['datetime'] = pd.to_datetime(
    np.random.randint(
        int(datetime(2020, 10, 10).timestamp()),
        int(datetime(2020, 10, 20).timestamp()),
        size=len(covid_data_df),
    ),
    unit="s",
)
# Single creation timestamp shared by all rows.
covid_data_df['created'] = pd.to_datetime(datetime.now())

In [5]:
# Display the dtype of each column after the conversions above.
covid_data_df.dtypes

vaers_id                 int64
recvdate                object
state                   object
age_yrs                float32
cage_yr                float32
cage_mo                float32
sex                     object
rpt_date                object
symptom_text            object
died                    string
datedied                object
l_threat                string
er_visit                object
hospital                object
hospdays               float32
x_stay                  object
disable                 object
recovd                  string
vax_date                object
onset_date              object
numdays                float32
lab_data                object
v_adminby               object
v_fundby                object
other_meds              object
cur_ill                 object
history                 object
prior_vax               object
splttype                object
form_vers                int64
todays_date             object
birth_defect            object
ofc_visi

In [6]:
# Load the VAERS symptoms table. Columns come in five numbered pairs
# (SYMPTOMn / SYMPTOMVERSIONn): versions are read as float32, symptom names
# are converted to the pandas "string" dtype, and column names are lower-cased.
symptom_slots = range(1, 6)
covid_symptoms_df = (
    pd.read_csv(
        '2021VAERSSYMPTOMS.csv',
        encoding='latin',
        dtype={f'SYMPTOMVERSION{slot}': np.float32 for slot in symptom_slots},
    )
    .astype({f'SYMPTOM{slot}': 'string' for slot in symptom_slots})
    .rename(columns=str.lower)
)

# Give every row a random event timestamp in [2020-10-10, 2020-10-20), at
# one-second resolution. The sample size follows the frame length so the cell
# keeps working when the CSV has a different number of rows.
# NOTE(review): no RNG seed is set in this cell, so the values change per run.
covid_symptoms_df['datetime'] = pd.to_datetime(
    np.random.randint(
        int(datetime(2020, 10, 10).timestamp()),
        int(datetime(2020, 10, 20).timestamp()),
        size=len(covid_symptoms_df),
    ),
    unit="s",
)
# Single creation timestamp shared by all rows.
covid_symptoms_df['created'] = pd.to_datetime(datetime.now())

In [7]:
# Display the dtype of each column after the conversions above.
covid_symptoms_df.dtypes

vaers_id                    int64
symptom1                   string
symptomversion1           float32
symptom2                   string
symptomversion2           float32
symptom3                   string
symptomversion3           float32
symptom4                   string
symptomversion4           float32
symptom5                   string
symptomversion5           float32
datetime           datetime64[ns]
created            datetime64[ns]
dtype: object

In [8]:
# Load the VAERS vaccine table: the two columns used as features are converted
# to the pandas "string" dtype and all column names are lower-cased.
covid_vax_df = (
    pd.read_csv('2021VAERSVAX.csv', encoding='latin')
    .astype({'VAX_TYPE': 'string', 'VAX_MANU': 'string'})
    .rename(columns=str.lower)
)

# Give every row a random event timestamp in [2020-10-10, 2020-10-20), at
# one-second resolution. The sample size follows the frame length so the cell
# keeps working when the CSV has a different number of rows.
# NOTE(review): no RNG seed is set in this cell, so the values change per run.
covid_vax_df['datetime'] = pd.to_datetime(
    np.random.randint(
        int(datetime(2020, 10, 10).timestamp()),
        int(datetime(2020, 10, 20).timestamp()),
        size=len(covid_vax_df),
    ),
    unit="s",
)
# Single creation timestamp shared by all rows.
covid_vax_df['created'] = pd.to_datetime(datetime.now())

In [9]:
# Display the dtype of each column after the conversions above.
covid_vax_df.dtypes

vaers_id                    int64
vax_type                   string
vax_manu                   string
vax_lot                    object
vax_dose_series            object
vax_route                  object
vax_site                   object
vax_name                   object
datetime           datetime64[ns]
created            datetime64[ns]
dtype: object

### Basic Imports and Feast Client initialization

In [10]:
import os

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [12]:
import warnings
# Suppress DeprecationWarning output before the client is created.
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Feast client constructed with no explicit arguments.
client = Client()

### Declare Features and Entities

In [13]:
# Entity shared by all three feature tables below; its values are declared as INT64.
vaers_id = Entity(name="vaers_id", description="VAERS Identification Number", value_type=ValueType.INT64)

In [14]:
# FLOAT-valued features taken from the report table.
age_yrs, hospdays = (Feature(col, ValueType.FLOAT) for col in ("age_yrs", "hospdays"))

# STRING-valued features taken from the report table.
died, l_threat, recovd = (
    Feature(col, ValueType.STRING) for col in ("died", "l_threat", "recovd")
)

# STRING-valued features taken from the vaccine table.
vax_type, vax_manu = (Feature(col, ValueType.STRING) for col in ("vax_type", "vax_manu"))

# STRING-valued features taken from the symptoms table, one per numbered slot.
symptom1, symptom2, symptom3, symptom4, symptom5 = (
    Feature(f"symptom{slot}", ValueType.STRING) for slot in range(1, 6)
)

In [15]:
# Root location used for the offline feature store: the "test_data" folder
# under the Spark staging location, with a local fallback when the
# environment variable is not set.

import os
staging_root = os.getenv("FEAST_SPARK_STAGING_LOCATION", "file:///home/jovyan/")
demo_data_location = os.path.join(staging_root, "test_data")

In [16]:
covid_data_source_uri = os.path.join(demo_data_location, "covid_data")

# Parquet file source backing the covid_data table.
covid_data_batch_source = FileSource(
    file_url=covid_data_source_uri,
    file_format=ParquetFormat(),
    event_timestamp_column="datetime",
    created_timestamp_column="created",
    date_partition_column="date",
)

# Feature table holding the report-level features, keyed by vaers_id.
covid_data = FeatureTable(
    name="covid_data",
    entities=["vaers_id"],
    features=[age_yrs, hospdays, died, l_threat, recovd],
    batch_source=covid_data_batch_source,
)

In [17]:
covid_symptoms_source_uri = os.path.join(demo_data_location, "covid_symptoms")

# Parquet file source backing the covid_symptoms table.
covid_symptoms_batch_source = FileSource(
    file_url=covid_symptoms_source_uri,
    file_format=ParquetFormat(),
    event_timestamp_column="datetime",
    created_timestamp_column="created",
    date_partition_column="date",
)

# Feature table holding the five symptom features, keyed by vaers_id.
covid_symptoms = FeatureTable(
    name="covid_symptoms",
    entities=["vaers_id"],
    features=[symptom1, symptom2, symptom3, symptom4, symptom5],
    batch_source=covid_symptoms_batch_source,
)

In [18]:
covid_vax_source_uri = os.path.join(demo_data_location, "covid_vax")

# Parquet file source backing the covid_vax table.
covid_vax_batch_source = FileSource(
    file_url=covid_vax_source_uri,
    file_format=ParquetFormat(),
    event_timestamp_column="datetime",
    created_timestamp_column="created",
    date_partition_column="date",
)

# Feature table holding the vaccine type and manufacturer, keyed by vaers_id.
covid_vax = FeatureTable(
    name="covid_vax",
    entities=["vaers_id"],
    features=[vax_type, vax_manu],
    batch_source=covid_vax_batch_source,
)

### Registering entities and feature tables in Feast Core

In [39]:
# Apply the entity first, then the three feature tables, in the same order as before.
for feast_object in (vaers_id, covid_data, covid_symptoms, covid_vax):
    client.apply(feast_object)

In [40]:
# Read each registered table back from the client and print its YAML form.
for table_name in ("covid_data", "covid_symptoms", "covid_vax"):
    print(client.get_feature_table(table_name).to_yaml())

spec:
  name: covid_data
  entities:
  - vaers_id
  features:
  - name: hospdays
    valueType: FLOAT
  - name: died
    valueType: STRING
  - name: age_yrs
    valueType: FLOAT
  - name: l_threat
    valueType: STRING
  - name: recovd
    valueType: STRING
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///shared/staging/test_data/covid_data
meta:
  createdTimestamp: '2021-03-03T13:28:37Z'

spec:
  name: covid_symptoms
  entities:
  - vaers_id
  features:
  - name: symptom5
    valueType: STRING
  - name: symptom3
    valueType: STRING
  - name: symptom4
    valueType: STRING
  - name: symptom1
    valueType: STRING
  - name: symptom2
    valueType: STRING
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      file

In [43]:
# Array of all vaers_id values in the report table; sampled from later on.
entities = covid_data_df['vaers_id'].values

### Populating batch source

In [44]:
# Ingest each prepared DataFrame into the batch source of its feature table.
for feature_table, source_df in (
    (covid_data, covid_data_df),
    (covid_symptoms, covid_symptoms_df),
    (covid_vax, covid_vax_df),
):
    client.ingest(feature_table, source_df)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.
Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.
Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


In [45]:
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [46]:
def read_parquet(uri):
    """Load the Parquet data stored at ``uri`` into a pandas DataFrame.

    Args:
        uri: Location of the data. Supported schemes are ``file``, ``gs``,
            ``s3`` and ``wasbs``. For ``file`` the path is passed straight to
            ``pd.read_parquet``; for the remote schemes only objects matching
            ``<uri>/part-*`` are read.

    Returns:
        A pandas DataFrame with the contents of the Parquet data.

    Raises:
        ValueError: If the scheme is not one of the supported ones, or a
            ``wasbs`` URI has no ``<name>@`` part in front of the host.
    """
    parsed_uri = urlparse(uri)
    scheme = parsed_uri.scheme

    if scheme == "file":
        return pd.read_parquet(parsed_uri.path)

    if scheme == "gs":
        fs = gcsfs.GCSFileSystem()
        # Re-prefix every globbed path with the scheme before reading.
        files = ["gs://" + path for path in fs.glob(uri + '/part-*')]
    elif scheme == 's3':
        import s3fs
        fs = s3fs.S3FileSystem()
        files = ["s3://" + path for path in fs.glob(uri + '/part-*')]
    elif scheme == 'wasbs':
        # The part before '@' is used as the first path component below; fail
        # with a clear message instead of a TypeError from `None + str`.
        if parsed_uri.username is None:
            raise ValueError(
                f"wasbs URI is missing the '<name>@' part before the host: {uri}"
            )
        import adlfs
        fs = adlfs.AzureBlobFileSystem(
            account_name=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_NAME'),
            account_key=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY'),
        )
        uripath = parsed_uri.username + parsed_uri.path
        files = fs.glob(uripath + '/part-*')
    else:
        raise ValueError(f"Unsupported URL scheme {uri}")

    # Shared tail for all remote schemes: read the matched part files as one dataset.
    ds = ParquetDataset(files, filesystem=fs)
    return ds.read().to_pandas()

### Historical Retrieval for Training

In [47]:
# Build the entity frame for historical retrieval: a few distinct vaers_id
# values, each paired with a random timestamp in [2021-01-01, 2021-02-12)
# truncated to midnight.
# NOTE(review): no RNG seed is set in this cell, so the sample changes per run.
sample_size = 5
sampled_ids = np.random.choice(entities, sample_size, replace=False)
sampled_seconds = np.random.randint(
    int(datetime(2021, 1, 1).timestamp()),
    int(datetime(2021, 2, 12).timestamp()),
    size=sample_size,
)

entities_with_timestamp = pd.DataFrame({
    'vaers_id': sampled_ids,
    # normalize() drops the time of day directly, replacing the former
    # datetime -> date objects -> datetime round trip.
    'event_timestamp': pd.to_datetime(sampled_seconds, unit='s').normalize(),
})

entities_with_timestamp

Unnamed: 0,vaers_id,event_timestamp
0,918773,2021-01-17
1,993138,2021-01-19
2,990694,2021-02-08
3,950980,2021-01-25
4,933260,2021-01-30


In [48]:
# Features to retrieve, grouped by the feature table they belong to.
requested_features = {
    "covid_data": ["age_yrs", "hospdays", "died", "l_threat", "recovd"],
    "covid_symptoms": ["symptom1", "symptom2", "symptom3", "symptom4", "symptom5"],
    "covid_vax": ["vax_type", "vax_manu"],
}

# References are passed as "<table>:<feature>" strings, in the order listed above.
job = client.get_historical_features(
    feature_refs=[
        f"{table}:{feature}"
        for table, features in requested_features.items()
        for feature in features
    ],
    entity_source=entities_with_timestamp,
)

In [49]:
# Ask the retrieval job for the URI of its output; read back in the next cell.
output_file_uri = job.get_output_file_uri()

In [50]:
# Load the retrieval output into a pandas DataFrame.
test = read_parquet(output_file_uri)

In [52]:
# Display the retrieved features; columns are named "<table>__<feature>".
test

Unnamed: 0,vaers_id,event_timestamp,covid_data__hospdays,covid_data__died,covid_data__age_yrs,covid_data__l_threat,covid_data__recovd,covid_symptoms__symptom5,covid_symptoms__symptom3,covid_symptoms__symptom4,covid_symptoms__symptom1,covid_symptoms__symptom2,covid_vax__vax_manu,covid_vax__vax_type
0,950980,2021-01-25,4.0,,50.0,Y,N,Immune thrombocytopenia,Hepatitis B test negative,Hepatitis C test negative,Computerised tomogram head normal,HIV test negative,MODERNA,COVID19
1,990694,2021-02-08,,,52.0,,N,,Migraine,Nausea,Diarrhoea,Fatigue,PFIZER\BIONTECH,COVID19
2,933260,2021-01-30,,,,,U,,,,Pemphigoid,,GLAXOSMITHKLINE BIOLOGICALS,VARZOS
3,993138,2021-01-19,,,69.0,,Y,Fatigue,Cough,Dizziness,Anaphylactoid reaction,Blood pressure decreased,MODERNA,COVID19
4,918773,2021-01-17,,,47.0,,N,,,,SARS-CoV-2 test positive,,PFIZER\BIONTECH,COVID19


The retrieved result can now be used for model training.

In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [54]:
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [55]:
# Read the raw report CSV with Spark, taking column names from the header row.
# No schema is supplied; the schema output below shows the columns as StringType.
myData = spark.read.format("csv").option("header", "true").load("2021VAERSDATA.csv")

In [59]:
# Display the schema Spark assigned to the CSV columns.
myData.schema

StructType(List(StructField(VAERS_ID,StringType,true),StructField(RECVDATE,StringType,true),StructField(STATE,StringType,true),StructField(AGE_YRS,StringType,true),StructField(CAGE_YR,StringType,true),StructField(CAGE_MO,StringType,true),StructField(SEX,StringType,true),StructField(RPT_DATE,StringType,true),StructField(SYMPTOM_TEXT,StringType,true),StructField(DIED,StringType,true),StructField(DATEDIED,StringType,true),StructField(L_THREAT,StringType,true),StructField(ER_VISIT,StringType,true),StructField(HOSPITAL,StringType,true),StructField(HOSPDAYS,StringType,true),StructField(X_STAY,StringType,true),StructField(DISABLE,StringType,true),StructField(RECOVD,StringType,true),StructField(VAX_DATE,StringType,true),StructField(ONSET_DATE,StringType,true),StructField(NUMDAYS,StringType,true),StructField(LAB_DATA,StringType,true),StructField(V_ADMINBY,StringType,true),StructField(V_FUNDBY,StringType,true),StructField(OTHER_MEDS,StringType,true),StructField(CUR_ILL,StringType,true),StructFie