In [1]:
! pip install pyspark



In [2]:
! pip install findspark



In [3]:
! pip install spark



In [4]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.81)] [Connecting to security.ub                                                                               Hit:2 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadconte                                                                               Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.52                                                                               Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 https://ppa.launchpadcontent.net/

In [5]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [6]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LeadBloodLevels").getOrCreate()

In [7]:
from pyspark import SparkFiles

lead_df = spark.read.csv("cleaned_blood_data.csv", sep=",", header=True,inferSchema=True)

In [8]:
#Second DataFrame Lead levels in blood
lead_df.show(10)


+------+-----------+-----+----+-------------+-----+------------------+-----------+------------+----------+---------------------------+--------------+-------+--------------------+--------------------+
|County|County Code|  Zip|Year|Year of Birth|Tests|Less than 5 mcg/dL|5-10 mcg/dL|10-15 mcg/dL|15+ mcg/dL|Total Elevated Blood Levels|Rate per 1,000|Percent|   Zip Code Location|     County Location|
+------+-----------+-----+----+-------------+-----+------------------+-----------+------------+----------+---------------------------+--------------+-------+--------------------+--------------------+
|Albany|          1|12009|2020|         2019|   30|              30.0|        0.0|         0.0|       0.0|                        0.0|           0.0|    0.0|(42.697778, -74.0...|(42.588271, -73.9...|
|Albany|          1|12023|2020|         2019|    7|               7.0|        0.0|         0.0|       0.0|                        0.0|           0.0|    0.0|(42.60636, -74.1438)|(42.588271, -73.9...|


In [9]:
#look at the statistics of lead in blood levels of children
lead_df.select(['Less than 5 mcg/dL','5-10 mcg/dL','10-15 mcg/dL','15+ mcg/dL']).describe().show()

+-------+------------------+--------------------+------------+----------+
|summary|Less than 5 mcg/dL|         5-10 mcg/dL|10-15 mcg/dL|15+ mcg/dL|
+-------+------------------+--------------------+------------+----------+
|  count|              1817|                1817|        1817|      1817|
|   mean| 25.73913043478261|0.022564667033571822|         0.0|       0.0|
| stddev| 33.12551510551277|  0.3969007470413673|         0.0|       0.0|
|    min|               6.0|                 0.0|         0.0|       0.0|
|    max|             376.0|                 9.0|         0.0|       0.0|
+-------+------------------+--------------------+------------+----------+



# Lead in Blood Data
First read in the data using pyspark, dropped columns that did not have testable data in them, then split the data into Training and testing sets to check the accuracy of the following question: Based on the County Information how many children have lead in their blood levels?


In [41]:
#Drop specific columns that are not needed I created a variable, so it would copy and not edit the original dataframe
df=lead_df.drop('Zip Code Location','County Location','Total Elevated Blood Levels','County Code','Zip','Year','Rate per 1,000','Percent','5-10 mcg/dL','10-15 mcg/dL','15+ mcg/dL')

#rename columns to eliminate spaces
df=df.withColumnRenamed("Year of Birth","BirthYear")\
                    .withColumnRenamed("Less than 5 mcg/dL","Level")
df.show(2)


+------+-------------+-----+------------------+
|County|Year of Birth|Tests|Less than 5 mcg/dL|
+------+-------------+-----+------------------+
|Albany|         2019|   30|              30.0|
|Albany|         2019|    7|               7.0|
+------+-------------+-----+------------------+
only showing top 2 rows



Split Train/Test lead Data

In [68]:
from pyspark.ml.feature import StringIndexer
#create an indexer
county_indexer=StringIndexer(inputCol='County',outputCol='CountyIndex')


In [69]:
from pyspark.ml.feature import OneHotEncoder
onehot_encoder=OneHotEncoder(inputCol="CountyIndex",outputCol="County_vec")


In [70]:
#Merge columns into a vector column
vector_assembler=VectorAssembler(inputCols=['BirthYear','Tests','Level','County_vec'],outputCol="features")

In [71]:
#Create a Pipeline
pipeline=Pipeline(stages=[county_indexer,onehot_encoder,vector_assembler])

In [72]:
#fit and transform
df_transformed=pipeline.fit(df).transform(df)
df_transformed.show(3)

+------+---------+-----+-----+-----------+---------------+--------------------+
|County|BirthYear|Tests|Level|CountyIndex|     County_vec|            features|
+------+---------+-----+-----+-----------+---------------+--------------------+
|Albany|     2019|   30| 30.0|       13.0|(55,[13],[1.0])|(58,[0,1,2,16],[2...|
|Albany|     2019|    7|  7.0|       13.0|(55,[13],[1.0])|(58,[0,1,2,16],[2...|
|Albany|     2017|    6|  6.0|       13.0|(55,[13],[1.0])|(58,[0,1,2,16],[2...|
+------+---------+-----+-----+-----------+---------------+--------------------+
only showing top 3 rows



In [136]:
df_transformed=df_transformed.select('CountyIndex','features')

In [137]:
#Split the data into training and testing sets
train,test=df_transformed.randomSplit([0.8,0.2],seed=52)


## Decsion Tree Model
Looking at counties and whether or not children tested in those counties showed up with lead in their blood work the model proved that:
Test Error=41%
Accuracy=58%

In [138]:
#Create Decsion Tree Classifier
tree=DecisionTreeClassifier(labelCol="CountyIndex",featuresCol="features")
# train our model using training data
model=tree.fit(train)
# test our model and make predictions using testing data
prediction=model.transform(test)

In [139]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="CountyIndex", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)

Test Error = 0.41989 
Accuracy = 0.58011 


## Random Forest Model
Looking at counties and whether or not children tested in those counties showed up with lead in their blood work the model proved that:
Test Error=24%
Accuracy=75%

In [140]:
from pyspark.ml.classification import RandomForestClassifier


rf = RandomForestClassifier(labelCol="CountyIndex",featuresCol="features", numTrees=9)
model = rf.fit(train)
# test our model and make predictions using testing data
predictions = model.transform(test)
predictions.select("prediction", "CountyIndex")

# evaluate the performance of the classifier
evaluator = MulticlassClassificationEvaluator(labelCol="CountyIndex",predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)

Test Error = 0.245856
Accuracy = 0.754144 


For this data that we were evaluating the Random Forest Classifier Model gave the most accurate findings and that would be the model we suggest to use