In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import requests
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType, FloatType, DecimalType

from pyspark.sql.functions import col, when, mean, stddev, count, avg, sum, monotonically_increasing_id

import psycopg2
from psycopg2 import Error

from datetime import datetime

from decimal import *
import json

###Inicializando SparkSeesion

In [2]:
#Inicializando SparkSession

spark = SparkSession.builder \
      .appName("Qualidade_de_dados") \
      .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4") \
      .getOrCreate()

KeyboardInterrupt: 

###Função para extrair dados da api fornecida.

In [None]:
def extract_data(url_api, headers=None, params=None):
  try:
    response = requests.get(url_api,headers=headers, params=params)
    response.raise_for_status()
    print(f"Response:{response.json()}")
    return response.text
  except requests.exceptions.RequestException as e:
    print(f"Erro ao acessar API: {e}")
    return None




In [None]:
def define_schema():

  rating = StructType([
      StructField("rate", DecimalType(10,2), True),
      StructField("count", IntegerType(), True),
  ])

  return StructType([
      StructField("id", IntegerType(), True),
      StructField("title", StringType(), True),
      StructField("price", DecimalType(10,2), True),
      StructField("description", StringType(), True),
      StructField("category", StringType(), True),
      StructField("image", StringType(), True),
      StructField("rating", rating, True)

  ])


Configurar Api

In [None]:
url_api = "https://fakestoreapi.com/products"


Configurar Banco de dados:

###Extrair dados da api fornecida:
#### https://fakestoreapi.com/products

In [None]:
data_api = extract_data(url_api= url_api)

In [None]:
data = json.loads(data_api)

#Normalizando valores para decimal
for item in data:
  item["price"] = Decimal(str(item["price"]))
  item["rating"]["rate"] = Decimal(str(item["rating"]["rate"]))


schema = define_schema()
df = spark.createDataFrame(data, schema = schema)

df.show(5)


### Tratamento dos valores nulos, inconsistentes e outliers

In [None]:
# Removendo valores nulos
df_clean = df.dropna(subset=["price","category","rating.rate"])

# Adicionar uma coluna temporária para extrair rating.rate
df_clean = df_clean.withColumn("rate", col("rating.rate"))
# Filtro de dados inconsistentes

df_clean = df_clean.filter((col("price") >= 0) & (col("rate").between(0, 5)))


# Removendo outliers com o método IQR (Intervalo Interquartil)
quantiles_price = df_clean.approxQuantile("price", [0.25, 0.75], 0.05)
quantiles_rate = df_clean.approxQuantile("rate", [0.25, 0.75], 0.05)

# Q1, Q3 e IQR para price
q1_price, q3_price = quantiles_price[0], quantiles_price[1]
iqr_price = q3_price - q1_price
lower_bound_price = q1_price - 1.5 * iqr_price
upper_bound_price = q3_price + 1.5 * iqr_price

#Q1,Q3 e IQR para rate
q1_rate, q3_rate = quantiles_rate[0], quantiles_rate[1]
iqr_rate = q3_rate - q1_rate
lower_bound_rate = q1_rate - 1.5 * iqr_rate
upper_bound_rate = q3_rate + 1.5 * iqr_rate

df_clean = df_clean.filter(
    (col("price") >= lower_bound_price) & (col("price") <= upper_bound_price) &
    (col("rate") >= lower_bound_rate) & (col("rate") <= upper_bound_rate)
)

#Filtros para preco >= 100 e avaliação >= 3.5
df_clean = df_clean.filter((col("price") >= 100) & (col("rate") >= 3.5))

df_clean.show()

### Sumarizaçao por categoria, preço medio, avaliação media

In [None]:
df_summary = df_clean.groupBy("category" ).agg(
    avg("price").alias("preco_medio"),
    avg("rate").alias("avaliacao_media")
).withColumnRenamed("category", "categoria")

df_summary.show()

In [None]:
# Configurações do PostgreSQL
db_properties = {
    "url": "jdbc:postgresql://localhost:5432/desafio_tecnico",  # Substitua por sua URL
    "driver": "org.postgresql.Driver",
    "user": "desafio_tecnico_user",  # Substitua pelo seu usuário
    "password": "desafiotecnico123",  # Substitua pela sua senha
    "dbtable_produtos": "produtos",
    "dbtable_summary": "categoria_media"
}

db_config = {
    "host": "localhost",
    "port": "5432",
    "database": "desafio_tecnico",  # Substitua pelo seu banco
    "user": "desafio_tecnico_user",  # Substitua pelo seu usuário
    "password": "desafiotecnico123"  # Substitua pela sua senha
}

### Função para salvar Dataframe no PostreSQL

In [None]:
def save_to_postgres(df, table_name, db_properties, mode="append"):
    df.write \
        .format("jdbc") \
        .option("url", db_properties["url"]) \
        .option("dbtable", table_name) \
        .option("driver", db_properties["driver"]) \
        .option("user", db_properties["user"]) \
        .option("password", db_properties["password"]) \
        .mode(mode) \
        .save()

Dividir df_clean e salvar em tabela 'produtos' com transação

In [None]:
try:
    # Estabelecer conexão com psycopg2 para controle transacional
    conn = psycopg2.connect(
        host=db_config["host"],
        port=db_config["port"],
        database=db_config["database"],
        user=db_config["user"],
        password=db_config["password"]
    )
    conn.autocommit = False  # Desativar commit automático
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS produtos (
            id INTEGER,
            title TEXT,
            price FLOAT,
            description TEXT,
            category TEXT,
            image TEXT,
            rating_rate FLOAT,
            rating_count INTEGER
        );
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS categoria_media (
            category TEXT,
            mean_price FLOAT,
            mean_rate FLOAT
        );
    """)

    # Partes menores de 5 registros para facilitar o processamento e a inserção no banco de dados.
    df_clean_with_index = df_clean.withColumn("row_index", monotonically_increasing_id())
    total_rows = df_clean_with_index.count()
    batch_size = 5
    num_batches = (total_rows + batch_size - 1)

    for i in range(num_batches):
        start_index = i * batch_size
        end_index = start_index + batch_size
        df_batch = df_clean_with_index.filter(
            (col("row_index") >= start_index) & (col("row_index") < end_index)
        ).drop("row_index")

        # Ajustar estrutura para corresponder à tabela (desaninhar rating)
        df_batch = df_batch.select(
            col("id"),
            col("title"),
            col("price"),
            col("description"),
            col("category"),
            col("image"),
            col("rating.rate").alias("rate"),
            col("rating.count").alias("count")
        )

        print(f"Salvando batch {i+1} com {df_batch.count()} registros na tabela 'produtos'...")
        save_to_postgres(df_batch, db_properties["dbtable_produtos"], db_properties, mode="append")



    print("Salvando df_summary na tabela 'categoria_media'...")
    save_to_postgres(df_summary, db_properties["dbtable_summary"], db_properties, mode="overwrite")

    conn.commit()
    print("Transação confirmada com sucesso.")


except Exception as e:
    print(f"Erro durante a persistência: {str(e)}")
    if 'conn' in locals():
        conn.rollback()
        print("Transação revertida devido a erro.")
    raise

finally:
    if 'cursor' in locals():
        cursor.close()
    if 'conn' in locals():
        conn.close()
    spark.stop()