## Importamos las librerias que son necesarias para poder hacer nuestro datapipeline, en este caso vamos a usar las libreria de Kubeflow y Google Cloud 

In [22]:
import kfp.dsl as dsl

from typing import NamedTuple, Callable, List, Any, Dict

from kfp.dsl import pipeline
from kfp.dsl import OutputPath
from kfp.dsl import InputPath

from kfp.dsl import Output
from kfp.dsl import Metrics

from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        component,
                        Markdown)

from kfp import compiler



from google.cloud import aiplatform as aip
from google.cloud.aiplatform import pipeline_jobs





#### Creamos variables para guardar tanto nuestro proyecto_id como nuestra ruta de PIPELINE ROOT

In [23]:


PROJECT_ID = "project-id"
PIPELINE_ROOT = "pipe-lineroot"
# Specify BigQuery table details (replace with your own)


#### Aca inicializamos donde va a estar localizado nuestro proyecto, porque en google cloud necesitamos saber la localizacion de nuestros servicios

In [24]:
aip.init(
    project=PROJECT_ID,
    location="us-east1",)

#### Consultamos las reviews en BigQuery a demás tenemos que usar los diferentes parametros de @component con la necesidad de decirle que imagen base usar, si peude exportar algun tipo de archivo y que paquetes/modulos tiene que instalar para que el componente creado pueda funcionar correctamente 


In [25]:

@component(
 packages_to_install=["google-cloud-bigquery", "appengine-python-standard"],
 output_component_file="bigquery_query.json",
 base_image="python:3.10"   
)
def query_bigquery(
    project_id: str,
    dataset_id: str,
    table_id: str,
    year : int,
    month : int
) -> List:
    """
    Ejecuta una query hacia bigquery para que nos devuelva las reviews que nosotros necesitamos

    Args:
        project_id (str): The BigQuery project ID.
        dataset_id (str): The dataset ID.
        table_id (str): The table ID.
        year (int): The year to filter reviews by.
        month (int): The month to filter reviews by.

    Returns:
        dict: El resultado de nuestra query sera devuelto en una diccionario
    """
    try :
        from google.cloud import bigquery


        # Initialize BigQuery client
        bq_client = bigquery.Client(project=project_id)

        # Build the query
        reviews_query = f"""
          SELECT review_id, text
          FROM `{project_id}.{dataset_id}.{table_id}`
          WHERE EXTRACT(YEAR FROM date) = {year} AND EXTRACT(MONTH FROM date) = {month} limit 10
          """

        try:
          # Execute the query
            reviews_query_job = bq_client.query(reviews_query)

            result = []
            for row in reviews_query_job:
                text = row[1]
                review_id = row[0]
                result.append(
                  {'review_id': review_id,
                  'text': text,
                  })
            return result
        except Exception as e:
              # Handle exceptions (e.g., logging, error handling)
              return {"error": str(e)}
    except Exception as e:
          print("error", str(e))

  @component(
  def query_bigquery(


In [None]:

@component(
 packages_to_install=["google-cloud-bigquery", "appengine-python-standard"],
 base_image="python:3.10"   
)
def query_bigquery_google(
    project_id: str,
    dataset_id: str,
    table_id: str,
    year : int,
    month : int
) -> List:
    """
    Ejecuta una query hacia bigquery para que nos devuelva las reviews que nosotros necesitamos

    Args:
        project_id (str): The BigQuery project ID.
        dataset_id (str): The dataset ID.
        table_id (str): The table ID.
        year (int): The year to filter reviews by.
        month (int): The month to filter reviews by.

    Returns:
        dict: El resultado de nuestra query sera devuelto en una diccionario
    """
    try :
        import datetime
        def obtener_parte_traducida(texto) -> str : 
            import re
            parte_traducida = re.search(r'\(Translated by Google\)(.*?)\n\n', texto)
            if parte_traducida:
                texto = parte_traducida.group(0).strip()
                texto_sin_google = texto.replace("(Translated by Google)", "").strip()
                return texto_sin_google
            else:
                return texto  # En caso de que no se encuentre, se devuelve el texto original

        from google.cloud import bigquery


        # Initialize BigQuery client
        bq_client = bigquery.Client(project=project_id)

        # Build the query
        # Query BigQuery to fetch reviews data
        reviews_query = f"""
        SELECT gmap_id, user_id, time, text
        FROM `{project_id}.{dataset_id}.{table_id}`
        WHERE EXTRACT(YEAR FROM time) = {year} AND EXTRACT(MONTH FROM time) = {month}
        LIMIT 10
        """

        try:
          # Execute the query
            reviews_query_job = bq_client.query(reviews_query)

            result = []
            for row in reviews_query_job:
                gmap_id = row[0]
                user_id = row[1]
                time = row[2]
                time_correx = time.timestamp()
                texto = row[3]
                if texto :
                    texto = obtener_parte_traducida(
                    texto)
                result.append(
                  {
                    "gmap_id" : gmap_id,
                    "user_id" : user_id,
                    "time" : time_correx,
                    'text': texto,
                  })
            return result
        except Exception as e:
              # Handle exceptions (e.g., logging, error handling)
            return [f"error {e}"]
    except Exception as e:
          print("error", str(e))

## Consultamos las categorias desde la tabla de BigQuery gracias a una query que creamos

In [27]:


@component(
    packages_to_install=["google-cloud-bigquery", "appengine-python-standard"],
    base_image="python:3.10"   
)
def query_bigquery_categories(
   project_id: str,
   dataset_id: str,
) -> Dict:

    """
    Ejecuta una Query de Bigquery para obtener las categorias de la base de dato
    Args :
      project_id (str) : El Proyect Id de BigQuery
      dataset_id (str) : El Id del Dataset
    """
    try : 
        from google.cloud import bigquery
        bq_client = bigquery.Client(project=project_id)
        categories_table_id = "Categories"

          # Define your query
        categories_query = f"""
              SELECT *
              FROM `{project_id}.{dataset_id}.{categories_table_id}`
          """
        try :
            # Run the query
            categories_query_job = bq_client.query(categories_query)
            categories_dictionary = {}

            for row in categories_query_job.result():
                categories_dictionary[row[0]] = [item['item'] for item in row[1]['list']]
            return categories_dictionary
        except Exception as e :
            return {"error": str(e)}
    except  Exception as e :
        return {"error": str(e)}## Instanciamos el modelo del analisis de sentimiento de las reviews, el cual nos devuelve las palabras mas relevantes para nuestro proposito que es encontrar en que tenemos que mejorar, y que fortalezas tenemos



## Analizamos el Sentimiento de las entidades referido al texto que extraemos de las reviews


In [28]:
@component(packages_to_install=["google-cloud-language", "google-cloud-storage", "appengine-python-standard"], base_image="python:3.10")
def all_entity_sentiments(result: List, categories_dictionary: Dict) -> List:
    """
    Ejecuta una función que nos extrae el texto de las reviews, lo insertamos en nuestra funcion
    Args:
    text (str): El texto que deseamos analizar
    diccionario de categoria: El Id del Dataset
    Returns:
    List: va a contener el diccionario entity_sentiments, que nos va a devolver el sentimiento de diferentes valores
    """
    try:
        from google.cloud import storage
        import json

        def analyze_entities_sentiments(text: str, categories_dictionary: dict):
            """
            Ejecuta análisis de entidad y sentimientos para obtener categorías de la base de datos
            Args:
            text (str): El texto que deseamos analizar
            diccionario de categoria: El Id del Dataset
            Returns:
            Dict: va a contener el diccionario entity_sentiments, que nos va a devolver el sentimiento de diferentes textos y cómo esto influyen a nuestros clientes a la hora de hospedarse
            """
            try:
                from google.cloud import language

                nlp_client = language.LanguageServiceClient()
                document = language.Document(content=text, type_=language.Document.Type.PLAIN_TEXT)

                # Analizar entidades en texto de review
                entity_analysis = nlp_client.analyze_entities(document=document)

                entity_sentiments = {}

                # Dividir las entidades y analizar el sentimiento por cada entidad
                for entity in entity_analysis.entities:
                    entity_text = entity.name
                    document = language.Document(content=entity_text, type_=language.Document.Type.PLAIN_TEXT)
                    sentiment = nlp_client.analyze_sentiment(document=document)

                    # Si la entidad es lo suficientemente relevante
                    if 0 < entity.type_ < 8:

                        found_match = False

                        # Categorizar la entidad
                        for key, value in categories_dictionary.items():
                            if found_match:
                                break
                            for item in value:
                                if item.casefold() in entity_text.casefold():
                                    category = key
                                    found_match = True
                                    break
                                else:
                                    category = None

                        # Almacenar datos
                        entity_sentiments[entity_text] = {
                            'category': category,
                            'sentiment_score': round(sentiment.document_sentiment.score, 2),
                        }
                return entity_sentiments
            except Exception as e:
                print(e)

        all_entity_sentiments = []
        for row in result:
            text = row["text"]
            review_id = row["review_id"]
            entity_sentiments_results = analyze_entities_sentiments(text, categories_dictionary)
            if entity_sentiments_results:
                for entity_text, data in entity_sentiments_results.items():
                    all_entity_sentiments.append(
                        {'review_id': review_id,
                         'entity': entity_text,
                         'category': data['category'],
                         'sentiment_score': data['sentiment_score'],
                         })
        json_data = json.dumps(all_entity_sentiments)

        bucket_name = "hoteles-datos"  # Nombre del bucket (sin "gs://")
        blob_name = f'PipeLine/json/archivo-yelp.json'

        # Crea una instancia del cliente de GCS
        storage_client = storage.Client("sharp-agent-398819")

        # Obtén un objeto Blob en el bucket
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Sube la cadena JSON al Blob
        blob.upload_from_string(json_data, content_type='application/json')

        text = [f"{bucket_name}/{blob_name}"]
        return text
    except Exception as e:
        return print(f"error", str(e))


In [29]:
@component(packages_to_install=["google-cloud-language", "google-cloud-storage", "appengine-python-standard"], base_image="python:3.10")
def all_entity_sentiments_google(result: List, categories_dictionary: Dict) -> List:
    """
    Ejecuta una función que nos extrae el texto de las reviews, lo insertamos en nuestra funcion
    Args:
    text (str): El texto que deseamos analizar
    diccionario de categoria: El Id del Dataset
    Returns:
    List: va a contener el diccionario entity_sentiments, que nos va a devolver el sentimiento de diferentes valores
    """
    try:
        from google.cloud import storage
        import json
                
        
        def analyze_entities_sentiments(text: str, categories_dictionary: dict):
            """
            Ejecuta análisis de entidad y sentimientos para obtener categorías de la base de datos
            Args:
            text (str): El texto que deseamos analizar
            diccionario de categoria: El Id del Dataset
            Returns:
            Dict: va a contener el diccionario entity_sentiments, que nos va a devolver el sentimiento de diferentes textos y cómo esto influyen a nuestros clientes a la hora de hospedarse
            """
            try:
                from google.cloud import language
                
                
                nlp_client = language.LanguageServiceClient()
                document = language.Document(content=text, type_=language.Document.Type.PLAIN_TEXT)

                # Analizar entidades en texto de review
                entity_analysis = nlp_client.analyze_entities(document=document)

                entity_sentiments = {}

                # Dividir las entidades y analizar el sentimiento por cada entidad
                for entity in entity_analysis.entities:
                    entity_text = entity.name
                    document = language.Document(content=entity_text, type_=language.Document.Type.PLAIN_TEXT)
                    sentiment = nlp_client.analyze_sentiment(document=document)

                    # Si la entidad es lo suficientemente relevante
                    if 0 < entity.type_ < 8:

                        found_match = False

                        # Categorizar la entidad
                        for key, value in categories_dictionary.items():
                            if found_match:
                                break
                            for item in value:
                                if item.casefold() in entity_text.casefold():
                                    category = key
                                    found_match = True
                                    break
                                else:
                                    category = None

                        # Almacenar datos
                        entity_sentiments[entity_text] = {
                            'category': category,
                            'sentiment_score': round(sentiment.document_sentiment.score, 2),
                        }
                return entity_sentiments
            except Exception as e:
                print(e)

        all_entity_sentiments = []
        for row in result:
            text = row["text"]
            review_id = row["gmap_id"]
            user_id = row['user_id']
            time = row['time']
            entity_sentiments_results = analyze_entities_sentiments(text, categories_dictionary)
            if entity_sentiments_results:
                for entity_text, data in entity_sentiments_results.items():
                    all_entity_sentiments.append(
                        {'gmap_id': review_id,
                        'user_id': user_id,
                        'time': time,
                        'entity': entity_text,
                        'category': data['category'],
                        'sentiment_score': data['sentiment_score'],
                         })
        json_data = json.dumps(all_entity_sentiments)

        bucket_name = "hoteles-datos"  # Nombre del bucket (sin "gs://")
        blob_name = f'PipeLine/json/archivo-google.json'

        # Crea una instancia del cliente de GCS
        storage_client = storage.Client("sharp-agent-398819")

        # Obtén un objeto Blob en el bucket
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Sube la cadena JSON al Blob
        blob.upload_from_string(json_data, content_type='application/json')

        text = [f"{bucket_name}/{blob_name}"]
        return text
    except Exception as e:
        return print(f"error", str(e))


## Finalmente cuando pasamos por todos los componentes podemos empezar a insertar los datos que recuperamos de nuestro modelo de NLP a nuestra base de dato (Data WareHouse), para proximamente utilizarlo en el dashboard para analizar nuestro FODA

In [30]:
@component(
    packages_to_install=["google-cloud-bigquery", "google-cloud-storage" , "appengine-python-standard"],
    base_image="python:3.8"
)
def insertar_big_query(url_json: List, project_id: str) -> str:
    """
    Ejecuta una función que nos extrae el texto de las reviews, lo insertamos en nuestra función
    Args:
        all_entity_sentiments (list): El texto que deseamos analizar
        project_id (str): El Id del Dataset
    Returns:
        Dict: va a contener el diccionario entity_sentiments, que nos va a devolver el sentimiento de diferentes valores
    """
    try:
        from google.cloud import bigquery
        from google.cloud import storage
        import json
        
        # Nombre del archivo JSON en GCS y el nombre del bucket
        bucket_name = "hoteles-datos"  # Nombre del bucket (sin "gs://")
        blob_name = f'PipeLine/json/archivo-yelp.json'

        # Crea una instancia del cliente de GCS<
        storage_client = storage.Client(project=project_id)

        # Obtén un objeto Blob en el bucket
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Descarga el contenido del archivo JSON como una cadena
        json_data = blob.download_as_text()
        
        # Convierte la cadena JSON en una lista (o en la estructura que necesites)
        all_entity_sentiments = json.loads(json_data)
        bq_client = bigquery.Client(project=project_id)
        
        for entity in all_entity_sentiments:
            if entity['category']:
                analysis_query = f"""INSERT INTO `{project_id}.GoogleYelp.Yelp-Review-Analysis` (review_id, entity, category, sentiment_score)
                                    VALUES ('{entity['review_id']}', '{entity['entity']}', '{entity['category']}', {entity['sentiment_score']})
                                """
            else:
                analysis_query = f"""INSERT INTO `{project_id}.GoogleYelp.Yelp-Review-Analysis` (review_id, entity, sentiment_score)
                                    VALUES ('{entity['review_id']}', '{entity['entity']}', {entity['sentiment_score']})
                                """
            bq_client.query(analysis_query)
        
        return 'Success!'
    
    except Exception as e:
        return f"{e}"


In [31]:
@component(
    packages_to_install=["google-cloud-bigquery", "google-cloud-storage" , "appengine-python-standard"],
    base_image="python:3.8"
)
def insertar_big_query_google(url_json: List, project_id: str) -> str:
    """
    Ejecuta una función que nos extrae el texto de las reviews, lo insertamos en nuestra función
    Args:
        all_entity_sentiments (list): El texto que deseamos analizar
        project_id (str): El Id del Dataset
    Returns:
        Dict: va a contener el diccionario entity_sentiments, que nos va a devolver el sentimiento de diferentes valores
    """
    try:
        from google.cloud import bigquery
        from google.cloud import storage
        import json
        
        # Nombre del archivo JSON en GCS y el nombre del bucket
        bucket_name = "hoteles-datos"  # Nombre del bucket (sin "gs://")
        blob_name = f'PipeLine/json/archivo-google.json'

        # Crea una instancia del cliente de GCS<
        storage_client = storage.Client(project=project_id)

        # Obtén un objeto Blob en el bucket
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)

        # Descarga el contenido del archivo JSON como una cadena
        json_data = blob.download_as_text()
        
        # Convierte la cadena JSON en una lista (o en la estructura que necesites)
        all_entity_sentiments = json.loads(json_data)
        bq_client = bigquery.Client(project=project_id)
        
        for entity in all_entity_sentiments:
            if entity['category']:
                analysis_query = f"""INSERT INTO `{project_id}.GoogleYelp.Google-Review-Analysis` (gmap_id, user_id, time, entity, category, sentiment_score)
                                VALUES ('{entity['gmap_id']}', '{entity['user_id']}', TIMESTAMP {entity['time']},'{entity['entity']}', '{entity['category']}', {entity['sentiment_score']})
                                """
            else:
                analysis_query = f"""INSERT INTO `{project_id}.GoogleYelp.Google-Review-Analysis` (gmap_id, user_id, entity, time, sentiment_score)
                                VALUES ('{entity['gmap_id']}', '{entity['user_id']}', '{entity['entity']}', TIMESTAMP {entity['time']}, , {entity['sentiment_score']})
                                """
        bq_client.query(analysis_query)
        
        return 'Success!'
    
    except Exception as e:
        return f"{e}"


### En la siguiente celda podemos ver la manera en como se crea un pipeline con condicionales, a demás este pipeline fue creado con kubeflow. 

In [34]:
@dsl.pipeline(name="my-pipeline", pipeline_root=PIPELINE_ROOT)
def my_pipeline(
    project_id: str = "project-id",
    dataset_id: str = "GoogleYelp",
    table_id: str = "Yelp-Reviews",
    year : int = 2019,
    month : int = 9,
   
):  
        def is_yelp_reviews(table_id):
            return table_id == "Yelp-Reviews"
        def is_google_reviews(table_id):
            return table_id == "Google-Reviews"
        
        with dsl.Condition(is_yelp_reviews(table_id)): 
            # Operaciones para consultar BigQuery y obtener datos de la tabla de categorías
            bq_data = query_bigquery(
                project_id=project_id,
                dataset_id=dataset_id,
                table_id=table_id,
                year = year,
                month = month
            )

            categories_data = query_bigquery_categories(
                project_id=project_id,
                dataset_id=dataset_id,
            )
            print()
            # Operación para procesar todas las entidades y sentimientos
            all_entity_sentiments_result = all_entity_sentiments(
                result=bq_data.output,
                categories_dictionary=categories_data.output,
            )
            insertar_bigquery_result = insertar_big_query(
                url_json = all_entity_sentiments_result.output,
                project_id=project_id,
            )
        
        with dsl.Condition(is_google_reviews(table_id)): 
            # Operaciones para consultar BigQuery y obtener datos de la tabla de categorías
            bq_data = query_bigquery_google(
                project_id=project_id,
                dataset_id=dataset_id,
                table_id=table_id,
                   year = year,
                month = month
            )

            categories_data = query_bigquery_categories(
                project_id=project_id,
                dataset_id=dataset_id,
            )
            # Operación para procesar todas las entidades y sentimientos
            all_entity_sentiments_result = all_entity_sentiments_google(
                result=bq_data.output,
                categories_dictionary=categories_data.output,
            )
            insertar_bigquery_result = insertar_big_query_google(
                url_json = all_entity_sentiments_result.output,
                project_id=project_id,
            )
        
        # Registro de eventos
        print(insertar_bigquery_result)     
if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(my_pipeline, "pipeline.json")



<kfp.components.pipeline_task.PipelineTask object at 0x7fbd74b6ab00>


### Y en las siguientes celda creamos las funciones para hacer el llamado al pipeline pero con diferentes parametros para que se cumpla la condicion puesta en nuestra pipeline 

In [35]:
job =  aip.PipelineJob(
    display_name="hello-world-pipeline",
    template_path = "pipeline.json",
    job_id ="yelp5",
    pipeline_root=PIPELINE_ROOT,
    enable_caching = False,
    parameter_values={
        'project_id': PROJECT_ID,
        "dataset_id" : "GoogleYelp",
        "table_id" : "Yelp-Reviews",
    }
)

job2 =  aip.PipelineJob(
    display_name="hello-world-pipeline",
    template_path = "pipeline.json",
    job_id ="google11",
    pipeline_root=PIPELINE_ROOT,
    enable_caching = False,
    parameter_values={
        'project_id': PROJECT_ID,
        "dataset_id" : "GoogleYelp",
        "table_id" : "Google-Reviews",
    }
)
job.submit()
job2.submit()