# **Spatial Visualization**

## Constants

In [None]:
import os

# Google Geocoding API settings. The key is read from the GOOGLE_API_KEY
# environment variable so a real credential never has to be pasted into the
# notebook; the original placeholder is kept as the fallback value.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "your-key-here")
BASE_URL = "https://maps.googleapis.com/maps/api/geocode/json"

# Reference point (e.g. a centroid) passed to haversine() by the distance
# computations further down in the notebook.
REFERENCE_LATITUDE = 37.7749
REFERENCE_LONGITUDE = -122.4194

## Utils

### Calculate Haversine Distance

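The Haversine formula calculates the great-circle distance between two points on the surface of a sphere, given their latitudes and longitudes. It is commonly used in navigation and geolocation applications. For two points with latitudes $\varphi_1, \varphi_2$ and longitudes $\lambda_1, \lambda_2$ (in radians) on a sphere of radius $R$, the formula is as follows:

$$a = \sin^2\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\left(\frac{\lambda_2 - \lambda_1}{2}\right)$$

$$d = 2R \arcsin\left(\sqrt{a}\right)$$

The result $d$ is the distance between the two points along the surface of the sphere.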

In [None]:
def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance, in kilometers, between two points.

    All four arguments are angles in degrees. Only `radians`, `sin`, `cos`,
    `asin`, `sqrt` and arithmetic operators are applied to them, so the
    function works on whatever those helpers accept.
    """
    earth_radius_km = 6371.0

    # Everything below works in radians.
    phi1 = radians(lat1)
    lam1 = radians(lon1)
    phi2 = radians(lat2)
    lam2 = radians(lon2)

    delta_phi = phi2 - phi1
    delta_lam = lam2 - lam1

    # Haversine of the central angle between the two points.
    a = sin(delta_phi / 2)**2 + cos(phi1) * cos(phi2) * sin(delta_lam / 2)**2

    # Central angle, then arc length on the sphere.
    central_angle = 2 * asin(sqrt(a))
    return earth_radius_km * central_angle

### Get Latitude, Longitude from Address

In [None]:
def get_lat_long_from_address(address):
    """Geocode ``address`` with the Google Geocoding API.

    Args:
        address: Free-form address string, sent as the ``address`` query
            parameter together with ``GOOGLE_API_KEY``.

    Returns:
        A ``(latitude, longitude)`` tuple taken from the first result when
        the API reports status ``"OK"``. For any other status the status is
        printed and ``None`` is returned, so callers must check for ``None``
        before unpacking.
    """
    params = {
        "address": address,
        "key": GOOGLE_API_KEY,
    }

    # Without an explicit timeout, requests waits indefinitely on an
    # unresponsive server and the notebook cell would hang.
    response = requests.get(BASE_URL, params=params, timeout=10)
    data = response.json()

    status = data.get("status")
    if status != "OK":
        print(f"Error: {status}")
        return None

    # Coordinates of the first (best) match.
    location = data["results"][0]["geometry"]["location"]
    return location["lat"], location["lng"]

### Get Address from Latitude, Longitude

In [None]:
def get_address_from_coords(lat, lon):
    """Reverse-geocode a coordinate pair into a short ``"street, city"`` label.

    Args:
        lat: Latitude, sent to the API as part of the ``latlng`` parameter.
        lon: Longitude, sent to the API as part of the ``latlng`` parameter.

    Returns:
        ``"<street>, <city>"`` when the first API result contains both a
        ``route`` and a ``locality`` component. In every other case (non-OK
        status, missing component, network or decoding failure) the plain
        ``"<lat>,<lon>"`` string is returned instead.
    """
    fallback = f"{lat},{lon}"
    params = {
        "latlng": fallback,
        "key": GOOGLE_API_KEY,
    }

    # This function already degrades to the raw coordinates on API errors;
    # do the same for transport/JSON failures so that one bad request does
    # not abort a whole batch of lookups. The timeout prevents hanging.
    try:
        response = requests.get(BASE_URL, params=params, timeout=10)
        data = response.json()
    except (requests.RequestException, ValueError):
        return fallback

    if data.get("status") != "OK":
        return fallback

    address_components = data["results"][0]["address_components"]

    def first_component(kind):
        # long_name of the first component tagged with `kind`, else None.
        return next(
            (comp["long_name"] for comp in address_components if kind in comp["types"]),
            None,
        )

    street = first_component("route")
    city = first_component("locality")

    return f"{street}, {city}" if street and city else fallback


## Plots

**Plotting a map using Folium to visualize the spread of crime hotspots in the form of a heat map across the regions having bike stations**


This plot helps travellers visualize and understand how crime is spread across the regions where bike stations are present, so that they can decide whether they want to use bikes in those areas.

In [None]:
def plot_folium_map(stationdf, crimedf):
    """Build a Folium map with bike-station markers and a crime heat map.

    Args:
        stationdf: Spark DataFrame providing ``lat``, ``long``, ``name`` and
            ``dock_count`` columns.
        crimedf: Spark DataFrame providing ``Latitude`` and ``Longitude``
            columns.

    Returns:
        The ``folium.Map`` holding a station heat map, a "Bike Stations"
        marker layer, a "Crime Incidents" heat-map layer and a layer control.
    """
    # Center the map on the approximate median station position; both
    # medians are computed in a single Spark job.
    center_row = stationdf.select(
        expr("percentile_approx(lat, 0.5)").alias("latitude_ref"),
        expr("percentile_approx(long, 0.5)").alias("longitude_ref"),
    ).first()
    map_center = [center_row["latitude_ref"], center_row["longitude_ref"]]
    heatmap = folium.Map(location=map_center, zoom_start=10)

    # One toggleable layer for the stations and one for the crime incidents.
    stations_group = folium.FeatureGroup("Bike Stations").add_to(heatmap)
    crime_incidents_group = folium.FeatureGroup("Crime Incidents").add_to(heatmap)

    # Materialize the station table once (the original collected it twice)
    # and skip rows whose coordinates could not be cast to a number.
    stations_pandas_df = (
        stationdf
        .withColumn("lat", col("lat").cast("double"))
        .withColumn("long", col("long").cast("double"))
        .toPandas()
        .dropna(subset=["lat", "long"])
    )
    stations_loc = [
        [float(lat), float(lon)]
        for lat, lon in zip(stations_pandas_df["lat"], stations_pandas_df["long"])
    ]

    # Station heat map is attached to the base map itself.
    heatmap.add_child(HeatMap(stations_loc, radius=1))

    # One marker per station, labelled "<name>-<dock_count>".
    for lat, lon, name, dock_count in zip(
        stations_pandas_df["lat"],
        stations_pandas_df["long"],
        stations_pandas_df["name"],
        stations_pandas_df["dock_count"],
    ):
        folium.Marker(
            [float(lat), float(lon)],
            popup=name + "-" + str(dock_count),
        ).add_to(stations_group)

    # Collect both crime coordinates in ONE action so each latitude stays
    # paired with its own longitude (two independent collects carry no
    # row-alignment guarantee). Cast to double and drop nulls so the heat
    # map only receives numeric points.
    crime_rows = (
        crimedf
        .select(
            col("Latitude").cast("double").alias("lat"),
            col("Longitude").cast("double").alias("lon"),
        )
        .na.drop()
        .collect()
    )
    crimes_loc = [(row["lat"], row["lon"]) for row in crime_rows]
    crime_incidents_group.add_child(HeatMap(crimes_loc, radius=5, blur=2))

    folium.LayerControl().add_to(heatmap)

    # Returning the map lets a notebook cell display it.
    return heatmap

In [None]:
# Draw the station/crime map for the full datasets; the returned map is the
# value of this cell's last expression.
plot_folium_map(stationdf,crimedf)

In [None]:
# Plotly Express is fed a pandas DataFrame here, so bring the station table
# over from Spark first.
stations_pandas_df = stationdf.toPandas()

# The plotted columns must be numeric.
stations_pandas_df = stations_pandas_df.astype(
    {'lat': float, 'long': float, 'dock_count': float}
)

# One point per station, colored and sized by its dock count.
fig = px.scatter_mapbox(
    stations_pandas_df,
    lat="lat",
    lon="long",
    color="dock_count",
    size="dock_count",
    zoom=10,
    custom_data=["name", "dock_count"],
)

# Hover text shows the station name and dock count from custom_data.
fig.update_traces(
    hovertemplate="<b>Name:</b> %{customdata[0]}<br><b>Dock count:</b> %{customdata[1]}"
)

# OpenStreetMap tiles, centered on the mean station position.
layout = {
    "mapbox": {
        "style": "open-street-map",
        "center": {
            "lat": stations_pandas_df['lat'].mean(),
            "lon": stations_pandas_df['long'].mean(),
        },
        "zoom": 10,
    }
}
fig.update_layout(layout)

fig.show()

In [None]:
# Reduce the crime data to one row per exact (Latitude, Longitude) pair so
# that it is small enough to convert to pandas and plot.

# Keep the haversine distance (km) of every incident from the reference
# point as an extra column on crimedf.
crimedf = crimedf.withColumn(
    "distance_from_reference",
    haversine(col("Latitude"), col("Longitude"), lit(REFERENCE_LATITUDE), lit(REFERENCE_LONGITUDE)),
)

# NOTE: a global rank() over Window.orderBy("distance_from_reference") used
# to be added here. Nothing read that column, and an unpartitioned window
# forces Spark to sort the whole dataset in a single partition whenever
# crimedf is evaluated, so it has been removed.

# Count incidents per coordinate pair.
result_df = crimedf.groupBy("Latitude", "Longitude").count()

# result_df has the columns Latitude, Longitude and count.

In [None]:
# Show the column names of the aggregated crime DataFrame.
result_df.columns

['Latitude', 'Longitude', 'count']

In [None]:
# Plot the per-location crime counts with Plotly.

crime_pandas_df = result_df.toPandas()

# Coordinates must be numeric for plotting and for the mean below.
crime_pandas_df = crime_pandas_df.astype({'Latitude': float, 'Longitude': float})

# One point per location, colored and sized by the number of incidents.
fig = px.scatter_mapbox(
    crime_pandas_df,
    lat="Latitude",
    lon="Longitude",
    color="count",
    size="count",
    zoom=10,
)

# OpenStreetMap tiles, centered on the mean incident position.
layout = {
    "mapbox": {
        "style": "open-street-map",
        "center": {
            "lat": crime_pandas_df['Latitude'].mean(),
            "lon": crime_pandas_df['Longitude'].mean(),
        },
        "zoom": 10,
    }
}
fig.update_layout(layout)

fig.show()

In [None]:
# Given location
address = "3131 Watkins Drive Riverside CA"
radius_in_miles = 0.5

# get_lat_long_from_address() returns None when geocoding fails; fall back to
# the reference point instead of crashing on tuple unpacking.
geocoded = get_lat_long_from_address(address)
if geocoded is None:
    latitude, longitude = REFERENCE_LATITUDE, REFERENCE_LONGITUDE
else:
    latitude, longitude = geocoded

given_latitude = latitude
given_longitude = longitude

# haversine() returns kilometers, so convert the radius before comparing.
KM_PER_MILE = 1.609344
radius_in_km = radius_in_miles * KM_PER_MILE

# Distance of every station / incident from the GIVEN location (the distance
# was previously measured from the reference point, which left the geocoded
# coordinates unused). The address has to lie inside the area covered by the
# data for anything to fall within the radius.
stationdf_with_distance = stationdf.withColumn(
    "distance",
    haversine(col("lat"), col("long"), lit(given_latitude), lit(given_longitude))
)
crimedf_with_distance = crimedf.withColumn(
    "distance",
    haversine(col("Latitude"), col("Longitude"), lit(given_latitude), lit(given_longitude))
)

# Keep only the rows within the radius; the helper column is dropped again.
station_result = stationdf_with_distance.filter(col("distance") <= radius_in_km).drop("distance")
crime_result = crimedf_with_distance.filter(col("distance") <= radius_in_km).drop("distance")

# Show the result
station_result.show()
crime_result.show()

In [None]:
# Draw the same map restricted to the stations and incidents that passed the
# radius filter.
plot_folium_map(station_result,crime_result)

In [None]:
# Attach a readable address to every aggregated crime location.

# Wrap the reverse-geocoding helper as a Spark UDF that returns a string.
get_address_from_coords_udf = udf(get_address_from_coords, returnType=StringType())

# New "address" column computed from each row's coordinates.
result_df = result_df.withColumn(
    "address",
    get_address_from_coords_udf(result_df.Latitude, result_df.Longitude),
)

In [None]:
# Bring both filtered result sets into pandas for Plotly.
station_result_pandas = station_result.toPandas()
for coord_column in ("lat", "long"):
    # Non-numeric coordinates become NaN instead of raising.
    station_result_pandas[coord_column] = pd.to_numeric(
        station_result_pandas[coord_column], errors='coerce'
    )

crime_result_pandas = crime_result.toPandas()

# First trace: stations, colored by dock count on a continuous scale.
fig = px.scatter_mapbox(
    station_result_pandas,
    lat="lat",
    lon="long",
    color="dock_count",
    color_continuous_scale="Viridis",
    zoom=10,
    mapbox_style="carto-positron",
    title="Scatter Mapbox with Two Datasets",
)

# Second trace: crime incidents as fixed-size red markers.
fig.add_scattermapbox(
    lat=crime_result_pandas["Latitude"],
    lon=crime_result_pandas["Longitude"],
    mode="markers",
    marker={"size": 10, "color": "red"},
    name="Crime Data",
)

# Center on the mean station position and adjust the legend.
station_center = {
    "lat": station_result_pandas["lat"].mean(),
    "lon": station_result_pandas["long"].mean(),
}
fig.update_layout(
    mapbox={"center": station_center},
    legend_traceorder="reversed",
    legend_y=1.05,
)
fig.show()

# Data Visualization and Graphs

In [None]:
spark2 = SparkSession.builder.appName("pyspark-geopandas").getOrCreate()
sc = spark2.sparkContext

def wrangle(filepath):
    """Load a crime report CSV and split ``Incident Date`` into parts.

    Args:
        filepath: Path of the CSV file to read. (The parameter used to be
            named ``df`` while the body read an undefined ``filepath``, so
            every call raised ``NameError``.)

    Returns:
        A Spark DataFrame without null rows, with the added columns
        ``date``, ``time``, ``hour``, ``minute``, ``month``, ``day`` and
        ``year`` derived from ``Incident Date``.
    """
    # Create (or reuse) a Spark session
    spark = SparkSession.builder.appName("ExampleAppName").getOrCreate()

    # Read CSV file into a Spark DataFrame and discard incomplete rows
    df = spark.read.csv(filepath, header=True, inferSchema=True)
    df = df.dropna()

    # "Incident Date" is split on the space into a date part and a time part.
    date_time_cols = split(df['Incident Date'], ' ')
    df = df.withColumn('date', date_time_cols.getItem(0)).withColumn('time', date_time_cols.getItem(1))

    # The time is split on ":" and the date on "/" (read as month/day/year).
    time_cols = split(df['time'], ':')
    date_cols = split(df['date'], '/')
    df = (
        df.withColumn('hour', time_cols.getItem(0).cast(IntegerType()))
        .withColumn('minute', time_cols.getItem(1).cast(IntegerType()))
        .withColumn('month', date_cols.getItem(0).cast(IntegerType()))
        .withColumn('day', date_cols.getItem(1).cast(IntegerType()))
        .withColumn('year', date_cols.getItem(2).cast(IntegerType()))
    )
    return df

crimedf = wrangle('crime_report.csv')

In [None]:
# Reload the raw crime report with a header row; inferSchema is not requested
# here.
# NOTE(review): this rebinds `crimedf`, replacing the DataFrame produced by
# wrangle() in the previous cell — confirm that is intended.
crimedf = spark.read.csv('crime_report.csv',header=True)

In [None]:
from collections import Counter

df = pd.read_csv('crime_report.csv')
days = list(df['Incident Day of Week'])

# Tally the incidents per weekday. `items` was printed (and is read by the
# bar-chart cell) without ever being assigned because this line was
# commented out.
items = Counter(days)
print(items)

In [None]:
# Weekday labels and the matching incident counts, in the mapping's order.
weekdays = list(items)
counts = [items[day] for day in weekdays]

# Bar chart of incidents per weekday.
plt.bar(weekdays, counts, color='skyblue')
plt.title('Number of Crimes on a particular weekday')
plt.xlabel('Weekdays')
plt.ylabel('Number of Crimes')
plt.show()

In [None]:
#spark2 = SparkSession.builder.appName("pyspark-geopandas").getOrCreate()
#sc = spark2.sparkContext

In [None]:
# Build a {station name: [lat, long]} lookup from the station DataFrame.
# `.rdd` is used below, so stationdf has to be a Spark DataFrame at this point.

# Work on the underlying RDD of rows.
rdd_station = stationdf.rdd

# Turn every row into a (name, [lat, long]) pair.
rdd_locations = rdd_station.map(
    lambda station: (station["name"], [station["lat"], station["long"]])
)

# Bring the pairs to the driver as a dictionary.
dict_locations = rdd_locations.collectAsMap()

# Show the result
print(dict_locations)

In [None]:
def wrangle(filepath):
    """Load a trip CSV with Spark and derive duration and end-date parts.

    The file is read with a header and schema inference, rows containing
    nulls are dropped, ``duration`` is divided by 60 and rounded to two
    decimals, and ``end_date`` is split into ``date``, ``time``, ``hour``,
    ``minute``, ``month``, ``day`` and ``year`` columns.
    """
    trips = spark.read.csv(filepath, header=True, inferSchema=True).dropna()
    trips = trips.withColumn('duration', round(col('duration') / 60, 2))

    # "end_date" is split on the space into a date part and a time part.
    end_parts = split(trips['end_date'], ' ')
    trips = trips.withColumn('date', end_parts.getItem(0))
    trips = trips.withColumn('time', end_parts.getItem(1))

    # Time is split on ":" and date on "/"; each piece becomes an int column.
    clock_parts = split(trips['time'], ':')
    calendar_parts = split(trips['date'], '/')
    derived_columns = (
        ('hour', clock_parts, 0),
        ('minute', clock_parts, 1),
        ('month', calendar_parts, 0),
        ('day', calendar_parts, 1),
        ('year', calendar_parts, 2),
    )
    for column_name, parts, position in derived_columns:
        trips = trips.withColumn(column_name, parts.getItem(position).cast(IntegerType()))

    return trips

tripmod = wrangle('/content/trip.csv')

In [None]:
# Trip counts per (hour, day). `tripmod` is the Spark DataFrame returned by
# wrangle() in the previous cell and carries the `hour` and `day` columns;
# the call was previously made on `tripdf`, which that cell does not define.
tripmod.groupBy("hour", "day").count().show()

In [None]:
def wrangle(filepath):
    """Load a trip CSV with pandas and add time, location and count features.

    Args:
        filepath: Path of the trip CSV file to read. (It used to be ignored
            in favour of a hard-coded ``'trip.csv'``.)

    Returns:
        A DataFrame without null rows and with a fresh index (the old index
        is kept in an ``index`` column), where ``duration`` is in minutes
        and the columns ``month``, ``day_of_week``, ``hour_of_day``,
        ``latitude``, ``longitude``, ``trips_per_hour`` and
        ``trips_per_day`` have been added.
    """
    # Read CSV file
    df = pd.read_csv(filepath)

    # Removing null values
    df.dropna(inplace=True)
    print(df.head())
    # Number of columns (the old `len([df.columns])` always printed 1).
    print(len(df.columns))

    # Convert duration seconds to minutes
    df['duration'] = (df['duration'] / 60).round(2)

    # Month comes from the end date
    df['end_date'] = pd.to_datetime(df['end_date'], format='%m/%d/%Y %H:%M')
    df['month'] = df['end_date'].dt.month

    # Day of week (Monday = 0, Sunday = 6) and hour (0-23) come from the
    # start date, which only needs to be parsed once.
    df['start_date'] = pd.to_datetime(df['start_date'], format='%m/%d/%Y %H:%M')
    df['day_of_week'] = df['start_date'].dt.weekday
    df['hour_of_day'] = df['start_date'].dt.hour

    # Look up the start station's coordinates in the module-level
    # dict_locations ({name: [lat, long]}). Mapping the whole column replaces
    # a row-by-row iterrows() loop; unknown stations get NaN.
    latitude_by_station = {name: loc[0] for name, loc in dict_locations.items()}
    longitude_by_station = {name: loc[1] for name, loc in dict_locations.items()}
    df['latitude'] = df['start_station_name'].map(latitude_by_station)
    df['longitude'] = df['start_station_name'].map(longitude_by_station)

    # Reset the index of the DataFrame
    df = df.reset_index()

    # Number of trips sharing this row's (day of week, hour of day)
    df['trips_per_hour'] = df.groupby(['day_of_week', 'hour_of_day'])['id'].transform('count')

    # Number of trips sharing this row's (day of week, day of month)
    df['trips_per_day'] = df.groupby(
        [df['day_of_week'], df['start_date'].dt.day]
    )['id'].transform('count')

    return df

# Build the pandas trip DataFrame used by the plotting cells that follow.
tripdf = wrangle('trip.csv')
#tripmod = wrangle(tripdf)

In [None]:
# Non-null value counts per month for every column, ordered by month.
tripdf_plot1 = tripdf.groupby("month").count().sort_values("month")

In [None]:
tripdf_plot1 = tripdf.groupby("month").count()
tripdf_plot1 = tripdf_plot1.sort_values("month")

# Number of trips started in each hour. Reindexing guarantees one entry for
# every hour 0-23 (0 where no trip started), so the series always lines up
# with the 24 labels below instead of failing on a length mismatch.
by_hour_avg_trips = (
    tripdf.groupby('hour_of_day').start_date.count().reindex(range(24), fill_value=0)
)

Hours = [str(hour) for hour in range(24)]

# Scale the counts to [0, 1] to pick a shade per bar; a flat series would
# divide by zero, so it maps to 0 for every hour instead.
trips_span = by_hour_avg_trips.max() - by_hour_avg_trips.min()
if trips_span:
    normalized_hours_trips = (by_hour_avg_trips - by_hour_avg_trips.min()) / trips_span
else:
    normalized_hours_trips = by_hour_avg_trips * 0.0

colors = plt.cm.Oranges(normalized_hours_trips)

plt.barh(Hours, by_hour_avg_trips, color=colors)
plt.xlabel('Number of Trips')
plt.ylabel('Hours')
plt.title('Number of trips per hours')
plt.xticks(rotation='vertical')
plt.show()

In [None]:
# Palette shared by the plots below: two hex colors followed by RGB tuples.
dark_colors = [
    "#99D699",
    "#B2B2B2",
    (0.8509803921568627, 0.37254901960784315, 0.00784313725490196),
    (0.4588235294117647, 0.4392156862745098, 0.7019607843137254),
    (0.9058823529411765, 0.1607843137254902, 0.5411764705882353),
    (0.4, 0.6509803921568628, 0.11764705882352941),
    (0.9019607843137255, 0.6705882352941176, 0.00784313725490196),
    (0.6509803921568628, 0.4627450980392157, 0.11372549019607843),
    (0.4, 0.4, 0.4),
]

# Global matplotlib defaults, applied in the order listed.
rcParams.update({
    'figure.figsize': (12, 9),
    'figure.dpi': 150,
    'lines.linewidth': 2,
    'axes.facecolor': "white",
    'axes.titlesize': 20,
    'axes.labelsize': 17.5,
    'xtick.labelsize': 15,
    'ytick.labelsize': 15,
    'legend.fontsize': 17.5,
    'patch.edgecolor': 'none',
    'grid.color': "white",
    'grid.linestyle': "-",
    'grid.linewidth': 1,
    'grid.alpha': 1,
    'text.color': "444444",
    'axes.labelcolor': "444444",
    'ytick.color': "444444",
    'xtick.color': "444444",
})

In [None]:
# Load the raw trip and station tables with pandas. From here on `tripdf`
# and `stationdf` are rebound to these pandas DataFrames.
tripdf = pd.read_csv('trip.csv')
stationdf = pd.read_csv('station.csv')
# Preview the first station rows.
stationdf.head()

In [None]:
# Inspect the column dtypes of the station table.
stationdf.dtypes

In [None]:
# Base map centered on the median station coordinates.
# NOTE(review): the 'Stamen Toner' tile set may not be available as a
# built-in in newer folium releases — confirm against the installed version.
h_map = folium.Map(
    location=[stationdf["lat"].median(), stationdf["long"].median()],
    zoom_start=10,
    tiles='Stamen Toner',
)

In [None]:
# Store both coordinate columns as strings.
stationdf["lat"] = stationdf["lat"].map(str)
stationdf["long"] = stationdf["long"].map(str)
stationdf.head()

In [None]:
# [lat, long] float pairs for every station, in table order.
location_station = [
    [float(lat), float(lon)]
    for lat, lon in zip(stationdf.lat.values, stationdf.long.values)
]
h_map.add_child(HeatMap(location_station, radius=10))

# One marker per station, with the station name as popup.
for index, row in stationdf.iterrows():
    station_position = [float(row['lat']), float(row['long'])]
    folium.Marker(station_position, popup=row['name']).add_to(h_map)
h_map

In [None]:
# Parse the start and end timestamps into datetime columns.
tripdf = tripdf.assign(
    start_date=pd.to_datetime(tripdf['start_date']),
    end_date=pd.to_datetime(tripdf['end_date']),
)

In [None]:
# Station id/coordinate table renamed once for the start side of a trip...
start_station = stationdf[["id", "lat", "long"]].rename(
    columns={"id": "start_station_id", "lat": "start_lat", "long": "start_long"}
)
# ...and once for the end side.
end_station = stationdf[["id", "lat", "long"]].rename(
    columns={"id": "end_station_id", "lat": "end_lat", "long": "end_long"}
)

# Attach start and end coordinates to every trip via the station ids.
tripdf = (
    tripdf
    .merge(start_station, on="start_station_id")
    .merge(end_station, on="end_station_id")
)

In [None]:
from collections import Counter

# Count the trips per (start, end) coordinate combination. The key is
# "<start_lat>_<start_long>_<end_lat>_<end_long>". Counting over the zipped
# columns replaces a hand-written tally inside an iterrows() loop, which
# builds a Series for every single trip.
pl_dc = dict(
    Counter(
        str(start_lat) + '_' + str(start_long) + '_' + str(end_lat) + '_' + str(end_long)
        for start_lat, start_long, end_lat, end_long in zip(
            tripdf['start_lat'],
            tripdf['start_long'],
            tripdf['end_lat'],
            tripdf['end_long'],
        )
    )
)

In [None]:
# Split every "<start_lat>_<start_long>_<end_lat>_<end_long>" key back into
# its four float coordinates, keeping the dictionary's order.
route_coords = [
    [float(part) for part in key.split('_')[:4]]
    for key in pl_dc
]

start_lat = [coords[0] for coords in route_coords]
start_long = [coords[1] for coords in route_coords]
end_lat = [coords[2] for coords in route_coords]
end_long = [coords[3] for coords in route_coords]

# Trip count of each route, in the same order.
nb_trips = [int(value) for value in pl_dc.values()]

In [None]:
# One row per route: both endpoints plus the number of trips.
tempdf = pd.DataFrame(
    {
        "start_lat": start_lat,
        "start_long": start_long,
        "end_lat": end_lat,
        "end_long": end_long,
        "nb_trips": nb_trips,
    }
)
# Quick line plot of the per-route trip counts.
tempdf["nb_trips"].plot()

In [None]:
# Map center: midpoint between the median start and median end coordinates.
ave_lat = (tempdf["start_lat"].median() + tempdf["end_lat"].median()) / 2
ave_lon = (tempdf["start_long"].median() + tempdf["end_long"].median()) / 2
directions_map = folium.Map(
    location=[ave_lat, ave_lon],
    zoom_start=15,
)

In [None]:
# Draw every route as a red line whose weight is its trip count / 1000.
for index, row in tempdf.iterrows():
    points = [
        (row['start_lat'], row['start_long']),
        (row['end_lat'], row['end_long']),
    ]
    route_line = folium.PolyLine(points, color='red', weight=row['nb_trips'] / 1000)
    route_line.add_to(directions_map)

In [None]:
# Put a named marker on the directions map for every station.
for index, row in stationdf.iterrows():
    station_marker = folium.Marker(
        [float(row['lat']), float(row['long'])],
        popup=row['name'],
    )
    station_marker.add_to(directions_map)
directions_map

In [None]:
fig, ax1 = plt.subplots(figsize = (10,7))
ax1.grid(zorder=1)
ax1.xaxis.grid(False)

# Trip duration divided by 60.
trip_dur = tripdf['duration'].values/60

# Density histogram in 2-wide bins from 0 up to 44.
plt.hist(trip_dur, bins = range(0,45,2),density=True,zorder=0,color=dark_colors[1])
plt.xlabel('Trip Duration (Minutes)')
plt.ylabel('Percent of Trips')
plt.title('Trip Duration Distribution')

# A stray plt.figure(figsize=(15,12)) call used to sit here; it only opened
# an extra, empty figure and has been removed.

# Cumulative curve from unit-wide bins. `density=True` replaces the `normed`
# keyword, which NumPy no longer accepts; with unit-wide bins the running sum
# of the densities is the cumulative share of the binned trips.
hist, bin_edges = np.histogram(trip_dur, range(0,45,1), density=True)
cum_trip_dur = np.cumsum(hist)

# Second y-axis on the same plot for the cumulative curve.
ax2 = ax1.twinx()
ax2.plot(range(1,45,1),cum_trip_dur,c=dark_colors[0])
ax2.set_ylabel('Cumulative Proportion of Trips')

In [None]:
# Calendar features derived from the parsed timestamps.
tripdf['week'] = tripdf.start_date.dt.dayofweek  # Monday = 0 ... Sunday = 6
tripdf['start_hour'] = tripdf.start_date.dt.hour
# Day of month of the trip start; this column was previously filled with the
# END date's hour by mistake.
tripdf['start_day'] = tripdf.start_date.dt.day
tripdf['end_day'] = tripdf.end_date.dt.day

In [None]:
# Weekday trips (week < 5) with a duration of at most 7200.
weekdaytrips_df = tripdf.loc[(tripdf.duration <= 7200) & (tripdf.week <5)]

# DataFrame.boxplot(by=...) opens its own figure with the given figsize, so
# the plt.figure(figsize=(10,8)) call that used to precede it only left an
# empty figure behind and has been removed.
weekdaytrips_df.boxplot(column="duration",by="start_hour",figsize=(10,8))
plt.ylim(0,3600)
plt.ylabel('Trip Duration (Seconds)')
plt.xlabel('Hour of Day')
plt.title('Trip Duration Distribution Over Time of Day (Week Days)')

In [None]:
# Weekend trips (week > 4) with a duration of at most 7200.
weekendtrips_df = tripdf.loc[(tripdf.duration <= 7200) & (tripdf.week >4)]

# DataFrame.boxplot(by=...) opens its own figure with the given figsize, so
# the plt.figure(figsize=(10,8)) call that used to precede it only left an
# empty figure behind and has been removed.
weekendtrips_df.boxplot(column="duration",by="start_hour",figsize=(10,8))
plt.ylim(0,3600)
plt.ylabel('Trip Duration (Seconds)')
plt.xlabel('Hour of Day')
plt.title('Trip Duration Distribution Over Time of Day (Weekend days)')