# VAERS 
Data source : Vaccine Adverse Event Reporting System, U.S. Department of Health & Human Services https://vaers.hhs.gov

Data files : https://vaers.hhs.gov/data/datasets.html  


```
VAERS is a passive reporting system, meaning it relies on individuals to send in reports of their experiences to CDC and FDA. VAERS is not designed to determine if a vaccine caused a health problem, but is especially useful for detecting unusual or unexpected patterns of adverse event reporting that might indicate a possible safety problem with a vaccine. This way, VAERS can provide CDC and FDA with valuable information that additional work and evaluation is necessary to further assess a possible safety concern.
```
## Content
Data is accessible in CSV files, one per year (1990 to 2021 at this time) per dataset (data, symptoms, vaccines)
file name are formated as `<YEAR>VAERS<DATA|SYMPTOMS|VAX>.csv` except for non domestic data named `NonDomesticVAERS<DATA|SYMPTOMS|VAX>.csv`.

**Columns of CSV files are described in [VAERS DATA USE GUIDE](https://vaers.hhs.gov/docs/VAERSDataUseGuide_November2020.pdf)**

## data for this notebook
Download [All data compressed file (zip ~365MB, uncompressed ~1.5GB)](https://vaers.hhs.gov/eSubDownload/index.jsp?fn=AllVAERSDataCSVS.zip) and unzip files into dedicated dataset folders:
```
mkdir VAERS && \
cd VAERS && \
mkdir VAX && \
mkdir DATA && \
mkdir SYMPTOMS && \
unzip AllVAERSDataCSVS.zip && \
mv *VAERSDATA.csv DATA/ && \
mv *VAERSVAX.csv VAX/ && \
mv *VAERSSYMPTOMS.csv SYMPTOMS && \
echo "CSV data dispatched."
```

In [None]:
val sourceVAERScsvRoot="file:///home/taccart/Downloads/VAERS/"

In [None]:
case class Symptom(
    VAERS_ID: String
    ,SYMPTOM: String
    ,SYMPTOMVERSION: String

case class Vax(
    VAERS_ID: String
    ,VAX_TYPE: String
    ,VAX_MANU: String
    ,VAX_LOT: String 
    ,VAX_DOSE_SERIES: String 
    ,VAX_ROUTE: String
    ,VAX_SITE: String
    ,VAX_NAME: String
)
case class Data(
	VAERS_ID: String
	,RECVDATE: String
	,STATE: String
	,AGE_YRS: String
	,CAGE_YR: String
	,CAGE_MO: String
	,SEX: String
	,RPT_DATE: String
	,SYMPTOM_TEXT: String
	,DIED: String
	,DATEDIED: String
	,L_THREAT: String
	,ER_VISIT: String
	,HOSPITAL: String
	,HOSPDAYS: String
	,X_STAY: String
	,DISABLE: String
	,RECOVD: String
	,VAX_DATE: String
	,ONSET_DATE: String
	,NUMDAYS: String
	,LAB_DATA: String
	,V_ADMINBY: String
	,V_FUNDBY: String
	,OTHER_MEDS: String
	,CUR_ILL: String
	,HISTORY: String
	,PRIOR_VAX: String
	,SPLTTYPE: String
	,FORM_VERS: String
	,TODAYS_DATE: String
	,BIRTH_DEFECT: String
	,OFC_VISIT: String
	,ER_ED_VISIT: String
	,ALLERGIES: String
)

In [None]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

val symptomsCSVDF=spark.read
    .option("header", "true")
    .csv(s"${sourceVAERScsvRoot}/SYMPTOMS")
    .withColumn("sourceFile",input_file_name)
    .cache

val symptomsDF = symptomsCSVDF.where ("SYMPTOM1 !=null").select( "VAERS_ID", "sourceFile", "SYMPTOM1").withColumnRenamed("SYMPTOM1","SYMPTOM")
  .unionAll(symptomsCSVDF.where ("SYMPTOM2 !=null").select( "VAERS_ID", "sourceFile", "SYMPTOM2").withColumnRenamed("SYMPTOM2","SYMPTOM").withColumnRenamed("SYMPTOMVERSION2","SYMPTOMVERSION"))
  .unionAll(symptomsCSVDF.where ("SYMPTOM3 !=null").select( "VAERS_ID", "sourceFile", "SYMPTOM3").withColumnRenamed("SYMPTOM3","SYMPTOM").withColumnRenamed("SYMPTOMVERSION3","SYMPTOMVERSION"))
  .unionAll(symptomsCSVDF.where ("SYMPTOM4 !=null").select( "VAERS_ID", "sourceFile", "SYMPTOM4").withColumnRenamed("SYMPTOM4","SYMPTOM").withColumnRenamed("SYMPTOMVERSION4","SYMPTOMVERSION"))
  .unionAll(symptomsCSVDF.where ("SYMPTOM5 !=null").select( "VAERS_ID", "sourceFile", "SYMPTOM5").withColumnRenamed("SYMPTOM5","SYMPTOM").withColumnRenamed("SYMPTOMVERSION5","SYMPTOMVERSION"))
  .cache
println("Schema of symptomsDF : one row for each symptom.")
symptomsDF.printSchema
symptomsDF.show

val symptomsArrayDF= symptomsDF
    .groupBy("VAERS_ID","sourceFile")
    .agg(collect_list($"SYMPTOM")  as "SYMPTOMS")
   
.cache

println("Schema of symptomsArrayDF: one row for each vaers_id, with the array of symptoms.")
symptomsArrayDF.printSchema


val vaxDF=spark.read
    .option("header", "true")
    .csv(s"${sourceVAERScsvRoot}/VAX")
    .withColumn("sourceFile",input_file_name)

    .cache
println("Schema of vaxDF")
vaxDF.printSchema

val dataDF=spark.read
    .option("header", "true")
    .csv(s"${sourceVAERScsvRoot}/DATA")
    .withColumn("sourceFile",input_file_name)
    .cache
println("Schema of dataDF")
dataDF.printSchema


dataDF.createOrReplaceTempView("TData")
vaxDF.createOrReplaceTempView("TVax")
symptomsDF
  .where ("length(symptom) >=3").createOrReplaceTempView("TSymptoms")
symptomsArrayDF.createOrReplaceTempView("TSymptomsArray")
println("SparkSQL prepared tables:")
spark.sql("show tables").show

In [None]:
// verify presence of  VAERS_ID only in one sourceFile: this query should return no row.
//symptomsDF.groupBy("VAERS_ID","sourceFile").agg(collect_list($"SYMPTOM")  as "SYMPTOMS").groupBy("VAERS_ID").agg(count($"sourceFile")  as "C") .where("C >1") .show

In [None]:
// examples
val fullDF= dataDF
.join(vaxDF,dataDF.col("VAERS_ID") === vaxDF.col("VAERS_ID"), "inner")
.join(symptomsArrayDF,dataDF.col("VAERS_ID") === symptomsArrayDF.col("VAERS_ID"), "inner")

val covidDF= vaxDF
.where("VAX_TYPE like 'COVI%'")
.select("VAERS_ID","VAX_TYPE", "VAX_MANU", "VAX_ROUTE")
.join(symptomsArrayDF.select("VAERS_ID", "SYMPTOMS"), symptomsArrayDF.col("VAERS_ID" )===vaxDF.col("VAERS_ID"), "inner")

.join(dataDF, dataDF.col("VAERS_ID" )===vaxDF.col("VAERS_ID"), "inner")


In [None]:
spark.sql("select vaers_id, symptom from tsymptoms where symptom like '%bite%'").createOrReplaceTempView("tanimals")

In [None]:
spark.sql("show tables").show

# Unexpected data

## Bites: count where vax_type is COVID19 and  symptom is like bite

In [None]:
%%python
import numpy as np
import random as rand
import pandas as pd
spark.sql("select   VAX_TYPE, VAX_MANU,symptom, count(*) reportCount from tanimals inner join tvax on tanimals.vaers_id = tvax.vaers_id where vax_type='COVID19' group by VAX_TYPE, VAX_MANU,symptom").toPandas()

In [None]:
spark.sql("select vaers_id, count(vaers_id) countSymptoms from tsymptoms group by vaers_id")
.cache
.createOrReplaceTempView("tmanysymptoms")

##  number of reports by range of symptoms declared (by step of 10)
99% or reports have less than 16 symptoms

In [None]:
%%python
import numpy as np
import random as rand
import pandas as pd
spark.sql("select VAX_TYPE, VAX_MANU, ceil(countSymptoms/10)*10 rangeMaxSymptoms, count(*) as countReports  from tmanysymptoms  inner join tvax on tmanysymptoms.vaers_id = tvax.vaers_id where vax_type='COVID19' group by VAX_TYPE, VAX_MANU, ceil(countSymptoms/10) order by 1,2,3") .toPandas()

# Quality score 
Suspicious records : 
* coming from non medical source
* having too many symptoms declared =>  exclude the reports having a count of simultaneous symptoms in top 1% per vax.
* suspected to be a duplicate of a previous one => 
* too much time elapsed between vax and symptom => ? 


In [None]:
// %%python
// import numpy as np
// import random as rand
// import pandas as pd
spark.sql("select * from tdata order by numdays desc limit 25  ") .show
// .toPandas()

In [None]:
spark.sql("select symptom, count(symptom)  from x  group by symptom ").count