In [None]:
import os

from pyspark.sql import SparkSession

# Object-store connection settings. They are read from the environment so that
# credentials do not have to live in the notebook; the fallbacks are the original
# local-demo values, so behaviour is unchanged when the variables are unset.
# NOTE(review): MinIO normally serves the S3 API on port 9000 and its web console on
# 9001 — confirm which port this endpoint should use. Executors of the cluster at
# spark://spark-iceberg:7077 also resolve "localhost" on their own host, so a
# service hostname may be required instead.
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://localhost:9001")
S3_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "admin")
S3_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")

# NOTE(review): this builder defines a catalog named "iceberg" but does not set
# spark.sql.defaultCatalog. Unqualified names such as "football.premier_league" used in
# the cells below therefore resolve in the session's default catalog (spark_catalog
# unless spark-defaults.conf sets another one), not necessarily in "iceberg".
# Confirm which catalog is intended, and either qualify the names ("iceberg.football...")
# or set spark.sql.defaultCatalog accordingly.
spark = (
    SparkSession.builder
    .appName("Iceberg")
    .master("spark://spark-iceberg:7077")
    # Iceberg catalog "iceberg": a Hadoop (file-system based) catalog rooted in the bucket.
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hadoop")
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://warehouse/")
    # Iceberg's SQL extensions (used for row-level UPDATE/DELETE on Iceberg tables).
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # S3A file-system settings for the S3-compatible object store.
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

print("PySpark started")

## Comandos gerais: criação, carga, consulta, atualização e remoção de dados na tabela `football.premier_league`

In [None]:
# Create (or recreate) the football.premier_league table from an empty DataFrame
# that carries the desired schema.

from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType

# Column names are kept verbatim (spaces and the leading "*" included), so they must
# be back-quoted in SQL. Date, Time and Attendance are plain strings
# (e.g. "2022-08-05", "20:00", "25,286").
schema = StructType([
    StructField("Matchday", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Home Team", StringType(), True),
    StructField("homeScore", IntegerType(), True),
    StructField("homeXG", FloatType(), True),
    StructField("awayScore", IntegerType(), True),
    StructField("awayXG", FloatType(), True),
    StructField("Away Team", StringType(), True),
    StructField("Attendance", StringType(), True),
    StructField("Referee", StringType(), True),
    StructField("Stadium", StringType(), True),
    StructField("Result", StringType(), True),
    StructField("*Additional Stats", StringType(), True),
])

# Some catalogs require the namespace to exist before a table can be created in it;
# IF NOT EXISTS makes this a no-op when it is already there.
spark.sql("CREATE NAMESPACE IF NOT EXISTS football")

df = spark.createDataFrame([], schema)

# createOrReplace() instead of create(): create() raises when the table already
# exists, which made the notebook fail on a second "Run All". Note that any rows
# already stored in the table are discarded, so every run starts from a clean table.
df.writeTo("football.premier_league").createOrReplace()

In [None]:
# Read the CSV, save it as Parquet, read the Parquet back and append it to the table.

from pyspark.sql.functions import col

# 1. Read the CSV with every column as a string. Types come from the target table in
#    step 2 instead of being guessed with inferSchema, which costs an extra pass over
#    the file and, depending on the Spark version, may infer a date/timestamp type for
#    "Date" whose string form would no longer match filters like Date = '2022-08-06'.
df_csv_raw = spark.read.option("header", "true").csv("PremierLeagueMatches.csv")

# 2. Align the CSV to the table schema by column name: take each table column from the
#    CSV and cast it to the table's type. Names are back-quoted (with embedded
#    backticks doubled) because they contain spaces and "*". A table column missing
#    from the CSV fails here; CSV columns that the table does not have are dropped.
target_schema = spark.table("football.premier_league").schema
df_csv = df_csv_raw.select([
    col("`" + field.name.replace("`", "``") + "`").cast(field.dataType).alias(field.name)
    for field in target_schema.fields
])

# 3. Save as Parquet (overwriting a previous export) and read it back.
# NOTE(review): these relative local paths only work if the driver and every executor
# of the cluster share the same file system — confirm for spark://spark-iceberg:7077.
df_csv.write.mode("overwrite").parquet("dados/parquet/premier_league")
df_parquet = spark.read.parquet("dados/parquet/premier_league")

# 4. Append to the Iceberg table.
df_parquet.writeTo("football.premier_league").append()

In [None]:
# Preview the table contents (the DataFrame form of SELECT * FROM football.premier_league).
# show() prints only the first 20 rows, truncating long values — not the whole table.
premier_league_table = spark.read.table("football.premier_league")
premier_league_table.show(20, True)

In [None]:
# Insert a single example row into the table.

# One example row, with values in the table's column order. Date and Time are passed
# as plain strings and stored as strings: both columns are StringType in the table,
# so no date conversion takes place.
# NOTE(review): if the CSV already contains this match, the table will hold it twice.
data = [
    (
        1, "2022-08-05", "20:00", "Crystal Palace", 0, 1.2, 2, 1.0, "Arsenal", "25,286",
        "Anthony Taylor", "Selhurst Park", "A",
        "https://fbref.com//en/matches/e62f6e78/Crystal-Palace-Arsenal-August-5-2022-Premier-League",
    )
]

# Build the row against the table's own schema so this cell does not depend on the
# `schema` variable having been defined by the table-creation cell in this kernel.
df_single_row = spark.createDataFrame(data, spark.table("football.premier_league").schema)

# Append the row to the Iceberg table.
df_single_row.writeTo("football.premier_league").append()

# Equivalent SQL:
# INSERT INTO football.premier_league (Matchday, Date, Time, `Home Team`, homeScore, homeXG, awayScore, awayXG, `Away Team`, Attendance, Referee, Stadium, Result, `*Additional Stats`)
# VALUES (1, '2022-08-05', '20:00', 'Crystal Palace', 0, 1.2, 2, 1.0, 'Arsenal', '25,286', 'Anthony Taylor', 'Selhurst Park', 'A', 'https://fbref.com//en/matches/e62f6e78/Crystal-Palace-Arsenal-August-5-2022-Premier-League');


In [None]:
# Delete every match in which Arsenal is the AWAY team.
# The string literal uses single quotes, as in the other SQL cells: double-quoted text
# is an identifier (not a string) when spark.sql.ansi.doubleQuotedIdentifiers is on.
# The trailing ";" hides the empty DataFrame that spark.sql returns for a DELETE.

spark.sql("DELETE FROM football.premier_league WHERE `Away Team` = 'Arsenal'");

In [None]:
# Show the matches in which Arsenal is the HOME team.
# NOTE(review): the original heading said "away team", but the filter, the SQL
# equivalent and the variable name all use `Home Team`. To check that the DELETE above
# removed Arsenal's away matches, filter on `Away Team` instead (expected: no rows).

from pyspark.sql.functions import col


# Equivalent SQL:
# SELECT * FROM football.premier_league WHERE `Home Team` = 'Arsenal';
df_arsenal_home = spark.read.table("football.premier_league").filter(col("`Home Team`") == "Arsenal")
df_arsenal_home.show()

In [None]:
# Look up the Fulham x Liverpool match of 2022-08-06, using the same filter as the
# UPDATE and DELETE cells that follow.
fulham_liverpool_sql = """
    SELECT * FROM football.premier_league
    WHERE `Home Team` = 'Fulham'
      AND `Away Team` = 'Liverpool'
      AND Date = '2022-08-06'
"""
df_result = spark.sql(fulham_liverpool_sql)
df_result.show()

In [None]:
# Set the score of one specific match: Fulham 3 x 1 Liverpool on 2022-08-06.
# spark.sql runs the UPDATE statement eagerly; its result is stored in `df`.
update_score_sql = """
    UPDATE football.premier_league
    SET homeScore = 3, awayScore = 1
    WHERE `Home Team` = 'Fulham'
      AND `Away Team` = 'Liverpool'
      AND Date = '2022-08-06'
"""
df = spark.sql(update_score_sql)

In [None]:

# Remove the Fulham x Liverpool match of 2022-08-06 from the table.
# spark.sql runs the DELETE statement eagerly; its result is stored in `df`.
delete_fixture_sql = """
    DELETE FROM football.premier_league
    WHERE `Home Team` = 'Fulham'
      AND `Away Team` = 'Liverpool'
      AND Date = '2022-08-06'
"""
df = spark.sql(delete_fixture_sql)