# Shipping Forecast Bot Prototype

Preinstall the required packages

In [3]:
!pip install requests beautifulsoup4


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Define the data class for the forecast

In [40]:
from dataclasses import dataclass, asdict
from typing import List

@dataclass
class Forecast:
    """A parsed weather bulletin from a single source.

    Attributes:
        source_name: Short name of the publishing service.
        url: Address the bulletin was downloaded from.
        publication_time: Publication timestamp, kept as a string.
        synoptic_info: Free-text description of the general synoptic situation.
        warnings: Warning records; each is a dict with a "warning_type" string
            and an "areas" list of affected area names.
        forecast_details: Mapping of region name to its forecast text.
    """

    source_name: str
    url: str
    publication_time: str
    synoptic_info: str
    warnings: List[dict]
    forecast_details: dict

    def display_info(self):
        """Return a one-line summary of when and where the forecast was published."""
        return f"Forecast published at {self.publication_time} on {self.source_name}"

    def to_dict(self):
        """Return the forecast as a plain dict (nested containers are copied by asdict)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict):
        """Build a Forecast from a dict such as the one produced by to_dict().

        Keys other than the six field names are ignored.

        Raises:
            KeyError: If any of the six field names is missing from ``data``.
        """
        return cls(
            source_name=data["source_name"],
            url=data["url"],
            publication_time=data["publication_time"],
            synoptic_info=data["synoptic_info"],
            warnings=data["warnings"],
            forecast_details=data["forecast_details"]
        )

    def __eq__(self, other):
        """Compare two forecasts field by field via their dict representation."""
        if not isinstance(other, Forecast):
            # Let Python try the reflected comparison; for unrelated types the
            # final result of ``==`` is still False.
            return NotImplemented
        return self.to_dict() == other.to_dict()


## Define the report generator class

In [23]:
class ReportGenerator:
    """Turns a Forecast into a plain-text report tailored to a list of areas."""

    def __init__(self, forecast: Forecast):
        self.forecast = forecast

    def generate_report(self, subscribed_areas) -> str:
        """Build the report text for the given subscribed areas.

        The report contains the publication time, the synoptic information,
        every warning that affects at least one subscribed area, the forecast
        of every region whose name contains a subscribed area, and the source
        URL. All name matching is a case-insensitive substring test.

        Args:
            subscribed_areas: Iterable of area names the reader is interested in.

        Returns:
            The report lines joined with newlines.
        """
        bulletin = self.forecast
        out = [
            f"Forecast Publication Time: {bulletin.publication_time}\n",
            f"General Synoptic Information: {bulletin.synoptic_info}\n",
        ]

        def concerns_subscriber(entry):
            # True as soon as any affected area mentions any subscribed area.
            return any(
                wanted.lower() in affected.lower()
                for affected in entry.get("areas", [])
                for wanted in subscribed_areas
            )

        matching = [entry for entry in bulletin.warnings if concerns_subscriber(entry)]

        if not matching:
            out.append("No warnings for your subscribed areas.\n")
        else:
            out.append("Warnings:")
            for entry in matching:
                out.append(f"  Warning Type: {entry.get('warning_type', 'N/A')}")
                out.append("  Affected Areas:")
                out.extend(f"    - {affected}" for affected in entry.get("areas", []))
                out.append("")  # spacer after each warning

        # One section per subscribed area, listing every region that mentions it.
        out.append("Forecasts for your subscribed areas:")
        for wanted in subscribed_areas:
            hits = [
                (region, text)
                for region, text in bulletin.forecast_details.items()
                if wanted.lower() in region.lower()
            ]
            if hits:
                for region, text in hits:
                    out += [f"{region}:", text, ""]  # trailing "" separates regions
            else:
                out.append(f"{wanted}: Forecast not found.\n")

        out.append(f"Source: {bulletin.url}\n")
        return "\n".join(out)

## Define the DWDParser class

In [6]:
import re

import requests
from bs4 import BeautifulSoup, NavigableString

class DWDParser:
    """Downloads the German Weather Service (DWD) bulletin page and parses it.

    The page is fetched once in ``__init__``; ``get_forecast`` parses the stored
    response into a ``Forecast``.
    """

    # Seconds to wait for the server; without a timeout requests.get can block
    # indefinitely on an unresponsive host.
    REQUEST_TIMEOUT = 30

    def __init__(self, url="https://www.dwd.de/EN/ourservices/seewetternordostseeen/seewetternordostsee.html"):
        """Download the bulletin page.

        Args:
            url: Page to download; defaults to the DWD bulletin page.

        Raises:
            ValueError: If the server answers with a status code other than 200.
        """
        # Name of the source
        self.name = "DWD"
        # URL of the web page with forecasts from the German Weather Service
        self.url = url
        # Desktop Chrome User-Agent string.
        # NOTE(review): presumably needed because the site rejects the default
        # python-requests agent - confirm.
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
        }
        self.response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        # Check if the request was successful
        if self.response.status_code != 200:
            raise ValueError(f"Failed to download the page. Status code: {self.response.status_code}")

    def get_forecast(self) -> Forecast:
        """Parse the downloaded page into a Forecast.

        Returns:
            A Forecast holding the publication time, synoptic information,
            warnings and per-region forecast texts found in the bulletin.

        Raises:
            ValueError: If the page contains no <pre> tag.
        """
        soup = BeautifulSoup(self.response.text, "html.parser")
        # The bulletin text lives in the first <pre> tag of the page.
        pre_tag = soup.find("pre")
        if not pre_tag:
            raise ValueError("No <pre> tag found in the HTML.")
        # Plain text (one line per text node) for publication time, synoptic info and warnings.
        pre_text = pre_tag.get_text(separator="\n")
        lines = pre_text.splitlines()
        # Re-parse the <pre> fragment on its own so the bold-tag walk used for the
        # regional forecasts stays inside the bulletin.
        pre_soup = BeautifulSoup(str(pre_tag), "html.parser")

        return Forecast(
            source_name=self.name,
            url=self.url,
            publication_time=self._extract_publication_time(pre_text),
            synoptic_info=self._extract_synoptic_info(lines),
            warnings=self._extract_warnings(lines),
            forecast_details=self._extract_forecast_details(pre_soup),
        )

    @staticmethod
    def _extract_publication_time(pre_text: str) -> str:
        """Return the first timestamp like "10.03.2025, 15.36 UTC", or "Not found"."""
        match = re.search(r"(\d{2}\.\d{2}\.\d{4},\s*\d{2}\.\d{2}\s*UTC)", pre_text)
        return match.group(1) if match else "Not found"

    @staticmethod
    def _extract_synoptic_info(lines) -> str:
        """Join the lines that follow the "General synoptic situation" header.

        Collection stops at the first line starting with "forecast valid" or
        "until" (case-insensitive). Blank lines in between are kept as empty
        strings, so the joined text may contain repeated spaces.
        """
        collected = []
        in_section = False
        for line in lines:
            if "general synoptic situation" in line.lower():
                in_section = True
                continue
            if in_section:
                lowered = line.strip().lower()
                # A forecast or warning header marks the end of the synoptic section.
                if lowered.startswith("forecast valid") or lowered.startswith("until"):
                    break
                collected.append(line.strip())
        return " ".join(collected)

    @staticmethod
    def _extract_warnings(lines) -> list:
        """Collect warning blocks of the form "until ... are expected:" plus area lines.

        Returns:
            A list of dicts with "warning_type" (the header text) and "areas"
            (the lines following the header up to a blank line or the next
            "until" line).
        """
        warnings = []
        i = 0
        while i < len(lines):
            line = lines[i].strip()
            if not line.lower().startswith("until"):
                i += 1
                continue
            # Persist the valid period and the warning type
            warning_type = line
            # The header may wrap onto a second line; guard the look-ahead so an
            # "until" line at the very end of the text does not raise IndexError.
            if i + 1 < len(lines) and lines[i + 1].strip().lower().endswith("expected:"):
                i += 1
                line = lines[i].strip()
                warning_type += " " + line
            if not line.lower().endswith("expected:"):
                # Not a warning header after all. Step past it; without this the
                # index never advances and the loop would spin forever.
                i += 1
                continue
            # Collect subsequent lines as warning areas until a blank line or another section starts
            warning_areas = []
            i += 1
            while i < len(lines):
                next_line = lines[i].strip()
                # NOTE(review): lines come from get_text(), so markup is already
                # stripped and the "<B>" check probably never matches; kept to
                # preserve existing behaviour.
                if next_line == "" or next_line.lower().startswith("until") or next_line.startswith("<B>"):
                    break
                warning_areas.append(next_line)
                i += 1
            warnings.append({
                "warning_type": warning_type,
                "areas": warning_areas
            })
            # i now points at the terminating line, which the outer loop examines
            # next (it may itself be the start of another warning).
        return warnings

    @staticmethod
    def _extract_forecast_details(pre_soup) -> dict:
        """Map each bold region header ("German Bight:") to the text that follows it.

        Only bold tags after the "Forecast valid until" header are considered,
        and processing stops at the first bold tag mentioning "outlook".
        """
        forecast_header = pre_soup.find(
            lambda tag: tag.name == "b" and "forecast valid until" in tag.get_text().lower()
        )
        forecast_details = {}
        if not forecast_header:
            return forecast_details
        for bold_tag in forecast_header.find_all_next("b"):
            bold_text = bold_tag.get_text(strip=True)
            # Everything from the outlook header onwards is ignored.
            if "outlook" in bold_text.lower():
                break
            # Region headers end with a colon (e.g., "German Bight:")
            if not bold_text.endswith(":"):
                continue
            region = bold_text[:-1].strip()  # Remove the trailing colon
            # To avoid duplicates, keep only the first occurrence of a region.
            if region in forecast_details:
                continue
            # Gather sibling text up to the next bold tag, which starts the next region.
            parts = []
            for sibling in bold_tag.next_siblings:
                if getattr(sibling, "name", None) == "b":
                    break
                if isinstance(sibling, NavigableString):
                    parts.append(sibling.strip())
                else:
                    parts.append(sibling.get_text(" ", strip=True))
            forecast_details[region] = " ".join(parts).strip()
        return forecast_details

## Define the ForecastNotifier class

In [24]:
import requests

class ForecastNotifier:
    """Sends a forecast report to a Telegram chat through the Bot API."""

    def __init__(self, forecast: Forecast):
        self.forecast = forecast

    def notify(self, subscribed_areas, token, chat_id):
        """Generate the report for ``subscribed_areas`` and send it via sendMessage.

        Args:
            subscribed_areas: Area names passed to ReportGenerator.generate_report.
            token: Telegram bot token (part of the request URL path).
            chat_id: Identifier of the chat that receives the message.

        Returns:
            A dict ``{"body": <decoded JSON response>}``.
        """
        report_generator = ReportGenerator(self.forecast)
        report = report_generator.generate_report(subscribed_areas)
        telegram_send_url = f"https://api.telegram.org/bot{token}/sendMessage"
        # Pass the text through ``params`` so requests URL-encodes it; pasting the
        # raw report into the query string breaks on "&", "#", "+" or "%".
        response = requests.get(
            telegram_send_url,
            params={"chat_id": chat_id, "text": report},
            timeout=30,  # do not hang forever if the API is unreachable
        )
        return {"body": response.json()}

## Test the pipeline

In [41]:
import os

# Download the bulletin (the DWDParser constructor performs the HTTP request)
# and parse it into a Forecast.
forecast = DWDParser().get_forecast()
# Print the dataclass repr to eyeball the parsed fields.
print(forecast)



In [42]:
import json

# Round-trip the forecast through JSON to check that to_dict()/from_dict()
# reproduce an equal Forecast.
forecast_json = json.dumps(forecast.to_dict())
forecast_from_json = Forecast.from_dict(json.loads(forecast_json))
# Last expression of the cell, so the notebook displays the comparison result.
forecast == forecast_from_json

True

In [43]:
# Send the report for one subscribed area; the bot token and chat id are read
# from environment variables rather than hard-coded in the notebook.
ForecastNotifier(forecast).notify(["Western Baltic"], os.environ["TOKEN"], os.environ["CHAT_ID"])
# NOTE(review): the dict returned by notify() is discarded, so "done" is printed
# without inspecting the API response.
print("done")

done
