In [29]:
# from pyspark.context import SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler 
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import DoubleType

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from keras.optimizers import Adam

from elephas.ml_model import ElephasEstimator



In [8]:
conf = SparkConf().setAppName('Spark DL Tabular Pipeline').setMaster('local[*]')

# Increase memory spark has available
SparkContext.setSystemProperty('spark.executor.memory', '6g')

sc = SparkContext(conf=conf)

sql_context = SQLContext(sc)

#Verify settings set for available memory
sc._conf.getAll()

[('spark.executor.memory', '6g'),
 ('spark.driver.host', 'DESKTOP-RE98QFK'),
 ('spark.driver.port', '49883'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.startTime', '1621853423909'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1621853424914'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'Spark DL Tabular Pipeline')]

In [9]:
# Read the weather and the accident df

df_weather_temp = sql_context.read\
    .options(header=True, inferSchema=True)\
    .csv('datasets/hourly_weather.csv')

df_weather = df_weather_temp.withColumn('Hour', df_weather_temp.date[12:13][1:2].cast('double'))

df_accidents_temp = sql_context.read\
    .options(header=True, inferSchema=True)\
    .csv('datasets/accidents.csv')
df_accidents = df_accidents_temp.drop('Hour')

In [10]:
df_accidents.limit(5).toPandas()

Unnamed: 0,_c0,date,AccidentType,AccidentSeverityCategory,AccidentInvolvingPedestrian,AccidentInvolvingBicycle,AccidentInvolvingMotorcycle,RoadType,AccidentLocation_CHLV95_E,AccidentLocation_CHLV95_N,Month,WeekDay
0,0,2011-01-01 00:30:00,at0,as4,0,0,0,rt433,2684605,1245194,1,Saturday
1,1,2011-01-01 01:30:00,at0,as3,0,1,0,rt433,2682382,1246980,1,Saturday
2,2,2011-01-01 02:30:00,at0,as4,0,0,0,rt439,2682791,1247749,1,Saturday
3,3,2011-01-01 02:30:00,at5,as3,0,0,0,rt433,2681199,1247102,1,Saturday
4,4,2011-01-01 03:30:00,at0,as4,0,0,0,rt433,2682479,1250690,1,Saturday


In [11]:
df_weather.limit(5).toPandas()

Unnamed: 0,_c0,date,air_temperature,water_temperature,wind_gust_max_10min,wind_speed_avg_10min,wind_force_avg_10min,wind_direction,windchill,barometric_pressure_qfe,precipitation,dew_point,global_radiation,humidity,water_level,Hour
0,0,2011-01-01 00:30:00,2.233333,5.2,2.4,1.216667,1.216667,1785,2.2,974.55,0.0,1.616667,0.5,95.833333,67.635,0.0
1,1,2011-01-01 01:30:00,2.38,5.2,2.8,0.86,0.86,1076,2.16,973.98,0.0,1.52,0.6,93.6,81.162,1.0
2,2,2011-01-01 02:30:00,2.58,5.14,1.2,0.34,0.34,1159,2.58,973.64,0.0,1.38,0.0,92.0,81.162,2.0
3,3,2011-01-01 03:30:00,2.5,5.16,1.9,0.52,0.52,1122,2.54,973.42,0.0,1.5,0.0,92.8,81.162,3.0
4,4,2011-01-01 04:30:00,2.5,5.1,4.0,1.58,1.58,1108,1.62,973.16,0.0,0.72,0.2,88.2,81.162,4.0


In [12]:
df = df_weather.join(df_accidents, how='left')
# df.limit(5).toPandas()
# df.printSchema()

In [13]:
df = df.drop(*['AccidentLocation_CHLV95_N', 'AccidentLocation_CHLV95_E', 'RoadType',\
               'AccidentInvolvingMotorcycle', 'AccidentInvolvingBicycle', 'AccidentInvolvingPedestrian',\
              'AccidentType', 'date', '_c0'])
df.printSchema()

root
 |-- air_temperature: double (nullable = true)
 |-- water_temperature: double (nullable = true)
 |-- wind_gust_max_10min: double (nullable = true)
 |-- wind_speed_avg_10min: double (nullable = true)
 |-- wind_force_avg_10min: double (nullable = true)
 |-- wind_direction: integer (nullable = true)
 |-- windchill: double (nullable = true)
 |-- barometric_pressure_qfe: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- global_radiation: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- water_level: double (nullable = true)
 |-- Hour: double (nullable = true)
 |-- AccidentSeverityCategory: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- WeekDay: string (nullable = true)



In [14]:
stages = []

X = df.columns
del X[-4:]
print(X)
# index accident type, accident severity and week day

severity_indexer = StringIndexer(inputCol='AccidentSeverityCategory', outputCol='severityIndex')
# severity_indexer = StringIndexer(inputCol='AccidentSeverityCategory', outputCol='severityIndex')
day_indexer = StringIndexer(inputCol='WeekDay', outputCol='weekDayIndex')

stages += [severity_indexer]
stages += [day_indexer]

# One-hot encoder for accident type, severity and week day

# ohe_accident_type = OneHotEncoder(inputCol='typeIndex', outputCol='type_vec')
# ohe_accident_severity = OneHotEncoder(intype_indexerputCol='severityIndex', outputCol='severity_vec')
ohe_weekday = OneHotEncoder(inputCol='weekDayIndex', outputCol='weekday_vec')
ohe_hour = OneHotEncoder(inputCol='Hour', outputCol='hour_vec')
5000
X += ['weekday_vec', 'hour_vec']
y = 'severityIndex'
print(X)
# stages += ohe_accident_type


assembler_final = VectorAssembler(inputCols=X, outputCol='features')

stages += [ohe_weekday, ohe_hour, assembler_final]

# Create pipeline and pass all stages
pipeline = Pipeline(stages=stages)

['air_temperature', 'water_temperature', 'wind_gust_max_10min', 'wind_speed_avg_10min', 'wind_force_avg_10min', 'wind_direction', 'windchill', 'barometric_pressure_qfe', 'precipitation', 'dew_point', 'global_radiation', 'humidity', 'water_level']
['air_temperature', 'water_temperature', 'wind_gust_max_10min', 'wind_speed_avg_10min', 'wind_force_avg_10min', 'wind_direction', 'windchill', 'barometric_pressure_qfe', 'precipitation', 'dew_point', 'global_radiation', 'humidity', 'water_level', 'weekday_vec', 'hour_vec']


In [15]:
stages

[StringIndexer_bdb3933a6e80,
 StringIndexer_71b395b999ea,
 OneHotEncoder_fbea22afd218,
 OneHotEncoder_0e19708b92ea,
 VectorAssembler_389b303d21b3]

In [16]:
# fit pipeline to data
pipeline_model = pipeline.fit(df)

# Transform data using fitted pipeline
df_transform = pipeline_model.transform(df)

In [17]:
# Inspect transformed data
df_transform.limit(5).toPandas()

Unnamed: 0,air_temperature,water_temperature,wind_gust_max_10min,wind_speed_avg_10min,wind_force_avg_10min,wind_direction,windchill,barometric_pressure_qfe,precipitation,dew_point,...,water_level,Hour,AccidentSeverityCategory,Month,WeekDay,severityIndex,weekDayIndex,weekday_vec,hour_vec,features
0,2.233333,5.2,2.4,1.216667,1.216667,1785,2.2,974.55,0.0,1.616667,...,67.635,0.0,as4,1,Saturday,0.0,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.233333333333334, 5.2, 2.4, 1.21666666666666..."
1,2.233333,5.2,2.4,1.216667,1.216667,1785,2.2,974.55,0.0,1.616667,...,67.635,0.0,as3,1,Saturday,1.0,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.233333333333334, 5.2, 2.4, 1.21666666666666..."
2,2.233333,5.2,2.4,1.216667,1.216667,1785,2.2,974.55,0.0,1.616667,...,67.635,0.0,as4,1,Saturday,0.0,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.233333333333334, 5.2, 2.4, 1.21666666666666..."
3,2.233333,5.2,2.4,1.216667,1.216667,1785,2.2,974.55,0.0,1.616667,...,67.635,0.0,as3,1,Saturday,1.0,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.233333333333334, 5.2, 2.4, 1.21666666666666..."
4,2.233333,5.2,2.4,1.216667,1.216667,1785,2.2,974.55,0.0,1.616667,...,67.635,0.0,as4,1,Saturday,0.0,5.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.233333333333334, 5.2, 2.4, 1.21666666666666..."


In [18]:
# select the features and the label for the final df
df_transform_fin = df_transform.select('features', 'severityIndex')
df_transform_fin.limit(5).toPandas()

Unnamed: 0,features,severityIndex
0,"(2.233333333333334, 5.2, 2.4, 1.21666666666666...",0.0
1,"(2.233333333333334, 5.2, 2.4, 1.21666666666666...",1.0
2,"(2.233333333333334, 5.2, 2.4, 1.21666666666666...",0.0
3,"(2.233333333333334, 5.2, 2.4, 1.21666666666666...",1.0
4,"(2.233333333333334, 5.2, 2.4, 1.21666666666666...",0.0


In [19]:
# Shuffle
df_transform_fin = df_transform_fin.orderBy(rand())

In [20]:
# Train / Test Split
train_data, test_data = df_transform_fin.randomSplit([.8, .2], seed=42)
train_data.persist()


DataFrame[features: vector, severityIndex: double]

In [21]:
# Number of classes
# too computationally expensive:
# nb_classes = train_data.select('severityIndex').distinct().count()
nb_classes = 4

In [22]:
#train_data.select('features').limit(5).toPandas()

In [23]:
#df.select('AccidentSeverityCategory').limit(5).toPandas()

In [24]:
# Input dimention
#input_dim = len(train_data.select('features').first()[0])
input_dim = 44

In [25]:
model = Sequential()
model.add(Dense(256, input_shape=(input_dim,), activity_regularizer=regularizers.l2(0.01)))
model.add(Activation('relu'))
model.add(Dropout(rate=0.3))

model.add(Dense(256, input_shape=(input_dim,), activity_regularizer=regularizers.l2(0.01)))
model.add(Activation('relu'))
model.add(Dropout(rate=0.3))

model.add(Dense(nb_classes))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')

In [26]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 256)               11520     
_________________________________________________________________
activation_1 (Activation)    (None, 256)               0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 256)               65792     
_________________________________________________________________
activation_2 (Activation)    (None, 256)               0         
_________________________________________________________________
dropout_2 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 4)                

In [32]:
# Set and Serial optimizer
optimizer_conf = optimizers.Adam(lr=0.01)
opt_conf = optimizers.serialize(optimizer_conf)

# Initialize SparkML Estimator and get Settings
estimator = ElephasEstimator()
estimator.setFeaturesCol('features')
estimator.setLabelCol('severityIndex')
estimator.set_keras_model_config(model.to_yaml())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)
estimator.set_epochs(2)
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.1)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode('synchronous')
estimator.set_loss('categorical_crossentropy')
estimator.set_metrics(['acc'])

ElephasEstimator_8d66ed786828

In [33]:
# Create deep learning pipeline
dl_pipeline = Pipeline(stages=[estimator])

In [34]:
# Helper function for fitting, transforming and predicting train and test
def dl_pip_fit_score_res(dl_pipeline=dl_pipeline, train_data=train_data,
                        test_data=test_data, label='labelIndex'):
    fit_dl_pipeline = dl_pipeline.fit(train_data)
    pred_train = fit_dl_pipeline.transform(train_data)
    pred_test = fit_dl_pipeline.transform(test_data)
    
    pnl_train = pred_train.select(label, 'prediction')
    pnl_test = pred_test.select(label, 'prediction')
    
    pred_and_label_train = pnl_train.rdd.map(lambda row: (row[label], row['prediction']))
    pred_and_label_test = pnl_test.rdd.map(lambda row: (row[label], row['prediction']))
    
    metrics_train = MulticlassMetrics(pred_and_label_train)
    metrics_test = MulticlassMetrics(pred_and_label_test)
    
    print('Train data accuracy: {}'.format(round(metrics_train.precision(), 4)))
    print('Training Data Confusion Matrix')
    display(pnl_train.crosstab('labelIndex', 'prediction\n').toPandas())
    
    print('Test data accuracy: {}'.format(round(metrics_test.precision(), 4)))
    print('Test Data Confusion Matrix')
    display(pnl_test.crosstab('labelIndex', 'prediction').toPandas())
    

In [35]:
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 7773833848595092224
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 10108528231
locality {
  bus_id: 1
  links {
  }
}
incarnation: 4067923651490530269
physical_device_desc: "device: 0, name: GeForce GTX 1080 Ti, pci bus id: 0000:01:00.0, compute capability: 6.1"
]


In [None]:
dl_pip_fit_score_res(dl_pipeline=dl_pipeline, train_data=train_data,
                        test_data=test_data, label='labelIndex')