In [141]:
! pip install pyspark



In [142]:
! pip install findspark



In [143]:
! pip install spark



In [144]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/110 kB 13%] [Connected to cloud.r                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [1 InRelease 110 kB/110 kB 100%] [Connected to cloud.r                                                                               Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
0% [Waiting for headers] [Connected to cloud.r-project.org (65.9.86.28)] [Conne                                                                               Get:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
0% [4 InRelease 54.7 kB/109 kB 50%] [Connected to cloud.r-project.org (65.9.86.0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.52                                                      

In [145]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [146]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LeadWaterLevels").getOrCreate()

In [147]:
from pyspark import SparkFiles

school_df = spark.read.csv("cleaned_water_data.csv", sep=",", header=True,inferSchema=True)

In [148]:
#Data Frame School Water
school_df.show(5)


+------+-----+--------------------+--------------------+-------------------------------------+---------------------------------------+----------------+----------------+--------------------+
|County|  Zip|              School|Type of Organization|Any Building with Lead-Free Plumbing?|Number of Outlets that Require Sampling|Outlets ≤ 15 ppb|Outlets > 15 ppb|     County Location|
+------+-----+--------------------+--------------------+-------------------------------------+---------------------------------------+----------------+----------------+--------------------+
|Albany|12009|         ALTAMONT ES|       Public School|                                   No|                                   63.0|            63.0|             0.0|(42.678066, -73.8...|
|Albany|12047|ABRAM LANSING SCHOOL|       Public School|                                   No|                                   74.0|            74.0|             0.0|(42.678066, -73.8...|
|Albany|12047|  BOGHT HILLS SCHOOL|       Public S

In [149]:
#look at the statistics of plumbing outlets
school_df.select(['Outlets ≤ 15 ppb','Outlets > 15 ppb']).describe().show()

+-------+------------------+-----------------+
|summary|  Outlets ≤ 15 ppb| Outlets > 15 ppb|
+-------+------------------+-----------------+
|  count|              3366|             3366|
|   mean| 80.46850861556744|4.065359477124183|
| stddev|48.434493872706874|8.314144501152382|
|    min|               0.0|              0.0|
|    max|             420.0|            143.0|
+-------+------------------+-----------------+



# Schools in New York Water Data
First read in the data using pyspark, dropped columns that did not have testable data in them, then split the data into Training and testing sets to check the accuracy of the following question:
    Are there any schools in New york that have Lead Free Plumbing?

In [155]:
#Drop specific columns that are not needed I created a variable, so it would copy and not edit the original dataframe
df=school_df.drop('County','County Location','Type of Organization','School')


#rename columns to eliminate spaces
df=df.withColumnRenamed("Zip","ZipCode")\
                    .withColumnRenamed("Number of Outlets that Require Sampling","numberOfSamples")\
                    .withColumnRenamed("Outlets ≤ 15 ppb","OverFifteen")\
                    .withColumnRenamed("Outlets > 15 ppb","LessFifteen")
df.show(2)

+-------+-------------------------------------+---------------+-----------+-----------+
|ZipCode|Any Building with Lead-Free Plumbing?|numberOfSamples|OverFifteen|LessFifteen|
+-------+-------------------------------------+---------------+-----------+-----------+
|  12009|                                   No|           63.0|       63.0|        0.0|
|  12047|                                   No|           74.0|       74.0|        0.0|
+-------+-------------------------------------+---------------+-----------+-----------+
only showing top 2 rows



Split Train/Test lead Data

In [156]:
from pyspark.ml.feature import StringIndexer
#create an indexer
indexer= StringIndexer(inputCol='Any Building with Lead-Free Plumbing?',outputCol='BuildingIndex')

In [157]:
from pyspark.ml.feature import OneHotEncoder
onehot_encoder=OneHotEncoder(inputCol="BuildingIndex",outputCol="Building_vec")


In [161]:
#Merge columns into a vector column
vector_assembler=VectorAssembler(inputCols=['ZipCode', 'numberOfSamples','OverFifteen','LessFifteen','Building_vec'],outputCol="features")

In [162]:
#Create a Pipeline
pipeline=Pipeline(stages=[indexer,onehot_encoder,vector_assembler])

In [163]:
#fit and transform
df_transformed=pipeline.fit(df).transform(df)
df_transformed.show(3)

+-------+-------------------------------------+---------------+-----------+-----------+-------------+-------------+--------------------+
|ZipCode|Any Building with Lead-Free Plumbing?|numberOfSamples|OverFifteen|LessFifteen|BuildingIndex| Building_vec|            features|
+-------+-------------------------------------+---------------+-----------+-----------+-------------+-------------+--------------------+
|  12009|                                   No|           63.0|       63.0|        0.0|          0.0|(3,[0],[1.0])|[12009.0,63.0,63....|
|  12047|                                   No|           74.0|       74.0|        0.0|          0.0|(3,[0],[1.0])|[12047.0,74.0,74....|
|  12047|                                   No|           81.0|       80.0|        1.0|          0.0|(3,[0],[1.0])|[12047.0,81.0,80....|
+-------+-------------------------------------+---------------+-----------+-----------+-------------+-------------+--------------------+
only showing top 3 rows



In [164]:
df_transformed=df_transformed.select('BuildingIndex','features')

In [165]:
#Split the data into training and testing sets
train,test=df_transformed.randomSplit([0.8,0.2],seed=52)


## Decsion Tree Model
Looking at the school building and whether there is Any Building with Lead-Free Plumbing. The Results were:
Test Error-0%
Accuracy-99%

In [166]:
#Create Decsion Tree Classifier
tree=DecisionTreeClassifier(labelCol="BuildingIndex",featuresCol="features")
# train our model using training data
model=tree.fit(train)
# test our model and make predictions using testing data
prediction=model.transform(test)

In [168]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="BuildingIndex", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)

Test Error = 0.00149477 
Accuracy = 0.998505 


## Random Forest Model
Looking at the school building and whether there is Any Building with Lead-Free Plumbing. The Results were:
Test Error=0%
Accuracy=100%

In [169]:
from pyspark.ml.classification import RandomForestClassifier


rf = RandomForestClassifier(labelCol="BuildingIndex",featuresCol="features", numTrees=9)
model = rf.fit(train)
# test our model and make predictions using testing data
predictions = model.transform(test)
predictions.select("prediction", "BuildingIndex")

# evaluate the performance of the classifier
evaluator = MulticlassClassificationEvaluator(labelCol="BuildingIndex",predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)

Test Error = 0
Accuracy = 1 


For this data that we were evaluating the Random Forest Classifier Model gave the most accurate findings and that would be the model we suggest to use