In [None]:
%load_ext autoreload
import numpy as np
import seaborn as sns
import os
import GallenModel as ClassificationModelsimple
import geopandas as gpd
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split

In [None]:
# Load the landslide inventory from the GeoPackage and keep only the rows
# whose Slp_m value exceeds 10.0.
df = gpd.read_file('Data/NepalEqUSGSGallen.gpkg')
df = df.loc[df['Slp_m'] > 10.0]

In [None]:

# Hold out 30% of the rows for evaluation; the fixed seed keeps the split reproducible.
train_df, test_df = train_test_split(
    df,
    test_size=0.30,
    random_state=42,
)

def df_to_dataset(dataframe, shuffle=True, batch_size=32):
  """Turn a table into a batched ``tf.data.Dataset`` of ``(features, label)`` pairs.

  The 'Landslide' column is split off as the label. Every remaining column
  becomes one entry of the feature dict, reshaped to a trailing axis of size 1.

  Args:
    dataframe: table containing the feature columns and a 'Landslide' column.
      It is copied first, so the caller's object is not modified.
    shuffle: when True, shuffle with a buffer as large as the table.
    batch_size: number of rows per batch; also used as the prefetch depth.

  Returns:
    The batched (and optionally shuffled) dataset.
  """
  features = dataframe.copy()
  targets = features.pop('Landslide')

  columns = {}
  for col_name, col_values in features.items():
    # Add a trailing axis so each feature has shape (n, 1).
    columns[col_name] = col_values.to_numpy()[:, tf.newaxis]

  ds = tf.data.Dataset.from_tensor_slices((columns, targets))
  if shuffle:
    ds = ds.shuffle(buffer_size=len(dataframe))
  return ds.batch(batch_size).prefetch(batch_size)
# Training pipeline (shuffled) and validation pipeline (row order preserved).
exai_ds = df_to_dataset(
    train_df[['Est_m','Nrt_m','HC_m','VC_m','Slp_m','Prc_m','NDVI_m','PGA_Usgs','Sand_m','Silt_m','Clay_m','Bdod_m','GLG','Landslide']]
)
val_ds = df_to_dataset(
    test_df[['Est_m','Nrt_m','HC_m','VC_m','Slp_m','Prc_m','NDVI_m','PGA_Usgs','Sand_m','Silt_m','Clay_m','Bdod_m','GLG','Landslide']],
    shuffle=False,
)

# Pull exactly one training batch to inspect the feature keys and tensors.
(train_features, label_batch), = exai_ds.take(1)
print('Every feature:', list(train_features.keys()))
print('A batch of geology:', train_features['GLG'])
print('A batch of targets:', label_batch)

In [None]:
# Sanity check: smallest PGA_Usgs value among the slope-filtered rows.
df.PGA_Usgs.min()

In [None]:
def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
  """Build a callable that encodes one categorical feature.

  Args:
    name: key of the feature inside the dataset's feature dict.
    dataset: dataset yielding ``(features, label)`` pairs; used to learn the
      vocabulary of the feature.
    dtype: 'string' selects a ``StringLookup``; anything else selects an
      ``IntegerLookup``.
    max_tokens: optional cap on the vocabulary size, forwarded to the lookup layer.

  Returns:
    A function that maps a feature tensor to lookup indices and then through a
    ``CategoryEncoding`` layer sized to the learned vocabulary.
  """
  # Pick the lookup layer that matches the raw value type.
  lookup_cls = layers.StringLookup if dtype == 'string' else layers.IntegerLookup
  lookup = lookup_cls(max_tokens=max_tokens)

  # Learn the vocabulary from this feature alone.
  lookup.adapt(dataset.map(lambda features, _label: features[name]))

  # One output slot per vocabulary entry.
  category_encoder = layers.CategoryEncoding(num_tokens=lookup.vocabulary_size())

  def encode(feature):
    # The closure keeps both layers alive so they can be used in a Functional model.
    return category_encoder(lookup(feature))

  return encode
def get_normalization_layer(name, dataset):
  """Return a ``Normalization`` layer adapted to one numeric feature.

  Args:
    name: key of the feature inside the dataset's feature dict.
    dataset: dataset yielding ``(features, label)`` pairs; its values for
      ``name`` supply the statistics the layer learns.

  Returns:
    The adapted ``layers.Normalization(axis=None)`` instance.
  """
  # Reduce the dataset to the single feature of interest.
  single_feature = dataset.map(lambda features, _label: features[name])

  scaler = layers.Normalization(axis=None)
  scaler.adapt(single_feature)
  return scaler

In [None]:
# Column-wise maxima of the model inputs and the target (quick range check).
df[['Est_m','Nrt_m','HC_m','VC_m','Slp_m','Prc_m','NDVI_m','PGA_Usgs','Sand_m','Silt_m','Clay_m','Bdod_m','GLG','Landslide']].max()

In [None]:
# Parallel lists: the symbolic Keras inputs and their preprocessed outputs,
# appended in the same order so the model builder can pair them up.
all_inputs = []
encoded_features = []

# Numerical features: one scalar input each, standardised with statistics
# learned from the training dataset.
numerical_cols=['Est_m', 'Nrt_m', 'HC_m', 'VC_m', 'Slp_m', 'Prc_m', 'NDVI_m', 'PGA_Usgs', 'Sand_m', 'Silt_m', 'Clay_m', 'Bdod_m']
for header in numerical_cols:
  numeric_col = tf.keras.Input(shape=(1,), name=header)
  normalization_layer = get_normalization_layer(header, exai_ds)
  all_inputs.append(numeric_col)
  encoded_features.append(normalization_layer(numeric_col))

# Categorical features: string inputs encoded through a lookup + category encoding.
categorical_cols = ['GLG']
for header in categorical_cols:
  categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='string')
  encoding_layer = get_category_encoding_layer(
      name=header,
      dataset=exai_ds,
      dtype='string',
      max_tokens=9,
  )
  all_inputs.append(categorical_col)
  encoded_features.append(encoding_layer(categorical_col))

In [None]:

# Assemble the classifier through the project-local GallenModel module
# (imported as ClassificationModelsimple), compile it, and restore the weights
# saved under TrainedWeights/PINN_v3. The calls are order-dependent: the model
# must exist before it is compiled and before weights can be loaded into it.
clfmdl=ClassificationModelsimple.LandslideModel()
clfmdl.getclassificationModel(all_inputs=all_inputs, encoded_features=encoded_features)
clfmdl.getOptimizer()
clfmdl.compileModel()
clfmdl.model.load_weights('TrainedWeights/PINN_v3')

In [None]:
# Print the layer-by-layer architecture of the assembled model.
clfmdl.model.summary()

In [None]:
def trainmodel(model,train_ds,val_ds):
    """Fit ``model`` and checkpoint the epoch with the best validation AUC.

    Args:
        model: compiled Keras model. The checkpoint monitors ``val_auc``, so the
            model presumably reports a metric named ``auc`` -- TODO confirm
            against the compile step.
        train_ds: already-batched ``tf.data.Dataset`` of ``(features, labels)``.
        val_ds: already-batched dataset evaluated at the end of every epoch.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    NUMBER_EPOCHS = 100
    filepath='TrainedWeights/PINN_v3'

    # Keep only the best full model (not just weights), judged once per epoch.
    model_checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
        filepath,
        monitor="val_auc",
        verbose=0,
        save_best_only=True,
        save_weights_only=False,
        mode="max",
        save_freq="epoch",
        options=None
    )

    # `batch_size` is deliberately NOT passed: the inputs are tf.data.Datasets
    # that already yield batches, and Keras documents that batch_size must not
    # be specified for dataset inputs. The effective batch size is whatever the
    # dataset was built with.
    hist = model.fit(train_ds,
                     epochs=NUMBER_EPOCHS,
                     validation_data=val_ds,
                     verbose=1,
                     callbacks=[model_checkpoint_callback],
                     # Positive class (1) is weighted 5x relative to class 0.
                     class_weight = {0: 1, 1: 5}
                    )
    return hist


In [None]:
# trainmodel(clfmdl.model,exai_ds,val_ds)

In [None]:
# Reload the full saved model from disk; the evaluation cells below use this
# `model` object rather than `clfmdl.model`.
# NOTE(review): if the saved model uses custom losses/layers from GallenModel,
# load_model may need `custom_objects` or `compile=False` -- confirm.
model =  tf.keras.models.load_model("TrainedWeights/PINN_v3/")

In [None]:
# Scores for the held-out split. val_ds is built with shuffle=False, so the
# prediction order matches the row order of test_df.
preds = model.predict(val_ds)
y_test = test_df.Landslide.to_numpy()

In [None]:
import sklearn.metrics
import matplotlib.pyplot as plt
import seaborn as sns
# ROC operating points (false/true positive rates per threshold) of the
# held-out scores against the true labels.
fpr,tpr,thresholds=sklearn.metrics.roc_curve(y_test, preds)


In [None]:
# Area under the ROC curve, then the confusion matrix with the scores rounded
# to 0/1 via np.rint.
print(sklearn.metrics.auc(fpr,tpr))
print(sklearn.metrics.confusion_matrix(y_test,np.rint(preds)))

In [None]:
# ROC figure for the held-out split, saved as a PDF.
plt.figure(figsize=(4,4),dpi=300)
lw = 0.6
# Model ROC curve, with the AUC shown in the legend entry.
plt.plot(
    fpr,
    tpr,
    color="darkorange",
    lw=lw,
    label="ROC curve (area = %0.2f)" % sklearn.metrics.auc(fpr,tpr),
)
# Diagonal reference line (chance level).
plt.plot([0, 1], [0, 1], color="navy", lw=lw, linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve Landslide Classification")
# NOTE(review): the text reads "Accuracy" but the value is the *balanced*
# accuracy at a 0.5 threshold -- consider relabelling.
plt.text(0.61, 0.15,f"Accuracy={round(sklearn.metrics.balanced_accuracy_score(y_test, preds>0.5),2)}")
plt.legend(loc="lower right")
# Applied after the limits are set; forces equal-length, equal-scale axes.
plt.axis('square')
# The PINNPlots/ directory must already exist.
plt.savefig('PINNPlots/rocrev1.pdf')
plt.show()


In [None]:
# Confusion map: score every row of the slope-filtered frame (row order kept)
# and tag it with its prediction outcome.
all_data=df_to_dataset(df[['Est_m','Nrt_m','HC_m','VC_m','Slp_m','Prc_m','NDVI_m','PGA_Usgs','Sand_m','Silt_m','Clay_m','Bdod_m','GLG','Landslide']],shuffle=False)
preds2=model.predict(all_data)
Ydata=df['Landslide'].to_numpy()

# Round the scores to hard 0/1 classes once and reuse them for every mask.
predicted_class = np.rint(preds2.flatten())

# Object array so each cell can hold a label string; unmatched rows stay unset.
confusiondata=np.empty(Ydata.shape,dtype=object)
for outcome, truth, guess in (
    ('True Positive', 1, 1),
    ('False Positive', 0, 1),
    ('False Negative', 1, 0),
    ('True Negative', 0, 0),
):
    confusiondata[(Ydata == truth) & (predicted_class == guess)] = outcome

In [None]:
# Attach the per-row outcome labels to the frame so they can be mapped.
df['confusion']=confusiondata

In [None]:
# Echo the raw prediction array returned by model.predict for all rows.
preds2

In [None]:
import contextily as cx
# Reproject to EPSG:3857 so the layer lines up with the web-tile basemap,
# colour the features by prediction outcome, and draw the basemap underneath.
df_wm = df.to_crs(epsg=3857)
ax=df_wm.plot(column='confusion',legend=True,figsize=(10, 10), alpha=0.7)
cx.add_basemap(ax,source='NASAGIBS.ASTER_GDEM_Greyscale_Shaded_Relief')

In [None]:
# Save the figure that owns `ax`, reusing the axes background colour as the
# figure background. The PINNPlots/ directory must already exist.
ax.get_figure().savefig('PINNPlots/confusionmaprev1.pdf',facecolor=ax.get_facecolor())

In [None]:
# Rebuild the unshuffled dataset over every row of `df`; the sub-model
# predictions in the following cells read from `all_data`.
all_data=df_to_dataset(df[['Est_m','Nrt_m','HC_m','VC_m','Slp_m','Prc_m','NDVI_m','PGA_Usgs','Sand_m','Silt_m','Clay_m','Bdod_m','GLG','Landslide']],shuffle=False)


In [None]:
import contextily as cx
# Fresh Web-Mercator copy of the frame to carry the derived geotechnical columns.
df_wm = df.to_crs(epsg=3857)

# Sub-model that exposes the output of the intermediate "cohesion" layer.
geo_Tech = tf.keras.Model(inputs=model.input, outputs=model.get_layer("cohesion").output)

# Scale the *predictions*, not the Model object: `tf.keras.Model(...) * 5`
# raises a TypeError because a Model does not support multiplication.
# The factor applies linear scaling to compensate for the 5x scaling on the loss function.
geotech_preds=geo_Tech.predict(all_data)*5
df_wm['cohesion'] = geotech_preds#*1000

# Map the predicted cohesion over the shaded-relief basemap.
ax=df_wm.plot(column='cohesion',legend=True,figsize=(10, 10), alpha=0.7)#, vmin=1, vmax=3)
cx.add_basemap(ax,source='NASAGIBS.ASTER_GDEM_Greyscale_Shaded_Relief')

In [None]:
# Summary statistics of the predicted cohesion column.
df_wm['cohesion'].describe()

In [None]:
import contextily as cx
# df_wm = df.to_crs(epsg=3857)
# Sub-model that exposes the output of the intermediate "internalFriction" layer.
geo_Tech = tf.keras.Model(inputs=model.input, outputs=model.get_layer("internalFriction").output)
geotech_preds=geo_Tech.predict(all_data)
# The column is stored in DEGREES: rad2deg treats the layer output as radians
# (TODO confirm against GallenModel). Anything that later feeds this column to
# a trigonometric function must convert it back to radians first.
df_wm['internalFriction'] = np.rad2deg(geotech_preds)
# Map the friction angle over the shaded-relief basemap.
ax=df_wm.plot(column='internalFriction',legend=True,figsize=(10, 10), alpha=0.7)
cx.add_basemap(ax,source='NASAGIBS.ASTER_GDEM_Greyscale_Shaded_Relief')

In [None]:
# Summary statistics of the friction-angle column (values are in degrees).
df_wm['internalFriction'].describe()

In [None]:
import math
import tensorflow as tf
# Rescale cohesion by 1000 (presumably kPa -> Pa -- TODO confirm).
# NOTE(review): this overwrites the column in place, so re-running the cell
# multiplies by 1000 again; run it exactly once per fresh `df_wm`.
df_wm['cohesion'] = df_wm['cohesion']*1000

# Slope angle in radians (Slp_m is converted with deg2rad, i.e. treated as degrees).
slope_rad = np.deg2rad(df_wm.Slp_m.to_numpy())
# The internalFriction column was written with np.rad2deg, so it holds degrees.
# np.tan expects radians, so convert back before taking the tangent.
friction_rad = np.deg2rad(df_wm.internalFriction.to_numpy())

# Static factor of safety: cohesive term + frictional term.
# 2300 * 9.81 looks like a unit weight (density x gravity) with an implicit
# unit thickness -- TODO confirm against the model definition.
safety_factor = (df_wm.cohesion.to_numpy()*(1/(2300*9.81*np.sin(slope_rad)))) + (np.tan(friction_rad)/np.tan(slope_rad))

# safety_factor = tf.nn.relu(safety_factor)
# Bound the factor of safety to [1.2, 15.0].
safety_factor = tf.clip_by_value(safety_factor, 1.2, 15.0).numpy()

# Critical acceleration in m/s^2.
ac = ((safety_factor-1)*9.81*np.sin(np.deg2rad(df_wm.Slp_m)))

# Ratio of critical to peak ground acceleration.
# NOTE(review): PGA is multiplied by 10 here rather than 9.81 -- confirm the
# units of PGA_Usgs and whether this approximation is intended.
acpg=ac/(df_wm.PGA_Usgs.to_numpy()*10)

# Keep the ratio strictly inside (0, 1) so the power terms and the log stay finite.
acpg = tf.clip_by_value(acpg, 0.001, 0.999).numpy()

# Regression-style displacement estimate from the acceleration ratio.
# NOTE(review): natural log / exp are used, and the constants are kept exactly
# as in the original cell -- verify them against the source regression.
powcomp = np.power((1-acpg),2.53)*np.power(acpg,-1.438)
logds = 0.251+np.log(powcomp)+0.5

displacement = np.exp(logds)


In [None]:
# Store the derived quantities on the frame for mapping.
# 'ac' is divided by 9.8, so the column holds the critical acceleration as a
# multiple of g rather than in m/s^2 (note: 9.81 was used when computing `ac`).
df_wm['ac'] = ac/9.8
df_wm['sf'] = safety_factor
df_wm['displacement'] = displacement
df_wm['acpg'] = acpg

In [None]:
# Summary statistics of the critical-acceleration column.
df_wm['ac'].describe()

In [None]:
# Reciprocal of the critical-acceleration column.
df_wm['aci'] = 1/df_wm['ac']

In [None]:
# Summary statistics of the (clipped) factor of safety.
df_wm['sf'].describe()

In [None]:
# Summary statistics of the estimated displacement.
df_wm['displacement'].describe()

In [None]:
# Classifier scores for every row (preds2 flattened to 1-D), stored for mapping.
df_wm['susceptibility']=preds2.flatten()

In [None]:
from matplotlib_scalebar.scalebar import ScaleBar
# Two-panel product figure: critical acceleration (left), susceptibility (right).
fig,axs = plt.subplots(1,2,dpi=500, figsize=(10,5))

# Keep only rows whose Slp_m exceeds 10 (same threshold as the initial data filter).
df_wm = df_wm[df_wm['Slp_m']>10]

# The 'ac' column holds the critical acceleration divided by 9.8, i.e. a
# multiple of g, so the colour bar is labelled in g rather than m/s^2.
df_wm.plot(column='ac',cmap='viridis', legend=True, alpha=0.7,  ax=axs[0],vmin=0.0,vmax=0.50, legend_kwds={"label": "Critical Acceleration (g)", "orientation": "horizontal"},)
cx.add_basemap(axs[0],source='NASAGIBS.ASTER_GDEM_Greyscale_Shaded_Relief')
axs[0].set_axis_off()
# dx=1: one axis unit is treated as one metre. This assumes df_wm is still in
# EPSG:3857 here (top-to-bottom execution).
axs[0].add_artist(ScaleBar(1))

# Classifier score in [0, 1]; the reversed RdYlGn map shows high scores in red.
df_wm.plot(column='susceptibility',cmap='RdYlGn_r',legend=True, alpha=0.7,  ax=axs[1], vmin=0.0, vmax = 1.0, legend_kwds={"label": "Susceptibility", "orientation": "horizontal"},)
cx.add_basemap(axs[1],source='NASAGIBS.ASTER_GDEM_Greyscale_Shaded_Relief')
axs[1].set_axis_off()

# The PINNPlotsGallen/ directory must already exist.
plt.savefig("PINNPlotsGallen/Products.pdf")
plt.show()

In [None]:
# List the columns now present on the derived frame.
df_wm.columns

In [None]:
# Study-area map in geographic coordinates (EPSG:4326), coloured by the
# Landslide label.
df_wm = df_wm[df_wm['Slp_m']>10]
df_wm=df_wm.to_crs(epsg=4326)
ax= df_wm.plot(column='Landslide', legend=True,categorical=True, alpha=0.7,vmin=0.0,vmax=1,)

# The axes are now in degrees. contextily assumes Web Mercator unless told
# otherwise, so pass the layer CRS to have the tiles warped to match the data.
cx.add_basemap(ax,crs=df_wm.crs.to_string(),source='NASAGIBS.ASTER_GDEM_Greyscale_Shaded_Relief')
# ax.set_axis_off()

# ScaleBar's dx is the size of one axis unit in metres. One axis unit is one
# degree of longitude here, so convert it at the map's central latitude
# (spherical-Earth approximation, mean radius 6 371 008.8 m).
_, lat_min, _, lat_max = df_wm.total_bounds
metres_per_degree = np.deg2rad(1.0) * 6371008.8 * np.cos(np.deg2rad((lat_min + lat_max) / 2))
ax.add_artist(ScaleBar(metres_per_degree))

# The PINNPlotsGallen/ directory must already exist.
plt.savefig("PINNPlotsGallen/StudyArea.pdf")