In [0]:
from pyspark.sql.functions import regexp_replace, concat, lit

# Build the request payload from the stored text file in one chained expression,
# so `payload` is bound exactly once (to the final dict) instead of being
# rebound from a Spark DataFrame to a plain dict.
payload = (
    # One row per line of the file, in a single string column named 'value'
    spark.read.text("/Volumes/swi_audience_prd/swi-posts/payloads/7455.728927.txt")
    # Swap every occurrence of "false" for "False" in the 'value' column
    .withColumn("value", regexp_replace("value", "false", "False"))
    # Collect to the driver as pandas, then to a dict of column name -> list of values
    .toPandas()
    .to_dict('list')
)
# NOTE(review): the result has the shape {'value': [<one string per line>]}, i.e. the
# file is never parsed as JSON, and "False" is not a valid JSON literal. Confirm this
# is really the body the analysis-query endpoint expects.

In [0]:
import requests
import os
import json
import time  # Add this import

# Credentials are read from the Databricks secret scope named in SECRET_SCOPE.
# For help, contact Keller, Pascal (SRF) or see:
# https://github.com/mmz-srf/swi-analytics-databricks/blob/main/intelligence.eu.mapp.com_analysis-query_7455/Secret%20management%20in%20databricks.ipynb
SECRET_SCOPE = "swi-secret-scope"
user   = dbutils.secrets.get(SECRET_SCOPE, "mapp-user")
secret = dbutils.secrets.get(SECRET_SCOPE, "mapp-secret")
try:
    baseurl = dbutils.secrets.get(SECRET_SCOPE, "mapp-baseurl")
except Exception:
    # The base-URL secret is optional: fall back to the default host when the lookup
    # fails. `Exception` (rather than a bare `except:`) keeps KeyboardInterrupt and
    # SystemExit propagating, so the cell can still be cancelled.
    baseurl = "https://intelligence.eu.mapp.com"

# File (relative to the current working directory) in which get_token() caches the token
token_file = 'mapp_token.json'

# Return a cached token while it is still valid, otherwise request a new one
def get_token(timeout: float = 30.0) -> str:
    """Return an OAuth access token, reusing the on-disk cache when possible.

    The cache file (``token_file``) stores ``access_token`` and ``expires_at``
    (a ``time.time()`` timestamp). If the file is missing, unreadable, malformed
    or expired, a new token is requested with the client-credentials grant and
    the cache file is rewritten.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The access token string.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        KeyError: if the token response contains no ``access_token``.
    """
    # NOTE(review): the token is cached in plain text relative to the working
    # directory — confirm that location is not shared or committed.
    if os.path.exists(token_file):
        try:
            with open(token_file, 'r') as f:
                cached = json.load(f)
            cached_token = cached.get('access_token')
            cached_expiry = cached.get('expires_at')
            if cached_token and cached_expiry and time.time() < cached_expiry:
                return cached_token  # still valid
        except (OSError, ValueError, TypeError, AttributeError):
            # Unreadable, non-JSON, non-dict or wrongly typed cache content:
            # treat it as a cache miss and fetch a fresh token below.
            pass

    # Request a new token
    auth_url = f"{baseurl}/analytics/api/oauth/token"
    querystring = {"grant_type": "client_credentials", "scope": "mapp.intelligence-api"}
    response = requests.post(
        auth_url, auth=(user, secret), params=querystring, timeout=timeout
    )
    response.raise_for_status()
    result = response.json()
    token = result['access_token']
    expires_in = result.get('expires_in', 3600)  # default when the response omits it
    expires_at = time.time() + expires_in - 60   # 60 s safety margin before expiry

    # Persist the token for later calls
    with open(token_file, 'w') as f:
        json.dump({'access_token': token, 'expires_at': expires_at}, f)

    return token

# Obtain the access token: served from the cache file while still valid,
# otherwise requested from the token endpoint (see get_token).
token = get_token()

In [0]:
import requests
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

# Get (or create) the active SparkSession
spark = SparkSession.builder.getOrCreate()

# Network / polling settings
REQUEST_TIMEOUT = 60   # seconds allowed per HTTP call
MAX_POLLS = 10         # maximum number of status checks
POLL_INTERVAL = 10     # seconds to wait between status checks

# Auth headers
headers = {
    'Authorization': f'Bearer {token}',
    'Content-Type': 'application/json'
}

# Submit the analysis query
url = f"{baseurl}/analytics/api/analysis-query"
response = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
response.raise_for_status()  # fail fast on HTTP errors, as get_token does
result = response.json()

resultUrl = result.get("resultUrl")
statusUrl = result.get("statusUrl") or None

# Without a result URL there must be a status URL to poll; otherwise the loop
# below would call requests.get(None) and fail with an obscure error.
if not resultUrl and not statusUrl:
    raise RuntimeError(
        f"Analysis query returned neither 'resultUrl' nor 'statusUrl': {result}"
    )

# Poll the status endpoint until a result URL appears or the attempts run out
tries = 0
while not resultUrl and tries < MAX_POLLS:
    time.sleep(POLL_INTERVAL)
    status_response = requests.get(statusUrl, headers=headers, timeout=REQUEST_TIMEOUT)
    status_response.raise_for_status()
    result = status_response.json()
    resultUrl = result.get("resultUrl")
    tries += 1

if not resultUrl:
    print("❌ Kein Ergebnis nach mehreren Versuchen.")
else:
    result_response = requests.get(resultUrl, headers=headers, timeout=REQUEST_TIMEOUT)
    result_response.raise_for_status()
    result_data = result_response.json()

    # Extract column names and rows
    headers_out = [col["name"] for col in result_data["headers"]]
    rows = result_data["rows"]

    # Drop the last row (presumably a totals/summary row — TODO confirm).
    # Slicing the Python list is deterministic and avoids an extra Spark job;
    # DataFrame.limit() does not promise to keep the first n rows in order.
    row_count = len(rows)
    if row_count > 1:
        data_rows = rows[:-1]
    else:
        data_rows = rows
        print("⚠️ Zu wenige Zeilen zum Kürzen.")

    # Build the DataFrame
    df = spark.createDataFrame(data_rows, headers_out)

    # Show the result (optional)
    display(df)