


## Problem statement:

> Optimize the shipping route of various goods across different cities for the
truck driver, in a way that minimizes the fuel consumption.
The truck begins and ends its journey in Dallas TX, covering all the cities
below.

<table>
  <tr>
    <td>New York City, NY</td>
    <td>Washington, DC</td>
    <td>Baltimore, MD</td>
    <td>Boston, MA</td>
  </tr>
  <tr>
    <td>Denver, CO</td>
    <td>Orlando, FL</td>
    <td>Miami, FL</td>
    <td>Chicago, IL</td>
  </tr>
  <tr>
    <td>Salt Lake City, UT</td>
    <td>Seattle, WA</td>
    <td>Houston, TX</td>
    <td>Las Vegas, NV</td>
  </tr>
  <tr>
    <td>Nashville, TN</td>
    <td>Cincinnati, OH</td>
    <td>Detroit, MI</td>
    <td>Pittsburgh, PA</td>
  </tr>
</table>

## Solution:

>Approach to the solution
>- Use the dataset containing Longitude and Latitude data to generate a
>distance matrix.
>- Google Maps API will be used to generate the distance matrix.
>- All the data will be stored in MongoDB.
>- To process the data and find the shortest distance we will leverage
>Graphframes which is Apache Spark’s API for graphs and graph-
>parallel computation.
>- Alternatively, the same can also be achieved with only Apache Spark,
>Spark MLlib & GraphX as well.
>Data


# Building A Distance Matrix

### Installing necessary modules

In [None]:
!pip install pyspark
!set PYSPARK_SUBMIT_ARGS="--master local[*] pyspark-shell"
!pip install osmnx
!pip install networkx
!pip install folium
!pip install pymongo

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install numpy --upgrade
!pip install scikit-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Importing libraries

In [None]:
import pandas as pd
import numpy as np
from math import ceil
from datetime import datetime

import networkx as nx
import osmnx as ox
import folium
import requests
import time

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, round
from pyspark.sql import Row
from pyspark.sql.types import DoubleType, StructField, StructType, StringType

### Mounting to Google Drive

In [None]:
# mount Google Drive
from google.colab import drive
drive.mount('/content/drive')
drive_path = '/content/drive/MyDrive/Final Project/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
spark = SparkSession.builder \
    .appName("RouteOptimization") \
    .getOrCreate()

### Preprocessing the dataset

In [None]:
uscities_df = spark.read.csv(drive_path + 'uscities.csv', header=True, inferSchema=True)
uscities_df.show()

+-------------+-------------+--------+--------------------+-----------+--------------------+-------+---------+----------+-------+------+--------+------------+-------------------+-------+--------------------+----------+
|         city|   city_ascii|state_id|          state_name|county_fips|         county_name|    lat|      lng|population|density|source|military|incorporated|           timezone|ranking|                zips|        id|
+-------------+-------------+--------+--------------------+-----------+--------------------+-------+---------+----------+-------+------+--------+------------+-------------------+-------+--------------------+----------+
|     New York|     New York|      NY|            New York|      36081|              Queens|40.6943| -73.9249|  18972871|10768.2| shape|   false|        true|   America/New_York|      1|11229 11226 11225...|1840034016|
|  Los Angeles|  Los Angeles|      CA|          California|       6037|         Los Angeles|34.1141|-118.4068|  12121244| 32

In [None]:
cities = ['Dallas, TX', 'New York, NY', 'Washington, DC', 'Baltimore, MD', 'Boston, MA',
          'Denver, CO', 'Orlando, FL', 'Miami, FL', 'Chicago, IL',
          'Salt Lake City, UT', 'Seattle, WA', 'Houston, TX', 'Las Vegas, NV',
          'Nashville, TN', 'Cincinnati, OH', 'Detroit, MI', 'Pittsburgh, PA']

cities = [(c.split(',')[0].strip(), c.split(',')[1].strip()) for c in cities]
print(cities)

[('Dallas', 'TX'), ('New York', 'NY'), ('Washington', 'DC'), ('Baltimore', 'MD'), ('Boston', 'MA'), ('Denver', 'CO'), ('Orlando', 'FL'), ('Miami', 'FL'), ('Chicago', 'IL'), ('Salt Lake City', 'UT'), ('Seattle', 'WA'), ('Houston', 'TX'), ('Las Vegas', 'NV'), ('Nashville', 'TN'), ('Cincinnati', 'OH'), ('Detroit', 'MI'), ('Pittsburgh', 'PA')]


In [None]:
cities_df = None
for city in cities:
    city_df = uscities_df.filter((uscities_df['city'] == city[0]) & (uscities_df['state_id'] == city[1]))
    if cities_df is None:
        cities_df = city_df
    else:
        cities_df = cities_df.union(city_df)

print(cities_df.count())
cities_df.show()

17
+--------------+--------------+--------+--------------------+-----------+--------------------+-------+---------+----------+-------+------+--------+------------+-------------------+-------+--------------------+----------+
|          city|    city_ascii|state_id|          state_name|county_fips|         county_name|    lat|      lng|population|density|source|military|incorporated|           timezone|ranking|                zips|        id|
+--------------+--------------+--------+--------------------+-----------+--------------------+-------+---------+----------+-------+------+--------+------------+-------------------+-------+--------------------+----------+
|        Dallas|        Dallas|      TX|               Texas|      48113|              Dallas|32.7935| -96.7667|   5668165| 1522.2| shape|   false|        true|    America/Chicago|      1|75287 75098 75234...|1840019440|
|      New York|      New York|      NY|            New York|      36081|              Queens|40.6943| -73.9249| 

### Visualizing the 16 Cities On A Map

In [None]:
# Create a map centered on the United States
map_us = folium.Map(location=[37.0902, -95.7129], zoom_start=4)

# Extract the latitude and longitude coordinates for the 16 cities
cities = cities_df.select('city', 'state_id', 'lat', 'lng').limit(16).collect()

# Add markers for the cities to the map
for city in cities:
    city_name = city['city']
    state_name = city['state_id']
    lat = city['lat']
    lng = city['lng']
    marker_text = f"{city_name}, {state_name}"
    folium.Marker(
        location=[lat, lng],
        tooltip=marker_text,
        icon=folium.Icon(icon="info-sign")
    ).add_to(map_us)

# Display the map
map_us

In [None]:
# Define the API base URL
base_url = "https://routing.openstreetmap.de/routed-car/route/v1/driving/"

# Define a function to make API calls with a buffer time between requests
def make_api_call(origin, destination):
    # Construct the API request URL
    url = f"{base_url}{origin[1]},{origin[0]};{destination[1]},{destination[0]}?overview=false"

    # Set headers to mimic a user agent and avoid bot detection
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"
    }

    # Send the API request
    response = requests.get(url, headers=headers)

    # Wait for 5 seconds before making the next API call
    time.sleep(6.5)

    return response

In [None]:
# Create an empty distance matrix
num_cities = cities_df.count()
distance_matrix = np.zeros((num_cities, num_cities))

error_log = []
# Iterate through each row in cities_df
for i, row in enumerate(cities_df.collect()):
    origin_city = (row['lat'], row['lng'])

    for j, next_row in enumerate(cities_df.collect()):
        destination_city = (next_row['lat'], next_row['lng'])

        # Make the API call to get the road distance
        response = make_api_call(origin_city, destination_city)

        # Parse the response JSON
        data = response.json()

        try:
          # Extract the distance from the response
          distance = data["routes"][0]["distance"]
        except:
          print('Error!')
          print(f"{row['city']} -> {next_row['city']}")
          print(f"{origin_city} -> {destination_city}\n\n")
          error_log.append(data)

        # Store the distance in the distance matrix
        distance_matrix[i][j] = distance

In [None]:
# Create a list of Row objects
distance_data = [
    Row(city=row['city'], **{cities_df.collect()[j]['city']: \
                             float(distance_matrix[i][j]) \
                             for j in range(num_cities)})
    for i, row in enumerate(cities_df.collect())
]

# Define the schema for the distance_df DataFrame
schema = StructType([
    StructField('city', StringType(), nullable=False),
    *[StructField(cities_df.collect()[j]['city'], DoubleType(), nullable=True) \
      for j in range(num_cities)]
])

In [None]:
# Create the distance_df PySpark DataFrame
distance_df = spark.createDataFrame(distance_data, schema)

# Print the distance DataFrame
distance_df.show()

+--------------+---------+---------+----------+---------+---------+---------+---------+---------+---------+--------------+---------+---------+---------+---------+----------+---------+----------+
|          city|   Dallas| New York|Washington|Baltimore|   Boston|   Denver|  Orlando|    Miami|  Chicago|Salt Lake City|  Seattle|  Houston|Las Vegas|Nashville|Cincinnati|  Detroit|Pittsburgh|
+--------------+---------+---------+----------+---------+---------+---------+---------+---------+---------+--------------+---------+---------+---------+---------+----------+---------+----------+
|        Dallas|      0.0|2498996.4| 2136532.8|2197741.9|2830711.1|1278071.1|1750505.6|2096232.3|1531167.7|     1999876.1|3381032.8| 383149.7|1978048.9|1064704.3| 1503412.4|1882247.3| 1964528.2|
|      New York|2498108.9|      0.0|  369580.3| 309529.3| 343015.8|2878359.7|1749537.5|2062875.8|1285736.6|     3519305.6|4588518.3|2631039.8|4083271.9|1432726.5| 1038190.9|1008213.2|  603345.3|
|    Washington|2138060.8

In [None]:
# Convert the distance values from meters to miles and round to 2 decimal points
distance_df = distance_df.select(
    col('city'),
    *((round(col(city) / 1609.34, 2)).alias(city) \
      for city in distance_df.columns[1:])
)

In [None]:
# Print the distance DataFrame
distance_df.show()

+--------------+-------+--------+----------+---------+-------+-------+-------+-------+-------+--------------+-------+-------+---------+---------+----------+-------+----------+
|          city| Dallas|New York|Washington|Baltimore| Boston| Denver|Orlando|  Miami|Chicago|Salt Lake City|Seattle|Houston|Las Vegas|Nashville|Cincinnati|Detroit|Pittsburgh|
+--------------+-------+--------+----------+---------+-------+-------+-------+-------+-------+--------------+-------+-------+---------+---------+----------+-------+----------+
|        Dallas|    0.0| 1552.81|   1327.58|  1365.62|1758.93| 794.16|1087.72|1302.54| 951.43|       1242.67|2100.88| 238.08|  1229.11|   661.58|    934.18|1169.58|    1220.7|
|      New York|1552.26|     0.0|    229.65|   192.33| 213.14|1788.53|1087.11|1281.81| 798.92|        2186.8|2851.18|1634.86|  2537.23|   890.26|     645.1| 626.48|     374.9|
|    Washington|1328.53|  228.99|       0.0|    37.97| 434.68|1646.04| 857.95|1052.65| 697.97|       2085.85|2750.23|141

In [None]:
# Repartition the DataFrame into a single partition
distance_df_single_partition = distance_df.coalesce(1)

# Save the distance_df DataFrame as a single CSV file
distance_df_single_partition.write.csv(drive_path + '16C_Distance_Matrix.csv', \
                                       header=True)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("CSV to MongoDB") \
    .config("spark.mongodb.output.uri", "mongodb+srv://neeleshkuntimala13:kMXD6fPSMspLRbUf@cluster0>.mongodb.net/test.<mycol>") \
    .config("spark.mongodb.output.database", "<mydb>") \
    .config("spark.mongodb.output.collection", "<mycol>") \
    .getOrCreate()

In [None]:
# df = spark.read \
#     .option("header", "true") \
#     .csv(drive_path + "16C_Distance_Matrix.csv")

# # Define the schema for the distance_df DataFrame
# schema = StructType([
#     StructField('city', StringType(), nullable=False),
#     *[StructField(cities_df.collect()[j]['city'], DoubleType(), nullable=True) \
#       for j in range(num_cities)]
# ])

df = spark.read.csv(
    '/content/drive/MyDrive/Final Project/16C_Distance_Matrix (1).csv',
    header=True,
    inferSchema=True
)
df.show()

+--------------+-------+--------+----------+---------+-------+-------+-------+-------+-------+--------------+-------+-------+---------+---------+----------+-------+----------+
|          city| Dallas|New York|Washington|Baltimore| Boston| Denver|Orlando|  Miami|Chicago|Salt Lake City|Seattle|Houston|Las Vegas|Nashville|Cincinnati|Detroit|Pittsburgh|
+--------------+-------+--------+----------+---------+-------+-------+-------+-------+-------+--------------+-------+-------+---------+---------+----------+-------+----------+
|        Dallas|    0.0| 1552.81|   1327.58|  1365.62|1758.93| 794.16|1087.72|1302.54| 951.43|       1242.67|2100.88| 238.08|  1229.11|   661.58|    934.18|1169.58|    1220.7|
|      New York|1552.26|     0.0|    229.65|   192.33| 213.14|1788.53|1087.11|1281.81| 798.92|        2186.8|2851.18|1634.86|  2537.23|   890.26|     645.1| 626.48|     374.9|
|    Washington|1328.53|  228.99|       0.0|    37.97| 434.68|1646.04| 857.95|1052.65| 697.97|       2085.85|2750.23|141

In [None]:
df.write.format("mongo") \
  .mode("append") \
  .option("uri", "mongodb+srv://neeleshkuntimala13:kMXD6fPSMspLRbUf@cluster0.v56duym.mongodb.net/?retryWrites=true&w=majority") \
  .option("database", "<mydb>") \
  .option("collection", "<mycol>") \
  .save()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("Read from MongoDB") \
    .config("spark.mongodb.input.uri", "mongodb+srv://neeleshkuntimala13:kMXD6fPSMspLRbUf@cluster0.v56duym.mongodb.net/?retryWrites=true&w=majority") \
    .config("spark.mongodb.input.database", "<mydb>") \
    .config("spark.mongodb.input.collection", "<mycol>") \
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred") \
    .getOrCreate()

df = spark.read.format("mongo") \
    .option("uri", "mongodb+srv://neeleshkuntimala13:kMXD6fPSMspLRbUf@cluster0.v56duym.mongodb.net/?retryWrites=true&w=majority") \
    .option("database", "<mydb>") \
    .option("collection", "<mycol>") \
    .load()

df.show()

+--------------+-------+--------+----------+---------+-------+-------+-------+-------+-------+--------------+-------+-------+---------+---------+----------+-------+----------+
|          city| Dallas|New York|Washington|Baltimore| Boston| Denver|Orlando|  Miami|Chicago|Salt Lake City|Seattle|Houston|Las Vegas|Nashville|Cincinnati|Detroit|Pittsburgh|
+--------------+-------+--------+----------+---------+-------+-------+-------+-------+-------+--------------+-------+-------+---------+---------+----------+-------+----------+
|        Dallas|    0.0| 1552.81|   1327.58|  1365.62|1758.93| 794.16|1087.72|1302.54| 951.43|       1242.67|2100.88| 238.08|  1229.11|   661.58|    934.18|1169.58|    1220.7|
|      New York|1552.26|     0.0|    229.65|   192.33| 213.14|1788.53|1087.11|1281.81| 798.92|        2186.8|2851.18|1634.86|  2537.23|   890.26|     645.1| 626.48|     374.9|
|    Washington|1328.53|  228.99|       0.0|    37.97| 434.68|1646.04| 857.95|1052.65| 697.97|       2085.85|2750.23|141