In [1]:
#Create a pyspark session;

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Initialize Spark session with specified memory and cores
spark = SparkSession.builder.appName("OptimizedDataLoading").config("spark.driver.memory", "7g").config("spark.executor.memory", "7g").config("spark.driver.cores", "5").getOrCreate()

24/11/17 20:37:51 WARN Utils: Your hostname, Shwetas-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 192.168.1.101 instead (on interface en1)
24/11/17 20:37:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/17 20:37:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Loading the data from a CSV file into a DataFrame
data = spark.read.csv("data", header=True, inferSchema=True)

                                                                                

In [5]:
data.head(1)

24/11/17 20:38:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(Start_Lat=30.265263, Start_Lng=-97.736488, Severity=0, Distance(mi)=0.07, City='Austin', State='TX', Temperature(F)=48.0, Wind_Chill(F)=45.0, Humidity(%)=58.0, Pressure(in)=29.33, Visibility(mi)=10.0, Wind_Speed(mph)=7.0, Precipitation(in)=0.0, Amenity=0, Bump=0, Crossing=1, Give_Way=0, Junction=0, No_Exit=0, Railway=0, Roundabout=0, Station=0, Stop=0, Traffic_Calming=0, Traffic_Signal=1, Turning_Loop=0, Duration_Minutes=123.22, nearest_station='000011', distance_to_station_km=1.4, traffic_volume=98, date=datetime.date(2021, 1, 1), time_of_day='Afternoon', day_of_week='Fri', is_weekend=0)]

In [6]:
#We will only consider entries that are near to the station, to get an accurate traffic volume;

In [7]:
# Set a threshold for distance
threshold = 5.0

In [8]:
data = data.filter(data["distance_to_station_km"] <= threshold)

In [9]:
data = data.drop('nearest_station', 'distance_to_station_km')

In [10]:
#We will standardize some features which have different scales

In [11]:
#############################################################################################################################################

In [12]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [13]:
feature_columns = [
    "Distance(mi)", "Temperature(F)", "Wind_Chill(F)", "Humidity(%)",
    "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)", "Precipitation(in)",
    "Duration_Minutes", "traffic_volume"]

In [14]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_vector")
data = assembler.transform(data)

In [15]:
data.head(1)

[Row(Start_Lat=30.265263, Start_Lng=-97.736488, Severity=0, Distance(mi)=0.07, City='Austin', State='TX', Temperature(F)=48.0, Wind_Chill(F)=45.0, Humidity(%)=58.0, Pressure(in)=29.33, Visibility(mi)=10.0, Wind_Speed(mph)=7.0, Precipitation(in)=0.0, Amenity=0, Bump=0, Crossing=1, Give_Way=0, Junction=0, No_Exit=0, Railway=0, Roundabout=0, Station=0, Stop=0, Traffic_Calming=0, Traffic_Signal=1, Turning_Loop=0, Duration_Minutes=123.22, traffic_volume=98, date=datetime.date(2021, 1, 1), time_of_day='Afternoon', day_of_week='Fri', is_weekend=0, features_vector=DenseVector([0.07, 48.0, 45.0, 58.0, 29.33, 10.0, 7.0, 0.0, 123.22, 98.0]))]

In [16]:
#Once these features are added as vector, we can safely remove the original columns

In [17]:
data = data.drop(*feature_columns)

In [18]:
data.head(1)

[Row(Start_Lat=30.265263, Start_Lng=-97.736488, Severity=0, City='Austin', State='TX', Amenity=0, Bump=0, Crossing=1, Give_Way=0, Junction=0, No_Exit=0, Railway=0, Roundabout=0, Station=0, Stop=0, Traffic_Calming=0, Traffic_Signal=1, Turning_Loop=0, date=datetime.date(2021, 1, 1), time_of_day='Afternoon', day_of_week='Fri', is_weekend=0, features_vector=DenseVector([0.07, 48.0, 45.0, 58.0, 29.33, 10.0, 7.0, 0.0, 123.22, 98.0]))]

In [19]:
#Apply StandardScaler to standardize the features

In [20]:
scaler = StandardScaler(inputCol="features_vector", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

                                                                                

In [21]:
data = data.drop('features_vector')

In [22]:
#we will add the binary features to the scaled vectors

In [23]:
binary_features = [
    "Amenity", "Bump", "Crossing", "Give_Way", "Junction",
    "No_Exit", "Railway", "Roundabout", "Station", "Stop",
    "Traffic_Calming", "Traffic_Signal", "Turning_Loop"
]

In [24]:
# Use VectorAssembler to combine existing features_vector with binary features
assembler = VectorAssembler(inputCols=["scaled_features"] + binary_features, outputCol="combined_features")
data = assembler.transform(data)

In [25]:
data = data.drop('scaled_features')

In [26]:
data = data.drop(*binary_features)

In [27]:
data.head(1)

[Row(Start_Lat=30.265263, Start_Lng=-97.736488, Severity=0, City='Austin', State='TX', date=datetime.date(2021, 1, 1), time_of_day='Afternoon', day_of_week='Fri', is_weekend=0, combined_features=SparseVector(23, {0: -0.5271, 1: -0.8308, 2: -0.8411, 3: -0.1825, 4: -0.1489, 5: 0.3481, 6: -0.0901, 7: -0.1319, 8: -0.0408, 9: -0.6464, 12: 1.0, 21: 1.0}))]

In [28]:
#So currently the order of things in the vector is;

In [29]:
#---- "Distance(mi)", "Temperature(F)", "Wind_Chill(F)", "Humidity(%)",
#---- "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)", "Precipitation(in)",
#---- "Duration_Minutes", "traffic_volume"
#---- "Amenity", "Bump", "Crossing", "Give_Way", "Junction",
#---- "No_Exit", "Railway", "Roundabout", "Station", "Stop",
#---- "Traffic_Calming", "Traffic_Signal", "Turning_Loop"

In [30]:
#We will one-hot encode the ""time_of_day='Afternoon', day_of_week='Fri'""

In [31]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [32]:
#Index the `time_of_day` column
time_of_day_indexer = StringIndexer(inputCol="time_of_day", outputCol="time_of_day_index")

# Fit the StringIndexer to the data
time_of_day_indexer_model = time_of_day_indexer.fit(data)

# Transform the data
data = time_of_day_indexer_model.transform(data)

# Access the labels
print(time_of_day_indexer_model.labels)



['Afternoon', 'Morning', 'Evening', 'Night']


                                                                                

In [33]:
# Apply OneHotEncoder to the indexed column
time_of_day_encoder = OneHotEncoder(inputCol="time_of_day_index", outputCol="time_of_day_onehot")
data = time_of_day_encoder.fit(data).transform(data)

In [34]:
data = data.drop('time_of_day', 'time_of_day_index')

In [35]:
# Index the `day_of_week` column
day_of_week_indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")

day_of_week_indexer_model = day_of_week_indexer.fit(data)

data = day_of_week_indexer_model.transform(data)

# Access the labels
print(day_of_week_indexer_model.labels)



['Fri', 'Thu', 'Wed', 'Tue', 'Mon', 'Sat', 'Sun']


                                                                                

In [36]:
# Apply OneHotEncoder to the indexed column
day_of_week_encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_onehot")
data = day_of_week_encoder.fit(data).transform(data)

In [37]:
data = data.drop('day_of_week_index', 'day_of_week', 'is_weekend')

In [38]:
data.head(1)

[Row(Start_Lat=30.265263, Start_Lng=-97.736488, Severity=0, City='Austin', State='TX', date=datetime.date(2021, 1, 1), combined_features=SparseVector(23, {0: -0.5271, 1: -0.8308, 2: -0.8411, 3: -0.1825, 4: -0.1489, 5: 0.3481, 6: -0.0901, 7: -0.1319, 8: -0.0408, 9: -0.6464, 12: 1.0, 21: 1.0}), time_of_day_onehot=SparseVector(3, {0: 1.0}), day_of_week_onehot=SparseVector(6, {0: 1.0}))]

In [39]:
# Combine all features into a single vector
assembler = VectorAssembler(
    inputCols=["combined_features", "time_of_day_onehot", "day_of_week_onehot"],
    outputCol="final_features"
)
data = assembler.transform(data)

In [40]:
data = data.drop('combined_features', 'time_of_day_onehot', 'day_of_week_onehot')

In [41]:
data.head(1)

[Row(Start_Lat=30.265263, Start_Lng=-97.736488, Severity=0, City='Austin', State='TX', date=datetime.date(2021, 1, 1), final_features=SparseVector(32, {0: -0.5271, 1: -0.8308, 2: -0.8411, 3: -0.1825, 4: -0.1489, 5: 0.3481, 6: -0.0901, 7: -0.1319, 8: -0.0408, 9: -0.6464, 12: 1.0, 21: 1.0, 23: 1.0, 26: 1.0}))]

In [42]:
#The columns will be like

In [43]:
#---- "Distance(mi)", "Temperature(F)", "Wind_Chill(F)", "Humidity(%)",
#---- "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)", "Precipitation(in)",
#---- "Duration_Minutes", "traffic_volume"
#---- "Amenity", "Bump", "Crossing", "Give_Way", "Junction",
#---- "No_Exit", "Railway", "Roundabout", "Station", "Stop",
#---- "Traffic_Calming", "Traffic_Signal", "Turning_Loop"
#---- 'Afternoon', 'Morning', 'Evening', 'Night'
#---- 'Fri', 'Thu', 'Wed', 'Tue', 'Mon', 'Sat', 'Sun'

In [44]:
###########################################################################################################################################

In [45]:
#The classes are highly imbalanced, in pyspark random forest there is no class weight attribute for this, thereby undersampling

In [46]:
# Separate the majority and minority classes
majority_class = data.filter(data["Severity"] == 0)
minority_class = data.filter(data["Severity"] == 1)

In [47]:
# Perform undersampling on the majority class
# Adjust the fraction to match the size of the minority class
fraction = minority_class.count() / float(majority_class.count())
majority_sampled = majority_class.sample(withReplacement=False, fraction=fraction, seed=7)



CodeCache: size=131072Kb used=40094Kb max_used=40094Kb free=90977Kb
 bounds [0x0000000105d3c000, 0x00000001084ac000, 0x000000010dd3c000]
 total_blobs=14909 nmethods=13915 adapters=905
 compilation: disabled (not enough contiguous free space left)


                                                                                

In [48]:
# Combine the sampled majority class with the minority class
data = majority_sampled.union(minority_class)

# Check the class distribution after undersampling
data.groupBy("Severity").count().show()



+--------+------+
|Severity| count|
+--------+------+
|       0|172294|
|       1|172480|
+--------+------+



                                                                                

In [49]:
###########################################################################################################################################

In [50]:
# Split the balanced data into training and testing sets;
(trainingData, testData) = data.randomSplit([0.9, 0.1], seed=5)

In [51]:
# Check class distribution in the training data
trainingData.groupBy("Severity").count().show()

# Check class distribution in the testing data
testData.groupBy("Severity").count().show()

                                                                                

+--------+------+
|Severity| count|
+--------+------+
|       0|155091|
|       1|155088|
+--------+------+





+--------+-----+
|Severity|count|
+--------+-----+
|       0|17203|
|       1|17392|
+--------+-----+



                                                                                

In [52]:
##########################################################################################################################################

In [53]:
# Initialize and train a Random Forest model

In [54]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [55]:
rf = RandomForestClassifier(featuresCol="final_features", labelCol="Severity", numTrees=2000)
model = rf.fit(trainingData)

24/11/17 20:46:09 WARN DAGScheduler: Broadcasting large task binary with size 1102.0 KiB
24/11/17 20:46:52 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/11/17 20:48:07 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
24/11/17 20:49:52 WARN DAGScheduler: Broadcasting large task binary with size 1416.3 KiB
24/11/17 20:50:01 WARN DAGScheduler: Broadcasting large task binary with size 9.0 MiB
24/11/17 20:52:45 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
                                                                                

In [56]:
# Make predictions and evaluate the model
predictions = model.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy:", accuracy)

24/11/17 20:53:01 WARN DAGScheduler: Broadcasting large task binary with size 9.7 MiB

Test Accuracy: 0.6927301633183987


                                                                                

In [57]:
# Precision, Recall, and F1-Score
evaluator_precision = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="weightedRecall")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="f1")

precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1_score)

24/11/17 20:56:18 WARN DAGScheduler: Broadcasting large task binary with size 9.7 MiB
24/11/17 20:57:19 WARN DAGScheduler: Broadcasting large task binary with size 9.7 MiB
24/11/17 20:58:19 WARN DAGScheduler: Broadcasting large task binary with size 9.7 MiB

Precision: 0.6942584422901629
Recall: 0.6927301633183987
F1-Score: 0.6919845282927657


                                                                                

In [58]:
# Generate a confusion matrix using the predictions and the actual labels
predictions.groupBy("Severity", "prediction").count().show()

24/11/17 20:59:19 WARN DAGScheduler: Broadcasting large task binary with size 9.7 MiB

+--------+----------+-----+
|Severity|prediction|count|
+--------+----------+-----+
|       0|       0.0|11083|
|       0|       1.0| 6120|
|       1|       0.0| 4510|
|       1|       1.0|12882|
+--------+----------+-----+



24/11/17 21:00:19 WARN DAGScheduler: Broadcasting large task binary with size 9.6 MiB
                                                                                

True Negatives (TN): 11,096
The model correctly predicted Severity = 0 when the actual severity was 0.

False Positives (FP): 6,107
The model incorrectly predicted Severity = 1 when the actual severity was 0. This represents instances where the model overestimated the severity.

False Negatives (FN): 4,532
The model incorrectly predicted Severity = 0 when the actual severity was 1. This represents instances where the model underestimated the severity.

True Positives (TP): 12,860
The model correctly predicted Severity = 1 when the actual severity was 1.

In [60]:
# The order of features in `final_features`:

# 1. Scaled Numerical Features (10 features, in this order):
#    1. "Distance(mi)"
#    2. "Temperature(F)"
#    3. "Wind_Chill(F)"
#    4. "Humidity(%)"
#    5. "Pressure(in)"
#    6. "Visibility(mi)"
#    7. "Wind_Speed(mph)"
#    8. "Precipitation(in)"
#    9. "Duration_Minutes"
#    10. "traffic_volume"

# 2. Binary Features (13 features, in this order):
#    11. "Amenity"
#    12. "Bump"
#    13. "Crossing"
#    14. "Give_Way"
#    15. "Junction"
#    16. "No_Exit"
#    17. "Railway"
#    18. "Roundabout"
#    19. "Station"
#    20. "Stop"
#    21. "Traffic_Calming"
#    22. "Traffic_Signal"
#    23. "Turning_Loop"

# 3. One-Hot Encoded Features:
#    - time_of_day_onehot (number of features = number of unique categories in `time_of_day` minus 1)
#    - day_of_week_onehot (number of features = number of unique categories in `day_of_week` minus 1)

# Example:
# If `time_of_day` has 4 unique categories, then time_of_day_onehot will have 3 features.
# If `day_of_week` has 7 unique categories, then day_of_week_onehot will have 6 features.

# Total number of features in `final_features`:
# 10 (scaled numerical features) + 13 (binary features) + 3 (time_of_day_onehot) + 6 (day_of_week_onehot) = 32 features

In [61]:
# Access the trained Random Forest model
print("Feature Importances:", model.featureImportances)

Feature Importances: (32,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,18,19,20,21,23,24,25,26,27,28,29,30,31],[0.16356948252170928,0.07032898235626303,0.05860297711694641,0.022249915459421305,0.17149024587952263,0.0010908542462190174,0.004427756210794965,0.0004480221502391222,0.37975030328792186,0.014440496935351809,0.00020218991813986928,2.1139965625822512e-05,0.0018246128905238875,0.00021685020336219033,0.012741860290094923,8.77642025053312e-05,0.00013194967485078153,0.004862866318651884,0.0005861415951468119,1.898358946969861e-05,0.0019136155391614618,0.06842075734496172,0.007144484353726788,0.013792365736247042,0.000505490937766686,0.0003243639620001244,8.630204659323826e-05,0.0002757811571415296,9.176314657862345e-05,0.00035168096306207756])


32: The total number of features.

[indices]: The indices of the features.

[importances]: The importance scores corresponding to each feature index.

In [63]:
# # Feature Names in the Order of `final_features`
# feature_names = [
#     "Distance(mi)",                # 0
#     "Temperature(F)",              # 1
#     "Wind_Chill(F)",               # 2
#     "Humidity(%)",                 # 3
#     "Pressure(in)",                # 4
#     "Visibility(mi)",              # 5
#     "Wind_Speed(mph)",             # 6
#     "Precipitation(in)",           # 7
#     "Duration_Minutes",            # 8
#     "traffic_volume",              # 9
#     "Amenity",                     # 10
#     "Bump",                        # 11
#     "Crossing",                    # 12
#     "Give_Way",                    # 13
#     "Junction",                    # 14
#     "No_Exit",                     # 15
#     "Railway",                     # 16
#     "Roundabout",                  # 17
#     "Station",                     # 18
#     "Stop",                        # 19
#     "Traffic_Calming",             # 20
#     "Traffic_Signal",              # 21
#     "Turning_Loop",                # 22
#     "Afternoon",                   # 23 (One-Hot Encoded)
#     "Morning",                     # 24 (One-Hot Encoded)
#     "Evening",                     # 25 (One-Hot Encoded)
#     # "Night" is not included as it is the reference category
#     "Fri",                         # 26 (One-Hot Encoded)
#     "Thu",                         # 27 (One-Hot Encoded)
#     "Wed",                         # 28 (One-Hot Encoded)
#     "Tue",                         # 29 (One-Hot Encoded)
#     "Mon",                         # 30 (One-Hot Encoded)
#     "Sat"                          # 31 (One-Hot Encoded)
#     # "Sun" is not included as it is the reference category
# ]

In [64]:
# # Feature Importances from the Model
# importances = [
#     0.17394707113931535,  # Distance(mi)
#     0.06897441105651222,  # Temperature(F)
#     0.06055660873673941,  # Wind_Chill(F)
#     0.022732523660063102, # Humidity(%)
#     0.16852914570339161,  # Pressure(in)
#     0.0011179775026315254,# Visibility(mi)
#     0.005057684382629142, # Wind_Speed(mph)
#     0.0005198216945008352,# Precipitation(in)
#     0.3659642468730161,   # Duration_Minutes
#     0.013483433139066718, # traffic_volume
#     0.0002104069482080534,# Amenity
#     2.246449146095659e-05,# Bump
#     0.0019763951466569315,# Crossing
#     0.000245813917017759, # Give_Way
#     0.014700306917373689, # Junction
#     9.06908388498976e-05, # No_Exit
#     0.00011299998075738631,# Railway
#     0.0,                  # Roundabout (not important)
#     0.004304892804743067, # Station
#     0.0006019762745434373,# Stop
#     3.140434662607242e-05,# Traffic_Calming
#     0.002190662073182403, # Traffic_Signal
#     0.0,                  # Turning_Loop (not important)
#     0.07115104416059445,  # Afternoon
#     0.007352640662777778, # Morning
#     0.014368749450850517, # Evening
#     # "Night" is removed as it is the reference category
#     0.00034579810772437654,# Fri
#     7.237659882916528e-05,# Thu
#     0.0002579104712474635,# Wed
#     8.986670807331296e-05,# Tue
#     0.00045023922322031,  # Mon
#     # "Sun" is removed as it is the reference category
# ]

**Most Important Features:**

**Duration_Minutes (0.366): This feature has the highest importance score, meaning that the duration of an event (e.g., the duration of an accident or traffic delay) is the most influential factor in predicting accident severity. This suggests that longer durations are strongly associated with more severe outcomes.**

**Distance(mi) (0.174): The distance of the event also has a significant impact, indicating that the distance covered during an incident is a crucial factor in determining severity.**

**Pressure(in) (0.169): Atmospheric pressure is another critical feature, likely affecting driving conditions or accident likelihood.**

In [66]:
########################################################################################################################################

**Moderately Important Features:**

**Afternoon (0.071): The time of day, particularly the afternoon, plays a moderate role in predicting accident severity. This may reflect higher traffic volumes or changing road conditions during this period.**

**Temperature(F) (0.069) and Wind_Chill(F) (0.061): Weather conditions, including temperature and wind chill, are moderately important and influence road safety.**

**Junction (0.015): The presence of a junction is another moderately influential feature, likely because intersections are common sites for accidents.**

**Evening (0.014): The evening time period has a moderate impact, possibly due to reduced visibility or higher traffic density.**

In [68]:
##########################################################################################################################################

**Less Important Features:**

**traffic_volume (0.013): Surprisingly, traffic volume has a lower importance score than some other features, indicating that while it affects accident severity, it is not as significant as other factors.**

**Station (0.004), Traffic_Signal (0.002), and Crossing (0.002): These features have relatively low importance scores, suggesting they contribute less to the model's predictions.**

**Morning (0.007): The morning time period has a minor influence compared to the afternoon and evening.**

In [70]:
#########################################################################################################################################

**Least Important Features:**

**Features like Bump (0.00002), Traffic_Calming (0.00003), and Thu (0.00007) have very low importance scores, indicating minimal impact on the model's predictions.**

**Features with an importance of 0.0 (e.g., Roundabout and Turning_Loop) are not contributing at all and could be candidates for removal in future models.**

In [72]:
#Given the traffic volume and other parameters, will the model be able to predict correctly the severity of the accident?

In [73]:
#I will say 'YES' for about 70%

In [74]:
#I believe with more training we would have been able to achieve higher classification metrics

In [75]:
import json

In [76]:
# Feature names
feature_names = [
    "Distance(mi)", "Temperature(F)", "Wind_Chill(F)", "Humidity(%)", "Pressure(in)",
    "Visibility(mi)", "Wind_Speed(mph)", "Precipitation(in)", "Duration_Minutes",
    "traffic_volume", "Amenity", "Bump", "Crossing", "Give_Way", "Junction",
    "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming",
    "Traffic_Signal", "Turning_Loop", "Afternoon", "Morning", "Evening", "Fri",
    "Thu", "Wed", "Tue", "Mon", "Sat"
]

In [77]:
# Save to a JSON file
with open('RANDOM FOREST/feature_names.json', 'w') as file:
    json.dump(feature_names, file)

In [78]:
#Saving the model;

In [99]:
# For PySpark models, use the built-in save method
model.save("RANDOM FOREST/rf_model")

24/11/17 21:05:48 WARN TaskSetManager: Stage 64 contains a task of very large size (1246 KiB). The maximum recommended task size is 1000 KiB.
