# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [1]:
"""
# Timeout do job
%idle_timeout 20

# Versão do Glue
%glue_version 3.0

# Tipo de worker "instância"
%worker_type G.1X

# Quantidade de workers
%number_of_workers 2
"""

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::299399630206:role/acesso_total
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 8bbd9141-76cd-4b09-8728-c23549e73fdf
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 8bbd9141-76cd-4b09-8728-c23549e73fdf to get into ready status...
Session 8bbd9141-76cd-4b09-8728-c23549e73fdf has been created.



### Tratando dados para a tabela PACIENTES

Os dados estão separados em dois arquivos de acordo com sexo, serão carregados e unificado em um só

In [2]:
# Criando dataframe do arquivo de pacientes sexo feminino
pacientes_f = spark.read.json(path = 's3://arquivos-medland/Raw data/pacientes_f.json', multiLine = True)

# Criando dataframe do arquivo de pacientes sexo masculino
pacientes_m = spark.read.json(path = 's3://arquivos-medland/Raw data/pacientes_m.json', multiLine = True)




In [3]:
# União dos dois dataframes:
df = pacientes_f.union(pacientes_m).orderBy('nome')
print('O total de linhas do dataframe unificado é: {}'.format(df.count()))

O total de linhas do dataframe unificado é: 1085


In [4]:
# Visualizando Schema:
df.printSchema()

root
 |-- altura: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- celular: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- cor: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- data_nasc: string (nullable = true)
 |-- email: string (nullable = true)
 |-- endereco: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- idade: long (nullable = true)
 |-- mae: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- numero: long (nullable = true)
 |-- pai: string (nullable = true)
 |-- peso: long (nullable = true)
 |-- rg: string (nullable = true)
 |-- senha: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- signo: string (nullable = true)
 |-- telefone_fixo: string (nullable = true)
 |-- tipo_sanguineo: string (nullable = true)


#### Transformando os dados:

In [5]:
# Transformando a data de string para tipo date:
df = df.withColumn('data_nasc', F.to_date(F.col('data_nasc'), 'dd/MM/yyyy'))

# Substituindo sexo:
df = df.replace(['Feminino', 'Masculino'], ['F', 'M'], 'sexo')

# Removendo pontuação coluna CPF:
df = df.withColumn('cpf', F.regexp_replace('cpf', '[^\w\s]', ''))

# Removendo pontuação coluna RG:
df = df.withColumn('rg', F.regexp_replace('rg', '[^\w\s]', ''))

# Removendo espaços da coluna celular e telefone fixo:
df = df.withColumn('celular', F.regexp_replace(F.col('celular'),' ', '')).withColumn('telefone_fixo', F.regexp_replace(F.col('telefone_fixo'),' ', ''))

# Coalesce para pegar o valor do telefone fixo se o celular estiver nulo + renomenado coluna:
df = df.withColumn('celular', 
                     F.coalesce(F.col('celular'), F.col('telefone_fixo')))\
                    .withColumnRenamed("celular", "telefone")

# Concatenando endereco com número para virar uma coluna só:
df = df.withColumn('endereco', F.concat_ws(' - ', F.col('endereco'), F.col('numero'), F.col('bairro')))




In [6]:
from pyspark.sql.window import Window

# Criando uma janela com base na coluna 'nome' para criar uma ordenação que será o ID_PACIENTE
# A função window é usada para fornecer o número sequencial da linha começando de 1 ao resultado de cada partição da janela
window = Window.orderBy(F.col('nome'))

# A função row_number atribui um número sequencial para cada linha do dataframe com base na ordem de classificação especificada pela janela window criada
df = df.withColumn("ID_PACIENTE", F.row_number().over(window))




In [7]:
# Registrando o DataFrame como uma tabela temporária:
df.createOrReplaceTempView("temp_pacientes")

# Executando consulta SQL ordenando e renomeando colunas de acordo com o database para catalogar no GLUE
df_final = spark.sql("""
    SELECT 
        ID_PACIENTE,
        nome as NOME, 
        sexo as SEXO, 
        tipo_sanguineo as TP_SANGUE, 
        rg as RG, 
        cpf as CPF, 
        data_nasc as NASCIMENTO, 
        mae as NOME_MAE, 
        telefone as TELEFONE, 
        email as EMAIL, 
        endereco as ENDERECO, 
        cep as CEP, 
        cidade as CIDADE, 
        estado as ESTADO
    FROM temp_pacientes
""")

# Exibindo o novo DataFrame
df_final.show(5, truncate = False)

+-----------+--------------------------------+----+---------+---------+-----------+----------+---------------------+--------------+------------------------------------+------------------------------------------------+---------+--------------+------+
|ID_PACIENTE|NOME                            |SEXO|TP_SANGUE|RG       |CPF        |NASCIMENTO|NOME_MAE             |TELEFONE      |EMAIL                               |ENDERECO                                        |CEP      |CIDADE        |ESTADO|
+-----------+--------------------------------+----+---------+---------+-----------+----------+---------------------+--------------+------------------------------------+------------------------------------------------+---------+--------------+------+
|1          |Abel Renan Rodrigues            |M   |A-       |131145873|70264651634|1958-02-07|Liz Jaqueline Juliana|(31)3949-1110 |abel_rodrigues@ericsson.com         |Rua José Joaquim dos Santos - 211 - Copacabana  |31540-524|Belo Horizonte|MG    |


In [8]:
# Salvando o DF como parquet e definindo somente uma partição para gerar um arquivo único
caminho_pacientes = "s3://arquivos-medland/ProcessedData/Pacientes"

df_final.coalesce(1).write.mode("overwrite").parquet(caminho_pacientes)




### Tratando dados para a tabela CONSULTAS

In [9]:
# Dataframe do arquivo de consultas
consultas = spark.read.csv("s3://arquivos-medland/Raw data/consultas.csv", header = True, inferSchema = True, sep = ";")




In [10]:
# Visualizando dataframe
consultas.show(10, truncate = False)

+--------+------+--------+----------------+-------+--------------------------------------+---------+
|Paciente|Medico|Hospital|Data            |Triagem|Diagnostico                           |Internado|
+--------+------+--------+----------------+-------+--------------------------------------+---------+
|1       |4     |1       |14/05/2022 09:13|2      |Sintomas médios de COVID SARS-CoV-2   |0        |
|2       |5     |2       |09/03/2022 07:23|1      |Princípio  de infarto                 |0        |
|3       |7     |1       |10/06/2022 05:47|4      |Febre/ Expectoração/mucosidade anormal|1        |
|3       |5     |2       |16/09/2022 06:05|1      |Sintomas graves de COVID SARS-CoV-2   |0        |
|4       |5     |2       |08/04/2022 09:29|4      |Infeccção intestinal                  |0        |
|4       |6     |1       |23/06/2022 07:45|2      |Sintomas médios de COVID SARS-CoV-2   |0        |
|5       |7     |2       |26/08/2022 06:49|4      |Perturbações visuais/ Dor no olho     |0

In [11]:
# Visualizando Schema
consultas.printSchema()

root
 |-- Paciente: integer (nullable = true)
 |-- Medico: integer (nullable = true)
 |-- Hospital: integer (nullable = true)
 |-- Data: string (nullable = true)
 |-- Triagem: integer (nullable = true)
 |-- Diagnostico: string (nullable = true)
 |-- Internado: integer (nullable = true)


In [12]:
# Transformando a data de string para tipo date
consultas = consultas.withColumn("Data", F.to_timestamp("Data", "dd/MM/yyyy HH:mm"))

# Ordenando dataframe pela colun data
consultas = consultas.orderBy("Data")




In [13]:
# Visualizando Schema
consultas.printSchema()

root
 |-- Paciente: integer (nullable = true)
 |-- Medico: integer (nullable = true)
 |-- Hospital: integer (nullable = true)
 |-- Data: timestamp (nullable = true)
 |-- Triagem: integer (nullable = true)
 |-- Diagnostico: string (nullable = true)
 |-- Internado: integer (nullable = true)


In [14]:
# Renomeando colunas de acordo com o database para catalogar no GLUE
consultas = consultas.withColumnRenamed("ID", "ID_CONSULTA")\
                    .withColumnRenamed("Paciente", "ID_PACIENTE")\
                    .withColumnRenamed("Medico", "ID_MEDICO")\
                    .withColumnRenamed("Hospital", "ID_HOSPITAL")\
                    .withColumnRenamed("Data", "DATA")\
                    .withColumnRenamed("Triagem", "GRAU_RISCO")\
                    .withColumnRenamed("Diagnostico", "DIAGNOSTICO")\
                    .withColumnRenamed("Internado", "INTERNACAO")




In [15]:
# Visualizando dataframe
consultas.show(10, truncate = False)

+-----------+---------+-----------+-------------------+----------+--------------------------------------+----------+
|ID_PACIENTE|ID_MEDICO|ID_HOSPITAL|DATA               |GRAU_RISCO|DIAGNOSTICO                           |INTERNACAO|
+-----------+---------+-----------+-------------------+----------+--------------------------------------+----------+
|117        |2        |1          |2022-01-01 10:27:00|2         |Sintomas graves de COVID SARS-CoV-2   |0         |
|131        |1        |2          |2022-01-03 07:49:00|4         |Hipertensão sem complicações          |0         |
|130        |9        |1          |2022-01-05 08:18:00|4         |Cefaleia/ Febre/ Tosse                |0         |
|7          |7        |1          |2022-01-10 00:50:00|4         |Sintomas médios de COVID SARS-CoV-2   |0         |
|26         |1        |1          |2022-01-10 08:58:00|4         |Sinais/sintomas múltiplos das artic.  |1         |
|111        |7        |2          |2022-01-10 11:14:00|4        

In [16]:
# Salvando o DF como parquet e definindo somente uma partição para gerar um arquivo único
caminho_consultas = "s3://arquivos-medland/ProcessedData/Consultas"

consultas.coalesce(1).write.mode("overwrite").parquet(caminho_consultas)


