In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods_adv').getOrCreate()

In [2]:
dataset = spark.read.csv('dataset.csv',inferSchema=True,header=True)

In [3]:
# Print data schema.
dataset.printSchema()

# Print data columns.
dataset.columns

root
 |-- accident_index: string (nullable = true)
 |-- vehicle_reference: integer (nullable = true)
 |-- vehicle_type: integer (nullable = true)
 |-- towing_and_articulation: integer (nullable = true)
 |-- vehicle_manoeuvre: integer (nullable = true)
 |-- vehicle_location-restricted_lane: integer (nullable = true)
 |-- junction_location: integer (nullable = true)
 |-- skidding_and_overturning: integer (nullable = true)
 |-- hit_object_in_carriageway: integer (nullable = true)
 |-- vehicle_leaving_carriageway: integer (nullable = true)
 |-- hit_object_off_carriageway: integer (nullable = true)
 |-- 1st_point_of_impact: integer (nullable = true)
 |-- was_vehicle_left_hand_drive?: integer (nullable = true)
 |-- journey_purpose_of_driver: integer (nullable = true)
 |-- sex_of_driver: integer (nullable = true)
 |-- age_of_driver: integer (nullable = true)
 |-- age_band_of_driver: integer (nullable = true)
 |-- engine_capacity_(cc): integer (nullable = true)
 |-- propulsion_code: integer (n

['accident_index',
 'vehicle_reference',
 'vehicle_type',
 'towing_and_articulation',
 'vehicle_manoeuvre',
 'vehicle_location-restricted_lane',
 'junction_location',
 'skidding_and_overturning',
 'hit_object_in_carriageway',
 'vehicle_leaving_carriageway',
 'hit_object_off_carriageway',
 '1st_point_of_impact',
 'was_vehicle_left_hand_drive?',
 'journey_purpose_of_driver',
 'sex_of_driver',
 'age_of_driver',
 'age_band_of_driver',
 'engine_capacity_(cc)',
 'propulsion_code',
 'age_of_vehicle',
 'driver_imd_decile',
 'driver_home_area_type',
 'vehicle_imd_decile',
 'NUmber_of_Casualities_unique_to_accident_index',
 'No_of_Vehicles_involved_unique_to_accident_index',
 'location_easting_osgr',
 'location_northing_osgr',
 'longitude',
 'latitude',
 'police_force',
 'accident_severity',
 'number_of_vehicles',
 'number_of_casualties',
 'date',
 'day_of_week',
 'time',
 'local_authority_(district)',
 'local_authority_(highway)',
 '1st_road_class',
 '1st_road_number',
 'road_type',
 'speed_lim

In [41]:
my_cols = dataset.select([ 'weather_conditions','age_of_driver','accident_severity' ])

In [42]:
finaldata = my_cols.na.drop()

In [43]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

Data Transformation

In [44]:
assembler = VectorAssembler(inputCols=[ 'weather_conditions','age_of_driver','accident_severity'],outputCol='features')

In [45]:
from pyspark.ml.classification import LogisticRegression

In [46]:
from pyspark.ml import Pipeline

In [47]:
regmodel = LogisticRegression(featuresCol='features',labelCol='accident_severity')

In [48]:
pipeline = Pipeline(stages=[assembler,regmodel])

In [49]:
train_titanic_data, test_titanic_data = finaldata.randomSplit([0.75,0.25])

In [50]:
fitmodel = pipeline.fit(train_titanic_data)

In [52]:
results = fitmodel.transform(test_titanic_data)

In [53]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='accident_severity')

In [54]:
results.select('accident_severity','prediction').show()

+-----------------+----------+
|accident_severity|prediction|
+-----------------+----------+
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
|                2|       2.0|
+-----------------+----------+
only showing top 20 rows



In [55]:
AUC = my_eval.evaluate(results)

AUC

1.0