In [0]:
import requests
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType

headers = {
    'accept': 'application/json',
    'x-access-token-ws': 'd1bf4238656c2167c5997e4081b549eedd0a25622a18740d8ad1a8a31ee22b15'
}

# -------------------- EXTRAÇÃO ---------------------------------------
url = 'https://api.irancho.com.br/api/animal/nutricoes'
response = requests.get(url, headers=headers)
nutricoes_list = response.json()
df_nutricoes_pandas = pd.json_normalize(nutricoes_list, sep='.')
print(f'nutricoes {response.status_code}')

# ✅ CONVERTER TUDO PARA STRING NO PANDAS
df_nutricoes_pandas = df_nutricoes_pandas.astype(str)

# Criar schema com tudo como String
schema = StructType([
    StructField(col, StringType(), True) 
    for col in df_nutricoes_pandas.columns
])

df_nutricoes = spark.createDataFrame(df_nutricoes_pandas, schema=schema)
df_nutricoes.write.mode("overwrite").saveAsTable("nutricoes_raw")

print("✅ Tabela criada com sucesso!")

In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import (
    col,
    regexp_replace,
    schema_of_json,
    from_json,
    explode
)

# 1) --- Troca aspas simples por aspas duplas
df_clean = df_nutricoes.withColumn(
    "rows_clean",
    regexp_replace(col("rows"), "'", '"')
)

# 2) --- Troca valores Python pelos correspondentes JSON
df_clean = (
    df_clean
    .withColumn("rows_clean", regexp_replace("rows_clean", "None", "null"))
    .withColumn("rows_clean", regexp_replace("rows_clean", "True", "true"))
    .withColumn("rows_clean", regexp_replace("rows_clean", "False", "false"))
)

# 3) --- Pegar um exemplo para inferir o schema JSON
sample_json = df_clean.select("rows_clean").first()[0]

# 4) --- Inferir automaticamente o schema JSON
json_schema = (
    spark.range(1)
         .select(schema_of_json(sample_json).alias("schema"))
         .first()["schema"]
)

# 5) --- Converter STRING → ARRAY<STRUCT>
df_array = df_clean.withColumn(
    "rows_array",
    from_json(col("rows_clean"), json_schema)
)

# 6) --- Explodir a lista
df_exploded = df_array.select(explode("rows_array").alias("row"))

# 7) --- Expandir os campos internos
df_final = df_exploded.select("row.*")

df_exploded = df_final.select(
    explode("fichas_nutricao").alias("nutricao")
)

df_nutricoes_final=df_exploded.select("nutricao.*")

df_nutricoes_final.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("nutricoes_raw")



In [0]:
%sql
select * from nutricoes_raw