# Reading data

In this exercise we will cover how to use polars to read data from external data sources.

There are two primary data sources we will use:

1. Ferry data: https://wsdot.wa.gov/
2. Weather data: https://open-meteo.com/

Specifically, there are 4 data sets we will focus on:

Vessel History: <https://www.wsdot.wa.gov/ferries/api/vessels/rest/help/operations/GetAllVessels>

```
https://www.wsdot.wa.gov/Ferries/API/Vessels/rest/vesselhistory?apiaccesscode={APIACCESSCODE}
```

Terminal Locations: <https://www.wsdot.wa.gov/Ferries/API/terminals/rest/help/operations/GetAllTerminalLocations>

```
https://www.wsdot.wa.gov/Ferries/API/Terminals/rest/terminallocations?apiaccesscode={APIACCESSCODE}
```

Vessel Verbose: <https://www.wsdot.wa.gov/ferries/api/vessels/rest/help/operations/GetAllVesselVerboseDetails>

```
https://www.wsdot.wa.gov/Ferries/API/Vessels/rest/vesselverbose?apiaccesscode={APIACCESSCODE}
```

Weather data: <https://open-meteo.com/en/docs/historical-weather-api#start_date=2022-12-01&end_date=2022-12-31&hourly=temperature_2m,precipitation,weather_code,wind_speed_10m,wind_direction_10m,wind_gusts_10m&timezone=America%2FLos_Angeles>

```
https://archive-api.open-meteo.com/v1/archive?latitude=47.623651&longitude=-122.360291&start_date=2022-12-01&end_date=2022-12-31&hourly=temperature_2m,precipitation,weather_code,wind_speed_10m,wind_direction_10m,wind_gusts_10m&timezone=America%2FLos_Angeles
```
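Each of these URLs is an endpoint followed by query parameters. As a minimal sketch using only the standard library's `urllib.parse.urlencode`, the weather URL can be assembled from a dictionary. This is essentially the encoding that httpx performs for you when you pass a `params` argument. The coordinates assume the Seattle location used in the weather URL; its longitude is negative because Seattle is west of Greenwich.

```python
from urllib.parse import urlencode

base_url = "https://archive-api.open-meteo.com/v1/archive"

# Query parameters for the historical weather request shown above.
params = {
    "latitude": 47.623651,
    "longitude": -122.360291,  # West of Greenwich, so negative.
    "start_date": "2022-12-01",
    "end_date": "2022-12-31",
    # The hourly variables are sent as one comma-separated value.
    "hourly": ",".join(
        [
            "temperature_2m",
            "precipitation",
            "weather_code",
            "wind_speed_10m",
            "wind_direction_10m",
            "wind_gusts_10m",
        ]
    ),
    "timezone": "America/Los_Angeles",
}

# Keep commas literal; the "/" in the timezone is encoded as %2F.
url = f"{base_url}?{urlencode(params, safe=',')}"
print(url)
```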

## Task 1 - read data

### 🔄 Task

- Download the **Vessel Verbose** data
- Convert the data into a polars dataframe

### 🧑‍💻 Code

The Washington State Department of Transportation (WSDOT) makes its data available through an API. The API has many features; you can read more about how to use it here: <https://wsdot.wa.gov/traffic/api/>.

Many people's first instinct is to download the data by:

- Clicking through a web browser.
- Running the `curl` command in a terminal:

```bash
WSDOT_ACCESS_CODE='xxxx-xxxx-xxxx-xxxx-xxxx'
curl "https://www.wsdot.wa.gov/Ferries/API/Vessels/rest/vesselverbose?apiaccesscode=${WSDOT_ACCESS_CODE}"
```

There is a better way, though! Using httpx, we can download the data as JSON and parse it into Python objects (here, a list of dictionaries). Then we can use polars to create a DataFrame directly from that list. First, let's download the data using httpx.

In [None]:
import os
from pathlib import Path

import httpx
from dotenv import load_dotenv

In [None]:
base_url = "https://www.wsdot.wa.gov/Ferries/API/Vessels/rest"
path = "vesselverbose"

In [None]:
# Load variables from a .env file (if one exists), then read the API access code from the environment.
if Path(".env").exists():
    load_dotenv()

ws_dot_access_code = os.environ["WSDOT_ACCESS_CODE"]

In [None]:
# Define our params in a dictionary.
params = {"apiaccesscode": ws_dot_access_code}

with httpx.Client(base_url=base_url, params=params) as client:
    response = client.get(path)

response

The `Response` object from httpx has several methods and attributes that give us more information about the request and the response.
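One caution: `response.url` includes the `apiaccesscode` query parameter, so displaying it (or committing notebook output) exposes your access code. Below is a minimal standard-library sketch for masking it before display; the helper name `redact_query_param` is hypothetical and not part of httpx.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def redact_query_param(url: str, name: str = "apiaccesscode") -> str:
    """Return `url` with the value of query parameter `name` replaced by REDACTED."""
    parts = urlsplit(url)
    query = [
        (key, "REDACTED" if key == name else value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    # Rebuild the URL with the masked query string.
    return urlunsplit(parts._replace(query=urlencode(query)))


print(
    redact_query_param(
        "https://www.wsdot.wa.gov/Ferries/API/Vessels/rest/vesselverbose"
        "?apiaccesscode=xxxx-xxxx-xxxx-xxxx-xxxx"
    )
)
```

With httpx, you would call it as `redact_query_param(str(response.url))`.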

In [None]:
# The URL that was used to make the request.
response.url

In [None]:
# The status of the response
response.status_code

In [None]:
# Parse the JSON body. For this endpoint it is a list of dictionaries, one per vessel.
response.json()

In [None]:
# Check how many records are in the response.
len(response.json())

In [None]:
# Use the pprint function from rich for nicer formatting of the dictionary data.
from rich.pretty import pprint

In [None]:
pprint(response.json()[0])

Lastly, we can use polars to convert the list of dictionaries into a DataFrame.
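The parsed response is row-oriented (one dictionary per vessel), while a DataFrame stores data column by column. The plain-Python sketch below, using made-up sample records, illustrates that pivot; it is only a rough picture of what `pl.DataFrame` does with such input, since polars also infers a data type for each column.

```python
# Hypothetical sample records in the same shape as response.json().
records = [
    {"VesselID": 1, "VesselName": "Vessel A", "YearBuilt": 1980},
    {"VesselID": 2, "VesselName": "Vessel B", "YearBuilt": None},
]

# Collect every key seen in any record, preserving first-seen order.
columns: dict[str, list] = {}
for record in records:
    for key in record:
        columns.setdefault(key, [])

# Fill each column; a key missing from a record becomes None (null).
for record in records:
    for key, values in columns.items():
        values.append(record.get(key))

print(columns)
```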


In [None]:
import polars as pl

In [None]:
vessel_verbose_raw = pl.DataFrame(response.json())
vessel_verbose_raw

## Task 2 - write data to database

### 🔄 Task

- Save `vessel_verbose_raw` to our database, so that we do not need to hit the API every time we want to work with the raw data.
- Ideally, we would do most of our data tidying in "Step 2", but this dataset has a struct column that won't save to the database, so we need to do some tidying at this stage.

### 🧑‍💻 Code

The `Class` column is a struct: each row contains a dictionary of key-value pairs.

In [None]:
vessel_verbose_raw.get_column("Class")

In [None]:
vessel_verbose_raw.get_column("Class").to_list()[0]

This data would be easier to work with in a flat, tabular format rather than as a nested dictionary. To get there, unnest the `Class` struct so that each of its fields becomes its own column.
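Conceptually, unnesting lifts each field of the struct up to a top-level column. Here is a plain-Python sketch of the same idea on a single record; `unnest_record` is a hypothetical helper, and the field names in the sample are placeholders rather than the exact WSDOT schema.

```python
def unnest_record(record: dict, key: str) -> dict:
    """Replace the nested dict at `key` with its fields as top-level keys."""
    nested = record[key]
    flat = {k: v for k, v in record.items() if k != key}
    # Refuse to silently overwrite an existing top-level key.
    clashes = flat.keys() & nested.keys()
    if clashes:
        raise ValueError(f"Field names already exist: {sorted(clashes)}")
    flat.update(nested)
    return flat


row = {"VesselID": 1, "Class": {"ClassID": 10, "ClassName": "Example"}}
print(unnest_record(row, "Class"))
```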

In [None]:
vessel_verbose_raw = vessel_verbose_raw.unnest("Class")

In [None]:
vessel_verbose_raw.head(2)

`VesselDrawingImg` contains only null values, so we should drop it.
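Rather than checking columns one at a time, you could scan for every column that is entirely null. The plain-Python sketch below works over row records with made-up sample data; the helper name `all_null_columns` is hypothetical. In polars, the equivalent check compares each column's `null_count()` with the DataFrame's `height`.

```python
def all_null_columns(records: list[dict]) -> list[str]:
    """Return the keys whose value is None (or missing) in every record."""
    keys: list[str] = []
    for record in records:
        for key in record:
            if key not in keys:
                keys.append(key)
    return [k for k in keys if all(r.get(k) is None for r in records)]


sample = [
    {"VesselName": "Vessel A", "VesselDrawingImg": None},
    {"VesselName": "Vessel B", "VesselDrawingImg": None},
]
print(all_null_columns(sample))  # ['VesselDrawingImg']
```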

In [None]:
vessel_verbose_raw.get_column("VesselDrawingImg").value_counts()

In [None]:
vessel_verbose_raw = vessel_verbose_raw.drop("VesselDrawingImg")

Now we can write the data to the database.

In [None]:
uri = os.environ["DATABASE_URI_PYTHON"]

In [None]:
# Write to the database
vessel_verbose_raw.write_database(
    table_name="vessel_verbose_raw",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

To reuse this data in later code, we can read it back with `pl.read_database_uri`.

In [None]:
# Test that you can read the data
pl.read_database_uri(
    query="SELECT * FROM vessel_verbose_raw LIMIT 5;",
    uri=uri,
    engine="adbc"
)

## Task 3 - Get Other Data Sets

### 🔄 Task

Get the following additional data sets:

- **Vessel History**: the `https://www.wsdot.wa.gov/Ferries/API/Vessels/rest/vesselhistory` endpoint contains historical data about sailings.
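- **Terminal locations**: the `https://www.wsdot.wa.gov/Ferries/API/terminals/rest/terminallocations` endpoint contains information about ferry terminal locations.
- **Weather data**: historical hourly weather for each terminal location, from the Open-Meteo archive API (`https://archive-api.open-meteo.com/v1/archive`).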

### 🧑‍💻 Code

#### Vessel History

In [None]:
# Get all of the vessel names
base_url = "https://www.wsdot.wa.gov/Ferries/API/Vessels/rest"
params = {"apiaccesscode": os.environ["WSDOT_ACCESS_CODE"]}

with httpx.Client(base_url=base_url, params=params) as client:
    response = client.get("/vesselverbose")

vessel_names = [i["VesselName"] for i in response.json()]
vessel_names

In [None]:
# For each vessel, get all of the history from the desired date range. Define
# the start date and end date.
import datetime

In [None]:
# To speed things up, I recommend using a more recent date, for example
# try using datetime.date(2024, 3, 1)
start_date = datetime.date(2020, 1, 1)
start_date

In [None]:
# Subtract one week from today, because the weather archive API has a 5-day delay.
end_date = datetime.date.today() - datetime.timedelta(weeks=1)
end_date

The vessel history data set is much larger. To avoid re-downloading it on every run, we will add hishel, which provides built-in HTTP caching for httpx through a cache transport. This is really useful during development, because it prevents you from hitting the API too many times.
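The core idea behind caching can be sketched without any HTTP at all: store responses keyed by URL and only call the network on a miss. The sketch below is an in-memory illustration with a stand-in `fake_fetch` function; both `UrlCache` and `fake_fetch` are hypothetical. hishel itself is considerably more complete; for example, it applies HTTP caching rules.

```python
from typing import Callable


class UrlCache:
    """Minimal in-memory cache keyed by URL (illustration only)."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        self._fetch = fetch
        self._store: dict[str, str] = {}

    def get(self, url: str) -> tuple[str, bool]:
        """Return (body, from_cache) for `url`."""
        if url in self._store:
            return self._store[url], True
        body = self._fetch(url)
        self._store[url] = body
        return body, False


calls: list[str] = []


def fake_fetch(url: str) -> str:
    calls.append(url)  # Record each simulated network call.
    return f"body for {url}"


cache = UrlCache(fake_fetch)
print(cache.get("https://example.com/a"))  # ('body for https://example.com/a', False)
print(cache.get("https://example.com/a"))  # ('body for https://example.com/a', True)
print(len(calls))  # 1
```

The second `get` is served from the store, which mirrors what the `from_cache` extension reports in the loop below.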

In [None]:
import hishel

controller = hishel.Controller(allow_heuristics=True)
cache_transport = hishel.CacheTransport(
    transport=httpx.HTTPTransport(), controller=controller
)

In [None]:
%%time
# Get the vessel history for each vessel.
vessel_history_json = []

for vessel_name in vessel_names:
    print(f"Getting vessel history for {vessel_name}...")
    with httpx.Client(
        base_url=base_url, params=params, transport=cache_transport
    ) as client:
        response = client.get(
            f"/vesselhistory/{vessel_name}/{start_date}/{end_date}", timeout=30
        )
        print(f"\t{len(response.json()):,} records retrieved for {vessel_name}.")
        print(f"\tCache used: {response.extensions['from_cache']}")

    vessel_history_json += response.json()
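Each request path combines the vessel name with the two dates. Formatting a `datetime.date` in an f-string produces ISO format (`YYYY-MM-DD`). If a vessel name ever contains spaces or other special characters, encoding it explicitly with `urllib.parse.quote` removes any doubt about the resulting path. Below is a sketch with a hypothetical helper name and vessel name.

```python
import datetime
from urllib.parse import quote


def vessel_history_path(
    vessel_name: str, start: datetime.date, end: datetime.date
) -> str:
    """Build a vesselhistory path in the same format as the loop above."""
    # safe="" also encodes "/" so the name stays a single path segment.
    return (
        f"/vesselhistory/{quote(vessel_name, safe='')}"
        f"/{start.isoformat()}/{end.isoformat()}"
    )


print(
    vessel_history_path(
        "Example Vessel", datetime.date(2024, 3, 1), datetime.date(2024, 3, 31)
    )
)
# /vesselhistory/Example%20Vessel/2024-03-01/2024-03-31
```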

In [None]:
# Check how many records were returned.
f"{len(vessel_history_json):,}"

In [None]:
# Preview the first two records.
vessel_history_json[0:2]

In [None]:
# Convert the data from JSON to a polars DataFrame
vessel_history_raw = pl.DataFrame(vessel_history_json)
vessel_history_raw

In [None]:
# Write to the database
vessel_history_raw.write_database(
    table_name="vessel_history_raw",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

#### Terminal Locations

In [None]:
# Get all of the terminal location data
base_url = "https://www.wsdot.wa.gov/Ferries/API/terminals/rest"
params = {"apiaccesscode": os.environ["WSDOT_ACCESS_CODE"]}

with httpx.Client(base_url=base_url, params=params) as client:
    response = client.get("/terminallocations")

In [None]:
# Check how many records were returned.
f"{len(response.json()):,}"

In [None]:
# Preview the first two records.
response.json()[0:2]

In [None]:
# Map each terminal name to its abbreviation.
{terminal["TerminalName"]: terminal["TerminalAbbrev"] for terminal in response.json()}

In [None]:
terminal_locations_raw = pl.DataFrame(response.json())
terminal_locations_raw

Before saving to the database, drop the `DispGISZoomLoc` column: we do not need it, and its format is not supported by the database.
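To spot columns like this before a write fails, you can look for values that are nested (dictionaries or lists), which typically become struct or list columns in polars. Here is a plain-Python sketch over row records; the helper name `nested_keys` and the sample values are hypothetical.

```python
def nested_keys(records: list[dict]) -> set[str]:
    """Return keys whose value is a dict or list in at least one record."""
    return {
        key
        for record in records
        for key, value in record.items()
        if isinstance(value, (dict, list))
    }


sample = [
    {"TerminalName": "Example", "Latitude": 47.6, "DispGISZoomLoc": [{"Zoom": 1}]},
]
print(nested_keys(sample))  # {'DispGISZoomLoc'}
```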

In [None]:
# Write to the database
terminal_locations_raw.drop("DispGISZoomLoc").write_database(
    table_name="terminal_locations_raw",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

#### Terminal Weather

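Get the weather data from Open-Meteo (<https://open-meteo.com/en/docs>). The example URL below uses the forecast API; the code in this section uses the historical archive API (`https://archive-api.open-meteo.com/v1/archive`), which takes the same style of query parameters: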

`https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41&hourly=temperature_2m,precipitation,cloud_cover,visibility,wind_speed_10m`

In [None]:
# Split the full period into consecutive chunks of about 4 weeks,
# starting at start_date and ending at end_date.
date_ranges = []
_start_date = start_date

while _start_date <= end_date:
    _end_date = min(_start_date + datetime.timedelta(weeks=4), end_date)
    date_ranges.append((_start_date, _end_date))
    _start_date = _end_date + datetime.timedelta(days=1)

date_ranges
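A quick sanity check is that the chunks cover the whole period with no gaps or overlaps. To keep it self-contained, the sketch below re-implements the chunking as a hypothetical `chunk_dates` function, then verifies those properties with a hypothetical `is_contiguous` helper.

```python
import datetime


def chunk_dates(
    start: datetime.date, end: datetime.date, weeks: int = 4
) -> list[tuple[datetime.date, datetime.date]]:
    """Split start..end (inclusive) into consecutive, non-overlapping ranges."""
    ranges = []
    current = start
    while current <= end:
        stop = min(current + datetime.timedelta(weeks=weeks), end)
        ranges.append((current, stop))
        current = stop + datetime.timedelta(days=1)
    return ranges


def is_contiguous(ranges, start: datetime.date, end: datetime.date) -> bool:
    """True if the ranges begin at `start`, finish at `end`, and leave no gaps."""
    if not ranges or ranges[0][0] != start or ranges[-1][1] != end:
        return False
    one_day = datetime.timedelta(days=1)
    return all(nxt[0] == prev[1] + one_day for prev, nxt in zip(ranges, ranges[1:]))


start, end = datetime.date(2024, 1, 1), datetime.date(2024, 3, 15)
ranges = chunk_dates(start, end)
print(ranges)
print(is_contiguous(ranges, start, end))  # True
```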

In [None]:
%%time

import time
from typing import TypedDict

base_url = "https://archive-api.open-meteo.com/v1/"


class WeatherParams(TypedDict):
    hourly: list[str]
    latitude: float
    longitude: float
    start_date: datetime.date
    end_date: datetime.date


json_data = []

with httpx.Client(base_url=base_url, transport=cache_transport) as client:
    for i in terminal_locations_raw.select(
        "Latitude", "Longitude", "TerminalName"
    ).to_dicts():
        for date_range in date_ranges:
            params: WeatherParams = {
                "hourly": [
                    "weather_code",
                    "temperature_2m",
                    "precipitation",
                    "cloud_cover",
                    "wind_speed_10m",
                    "wind_direction_10m",
                    "wind_gusts_10m",
                ],
                "start_date": date_range[0],
                "end_date": date_range[1],
                "latitude": round(i["Latitude"], 2),
                "longitude": round(i["Longitude"], 2),
            }

            print(
                f'Getting records for: {i["TerminalName"]} <> {params["latitude"]}, {params["longitude"]} <> {params["start_date"]} to {params["end_date"]}...'
            )

            response = client.get("/archive", params=params)

            try:
                print(f"\t{response}")
                print(f"\tFrom cache: {response.extensions['from_cache']}")
                response.raise_for_status()
                _json_data = response.json()
                _json_data["terminal_name"] = i["TerminalName"]
                json_data.append(_json_data)

            except httpx.HTTPStatusError as exc:
                if response.status_code == 429:
                    print("\tRate limit exceeded. Waiting 60 seconds...")
                    time.sleep(60)
                    # Retry the same archive endpoint after waiting.
                    response = client.get("/archive", params=params)
                    print(f"\t{response}")
                    print(f"\tFrom cache: {response.extensions['from_cache']}")
                    response.raise_for_status()
                    _json_data = response.json()
                    _json_data["terminal_name"] = i["TerminalName"]
                    json_data.append(_json_data)
                else:
                    raise exc
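The retry logic above waits a fixed 60 seconds and tries only once more. A more general pattern is to retry a few times with an increasing (exponential) delay. Here is a minimal standard-library sketch, assuming a `should_retry` predicate that decides which results are worth retrying (for example, a 429 status code). The function name `retry_with_backoff` is hypothetical, and `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    should_retry: Callable[[T], bool],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call` until `should_retry` is False or attempts run out.

    The delay doubles after each retry: base_delay, 2 * base_delay, ...
    """
    result = call()
    for attempt in range(1, max_attempts):
        if not should_retry(result):
            break
        sleep(base_delay * 2 ** (attempt - 1))
        result = call()
    return result


# Simulated status codes: two rate-limit errors, then success.
statuses = iter([429, 429, 200])
delays: list[float] = []
final = retry_with_backoff(
    call=lambda: next(statuses),
    should_retry=lambda status: status == 429,
    sleep=delays.append,
)
print(final, delays)  # 200 [1.0, 2.0]
```

With httpx, a call might look like `retry_with_backoff(lambda: client.get("/archive", params=params), lambda r: r.status_code == 429, base_delay=60)`.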

In [None]:
terminal_weather = (
    pl.DataFrame(json_data)
    .unnest("hourly")
    .explode(
        "time",
        "weather_code",
        "temperature_2m",
        "precipitation",
        "cloud_cover",
        "wind_speed_10m",
        "wind_direction_10m",
        "wind_gusts_10m",
    )
)

terminal_weather
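Each Open-Meteo response stores its hourly values as parallel arrays under `hourly`: one array per variable, plus a `time` array. Unnesting and then exploding turns those arrays into one row per hour. Here is a plain-Python sketch of the same reshaping on a tiny made-up response; the helper name `hourly_rows` is hypothetical.

```python
def hourly_rows(payload: dict) -> list[dict]:
    """Turn parallel hourly arrays into one dict per timestamp."""
    hourly = payload["hourly"]
    # Top-level scalar fields (e.g. coordinates, terminal_name) repeat on every row.
    extras = {
        k: v for k, v in payload.items() if k not in ("hourly", "hourly_units")
    }
    columns = list(hourly)
    lengths = {len(hourly[c]) for c in columns}
    if len(lengths) != 1:
        raise ValueError("hourly arrays must all have the same length")
    return [
        {**extras, **{c: hourly[c][i] for c in columns}}
        for i in range(lengths.pop())
    ]


sample = {
    "latitude": 47.6,
    "longitude": -122.4,
    "terminal_name": "Example Terminal",
    "hourly": {
        "time": ["2024-03-01T00:00", "2024-03-01T01:00"],
        "temperature_2m": [6.1, 5.8],
    },
}
rows = hourly_rows(sample)
print(rows[1])
```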

Drop the `hourly_units` column: it is a struct that may not write to the database correctly, and we do not need it.

In [None]:
terminal_weather = terminal_weather.drop("hourly_units")

terminal_weather.head()

In [None]:
# Write to the database
terminal_weather.write_database(
    table_name="terminal_weather_raw",
    connection=uri,
    engine="adbc",
    if_table_exists='replace'
)

## Task 4 - Publish the solution notebook to Connect

### 🔄 Task

- Publish the solution notebook to Posit Connect.
- Share the notebook with the rest of the workshop.
- Schedule the notebook to run once every week.

### 🧑‍💻 Code

Run the following to deploy the notebook to Connect:

```bash
# Check that you have the required environment variables set
echo $DATABASE_URI_PYTHON
echo $WSDOT_ACCESS_CODE

# Publish the notebook
rsconnect deploy notebook --title "Seattle Ferries #1 - Raw data" -E DATABASE_URI_PYTHON -E WSDOT_ACCESS_CODE notebook.ipynb
```

After the deployment is successful:

- Share the notebook with the person beside you.
- Schedule the notebook to run once every week.

In [None]:
print("Notebook complete ✅")