In [2]:
# Library imports
import datetime as dt
import json
import os
import time
from typing import Any, Dict, List, Optional, Tuple

import requests
# from pyspark.sql import functions as F, types as T

In [None]:
# Configurations for Google Maps Routes API pipeline
LAKEHOUSE_DB = "ETAs_Por_Rotas_Diarios" # Delta Lakehouse database (schema) name
TABLE_NAME   = "route_metricas"         # Delta Lakehouse table name
USE_POLYLINES_FILE = True               # NOTE(review): not read by any cell visible here - confirm it is still needed
N_VIAS = 8                              # number of intermediate "via" waypoints sampled from each polyline

# API key for the Google Maps Routes API.
# It is read from the GOOGLE_API_KEY environment variable so that no secret is
# stored in the notebook. An empty string means "not configured"; requests
# sent with it will be rejected by the API.
# SECURITY: a literal key used to be committed on this line. Treat that key as
# compromised and rotate it in the Google Cloud console.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")

# Endpoint
ROUTES_API_ENDPOINT = "https://routes.googleapis.com/directions/v2:computeRoutes"

In [4]:
# Polyline Decoder
def decode_polyline(polyline_str: str, precision: int = 5) -> List[Tuple[float, float]]:
    """Decode a string produced by Google's Encoded Polyline Algorithm.

    See https://developers.google.com/maps/documentation/utilities/polylinealgorithm

    Args:
        polyline_str: The encoded polyline. An empty string yields an empty list.
        precision: Number of decimal digits the coordinates were encoded with.
            The default of 5 matches the previous hard-coded ``1e5`` scaling.

    Returns:
        A list of ``(latitude, longitude)`` tuples in decimal degrees, in the
        order they appear in the polyline.

    Raises:
        ValueError: If the string ends in the middle of a value, or contains a
            character outside the polyline alphabet (ASCII 63..126).
    """
    factor = 10 ** precision
    length = len(polyline_str)
    index = 0

    def _next_delta() -> int:
        # Read one variable-length signed integer starting at `index`.
        nonlocal index
        shift = result = 0
        while True:
            if index >= length:
                raise ValueError(
                    f"Truncated polyline: value starting before position {index} is incomplete."
                )
            b = ord(polyline_str[index]) - 63
            index += 1
            if not 0 <= b <= 0x3f:
                raise ValueError(
                    f"Invalid polyline character {polyline_str[index - 1]!r} at position {index - 1}."
                )
            # The low 5 bits carry payload; bit 0x20 flags "more chunks follow".
            result |= (b & 0x1f) << shift
            shift += 5
            if b < 0x20:
                break
        # The lowest bit is the sign flag; negative values are stored inverted.
        return ~(result >> 1) if (result & 1) else (result >> 1)

    coords = []
    lat = lng = 0
    while index < length:
        # Each point is stored as a delta from the previous point.
        lat += _next_delta()
        lng += _next_delta()
        coords.append((lat / factor, lng / factor))
    return coords

In [5]:
# Select n evenly spaced via points from a list of points (excluding start and end points)
def sample_via_points(points: List[Tuple[float,float]], n: int = 8) -> List[Tuple[float,float]]:
    """Pick up to ``n`` interior points spread evenly along ``points``.

    The first and last points are never returned; they are the route's origin
    and destination. Spacing is computed over the index span ``0 .. len-1`` so
    the samples sit symmetrically between the two endpoints.

    Args:
        points: Ordered route coordinates.
        n: Desired number of via points.

    Returns:
        The selected points in route order. Fewer than ``n`` are returned when
        the polyline has fewer than ``n`` interior points, and an empty list
        when there are no interior points or ``n <= 0``.
    """
    # If there are not enough points to sample, return an empty list
    if len(points) <= 2 or n <= 0:
        return []

    last_interior = len(points) - 2
    # Divide the index span (len - 1), not the point count, into n + 1 equal
    # parts; dividing by the count pushes samples toward the end of the route.
    step = (len(points) - 1) / (n + 1)

    chosen: List[int] = []
    seen = set()
    for k in range(1, n + 1):
        # Clamp to the interior so rounding can never select an endpoint.
        idx = min(max(int(round(step * k)), 1), last_interior)
        # Remove duplicates while preserving order
        if idx not in seen:
            seen.add(idx)
            chosen.append(idx)
    return [points[idx] for idx in chosen]

In [6]:
# Helper to convert lat,lng to API format
def to_latlng(lat: float, lng: float) -> Dict[str, Any]:
    """Wrap a latitude/longitude pair in a ``{"latLng": {...}}`` dictionary."""
    coordinates = {"latitude": lat, "longitude": lng}
    return {"latLng": coordinates}

# Parse duration string (e.g., "1234s") to seconds
def parse_duration_to_seconds(duration_str: Optional[str]) -> Optional[int]:
    """Convert a duration such as ``"1234s"`` into whole seconds.

    Returns ``None`` when the value is missing or empty, does not end in
    ``"s"``, or when the part before the ``"s"`` cannot be converted to a
    number. Fractional seconds are truncated toward zero.
    """
    if duration_str and duration_str.endswith("s"):
        numeric_part = duration_str[:-1]
        try:
            seconds = float(numeric_part)
            return int(seconds)
        except Exception:
            # Not a number, or not representable as an int (nan / inf).
            return None
    return None

# Get future departure time in seconds since epoch (UTC)
def future_departure_seconds(buffer_sec: int = 120) -> int:
    """Return the epoch timestamp, in whole seconds, of now (UTC) plus ``buffer_sec``."""
    current_utc = dt.datetime.now(dt.timezone.utc)
    # The offset keeps the result in the future relative to the moment of the call.
    departure = current_utc + dt.timedelta(seconds=buffer_sec)
    return int(departure.timestamp())

In [7]:
# Compute route ETA using fixed via waypoints
def compute_route_eta_fixed(points: List[Tuple[float,float]], n_vias: int = 8) -> Dict[str, Any]:
    """Request duration and distance for a route pinned to sampled via points.

    The first and last entries of ``points`` become origin and destination;
    ``sample_via_points`` supplies the intermediate ``via`` waypoints. The
    request is POSTed to ``ROUTES_API_ENDPOINT`` with a 30 second timeout.

    Args:
        points: Ordered ``(lat, lng)`` coordinates of the route.
        n_vias: Number of via points to ask ``sample_via_points`` for.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        ValueError: If ``points`` has fewer than two entries.
        RuntimeError: If the API answers with a status other than 200.
    """
    if len(points) < 2:
        raise ValueError("Invalid polyline (less than 2 points).")

    # Endpoints of the route plus the sampled interior waypoints
    start_point, end_point = points[0], points[-1]
    via_points = sample_via_points(points, n=n_vias)

    # The field mask limits the response to the three metrics used downstream
    request_headers = {
        "X-Goog-Api-Key": GOOGLE_API_KEY,
        "X-Goog-FieldMask": "routes.duration,routes.staticDuration,routes.distanceMeters"
    }

    payload = {
        "origin": {"location": to_latlng(*start_point)},
        "destination": {"location": to_latlng(*end_point)},
        "intermediates": [
            {"via": True, "location": to_latlng(via_lat, via_lng)}
            for via_lat, via_lng in via_points
        ],
        "travelMode": "DRIVE",
        # ETA with traffic
        "routingPreference": "TRAFFIC_AWARE_OPTIMAL",
        "departureTime": {"seconds": future_departure_seconds(120)},
        # The route must follow the via points, so no alternatives are requested
        "computeAlternativeRoutes": False
    }

    response = requests.post(ROUTES_API_ENDPOINT, headers=request_headers, json=payload, timeout=30)
    if response.status_code == 200:
        return response.json()
    raise RuntimeError(f"Routes API error {response.status_code}: {response.text}")


In [11]:
# Example encoded polylines for testing; each entry is decoded with
# decode_polyline and sent to the Routes API by the processing loop below.
# The three strings begin and end with the same characters, so they presumably
# describe alternative paths between one origin and one destination - verify.
# `\\` inside a literal is an escaped single backslash, which is a valid
# character of the polyline alphabet.
# You can replace these with actual encoded polylines using the utils/save_polylines.py
POLYLINES = [
                "lts_B~fgdHJVLNLDXFPAl@QnBfFiDvAeBh@_BI_HqAo@S]S_@g@Oa@SiBGI[qEEiA{@{My@}LUqDs@_Jm@sKUgCo@oDq@kCiAgD}@{BsAmCk@}@oA}BgAyBWo@uAkCkAsBuHgOeBeDoGiMqCmFmDwHsIuPyA}CmA}CkAeDaAeDwIc\\Qw@m@eDi@eCkHeXgKg`@oFcSw@mEMmAIoB@gAH{Bn@sJtCka@ZyEF_B?oGO{JSgGEuBQwAWqA]}@_AeBeC}Ce@u@i@aAe@kAsEoPwA}Fi@oB_DmJ]aBMeAI{ABuAlEsi@zC{_@dA{KrBiWvDud@HoAXoBLSp@iCl@qB@k@Ee@q@}BqA_E}A}Du@aB[O_@]{D_PuBaJYgA_@eBeB_Ho@}C{@{Cm@eCUs@c@iAkAgCu@sAgNqYuAyCoNyYoIaQm@y@w@q@u@_@aBe@}Bi@k@QgAQiYyGo\\_I}FoAvAkGb@yAxLgXlE{RzCcOf@iDpBcLXyAjBuLt@eEhK}m@bB{J`Loh@sEeAkDy@[vAe@Pc@@uAU}FoAyEgAk@Qc@[yE_Ci@QaBQ{@EsC?yH^mNf@i@MiAMgAYoAk@w@s@SOoFsGeAc@s@Mq@CcBFuEVqKZcPp@mHd@wAaA",
                "lts_B~fgdHJVLNLDXFPAl@QnBfFiDvAeBh@_BI_HqAo@S]S_@g@Oa@SiBGI[qEEiA{@{My@}LUqDs@_Jm@sKUgCo@oDq@kCiAgD}@{BsAmCk@}@oA}BgAyBWo@uAkCkAsBuHgOeBeDoGiMqCmFmDwHsIuPyA}CmA}CkAeDaAeDwIc\\Qw@m@eDi@eCkHeXgKg`@oFcSw@mEMmAIoB@gAH{Bn@sJtCka@ZyEF_B?oGO{JSgGEuBQwAWqA]}@_AeBeC}Ce@u@i@aAe@kAsEoPwA}Fi@oB_DmJ]aBMeAI{ABuAlEsi@zC{_@bAkKJY^iBLeA|@kKL_CBwAh@oGJa@RYXK\\?VJ^`@P\\DVCZIPSLm@FGBgGk@aDWeZmCgFw@iDu@aBa@oQeEgOqDi`@eJeCg@yG_BeCm@wHmB{IsBoCk@kWcGgLsCgCk@MaABe@V_Bt@qDzDqPzB{KrCoM\\gBpB_Jp[}yAbDkOJm@vAkGb@yAxLgXlE{RzCcOf@iDpBcLXyAjBuLt@eEhK}m@bB{J`Loh@sEeAkDy@[vAe@Pc@@uAU}FoAyEgAk@Qc@[yE_Ci@QaBQ{@EsC?yH^mNf@i@MiAMgAYoAk@w@s@SOoFsGeAc@s@Mq@CcBFuEVqKZcPp@mHd@wAaA",
                "lts_B~fgdHJVLNLDXFPAl@QnBfFiDvAeBh@_BI_HqAo@S]S_@g@Oa@SiBGI[qEEiA{@{My@}LUqDs@_Jm@sKUgCo@oDq@kCiAgD}@{BsAmCk@}@oA}BgAyBWo@uAkCkAsBuHgOeBeDoGiMqCmFmDwHsIuPyA}CmA}CkAeDaAeDwIc\\Qw@m@eDi@eC_BcGA]UqC@QcDgLaCgJB}AJ_A`@aBLkAAiAEg@Su@]m@WWm@Ua@Ao@By@LeAb@sK|D_AX_AHqBDkIDqFFm^VgACsAKwAWuAg@oASii@eP}DqAUQ]KS@IDwDQwAA{FFqACw@Oy@_@i@g@yCsD]q@Ok@M_AXmFFe@`AqLzAgMZ{BxAwIvAcHrBoIjBuGjAyDxD{KbDcIjRib@z@eCv@mCn@gDV_CN}BDuBAyBI}BUgD{@}IqAqJ}@_GmAeHsAqGeB{GeAgD{@gCiAwCaA{BqAqCwE_JyJ_QsB}DwAaDiAyCqB{G[wAY_AgDkMYo@e@q@o@o@u@e@cA_@qJ{BWKkRuEiA_@e@Yo@i@q@_Ac@cAOk@MiBBuANoBlCgLzB{KrCoM\\gBpB_Jp[}yAbDkOJm@vAkGb@yAxLgXlE{RzCcOf@iDpBcLXyAjBuLt@eEhK}m@bB{J`Loh@sEeAkDy@[vAe@Pc@@uAU}FoAyEgAk@Qc@[yE_Ci@QaBQ{@EsC?yH^mNf@i@MiAMgAYoAk@w@s@SOoFsGeAc@s@Mq@CcBFuEVqKZcPp@mHd@wAaA"
            ]

# Current UTC timestamp, taken once so that every row of this run shares it
now_utc = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)

# Process each polyline and collect metrics.
# Each row is (ts_utc, route_id, distance_m, duration_sec, static_duration_sec);
# route_id is the 1-based position of the polyline in POLYLINES.
rows = []
for i, enc in enumerate(POLYLINES, 1):
    try:
        # Decode polyline and compute route metrics
        pts = decode_polyline(enc)
        # Compute route ETA
        data = compute_route_eta_fixed(pts, n_vias=N_VIAS)
        # A response without routes would otherwise surface as a bare
        # KeyError/IndexError whose message ("'routes'") explains nothing.
        routes = data.get("routes") or []
        if not routes:
            raise RuntimeError("Routes API returned no routes for this request.")
        route = routes[0]
        distance_m = route.get("distanceMeters")
        # Duration parsing ("1234s" -> 1234); None when the field is absent
        dur_s = parse_duration_to_seconds(route.get("duration"))
        static_s = parse_duration_to_seconds(route.get("staticDuration"))
        # Append results
        rows.append((now_utc, i, distance_m, dur_s, static_s))
        print(f"[OK] rota {i}: dist={distance_m}m, ETA={dur_s}s, static={static_s}s")
    except Exception as e:
        # Deliberate best-effort: one failing route must not stop the others,
        # so the error is reported and the loop moves on.
        print(f"[ERRO] rota {i}: {e}")

[OK] rota 1: dist=24156m, ETA=1602s, static=1701s
[OK] rota 2: dist=26255m, ETA=1752s, static=1903s
[OK] rota 3: dist=26788m, ETA=1733s, static=1836s


In [None]:
"""
DISCLAIMER: The following code block is commented out because it relies on PySpark and Delta Lakehouse, which may not be set up in this environment.
Still, it illustrates how to write the collected data into a Delta Lakehouse table.
"""
# --- Delta Schema ---
# schema = T.StructType([
#     T.StructField("ts_utc", T.TimestampType(), False),
#     T.StructField("route_id", T.IntegerType(), False),
#     T.StructField("distance_m", T.IntegerType(), True),
#     T.StructField("duration_sec", T.IntegerType(), True),       # ETA with traffic
#     T.StructField("static_duration_sec", T.IntegerType(), True) # ETA "static" (baseline)
# ])

# --- Create dataframe and write to Delta Lakehouse ---
# df = spark.createDataFrame(rows, schema=schema)
# spark.sql(f"CREATE SCHEMA IF NOT EXISTS {LAKEHOUSE_DB}")
# (
#     df.write
#       .format("delta")
#       .mode("append")
#       .saveAsTable(f"{LAKEHOUSE_DB}.{TABLE_NAME}")
# )

# --- Display inserted data (latest first) ---
# display(spark.table(f"{LAKEHOUSE_DB}.{TABLE_NAME}").orderBy(F.desc("ts_utc")))


"""
The output should look like this:
ts_utc              | route_id | distance_m | duration_sec | static_duration_sec
--------------------------------------------------------------------------------
2025-09-02 01:52:12 |     3    |   26788    | 1706         | 1838
--------------------------------------------------------------------------------
2025-09-02 01:52:12 |     1    |   24408    | 1595         | 1754
--------------------------------------------------------------------------------
2025-09-02 01:52:12 |     2    |   26255    | 1714         | 1904
"""