### Import the necessary libraries

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
# sc = SparkContext('local[*]')
# spark = SparkSession(sc)

### Create a SparkSession

In [3]:
spark = SparkSession.builder.master('local').getOrCreate()

### Load data into a PySpark Dataframe

In [4]:
# reading in pyspark df
spark_df = spark.read.csv("./Documents/Flatiron/forestfires.csv", header = 'true', inferSchema = 'true')

In [5]:
# observing datatype of df
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [6]:
spark_df.head()

Row(X=7, Y=5, month='mar', day='fri', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

In [7]:
spark_df.columns

['X',
 'Y',
 'month',
 'day',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'area']

In [8]:
# select multiple columns
spark_df[['month', 'day', 'rain']]

DataFrame[month: string, day: string, rain: double]

In [9]:
# spark dataframe methods kept
col_rain = spark_df.select('rain')

In [10]:
# simply select one column, pandas likewise 
spark_df['rain']

Column<b'rain'>

In [11]:
spark_df.dtypes

[('X', 'int'),
 ('Y', 'int'),
 ('month', 'string'),
 ('day', 'string'),
 ('FFMC', 'double'),
 ('DMC', 'double'),
 ('DC', 'double'),
 ('ISI', 'double'),
 ('temp', 'double'),
 ('RH', 'int'),
 ('wind', 'double'),
 ('rain', 'double'),
 ('area', 'double')]

In [12]:
# correlation between month and fire area
spark_df_months = spark_df.groupBy('month').agg({'area' : 'mean'})
spark_df_months

DataFrame[month: string, avg(area): double]

In [13]:
# action
spark_df_months.collect()

[Row(month='jun', avg(area)=5.841176470588234),
 Row(month='aug', avg(area)=12.489076086956521),
 Row(month='may', avg(area)=19.24),
 Row(month='feb', avg(area)=6.275),
 Row(month='sep', avg(area)=17.942616279069753),
 Row(month='mar', avg(area)=4.356666666666667),
 Row(month='oct', avg(area)=6.638),
 Row(month='jul', avg(area)=14.3696875),
 Row(month='nov', avg(area)=0.0),
 Row(month='apr', avg(area)=8.891111111111112),
 Row(month='dec', avg(area)=13.33),
 Row(month='jan', avg(area)=0.0)]

### Conclusion:
spread area of wildfire gets larger during summer months

### Boolean Mask

In [14]:
no_rain = spark_df.filter(spark_df['rain'] == 0.0)
some_rain = spark_df.filter(spark_df['rain'] > 0.0)

In [15]:
from pyspark.sql.functions import mean
print('no rain fire area: ', no_rain.select(mean('area')).show(), '\n')
print('some rain fire area: ', some_rain.select(mean('area')).show(), '\n')

+------------------+
|         avg(area)|
+------------------+
|13.023693516699408|
+------------------+

no rain fire area:  None 

+---------+
|avg(area)|
+---------+
|  1.62375|
+---------+

some rain fire area:  None 



### Conclusion: 
rain plays dominant role in the spread area of wildfire

### Seasonal data

* summer: June, July, August
* winter: December, January, February

In [17]:
summer_months = spark_df.filter(spark_df['month'].isin(['jun', 'jul', 'aug']))
winter_months = spark_df.filter(spark_df['month'].isin(['dec', 'jan', 'feb']))

print('summer months fire area', summer_months.select(mean('area')).show(), '\n')
print('winter months fire area', winter_months.select(mean('area')).show(), '\n')

+------------------+
|         avg(area)|
+------------------+
|12.262317596566525|
+------------------+

summer months fire area None 

+-----------------+
|        avg(area)|
+-----------------+
|7.918387096774193|
+-----------------+

winter months fire area None 



### Machine learning

In [18]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder

correlation between months and fire area is observed, so we will include this observation in our model. the day of one week, however, is highly unlikely to make any impact on fire. consequently, we will get rid of this attribute from the dataframe in use of model fit.

In [19]:
fire_df = spark_df.drop('day')
fire_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

to turn string data-type of months into discrete numbers, two steps are employeed as below: 

* convert string variable into numerical index (`StringIndexer`), e.g. Jan (month_num = 1), Feb (month_num = 2), Mar (month_num = 3), etc.
* create dummy variables that correspond to each month (`OneHotEncoder`), e.g. Jan (mon_1), Feb (mon_2), Mar (mon_3), etc.
    * mon_1 : 1 0 0 0 0 0 0 0 0 0 0 0
    * mon_2 : 0 1 0 0 0 0 0 0 0 0 0 0
    * mon_3 : 0 0 1 0 0 0 0 0 0 0 0 0  

In [20]:
si = StringIndexer(inputCol='month', outputCol='month_num')
reg_model = si.fit(fire_df)
new_df = reg_model.transform(fire_df)

In [21]:
# estimator (aka: untrained transformer)
type(si)

pyspark.ml.feature.StringIndexer

In [22]:
# transformer (aka: trained transformer)
type(reg_model)

pyspark.ml.feature.StringIndexerModel

In [23]:
reg_model.labels

['aug',
 'sep',
 'mar',
 'jul',
 'feb',
 'jun',
 'oct',
 'apr',
 'dec',
 'jan',
 'may',
 'nov']

In [24]:
new_df.head(4)

[Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0, month_num=6.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0, month_num=6.0),
 Row(X=8, Y=6, month='mar', FFMC=91.7, DMC=33.3, DC=77.5, ISI=9.0, temp=8.3, RH=97, wind=4.0, rain=0.2, area=0.0, month_num=2.0)]

after converting string type of months into numerical ones, we can utilize Spark's `OneHotEncoder()` 

In [25]:
new_df.select('month_num').distinct().collect()

[Row(month_num=8.0),
 Row(month_num=0.0),
 Row(month_num=7.0),
 Row(month_num=1.0),
 Row(month_num=4.0),
 Row(month_num=11.0),
 Row(month_num=3.0),
 Row(month_num=2.0),
 Row(month_num=10.0),
 Row(month_num=6.0),
 Row(month_num=5.0),
 Row(month_num=9.0)]

In [26]:
# fitting and transforming OneHotEncoder
ohe = feature.OneHotEncoder(inputCols=['month_num'], outputCols=['month_vec'], dropLast=True)
one_hot_encoded = ohe.fit(new_df).transform(new_df)
one_hot_encoded.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}))

now we have OneHitEncoded sparse vector as `month_vec`. for instance, month_vec = SparseVector(11, {2: 11.0}) indicates we have a sparse vector of size 11 (because one month is dropped such as `dropLast=True` in order to prevent the collinearity issue: 12 = a + b + c + d + ... + j + k + l) and this particular data point (aka: dp) is the second index (counting from 0, i.e. 0-index vector) of our month labels, that is March based off labels in the `model` StringIndex transformer.

final Spark dataframe for ML trainign as shown below:

In [27]:
features = ['X', 'Y', 'FFMC', 'DMC', 'DC', 'ISI', 'temp', 'RH', 'wind', 'rain', 'month_vec']
target = 'area'

vector = VectorAssembler(inputCols=features, outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

for all ML models in PySpark, we put all of the features of our model into one sparse vector. This is once again for efficiency sake. Above shows that we are putting all features together by `VectorAssembler()` method.

In [28]:
vectorized_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}), features=SparseVector(21, {0: 7.0, 1: 5.0, 2: 86.2, 3: 26.2, 4: 94.3, 5: 5.1, 6: 8.2, 7: 51.0, 8: 6.7, 12: 1.0}))

Now, lets fit our FF data with a `Random Forest Regression` model. 

In [29]:
## instantiating and fitting the model
rf_model = RandomForestRegressor(featuresCol='features', labelCol='area', predictionCol='prediction').fit(vectorized_df)

In [30]:
rf_model.featureImportances

SparseVector(21, {0: 0.1045, 1: 0.0935, 2: 0.1443, 3: 0.0757, 4: 0.0905, 5: 0.0549, 6: 0.1122, 7: 0.1206, 8: 0.1441, 9: 0.0, 10: 0.0068, 11: 0.0304, 12: 0.0, 13: 0.0158, 15: 0.0, 16: 0.0008, 17: 0.0031, 18: 0.0004, 20: 0.0023})

In [31]:
## generating predictions
predictions = rf_model.transform(vectorized_df).select('area', 'prediction')
predictions.head(10)

[Row(area=0.0, prediction=5.124695067988922),
 Row(area=0.0, prediction=5.20366662659808),
 Row(area=0.0, prediction=5.960195222909501),
 Row(area=0.0, prediction=5.94978706040469),
 Row(area=0.0, prediction=5.4586543208320455),
 Row(area=0.0, prediction=8.89401585320033),
 Row(area=0.0, prediction=4.856621383669456),
 Row(area=0.0, prediction=5.256103502718791),
 Row(area=0.0, prediction=5.940880521150659),
 Row(area=0.0, prediction=6.68987674318813)]

Now, lets evaluate how well the model is using `RegressionEvaluator`

In [32]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area')

In [33]:
## evaluating r^2
evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})

0.5440438100816761

In [34]:
## evaluating mean absolute error
evaluator.evaluate(predictions, {evaluator.metricName: 'mae'})

13.480850716212045