Delivery Route Optimization, using Spark to process large volumes of location and traffic data in real-time, optimizing delivery routes to reduce costs and time.

**Databricks Environment Setup**

Create a Databricks cluster with Apache Spark support.


In [0]:
#Required libraries
%pip install networkx
%pip install geopy
%pip install googlemaps


Python interpreter will be restarted.
Collecting networkx
  Using cached networkx-3.2.1-py3-none-any.whl (1.6 MB)
Installing collected packages: networkx
Successfully installed networkx-3.2.1
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting googlemaps
  Downloading googlemaps-4.10.0.tar.gz (33 kB)
Building wheels for collected packages: googlemaps
  Building wheel for googlemaps (setup.py): started
  Building wheel for googlemaps (setup.py): finished with status 'done'
  Created wheel for googlemaps: filename=googlemaps-4.10.0-py3-none-any.whl size=40716 sha256=2fd5729e067626e5b63129c536824192d54d011089b6dece17e24dad4854b4dd
  Stored in directory: /root/.cache/pip/wheels/d9/5f/46/54a2bdb4bcb07d3faba4463d2884865705914cc72a7b8bb5f0
Successfully built googlemaps
Installing collected packages: googlemaps
Successfully installed googlemaps-4.10.0
Python interpreter will be restar

In [0]:
# check installments
import networkx as nx
import geopy
from googlemaps import Client


In [0]:
#Create folders
dbutils.fs.mkdirs("/mnt/lhdw/landingzone/route/toprocess")
dbutils.fs.mkdirs("/mnt/lhdw/landingzone/route/processed")
dbutils.fs.mkdirs("/mnt/lhdw/bronze")
dbutils.fs.mkdirs("/mnt/lhdw/silver")
dbutils.fs.mkdirs("/mnt/lhdw/gold")

Out[2]: True

In [0]:
# List contents of the specified directory
folders = dbutils.fs.ls("/mnt/lhdw/landingzone/route/")
for folder in folders:
    print(folder.name)

processed/
to-process/
toprocess/


## Import files 

Basic data to map locations, orders and logistics constraints.

In [0]:
# To load 1 file
# import urllib.request

# # GitHub URL file
# url = 'https://github.com/VALQUIRIAFABRO/databricks/blob/main/logistic-delivery-route/orders.csv'

# # Path for temporary cluster instance
# temp_path = '/tmp/orders.csv'

# # insert file into temp path
# urllib.request.urlretrieve(url, temp_path)

# # destination path to DBFS
# dbfs_path = '/mnt/lhdw/landingzone/route/toprocess/orders.csv'

# # Move file to DBFS
# dbutils.fs.cp(f'file:{temp_path}', f'dbfs:{dbfs_path}')

# print(f"File downloaded and saved in: {dbfs_path}")

import urllib.request

# GitHub URL file
urls = [
    'https://github.com/VALQUIRIAFABRO/databricks/blob/main/logistic-delivery-route/orders.csv',
    'https://github.com/VALQUIRIAFABRO/databricks/blob/main/logistic-delivery-route/locations.csv',
    'https://github.com/VALQUIRIAFABRO/databricks/blob/main/logistic-delivery-route/constraints.csv'
]

# Caminho da instância temporária
temp_base_path = '/tmp/'

# Caminho base no DBFS
dbfs_base_path = '/mnt/lhdw/landingzone/route/toprocess/'

for url in urls:
    # Nome do arquivo (extraído da URL)
    file_name = url.split('/')[-1]
    
    # Caminhos temporário e destino
    temp_path = f'{temp_base_path}{file_name}'
    dbfs_path = f'{dbfs_base_path}{file_name}'
    
    # Fazer o download do arquivo
    urllib.request.urlretrieve(url, temp_path)
    
    # Mover o arquivo para o DBFS
    dbutils.fs.cp(f'file:{temp_path}', f'dbfs:{dbfs_path}')
    
    print(f"Arquivo {file_name} baixado e salvo em: {dbfs_path}")


Arquivo orders.csv baixado e salvo em: /mnt/lhdw/landingzone/route/toprocess/orders.csv
Arquivo locations.csv baixado e salvo em: /mnt/lhdw/landingzone/route/toprocess/locations.csv
Arquivo constraints.csv baixado e salvo em: /mnt/lhdw/landingzone/route/toprocess/constraints.csv


In [0]:
# you can list the files and directories within the specified path in DBFS Databricks Utilities (dbutils)
dbutils.fs.ls("/mnt/lhdw/landingzone/route/toprocess/")

Out[14]: [FileInfo(path='dbfs:/mnt/lhdw/landingzone/route/toprocess/constraints.csv', name='constraints.csv', size=213769, modificationTime=1742337999000),
 FileInfo(path='dbfs:/mnt/lhdw/landingzone/route/toprocess/locations.csv', name='locations.csv', size=214254, modificationTime=1742337998000),
 FileInfo(path='dbfs:/mnt/lhdw/landingzone/route/toprocess/orders.csv', name='orders.csv', size=213741, modificationTime=1742337998000)]

## Medalion Architecture - Bronze Layer

This code is configuring and creating a SparkSession optimized for loading data into a "Bronze" table or layer, which is typically the first stage of the data pipeline in a Delta Lake or data lakehouse architecture.

**Code Explanation**

1. Initializes the process of building a SparkSession. 

`SparkSession.builder`


2. Sets the name of the application to "Load Data Bronze". This name helps identify the application when running it, for instance, in the Spark UI.

`.appName("Load Data Bronze")`

3. Sets the default number of partitions for shuffling operations. Shuffling happens during operations like join or groupBy. Setting it to 200 means the shuffle output will be split into 200 partitions, which can help optimize performance based on your data size. In this project we are using files with small size, so consider increase the size according to the amount of data you are ingesting.

number of partitions = number of CPU cores * 2 or 3

`.config("spark.sql.shuffle.partitions", "20")`

4. Sets the maximum size of a single partition when reading files. Here it's set to 128MB, which ensures that large files are divided into manageable chunks for processing and avoid creating too many small files, which can degrade read and write performance. This helps ensure that Spark uses all available cores.

`.config("spark.sql.files.maxPartitionBytes", "128MB")`

5. Specifies the compression codec for saving data in Parquet format. "Snappy" is a lightweight, fast compression format often used for columnar storage like Parquet.

`.config("spark.sql.parquet.compression.codec", "snappy")`

6. Enables Adaptive Query Execution (AQE), a feature in Spark that dynamically and automatically optimizes execution plan at runtime based on data size being processed. It improves performance for queries involving skewed data or large shuffle stages.

`.config("spark.sql.adaptive.enabled", "true")`

7. Finalizes and creates the SparkSession object. If a SparkSession already exists, it reuses the existing one instead of creating a new instance.

`.getOrCreate()`



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Start SparkSession
spark = SparkSession.builder \
    .appName("Load Data Bronze") \
    .config("spark.sql.shuffle.partitions", "200")  \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

lz_path_in = "/mnt/lhdw/landingzone/route/toprocess"
lz_path_out = "/mnt/lhdw/landingzone/route/processed"
bronze_path = "/mnt/lhdw/bronze/route"    

In [0]:
# Define Raw data Schema 
# Define schemas for each file
schema_mapping = {
    "orders.csv": StructType([
        StructField("order_id", IntegerType(), True),
        StructField("customer_id", IntegerType(), True),
        StructField("delivery_window", DateType(), True)
    ]),
    "locations.csv": StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("latitude", IntegerType(), True),
        StructField("longitude", IntegerType(), True)
    ]),
    "constraints.csv": StructType([
        StructField("constraint", StringType(), True),
        StructField("limit", IntegerType(), True)
    ])
}

# Function to get schema dynamically based on filename
def get_schema(file_name):
    return schema_mapping.get(file_name, None)

# Looping through files dynamically
file_list = ["orders.csv", "locations.csv", "constraints.csv"] 

for file_name in file_list:
    schema = get_schema(file_name)
    if schema:       
        df_vendas = spark.read.option("header", "true").schema(schema).csv(lz_path_in) \
                              .withColumn("filename", regexp_extract(input_file_name(), "([^/]+)$", 0))
        distinct_filenames = df_vendas.select("filename").distinct()
        

+--------+-----------+---------------+-------------+
|order_id|customer_id|delivery_window|     filename|
+--------+-----------+---------------+-------------+
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|locations.csv|
|    null|       null|           null|location

In [0]:
display(distinct_filenames)

filename
locations.csv
constraints.csv
orders.csv


In [0]:
# Print the schema mapping
print("Schema Mapping:")
for file_name, schema in schema_mapping.items():
    print(f"File: {file_name}, Schema: {schema}")

Schema Mapping:
File: orders.csv, Schema: StructType([StructField('order_id', IntegerType(), True), StructField('customer_id', IntegerType(), True), StructField('delivery_window', DateType(), True)])
File: locations.csv, Schema: StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('latitude', IntegerType(), True), StructField('longitude', IntegerType(), True)])
File: constraints.csv, Schema: StructType([StructField('constraint', StringType(), True), StructField('limit', IntegerType(), True)])
