# Transformação (Prata)

Utilizando os dados da camada Bronze, vou transformá-los e organizá-los em estruturas mais adequadas para análise, preparando-os para a modelagem analítica.

In [0]:
# Quick read test: load both Bronze tables and preview the first 5 rows of each.
df_business_bronze = spark.table("bronze_yelp_business_raw")
df_review_bronze = spark.table("bronze_yelp_review_raw")

for bronze_preview in (df_business_bronze, df_review_bronze):
    display(bronze_preview.limit(5))

### Criando a `silver_yelp_business`:

In [0]:
from pyspark.sql import functions as F

df_business_bronze = spark.table("bronze_yelp_business_raw")

# Silver business projection: every Bronze column is cast to an explicit type and kept
# under its own name, in this order. The free-text fields listed in the trim set also get
# leading/trailing whitespace removed.
business_column_types = [
    ("business_id", "string"),
    ("name", "string"),
    ("city", "string"),
    ("state", "string"),
    ("postal_code", "string"),
    ("latitude", "double"),
    ("longitude", "double"),
    ("stars", "double"),
    ("review_count", "int"),
    ("is_open", "int"),
    ("categories", "string"),
]
business_trimmed_columns = {"name", "city", "state"}

business_projection = []
for column_name, column_type in business_column_types:
    typed_column = F.col(column_name).cast(column_type)
    if column_name in business_trimmed_columns:
        typed_column = F.trim(typed_column)
    business_projection.append(typed_column.alias(column_name))

df_business_silver = df_business_bronze.select(*business_projection)
display(df_business_silver.limit(5))

In [0]:
# Persist: write the Silver business DataFrame as a Delta table named silver_yelp_business.
# The table is fully rebuilt on every run (mode "overwrite"), so its schema should follow
# the projection in the transformation cell. Without overwriteSchema, a re-run after a
# column is added, removed or retyped fails with a Delta schema-mismatch error instead of
# replacing the table.
(df_business_silver
 .write
 .mode("overwrite")
 .format("delta")
 .option("overwriteSchema", "true")
 .saveAsTable("silver_yelp_business"))

In [0]:
# Validation: compare Bronze vs Silver row counts, then run sanity queries against the
# persisted table (total rows, star range, distribution of is_open).
for count_label, counted_frame in (
    ("bronze rows:", df_business_bronze),
    ("silver rows:", df_business_silver),
):
    print(count_label, counted_frame.count())

business_check_queries = [
    "SELECT COUNT(*) AS n FROM silver_yelp_business",
    "SELECT MIN(stars) AS min_stars, MAX(stars) AS max_stars FROM silver_yelp_business",
    "SELECT is_open, COUNT(*) AS n FROM silver_yelp_business GROUP BY is_open ORDER BY is_open",
]
for check_query in business_check_queries:
    spark.sql(check_query).show()

### Criando a `silver_yelp_review`:

In [0]:
from pyspark.sql import functions as F

df_review_bronze = spark.table("bronze_yelp_review_raw")

# Silver review projection. Identifiers are kept as strings and vote counters cast to int.
# The Bronze "stars" column becomes review_stars (double). The raw "date" column is parsed
# twice: into a timestamp (review_ts) and into a date (review_date). The review text is
# cast to string and trimmed.
raw_review_date = F.col("date")
review_id_columns = [
    F.col(id_name).cast("string").alias(id_name)
    for id_name in ("review_id", "business_id", "user_id")
]
review_vote_columns = [
    F.col(vote_name).cast("int").alias(vote_name)
    for vote_name in ("useful", "funny", "cool")
]

df_review_silver = df_review_bronze.select(
    *review_id_columns,
    F.col("stars").cast("double").alias("review_stars"),
    F.to_timestamp(raw_review_date).alias("review_ts"),
    F.to_date(raw_review_date).alias("review_date"),
    *review_vote_columns,
    F.trim(F.col("text").cast("string")).alias("text"),
)

display(df_review_silver.limit(5))

In [0]:
# Persist: write the Silver review DataFrame as a Delta table named silver_yelp_review.
# The table is fully rebuilt on every run (mode "overwrite"), so its schema should follow
# the projection in the transformation cell. Without overwriteSchema, a re-run after a
# column is added, removed or retyped fails with a Delta schema-mismatch error instead of
# replacing the table.
(df_review_silver
 .write
 .mode("overwrite")
 .format("delta")
 .option("overwriteSchema", "true")
 .saveAsTable("silver_yelp_review"))

In [0]:
# Validation: compare Bronze vs Silver row counts, then run sanity queries against the
# persisted review table (total rows, star range).
for count_label, counted_frame in (
    ("bronze rows:", df_review_bronze),
    ("silver rows:", df_review_silver),
):
    print(count_label, counted_frame.count())

review_check_queries = [
    "SELECT COUNT(*) AS n FROM silver_yelp_review",
    "SELECT MIN(review_stars) AS min_stars, MAX(review_stars) AS max_stars FROM silver_yelp_review",
]
for check_query in review_check_queries:
    spark.sql(check_query).show()

# Quality checks useful for the catalog: null counts on the key columns and on review_date
# (a null review_date means the raw "date" value could not be parsed).
review_null_check_query = """
SELECT
  SUM(CASE WHEN review_id IS NULL THEN 1 ELSE 0 END) AS null_review_id,
  SUM(CASE WHEN business_id IS NULL THEN 1 ELSE 0 END) AS null_business_id,
  SUM(CASE WHEN review_date IS NULL THEN 1 ELSE 0 END) AS null_review_date
FROM silver_yelp_review
"""
spark.sql(review_null_check_query).show()

### Check de integridade:

In [0]:
# Referential-integrity check: how many Silver reviews reference a business_id that exists
# in silver_yelp_business, and what percentage of all reviews that is.
# The business side is reduced to DISTINCT ids before the LEFT JOIN. Nothing upstream
# guarantees business_id is unique, and a duplicated id would multiply the matching review
# rows, inflating total_reviews and reviews_com_business_match and distorting pct_match.
# NULLIF on the denominator keeps the percentage NULL instead of dividing by zero when the
# review table is empty.
spark.sql("""
SELECT
  COUNT(*) AS total_reviews,
  SUM(CASE WHEN b.business_id IS NOT NULL THEN 1 ELSE 0 END) AS reviews_com_business_match,
  ROUND(100.0 * SUM(CASE WHEN b.business_id IS NOT NULL THEN 1 ELSE 0 END) / NULLIF(COUNT(*), 0), 2) AS pct_match
FROM silver_yelp_review r
LEFT JOIN (SELECT DISTINCT business_id FROM silver_yelp_business) b
  ON r.business_id = b.business_id
""").show()