In [131]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                    MulticlassClassificationEvaluator)
from pyspark.ml.feature import (StringIndexer, VectorAssembler,
                               OneHotEncoder)
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, sum

In [1]:
import findspark

In [2]:
spark = findspark.init('/home/danial/spark-3.4.0-bin-hadoop3')

In [7]:
spark = SparkSession.builder.config('spark.jars.packages', 'org.xerial:sqlite-jdbc:3.42.0.0').getOrCreate()

In [42]:
df_spark = spark.read.format('jdbc').options(driver='org.sqlite.JDBC', \
                                             inferSchema = True,\
                                             header = True,\
                                             dbtable = 'fires',\
                                             url= 'jdbc:sqlite:/home/danial/Desktop/mygis/Geospatial_Data_Science/My GIS Projects/data/3.1.88_M_US_Wildfires/FPA_FOD_20170508.sqlite').load()


In [56]:
df_spark.printSchema()

root
 |-- OBJECTID: integer (nullable = true)
 |-- FOD_ID: decimal(38,18) (nullable = true)
 |-- FPA_ID: string (nullable = true)
 |-- SOURCE_SYSTEM_TYPE: string (nullable = true)
 |-- SOURCE_SYSTEM: string (nullable = true)
 |-- NWCG_REPORTING_AGENCY: string (nullable = true)
 |-- NWCG_REPORTING_UNIT_ID: string (nullable = true)
 |-- NWCG_REPORTING_UNIT_NAME: string (nullable = true)
 |-- SOURCE_REPORTING_UNIT: string (nullable = true)
 |-- SOURCE_REPORTING_UNIT_NAME: string (nullable = true)
 |-- LOCAL_FIRE_REPORT_ID: string (nullable = true)
 |-- LOCAL_INCIDENT_ID: string (nullable = true)
 |-- FIRE_CODE: string (nullable = true)
 |-- FIRE_NAME: string (nullable = true)
 |-- ICS_209_INCIDENT_NUMBER: string (nullable = true)
 |-- ICS_209_NAME: string (nullable = true)
 |-- MTBS_ID: string (nullable = true)
 |-- MTBS_FIRE_NAME: string (nullable = true)
 |-- COMPLEX_NAME: string (nullable = true)
 |-- FIRE_YEAR: decimal(38,18) (nullable = true)
 |-- DISCOVERY_DATE: decimal(38,18) (null

In [67]:
df = df_spark.select('LONGITUDE', 'LATITUDE','FIRE_SIZE', 'FIRE_SIZE_CLASS',\
               'STATE','FIRE_YEAR', 'DISCOVERY_DOY', 'CONT_DOY',\
                'STAT_CAUSE_DESCR', 'STAT_CAUSE_CODE','DISCOVERY_TIME', 'CONT_TIME'\
               )
df.printSchema()

root
 |-- LONGITUDE: decimal(38,18) (nullable = true)
 |-- LATITUDE: decimal(38,18) (nullable = true)
 |-- FIRE_SIZE: decimal(38,18) (nullable = true)
 |-- FIRE_SIZE_CLASS: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- FIRE_YEAR: decimal(38,18) (nullable = true)
 |-- DISCOVERY_DOY: decimal(38,18) (nullable = true)
 |-- CONT_DOY: decimal(38,18) (nullable = true)
 |-- STAT_CAUSE_DESCR: string (nullable = true)
 |-- STAT_CAUSE_CODE: decimal(38,18) (nullable = true)
 |-- DISCOVERY_TIME: string (nullable = true)
 |-- CONT_TIME: string (nullable = true)



In [59]:
df_spark.select('STAT_CAUSE_CODE').distinct().show()

[Stage 28:>                                                         (0 + 1) / 1]

+--------------------+
|     STAT_CAUSE_CODE|
+--------------------+
|1.000000000000000000|
|3.000000000000000000|
|12.00000000000000...|
|4.000000000000000000|
|10.00000000000000...|
|11.00000000000000...|
|2.000000000000000000|
|13.00000000000000...|
|9.000000000000000000|
|8.000000000000000000|
|6.000000000000000000|
|7.000000000000000000|
|5.000000000000000000|
+--------------------+



                                                                                

In [69]:
df = df.withColumn("FIRE_YEAR", df.FIRE_YEAR.cast("integer"))
df = df.withColumn("DISCOVERY_DOY", df.DISCOVERY_DOY.cast("integer"))
df = df.withColumn("CONT_DOY", df.CONT_DOY.cast("integer"))
df = df.withColumn("STAT_CAUSE_CODE", df.STAT_CAUSE_CODE.cast("integer"))


In [70]:
df.select('STATE','FIRE_YEAR', 'DISCOVERY_DOY', 'CONT_DOY',\
                'STAT_CAUSE_DESCR', 'STAT_CAUSE_CODE','DISCOVERY_TIME', 'CONT_TIME').show()



+-----+---------+-------------+--------+----------------+---------------+--------------+---------+
|STATE|FIRE_YEAR|DISCOVERY_DOY|CONT_DOY|STAT_CAUSE_DESCR|STAT_CAUSE_CODE|DISCOVERY_TIME|CONT_TIME|
+-----+---------+-------------+--------+----------------+---------------+--------------+---------+
|   CA|     2005|           33|      33|   Miscellaneous|              9|          1300|     1730|
|   CA|     2004|          133|     133|       Lightning|              1|          0845|     1530|
|   CA|     2004|          152|     152|  Debris Burning|              5|          1921|     2024|
|   CA|     2004|          180|     185|       Lightning|              1|          1600|     1400|
|   CA|     2004|          180|     185|       Lightning|              1|          1600|     1200|
|   CA|     2004|          182|     183|       Lightning|              1|          1800|     1600|
|   CA|     2004|          183|     184|       Lightning|              1|          1800|     1400|
|   CA|   

In [71]:
df_spark.select('LONGITUDE', 'LATITUDE','FIRE_SIZE', 'FIRE_SIZE_CLASS').show()


+--------------------+--------------------+--------------------+---------------+
|           LONGITUDE|            LATITUDE|           FIRE_SIZE|FIRE_SIZE_CLASS|
+--------------------+--------------------+--------------------+---------------+
|-121.005833330000...|40.03694444000000...|0.100000000000000000|              A|
|-120.404444440000...|38.93305556000000...|0.250000000000000000|              A|
|-120.735555560000...|38.98416667000000...|0.100000000000000000|              A|
|-119.913333330000...|38.55916667000000...|0.100000000000000000|              A|
|-119.933055560000...|38.55916667000000...|0.100000000000000000|              A|
|-120.103611110000...|38.63527778000000...|0.100000000000000000|              A|
|-120.153333330000...|38.68833333000000...|0.100000000000000000|              A|
|-122.433888890000...|40.96805556000000...|0.800000000000000000|              B|
|-122.283333330000...|41.23361111000000...|1.000000000000000000|              B|
|-120.149166670000...|38.548

In [73]:
df.printSchema()

root
 |-- LONGITUDE: decimal(38,18) (nullable = true)
 |-- LATITUDE: decimal(38,18) (nullable = true)
 |-- FIRE_SIZE: decimal(38,18) (nullable = true)
 |-- FIRE_SIZE_CLASS: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- FIRE_YEAR: integer (nullable = true)
 |-- DISCOVERY_DOY: integer (nullable = true)
 |-- CONT_DOY: integer (nullable = true)
 |-- STAT_CAUSE_DESCR: string (nullable = true)
 |-- STAT_CAUSE_CODE: integer (nullable = true)
 |-- DISCOVERY_TIME: string (nullable = true)
 |-- CONT_TIME: string (nullable = true)



# Let's see the task as a classification task

+ meaning I will choose the fire size class as the target variable rather the the fire size in acres.
+ I chose STAT_CAUSE_CODE rather than STAT_CAUSE_DESCR, so I will not need to perform encoding. at least for this attribute
+ For now, I will exclude DISCOVERY_TIME and CONT_TIME
+ I will keep DISCOVERY_DOY becasue it may be useful in terms of predicting fire size like the day of the year the fire occured may be related to the size of fire like in terms of weather temperature
+ I will keep CONT_DOY becasue it may be useful in terms of predicting fire size like the larger the fire the more time it may take to control the fire


In [118]:
my_df = df.select('LONGITUDE', 'LATITUDE', 'FIRE_SIZE_CLASS',\
               'STATE','FIRE_YEAR', 'DISCOVERY_DOY', 'CONT_DOY',\
                'STAT_CAUSE_CODE'\
               )

In [119]:
my_df.show()

+--------------------+--------------------+---------------+-----+---------+-------------+--------+---------------+
|           LONGITUDE|            LATITUDE|FIRE_SIZE_CLASS|STATE|FIRE_YEAR|DISCOVERY_DOY|CONT_DOY|STAT_CAUSE_CODE|
+--------------------+--------------------+---------------+-----+---------+-------------+--------+---------------+
|-121.005833330000...|40.03694444000000...|              A|   CA|     2005|           33|      33|              9|
|-120.404444440000...|38.93305556000000...|              A|   CA|     2004|          133|     133|              1|
|-120.735555560000...|38.98416667000000...|              A|   CA|     2004|          152|     152|              5|
|-119.913333330000...|38.55916667000000...|              A|   CA|     2004|          180|     185|              1|
|-119.933055560000...|38.55916667000000...|              A|   CA|     2004|          180|     185|              1|
|-120.103611110000...|38.63527778000000...|              A|   CA|     2004|     

In [120]:
my_df.printSchema()

root
 |-- LONGITUDE: decimal(38,18) (nullable = true)
 |-- LATITUDE: decimal(38,18) (nullable = true)
 |-- FIRE_SIZE_CLASS: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- FIRE_YEAR: integer (nullable = true)
 |-- DISCOVERY_DOY: integer (nullable = true)
 |-- CONT_DOY: integer (nullable = true)
 |-- STAT_CAUSE_CODE: integer (nullable = true)



In [121]:
my_df.select('FIRE_SIZE_CLASS').distinct().show()

[Stage 81:>                                                         (0 + 1) / 1]

+---------------+
|FIRE_SIZE_CLASS|
+---------------+
|              F|
|              E|
|              B|
|              D|
|              C|
|              A|
|              G|
+---------------+



                                                                                

In [122]:
null_counts = my_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in my_df.columns])

null_counts.show()


[Stage 84:>                                                         (0 + 1) / 1]

+---------+--------+---------------+-----+---------+-------------+--------+---------------+
|LONGITUDE|LATITUDE|FIRE_SIZE_CLASS|STATE|FIRE_YEAR|DISCOVERY_DOY|CONT_DOY|STAT_CAUSE_CODE|
+---------+--------+---------------+-----+---------+-------------+--------+---------------+
|        0|       0|              0|    0|        0|            0|  891531|              0|
+---------+--------+---------------+-----+---------+-------------+--------+---------------+



                                                                                

# SO for now let's exclude CONT_DOY column which contains null values

later I will retrain the model using these column where dropping the null vals

In [123]:
df_final = df.select('LONGITUDE', 'LATITUDE', 'FIRE_SIZE_CLASS',\
               'STATE','FIRE_YEAR', 'DISCOVERY_DOY',\
                'STAT_CAUSE_CODE'\
               )

In [124]:
df_final.show()

+--------------------+--------------------+---------------+-----+---------+-------------+---------------+
|           LONGITUDE|            LATITUDE|FIRE_SIZE_CLASS|STATE|FIRE_YEAR|DISCOVERY_DOY|STAT_CAUSE_CODE|
+--------------------+--------------------+---------------+-----+---------+-------------+---------------+
|-121.005833330000...|40.03694444000000...|              A|   CA|     2005|           33|              9|
|-120.404444440000...|38.93305556000000...|              A|   CA|     2004|          133|              1|
|-120.735555560000...|38.98416667000000...|              A|   CA|     2004|          152|              5|
|-119.913333330000...|38.55916667000000...|              A|   CA|     2004|          180|              1|
|-119.933055560000...|38.55916667000000...|              A|   CA|     2004|          180|              1|
|-120.103611110000...|38.63527778000000...|              A|   CA|     2004|          182|              1|
|-120.153333330000...|38.68833333000000...|   

In [160]:
STATE_indexer = StringIndexer(inputCol='STATE', outputCol='STATE_Index')
STATE_indexed = STATE_indexer.fit(df_final).transform(df_final)
#STATE_encoder = OneHotEncoder(inputCol='STATE_Index', outputCol='STATE_Vec')
STATE_indexed.show()

[Stage 139:>                                                        (0 + 1) / 1]

+--------------------+--------------------+---------------+-----+---------+-------------+---------------+-----------+
|           LONGITUDE|            LATITUDE|FIRE_SIZE_CLASS|STATE|FIRE_YEAR|DISCOVERY_DOY|STAT_CAUSE_CODE|STATE_Index|
+--------------------+--------------------+---------------+-----+---------+-------------+---------------+-----------+
|-121.005833330000...|40.03694444000000...|              A|   CA|     2005|           33|              9|        0.0|
|-120.404444440000...|38.93305556000000...|              A|   CA|     2004|          133|              1|        0.0|
|-120.735555560000...|38.98416667000000...|              A|   CA|     2004|          152|              5|        0.0|
|-119.913333330000...|38.55916667000000...|              A|   CA|     2004|          180|              1|        0.0|
|-119.933055560000...|38.55916667000000...|              A|   CA|     2004|          180|              1|        0.0|
|-120.103611110000...|38.63527778000000...|             

                                                                                

In [163]:
FIRE_SIZE_CLASS_indexer = StringIndexer(inputCol='FIRE_SIZE_CLASS', outputCol='FIRE_SIZE_CLASS_Index')
FIRE_SIZE_CLASS_indexed = FIRE_SIZE_CLASS_indexer.fit(STATE_indexed).transform(STATE_indexed)
FIRE_SIZE_CLASS_indexed.show()


[Stage 150:>                                                        (0 + 1) / 1]

+--------------------+--------------------+---------------+-----+---------+-------------+---------------+-----------+---------------------+
|           LONGITUDE|            LATITUDE|FIRE_SIZE_CLASS|STATE|FIRE_YEAR|DISCOVERY_DOY|STAT_CAUSE_CODE|STATE_Index|FIRE_SIZE_CLASS_Index|
+--------------------+--------------------+---------------+-----+---------+-------------+---------------+-----------+---------------------+
|-121.005833330000...|40.03694444000000...|              A|   CA|     2005|           33|              9|        0.0|                  1.0|
|-120.404444440000...|38.93305556000000...|              A|   CA|     2004|          133|              1|        0.0|                  1.0|
|-120.735555560000...|38.98416667000000...|              A|   CA|     2004|          152|              5|        0.0|                  1.0|
|-119.913333330000...|38.55916667000000...|              A|   CA|     2004|          180|              1|        0.0|                  1.0|
|-119.933055560000..

                                                                                

In [162]:
FIRE_SIZE_CLASS_indexed.select('FIRE_SIZE_CLASS_Index').distinct().show()

[Stage 147:>                                                        (0 + 1) / 1]

+---------------------+
|FIRE_SIZE_CLASS_Index|
+---------------------+
|                  0.0|
|                  1.0|
|                  4.0|
|                  3.0|
|                  2.0|
|                  6.0|
|                  5.0|
+---------------------+



                                                                                

In [164]:
assembler = VectorAssembler(inputCols=['LONGITUDE', 'LATITUDE',
               'STATE_Index', 'FIRE_SIZE_CLASS_Vec','FIRE_YEAR', 'DISCOVERY_DOY',
                'STAT_CAUSE_CODE'], outputCol='features')


In [184]:
log_reg_model = LogisticRegression(featuresCol='features', labelCol='FIRE_SIZE_CLASS_Index')

In [169]:
pipeline = Pipeline (stages=[STATE_indexer, FIRE_SIZE_CLASS_indexer,
                             STATE_indexed, FIRE_SIZE_CLASS_indexed,
                            assembler, log_reg_model])

In [174]:
data = FIRE_SIZE_CLASS_indexed.select('LONGITUDE', 'LATITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY', 'STAT_CAUSE_CODE', 'STATE_Index', 'FIRE_SIZE_CLASS_Index')
data.show()

+--------------------+--------------------+---------+-------------+---------------+-----------+---------------------+
|           LONGITUDE|            LATITUDE|FIRE_YEAR|DISCOVERY_DOY|STAT_CAUSE_CODE|STATE_Index|FIRE_SIZE_CLASS_Index|
+--------------------+--------------------+---------+-------------+---------------+-----------+---------------------+
|-121.005833330000...|40.03694444000000...|     2005|           33|              9|        0.0|                  1.0|
|-120.404444440000...|38.93305556000000...|     2004|          133|              1|        0.0|                  1.0|
|-120.735555560000...|38.98416667000000...|     2004|          152|              5|        0.0|                  1.0|
|-119.913333330000...|38.55916667000000...|     2004|          180|              1|        0.0|                  1.0|
|-119.933055560000...|38.55916667000000...|     2004|          180|              1|        0.0|                  1.0|
|-120.103611110000...|38.63527778000000...|     2004|   

In [180]:
train_data, test_data = data.randomSplit([0.7, 0.3])

In [182]:
assembler = VectorAssembler(inputCols=['LONGITUDE', 'LATITUDE',
               'STATE_Index','FIRE_YEAR', 'DISCOVERY_DOY',
                'STAT_CAUSE_CODE'], outputCol='features')


In [185]:
fit_model = log_reg_model.fit(train_data)

IllegalArgumentException: features does not exist. Available: LONGITUDE, LATITUDE, FIRE_YEAR, DISCOVERY_DOY, STAT_CAUSE_CODE, STATE_Index, FIRE_SIZE_CLASS_Index