In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import  VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.evaluation import RegressionEvaluator


ModuleNotFoundError: No module named 'pyspark'

In [1]:
#loading data
tval=spark.read.csv('wasb:///data/train_values_oAriVNN.csv', inferSchema=True, header=True)
tlab=spark.read.csv('wasb:///data/train_labels.csv', inferSchema=True, header=True)
out=spark.read.csv('wasb:///data/test_values.csv', inferSchema=True, header=True)


NameError: name 'spark' is not defined

In [None]:
#merging data (values+labels)
data1 = tval.join(tlab, tval.row_id == tval.row_id)
data1.show()

In [None]:
# Select features and label
data = data1.select("population", "renter_occupied_households", "pct_renter_occupied", "pct_asian", "rucc", "urban_influence", "economic_typology","median_gross_rent", "pct_adults_bachelors_or_higher", "pct_af_am", "pct_other", "birth_rate_per_1k", "median_property_value", "homicides_per_100k", "rent_burden", "pct_hispanic", "median_household_income",
                    "pct_female", "pct_below_18_years_of_age", "pct_civilian_labor", "pct_multiple", "pct_low_birthweight", "pct_uninsured_adults", "poverty_rate", "pct_nh_pi", "pct_excessive_drinking", "pct_unemployment", "pct_adults_with_some_college", "air_pollution_particulate_matter_value", "pct_uninsured_children", "pct_am_ind", "pct_adults_less_than_a_high_school_diploma", "heart_disease_mortality_per_100k", "pct_diabetes", "pct_adult_obesity", "pop_per_primary_care_physician", "pct_adult_smoking", "pct_physical_inactivity", "pop_per_dentist", "death_rate_per_1k", "motor_vehicle_crash_deaths_per_100k", "pct_aged_65_years_and_older", "pct_white", "pct_adults_with_high_school_diploma",col("evictions").alias("label"))
data.count()

In [None]:
#handling missing values flights.dropDuplicates()
#data=flights.fillna(value=0, subset=["pct_adult_smoking", "pct_low_birthweight", "pct_excessive_drinking", "homicides_per_100k", "motor_vehicle_crash_deaths_per_100k", "pop_per_dentist", "pop_per_primary_care_physician"])
#data.count()

In [None]:
# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

In [None]:
#Define the pipeline

#categorical values
strIdx1 = StringIndexer(inputCol = "rucc", outputCol = "ruccIdx")
strIdx2 = StringIndexer(inputCol = "urban_influence", outputCol = "urban_influenceIdx")
strIdx3 = StringIndexer(inputCol = "economic_typology", outputCol = "economic_typologyIdx")
catVect = VectorAssembler(inputCols = ["ruccIdx", "urban_influenceIdx", "economic_typologyIdx"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
#numeric values
numVect = VectorAssembler(inputCols = ["population", "renter_occupied_households", "pct_renter_occupied", "pct_asian", "median_gross_rent", "pct_adults_bachelors_or_higher", "pct_af_am", "pct_other", "birth_rate_per_1k", "median_property_value", "homicides_per_100k", "rent_burden", "pct_hispanic", "median_household_income", "pct_female", "pct_below_18_years_of_age", "pct_civilian_labor", "pct_multiple", "pct_low_birthweight", "pct_uninsured_adults", "poverty_rate", "pct_nh_pi", "pct_excessive_drinking", "pct_unemployment", "pct_adults_with_some_college", "air_pollution_particulate_matter_value", "pct_uninsured_children", "pct_am_ind", "pct_adults_less_than_a_high_school_diploma", "heart_disease_mortality_per_100k", "pct_diabetes", "pct_adult_obesity", "pop_per_primary_care_physician", "pct_adult_smoking", "pct_physical_inactivity", "pop_per_dentist", "death_rate_per_1k", "motor_vehicle_crash_deaths_per_100k", "pct_aged_65_years_and_older", "pct_white", "pct_adults_with_high_school_diploma"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LinearRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[strIdx1,strIdx2,strIdx3, catVect, catIdx, numVect, minMax, featVect, lr])

In [None]:
#tune parameters
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1, 0.01]).addGrid(lr.maxIter, [25, 15]).build()
tvs = TrainValidationSplit(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)

model = tvs.fit(train)

In [None]:
# Test the Model
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100)

In [None]:
#Examine the Predicted and Actual Values
predicted.createOrReplaceTempView("regressionPredictions")


In [None]:
%%sql
SELECT trueLabel, prediction FROM regressionPredictions

In [None]:
#Retrieve the Root Mean Square Error (RMSE)

evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print "Root Mean Square Error (RMSE):", rmse


In [None]:
#getting results
outcome = model.transform(out)