## Criar um ambiente virtual e instalar todos os pacotes Python, via pip install.
    Aqui é assumido que o python e o gerenciador de pacotes pip já estão 
    instalados e funcionando em seu ambiente.


### Criando um ambiente virtual:
    python -m venv .venv

### Ativando o ambiente virtual Python:
    source .venv/bin/activate

### Os pacotes Python, podem ser instalados usando com requirements.txt, na mesma pasta do script:
    pip install -r requirements.txt

## Importar bibliotecas que foram instaladas via pip


In [None]:
import requests
import json
import pandas as pd
from pandas.io.json import json_normalize
import numpy as np
import geopy
from geopy.geocoders import Nominatim
from geopy.point import Point
from cryptography.fernet import Fernet
from datetime import datetime
from google.oauth2.service_account import Credentials
from google.oauth2 import service_account
import google.auth
from google.auth.transport.requests import AuthorizedSession
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas_gbq as gbq




## Gerar um token para acessar a API
    O token para acessar a API da begrowth é fornecido após cadastrar um
    usuário.

    Informar os dados Nome e E-mail nos campos full_name e email.
    
    Após fazer um request, é retornado o usuário e o token para acessar a API.
    Por fim, imprime na tela o json obtido.

In [None]:
headers = {
    'accept': 'application/json',
    'Content-Type': 'application/json',
}

json_data = {
    'full_name': 'Fabiano Moreira Alves',
    'email': 'fabianomalves@proton.me',
}

access_token = requests.post('https://begrowth.deta.dev/user/', headers=headers, json=json_data)
access_token_json = access_token.json()

print(access_token_json)


## Separando o usuário e o token para acessar a API
    Como o request retorna um json(dicionário) é preciso separar o dicionário
    e pegar somente a informação do token.
    
    Após isso, imprime na tela somente o token.

In [None]:
# split dictionary into keys and values
keys = []
values = []
items = access_token_json.items()
for item in items:
    keys.append(item[0]), values.append(item[1])
 
# printing keys and values separately
string_acess_token = str(values[-1])
print(string_acess_token)


## Consumir a api, chamando-a e passando o token adquirido
    Concatenando a url https://begrowth.deta.dev/token=access_token, junto do
    token obtido é necessário para acessar os dados da API.

    Para isso, concatenei a url com o token obtido anteriormente e salvei numa
    variável.
    
    Exibindo a url, concatenando com o token obtido.

In [None]:
url_dev = "https://begrowth.deta.dev/token="
url_dev_with_token = url_dev + string_acess_token
print(url_dev_with_token)

## Normalizar o json recebido pelo request, inserindo ele num dataframe Pandas
    Como os dados do obtidos pelo requests são um json, 
    foi feita uma "normalização dos dados e foi inserido num dataframe.

    Processo feito para trabalhar melhor com os dados, principalmente com campo
    address, que contem outro dicionário dentro da estrutura do json.
    
    Exibindo as primeiras linhas do dataframe.

In [None]:
data = json.loads(requests.get(url_dev_with_token).text)
df_json_normalize = pd.json_normalize(data)
df_json_normalize.head()


## Procurando linhas duplicadas:
    Ao exibir as primeiras linhas do dataframe, já exibiam algumas linhas
    duplicadas. 
    
    Filtrando os duplicados pelo id dos usuários e retornando 
    todos os duplicados.

In [None]:
duplicate_rows = df_json_normalize[df_json_normalize.duplicated(['id', ])]
print(duplicate_rows)


## Removendo as linhas duplicadas
    Dando um drop nas linhas duplicadas, filtrando pelo 'id' e salvando
    em outro dataframe.

    Depois exibe somente as linhas distintas.

In [None]:
df_distinct_id_rows = df_json_normalize.drop_duplicates(subset=['id'])
print(df_distinct_id_rows)


## Fazendo um reverse geocoding 
    Após normalizar os dados, foram criados 3 novos campos, 
    destrinchando o campo address. 

    Campos address.geo_latitude, address.geo_longitude, address.country.

    Para conseguir a informação do estado, 
    é necessário o processo de reverse geocoding, 
    onde pegamos a latitude e longitude e filtramos as informações que queremos,
    no caso o estado. 
    
    Porém, verifiquei que os nomes dos estados
    apresentam acentos, o que é um problema na hora de fazer os selects.
    
    Para isso, peguei o campo ISO 3166-2, que é um padrão mundial de siglas
    por estados, gerando o padrão BR-UF.
    Como teremos os UFs, será melhor para fazer os selects.

In [None]:
# Create a geocoder object using the Nominatim API
geolocator = Nominatim(user_agent="my_geocoder_state")

# Define a function to reverse geocode the state
def get_state(lat, lng):
  # Use the geocoder object to reverse geocode the coordinates
  location = geolocator.reverse((lat, lng))
  # Extract the state code from the response, using ISO3166-2-lvl4
  state = location.raw['address']['ISO3166-2-lvl4']
  return state

# Apply the function to each row of the DataFrame and store the result in a new column

df_distinct_id_rows.loc[:, ['address_state']] = df_distinct_id_rows.apply(lambda x: get_state(x['address.geo_latitude'], x['address.geo_longitude']), axis=1)



# Display a resulting DataFrame
df_distinct_id_rows.head()



## Salvando o novo dataframe num arquivo csv.
    Como o processo de reverse geocodin demora demais, 
    salvei o dataframe localmente, 
    para não demorar muito, 
    caso fosse necessário rodar novamente o script.

    Conseguiria trabalhar desse ponto em diante com o arquivo fisico,
    caso necessário.

In [None]:
df_distinct_id_rows.to_csv('../data_engineer_test/df_distinct_id_rows.csv', index=False)

# Read csv file

In [None]:
df_distinct_id_rows = pd.read_csv('../data_engineer_test/df_distinct_id_rows.csv')
df_distinct_id_rows.head()

## Descriptografar o campo CPF
    O campo CPF está criptografado, pela criptografia Fernet.
    
    Para isso, é necessário usar a chave Fernet, passada por e-mail.
    

In [None]:
fernet = Fernet(b'ekkxXo0uHWRkIbHqHrLS4gaMj2hWTYMJyPTAbi9INGI=')

df_distinct_id_rows['cpf'] = df_distinct_id_rows['cpf'].apply(lambda x: fernet.decrypt(x.encode()))

df_distinct_id_rows.head()



## Salvando o arquivo descriptografado num csv

In [None]:

df_distinct_id_rows.to_csv('../data_engineer_test/df_decrypt_cpf.csv', index=False)


## Lendo o arquivo csv, cujo cpf foi descriptografado.

In [None]:
df_decrypt_cpf = pd.read_csv('../data_engineer_test/df_decrypt_cpf.csv' )
df_decrypt_cpf.head()


## Formatando a coluna CPF
    Após descriptografar o cpf, a coluna ficou um b de byte, 
    oriundo da chave de descriptografar.

    Fazendo a remoção das informações que são desnecessárias.

In [None]:
df_decrypt_cpf['cpf'] = df_decrypt_cpf['cpf'].apply(lambda x: x[2: -1])
df_decrypt_cpf.head()


## Salvando o arquivo descriptografado e formatado num csv

In [None]:
df_decrypt_cpf['cpf'] = df_decrypt_cpf['cpf'].astype(str)
df_decrypt_cpf.to_csv('../data_engineer_test/df_decrypt_formatting_cpf.csv', index=False)



## Lendo o arquivo csv, descriptografado e formatado e transformando num df

In [None]:
pd.set_option('display.max_rows', 100)
df_decrypt_formatting_cpf = pd.read_csv('../data_engineer_test/df_decrypt_formatting_cpf.csv', dtype={'cpf': str})
print(df_decrypt_formatting_cpf.head(20))

## Removendo o prefixo "BR-" da coluna address_state
    Para deixar a coluna mais clean, 
    manter somente as UFs no campo address_state

In [None]:
df_decrypt_formatting_cpf["address_state"] = df_decrypt_formatting_cpf["address_state"].str.replace("BR-", "")
df_decrypt_formatting_cpf.head()

## Inserindo colunas dt_insert e candidate_name
    Inserindo as colunas dt_insert e candidate_name
    conforme solicitado e prenchendo com um timestamp e o nome.

In [None]:
# Create a variable now, with the actual timestamp
now = datetime.now()

# Create new columns
df_decrypt_formatting_cpf = df_decrypt_formatting_cpf.assign(
    dt_insert=now,
    candidate_name='Fabiano Moreira Alves'
)

df_decrypt_formatting_cpf.head()


## Alterar os nomes das colunas, inserindo um '_' ao invés de '.'
    Removendo os pontos dos nomes das colunas e trocando por underscores.
    Processo necessário para inseir os dados no Google BigQuery.


In [None]:
df_decrypt_formatting_cpf = df_decrypt_formatting_cpf.rename(columns=lambda x: x.replace('.', '_'))

df_decrypt_formatting_cpf.head()


## Salvando o dataframe após as transformações.

In [None]:
df_decrypt_formatting_cpf['cpf'] = df_decrypt_formatting_cpf['cpf'].astype(str)
df_decrypt_formatting_cpf.to_csv('../data_engineer_test/bg_data_enginner_test_fabiano.csv', index=False)



In [None]:
df = pd.read_csv('../data_engineer_test/bg_data_enginner_test_fabiano.csv', dtype={'cpf': str}, )
print(df.head(30))

## Conectar no big query e fazer um select
### Tabela: bg_users.brazilian_state

In [None]:

project_id = 'begrowth-user-api-demo'
dataset_id = 'bg_users'

# Set the path to your service account file
service_account_file = '/home/fabiano/Projects/data_engineer_test/svc-data-engineer-test.json'

# Load the service account credentials from the file
credentials = service_account.Credentials.from_service_account_file(service_account_file)

# Create a BigQuery client using your service account credentials
client = bigquery.Client(credentials=credentials, project=project_id)

sql = """
SELECT 
    *
FROM 
begrowth-user-api-demo.bg_users.brazilian_state
"""
df = gbq.read_gbq(sql, project_id = project_id, credentials=credentials, progress_bar_type=None)

print(df)




## Conectar no big query e fazer um select
### Tabela: begrowth-user-api-demo.bg_users.cpf_state

In [None]:
project_id = 'begrowth-user-api-demo'
dataset_id = 'bg_users'

# Set the path to your service account file
service_account_file = '/home/fabiano/Projects/data_engineer_test/svc-data-engineer-test.json'

# Load the service account credentials from the file
credentials = service_account.Credentials.from_service_account_file(service_account_file)

# Create a BigQuery client using your service account credentials
client = bigquery.Client(credentials=credentials, project=project_id)

sql = """
SELECT 
    *
FROM 
begrowth-user-api-demo.bg_users.cpf_state
"""
df = gbq.read_gbq(sql, project_id = project_id, credentials=credentials, progress_bar_type=None)

print(df)


## Create a service account with the key provided in email

In [40]:
df = pd.read_csv('../data_engineer_test/bg_data_enginner_test_fabiano.csv', dtype={'cpf': str})




# Set the path to your service account file
service_account_file = '/home/fabiano/Projects/data_engineer_test/svc-data-engineer-test.json'

# Load the service account credentials from the file
credentials = service_account.Credentials.from_service_account_file(service_account_file)

# Create a BigQuery client using your service account credentials
client = bigquery.Client(credentials=credentials, project=project_id)

project_id = 'begrowth-user-api-demo'
dataset_id = 'bg_users'
table_id = 'bg_data_enginner_test_fabiano_moreira_alves'


gbq.to_gbq(df, f"{project_id}.{dataset_id}.{table_id}", project_id=project_id, if_exists = 'replace', credentials=credentials)

Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/begrowth-user-api-demo/datasets/bg_users/tables/bg_data_enginner_test_fabiano_moreira_alves?prettyPrint=false: Access Denied: Table begrowth-user-api-demo:bg_users.bg_data_enginner_test_fabiano_moreira_alves: Permission bigquery.tables.get denied on table begrowth-user-api-demo:bg_users.bg_data_enginner_test_fabiano_moreira_alves (or it may not exist).