In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
input_uri="mongodb://10.75.13.8/fires.fires"
output_uri="mongodb://10.75.13.8/fires.fires"

In [3]:
myspark=SparkSession.builder.appName("wildfire").config("spark.mongodb.input.uri", input_uri).config("spark.mongodb.output.uri", output_uri).config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.0').getOrCreate()

df=myspark.read.format('com.mongodb.spark.sql.DefaultSource').load()

print(df.printSchema())

root
 |-- COMPLEX_NAME: string (nullable = true)
 |-- CONT_DATE: timestamp (nullable = true)
 |-- CONT_DOY: double (nullable = true)
 |-- CONT_TIME: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- DISCOVERY_DATE: timestamp (nullable = true)
 |-- DISCOVERY_DOY: integer (nullable = true)
 |-- DISCOVERY_TIME: string (nullable = true)
 |-- FIPS_CODE: string (nullable = true)
 |-- FIPS_NAME: string (nullable = true)
 |-- FIRE_CODE: string (nullable = true)
 |-- FIRE_NAME: string (nullable = true)
 |-- FIRE_SIZE: double (nullable = true)
 |-- FIRE_SIZE_CLASS: string (nullable = true)
 |-- FIRE_YEAR: integer (nullable = true)
 |-- FOD_ID: integer (nullable = true)
 |-- FPA_ID: string (nullable = true)
 |-- ICS_209_INCIDENT_NUMBER: string (nullable = true)
 |-- ICS_209_NAME: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LOCAL_FIRE_REPORT_ID: string (nullable = true)
 |-- LOCAL_INCIDENT_ID: string (nullable = true)
 |-- LONGITUDE: double (nullable = t

In [9]:
df.createOrReplaceTempView("wildfires")

#select our features and target for the classification
SQL_QUERY ="SELECT STATE, \
            month(DISCOVERY_DATE) as month, \
            dayofweek(DISCOVERY_DATE) as day, \
            LATITUDE, \
            LONGITUDE, \
            FIRE_SIZE, \
            ABS(datediff(CONT_DATE,DISCOVERY_DATE)) as burn_dur, \
            STAT_CAUSE_DESCR as cause \
            from wildfires"
class_df=myspark.sql(SQL_QUERY)

In [10]:
print(class_df.show())

+-----+-----+---+-----------+-------------+---------+--------+--------------+
|STATE|month|day|   LATITUDE|    LONGITUDE|FIRE_SIZE|burn_dur|         cause|
+-----+-----+---+-----------+-------------+---------+--------+--------------+
|   CA|    2|  4|40.03694444|-121.00583333|      0.1|       0| Miscellaneous|
|   CA|    5|  4|38.93305556|-120.40444444|     0.25|       0|     Lightning|
|   CA|    5|  2|38.98416667|-120.73555556|      0.1|       0|Debris Burning|
|   CA|    6|  2|38.55916667|-119.91333333|      0.1|       5|     Lightning|
|   CA|    6|  2|38.55916667|-119.93305556|      0.1|       5|     Lightning|
|   CA|    6|  4|38.63527778|-120.10361111|      0.1|       1|     Lightning|
|   CA|    7|  5|38.68833333|-120.15333333|      0.1|       1|     Lightning|
|   CA|    3|  3|40.96805556|-122.43388889|      0.8|       0|Debris Burning|
|   CA|    3|  3|41.23361111|-122.28333333|      1.0|       0|Debris Burning|
|   CA|    7|  5|38.54833333|-120.14916667|      0.1|       1|  

In [12]:
class_df.dtypes

[('STATE', 'string'),
 ('month', 'int'),
 ('day', 'int'),
 ('LATITUDE', 'double'),
 ('LONGITUDE', 'double'),
 ('FIRE_SIZE', 'double'),
 ('burn_dur', 'int'),
 ('cause', 'string')]

In [14]:
#since we have categorical and string data we need to put these into numeric values
#for the machine learning to work
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [18]:
#lets make a list of the columns that need to be onehotencoded and those that are numerical
catCols=['STATE', 'cause']
numCols=['month', 'day', 'LATITUDE', 'LONGITUDE', 'FIRE_SIZE', 'burn_dur']

In [16]:
train, test = class_df.randomSplit([0.7,0.3])

In [76]:
#prior to onehotencoding we first stringindex
#we will index the cause for the label use only
string_indexer=[
    StringIndexer(inputCol=x, outputCol=x+"_StringIndexer", handleInvalid="skip")
    for x in catCols
]

[StringIndexer_89092406c63a, StringIndexer_c19c9120a3dd]

In [118]:
#we don't want to one_hot_encode the cause since it is the target
one_hot_encoder=[
    OneHotEncoder(
        inputCols=["STATE_StringIndexer"],
        outputCols=["STATE_OneHotEncoder"]
    )
]

[OneHotEncoder_1a5fc8e9635f]

In [21]:
#now we assemble the features into a feature vector
from pyspark.ml.feature import VectorAssembler

In [119]:
assemblerInput= [x for x in numCols]
assemblerInput += ["STATE_OneHotEncoder"]
#check the columns are as we expect
assemblerInput

['month',
 'day',
 'LATITUDE',
 'LONGITUDE',
 'FIRE_SIZE',
 'burn_dur',
 'STATE_OneHotEncoder']

In [120]:
vector_assembler=VectorAssembler(
    inputCols=assemblerInput,
    outputCol="features",
    handleInvalid="skip"
)

In [121]:
#cosntruct our stages for the ML pipeline
stages=[]
stages+=string_indexer
stages+=one_hot_encoder
stages+=[vector_assembler]

[StringIndexer_89092406c63a,
 StringIndexer_c19c9120a3dd,
 OneHotEncoder_1a5fc8e9635f,
 VectorAssembler_c33b90cff163]

In [122]:
%%time
#now to get our data into shape with the pipeline on the train set and then transform the test set
from pyspark.ml import Pipeline

pipeline=Pipeline().setStages(stages)
model=pipeline.fit(train)

pp_df=model.transform(test)

CPU times: user 27.5 ms, sys: 4.04 ms, total: 31.6 ms
Wall time: 55.7 s


In [123]:
print(pp_df.show(2, truncate=False))

+-----+-----+---+-----------+-------------+---------+--------+--------------+-------------------+-------------------+-------------------+---------------------------------------------------------------+
|STATE|month|day|LATITUDE   |LONGITUDE    |FIRE_SIZE|burn_dur|cause         |STATE_StringIndexer|cause_StringIndexer|STATE_OneHotEncoder|features                                                       |
+-----+-----+---+-----------+-------------+---------+--------+--------------+-------------------+-------------------+-------------------+---------------------------------------------------------------+
|AK   |3    |1  |58.36111111|-134.62666667|1.0      |0       |Debris Burning|34.0               |0.0                |(51,[34],[1.0])    |(57,[0,1,2,3,4,40],[3.0,1.0,58.36111111,-134.62666667,1.0,1.0])|
|AK   |3    |3  |57.78333333|-134.66666667|0.1      |0       |Smoking       |34.0               |8.0                |(51,[34],[1.0])    |(57,[0,1,2,3,4,40],[3.0,3.0,57.78333333,-134.66666667,0

In [37]:
#Let's try Logistic Regression
from pyspark.ml.classification import LogisticRegression

In [124]:
data=pp_df.select(
    col("features"),
    col("cause_StringIndexer").alias("label")
)

In [89]:
data.show(5, truncate=False)

+----------------------------------------------------------------------+-----+
|features                                                              |label|
+----------------------------------------------------------------------+-----+
|(69,[0,1,2,3,4,40,57],[3.0,1.0,58.36111111,-134.62666667,1.0,1.0,1.0])|0.0  |
|(69,[0,1,2,3,4,40,65],[3.0,3.0,57.78333333,-134.66666667,0.1,1.0,1.0])|8.0  |
|(69,[0,1,2,3,4,5,40,63],[3.0,7.0,57.6,-135.21666667,0.1,1.0,1.0,1.0]) |6.0  |
|(69,[0,1,2,3,4,40,58],[4.0,6.0,56.34916667,-132.34,0.1,1.0,1.0])      |1.0  |
|(69,[0,1,2,3,4,40,64],[4.0,7.0,55.58333333,-133.03333333,1.0,1.0,1.0])|7.0  |
+----------------------------------------------------------------------+-----+
only showing top 5 rows



In [125]:
%%time
log_reg_model=LogisticRegression().fit(data)

CPU times: user 16.1 ms, sys: 326 µs, total: 16.4 ms
Wall time: 1min 10s


In [132]:
trainingSummary=log_reg_model.summary

In [133]:
objectiveHistory=trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

objectiveHistory:
2.0868170035109475
2.0641286495122633
2.017050019070519
1.745099953312953
1.72858638481694
1.721803155785034
1.7174234908918775
1.7100004654866767
1.6991111636980631
1.686693617109632
1.677581662111939
1.6736747810226438
1.6720750796939594
1.6701783070401255
1.6650089060748188
1.6632844056686957
1.6604309558106642
1.6583851472776545
1.657429344247666
1.6557352471758444
1.653923013682503
1.6526267246962956
1.651774702614614
1.650850979253984
1.6497779989859869
1.6476664919281705
1.64663446922236
1.6457675043381876
1.6451220396546826
1.6442756964980376
1.643278924416367
1.6429379063200038
1.6423647224951647
1.6421374858532245
1.6417021314958302
1.6414096840842385
1.6410137554780369
1.6405827870517629
1.6403603121120454
1.6398152050234067
1.639575440012165
1.6393281210129893
1.6391939426790694
1.6389412383799755
1.638607364205586
1.6382259845073843
1.6377641551054125
1.6374197283868301
1.637111930626467
1.6369239773187836
1.6367222187660844
1.6363648130219532
1.636111260

In [134]:
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

False positive rate by label:
label 0: 0.16833548664944015
label 1: 0.08435414719061281
label 2: 0.09473912700292052
label 3: 0.27259818506721895
label 4: 0.02189034518274594
label 5: 0.0012384870528278787
label 6: 0.0002600919228987779
label 7: 0.013100300583159032
label 8: 0.0
label 9: 0.0
label 10: 0.0
label 11: 0.0
label 12: 0.0


In [135]:
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

True positive rate by label:
label 0: 0.5566193049640313
label 1: 0.39295150603634
label 2: 0.39752216412821095
label 3: 0.8856930107605379
label 4: 0.4423076923076923
label 5: 0.007264070321084117
label 6: 0.001976047904191617
label 7: 0.09769713886950454
label 8: 0.0
label 9: 0.0
label 10: 0.0
label 11: 0.0
label 12: 0.0


In [136]:
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

Precision by label:
label 0: 0.43348291432075364
label 1: 0.480133108175226
label 2: 0.42145473826279767
label 3: 0.4929705657678743
label 4: 0.6131761086900794
label 5: 0.25481798715203424
label 6: 0.3113207547169811
label 7: 0.18170019467878
label 8: 0.0
label 9: 0.0
label 10: 0.0
label 11: 0.0
label 12: 0.0


In [137]:
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

Recall by label:
label 0: 0.5566193049640313
label 1: 0.39295150603634
label 2: 0.39752216412821095
label 3: 0.8856930107605379
label 4: 0.4423076923076923
label 5: 0.007264070321084117
label 6: 0.001976047904191617
label 7: 0.09769713886950454
label 8: 0.0
label 9: 0.0
label 10: 0.0
label 11: 0.0
label 12: 0.0


In [138]:
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

F-measure by label:
label 0: 0.48739403625879435
label 1: 0.43218956074661896
label 2: 0.4091387660560118
label 3: 0.6333968519146937
label 4: 0.513911268664733
label 5: 0.014125467386788534
label 6: 0.003927168868261336
label 7: 0.12707056954844567
label 8: 0.0
label 9: 0.0
label 10: 0.0
label 11: 0.0
label 12: 0.0


In [139]:
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Accuracy: 0.46792884285570163
FPR: 0.12444501350752565
TPR: 0.4679288428557017
F-measure: 0.4115667715402553
Precision: 0.41816999391003773
Recall: 0.4679288428557017


In [140]:
pp_df.select("cause_StringIndexer", "cause").distinct().orderBy("cause_StringIndexer").show()

+-------------------+-----------------+
|cause_StringIndexer|            cause|
+-------------------+-----------------+
|                0.0|   Debris Burning|
|                1.0|    Miscellaneous|
|                2.0|            Arson|
|                3.0|        Lightning|
|                4.0|Missing/Undefined|
|                5.0|    Equipment Use|
|                6.0|         Campfire|
|                7.0|         Children|
|                8.0|          Smoking|
|                9.0|         Railroad|
|               10.0|        Powerline|
|               11.0|        Fireworks|
|               12.0|        Structure|
+-------------------+-----------------+

