# PySpark + Iceberg — Versão Final (Compatível com Codespaces)

In [45]:

!pip install pyspark==3.5.1
    



In [46]:

from pyspark.sql import SparkSession

# Spark options needed for Iceberg: the runtime jar, the SQL extensions
# (row-level UPDATE/DELETE/MERGE) and a Hadoop-type catalog named "local"
# whose warehouse lives in the relative directory "warehouse_path".
spark_conf = {
    "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
    "spark.sql.catalog.local.warehouse": "warehouse_path",
}

builder = SparkSession.builder.appName("ExemploIcebergCompleto")
for chave, valor in spark_conf.items():
    builder = builder.config(chave, valor)
spark = builder.getOrCreate()

print("Spark iniciado com Iceberg!")


25/11/05 20:01:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/05 20:01:47 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


Spark iniciado com Iceberg!


In [47]:

# Namespace "filmes" inside the "local" Iceberg catalog.
spark.sql("CREATE DATABASE IF NOT EXISTS local.filmes")

# Declared schema for the movies table.
# NOTE: because of IF NOT EXISTS this DDL is skipped when the table already
# exists in the warehouse (e.g. from a previous run), and the later
# writeTo(...).createOrReplace() rebuilds the table from the DataFrame's schema.
ddl_top100 = """
CREATE TABLE IF NOT EXISTS local.filmes.top100 (
    Rank STRING,
    Title STRING,
    Year STRING,
    Genre STRING,
    Director STRING,
    Main_Actor STRING,
    Country STRING,
    IMDb_Rating DOUBLE,
    Rotten_Tomatoes STRING,
    Runtime STRING,
    Language STRING,
    Oscars_Won STRING,
    Box_Office DOUBLE,
    Metacritic_Score STRING
)
USING iceberg
"""
spark.sql(ddl_top100)


print("Tabela Iceberg criada com sucesso!")


Tabela Iceberg criada com sucesso!


In [48]:

csv_path = "top_100_movies_full_best_effort.csv"

# CSV headers -> identifier-safe column names used by the Iceberg table.
RENOMEAR_COLUNAS = {
    "IMDb Rating": "IMDb_Rating",
    "Box Office ($M)": "Box_Office",
    "Main Actor(s)": "Main_Actor",
    "Genre(s)": "Genre",
    "Rotten Tomatoes %": "Rotten_Tomatoes",
    "Runtime (mins)": "Runtime",
    "Oscars Won": "Oscars_Won",
    "Metacritic Score": "Metacritic_Score",
}

# Columns declared DOUBLE in the table DDL. Reading the CSV with only the
# header option yields every column as STRING, and createOrReplace() later
# rebuilds the table from this DataFrame's schema, so cast here or the table
# silently ends up with STRING ratings/box office. Unparseable values become null.
COLUNAS_DOUBLE = ["IMDb_Rating", "Box_Office"]

filmes_brutos = spark.read.option("header", True).csv(csv_path)
filmes_renomeados = filmes_brutos.withColumnsRenamed(RENOMEAR_COLUNAS)
df = filmes_renomeados.withColumns(
    {nome: filmes_renomeados[nome].cast("double") for nome in COLUNAS_DOUBLE}
)

print("Dataset carregado!")
df.show(5)


Dataset carregado!
+----+--------------------+----+------------------+--------------------+--------------------+--------------------+-----------+---------------+-------+--------+----------+----------+----------------+
|Rank|               Title|Year|             Genre|            Director|          Main_Actor|             Country|IMDb_Rating|Rotten_Tomatoes|Runtime|Language|Oscars_Won|Box_Office|Metacritic_Score|
+----+--------------------+----+------------------+--------------------+--------------------+--------------------+-----------+---------------+-------+--------+----------+----------+----------------+
|   1|The Shawshank Red...|1994|             Drama|      Frank Darabont|Tim Robbins|Morga...|       United States|        9.3|             91|    142| English|         0|      58.0|              82|
|   2|       The Godfather|1972|       Crime|Drama|Francis Ford Coppola|Marlon Brando|Al ...|       United States|        9.2|             98|    175| English|         3|     246.1|    

In [49]:

# Atomically replace the table's contents AND schema with those of `df`.
tabela_destino = "local.filmes.top100"
(
    df.writeTo(tabela_destino)
      .createOrReplace()
)
print("Dados gravados na tabela Iceberg!")


Dados gravados na tabela Iceberg!


In [50]:

print("Exemplo SELECT:")
# LIMIT without ORDER BY does not guarantee which rows come back; order by the
# numeric rank (Rank is stored as STRING, so cast to avoid "1, 10, 100, 11...").
spark.sql("""
SELECT Title, Year, IMDb_Rating
FROM local.filmes.top100
ORDER BY CAST(Rank AS INT)
LIMIT 5
""").show()


Exemplo SELECT:
+--------------------+----+-----------+
|               Title|Year|IMDb_Rating|
+--------------------+----+-----------+
|The Shawshank Red...|1994|        9.3|
|       The Godfather|1972|        9.2|
|     The Dark Knight|2008|        9.0|
|The Godfather: Pa...|1974|        9.0|
|        12 Angry Men|1957|        9.0|
+--------------------+----+-----------+



In [53]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import os

# Column names and types for the new row; they mirror the table DDL.
colunas_novo_registro = [
    ("Rank", StringType()),
    ("Title", StringType()),
    ("Year", StringType()),
    ("Genre", StringType()),
    ("Director", StringType()),
    ("Main_Actor", StringType()),
    ("Country", StringType()),
    ("IMDb_Rating", DoubleType()),
    ("Rotten_Tomatoes", StringType()),
    ("Runtime", StringType()),
    ("Language", StringType()),
    ("Oscars_Won", StringType()),
    ("Box_Office", DoubleType()),
    ("Metacritic_Score", StringType()),
]
schema = StructType(
    [StructField(nome, tipo, True) for nome, tipo in colunas_novo_registro]
)

# Only the populated fields are listed by name; every other column is None.
valores = {"Rank": "101", "Title": "Novo Filme", "Year": "2025", "IMDb_Rating": 9.4}
data = [tuple(valores.get(campo.name) for campo in schema.fields)]

novo_df = spark.createDataFrame(data, schema=schema)

print("Iniciando a inserção (append) na tabela do catálogo...")

# Append into the catalog table through the DataFrameWriterV2 API.
novo_df.writeTo("local.filmes.top100").append()

print("Registro inserido com sucesso na tabela 'local.filmes.top100'!")

Iniciando a inserção (append) na tabela do catálogo...


[Stage 5:>                                                          (0 + 2) / 2]

Registro inserido com sucesso na tabela 'local.filmes.top100'!


                                                                                

In [54]:

print("Verificando o novo registro inserido:")
# Same query as SQL "... WHERE Rank = '101'", expressed with the DataFrame API.
novo_registro = (
    spark.table("local.filmes.top100")
    .where("Rank = '101'")
    .select("Rank", "Title", "Year", "IMDb_Rating")
)
novo_registro.show()


Verificando o novo registro inserido:
+----+----------+----+-----------+
|Rank|     Title|Year|IMDb_Rating|
+----+----------+----+-----------+
| 101|Novo Filme|2025|        9.4|
+----+----------+----+-----------+



In [55]:

from pyspark.sql.functions import when, col

# Row-level UPDATE through the Iceberg SQL extensions enabled in the session
# config. It commits one new snapshot that touches only the data files holding
# matching rows, instead of reading the entire table and writing every row back
# with overwritePartitions().
spark.sql("""
UPDATE local.filmes.top100
SET IMDb_Rating = 9.9
WHERE Rank = '101'
""")
print("Registro atualizado com sucesso!")


Registro atualizado com sucesso!


In [56]:

# Row-level DELETE through the Iceberg SQL extensions. Overwriting the table
# with a filtered copy via overwritePartitions() only replaces partitions that
# still have rows, so it would silently keep "deleted" rows if the table were
# ever partitioned, and it rewrites the whole table either way.
# Year is stored as STRING: compare numerically. Rows whose Year is null or
# non-numeric are not known to be old and are therefore kept.
spark.sql("DELETE FROM local.filmes.top100 WHERE CAST(Year AS INT) < 2000")
print("Registros antigos deletados!")


Registros antigos deletados!


In [57]:

print("Tabela final após operações:")
# Explicit ORDER BY so the 10 rows shown are deterministic after the
# rewrites above (Rank is STRING, hence the numeric cast).
spark.sql("""
SELECT Rank, Title, Year, IMDb_Rating
FROM local.filmes.top100
ORDER BY CAST(Rank AS INT)
LIMIT 10
""").show()
spark.stop()
print("Execução finalizada com sucesso!")


Tabela final após operações:
+----+--------------------+----+-----------+
|Rank|               Title|Year|IMDb_Rating|
+----+--------------------+----+-----------+
|   3|     The Dark Knight|2008|        9.0|
|   6|The Lord of the R...|2003|        8.9|
|   8|The Lord of the R...|2001|        8.8|
|   9|           Inception|2010|        8.8|
|  11|The Lord of the R...|2002|        8.7|
|  19|         City of God|2002|        8.6|
|  25|       Spirited Away|2001|        8.6|
|  28|            Parasite|2019|        8.6|
|  29|        Interstellar|2014|        8.6|
|  38|         The Pianist|2002|        8.5|
+----+--------------------+----+-----------+

Execução finalizada com sucesso!
