%md
# 01-raw: Auslesen der Volltexte aus der PMG API

* Um was handelt es sich hier (Kurzbeschreibung Inhalt):  
Anbinden der PMG-API, um Print- und Online-Mediendaten in 01_Raw abzuspeichern.


---
## QUELLEN:  
- PMG API

## ZIEL  
- Unity-Catalog: 
  - datif_pz_uk_dev.01-toRaw.pmg

  
---
* Versionen (aktuelle immer oben):
- 06.08.2025 Max Mustermann: Init

In [0]:
%run ../../common/nb_init

In [0]:
import requests
import pandas as pd
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType,
    BooleanType, TimestampType, MapType, ArrayType, DateType  
)
import datetime
import numpy as np
from datetime import datetime
import zoneinfo 
import pytz
import xml.etree.ElementTree as ET
from azure.storage.filedatalake import DataLakeServiceClient
import json
from azure.storage.blob import BlobServiceClient

In [0]:
# Storage account name and access key are read via get_secret.
# NOTE(review): get_secret is not defined in this notebook — presumably it is
# provided by the nb_init notebook that is run at the top; confirm.
pz_storage_name = get_secret("storage-datalake-name")
pz_storage_key = get_secret("storage-access-key")
# Azure Storage connection string; handed to upload_df_ndjson_to_blob in the execution cell.
connection_string = f'DefaultEndpointsProtocol=https;AccountName={pz_storage_name};AccountKey={pz_storage_key};EndpointSuffix=core.windows.net'
# abfss base folder that holds one sub-folder of JSON extracts per stored search.
json_base_path = f"abfss://01-raw@{pz_storage_name}.dfs.core.windows.net/pmg/json/"

In [0]:
# API key for the PMG API; get() and get_xml() send it in the "auth" request header.
secret= get_secret("pmg-api")


## Funktion um die APIs abzurufen

In [0]:

def get(URL: str, timeout: float = 60.0) -> dict:
    """Send an authenticated GET request and return the decoded JSON body.

    The API key is taken from the notebook-level ``secret`` variable and sent
    in the ``auth`` header.

    Args:
        URL: Endpoint to call.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error, so a stalled connection cannot block the notebook
            indefinitely.

    Returns:
        The parsed JSON response on HTTP 200. For any other status code the
        status and response text are printed and an empty dict is returned.
    """
    headers = {
        "Content-Type": "application/json",
        "auth": secret,
    }

    response = requests.get(URL, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print("Fehler beim API-Aufruf:", response.status_code)
        print(response.text)
        return {}

    print(" API funktioniert!")
    return response.json()

In [0]:
import xml.etree.ElementTree as ET

def get_xml(url: str, timeout: float = 60.0) -> ET.Element | None:
    """Download an XML document and return its parsed root element.

    The API key is taken from the notebook-level ``secret`` variable and sent
    in the ``auth`` header.

    Args:
        url: URL of the XML document.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error, so a stalled connection cannot block the notebook
            indefinitely.

    Returns:
        The root element of the parsed document, or ``None`` when the server
        does not answer with HTTP 200 or the body is not well-formed XML
        (the problem is printed in both cases).
    """
    response = requests.get(url, headers={"auth": secret}, timeout=timeout)

    if response.status_code != 200:
        print("Fehler beim API-Aufruf:", response.status_code)
        print(response.text)
        return None

    try:
        return ET.fromstring(response.content)
    except ET.ParseError as e:
        print("Fehler beim XML-Parsen:", e)
        return None


## Name der gespeicherten Suche

In [0]:
def get_first_feed_url(stored_search_url: str = "https://api-mediahub.presse-monitor.de/neo/api/v1/storedsearch") -> dict[str, str]:
    """Return the feed URL of every stored search, keyed by search name.

    Calls the stored-search endpoint through ``get`` and, for each entry of
    the response's ``items`` list, takes the ``href`` of the first ``_links``
    entry.

    Args:
        stored_search_url: API endpoint URL for stored searches.

    Returns:
        Mapping of search name to feed URL.

    Raises:
        KeyError: If the response has no ``items`` key (for example because
            ``get`` returned an empty dict after a failed request) or an item
            lacks ``name`` / ``_links``.
    """
    data = get(stored_search_url)
    # A comprehension avoids the former local variable that shadowed the builtin ``dict``.
    return {item["name"]: item["_links"][0]["href"] for item in data["items"]}


In [0]:
# Do not truncate columns or cell contents when pandas DataFrames are displayed.
pd.set_option("display.max_columns", None)
pd.set_option('display.max_colwidth', None)

## Daten aus der Trefferliste

In [0]:
def get_data(feed_url: str) -> pd.DataFrame:
    """Fetch one page of the hit list and wrap it in a DataFrame.

    The JSON payload returned by ``get`` is passed unchanged to the
    ``pd.DataFrame`` constructor.

    Args:
        feed_url: Endpoint URL of the feed page to load.

    Returns:
        DataFrame built from the JSON payload of the page.

    Raises:
        ValueError: Propagated from the ``pd.DataFrame`` constructor; callers
            such as ``get_latest_data_from_feed`` inspect its message to
            detect a page without new data.
    """
    payload = get(feed_url)
    return pd.DataFrame(payload)

### Inhalt aus Index als separate Spalten

In [0]:
def get_index_data(data: pd.DataFrame) -> pd.DataFrame:
    """Expand the dicts of the ``items`` column into separate columns.

    Args:
        data: DataFrame of the hit list; must contain an ``items`` column.

    Returns:
        New DataFrame without ``items`` but with one ``items_<key>`` column
        per key found in the expanded values.
    """
    expanded_items = data["items"].apply(pd.Series).add_prefix("items_")
    remaining_columns = data.drop(columns="items")
    return pd.concat([remaining_columns, expanded_items], axis=1)

### Inhalt aus Attachment als separate Spalten

In [0]:
def get_attachment_data(data: pd.DataFrame) -> pd.DataFrame:
    """Expand the first attachment of every row into separate columns.

    Only the first element of each ``items_attachments`` list is used; rows
    whose value is not a non-empty list contribute no attachment values.

    Args:
        data: DataFrame of the hit list; must contain an
            ``items_attachments`` column.

    Returns:
        New DataFrame without ``items_attachments`` but with one
        ``attachment_<key>`` column per key of the first attachments.
    """
    first_attachments = data["items_attachments"].apply(
        lambda value: value[0] if isinstance(value, list) and value else {}
    )
    attachments_df = first_attachments.apply(pd.Series).add_prefix("attachment_")

    # Build the result once; the former duplicate concat into an unused local is gone.
    return pd.concat([data.drop("items_attachments", axis=1), attachments_df], axis=1)

### Timestamp hinzufügen

In [0]:
def add_timestamp(df: pd.DataFrame) -> pd.DataFrame:
    """Stamp every row with the current Europe/Berlin time.

    The ``timestamp`` column is written into ``df`` itself, so the caller's
    DataFrame is modified in place; the same object is returned.

    Args:
        df: DataFrame of the hit list.

    Returns:
        The same DataFrame, now with a ``timestamp`` column holding the
        formatted current time (identical for all rows).
    """
    tz_berlin = pytz.timezone("Europe/Berlin")
    now_in_berlin = datetime.now(tz_berlin)
    df["timestamp"] = now_in_berlin.strftime("%Y-%m-%d %H:%M:%S %Z")
    return df

### Content als separate Spalte

In [0]:
def fetch_content_xml(df: pd.DataFrame) -> pd.DataFrame:
    """Download the XML content of every row and append it as ``content_xml``.

    For each URL in the ``items_url`` column the document is fetched via
    ``get_xml`` and serialised to a Unicode string.

    Args:
        df: Input DataFrame; must contain an ``items_url`` column.

    Returns:
        ``df`` with an additional ``content_xml`` column aligned on the index
        of ``df``. The value is ``None`` for rows whose document could not be
        downloaded or parsed.
    """
    xml_strings = []
    for url in df["items_url"]:
        xml_element = get_xml(url)
        # get_xml returns None on HTTP or parse errors; keep the row instead
        # of failing in ET.tostring and losing the whole run.
        if xml_element is None:
            xml_strings.append(None)
        else:
            xml_strings.append(ET.tostring(xml_element, encoding="unicode"))

    content = pd.Series(xml_strings, name="content_xml", index=df.index, dtype="object")

    # Concatenate with the argument itself, not with a notebook-level variable.
    return pd.concat([df, content], axis=1)


In [0]:
def expand_xml_column(df: pd.DataFrame, xml_column: str = "content_xml", prefix: str = "content_") -> pd.DataFrame:
    """Expand an XML string column into one column per flattened XML field.

    Each document is flattened into ``path -> value`` pairs:

    * element text is stored under the dotted tag path (``root.child``),
    * attributes are stored under ``path@attr``,
    * sibling elements sharing a tag get a positional suffix
      (``root.item.0``, ``root.item.1``, ...) so they do not overwrite each
      other.

    Args:
        df: Input DataFrame containing the XML string column.
        xml_column: Name of the column containing XML strings.
        prefix: Prefix prepended to all generated column names.

    Returns:
        ``df`` with the additional flattened columns. Rows whose value is not
        a string, or is not well-formed XML, contribute no values.
    """

    def flatten_xml(elem, path):
        """Flatten ``elem`` (located at ``path``) and its descendants into a dict."""
        flat = {}

        # Attributes become separate keys next to the element itself.
        for attr, val in elem.attrib.items():
            flat[f"{path}@{attr}"] = val

        # Keep element text only when it is more than whitespace.
        text = (elem.text or "").strip()
        if text:
            flat[path] = text

        # Count child tags first so that repeated tags can be told apart.
        tag_counts = {}
        for child in elem:
            tag_counts[child.tag] = tag_counts.get(child.tag, 0) + 1

        next_index = {}
        for child in elem:
            tag = child.tag
            if tag_counts[tag] > 1:
                idx = next_index.get(tag, 0)
                next_index[tag] = idx + 1
                child_path = f"{path}.{tag}.{idx}"
            else:
                child_path = f"{path}.{tag}"

            # Recurse with the child's own path so the index suffix is kept.
            flat.update(flatten_xml(child, child_path))

        return flat

    def flatten_document(xml_value):
        """Parse one cell; missing or malformed XML yields an empty record."""
        if not isinstance(xml_value, (str, bytes)):
            return {}
        try:
            root = ET.fromstring(xml_value)
        except ET.ParseError:
            return {}
        return flatten_xml(root, root.tag)

    records = [flatten_document(xml_value) for xml_value in df[xml_column]]

    df_flat = pd.DataFrame(records, index=df.index).add_prefix(prefix)
    return pd.concat([df, df_flat], axis=1)

### Speicher als .json

In [0]:
def upload_df_ndjson_to_blob(
    df: pd.DataFrame,
    connection_string: str,
    container: str,
    blob_path: str,
) -> None:
    """Write a DataFrame to Azure Blob Storage as newline-delimited JSON.

    Every row becomes one JSON object on its own line. Values that ``json``
    cannot serialise natively are converted with ``str``. An existing blob at
    the target path is overwritten.

    Args:
        df: DataFrame to upload.
        connection_string: Azure Storage connection string.
        container: Container name, e.g. "01-raw".
        blob_path: Blob path within the container, e.g. "mediahub/df_master.json".

    Returns:
        None
    """
    # One dict per row, each serialised to a single JSON line.
    rows = df.to_dict(orient="records")
    serialized_rows = [json.dumps(row, ensure_ascii=False, default=str) for row in rows]
    payload = "\n".join(serialized_rows)

    # Upload the payload, replacing any existing blob.
    service_client = BlobServiceClient.from_connection_string(connection_string)
    target_blob = service_client.get_blob_client(container=container, blob=blob_path)
    target_blob.upload_blob(payload, overwrite=True)


### String pfadsicher

In [0]:
def sanitize_for_path(s: str) -> str:
    """Make a string safe to use as a single path segment.

    Surrounding whitespace is stripped, then every character other than
    ``a-z``, ``A-Z``, ``0-9``, ``.``, ``_`` and ``-`` is replaced by ``_``.
    A ``/`` is replaced as well, so pass one path segment, not a whole path.

    Args:
        s: Raw string, e.g. a file name containing spaces or colons.

    Returns:
        The sanitised string.
    """
    # Imported locally because the notebook's import cell does not import ``re``.
    import re

    return re.sub(r"[^a-zA-Z0-9._\-]", "_", s.strip())

### Alle Seiten abrufen

In [0]:
def get_all_data_as_df(start_df: pd.DataFrame) -> pd.DataFrame:
    """Follow the ``next_url`` links and combine all pages into one DataFrame.

    Starting from the first page, the value in row 0 of the ``next_url``
    column is fetched with ``get_data`` until a page has no such column or
    the value is empty. If loading a page raises, the error is printed and
    the pages collected so far are returned.

    Args:
        start_df: DataFrame with the first page (may contain ``next_url``).

    Returns:
        Concatenation of all collected pages with a fresh 0..n-1 index.
    """

    def _next_link(page: pd.DataFrame):
        """Return the continuation URL of a page, or None if it has none."""
        return page.loc[0, "next_url"] if "next_url" in page.columns else None

    pages = [start_df]
    next_url = _next_link(start_df)

    while next_url:
        print(f"Fetching next page: {next_url}")
        try:
            page = get_data(next_url)
            pages.append(page)
            next_url = _next_link(page)
        except Exception as e:
            print(f"Fehler beim Laden von next_url ({next_url}): {e}")
            break

    page_count = len(pages)
    if page_count > 1:
        print(f"Found {page_count} pages.")
    else:
        print("Found only one page.")

    combined = pd.concat(pages, ignore_index=True)
    return combined.reset_index(drop=True)


### Start Url bestimmen (feed oder _new url)

In [0]:
def get_latest_data_from_feed(name: str, feed_url: str, json_base_path: str) -> pd.DataFrame | None:
    """Load the newest hit-list page for a stored search.

    The continuation URL (``_new_url``) saved with the newest JSON extract of
    the search is preferred. ``feed_url`` is used when no extract or no
    ``_new_url`` is available, or when loading ``_new_url`` fails with
    anything other than the "no new data" error.

    A page without new data is recognised by the ``ValueError`` message that
    the ``pd.DataFrame`` constructor raises inside ``get_data``.

    Args:
        name: Name of the stored search (also the sub-folder of its extracts).
        feed_url: Original feed URL of the search.
        json_base_path: Base folder of the JSON extracts, ending with ``/``.

    Returns:
        DataFrame with the page loaded from ``_new_url`` or ``feed_url``, or
        ``None`` when the URL that was used reports no new data.

    Raises:
        ValueError: If ``feed_url`` fails with a ``ValueError`` other than the
            "no new data" one. Other exceptions from ``get_data(feed_url)``
            propagate unchanged.
    """
    no_new_data_marker = "If using all scalar values, you must pass an index"

    print(f"Suche für: {name}")

    # Step 1: look up the continuation URL stored with the newest extract.
    new_url = None
    try:
        latest_df = load_json(f"{json_base_path}{name}/", newest_first=True)[0]
        stored_url = latest_df.loc[0, "_new_url"]
        if pd.notna(stored_url) and stored_url != "":
            new_url = stored_url
        else:
            print("Keine _new_url vorhanden, verwende feed_url")
    except Exception as e:
        # Covers the first run (no extract yet) as well as unreadable extracts.
        print(f"Fehler beim Laden der gespeicherten _new_url: {e}")
        print("Fallback auf feed_url")

    # Step 2: try the continuation URL; only "no new data" ends the lookup here.
    if new_url is not None:
        print(f"🔗 Versuche _new_url: {new_url}")
        try:
            data = get_data(new_url)
        except ValueError as ve:
            if no_new_data_marker in str(ve):
                print(f"_new_url gibt keine neuen Daten zurück: {ve}")
                return None  # deliberately no fallback
            print(f"Unerwarteter ValueError bei _new_url: {ve}")
            print("Fallback auf feed_url")
        except Exception as e:
            print(f"Fehler bei _new_url: {e}")
            print("Fallback auf feed_url")
        else:
            print("_new_url erfolgreich verwendet")
            return data

    # Step 3: fall back to the original feed URL (attempted exactly once).
    try:
        data = get_data(feed_url)
    except ValueError as ve:
        if no_new_data_marker in str(ve):
            print(f"feed_url gibt keine neuen Daten zurück: {ve}")
            return None
        # An unexpected error must not be mistaken for "no new data".
        raise
    print("feed_url erfolgreich verwendet")
    return data


### json laden

In [0]:
def load_json(base_path: str, recursive: bool = True, newest_first: bool = False) -> list[pd.DataFrame]:
    """Read every ``.json`` file below an abfss folder into pandas DataFrames.

    Files are collected from ``base_path`` and, when ``recursive`` is set,
    from its direct sub-folders. They are ordered by modification time (with
    the path as tie-breaker) and read through Spark. A folder that cannot be
    listed is treated as empty.

    Args:
        base_path (str): abfss folder path ending with '/', e.g. ".../pmg/json/".
        recursive (bool, optional): Also read JSON files from direct
            sub-folders. Defaults to True.
        newest_first (bool, optional): Return newest → oldest instead of
            oldest → newest. Defaults to False.

    Returns:
        list[pd.DataFrame]: One DataFrame per file in the requested order;
        an empty list when no JSON file was found.
    """

    def _safe_ls(path: str):
        """List a folder; any listing error yields an empty result."""
        try:
            return dbutils.fs.ls(path)
        except Exception:
            return []

    def _is_json(info) -> bool:
        return info.name.endswith(".json")

    def _mtime_then_path(info):
        # Entries without a modification time sort first (treated as time 0).
        mtime = getattr(info, "modificationTime", None)
        return (0 if mtime is None else mtime, info.path)

    # 1) Collect candidate files.
    json_files = []
    for entry in _safe_ls(base_path):
        if _is_json(entry):
            json_files.append(entry)
        elif recursive and entry.path.endswith("/"):
            json_files.extend(child for child in _safe_ls(entry.path) if _is_json(child))

    if not json_files:
        print("Keine JSON-Dateien gefunden.")
        return []

    # 2) Order oldest → newest, then flip if requested.
    ordered = sorted(json_files, key=_mtime_then_path)
    if newest_first:
        ordered = ordered[::-1]

    # 3) Read each file via Spark and convert it to pandas.
    frames = [spark.read.json(info.path).toPandas() for info in ordered]

    order_label = 'neueste → älteste' if newest_first else 'älteste → neueste'
    print(f"Es wurden {len(frames)} JSON-Dateien eingelesen. Reihenfolge: {order_label}.")

    return frames

## Ausführung

In [0]:
# Resolve the feed URL of every stored search (search name -> feed URL).
feed_url_dict = get_first_feed_url()

# Process each stored search independently: load, flatten, enrich, upload.
for name, url in feed_url_dict.items():
    print(f"Suche {name} mit URL: {url}")
    # NOTE(review): base_path is not referenced again in this cell — the lookup
    # below uses json_base_path; confirm whether it can be removed.
    base_path = f"abfss://01-raw@{pz_storage_name}.dfs.core.windows.net/pmg/json/{name}"
    # Load the first page, preferring the continuation URL of the last extract.
    data = get_latest_data_from_feed(name, url, json_base_path)
    # None signals that there is no new data for this search.
    if data is None:
        print(f"Keine neuen Daten für {name}, überspringe.")
        continue

    # Follow next_url links and combine all pages.
    data_next_url = get_all_data_as_df(data)
    # Expand the "items" dicts into separate columns.
    df_with_items = get_index_data(data_next_url)
    # Expand the first attachment of every item into separate columns.
    # NOTE(review): before renaming df_with_items_attachments, check whether
    # fetch_content_xml reads this notebook-level name directly.
    df_with_items_attachments = get_attachment_data(df_with_items)
    # Add the extraction timestamp (add_timestamp writes into the passed DataFrame).
    df_timestamp = add_timestamp(df_with_items_attachments)
    # Download the XML content per item and flatten it into columns.
    df_content = fetch_content_xml(df_timestamp)
    df_master = expand_xml_column(df_content)

    # File name is derived from the timestamp of row 0, made path-safe.
    file_name = sanitize_for_path(f"abzug_{df_master.loc[0, 'timestamp']}.json")
    # Store the result as NDJSON in the search's folder.
    upload_df_ndjson_to_blob(
        df=df_master,
        connection_string=connection_string,  # better via Databricks Secrets
        container="01-raw",
        blob_path=f"pmg/json/{name}/{file_name}"
    )