In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,regexp_replace
import pandas as pd
from datetime import datetime




In [5]:
# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("QualityAnalyses") \
    .getOrCreate()




In [9]:
# Carregar seus dados em um DataFrame Spark
input_s3_path_bancos = "s3://172684736408-trusted/Bancos/*.parquet"
input_s3_path_empregados = "s3://172684736408-trusted/Empregados/*.parquet"
input_s3_path_reclamacoes = "s3://172684736408-trusted/Reclamações/*.parquet"

df_bancos = spark.read.option("encoding", "UTF-8").parquet(input_s3_path_bancos)
df_empregados = spark.read.option("encoding", "UTF-8").parquet(input_s3_path_empregados)
df_reclamacoes = spark.read.option("encoding", "UTF-8").parquet(input_s3_path_reclamacoes)





In [53]:
# Verificar qualidade da coluna "CNPJ" em df_bancos
def verificar_qualidade_cnpj(df):
    # Filtrar valores CNPJ zerados
    cnpj_zerados = df[df['CNPJ'] == 0]  # Supondo que um CNPJ zerado seja representado por 14 zeros
    cnpj_zerados['Data'] = datetime.today().date()
    
    if not cnpj_zerados.empty:
        return cnpj_zerados
    else:
        df = pd.DataFrame(columns=['CNPJ', 'Nome'])  # Retorna um dataframe vazio
        df.loc[0] = ['null', 'null']
        df['Data'] = datetime.today().date()
        return df

# Verificar qualidade da coluna "Nome" em df_reclamacoes
def verificar_qualidade_nome(df):
    # Verificar valores nulos na coluna Nome
    nulos = df[df['Nome'].isnull()]
    nulos['Data'] = datetime.today().date()
    
    if not nulos.empty:
        return nulos
    else:
        df = pd.DataFrame(columns=['Nome', 'indice', 'qtd_total_reclamacoes', 'qtd_total_clientes'])
        df.loc[0] = ['null', 'null', 'null', 'null']
        df['Data'] = datetime.today().date()
        return df 

# Verificar qualidade da coluna "Nome" em df_empregados
def verificar_qualidade_nome_empregados(df):
    # Verificar valores em branco na coluna Nome
    em_branco = df[df['Nome'].str.strip() == ""]
    em_branco['Data'] = datetime.today().date()
    
    if not em_branco.empty:
        return em_branco
    else:
        df = pd.DataFrame(columns=['Nome', 'satisfacao_salario', 'satisfacao_empregado'])
        df.loc[0] = ['null', 'null', 'null']
        df['Data'] = datetime.today().date()
        return df

# Função para escrever um dataframe em um bucket S3
def escrever_dataframe_s3(df, bucket_name, object_key):
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    
    s3 = boto3.client('s3')
    s3.put_object(Bucket=bucket_name, Key=object_key, Body=csv_buffer.getvalue())

# Chamando as funções de verificação
df_qualidade_cnpj = verificar_qualidade_cnpj(df_bancos.toPandas())
df_qualidade_nome = verificar_qualidade_nome(df_reclamacoes.toPandas())
df_qualidade_nome_empregados = verificar_qualidade_nome_empregados(df_empregados.toPandas())

#df['Data'] = datetime.today().date()

#df_qualidade_cnpj.write.format("parquet").mode("overwrite").option("path", f"{input_s3_path_reclamacoes}").save()
#df_qualidade_nome.write.format("parquet").mode("overwrite").option("path", f"{input_s3_path_reclamacoes}").save()
#df_qualidade_nome_empregados.write.format("parquet").mode("overwrite").option("path", f"{input_s3_path_reclamacoes}").save()




In [55]:
df_qualidade_cnpj = spark.createDataFrame(df_qualidade_cnpj)
df_qualidade_nome = spark.createDataFrame(df_qualidade_nome)
df_qualidade_nome_empregados = spark.createDataFrame(df_qualidade_nome_empregados)

TypeError: data is already a DataFrame


In [57]:
jdbc_url = "jdbc:sqlserver://database.cq2cmdvszmas.us-east-1.rds.amazonaws.com:1433"


connection_mssql_options = {
    "url": jdbc_url,
    "user": "admin",
    "password": "admin1234"}

tb = "ativ.dbo.banco_log"
df_qualidade_cnpj.write.mode("overwrite").jdbc(url=jdbc_url, \
                                                  table = tb, properties = connection_mssql_options)

tb = "ativ.dbo.empregados_log"
df_qualidade_nome_empregados.write.mode("overwrite").jdbc(url=jdbc_url, \
                                                  table = tb, properties = connection_mssql_options)

tb = "ativ.dbo.reclamacoes_log"
df_qualidade_nome.write.mode("overwrite").jdbc(url=jdbc_url, \
                                                  table = tb, properties = connection_mssql_options)


