<a href="https://colab.research.google.com/github/LucasbFrancaa/HDATA/blob/main/CASE2_HDATA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache_beam
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from google.cloud import storage, bigquery
!pip install apache-beam[gcp] google-cloud-storage google-cloud-bigquery
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions



#BUCKET PARA LEITURA

In [None]:
gsutil mb gs://h_data


gsutil cp FILA_SENHA.csv gs://h_data/pasta/
gsutil cp MOTIVO_ALTA.csv gs://h_data/pasta/
gsutil cp PESSOA_FISICA_MEDICO.csv gs://h_data/pasta/
gsutil cp PESSOA_FISICA_PAC.csv gs://h_data/pasta/
gsutil cp ATENDIMENTO.csv gs://h_data/pasta/

#Criação e Execução do Pipeline no Dataflow

In [None]:
# Definindo o esquema BigQuery
bq_schema = 'nr_atendimento:INTEGER, dt_entrada:DATETIME, dt_inicio_atendimento:DATETIME, dt_atend_medico:DATETIME, dt_alta:DATETIME, cd_pessoa_fisica:INTEGER, ie_tipo_atendimento:INTEGER, ie_clinica:INTEGER'

In [None]:


# Função para processar o CSV, aplicando o filtro para IE_TIPO_ATENDIMENTO = 3 e mapeando IE_CLINICA
def parse_and_filter_csv(line):
    fields = line.split(';')  # Assumindo que o delimitador é ";"

    # Filtro para considerar apenas Pronto Atendimento (IE_TIPO_ATENDIMENTO = 3)
    if int(fields[6]) == 3:
        # Mapeamento das especialidades no campo IE_CLINICA
        especialidades = {
            1: 'Clínica Médica',
            4: 'Pediatria',
            10: 'Cirurgia Geral'
        }

        ie_clinica = int(fields[7])
        especialidade = especialidades.get(ie_clinica, 'Outros')  # Caso o IE_CLINICA seja desconhecido

        # Validação dos campos essenciais (exemplo: campos de data não podem ser nulos ou vazios)
        if not fields[1] or not fields[2] or not fields[3]:
            return None  # Ignora se faltar algum campo essencial de datas

        return {
            'nr_atendimento': int(fields[0]),
            'dt_entrada': fields[1],
            'dt_inicio_atendimento': fields[2],
            'dt_atend_medico': fields[3],
            'dt_alta': fields[4],
            'cd_pessoa_fisica': int(fields[5]),
            'ie_tipo_atendimento': int(fields[6]),  # Será sempre 3, pois filtramos acima
            'especialidade': especialidade  # Campo com a descrição da especialidade
        }
    else:
        return None  # Não processar registros que não são Pronto Atendimento

# Função para fazer o upsert (inserção ou atualização) no BigQuery
def upsert_to_bigquery(new_record, bq_client):
    # Verificar se o registro já existe no BigQuery
    query = f"""
        SELECT nr_atendimento
        FROM `seu-projeto-id.dataset.tabela_atendimento`
        WHERE nr_atendimento = @nr_atendimento
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("nr_atendimento", "INT64", new_record['nr_atendimento']),
        ]
    )
    results = bq_client.query(query, job_config=job_config)

    # Se o registro já existir, fazer atualização, senão inserir
    if results.total_rows > 0:
        update_query = f"""
        UPDATE `seu-projeto-id.dataset.tabela_atendimento`
        SET dt_entrada = @dt_entrada,
            dt_inicio_atendimento = @dt_inicio_atendimento,
            dt_atend_medico = @dt_atend_medico,
            dt_alta = @dt_alta,
            cd_pessoa_fisica = @cd_pessoa_fisica,
            especialidade = @especialidade
        WHERE nr_atendimento = @nr_atendimento
        """
        update_job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("nr_atendimento", "INT64", new_record['nr_atendimento']),
                bigquery.ScalarQueryParameter("dt_entrada", "DATETIME", new_record['dt_entrada']),
                bigquery.ScalarQueryParameter("dt_inicio_atendimento", "DATETIME", new_record['dt_inicio_atendimento']),
                bigquery.ScalarQueryParameter("dt_atend_medico", "DATETIME", new_record['dt_atend_medico']),
                bigquery.ScalarQueryParameter("dt_alta", "DATETIME", new_record['dt_alta']),
                bigquery.ScalarQueryParameter("cd_pessoa_fisica", "INT64", new_record['cd_pessoa_fisica']),
                bigquery.ScalarQueryParameter("especialidade", "STRING", new_record['especialidade'])
            ]
        )
        bq_client.query(update_query, job_config=update_job_config).result()
    else:
        # Inserir o novo registro
        bq_client.insert_rows_json('seu-projeto-id.dataset.tabela_atendimento', [new_record])

# Função principal do pipeline
def run():
    # Opções do pipeline Dataflow
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = '2'
    google_cloud_options.job_name = 'pipeline-pronto-atendimento'
    google_cloud_options.staging_location = 'gs://h_data/staging'
    google_cloud_options.temp_location = 'gs://h_data/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    # Criar cliente BigQuery
    bq_client = bigquery.Client()

    # Definir o pipeline Apache Beam
    with beam.Pipeline(options=options) as p:
        lines = p | 'Ler CSV do GCS' >> beam.io.ReadFromText('gs://seu-bucket/pasta/FILA_SENHA.csv')

        # Processar as linhas CSV com filtro, mapeamento e validação
        parsed_lines = lines | 'Processar e Filtrar CSV' >> beam.Map(parse_and_filter_csv)

        # Aplicar a lógica de upsert para cada registro
        parsed_lines | 'Upsert no BigQuery' >> beam.Map(lambda record: upsert_to_bigquery(record, bq_client))

if __name__ == '__main__':
    run()


#Esse código garante que o pipeline seja capaz de lidar com cargas incrementais, usando a lógica de upsert no BigQuery.

<!-- Esse código garante que o pipeline seja capaz de lidar com cargas incrementais, usando a lógica de upsert no BigQuery. -->

#Automatização com Cloud Functions

In [None]:
from googleapiclient.discovery import build

def trigger_dataflow(event, context):
    # Configurações do job do Dataflow
    project = 'HDATA'
    job_name = 'processamento-pipeline'
    template_path = 'gs://h_data/templates/dataflow_template'

    dataflow = build('dataflow', 'v1b3', cache_discovery=False)

    job = {
        "jobName": job_name,
        "parameters": {
            "inputFile": event['name']  #arquivo
        },
        "environment": {
            "tempLocation": "gs://seu-bucket/temp"
        }
    }

    request = dataflow.projects().templates().launch(
        projectId=project,
        gcsPath=template_path,
        body=job
    )

    response = request.execute()
    print(f"Job {job_name} started with response: {response}")


#Verificação dos Dados no BigQuery

In [None]:
SELECT
  nr_atendimento,
  TIMESTAMP_DIFF(dt_inicio_atendimento, dt_entrada, MINUTE) AS tempo_espera,
  TIMESTAMP_DIFF(dt_alta, dt_inicio_atendimento, MINUTE) AS tempo_atendimento
FROM
  `seu-projeto-id.dataset.tabela_atendimento`
