In [None]:
# Provisional machine learning model looking the accuracy of predicting forest fires in Alberta, CA
# Segment 1 Deliverables 

In [1]:
import pandas as pd
import numpy as np
import sklearn as skl



In [2]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:9 http://security.ubuntu.com/ubuntu bionic-security/restricted amd64 Packages [581 kB]
Get:10 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [2,335 kB]
Get:11 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages [1,428 kB]
Get:12 http://ppa.launchpad.net/c2d4u.team/c2d4u4

In [3]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-09-22 20:46:01--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-09-22 20:46:01 (5.36 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [4]:
# Start Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, length, col, expr, to_timestamp, date_format, round

spark = SparkSession.builder.appName("LMPT-Forest-Fires").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [5]:
connection_string = 'lmpt-finalproject.coke2w4vs8wf.us-east-2.rds.amazonaws.com'
password = 'LMPTp4ssw0rd' 
database_name = 'postgres'

# Configure settings for RDS
mode = "append"
jdbc_url=f"jdbc:postgresql://{connection_string}:5432/{database_name}"
config = {"user":"postgres", 
          "password": password, 
          "driver":"org.postgresql.Driver"}


In [6]:
# Read in data 
df = spark.read.jdbc(jdbc_url,table='fires_2006to2018',properties=config)

In [7]:
# Show dataframe 
df.show()

+-----------+---------+---------+-------------+-------------------+-------------------+------------+----------+----------------------+-----------------------+------------------+------------------+------------------------+----------------------+-------------------+-------------------+--------------------+-------------------+--------------+---------+-------------------+-------------------+-------------------+------------------------+------------------------+-----------------+---------+----------------------+----------------------------+---------+---------------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+
|fire_number|fire_name|fire_year|calendar_year|assessment_datetime|assessment_hectares|current_size|size_class|fire_location_latitude|fire_location_longitude|       fire_origin|general_cause_desc|industry_identifier_desc|responsible_group_desc|     activity_class|         true_cause|  permit_detail_desc|

In [8]:
# Create new dataframe for model with chosen features
fire_df = df[['calendar_year','fire_start_date', 'fire_fighting_start_size', 'bh_fs_date', 'bh_hectares', 'weather_conditions_over_fire', 'true_cause']]  
# The following features have been witheld 'fire_number','size_class', 'start_for_fire_date',,
fire_df.show(10)

+-------------+-------------------+------------------------+-------------------+-----------+----------------------------+------------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|bh_hectares|weather_conditions_over_fire|  true_cause|
+-------------+-------------------+------------------------+-------------------+-----------+----------------------------+------------+
|         2015|2015-07-29 18:00:00|                      NA|2015-07-29 20:38:00|       0.01|                       Clear|Vehicle Fire|
|         2015|2015-07-29 15:00:00|                       1|2015-07-30 21:01:00|        0.4|                       Clear|        null|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|        0.1|                      Cloudy|        null|
|         2015|2015-07-11 21:04:00|                      NA|2015-07-31 23:15:00|        0.1|                      Cloudy|        null|
|         2015|2015-08-03 13:00:00|                    

In [9]:
# Drop off NA  starting sizes and convert to data type double
fire_df = fire_df[fire_df.fire_fighting_start_size != 'NA']
fire_df = fire_df.withColumn('fire_fighting_start_size',fire_df['fire_fighting_start_size'].cast("double"))
fire_df.show()

+-------------+-------------------+------------------------+-------------------+-----------+----------------------------+--------------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|bh_hectares|weather_conditions_over_fire|    true_cause|
+-------------+-------------------+------------------------+-------------------+-----------+----------------------------+--------------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|        0.4|                       Clear|          null|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|        0.1|                      Cloudy|          null|
|         2015|2015-08-16 12:00:00|                     0.5|2015-08-16 17:05:00|        0.5|                       Clear|          null|
|         2015|2015-08-10 15:00:00|                     0.3|2015-08-18 09:45:00|        0.3|                      Cloudy|          null|
|         2015|2015-08-01 12:00:00|      

In [10]:
# Replace bh_hectares with ratio of fire size between "start" and "being held". Ratio of fire size will be the final predictive variable
fire_df = fire_df.withColumn("fire_growth",col("bh_hectares")/col("fire_fighting_start_size")).drop("bh_hectares")
fire_df.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|          null|        0.4|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cloudy|          null|        1.0|
|         2015|2015-08-16 12:00:00|                     0.5|2015-08-16 17:05:00|                       Clear|          null|        1.0|
|         2015|2015-08-10 15:00:00|                     0.3|2015-08-18 09:45:00|                      Cloudy|          null|        1.0|
|         2015|2015-08-01 12:00:00|      

In [11]:
# Review data types
fire_df.dtypes

[('calendar_year', 'int'),
 ('fire_start_date', 'string'),
 ('fire_fighting_start_size', 'double'),
 ('bh_fs_date', 'string'),
 ('weather_conditions_over_fire', 'string'),
 ('true_cause', 'string'),
 ('fire_growth', 'double')]

In [12]:
# Check Null Values in Columns
#Dict_Null = {col:fire_df.filter(df[col].isNull()).count() for col in fire_df.columns}
Dict_Null = {col:fire_df.filter(fire_df[col].isNull()).count() for col in fire_df.columns}
Dict_Null

{'bh_fs_date': 0,
 'calendar_year': 0,
 'fire_fighting_start_size': 0,
 'fire_growth': 0,
 'fire_start_date': 0,
 'true_cause': 8111,
 'weather_conditions_over_fire': 2323}

In [13]:
# Count rows of data 
fire_df.count()

14565

In [14]:
# Replace Null Values 
fire_df = fire_df.na.fill("unknown")
fire_df.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cloudy|       unknown|        1.0|
|         2015|2015-08-16 12:00:00|                     0.5|2015-08-16 17:05:00|                       Clear|       unknown|        1.0|
|         2015|2015-08-10 15:00:00|                     0.3|2015-08-18 09:45:00|                      Cloudy|       unknown|        1.0|
|         2015|2015-08-01 12:00:00|      

Feature Engineering 
1. Removing numbers to identify ignition area 
2. Creating the month of when the fire was discovered
3. Converting strings to dates for start fire fighting date & fire held date (bh_fs_date)
4. Calculating the number of days the fire was held before being under control
5. Review and clean up categorical features in order to use classification machine models 
- Convert weather conditions - clean up 
- Convert true_cause 
- responsible_group_desc 
- fire_fighting_start_size 
- bh_hectares 

In [15]:
fire_df.show(10)

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cloudy|       unknown|        1.0|
|         2015|2015-08-16 12:00:00|                     0.5|2015-08-16 17:05:00|                       Clear|       unknown|        1.0|
|         2015|2015-08-10 15:00:00|                     0.3|2015-08-18 09:45:00|                      Cloudy|       unknown|        1.0|
|         2015|2015-08-01 12:00:00|      

In [None]:
# 1. Removing last 3 digits from fire_number to to identify location area of fire

#fire_df = fire_df.withColumn("fire_number",expr("substring(fire_number, 1, length(fire_number)-3)"))
#fire_df.show()


In [16]:
# 2.Convert discovered_date to look at just months (find trends in months/ seasons)
from pyspark.sql.functions import to_timestamp, date_format
fire_df = fire_df.withColumn('fire_start_date', to_timestamp (col('fire_start_date'))).withColumn('Month', date_format(col('fire_start_date'), 'M'))
fire_df.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|Month|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|    7|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cloudy|       unknown|        1.0|    7|
|         2015|2015-08-16 12:00:00|                     0.5|2015-08-16 17:05:00|                       Clear|       unknown|        1.0|    8|
|         2015|2015-08-10 15:00:00|                     0.3|2015-08-18 09:45:00|                      Cloudy|       unknown|        1.0|    8|

In [17]:
# 3. Convert start_for_fire_date & bh_fs_date to timestamp to find length of fire held time
#fire_df = fire_df.withColumn('start_for_fire_date', col('start_for_fire_date').cast('timestamp'))
fire_df = fire_df.withColumn('fire_start_date', col('fire_start_date').cast('timestamp'))

In [18]:
# Convert start_for_fire_date & bh_fs_date to timestamp to find length of fire held time
fire_df = fire_df.withColumn('bh_fs_date', col('bh_fs_date').cast('timestamp'))
fire_df.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|Month|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|    7|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cloudy|       unknown|        1.0|    7|
|         2015|2015-08-16 12:00:00|                     0.5|2015-08-16 17:05:00|                       Clear|       unknown|        1.0|    8|
|         2015|2015-08-10 15:00:00|                     0.3|2015-08-18 09:45:00|                      Cloudy|       unknown|        1.0|    8|

In [19]:
# Check dates have been converted to timestamps
fire_df.dtypes

[('calendar_year', 'int'),
 ('fire_start_date', 'timestamp'),
 ('fire_fighting_start_size', 'double'),
 ('bh_fs_date', 'timestamp'),
 ('weather_conditions_over_fire', 'string'),
 ('true_cause', 'string'),
 ('fire_growth', 'double'),
 ('Month', 'string')]

In [20]:
# Find the length of fire held 
fire_df = fire_df.withColumn("bh_fs_date",to_timestamp(col("bh_fs_date"),"HH:mm:ss.SSS")) \
   .withColumn("fire_start_date",to_timestamp(col("fire_start_date"),"HH:mm:ss.SSS")) \
   .withColumn("DiffInSeconds", col("bh_fs_date").cast("long") - col("fire_start_date").cast("long")) \
   .withColumn("DiffInMinutes",(col("DiffInSeconds")/60)) \
   .withColumn("DiffInHours",(col("DiffInSeconds")/3600)) 
fire_df.show(truncate=False)

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+
|calendar_year|fire_start_date    |fire_fighting_start_size|bh_fs_date         |weather_conditions_over_fire|true_cause    |fire_growth|Month|DiffInSeconds|DiffInMinutes|DiffInHours       |
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+
|2015         |2015-07-29 15:00:00|1.0                     |2015-07-30 21:01:00|Clear                       |unknown       |0.4        |7    |108060       |1801.0       |30.016666666666666|
|2015         |2015-07-27 12:00:00|0.1                     |2015-07-31 13:08:00|Cloudy                      |unknown       |1.0        |7    |349680       |5828.0       |97.13333333333334 |
|2015         |2015-08-16 12:00:00|0.5            

In [55]:
#import required libraries
from pyspark.ml.feature import StringIndexer
weather_conditions_over_fire_indexer = StringIndexer(inputCol="weather_conditions_over_fire", outputCol="weather_conditions_over_fireIndex")

#Use one hot encoding to encode catergoratical columns 
ec_df = weather_conditions_over_fire_indexer.fit(fire_df).transform(fire_df)
ec_df.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+---------------------------------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|Month|DiffInSeconds|DiffInMinutes|       DiffInHours|weather_conditions_over_fireIndex|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+---------------------------------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|    7|       108060|       1801.0|30.016666666666666|                              0.0|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cl

In [58]:
#Use one hot encoding to encode catergoratical columns 
true_cause_indexer = StringIndexer(inputCol="true_cause", outputCol="true_cause_fireIndex")

#Fits a model to the input dataset with optional parameters.
ec_df1 = true_cause_indexer.fit(fire_df).transform(fire_df)
ec_df1.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+--------------------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|Month|DiffInSeconds|DiffInMinutes|       DiffInHours|true_cause_fireIndex|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+--------------------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|    7|       108060|       1801.0|30.016666666666666|                 0.0|
|         2015|2015-07-27 12:00:00|                     0.1|2015-07-31 13:08:00|                      Cloudy|       unknown|        1.0|    7|       349680|

In [61]:
#import module
from pyspark.ml import Pipeline

#Create pipeline and pass all stages
pipeline = Pipeline(stages=[weather_conditions_over_fire_indexer,
                            true_cause_indexer,
                    ])


In [62]:
# Create pipeline to pass all stages 
df_transformed = pipeline.fit(fire_df).transform(fire_df)
df_transformed.show()

+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+---------------------------------+--------------------+
|calendar_year|    fire_start_date|fire_fighting_start_size|         bh_fs_date|weather_conditions_over_fire|    true_cause|fire_growth|Month|DiffInSeconds|DiffInMinutes|       DiffInHours|weather_conditions_over_fireIndex|true_cause_fireIndex|
+-------------+-------------------+------------------------+-------------------+----------------------------+--------------+-----------+-----+-------------+-------------+------------------+---------------------------------+--------------------+
|         2015|2015-07-29 15:00:00|                     1.0|2015-07-30 21:01:00|                       Clear|       unknown|        0.4|    7|       108060|       1801.0|30.016666666666666|                              0.0|                 0.0|
|         2015|2015-

In [72]:
# Clean dataframe for model 
final_df = df_transformed.drop("weather_conditions_over_fire").drop("true_cause").drop("Month").drop("DiffInSeconds").drop("DiffInMinutes")\
.drop("fire_start_date").drop("bh_fs_date")
final_df.show(10)

+-------------+------------------------+-----------+------------------+---------------------------------+--------------------+
|calendar_year|fire_fighting_start_size|fire_growth|       DiffInHours|weather_conditions_over_fireIndex|true_cause_fireIndex|
+-------------+------------------------+-----------+------------------+---------------------------------+--------------------+
|         2015|                     1.0|        0.4|30.016666666666666|                              0.0|                 0.0|
|         2015|                     0.1|        1.0| 97.13333333333334|                              1.0|                 0.0|
|         2015|                     0.5|        1.0| 5.083333333333333|                              0.0|                 0.0|
|         2015|                     0.3|        1.0|            186.75|                              1.0|                 0.0|
|         2015|                    0.01|        1.0| 392.3666666666667|                              0.0|      

In [73]:
# dropping all infinity and NaN values before hitting the database
final_df = final_df.replace([np.inf, -np.inf], np.nan)
final_df = final_df[final_df.fire_growth != np.nan]
final_df = final_df[final_df.DiffInHours != np.nan]
final_df = final_df[final_df.fire_fighting_start_size != np.nan]
final_df.show()

+-------------+------------------------+-----------+------------------+---------------------------------+--------------------+
|calendar_year|fire_fighting_start_size|fire_growth|       DiffInHours|weather_conditions_over_fireIndex|true_cause_fireIndex|
+-------------+------------------------+-----------+------------------+---------------------------------+--------------------+
|         2015|                     1.0|        0.4|30.016666666666666|                              0.0|                 0.0|
|         2015|                     0.1|        1.0| 97.13333333333334|                              1.0|                 0.0|
|         2015|                     0.5|        1.0| 5.083333333333333|                              0.0|                 0.0|
|         2015|                     0.3|        1.0|            186.75|                              1.0|                 0.0|
|         2015|                    0.01|        1.0| 392.3666666666667|                              0.0|      

In [74]:
# Decide on features and label: 
#"user-input" for how many days before a fire is predicted, we estimate the size before and the fire size is being held i.e. predicting change in starting fire size and discovered size by the number of days 

# Split our preprocessed data into our features and target arrays
# Output labels 
y = final_df.select("fire_growth").toPandas()

# Features data 
X = final_df.drop("fire_growth").toPandas()

# Split the preprocessed data into a training & test dataset
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=78)

In [75]:
# Check the shape of X 
X.shape

(14031, 5)

In [76]:
# Check y shape samples
y.shape

(14031, 1)

In [77]:
# Define the linear regression model
from sklearn.linear_model import LinearRegression
model = LinearRegression()


In [78]:
# Create a StandardScaler instance
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

# Fit the StandardScaler
X_scaler = scaler.fit(X_train)

# Scale the data 
X_trained_scaled = X_scaler.transform(X_train)
X_test_scaled = X_scaler.transform(X_test)

In [79]:
# Train the model
model.fit(X_train, y_train)


LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None, normalize=False)

In [80]:
# Evaluate the model
y_pred = model.predict(X_test_scaled)
y_pred

array([[-12371.02109858],
       [-12382.77011381],
       [-12379.84715585],
       ...,
       [-12367.87337402],
       [-12374.32656577],
       [-12379.84715657]])

In [81]:
# Retrieving the model intercept and slope 
print(model.coef_)
print(model.intercept_)

[[ 6.18950589e+00 -3.16982249e-02  1.55336030e-04 -5.48987254e+00
  -3.35717838e+00]]
[-12374.70600828]


In [82]:
# Calculating the R squared value 
from sklearn.metrics import r2_score
r2_score(y_test, y_pred)

-6.231027132035251

In [89]:
# Determine the shape of our training and testing sets.
# X & y_train are 75% & X & y_test are 25%
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)


(10523, 5)
(3508, 5)
(10523, 1)
(3508, 1)
