In [None]:
import requests
import json
from datetime import datetime
from pyspark.sql import SparkSession

# Initialisation de SparkSession pour activer le support de Hive
spark = SparkSession.builder.appName("DataFetchWithFluxTracking").enableHiveSupport().getOrCreate()

# Vérifie si la table existe dans Hive et la crée si nécessaire
spark.sql("""
CREATE TABLE IF NOT EXISTS c_tech.offset_update (
    date STRING,
    flux STRING,
    last_successful_offset INT,
    success BOOLEAN
)""")

# Fonction pour logger les messages avec un horodatage
def log(message):
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {message}")

# Fonction pour obtenir un token d'authentification de l'API
def get_bearer_token(auth_url, client_id, client_secret, audience, scope="read", retries=2):
    while retries > 0:
        try:
            # Préparation de la requête pour l'obtention du token
            payload = {"audience": audience, "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, "scope": scope}
            headers = {"Content-Type": "application/json"}
            # Envoi de la requête
            response = requests.post(auth_url, json=payload, headers=headers)
            # Vérification du statut de la réponse
            response.raise_for_status()
            # Retourne le token si la requête est réussie
            return response.json().get('access_token')
        except Exception as e:
            # Log des erreurs en cas de problème lors de l'obtention du token
            log(f"Erreur lors de l'obtention du token: {e}. Tentatives restantes : {retries}")
            retries -= 1
    log("Échec de l'obtention d'un nouveau token après plusieurs tentatives.")
    return None

# Fonction pour extraire le flux (nom de la ressource) à partir de l'URL de l'API
def extract_flux_from_url(url):
    segments = url.strip('/').split('/')
    return segments[4] if len(segments) >= 4 else None

# Fonction pour obtenir le dernier offset et date d'échec enregistré dans Hive
def get_last_failure_info(flux):
    """Fonction pour obtenir les informations sur le dernier échec enregistré dans Hive."""
    try:
        query_result = spark.sql(f"""
            SELECT date, last_successful_offset
            FROM c_tech.offset_update
            WHERE flux = '{flux}' AND success = FALSE
            ORDER BY date ASC, last_successful_offset ASC
            LIMIT 1
        """).collect()
        if query_result:
            return query_result[0]['date'], query_result[0]['last_successful_offset']
        return None, 0
    except Exception as e:
        log(f"Erreur lors de la récupération des informations sur le dernier échec : {e}")
        return None, 0

# Fonction pour mettre à jour le suivi des offsets dans Hive
def update_offset_tracking(date_str, flux, offset, success):
    try:
        # Insère l'offset actuel dans la table si l'opération est un échec
        if not success:
            spark.sql(f"INSERT INTO TABLE c_tech.offset_update VALUES ('{date_str}', '{flux}', {offset}, {success})")
    except Exception as e:
        log(f"Erreur lors de la mise à jour du suivi des offsets : {e}")

# Fonction principale pour récupérer les données de l'API et les sauvegarder localement
def fetch_data_api(base_url, save_directory, auth_url, client_id, client_secret, audience, user_defined_limit, scope="read"):
    flux = extract_flux_from_url(base_url)
    if not flux:
        log("Impossible de déterminer le flux à partir de l'URL.")
        return

    date_str, last_failure_offset = get_last_failure_info(flux)
    if not date_str:
        date_str = datetime.now().strftime("%Y%m%d")

    output_filename = flux  # Utilise le flux comme nom pour le fichier de sortie
    token = get_bearer_token(auth_url, client_id, client_secret, audience, scope)
    if not token:
        return  # Quitte si l'obtention du token échoue

    offset = last_failure_offset if last_failure_offset is not None else 0
    file_number = offset // user_defined_limit if offset else 0

    while True:
        try:
            headers = {"Authorization": f"Bearer {token}"}
            response = requests.get(base_url, headers=headers, params={"limit": user_defined_limit, "offset": offset})
            if response.status_code == 401:  # Token expiré
                log("Le token a expiré, obtention d'un nouveau...")
                token = get_bearer_token(auth_url, client_id, client_secret, audience, scope)
                if not token:
                    break  # Quitte si un nouveau token ne peut pas être obtenu
                continue  # Réessaye avec le nouveau token

            if response.status_code == 200:
                data = response.json()
                total_count = data.get("meta", {}).get("total_count", 0)
                limit = data.get("meta", {}).get("limit", user_defined_limit)

                # Arrêter le script si total_count est 0 dès le début
                if total_count == 0 and offset == 0:
                    log("Aucune donnée à récupérer.")
                    break

                # Création du nom du fichier et sauvegarde des données
                formatted_filename = f"{output_filename}_{date_str}_{file_number}.json"
                file_path = f"{save_directory}/{formatted_filename}"
                with open(file_path, "w") as file:
                    json.dump(data, file)
                log(f"Données sauvegardées dans {file_path}")

                # Mise à jour des variables pour la prochaine itération
                offset += limit
                file_number += 1
                if offset > total_count:
                    log("Toutes les données ont été récupérées avec succès.")
                    if last_failure_offset is not None:
                       update_offset_tracking(date_str, flux, last_failure_offset, True)

                    break
            else:
                log(f"Erreur avec le code de réponse {response.status_code}")
                update_offset_tracking(date_str, flux, offset, False)
                break
        except Exception as e:
            log(f"Erreur lors de la récupération des données : {e}")
            update_offset_tracking(date_str, flux, offset, False)
            break