In [0]:
# =================================================================
# SCRIPT FINAL v2.4 (Instalación a Nivel de Clúster)
# =================================================================
# Versión final y limpia. La librería ya está instalada en el clúster.
# =================================================================

# === SECCIÓN 1: CONFIGURACIÓN E IMPORTACIONES ===
print("Iniciando la SECCIÓN 1: Configuración...")
import pandas as pd; from datetime import date, timedelta; import numpy as np; import time; import google.generativeai as genai
print("Librerías importadas correctamente desde el clúster.")
print("--- Fin de la Sección 1 ---")

# === SECCIÓN 2: LÓGICA DE EXTRACCIÓN Y PROCESAMIENTO ===
print("\nIniciando la SECCIÓN 2: Extracción y Procesamiento...")
# 2.1. Configuración de la API de Gemini
API_KEY_GEMINI = "AIzaSyBvzL07pj7H8n9brjIPLt5tpF3IIKVL-go" # ¡RELLENA ESTO!
genai.configure(api_key=API_KEY_GEMINI)
client = genai.GenerativeModel(model_name="gemini-pro")
print("Cliente de la API de Gemini configurado.")

# 2.2. Definición del rango de fechas para 1 MES
fecha_fin = date.today(); delta_dias_prueba = timedelta(days=30); fecha_inicio = fecha_fin - delta_dias_prueba; delta = timedelta(days=1)
print(f"Rango de fechas escalado establecido: de {fecha_inicio} a {fecha_fin}.")

# 2.3. Función de SIMULACIÓN de precios
DATOS_PRECIOS_HORARIOS = []
def simular_precios_dia(fecha_actual):
    precios_simulados = np.random.uniform(low=0.05, high=0.25, size=24)
    for hora, precio in enumerate(precios_simulados):
        DATOS_PRECIOS_HORARIOS.append({"Fecha": fecha_actual, "Hora": hora, "Precio_kWh": precio})

# 2.4. Función para llamar a la IA
def clasificar_actividad_economica(titulares):
    prompt = f"""Analiza estos titulares... Responde ÚNICAMENTE con un número flotante entre -1.0 y 1.0."""
    try:
        respuesta = client.generate_content(prompt); return float(respuesta.text.strip())
    except Exception as e:
        print(f"  -> Error llamando a la API de Gemini: {e}"); return 0.0

# 2.5. Bucle principal para procesar cada día
print("Iniciando bucle principal para ~31 días...")
fecha_actual = fecha_inicio; df_nlp_scores = []
titulares_ejemplo = "El paro desciende, la inversión extranjera aumenta y el Ibex 35 marca un nuevo máximo anual."
while (fecha_actual <= fecha_fin):
    print(f"Procesando día: {fecha_actual}")
    simular_precios_dia(fecha_actual)
    score_nlp = clasificar_actividad_economica(titulares_ejemplo)
    df_nlp_scores.append({"Fecha": fecha_actual, "Actividad_Economica_NLP": score_nlp})
    fecha_actual += delta; time.sleep(1)
print("Bucle principal completado.")
print("--- Fin de la Sección 2 ---")

# === SECCIÓN 3: CONVERSIÓN Y GUARDADO EN DATABRICKS ===
print("\nIniciando la SECCIÓN 3: Guardado de la Tabla Bronze...")
df_precios = pd.DataFrame(DATOS_PRECIOS_HORARIOS); df_scores = pd.DataFrame(df_nlp_scores)
df_final_bq = pd.merge(df_precios, df_scores, on="Fecha", how="left")
print(f"DataFrames de Pandas creados y unidos. Total de filas: {len(df_final_bq)}")

spark_df_final = spark.createDataFrame(df_final_bq)
db_name = "bronze_data"; table_name = "precios_actividad_crudo_simulado_mensual"
full_table_name = f"hive_metastore.{db_name}.{table_name}"
print(f"Intentando guardar la tabla en: {full_table_name}...")

spark.sql(f"CREATE DATABASE IF NOT EXISTS hive_metastore.{db_name}")
spark_df_final.write.mode("overwrite").saveAsTable(full_table_name)

print("\n¡¡¡PROCESO COMPLETADO CON ÉXITO!!!")
print("Ahora puedes verificarla en el 'Catálogo'.")

Iniciando la SECCIÓN 1: Configuración...
Librerías importadas correctamente desde el clúster.
--- Fin de la Sección 1 ---

Iniciando la SECCIÓN 2: Extracción y Procesamiento...
Cliente de la API de Gemini configurado.
Rango de fechas escalado establecido: de 2025-09-26 a 2025-10-26.
Iniciando bucle principal para ~31 días...
Procesando día: 2025-09-26
  -> Error llamando a la API de Gemini: 404 models/gemini-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.
Procesando día: 2025-09-27
  -> Error llamando a la API de Gemini: 404 models/gemini-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.
Procesando día: 2025-09-28
  -> Error llamando a la API de Gemini: 404 models/gemini-pro is not found for API version v1beta, or is not supported for generateContent. Call ListM

In [0]:
# =================================================================
# Celda Final: Puente de Carga de Databricks a Google BigQuery
# =================================================================

print("Iniciando el proceso de carga a BigQuery...")

# === 1. CONFIGURACIÓN DEL PUENTE ===
# ¡¡RELLENA ESTOS DOS VALORES!!

# La ruta del archivo de credenciales que acabas de subir
keyfile_path = "dbfs:/FileStore/tables/gcp_credentials.json"

# El nombre ÚNICO de tu bucket de Google Cloud Storage
bucket_gcs = "bucket-temporal-julio-20251026" # <-- YA LO TIENES


# --- NO CAMBIES NADA A PARTIR DE AQUÍ ---
project_gcp = "datamanagementbi"
dataset_bq = "bronze_data"
table_bq_name = "precios_actividad_nlp_desde_databricks"

# === 2. LECTURA DE LA TABLA DE ORIGEN ===
source_table_name = "hive_metastore.bronze_data.precios_actividad_crudo_simulado_mensual"
spark_df_to_load = spark.read.table(source_table_name)
print(f"Tabla '{source_table_name}' leída con éxito desde Databricks.")

# === 3. CONFIGURACIÓN DE LA CONEXIÓN ===
spark.conf.set("google.cloud.auth.service.account.json.keyfile", keyfile_path)
print("Autenticación con Google Cloud configurada.")

# Le decimos explícitamente a Spark cuál es el proyecto de facturación y destino.
spark.conf.set("parentProject", project_gcp)

print("Autenticación con Google Cloud configurada.")

# === 4. EJECUCIÓN DE LA CARGA ===
print(f"Iniciando la carga a la tabla de BigQuery: '{project_gcp}.{dataset_bq}.{table_bq_name}'...")

spark_df_to_load.write \
  .format("bigquery") \
  .option("temporaryGcsBucket", bucket_gcs) \
  .option("project", project_gcp) \
  .option("dataset", dataset_bq) \
  .option("table", table_bq_name) \
  .mode("overwrite") \
  .save()

print("\n¡¡¡CARGA A BIGQUERY COMPLETADA CON ÉXITO!!!")
print("Ya puedes verificar la tabla en la consola de Google BigQuery.")

Iniciando el proceso de carga a BigQuery...
Tabla 'hive_metastore.bronze_data.precios_actividad_crudo_simulado_mensual' leída con éxito desde Databricks.
Autenticación con Google Cloud configurada.
Autenticación con Google Cloud configurada.
Iniciando la carga a la tabla de BigQuery: 'datamanagementbi.bronze_data.precios_actividad_nlp_desde_databricks'...


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-6850710950065916>, line 46[0m
[1;32m     36[0m [38;5;66;03m# === 4. EJECUCIÓN DE LA CARGA ===[39;00m
[1;32m     37[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mIniciando la carga a la tabla de BigQuery: [39m[38;5;124m'[39m[38;5;132;01m{[39;00mproject_gcp[38;5;132;01m}[39;00m[38;5;124m.[39m[38;5;132;01m{[39;00mdataset_bq[38;5;132;01m}[39;00m[38;5;124m.[39m[38;5;132;01m{[39;00mtable_bq_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m...[39m[38;5;124m"[39m)
[1;32m     39[0m spark_df_to_load[38;5;241m.[39mwrite \
[1;32m     40[0m   [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mbigquery[39m[38;5;124m"[39m) \
[1;32m     41[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mtemporaryGcsBucket[39m[38;5;124m"[39m, bucket

In [0]:
# === Celda de Descarga ===

# 1. Lee la tabla de Spark
source_table_name = "hive_metastore.bronze_data.precios_actividad_crudo_simulado_mensual"
spark_df = spark.read.table(source_table_name)

# 2. Conviértela a un DataFrame de Pandas
pandas_df = spark_df.toPandas()

# 3. Guarda el DataFrame de Pandas como un archivo CSV en el sistema de archivos de Databricks
csv_path = "/dbfs/FileStore/precios_nlp_mensual.csv"
pandas_df.to_csv(csv_path, index=False)

print(f"¡Éxito! Tu tabla ha sido guardada como un archivo CSV.")
print(f"Ahora, ve a esta URL en tu navegador para descargar el archivo:")
# Generamos la URL de descarga (¡puede que necesites ajustar el número del workspace si es diferente!)
workspace_url = "https://adb-3261859451723130.10.azuredatabricks.net"
print(f"{workspace_url}/files/precios_nlp_mensual.csv")

¡Éxito! Tu tabla ha sido guardada como un archivo CSV.
Ahora, ve a esta URL en tu navegador para descargar el archivo:
https://adb-3261859451723130.10.azuredatabricks.net/files/precios_nlp_mensual.csv
