The purpose of this notebook is to demonstrate a basic understanding of the use of PySpark dataframes.

In [1]:
from pyspark.sql import SparkSession 
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.window as W
import pyspark.pandas as ps

import pandas as pd
import numpy as np



In [2]:
spark = SparkSession.builder.master('local[*]').config("spark.driver.memory",'15g')\
    .config('spark.ui.showConsoleProgress','false').appName("example").getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/08 19:01:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = spark.read.csv("./vaers_jan_nov_2021.csv", header=True, inferSchema=True, sep=',', quote='"', escape='"')

In [4]:
df.columns

['VAERS_ID',
 'SYMPTOM1',
 'SYMPTOMVERSION1',
 'SYMPTOM2',
 'SYMPTOMVERSION2',
 'SYMPTOM3',
 'SYMPTOMVERSION3',
 'SYMPTOM4',
 'SYMPTOMVERSION4',
 'SYMPTOM5',
 'SYMPTOMVERSION5',
 'VAX_TYPE',
 'VAX_MANU',
 'VAX_LOT',
 'VAX_DOSE_SERIES',
 'VAX_ROUTE',
 'VAX_SITE',
 'VAX_NAME',
 'RECVDATE',
 'STATE',
 'AGE_YRS',
 'CAGE_YR',
 'CAGE_MO',
 'SEX',
 'RPT_DATE',
 'SYMPTOM_TEXT',
 'DIED',
 'DATEDIED',
 'L_THREAT',
 'ER_VISIT',
 'HOSPITAL',
 'HOSPDAYS',
 'X_STAY',
 'DISABLE',
 'RECOVD',
 'VAX_DATE',
 'ONSET_DATE',
 'NUMDAYS',
 'LAB_DATA',
 'V_ADMINBY',
 'V_FUNDBY',
 'OTHER_MEDS',
 'CUR_ILL',
 'HISTORY',
 'PRIOR_VAX',
 'SPLTTYPE',
 'FORM_VERS',
 'TODAYS_DATE',
 'BIRTH_DEFECT',
 'OFC_VISIT',
 'ER_ED_VISIT',
 'ALLERGIES']

In [5]:
df.dtypes

[('VAERS_ID', 'int'),
 ('SYMPTOM1', 'string'),
 ('SYMPTOMVERSION1', 'double'),
 ('SYMPTOM2', 'string'),
 ('SYMPTOMVERSION2', 'double'),
 ('SYMPTOM3', 'string'),
 ('SYMPTOMVERSION3', 'double'),
 ('SYMPTOM4', 'string'),
 ('SYMPTOMVERSION4', 'double'),
 ('SYMPTOM5', 'string'),
 ('SYMPTOMVERSION5', 'double'),
 ('VAX_TYPE', 'string'),
 ('VAX_MANU', 'string'),
 ('VAX_LOT', 'string'),
 ('VAX_DOSE_SERIES', 'string'),
 ('VAX_ROUTE', 'string'),
 ('VAX_SITE', 'string'),
 ('VAX_NAME', 'string'),
 ('RECVDATE', 'string'),
 ('STATE', 'string'),
 ('AGE_YRS', 'double'),
 ('CAGE_YR', 'double'),
 ('CAGE_MO', 'double'),
 ('SEX', 'string'),
 ('RPT_DATE', 'string'),
 ('SYMPTOM_TEXT', 'string'),
 ('DIED', 'string'),
 ('DATEDIED', 'string'),
 ('L_THREAT', 'string'),
 ('ER_VISIT', 'string'),
 ('HOSPITAL', 'string'),
 ('HOSPDAYS', 'double'),
 ('X_STAY', 'string'),
 ('DISABLE', 'string'),
 ('RECOVD', 'string'),
 ('VAX_DATE', 'string'),
 ('ONSET_DATE', 'string'),
 ('NUMDAYS', 'double'),
 ('LAB_DATA', 'string'),
 

In [6]:
df.limit(5).toPandas()

24/06/08 19:01:17 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,VAERS_ID,SYMPTOM1,SYMPTOMVERSION1,SYMPTOM2,SYMPTOMVERSION2,SYMPTOM3,SYMPTOMVERSION3,SYMPTOM4,SYMPTOMVERSION4,SYMPTOM5,...,CUR_ILL,HISTORY,PRIOR_VAX,SPLTTYPE,FORM_VERS,TODAYS_DATE,BIRTH_DEFECT,OFC_VISIT,ER_ED_VISIT,ALLERGIES
0,916600,Dysphagia,23.1,Epiglottitis,23.1,,,,,,...,,,,,2,01/01/2021,,Y,,Pcn and bee venom
1,916601,Anxiety,23.1,Dyspnoea,23.1,,,,,,...,Patient residing at nursing facility. See pati...,Patient residing at nursing facility. See pati...,,,2,01/01/2021,,Y,,"""Dairy"""
2,916602,Chest discomfort,23.1,Dysphagia,23.1,Pain in extremity,23.1,Visual impairment,23.1,,...,,,,,2,01/01/2021,,,Y,Shellfish
3,916603,Dizziness,23.1,Fatigue,23.1,Mobility decreased,23.1,,,,...,kidney infection,"diverticulitis, mitral valve prolapse, osteoar...","got measles from measel shot, mums from mumps ...",,2,01/01/2021,,,,"Diclofenac, novacaine, lidocaine, pickles, tom..."
4,916604,Injection site erythema,23.1,Injection site pruritus,23.1,Injection site swelling,23.1,Injection site warmth,23.1,,...,Na,,,,2,01/01/2021,,,,Na


As we can see, each SYMPTOMVERSION column should have likely been a string. Let's make that conversion now

In [7]:
df = df.withColumn("SYMPTOMVERSION1", df.SYMPTOMVERSION1.cast(T.StringType())).\
withColumn("SYMPTOMVERSION2", df.SYMPTOMVERSION2.cast('string')).\
withColumn("SYMPTOMVERSION3", df.SYMPTOMVERSION3.cast('string')).\
withColumn("SYMPTOMVERSION4", df.SYMPTOMVERSION4.cast('string')).\
withColumn("SYMPTOMVERSION5", df.SYMPTOMVERSION5.cast('string'))

In [8]:
df.dtypes

[('VAERS_ID', 'int'),
 ('SYMPTOM1', 'string'),
 ('SYMPTOMVERSION1', 'string'),
 ('SYMPTOM2', 'string'),
 ('SYMPTOMVERSION2', 'string'),
 ('SYMPTOM3', 'string'),
 ('SYMPTOMVERSION3', 'string'),
 ('SYMPTOM4', 'string'),
 ('SYMPTOMVERSION4', 'string'),
 ('SYMPTOM5', 'string'),
 ('SYMPTOMVERSION5', 'string'),
 ('VAX_TYPE', 'string'),
 ('VAX_MANU', 'string'),
 ('VAX_LOT', 'string'),
 ('VAX_DOSE_SERIES', 'string'),
 ('VAX_ROUTE', 'string'),
 ('VAX_SITE', 'string'),
 ('VAX_NAME', 'string'),
 ('RECVDATE', 'string'),
 ('STATE', 'string'),
 ('AGE_YRS', 'double'),
 ('CAGE_YR', 'double'),
 ('CAGE_MO', 'double'),
 ('SEX', 'string'),
 ('RPT_DATE', 'string'),
 ('SYMPTOM_TEXT', 'string'),
 ('DIED', 'string'),
 ('DATEDIED', 'string'),
 ('L_THREAT', 'string'),
 ('ER_VISIT', 'string'),
 ('HOSPITAL', 'string'),
 ('HOSPDAYS', 'double'),
 ('X_STAY', 'string'),
 ('DISABLE', 'string'),
 ('RECOVD', 'string'),
 ('VAX_DATE', 'string'),
 ('ONSET_DATE', 'string'),
 ('NUMDAYS', 'double'),
 ('LAB_DATA', 'string'),
 

Now, let's get a summary of numerical columns

In [9]:
types = df.dtypes
sel_types = [t[0] for t in types if (t[1] == 'double' or t[1] == 'int') and t[0] != 'VAERS_ID']
sel_types

['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS', 'FORM_VERS']

In [10]:
df.select(sel_types).summary().show()

+-------+-----------------+------------------+-------------------+------------------+-----------------+-------------------+
|summary|          AGE_YRS|           CAGE_YR|            CAGE_MO|          HOSPDAYS|          NUMDAYS|          FORM_VERS|
+-------+-----------------+------------------+-------------------+------------------+-----------------+-------------------+
|  count|           819268|            728957|               1327|             65176|           799488|             890836|
|   mean| 50.0537900174302| 49.76858168588819|0.06156744536548606|21.109119921443476|28.33089802473583| 1.9995891499669973|
| stddev|18.57464863940989|18.740088977103554|0.15742062021670353|1238.5330170310406|595.4422481471431|0.02026528302936239|
|    min|             0.08|               0.0|                0.0|               1.0|              0.0|                  1|
|    25%|             36.0|              35.0|                0.0|               2.0|              0.0|                  2|
|    50%

In [11]:
for col in df.columns:
    if not col in sel_types:
        print(col, ': ', df.select(col).distinct().count())

VAERS_ID :  633063
SYMPTOM1 :  7077
SYMPTOMVERSION1 :  3
SYMPTOM2 :  6753
SYMPTOMVERSION2 :  4
SYMPTOM3 :  6298
SYMPTOMVERSION3 :  4
SYMPTOM4 :  5711
SYMPTOMVERSION4 :  4
SYMPTOM5 :  5283
SYMPTOMVERSION5 :  4
VAX_TYPE :  1
VAX_MANU :  4
VAX_LOT :  27032
VAX_DOSE_SERIES :  9
VAX_ROUTE :  10
VAX_SITE :  12
VAX_NAME :  4
RECVDATE :  309
STATE :  65
SEX :  3
RPT_DATE :  82
SYMPTOM_TEXT :  599419
DIED :  2
DATEDIED :  326
L_THREAT :  2
ER_VISIT :  2
HOSPITAL :  2
X_STAY :  2
DISABLE :  2
RECOVD :  4
VAX_DATE :  1200
ONSET_DATE :  714
LAB_DATA :  143626
V_ADMINBY :  9
V_FUNDBY :  5
OTHER_MEDS :  237179
CUR_ILL :  56348
HISTORY :  163605
PRIOR_VAX :  26716
SPLTTYPE :  93557
TODAYS_DATE :  366
BIRTH_DEFECT :  2
OFC_VISIT :  2
ER_ED_VISIT :  2
ALLERGIES :  108132


In [12]:
pd_df = pd.read_csv("./vaers_jan_nov_2021.csv")
pd_df.nunique()

  pd_df = pd.read_csv("./vaers_jan_nov_2021.csv")


VAERS_ID           633063
SYMPTOM1             7077
SYMPTOMVERSION1         3
SYMPTOM2             6752
SYMPTOMVERSION2         3
SYMPTOM3             6297
SYMPTOMVERSION3         3
SYMPTOM4             5710
SYMPTOMVERSION4         3
SYMPTOM5             5282
SYMPTOMVERSION5         3
VAX_TYPE                1
VAX_MANU                4
VAX_LOT             27030
VAX_DOSE_SERIES         8
VAX_ROUTE               9
VAX_SITE               11
VAX_NAME                4
RECVDATE              309
STATE                  64
AGE_YRS               142
CAGE_YR               116
CAGE_MO                11
SEX                     3
RPT_DATE               81
SYMPTOM_TEXT       599417
DIED                    1
DATEDIED              325
L_THREAT                1
ER_VISIT                1
HOSPITAL                1
HOSPDAYS               99
X_STAY                  1
DISABLE                 1
RECOVD                  3
VAX_DATE             1199
ONSET_DATE            713
NUMDAYS               844
LAB_DATA    

In [15]:
pd_df[[col for col in pd_df.columns if col in sel_types]].describe()

Unnamed: 0,AGE_YRS,CAGE_YR,CAGE_MO,HOSPDAYS,NUMDAYS,FORM_VERS
count,819268.0,728957.0,1327.0,65176.0,799488.0,890836.0
mean,50.05379,49.768582,0.061567,21.10912,28.330898,1.999589
std,18.574649,18.740089,0.157421,1238.533017,595.442248,0.020265
min,0.08,0.0,0.0,1.0,0.0,1.0
25%,36.0,35.0,0.0,2.0,0.0,2.0
50%,50.0,50.0,0.0,3.0,1.0,2.0
75%,65.0,64.0,0.0,7.0,7.0,2.0
max,119.0,120.0,1.0,99999.0,44224.0,2.0


What we can see here is that for basic operations, our data is not yet at the scale where the inherent parallelism outweighs the cost of the overhead for spark clusters.

In [16]:
df_symptoms = df.withColumn("SymptomsList", F.array('SYMPTOM1', 'SYMPTOM2', 'SYMPTOM3', 'SYMPTOM4', 'SYMPTOM5')).select('VAERS_ID', 'SymptomsList')
df_symptoms = df_symptoms.withColumn("SymptomsList", F.array_compact('SymptomsList'))
df_symptoms.columns

['VAERS_ID', 'SymptomsList']

In [17]:
df_symptoms.select('SymptomsList').show(5)

+--------------------+
|        SymptomsList|
+--------------------+
|[Dysphagia, Epigl...|
| [Anxiety, Dyspnoea]|
|[Chest discomfort...|
|[Dizziness, Fatig...|
|[Injection site e...|
+--------------------+
only showing top 5 rows



In [18]:
import itertools
from pyspark.sql.functions import pandas_udf, udf

@udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
def permutations(x:list) -> list:
    return list(itertools.permutations(x, 2))

df_symptoms = df_symptoms.withColumn("SymptomsPermutations", permutations(df_symptoms.SymptomsList)).select('VAERS_ID', 'SymptomsPermutations')
df_symptoms = df_symptoms.withColumn("SymptomsPermutations", F.explode('SymptomsPermutations')).select('VAERS_ID', 'SymptomsPermutations')

In [19]:
df_symptoms.select('VAERS_ID','SymptomsPermutations').show(5)

+--------+--------------------+
|VAERS_ID|SymptomsPermutations|
+--------+--------------------+
|  916600|[Vomiting, Epiglo...|
|  916600|[Epiglottitis, Vo...|
|  916601| [Anxiety, Dyspnoea]|
|  916601| [Dyspnoea, Anxiety]|
|  916602|[Chest discomfort...|
+--------+--------------------+
only showing top 5 rows



In [20]:
out_df = df_symptoms.groupby('SymptomsPermutations').agg(F.count('VAERS_ID').alias('Count')).orderBy(F.desc('Count'))
out_df.show(5)

+--------------------+-----+
|SymptomsPermutations|Count|
+--------------------+-----+
| [Fatigue, Headache]|38537|
| [Headache, Fatigue]|38537|
|  [Headache, Chills]|36056|
|  [Chills, Headache]|36056|
|   [Chills, Fatigue]|30023|
+--------------------+-----+
only showing top 5 rows



7.8 seconds to do the transformations and then group everything is pretty impressive. Let's see what the equivalent in normal Pandas takes.

In [21]:
pd_df['SymptomsList'] = pd_df[['SYMPTOM1', 'SYMPTOM2', 'SYMPTOM3', 'SYMPTOM4', 'SYMPTOM5']].values.tolist()
pd_df['SymptomsList'] = pd_df['SymptomsList'].apply(lambda x: list(filter(None, x)))
pd_df['SymptomsPermutations'] = pd_df['SymptomsList'].apply(lambda x: list(itertools.permutations(x, 2)))
pd_df = pd_df.explode('SymptomsPermutations')

: 

: 