In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from sklearn.preprocessing import LabelEncoder
from imblearn.under_sampling import RandomUnderSampler

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel

np.random.seed(42)

In [None]:
  pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=eceb3a484e3d0f0230f9e7a999de10851cd078b4710fe8615dc6686b34dbf96f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

Mounted at /content/drive


Load feature data

In [None]:
dataset_path = '/content/drive/Shared drives/DATA 228/Group Project/'

'''load feature data'''

feature_file_paths = [
    dataset_path+'dataset/46013h2014.txt', dataset_path+'dataset/46013h2015.txt', dataset_path+'dataset/46013h2016.txt', dataset_path+'dataset/46013h2017.txt', dataset_path+'dataset/46013h2018.txt', dataset_path+'dataset/46013h2019.txt', dataset_path+'dataset/46013h2021.txt', dataset_path+'dataset/46013h2022.txt',
    dataset_path+'dataset/46026h2014.txt', dataset_path+'dataset/46026h2015.txt', dataset_path+'dataset/46026h2016.txt', dataset_path+'dataset/46026h2017.txt', dataset_path+'dataset/46026h2018.txt', dataset_path+'dataset/46026h2019.txt', dataset_path+'dataset/46026h2020.txt', dataset_path+'dataset/46026h2021.txt', dataset_path+'dataset/46026h2022.txt',
    dataset_path+'dataset/46237h2014.txt', dataset_path+'dataset/46237h2015.txt', dataset_path+'dataset/46237h2016.txt', dataset_path+'dataset/46237h2017.txt', dataset_path+'dataset/46237h2018.txt', dataset_path+'dataset/46237h2019.txt', dataset_path+'dataset/46237h2020.txt', dataset_path+'dataset/46237h2021.txt', dataset_path+'dataset/46237h2022.txt',
    dataset_path+'dataset/ftpc1h2014.txt', dataset_path+'dataset/ftpc1h2015.txt', dataset_path+'dataset/ftpc1h2016.txt', dataset_path+'dataset/ftpc1h2017.txt', dataset_path+'dataset/ftpc1h2018.txt', dataset_path+'dataset/ftpc1h2019.txt', dataset_path+'dataset/ftpc1h2020.txt', dataset_path+'dataset/ftpc1h2021.txt', dataset_path+'dataset/ftpc1h2022.txt',
    dataset_path+'dataset/pxoc1h2014.txt', dataset_path+'dataset/pxoc1h2015.txt', dataset_path+'dataset/pxoc1h2016.txt', dataset_path+'dataset/pxoc1h2017.txt', dataset_path+'dataset/pxoc1h2018.txt', dataset_path+'dataset/pxoc1h2019.txt', dataset_path+'dataset/pxoc1h2020.txt', dataset_path+'dataset/pxoc1h2021.txt', dataset_path+'dataset/pxoc1h2022.txt',
    dataset_path+'dataset/pxsc1h2014.txt', dataset_path+'dataset/pxsc1h2015.txt', dataset_path+'dataset/pxsc1h2016.txt', dataset_path+'dataset/pxsc1h2017.txt', dataset_path+'dataset/pxsc1h2018.txt', dataset_path+'dataset/pxsc1h2019.txt', dataset_path+'dataset/pxsc1h2020.txt', dataset_path+'dataset/pxsc1h2021.txt', dataset_path+'dataset/pxsc1h2022.txt',
    dataset_path+'dataset/tibc1h2015.txt', dataset_path+'dataset/tibc1h2016.txt', dataset_path+'dataset/tibc1h2017.txt', dataset_path+'dataset/tibc1h2018.txt', dataset_path+'dataset/tibc1h2019.txt', dataset_path+'dataset/tibc1h2020.txt', dataset_path+'dataset/tibc1h2021.txt', dataset_path+'dataset/tibc1h2022.txt'
]


# Initialize an empty DataFrame to store the combined feature data
all_feature_data = pd.DataFrame()

for feature_file_path in feature_file_paths:
    feature_data = pd.read_csv(feature_file_path, delim_whitespace=True, skiprows=[1])
    feature_data['timestamp'] = pd.to_datetime(feature_data[['#YY', 'MM', 'DD', 'hh', 'mm']].astype(str).agg(' '.join, axis=1), format='%Y %m %d %H %M')
    feature_data['year'] = feature_data['timestamp'].dt.year
    feature_data['month'] = feature_data['timestamp'].dt.month
    feature_data['day'] = feature_data['timestamp'].dt.day
    feature_data['hour'] = feature_data['timestamp'].dt.hour
    feature_data['minute'] = feature_data['timestamp'].dt.minute
    all_feature_data = pd.concat([all_feature_data, feature_data], axis=0, ignore_index=True)


print(f'total count of feature data: {all_feature_data.shape[0]}')



# Define the missing value patterns
missing_patterns = [99.00, 999, 999.0, 99.0]

# Loop through each column and replace each pattern with NaN
for column in all_feature_data.columns:
    for pattern in missing_patterns:
        all_feature_data[column] = all_feature_data[column].replace(to_replace=pattern, value=np.nan, regex=True)

missing_data = all_feature_data.isnull().sum()
# Display missing data count for each column
print('missing data count in raw data:')
print(missing_data)

# Setting threshold for excessive missing values (e.g., 60%)
threshold = 0.6 * len(all_feature_data)

# Drop columns with missing values greater than the threshold
all_feature_data.dropna(axis=1, thresh=threshold, inplace=True)

# Drop rows with any missing values
all_feature_data.dropna(axis=0, inplace=True)

total count of feature data: 3111213
missing data count in raw data:
#YY                0
MM                 0
DD                 0
hh                 0
mm                 0
WDIR          992810
WSPD          986877
GST          1201764
WVHT         2829788
DPD          2829788
APD          2829787
MWD          2852039
PRES             179
ATMP          466734
WTMP         1634345
DEWP         2312132
VIS          2425237
TIDE         3111213
timestamp          0
year               0
month              0
day                0
hour               0
minute             0
dtype: int64


Load target file

In [None]:
'''Load target file'''
target_file_path = dataset_path+'dataset/storm_data_search_results.csv'
target_data = pd.read_csv(target_file_path, sep=',')

print(target_data.head(2).to_string())

# Keep only specific columns
selected_columns = ['BEGIN_DATE', 'BEGIN_TIME', 'EVENT_TYPE']
target_data = target_data[selected_columns]

# convert the TIME columns to hourly timestamp
target_data['BEGIN_TIME'] = (target_data['BEGIN_TIME'].floordiv(100))
target_data['BEGIN_TIME'] = target_data['BEGIN_TIME'].astype(str) + '00'

# Convert timestamp columns to a single datetime column
target_data['timestamp'] = pd.to_datetime(target_data[['BEGIN_DATE', 'BEGIN_TIME']].astype(str).agg(' '.join, axis=1), format='%m/%d/%Y %H%M')

   EVENT_ID        CZ_NAME_STR BEGIN_LOCATION  BEGIN_DATE  BEGIN_TIME  EVENT_TYPE MAGNITUDE TOR_F_SCALE  DEATHS_DIRECT  INJURIES_DIRECT  DAMAGE_PROPERTY_NUM  DAMAGE_CROPS_NUM STATE_ABBR CZ_TIMEZONE MAGNITUDE_TYPE  EPISODE_ID CZ_TYPE  CZ_FIPS  WFO  INJURIES_INDIRECT  DEATHS_INDIRECT           SOURCE FLOOD_CAUSE TOR_LENGTH TOR_WIDTH BEGIN_RANGE BEGIN_AZIMUTH END_RANGE END_AZIMUTH   END_LOCATION    END_DATE  END_TIME BEGIN_LAT BEGIN_LON END_LAT  END_LON                                     EVENT_NARRATIVE                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              

In [None]:
'''Merge feature and target data based on the timestamp'''
# Merge feature and target data based on the timestamp as event data
all_event_data = pd.merge(all_feature_data, target_data, how='right', on='timestamp')

# Merge feature and target data based on the timestamp as other data
all_other_data = pd.merge(all_feature_data, target_data, how='left', on='timestamp')
all_other_data['EVENT_TYPE'].fillna('no', inplace=True)

# concatenate two partial data into a whole dataset
all_data = pd.concat([all_event_data, all_other_data])

# drop some unuseful columns
all_data = all_data.drop(['timestamp','BEGIN_DATE', 'BEGIN_TIME', '#YY', 'MM', 'DD', 'hh', 'mm', 'year', 'month', 'day', 'hour', 'minute'], axis=1)
# Drop rows with any missing values
all_data.dropna(axis=0, inplace=True)
all_data.loc[all_data["EVENT_TYPE"] != "no", "EVENT_TYPE"] = 'yes'

print('missing data count in prepared data:')
missing_data_prepared = all_data.isnull().sum()
print(missing_data_prepared)

print(f'total count of prepared data: {len(all_data)}')

missing data count in prepared data:
WDIR          0
WSPD          0
GST           0
PRES          0
ATMP          0
EVENT_TYPE    0
dtype: int64
total count of prepared data: 1678318


In [None]:
all_data

Unnamed: 0,WDIR,WSPD,GST,PRES,ATMP,EVENT_TYPE
0,157.0,5.2,9.4,1017.2,13.2,yes
1,160.0,5.6,8.5,1018.0,13.3,yes
2,192.0,6.3,9.0,1020.9,13.9,yes
3,206.0,1.5,1.7,1004.6,14.2,yes
4,134.0,2.4,3.1,1003.3,15.8,yes
...,...,...,...,...,...,...
1677863,317.0,5.8,8.6,1002.8,10.9,no
1677864,320.0,7.1,8.8,1002.9,10.8,no
1677865,312.0,7.2,10.6,1003.0,10.7,no
1677866,317.0,7.9,11.1,1003.1,10.9,no


Identify features and target variable

In [None]:
# Identify features and target variable
X = all_data.drop(['EVENT_TYPE'], axis=1)
y = all_data['EVENT_TYPE']

yes_count = all_data['EVENT_TYPE'].value_counts().get('yes', 0)
no_count = all_data['EVENT_TYPE'].value_counts().get('no', 0)
print(yes_count)
print(no_count)

900
1677418


Split training and test dataset

In [None]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
# Combine X_train and y_train into a single DataFrame for undersampling
train_data = pd.concat([X_train, y_train], axis=1)

# Identify the minority class label
minority_class_label = train_data['EVENT_TYPE'].value_counts().idxmin()

# Apply random undersampling
undersampler = RandomUnderSampler(sampling_strategy='auto', random_state=42)
X_resampled, y_resampled = undersampler.fit_resample(train_data.drop('EVENT_TYPE', axis=1), train_data['EVENT_TYPE'])
print(y_resampled.value_counts())

print(type(X_resampled))

df_train = pd.concat([X_resampled, y_resampled], axis = 1)
df_test = pd.concat([X_test, y_test], axis = 1)

no     720
yes    720
Name: EVENT_TYPE, dtype: int64
<class 'pandas.core.frame.DataFrame'>


In [None]:
# Building the SparkSession and name
# it :'pandas to spark'
spark = SparkSession.builder.appName("ML_App").getOrCreate()

# create DataFrame
df_train_spark = spark.createDataFrame(df_train)

df_test_spark = spark.createDataFrame(df_test)

In [None]:
# Create a feature vector by combining all the features
assembler = VectorAssembler(inputCols=["WDIR", "WSPD", "GST", "PRES", "ATMP"], outputCol="features")

# Transform the data to create the feature vector
train_data = assembler.transform(df_train_spark)
test_data = assembler.transform(df_test_spark)

label_stringIdx = StringIndexer(inputCol = 'EVENT_TYPE', outputCol = 'labelIndex')
train_data = label_stringIdx.fit(train_data).transform(train_data)
test_data = label_stringIdx.fit(test_data).transform(test_data)
train_data.show()

+-----+----+----+------+----+----------+--------------------+----------+
| WDIR|WSPD| GST|  PRES|ATMP|EVENT_TYPE|            features|labelIndex|
+-----+----+----+------+----+----------+--------------------+----------+
|144.0| 1.4| 3.8|1013.3|14.6|        no|[144.0,1.4,3.8,10...|       0.0|
|227.0| 0.8| 1.1|1017.6|10.8|        no|[227.0,0.8,1.1,10...|       0.0|
|215.0| 2.1| 3.4|1011.7|15.5|        no|[215.0,2.1,3.4,10...|       0.0|
|289.0| 1.9| 5.7|1018.6|11.4|        no|[289.0,1.9,5.7,10...|       0.0|
|325.0| 4.9| 6.6|1018.1|12.9|        no|[325.0,4.9,6.6,10...|       0.0|
|198.0| 1.5| 1.7|1021.2|10.8|        no|[198.0,1.5,1.7,10...|       0.0|
|349.0| 3.4| 4.5|1023.6|12.0|        no|[349.0,3.4,4.5,10...|       0.0|
| 29.0| 5.2| 6.3|1025.4|12.2|        no|[29.0,5.2,6.3,102...|       0.0|
|263.0| 1.9| 4.4|1017.8|14.6|        no|[263.0,1.9,4.4,10...|       0.0|
|279.0| 1.5| 2.9|1015.6|13.5|        no|[279.0,1.5,2.9,10...|       0.0|
|260.0| 3.4| 5.0|1013.5|18.8|        no|[260.0,3.4,

Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'labelIndex', numTrees=40, maxDepth=3)
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
# rf_predictions.select('labelIndex', 'rawPrediction', 'prediction', 'probability').show(25)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction")
accuracy = rf_evaluator.evaluate(rf_predictions)
print(f'Accuracy = {accuracy:.4f}')
print(f'Test Error = {(1.0 - accuracy):.4f}')

Accuracy = 0.9099
Test Error = 0.0901


In [None]:
# Evaluate recall
recall_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(rf_predictions)
print(f'Recall = {recall:.4f}')

# Evaluate precision
precision_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(rf_predictions)
print(f'Precision = {precision:.4f}')

# Evaluate F1 score
f1_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(rf_predictions)
print(f'F1 Score = {f1:.4f}')

# Evaluate area under ROC curve
auc_evaluator = BinaryClassificationEvaluator(labelCol="labelIndex", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = auc_evaluator.evaluate(rf_predictions)
print(f'AUC = {auc:.4f}')

Recall = 0.8356
Precision = 0.9993
F1 Score = 0.9099
AUC = 0.8262


Gradient Boosting Classifier

In [None]:
from pyspark.ml.classification import GBTClassifier

gb = GBTClassifier(featuresCol = 'features', labelCol = 'labelIndex', maxIter=50)
gb_model = gb.fit(train_data)
gb_predictions = gb_model.transform(test_data)

In [None]:
gb_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction")
accuracy = gb_evaluator.evaluate(gb_predictions)
print(f'Accuracy = {accuracy:.4f}')
print(f'Test Error = {(1.0 - accuracy):.4f}')

Accuracy = 0.8911
Test Error = 0.1089


In [None]:
# Evaluate recall
recall_evaluator_gb = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedRecall")
recall_gb = recall_evaluator_gb.evaluate(gb_predictions)
print(f'Recall = {recall_gb:.4f}')

# Evaluate precision
precision_evaluator_gb = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
precision_gb = precision_evaluator_gb.evaluate(gb_predictions)
print(f'Precision = {precision_gb:.4f}')

# Evaluate F1 score
f1_evaluator_gb = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="f1")
f1_gb = f1_evaluator_gb.evaluate(gb_predictions)
print(f'F1 Score = {f1_gb:.4f}')

# Evaluate area under ROC curve
auc_evaluator_gb = BinaryClassificationEvaluator(labelCol="labelIndex", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_gb = auc_evaluator_gb.evaluate(gb_predictions)
print(f'AUC = {auc_gb:.4f}')


Recall = 0.8044
Precision = 0.9994
F1 Score = 0.8911
AUC = 0.9127


XGBoost

In [None]:
from xgboost.spark import SparkXGBClassifier

xgb = SparkXGBClassifier(label_col="labelIndex", missing=0.0)
xgb_model = xgb.fit(train_data)
xgb_predictions = xgb_model.transform(test_data)

INFO:XGBoost-PySpark:Running xgboost-2.0.1 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}
INFO:XGBoost-PySpark:Finished xgboost training!


In [None]:
xgb_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction")
accuracy = xgb_evaluator.evaluate(xgb_predictions)
print(f'Accuracy = {accuracy:.4f}')
print(f'Test Error = {(1.0 - accuracy):.4f}')

Accuracy = 0.9060
Test Error = 0.0940


In [None]:
# Evaluate recall
recall_evaluator_xgb = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedRecall")
recall_xgb = recall_evaluator_xgb.evaluate(xgb_predictions)
print(f'Recall = {recall_xgb:.4f}')

# Evaluate F1 score
f1_evaluator_xgb = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="f1")
f1_xgb = f1_evaluator_xgb.evaluate(xgb_predictions)
print(f'F1 Score = {f1_xgb:.4f}')

# Evaluate precision
precision_evaluator_xgb = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
precision_xgb = precision_evaluator_xgb.evaluate(xgb_predictions)
print(f'Precision = {precision_xgb:.4f}')

# Evaluate area under ROC curve
auc_evaluator_xgb = BinaryClassificationEvaluator(labelCol="labelIndex", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_xgb = auc_evaluator_xgb.evaluate(xgb_predictions)
print(f'AUC = {auc_xgb:.4f}')

Recall = 0.8290
F1 Score = 0.9060
Precision = 0.9994
AUC = 0.9390
