#### Importing the necessary libraries

In [1]:
pip install folium keras-tuner

In [2]:
from pyspark.sql.functions import col, array
import folium
from folium.map import Marker
from folium.plugins import HeatMap
from functools import reduce
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.sql.functions import udf, mean, lit
from pyspark.sql.types import DoubleType
from geopy.distance import geodesic
from pyspark.sql import Window
import math
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, DecisionTreeRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.metrics import mean_squared_error, r2_score
from pyspark.ml.feature import StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import keras_tuner as kt
from pyspark.sql.functions import to_timestamp, concat_ws, date_trunc, count
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.api import ExponentialSmoothing, SimpleExpSmoothing, Holt
from statsmodels.tsa.holtwinters import HoltWintersResults
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from pyspark.sql import SparkSession

In [3]:
# Spark session settings, applied in this order when the session is first created.
# NOTE(review): if a session already exists (typical in managed notebooks), getOrCreate()
# returns it and startup-only settings such as spark.driver.memory may not take effect.
SPARK_CONF = {
    "spark.executor.memory": "4g",
    "spark.driver.memory": "4g",
    "spark.executor.cores": "4",
    "spark.driver.cores": "4",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "10",
    "spark.sql.shuffle.partitions": "100",
}

session_builder = SparkSession.builder.appName("Optimized Spark Session")
for conf_key, conf_value in SPARK_CONF.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

In [4]:
# Loading crime data (Mercury reports). Records may span several lines, and embedded
# double quotes are escaped by doubling them (""), hence escape='"'.
# "escape" is set exactly once: DataFrameReader.option() keeps only the last value
# for a key, so a second escape setting would silently override the first.
df = (spark.read
      .option("multiline", "true")
      .option("quote", '"')
      .option("header", "true")
      .option("escape", '"')
      .csv('abfss://raw@crimetsoppersnurture.dfs.core.windows.net/mercury-reports.csv'))

In [5]:
# Load the national infrastructure sites (first row is the header; Spark infers column types)
infra_path = "abfss://raw@crimetsoppersnurture.dfs.core.windows.net/National Infrastructure.csv"
infrastructure_df = (spark.read
                     .option("header", True)
                     .option("inferSchema", True)
                     .csv(infra_path))

In [6]:
# Load the CS reports used for the crime time-trend analysis.
# The path gets its own name so `cs_report` only ever refers to the DataFrame.
cs_report_path = "abfss://raw@crimetsoppersnurture.dfs.core.windows.net/cs-reports-20170627-20240627.csv"
cs_report = (spark.read
             .option("header", True)
             .option("inferSchema", True)
             .csv(cs_report_path))

In [7]:
# Draw a random sample of roughly 20,000 reports to keep the cross join below tractable.
# DataFrame.sample() is Bernoulli sampling, so the result size is only approximately the target.
# Without replacement, `fraction` must lie in [0, 1]: clamp it when the file has fewer rows
# than the target, and avoid dividing by zero for an empty file.
SAMPLE_SIZE = 20000
cs_total_rows = cs_report.count()
cs_fraction = min(1.0, SAMPLE_SIZE / cs_total_rows) if cs_total_rows else 1.0
cs_report = cs_report.sample(withReplacement=False, fraction=cs_fraction, seed=42)

In [8]:
# Preview the sampled CS report rows (show() prints the first 20 rows, untruncated)
cs_report.show(truncate=False)

In [9]:
# Rich view of the same DataFrame (`display` is provided by the notebook environment)
display(cs_report)

In [10]:
# Preview the crime report rows (first 20, untruncated)
df.show(truncate=False)

In [11]:
display(df)

In [12]:
# Preview the infrastructure rows (first 20, untruncated)
infrastructure_df.show(truncate=False)

In [13]:
display(infrastructure_df)

In [14]:
# Row counts — each count() triggers a full Spark job
df.count()

In [15]:
infrastructure_df.count()

In [16]:
cs_report.count()

In [17]:
# Per-column summary statistics (count, mean, stddev, min, max)
summary_df = df.describe()
display(summary_df)

In [18]:
summary_infrastructure_df = infrastructure_df.describe()
summary_infrastructure_df.show()

In [19]:
summary_cs_report = cs_report.describe()
display(summary_cs_report)

In [20]:
# Column names and types (types were inferred for cs_report and infrastructure_df; df was
# read without inferSchema, so its columns are strings)
cs_report.printSchema()

In [21]:
df.printSchema()

In [22]:
infrastructure_df.printSchema()

### Data Cleaning and Preparation

#### Checking for missing values in the dataset

In [23]:
# Listing columns
col_ = df.columns

# Creating a condition to check for null values
null_cond = reduce(lambda acc, col_name: acc | col(col_name).isNull(), col_, col(col_[0]).isNull())

# Filtering the df to get rows with any null
df_null = df.filter(null_cond)

In [24]:
print(df_null.count())
df_null.show(truncate=False)

In [25]:
# Drop crime reports that lack coordinates; they cannot be mapped or used for distance features
df = df.dropna(subset=["Location (Latitude)", "Location (Longitude)"])
df.count()

In [26]:
# NOTE(review): dropna() without a subset removes infrastructure rows that have a null in ANY
# column, not just the coordinates/name used later — confirm this is intended.
infrastructure_df = infrastructure_df.dropna()
infrastructure_df.count()

#### Converting latitude and longitude to float

In [27]:
df = df.withColumn("latitude", col("Location (Latitude)").cast("float"))
df = df.withColumn("longitude", col("Location (Longitude)").cast("float"))

df.select("latitude", "longitude").printSchema()

In [28]:
infrastructure_df = infrastructure_df.withColumn("latitude_x", col("Latitude").cast("float"))
infrastructure_df = infrastructure_df.withColumn("longitude_x", col("Longitude").cast("float"))

infrastructure_df.select("latitude_x", "longitude_x").printSchema()

In [29]:
cs_report = cs_report.withColumn("latitude", col("Location (Latitude)").cast("float"))
cs_report = cs_report.withColumn("longitude", col("Location (Longitude)").cast("float"))

cs_report.select("latitude", "longitude").printSchema()

#### Crime Time Trends Near Infrastructure

In [30]:
# Define the distance calculation function
def cal_distance(lat1, lon1, lat2, lon2):
    """Geodesic distance in kilometres between (lat1, lon1) and (lat2, lon2).

    This function is wrapped as a Spark UDF, so any coordinate may arrive as
    None (a null column value) or NaN. geopy cannot measure such points and
    raises, which would abort the whole Spark job. In that case None is
    returned instead, so the row simply gets a null distance; a Spark filter
    such as ``distance <= 1.0`` treats null as not-true and drops it.

    Returns:
        The distance in km as a float, or None if any coordinate is missing.
    """
    coords = (lat1, lon1, lat2, lon2)
    if any(c is None or (isinstance(c, float) and math.isnan(c)) for c in coords):
        return None
    return geodesic((lat1, lon1), (lat2, lon2)).kilometers

In [31]:
# Register the distance calculation function as a Spark UDF returning DoubleType.
# Python UDFs are evaluated row by row outside the JVM, which is comparatively slow.
cal_distance_udf = udf(cal_distance, DoubleType())

In [32]:
# Pair every sampled CS report with every infrastructure site (Cartesian product:
# rows = cs_report rows x infrastructure_df rows) so each report can be measured against each site
joined_df = cs_report.crossJoin(infrastructure_df)

In [33]:
# Geodesic distance (km) for every report/site pair via the UDF. The cs_report / infrastructure_df
# column references resolve against joined_df because the cross join carries both parents' columns.
distance_df = joined_df.withColumn('distance_km', cal_distance_udf(cs_report['latitude'], cs_report['longitude'],
                                                                  infrastructure_df['latitude_x'], infrastructure_df['longitude_x']))

In [34]:
# Keep report/site pairs at most 1 km apart. A report near several sites appears once per nearby site.
filtered_df = distance_df.filter(distance_df['distance_km'] <= 1.0)

In [35]:
# Project report time/location/offence fields, the nearby site, and the pair distance
cs_report_ = filtered_df.select(cs_report['Disseminated date'], cs_report['Disseminated time'],
                                       cs_report['latitude'], cs_report['longitude'], cs_report['Offences'],
                                       infrastructure_df['OrganisationName'], infrastructure_df['latitude_x'], infrastructure_df['longitude_x'],
                                       distance_df['distance_km'])

In [36]:
# Combine the date and time columns into a single timestamp column
cs_report_ = cs_report_.withColumn(
    "DateTimeString", concat_ws(" ", cs_report_["Disseminated date"], cs_report_["Disseminated time"])
)

# Convert the concatenated string to a timestamp
cs_report_ = cs_report_.withColumn("DateTime", to_timestamp(cs_report_["DateTimeString"], "dd/MM/yyyy HH:mm"))

In [37]:
# Drop the intermediate string column
cs_report_ = cs_report_.drop("DateTimeString")

In [38]:
# Aggregate by Month
monthly_offences = cs_report_.groupBy(date_trunc("month", "DateTime").alias("Month")).agg(count("*").alias("TotalOffences"))

In [39]:
# Bring the monthly aggregate (one row per month) to the driver as a month-indexed pandas frame
monthly_offences_pd = (
    monthly_offences.toPandas()
    .set_index("Month")
    .sort_index()
)

In [40]:
# Plot the time series
monthly_offences_pd.plot(title='Monthly Offences')
plt.ylabel('Number of Offences')
plt.show()

In [41]:
# Simple Moving Average
monthly_offences_pd['SMA_3'] = monthly_offences_pd['TotalOffences'].rolling(window=3).mean()
monthly_offences_pd['SMA_3'].plot()
plt.title('Simple Moving Average (3)')
plt.show()

In [42]:
monthly_offences_pd.dropna(inplace=True)

In [43]:
monthly_offences_pd = monthly_offences_pd[pd.notnull(monthly_offences_pd.index)]

In [44]:
monthly_offences_pd['TotalOffences']

In [45]:
# Decompose the time series to identify trends and seasonality
decomposition = seasonal_decompose(monthly_offences_pd['TotalOffences'], model='additive', period=12)

In [46]:
# Plot the decomposition components separately
plt.figure(figsize=(12, 8))

# Plot observed data
plt.subplot(411)
plt.plot(monthly_offences_pd.index, monthly_offences_pd['TotalOffences'], label='Observed')
plt.legend()

# Plot trend component
plt.subplot(412)
plt.plot(monthly_offences_pd.index, decomposition.trend, label='Trend')
plt.legend()

# Plot seasonal component
plt.subplot(413)
plt.plot(monthly_offences_pd.index, decomposition.seasonal, label='Seasonal')
plt.legend()

# Plot residual component
plt.subplot(414)
plt.plot(monthly_offences_pd.index, decomposition.resid, label='Residual')
plt.legend()

plt.tight_layout()
plt.show()

##### Holt-Winters Model

In [47]:
# Fit Holt-Winters model
hw_model = ExponentialSmoothing(monthly_offences_pd['TotalOffences'], trend='add', seasonal='add', seasonal_periods=12)
hw_model_fit = hw_model.fit()

In [48]:
# Forecast next 72 months
forecast = hw_model_fit.forecast(steps=72)
forecast_index = pd.date_range(start=monthly_offences_pd.index[-1], periods=72, freq='MS')
forecast_series = pd.Series(forecast, index=forecast_index)

In [49]:
# Plot the forecast
plt.figure(figsize=(12, 6))
plt.plot(monthly_offences_pd['TotalOffences'], label='Historical')
plt.plot(forecast_series, label='Forecast', color='red')
plt.title('Holt-Winters Forecast for Next 6 years')
plt.xlabel('Date')
plt.ylabel('Total Offences')
plt.legend()
plt.grid(True)
plt.show()

##### ACF and PACF Plots

In [50]:
plot_acf(monthly_offences_pd['TotalOffences'].dropna(), lags=20)
plt.show()

In [51]:
plot_pacf(monthly_offences_pd['TotalOffences'].dropna(), lags=20)
plt.show()

##### Visualization

In [52]:
# Converting both Spark DataFrames to pandas for plotting and folium maps.
# toPandas() collects every row to the driver. From here, df / infrastructure_df name pandas
# objects until they are converted back with spark.createDataFrame further down.
df = df.toPandas()
infrastructure_df = infrastructure_df.toPandas()

In [53]:
# Scatter plot of the crime distribution
plt.figure(figsize=(10, 8))
plt.scatter(df['longitude'], df['latitude'], alpha=0.5, s=10, c='red')
plt.title("Geographical Distribution of Crimes")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.show()

In [54]:
# Scatter plot of the key infrastructure distribution
plt.figure(figsize=(10, 8))
plt.scatter(infrastructure_df['longitude_x'], infrastructure_df['latitude_x'], alpha=0.5, s=10, c='blue')
plt.title("Geographical Distribution of the Key Infrastructure")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.show()

In [55]:
# Creating a map centered around the mean latitude and longitude of the crime
map_center = [df['latitude'].mean(), df['longitude'].mean()]
crime_map = folium.Map(location = map_center, zoom_start = 10)

In [56]:
# Adding heatmap to the map
heat_data = [[row['latitude'], row['longitude']] for index, row in df.iterrows()]
HeatMap(heat_data).add_to(crime_map)

In [57]:
crime_map

In [58]:
# Creating a map centered around the mean latitude and longitude of infrastructure
map_center = [infrastructure_df['latitude_x'].mean(), infrastructure_df['longitude_x'].mean()]
infrastructure_map = folium.Map(location = map_center, zoom_start = 10)

In [59]:
# Adding heatmap to the map
heat_data = [[row['latitude_x'], row['longitude_x']] for index, row in infrastructure_df.iterrows()]
HeatMap(heat_data).add_to(infrastructure_map)

In [60]:
infrastructure_map

In [61]:
# Adding points to the map with cross marker
for idx, row in df.iterrows():
    icon = folium.DivIcon(
        html='<div style="font-size: 18px; color: blue;">o</div>',
        icon_size=(18, 18),
        icon_anchor=(6, 6)
    )
    tooltip_info = row['Offences']
    Marker([row['latitude'], row['longitude']],
           icon=icon,
           tooltip=tooltip_info
          ).add_to(crime_map)

In [62]:
crime_map

In [63]:
# Adding points to the map with cross marker
for idx, row in infrastructure_df.iterrows():
    icon = folium.DivIcon(
        html='<div style="font-size: 18px; color: red;">+</div>',
        icon_size=(18, 18),
        icon_anchor=(6, 6)
    )
    tooltip_info = row['OrganisationName']
    Marker([row['latitude_x'], row['longitude_x']],
           icon=icon,
           tooltip=tooltip_info
          ).add_to(infrastructure_map)

In [64]:
infrastructure_map

In [65]:
# Creating a map centered around the mean latitude and longitude of all data points
map_center = [
    (df['latitude'].mean() + infrastructure_df['latitude_x'].mean()) / 2,
    (df['longitude'].mean() + infrastructure_df['longitude_x'].mean()) / 2
]
combined_map = folium.Map(location=map_center, zoom_start=10)

In [66]:
# Adding crime data with "o" markers
for idx, row in df.iterrows():
    icon = folium.DivIcon(
        html='<div style="font-size: 18px; color: blue;">o</div>',
        icon_size=(18, 18),
        icon_anchor=(6, 6)
    )
    folium.Marker(
        location=[row['latitude'], row['longitude']],
        icon=icon,
        tooltip=f"Crime Location: {row['Offences']} ({row['latitude']}, {row['longitude']})"
    ).add_to(combined_map)

In [67]:
# Adding infrastructure data with "+" markers
for idx, row in infrastructure_df.iterrows():
    icon = folium.DivIcon(
        html='<div style="font-size: 18px; color: red;">+</div>',
        icon_size=(18, 18),
        icon_anchor=(6, 6)
    )
    folium.Marker(
        location=[row['latitude_x'], row['longitude_x']],
        icon=icon,
        tooltip=f"Infrastructure: {row['OrganisationName']} ({row['latitude_x']}, {row['longitude_x']})"
    ).add_to(combined_map)

In [68]:
combined_map

In [69]:
# Count reports for every (Contact Type, Contact Method) pair; missing pairs become 0
contact_counts = pd.pivot_table(df, index='Contact Type', columns='Contact Method',
                                values='Offences', aggfunc='count', fill_value=0)

# Annotated heatmap of those counts
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(contact_counts, cmap='viridis', annot=True, fmt='g', ax=ax)
ax.set_title("Analysis of Crime Reports by Contact Type and Method")
ax.set_xlabel("Contact Method")
ax.set_ylabel("Contact Type")
plt.xticks(rotation=45)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

### Spatial Analysis and Feature Engineering

In [70]:
# Converting the pandas frames back to Spark DataFrames for the distributed spatial join below.
# NOTE(review): Spark re-infers the schema from the pandas dtypes, so column types may differ
# from the originally loaded Spark frames — confirm if downstream code depends on exact types.
df = spark.createDataFrame(df)
infrastructure_df = spark.createDataFrame(infrastructure_df)

In [71]:
# Cross joining crime data with infrastructure data
df = df.crossJoin(infrastructure_df.select("latitude_x", "longitude_x", "OrganisationName"))

In [72]:
# Calculate distance to each infrastructure point
df = df.withColumn("distance", cal_distance_udf(df["latitude"], df["longitude"], 
                                                        infrastructure_df["latitude_x"], infrastructure_df["longitude_x"]))

In [73]:
df.show(truncate = False)

In [74]:
# Filter to find the nearest infrastructure
windowSpec = Window.partitionBy("ISR").orderBy("distance")
df_near = df.withColumn("row_number", F.row_number().over(windowSpec)).filter(F.col("row_number") == 1).drop("row_number")

In [75]:
df.show(truncate = False)

In [76]:
# Collect data for plotting
crime_points = df_near.select("latitude", "longitude", "latitude_x", "longitude_x", "distance", "OrganisationName", "Offences").collect()

In [77]:
# Creating a map centered around the mean latitude and longitude of all data points
map_center_1 = [
    (df_near.agg(mean("latitude")).collect()[0][0] + df_near.agg(mean("latitude_x")).collect()[0][0]) / 2,
    (df_near.agg(mean("longitude")).collect()[0][0] + df_near.agg(mean("longitude_x")).collect()[0][0]) / 2
]

In [78]:
combined_map_1 = folium.Map(location=map_center_1, zoom_start=10)

In [79]:
# Adding crime data with "o" markers
for row in crime_points:
    icon = folium.DivIcon(
        html='<div style="font-size: 14px; color: blue;">o</div>',
        icon_size=(14, 14),
        icon_anchor=(6, 6)
    )
    folium.Marker(
        location=[row['latitude'], row['longitude']],
        icon=icon,
        tooltip=f"Crime Location: {row['Offences']} ({row['latitude']}, {row['longitude']})"
    ).add_to(combined_map_1)

In [80]:
# Adding infrastructure data with "+" markers
for row in crime_points:
    icon = folium.DivIcon(
        html='<div style="font-size: 14px; color: red;">+</div>',
        icon_size=(14, 14),
        icon_anchor=(6, 6)
    )
    folium.Marker(
        location=[row['latitude_x'], row['longitude_x']],
        icon=icon,
        tooltip=f"Infrastructure: {row['OrganisationName']} ({row['latitude_x']}, {row['longitude_x']})"
    ).add_to(combined_map_1)

In [81]:
# Drawing lines and adding distance labels
for row in crime_points:
    folium.PolyLine(
        locations=[[row['latitude'], row['longitude']], [row['latitude_x'], row['longitude_x']]],
        color='black'
    ).add_to(combined_map_1)
    mid_lat = (row['latitude'] + row['latitude_x']) / 2
    mid_lon = (row['longitude'] + row['longitude_x']) / 2
    folium.Marker(
        location=[mid_lat, mid_lon],
        icon=folium.DivIcon(html=f'<div style="font-size: 12px; color: black;">{row["distance"]:.2f} km</div>'),
    ).add_to(combined_map_1)

In [82]:
combined_map_1

##### Correlation Matrix

In [83]:
# Convert df_near to Pandas DataFrame for correlation analysis
df_near_pd = df_near.select("latitude", "longitude", "distance").toPandas()

# Compute correlation matrix
corr_matrix = df_near_pd.corr()

# Plot the correlation matrix
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt='.2f')
plt.title('Correlation Matrix')
plt.show()

In [84]:
# Print the correlation matrix
print(corr_matrix)

In [85]:
# Count the number of crimes within each distance for each infrastructure
agg_df = df_near.groupBy("OrganisationName", "distance").agg(F.count("*").alias("crime_count"))

# Sum the crime_count for each infrastructure
final_agg_df = agg_df.groupBy("OrganisationName").agg(F.sum("crime_count").alias("total_crime_count"))

In [86]:
final_agg_df.show()

In [87]:
final_df = df.join(final_agg_df, on="OrganisationName", how="left").dropna()

In [88]:
# Define the feature columns
feature_columns = ["latitude", "longitude", "distance"]

# Assemble the features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
final_df = assembler.transform(final_df)

### Predictive Modelling

In [89]:
# Split the data into training and testing sets (80/20, seed=42).
# NOTE(review): rows are split individually, so the same site — and therefore the same
# total_crime_count label — can land in both splits; consider a group-wise split by site.
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=42)

##### Scaling

In [90]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [91]:
# Fit and transform the training data
scaler_model = scaler.fit(train_df)
train_df = scaler_model.transform(train_df)

In [92]:
# Transform the testing data
test_df = scaler_model.transform(test_df)

#### Random Forest

In [93]:
# Train Random Forest model
rf = RandomForestRegressor(featuresCol="features", labelCol="total_crime_count")
rf_model = rf.fit(train_df)

In [94]:
# Make predictions
predictions = rf_model.transform(test_df)

In [95]:
# Evaluate model
evaluator = RegressionEvaluator(labelCol="total_crime_count", predictionCol="prediction", metricName="rmse")
rf_rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rf_rmse}")

In [96]:
# Convert the PySpark DataFrame to a Pandas DataFrame for visualization
predictions_pd = predictions.select("total_crime_count", "prediction").toPandas()

# Compute residuals
predictions_pd['residuals'] = predictions_pd['total_crime_count'] - predictions_pd['prediction']

In [97]:
# Plot Residuals
plt.figure(figsize=(10, 6))
sns.scatterplot(x=predictions_pd['prediction'], y=predictions_pd['residuals'])
plt.axhline(0, color='red', linestyle='--')
plt.xlabel('Predicted Values')
plt.ylabel('Residuals')
plt.title('Residual Plot')
plt.show()

In [98]:
# Predicted vs. Actual Plot
plt.figure(figsize=(10, 6))
sns.scatterplot(x=predictions_pd['total_crime_count'], y=predictions_pd['prediction'])
plt.plot([predictions_pd['total_crime_count'].min(), predictions_pd['total_crime_count'].max()], 
         [predictions_pd['total_crime_count'].min(), predictions_pd['total_crime_count'].max()], 
         color='red', linestyle='--')
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.title('Predicted vs. Actual')
plt.show()

In [99]:
# Histogram of Residuals
plt.figure(figsize=(10, 6))
sns.histplot(predictions_pd['residuals'], kde=True)
plt.xlabel('Residuals')
plt.title('Distribution of Residuals')
plt.show()

#### Decision Tree

In [100]:
# Train Decision Tree model
dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="total_crime_count")
dt_model = dt.fit(train_df)

In [101]:
# Make predictions
predictions = dt_model.transform(test_df)

In [102]:
# Evaluate model
evaluator = RegressionEvaluator(labelCol="total_crime_count", predictionCol="prediction", metricName="rmse")
dt_rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {dt_rmse}")

#### TensorFlow Model

In [103]:
# Convert Spark DataFrame to Pandas DataFrame
train_df_pd = train_df.select("features", "total_crime_count").toPandas()
test_df_pd = test_df.select("features", "total_crime_count").toPandas()

In [104]:
# Extract values from DenseVector and convert to NumPy arrays
train_features = train_df_pd['features'].apply(lambda x: x.toArray()).values.tolist()
train_labels = train_df_pd['total_crime_count'].values.tolist()
test_features = test_df_pd['features'].apply(lambda x: x.toArray()).values.tolist()
test_labels = test_df_pd['total_crime_count'].values.tolist()

In [105]:
# Convert Pandas DataFrame to TensorFlow Dataset
train_ds = tf.data.Dataset.from_tensor_slices((train_features, train_labels)).cache()
test_ds = tf.data.Dataset.from_tensor_slices((test_features, test_labels)).cache()

In [106]:
# Define the TensorFlow model
def create_model():
    """Build and compile the baseline regression network.

    Architecture: Dense(10, relu) -> Dense(5, relu) -> Dense(1) (linear output),
    with input width ``len(feature_columns)``. Compiled with Adam, MSE loss and the
    notebook's ``root_mean_squared_error`` metric, which must already be defined
    when this function is called.

    Returns:
        A compiled, untrained ``Sequential`` model.
    """
    layer_stack = [
        Dense(10, activation='relu', input_shape=(len(feature_columns),)),
        Dense(5, activation='relu'),
        Dense(1),
    ]
    net = Sequential(layer_stack)
    net.compile(
        optimizer=Adam(),
        loss='mean_squared_error',
        metrics=[root_mean_squared_error],
    )
    return net

In [107]:
# Define RMSE metric
def root_mean_squared_error(y_true, y_pred):
    """Per-batch root-mean-squared error, used as a Keras metric.

    The tf.data pipeline yields scalar labels, so a batch of labels has shape
    (batch,), while the Dense(1) model outputs (batch, 1). Subtracting those
    directly broadcasts to a (batch, batch) matrix and compares every label with
    every prediction, unless Keras happens to squeeze them first (this differs
    between Keras versions). Both tensors are therefore flattened, and the labels
    cast to the prediction dtype, before the error is computed.

    Note: Keras presumably averages this per-batch value across batches, which is
    close to, but not exactly, the RMSE over the whole dataset.
    """
    y_pred = tf.reshape(y_pred, [-1])
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), [-1])
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))

In [108]:
# Build and compile a fresh baseline model (training happens in a later cell)
model = create_model()

In [109]:
# Layer-by-layer output shapes and parameter counts
model.summary()

In [110]:
# Train for 10 epochs in batches of 80. No callbacks are passed, so there is no checkpointing
# or early stopping.
# NOTE(review): the test split doubles as validation data, so the validation curves are not an
# independent hold-out estimate.
history = model.fit(
    train_ds.batch(80),
    epochs=10,
    validation_data=test_ds.batch(80)
)

In [111]:
# Plot training history
def plot_history(history):
    plt.figure(figsize=(12, 6))

    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    # Plot RMSE
    plt.subplot(1, 2, 2)
    plt.plot(history.history['root_mean_squared_error'], label='Training RMSE')
    plt.plot(history.history['val_root_mean_squared_error'], label='Validation RMSE')
    plt.xlabel('Epoch')
    plt.ylabel('RMSE')
    plt.title('Training and Validation RMSE')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [112]:
plot_history(history)

In [113]:
# Evaluate the baseline model on the test split; evaluate() returns [loss, RMSE metric]
loss, rmse = model.evaluate(test_ds.batch(80))
print(f"Root Mean Square Error (RMSE) on test data: {rmse}")

##### Hyperparameter tuning

In [None]:
# Define the model-building function
def build_model(hp):
    """Keras Tuner model factory: two ReLU hidden layers plus a linear output unit.

    Search space (registered in this order):
      * units_layer1  — 8..64, step 8
      * units_layer2  — 4..32, step 4
      * learning_rate — one of 1e-2, 1e-3, 1e-4 (Adam)
    Compiled with MSE loss and the notebook's ``root_mean_squared_error`` metric.
    """
    first_units = hp.Int('units_layer1', min_value=8, max_value=64, step=8)
    second_units = hp.Int('units_layer2', min_value=4, max_value=32, step=4)
    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    net = Sequential()
    net.add(Dense(units=first_units, activation='relu', input_shape=(len(feature_columns),)))
    net.add(Dense(units=second_units, activation='relu'))
    net.add(Dense(1))
    net.compile(optimizer=Adam(learning_rate),
                loss='mean_squared_error',
                metrics=[root_mean_squared_error])
    return net

In [None]:
# Random search over build_model's space, minimising validation RMSE.
# NOTE(review): max_trials=1 evaluates a single random configuration, so this is not yet a
# meaningful search — increase it for real tuning.
# Results live under my_dir/intro_to_kt; with the default overwrite=False, an existing project
# there is reloaded instead of searched again.
tuner = kt.RandomSearch(
    build_model,
    objective=kt.Objective('val_root_mean_squared_error', direction='min'),
    max_trials=1,
    executions_per_trial=1,
    directory='my_dir',
    project_name='intro_to_kt'
)

In [116]:
# Run the search: each trial trains for 10 epochs and is scored on validation RMSE.
# NOTE(review): test_ds is the validation set here, so hyperparameters are selected on the test
# split and the test RMSE reported later is optimistically biased.
tuner.search(train_ds.batch(80),
             epochs=10,
             validation_data=test_ds.batch(80))

# Hyperparameters of the best-ranked trial
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The optimal number of units in the first dense layer is {best_hps.get('units_layer1')} and the second dense layer is {best_hps.get('units_layer2')}.
The optimal learning rate for the optimizer is {best_hps.get('learning_rate')}.
""")

In [117]:
# Build a fresh model from the best hyperparameters and train it from scratch (10 epochs, batch 80)
model = tuner.hypermodel.build(best_hps)
history = model.fit(train_ds.batch(80),
                    epochs=10,
                    validation_data=test_ds.batch(80))

In [119]:
# Plot the tuned model's training history. plot_history is defined earlier in the notebook;
# redefining an identical copy here would only shadow that definition.
plot_history(history)

In [120]:
# Evaluate the loaded model
loss, rmse = model.evaluate(test_ds.batch(80))
print(f"Root Mean Square Error (RMSE) on test data: {rmse}")

In [145]:
# Evaluate the loaded model and make predictions
predictions = model.predict(test_ds.batch(80))

In [149]:
# Extract true values from the dataset
true_values = []

# Iterate through the dataset and collect true values
for x, y in test_ds:
    # Convert scalar tensors to arrays
    true_values.append(np.array(y))

# Convert the list to a NumPy array
true_values = np.array(true_values)

In [150]:
# Convert predictions and true values to Pandas DataFrame for visualization
predictions_df = pd.DataFrame({
    'True Values': true_values.flatten(),
    'Predicted Values': predictions.flatten()
})
predictions_df['Residuals'] = predictions_df['True Values'] - predictions_df['Predicted Values']

In [151]:
# Plot: Predicted vs. Actual Values
plt.figure(figsize=(10, 6))
sns.scatterplot(x=predictions_df['True Values'], y=predictions_df['Predicted Values'])
plt.plot([predictions_df['True Values'].min(), predictions_df['True Values'].max()], 
         [predictions_df['True Values'].min(), predictions_df['True Values'].max()], 
         color='red', linestyle='--')
plt.xlabel('True Values')
plt.ylabel('Predicted Values')
plt.title('Predicted vs. Actual Values')
plt.show()

In [152]:
# Plot: Residuals
plt.figure(figsize=(10, 6))
sns.scatterplot(x=predictions_df['Predicted Values'], y=predictions_df['Residuals'])
plt.axhline(0, color='red', linestyle='--')
plt.xlabel('Predicted Values')
plt.ylabel('Residuals')
plt.title('Residual Plot')
plt.show()

In [153]:
# Plot: Histogram of Residuals
plt.figure(figsize=(10, 6))
sns.histplot(predictions_df['Residuals'], bins=30, kde=True)
plt.xlabel('Residuals')
plt.ylabel('Frequency')
plt.title('Histogram of Residuals')
plt.show()

In [154]:
# Print the RMSE
print(f"Root Mean Square Error (RMSE) on test data: {rmse}")

In [133]:
# Save the tuned Keras model in the legacy HDF5 (.h5) format, replacing any existing file.
# The custom RMSE metric must be passed via custom_objects when this file is loaded again.
model.save('my_model.h5', overwrite=True)

##### Predicting with the saved Decision Tree model

In [134]:
# Save the fitted decision tree with Spark ML persistence, replacing any existing model directory
model_path = "dt_model_spark"
dt_model.write().overwrite().save(model_path)

In [135]:
# Reload it as a DecisionTreeRegressionModel (round-trip check of the saved artefact)
dt_model_loaded = DecisionTreeRegressionModel.load(model_path)

In [136]:
# Define a function to prepare sample data
def prepare_sample_data():
    """Return a one-row pandas DataFrame holding a hand-picked location to score.

    Columns are latitude, longitude and distance, in the same order as the
    model's feature_columns.
    """
    return pd.DataFrame(
        {
            'latitude': [50.540740002664144],
            'longitude': [-2.04143996166132524],
            'distance': [5.46881194025045],
        }
    )

# Prepare the sample data
sample_ = prepare_sample_data()


In [137]:
# Convert the sample data to the format expected by the model
sample_spark_df = spark.createDataFrame(sample_)

In [138]:
# Assemble the features into a single vector
assembler = VectorAssembler(inputCols=sample_spark_df.columns, outputCol="features")
sample_spark_df = assembler.transform(sample_spark_df)

In [139]:
# Scale the features using the previously fitted scaler model
sample_spark_df = scaler_model.transform(sample_spark_df)

# Select only the scaled_features column
sample_features = sample_spark_df.select("scaled_features")

In [140]:
# Predict function
def predict_crime(model, input_data):
    """Score ``input_data`` with a fitted Spark ML model and return the first row's prediction.

    ``input_data`` must be a Spark DataFrame containing the model's features
    column; only the prediction of the first collected row is returned.
    """
    scored_rows = model.transform(input_data).collect()
    first_row = scored_rows[0]
    return first_row['prediction']

prediction = predict_crime(dt_model_loaded, sample_features)

In [141]:
# Print the prediction
print(f'Predicted Crime Count Near Infrastructure: {prediction:.0f}')

if prediction >= 10:
    print("**Warning: High potential threat to infrastructure!**")

##### Predicting with the saved TensorFlow model

In [142]:
# Second hand-picked location (latitude, longitude, distance — the feature_columns order) for the
# Keras model. Built directly rather than by redefining prepare_sample_data, which would silently
# shadow the earlier definition of that function.
sample_ = pd.DataFrame({
    'latitude': [51.37999725341797],
    'longitude': [-0.406042069196701],
    'distance': [20.46881194025045],
})

In [143]:
# Convert the sample data to the format expected by the model
sample_spark_df = spark.createDataFrame(sample_)

# Assemble the features in exactly the training order (feature_columns) rather than relying on
# the incidental column order of the sample frame
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
sample_spark_df = assembler.transform(sample_spark_df)

# Scale with the scaler fitted on the training split (never refit on new data).
# NOTE(review): this must match the representation the Keras model was trained on — check the
# cell that builds train_features.
sample_spark_df = scaler_model.transform(sample_spark_df)

# Collect only the scaled_features column to the driver
sample_features = sample_spark_df.select("scaled_features").collect()

# Stack the Spark ML vectors into a 2-D NumPy array of shape (n_samples, n_features)
sample_features_np = np.array([row['scaled_features'].toArray() for row in sample_features])

# Load the saved Keras model; the custom RMSE metric must be supplied by name
loaded_model = tf.keras.models.load_model('my_model.h5', custom_objects={'root_mean_squared_error': root_mean_squared_error})

In [144]:
# Predict function for TensorFlow model
def predict_crime_tensorflow(model, input_data):
    """Run a Keras model on ``input_data`` and return the first sample's first output.

    With a single-unit output layer the prediction array has shape
    (n_samples, 1), so element [0][0] is the first sample's scalar prediction.
    """
    batch_output = model.predict(input_data)
    first_sample = batch_output[0]
    return first_sample[0]

# Score the prepared sample with the reloaded Keras model
prediction = predict_crime_tensorflow(loaded_model, sample_features_np)

# Report the rounded prediction and warn when it reaches the high-risk threshold
HIGH_RISK_THRESHOLD = 10
print(f'Predicted Crime Count Near Infrastructure: {prediction:.0f}')

if prediction >= HIGH_RISK_THRESHOLD:
    print("**Warning: High potential threat to infrastructure!**")