# **Pipeline ETL com AWS - Python**
#### **Cinthia Santos - Engenheira de Dados**

[Linkedin](https://www.linkedin.com/in/cinthialpsantos/)


[Github](https://github.com/cinthialet)

cithsantos@gmail.com


# 1. Entendimento do problema

# Objetivo

A pedido de um Cientista de Dados, eu, como Engenheira de Dados, irei desenvolver um conjunto de dados adequado para modelagem de machine learning que ele irá fazer no futuro. O conjunto de dados deve incluir:

- **ID do Cliente**: Identificador único para cada cliente.
- **Gênero**: Informação sobre o gênero do cliente.
- **Senioridade**: Indica se o cliente é sênior ou não.
- **Dependentes**: Informação sobre se o cliente possui dependentes.
- **Parceiro**: Indica se o cliente vive com um parceiro.
- **Classificação do Cliente**: Baseado na duração do contrato com a empresa, categorizado como:
  - **New**: Até 6 meses de contratação do serviço.
  - **Bronze**: Mais de 6 e até 12 meses.
  - **Silver**: Mais de 12 até 36 meses.
  - **Gold**: Mais de 36 até 60 meses.
  - **Platinum**: Acima de 60 meses.

> A pipeline deve ser feita em cloud AWS,com execução manual e o destino dos dados é o Redshift


# 2. Entendimento dos dados (análise exploratória dos dados)

In [1]:
import pandas as pd

In [3]:
df = pd.read_csv('/content/WA_Fn-UseC_-Telco-Customer-Churn.csv')

In [5]:
df.head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [8]:
df.dtypes

customerID           object
gender               object
SeniorCitizen         int64
Partner              object
Dependents           object
tenure                int64
PhoneService         object
MultipleLines        object
InternetService      object
OnlineSecurity       object
OnlineBackup         object
DeviceProtection     object
TechSupport          object
StreamingTV          object
StreamingMovies      object
Contract             object
PaperlessBilling     object
PaymentMethod        object
MonthlyCharges      float64
TotalCharges         object
Churn                object
dtype: object

In [10]:
df['SeniorCitizen'].value_counts()

0    5901
1    1142
Name: SeniorCitizen, dtype: int64

In [11]:
df['tenure'].value_counts()

1     613
72    362
2     238
3     200
4     176
     ... 
28     57
39     56
44     51
36     50
0      11
Name: tenure, Length: 73, dtype: int64

# 3. Quebrar o problema em pequenas partes / 4. Realizando as transformações necessárias

## Demanda 1: Filtrar apenas os dados relevantes para análise - somente as primeiras 6 colunas

In [12]:
df = df[['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure']]
df.head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure
0,7590-VHVEG,Female,0,Yes,No,1
1,5575-GNVDE,Male,0,No,No,34
2,3668-QPYBK,Male,0,No,No,2
3,7795-CFOCW,Male,0,No,No,45
4,9237-HQITU,Female,0,No,No,2


In [13]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7043 entries, 0 to 7042
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   customerID     7043 non-null   object
 1   gender         7043 non-null   object
 2   SeniorCitizen  7043 non-null   int64 
 3   Partner        7043 non-null   object
 4   Dependents     7043 non-null   object
 5   tenure         7043 non-null   int64 
dtypes: int64(2), object(4)
memory usage: 330.3+ KB


## Demanda 2: Mapear a flag de SeniorCitizen para true/false

In [14]:
df['SeniorCitizen'] = df['SeniorCitizen'].replace({0: False, 1: True})
df['SeniorCitizen'].value_counts()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['SeniorCitizen'] = df['SeniorCitizen'].replace({0: False, 1: True})


False    5901
True     1142
Name: SeniorCitizen, dtype: int64

## Demanda 3: Normalização de informações booleanas - mapear as colunas Partner e	Dependents para true/false, para seguir o mesmo padrão da demanda 2.

In [15]:
df['Partner'] = df['Partner'].replace({'Yes': True, 'No': False})
df['Dependents'] = df['Dependents'].replace({'Yes': True, 'No': False})
print(df['Partner'].value_counts())
df['Dependents'].value_counts()

False    3641
True     3402
Name: Partner, dtype: int64


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Partner'] = df['Partner'].replace({'Yes': True, 'No': False})
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Dependents'] = df['Dependents'].replace({'Yes': True, 'No': False})


False    4933
True     2110
Name: Dependents, dtype: int64

## Demanda 4: Classifique os clientes com os seguintes critérios - new(até 6 meses de contrataçaõ do serviço), bronze(mais que 6 e até 12), silver(mais que 12 até 36), gold(mais que 36 até 60) e platinum(acima de 60)

In [16]:
# Função para classificar com base na coluna tenure
def classificar_clientes(tenure):
    if tenure <= 6:
        return 'new'
    elif tenure <= 12:
        return 'bronze'
    elif tenure <= 36:
        return 'silver'
    elif tenure <= 60:
        return 'gold'
    else:
        return 'platinum'

# Aplicando a função à coluna tenure e armazenando na nova coluna Classification
df['classificacao'] = df['tenure'].apply(classificar_clientes)
df.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['classificacao'] = df['tenure'].apply(classificar_clientes)


Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,classificacao
0,7590-VHVEG,Female,False,True,False,1,new
1,5575-GNVDE,Male,False,False,False,34,silver
2,3668-QPYBK,Male,False,False,False,2,new
3,7795-CFOCW,Male,False,False,False,45,gold
4,9237-HQITU,Female,False,False,False,2,new


In [17]:
df['classificacao'].value_counts()

silver      1856
gold        1594
new         1481
platinum    1407
bronze       705
Name: classificacao, dtype: int64

# 5. SCRIPT PARA o JOB DO GLUE

## Bibliotecas e suas funções

### boto3
- **O que é?** SDK (Software Development Kit) oficial da Amazon para Python.
- **Para que serve?** Permite que desenvolvedores Python interajam e manipulem recursos da AWS (Amazon Web Services) de forma programática.

### BytesIO (da biblioteca `io`)
- **O que é?** Uma classe que fornece uma interface de "arquivo em memória" para sequências de bytes.
- **Para que serve?** Usado para simular um arquivo em memória, permitindo que operações de leitura e gravação sejam feitas sobre bytes como se estivessem trabalhando com arquivos reais.
- **Como isso se relaciona com o S3?** O Amazon S3 armazena arquivos em formatos binários (sequências de bytes), independentemente do tipo de arquivo (texto, imagem, vídeo, etc.). Quando você usa SDKs, como o boto3, para buscar arquivos do S3, o conteúdo é geralmente retornado como um fluxo de bytes.

  Para muitas operações, especialmente quando você deseja manipular ou processar esses dados com outras ferramentas ou bibliotecas, pode ser útil tratar esse fluxo de bytes como um "arquivo". No entanto, uma vez que você não está lendo de um arquivo físico no sistema de arquivos, mas sim de um serviço na nuvem, você precisa de uma forma de "enganar" essas ferramentas para fazê-las acreditar que estão trabalhando com um arquivo real.

### psycopg2
- **O que é?** É uma biblioteca Python que facilita a conexão e interação com bancos de dados PostgreSQL, incluindo o Amazon Redshift que é baseado no PostgreSQL.
- **Para que serve?** É usada para estabelecer conexões com bancos de dados PostgreSQL, enviar comandos SQL, executar consultas e manipular os resultados. Ela fornece uma interface eficiente e Pythonic para interagir com bancos de dados PostgreSQL.
- **Como isso se relaciona com o Redshift?** O Amazon Redshift é um serviço de armazenamento de dados em nuvem baseado no PostgreSQL. Assim, a `psycopg2` pode ser utilizada para conectar e interagir diretamente com instâncias Redshift, permitindo operações como criação de tabelas, inserção de dados, consulta e muito mais.

### sys
- **O que é?**  Uma biblioteca embutida do Python.
- **Para que serve?** Proporciona acesso a algumas variáveis de ambiente e funções que interagem com o interpretador Python, permitindo manipulações como passar argumentos de linha de comando e recuperar informações específicas do sistema.

### getResolvedOptions (da biblioteca awsglue.utils)
- **O que é?**  Uma função específica do SDK do AWS Glue para Python.
- **Para que serve?** Usado para recuperar os parâmetros passados a um GlueJob.
- **Como isso se relaciona com o Redshift?** Facilita o uso de parâmetros dinâmicos em jobs do AWS Glue, permitindo que scripts Python leiam valores definidos durante a criação ou execução do job.

In [None]:
import pandas as pd # para manipulação de dados
import boto3   # Importando o boto3, ele permite que os desenvolvedores Python interajam com serviços AWS, como S3.
from io import BytesIO  # Permite tratar bytes em memória como um "arquivo virtual" para leitura.
import psycopg2 # Para conectar e interagir com o banco de dados Redshift
import sys # Acesso a funções e variáveis do sistema.
from awsglue.utils import getResolvedOptions # Recupera parâmetros definidos no GlueJob.

########## Extração
print('Iniciando extração de dados')
# Inicializa o cliente do boto3 para interagir com o serviço Amazon S3
s3 = boto3.client('s3')

# Define o nome do bucket S3 onde o arquivo CSV está armazenado e o caminho (chave) do objeto dentro do bucket S3 que será acessado
bucket_name = "fonte-pipeline-etl"
object_key = "WA_Fn-UseC_-Telco-Customer-Churn.csv"

# Obtenção do objeto CSV do S3
response = s3.get_object(Bucket=bucket_name, Key=object_key)

# Lendo o conteúdo do objeto obtido em um buffer temporário.
buffer = BytesIO(response['Body'].read())

# Lendo o buffer diretamente com o pandas para obter um DataFrame.
print(f'Iniciando leitura dos dados do arquivo {object_key} no bucket {bucket_name}')
df = pd.read_csv(buffer)
print('Dados lidos com sucesso')

########## Transformação
print('Iniciando Transformações de dados')
# Demanda 1
df = df[['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure']]
print('Demanda 1 DONE')
# Demanda 2
df['SeniorCitizen'] = df['SeniorCitizen'].replace({0: False, 1: True})
print('Demanda 2 DONE')
# Demanda 3
df['Partner'] = df['Partner'].replace({'Yes': True, 'No': False})
df['Dependents'] = df['Dependents'].replace({'Yes': True, 'No': False})
print('Demanda 3 DONE')
# Demanda 4
def classificar_clientes(tenure):
    if tenure <= 6:
        return 'new'
    elif tenure <= 12:
        return 'bronze'
    elif tenure <= 36:
        return 'silver'
    elif tenure <= 60:
        return 'gold'
    else:
        return 'platinum'
df['classificacao'] = df['tenure'].apply(classificar_clientes)
print('Demanda 4 DONE')
print(f'Visualizando amostra de dados a serem inseridos no Redshift \n {df.head()}')

# Limitando para 100 registros para economizar tempo de processamento
df = df.head(100)

########## Carregamento(Load)
print('Iniciando Carregamento de dados no Redshift')

# Informações de Conexão com o Redshift
args = getResolvedOptions(sys.argv, [
    'REDSHIFT_HOST',
    'REDSHIFT_DBNAME',
    'REDSHIFT_PORT',
    'REDSHIFT_USER',
    'REDSHIFT_PASSWORD'
])

redshift_host = args['REDSHIFT_HOST']
redshift_dbname = args['REDSHIFT_DBNAME']
redshift_port = args['REDSHIFT_PORT']
redshift_user = args['REDSHIFT_USER']
redshift_password = args['REDSHIFT_PASSWORD']

# Estabelecendo conexão
conn = psycopg2.connect(
    host=redshift_host,
    dbname=redshift_dbname,
    user=redshift_user,
    password=redshift_password,
    port=redshift_port
)

# Criando a tabela, se ela ainda não existir
cursor = conn.cursor()

# Verificando e excluindo a tabela, caso ela exista.  Para fins didáticos, não para produção.
drop_table_query = """
DROP TABLE IF EXISTS dados_finais;
"""
cursor.execute(drop_table_query)

# Criando a tabela
create_table_query = """
CREATE TABLE dados_finais (
    customerID VARCHAR(255),
    gender VARCHAR(50),
    SeniorCitizen BOOLEAN,
    Partner BOOLEAN,
    Dependents BOOLEAN,
    tenure INT,
    classificacao VARCHAR(50)
);
"""
cursor.execute(create_table_query)
conn.commit()

# Inserindo dados do DataFrame na tabela
print('Inserindo dados no Redshift')
for index, row in df.iterrows():
    insert_query = """
    INSERT INTO dados_finais (
        customerID, gender, SeniorCitizen, Partner, Dependents, tenure, classificacao
    ) VALUES (%s, %s, %s, %s, %s, %s, %s);
    """
    cursor.execute(insert_query, tuple(row))
    conn.commit()

cursor.close()
conn.close()
print(f'Carregamento de dados no Redshift concluído com sucesso!Foram carregados {df.shape[0]} registros na tabela')