In [1]:
import pandas as pd
import requests
from datetime import datetime


### Conexión a BigQuery

In [2]:
# Cargar el archivo CSV
credentials_path = 'src/connections/protean-fabric-386717-d6a21dd66382.json'
credentials_dir = open(credentials_path, 'rb')

In [3]:
# bigquery_connection.py
from google.cloud import bigquery
from google.oauth2 import service_account
import os

def connect_to_bigquery():

    # Load BigQuery credentials from the secret
    #credentials_json = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')

    # Load BigQuery credentials from the service_account.json file
    #credentials = service_account.Credentials.from_service_account_info(credentials_json)

    credentials = service_account.Credentials.from_service_account_file('src/connections/protean-fabric-386717-d6a21dd66382.json')

    # Connect to the BigQuery API using the credentials
    client = bigquery.Client(credentials=credentials)
    
    return client

### Extracción de Datos de Bigquery

In [4]:
def extract_dataframe_from_bigquery(project_id, dataset_id, table_name):
    # Crea una instancia del cliente de BigQuery
    client = connect_to_bigquery()

    # Obtén la fecha actual en el formato requerido por BigQuery
    current_date = datetime.now().strftime("%Y-%m-%d")
    
    # Crea la consulta SQL para extraer todos los datos de la tabla para el día actual
    query = f"SELECT * FROM `{project_id}.{dataset_id}.{table_name}` WHERE DATE_TRUNC(date, DAY) = '{current_date}'"

    # Ejecuta la consulta y obtiene los resultados como un DataFrame
    df = client.query(query).to_dataframe()

    return df

project_id = 'protean-fabric-386717'
dataset_id = "ml_datasets"
table_name = "insurance_synt"

dataframe = extract_dataframe_from_bigquery(project_id, dataset_id, table_name)
print(dataframe)

   user_cod        date  age     sex    bmi  children smoker     region
0    CZSZUP  2023-07-11   21    male  37.33         0     no  southwest
1    NINSQQ  2023-07-11   37  female  34.38         0     no  northwest
2    SGMIMH  2023-07-11   53  female  32.07         0    yes  southwest
3    DBRCWT  2023-07-11   36    male  38.97         1    yes  southeast
4    ZQMPZG  2023-07-11   33    male  28.74         1     no  northeast
5    FDZYCG  2023-07-11   41  female  36.72         1     no  northeast
6    AYOFFQ  2023-07-11   42  female  30.96         1     no  southwest
7    PTZWLT  2023-07-11   44  female  27.17         1     no  southwest
8    UKPCPE  2023-07-11   23  female  25.27         1     no  southeast
9    MXNLDW  2023-07-11   30  female  37.48         1     no  southwest
10   TLKLYW  2023-07-11   39  female  34.51         0     no  northwest
11   SWZCOQ  2023-07-11   26    male  21.57         0     no  southeast
12   SYTJHY  2023-07-11   38    male  24.05         0     no  so

In [5]:
df_original = dataframe

In [6]:
df_original

Unnamed: 0,user_cod,date,age,sex,bmi,children,smoker,region
0,CZSZUP,2023-07-11,21,male,37.33,0,no,southwest
1,NINSQQ,2023-07-11,37,female,34.38,0,no,northwest
2,SGMIMH,2023-07-11,53,female,32.07,0,yes,southwest
3,DBRCWT,2023-07-11,36,male,38.97,1,yes,southeast
4,ZQMPZG,2023-07-11,33,male,28.74,1,no,northeast
5,FDZYCG,2023-07-11,41,female,36.72,1,no,northeast
6,AYOFFQ,2023-07-11,42,female,30.96,1,no,southwest
7,PTZWLT,2023-07-11,44,female,27.17,1,no,southwest
8,UKPCPE,2023-07-11,23,female,25.27,1,no,southeast
9,MXNLDW,2023-07-11,30,female,37.48,1,no,southwest


In [7]:
input_df = df_original.drop(['user_cod', 'date'], axis=1)

### Test List Dict

In [8]:
input_list = input_df.to_dict(orient='records')

In [9]:
input_list

[{'age': 21,
  'sex': 'male',
  'bmi': 37.33,
  'children': 0,
  'smoker': 'no',
  'region': 'southwest'},
 {'age': 37,
  'sex': 'female',
  'bmi': 34.38,
  'children': 0,
  'smoker': 'no',
  'region': 'northwest'},
 {'age': 53,
  'sex': 'female',
  'bmi': 32.07,
  'children': 0,
  'smoker': 'yes',
  'region': 'southwest'},
 {'age': 36,
  'sex': 'male',
  'bmi': 38.97,
  'children': 1,
  'smoker': 'yes',
  'region': 'southeast'},
 {'age': 33,
  'sex': 'male',
  'bmi': 28.74,
  'children': 1,
  'smoker': 'no',
  'region': 'northeast'},
 {'age': 41,
  'sex': 'female',
  'bmi': 36.72,
  'children': 1,
  'smoker': 'no',
  'region': 'northeast'},
 {'age': 42,
  'sex': 'female',
  'bmi': 30.96,
  'children': 1,
  'smoker': 'no',
  'region': 'southwest'},
 {'age': 44,
  'sex': 'female',
  'bmi': 27.17,
  'children': 1,
  'smoker': 'no',
  'region': 'southwest'},
 {'age': 23,
  'sex': 'female',
  'bmi': 25.27,
  'children': 1,
  'smoker': 'no',
  'region': 'southeast'},
 {'age': 30,
  'sex': '

### Generación de Predicciones

In [16]:
PREDICT_ENDPOINT = 'http://127.0.0.1:8000/batch_predict_pipeline'

def predict_input(input_list):
    input_list_dict = input_list.to_dict(orient='records')  # Convertir DataFrame a lista de diccionarios
    response = requests.post(PREDICT_ENDPOINT, json=input_list_dict)
    if response.status_code == 200:
        datax = response.json()
        df = pd.DataFrame(datax)
        return df
    else:
        print("Error al realizar las estimaciones")
        return None

In [17]:
df_batch_result = predict_batch(input_df)

TypeError: Object of type DataFrame is not JSON serializable

### Carga de Resultados a Bigquery

In [None]:
df_api_result.dtypes

In [None]:
# Paso 4: Concatenar el DataFrame original con el resultado de la API
df_final = pd.concat([df_original[['user_cod', 'date']], df_api_result], axis=1)


In [None]:
df_final

In [None]:
table_name = 'insurance_predictions'
dataset_id = 'ml_datasets'

In [None]:
#@task
def ingest_or_create_to_bigquery(df_final, table_name, dataset_id):
    #configurar cliente de bigqury

    client = connect_to_bigquery()
    table_ref = client.dataset(dataset_id).table(table_name)

    # Verificar si la tabla existe
   # table_exists = client.get_table(table_ref, retry=retry.Retry(deadline=30))
    
    # Verificar si la tabla existe
    table_exists = client.get_table(table_ref)
    
    if table_exists is None:
        # Crear la tabla si no existe
        table = bigquery.Table(table_ref)
        schema = []
        for column_name, column_type in df_final.dtypes.items():
            schema.append(bigquery.SchemaField(name=column_name, field_type=column_type.name))
        table.schema = schema
        table = client.create_table(table)
        print(f"Se ha creado la tabla {dataset_id}.{table_name} en BigQuery.")
    
    # Cargar los datos del DataFrame en la tabla
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job = client.load_table_from_dataframe(df_final, table_ref, job_config=job_config)
    job.result()



In [None]:
Load_completed = ingest_or_create_to_bigquery(df_final, table_name, dataset_id)

In [None]:
""" #@flow(name="Return Insurance Preds to Bigquery")
def generate_insurance_data_bigquery():
    table_name = 'insurance_predictions'
    dataset_id = 'ml_datasets'
    df= generate_dataframe()
    load_completed = ingest_or_create_to_bigquery(df, table_name, dataset_id)
    return load_completed

if __name__ == '__main__':
 generate_insurance_data_bigquery()"""

In [None]:
!pip install pandera
