In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType

In [2]:
spark = SparkSession.builder \
  .appName("IcebergLocalDevelopment") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1') \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.local.type", "hadoop") \
  .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
  .getOrCreate()

25/04/21 14:50:54 WARN Utils: Your hostname, edsatc resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/21 14:50:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ed/%c3%81rea%20de%20trabalho/engdados02/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ed/.ivy2/cache
The jars for the packages stored in: /home/ed/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dfa34a82-4f01-455f-94b0-129bf4febf83;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 in central
:: resolution report :: resolve 151ms :: artifacts dl 3ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spar

In [3]:
apoliceSchema = StructType([
    StructField("cod_apolice", IntegerType(), False),
    StructField("cod_cliente", IntegerType(), False),
    StructField("data_inicio_vigencia", DateType(), True),
    StructField("data_fim_vigencia", DateType(), True),
    StructField("valor_cobertura", DoubleType(), True),
    StructField("valor_franquia", DoubleType(), True),
    StructField("placa", StringType(), True)
])

df_apolice = spark.read \
    .format("csv") \
    .schema(apoliceSchema) \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .load("../data/apolice.csv")

In [4]:
carroSchema = StructType([
    StructField("placa", StringType(), False),
    StructField("modelo", StringType(), True),
    StructField("chassi", StringType(), False),
    StructField("marca", StringType(), True),
    StructField("ano", IntegerType(), True),
    StructField("cor", StringType(), True),
])

df_carro = spark.read \
    .format("csv") \
    .schema(carroSchema) \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .load("../data/carro.csv")

In [5]:
clienteSchema = StructType([
    StructField("cod_cliente", IntegerType(), False),
    StructField("nome", StringType(), True),
    StructField("cpf", StringType(), False),
    StructField("sexo", StringType(), True),
    StructField("endereco", StringType(), True),
    StructField("telefone_fixo", StringType(), True),
    StructField("telefone_celular", StringType(), True),
])

df_cliente = spark.read \
    .format("csv") \
    .schema(clienteSchema) \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .load("../data/cliente.csv")

In [6]:
sinistroSchema = StructType([
    StructField("cod_sinistro", IntegerType(), False),
    StructField("placa", StringType(), False),
    StructField("data_sinistro", DateType(), True),
    StructField("hora_sinistro", StringType(), True),
    StructField("local_sinistro", StringType(), True),
    StructField("condutor", StringType(), True),
])

df_sinistro = spark.read \
    .format("csv") \
    .schema(sinistroSchema) \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .load("../data/sinistro.csv")

In [7]:
df_cliente.show()

                                                                                

+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|cod_cliente|                nome|        cpf|sexo|            endereco| telefone_fixo|telefone_celular|
+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|          1|MARISA MELO OLIVEIRA|11111111111|   F|RUA JOSÉ WOSCH SO...|(41) 5096-4117|  (41) 5096-4117|
|          2|MURILO CARVALHO C...|22222222222|   M|RUA GEORGE BERNAN...|(21) 3944-5385|            NULL|
|          3|VINICIUS ROCHA RO...|33333333333|   M|                NULL|          NULL|            NULL|
|          4|CAROLINA ROCHA GOMES|44444444444|   F|                NULL|          NULL|            NULL|
|          5| ALINE SANTOS CASTRO|55555555555|   F|RUA ARMANDO PACAG...|(19) 7287-2893|  (19) 7287-2893|
|          6|LEILA CORREIA CAV...|66666666666|   F|RUA FRANCISCO D'A...|          NULL|            NULL|
|          7|SOPHIA CORREIA SA...|77777777777|   F|RUA 

In [8]:
spark.sql("DROP TABLE IF EXISTS local.apolice")
spark.sql("DROP TABLE IF EXISTS local.carro")
spark.sql("DROP TABLE IF EXISTS local.cliente")
spark.sql("DROP TABLE IF EXISTS local.sinistro")

spark.sql("""
CREATE TABLE IF NOT EXISTS local.apolice (
    cod_apolice INT,
    cod_cliente INT,
    data_inicio_vigencia DATE,
    data_fim_vigencia DATE,
    valor_cobertura DECIMAL(15, 2),
    valor_franquia DECIMAL(15, 2),
    placa STRING
) USING iceberg
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS local.carro (
    placa STRING,
    modelo STRING,
    chassi STRING,
    marca STRING,
    ano INT,
    cor STRING
) USING iceberg
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS local.cliente (
    cod_cliente INT,
    nome STRING,
    cpf STRING,
    sexo STRING,
    endereco STRING,
    telefone_fixo STRING,
    telefone_celular STRING
) USING iceberg
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS local.sinistro (
    cod_sinistro INT,
    placa STRING,
    data_sinistro DATE,
    hora_sinistro STRING,
    local_sinistro STRING,
    condutor STRING
) USING iceberg
""")

DataFrame[]

In [9]:
df_cliente.writeTo("local.cliente").overwritePartitions()
df_carro.writeTo("local.carro").overwritePartitions()
df_apolice.writeTo("local.apolice").overwritePartitions()
df_sinistro.writeTo("local.sinistro").overwritePartitions()

                                                                                

In [10]:
spark.sql("""
SELECT * FROM local.cliente
""").show()

spark.sql("""
SELECT * FROM local.sinistro
""").show()

                                                                                

+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|cod_cliente|                nome|        cpf|sexo|            endereco| telefone_fixo|telefone_celular|
+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|          1|MARISA MELO OLIVEIRA|11111111111|   F|RUA JOSÉ WOSCH SO...|(41) 5096-4117|  (41) 5096-4117|
|          2|MURILO CARVALHO C...|22222222222|   M|RUA GEORGE BERNAN...|(21) 3944-5385|            NULL|
|          3|VINICIUS ROCHA RO...|33333333333|   M|                NULL|          NULL|            NULL|
|          4|CAROLINA ROCHA GOMES|44444444444|   F|                NULL|          NULL|            NULL|
|          5| ALINE SANTOS CASTRO|55555555555|   F|RUA ARMANDO PACAG...|(19) 7287-2893|  (19) 7287-2893|
|          6|LEILA CORREIA CAV...|66666666666|   F|RUA FRANCISCO D'A...|          NULL|            NULL|
|          7|SOPHIA CORREIA SA...|77777777777|   F|RUA 

In [None]:
spark.sql("""
INSERT INTO local.cliente VALUES (10,'LUCAS OLIVEIRA', '1010101010', 'M', 'RUA TESTE', '', '(48) 9-88001118')
""")

spark.sql("""
SELECT * FROM local.cliente
""").show()

In [None]:
spark.sql("""
UPDATE local.cliente SET telefone_celular = '(48) 5096-4117' WHERE cod_cliente = 1
""")

spark.sql("""
SELECT * FROM local.cliente
""").show()


++
||
++
++

+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|cod_cliente|                nome|        cpf|sexo|            endereco| telefone_fixo|telefone_celular|
+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|         10|      LUCAS OLIVEIRA| 1010101010|   M|           RUA TESTE|              | (48) 9-88001118|
|          1|MARISA MELO OLIVEIRA|11111111111|   F|RUA JOSÉ WOSCH SO...|(41) 5096-4117|  (48) 5096-4117|
|          2|MURILO CARVALHO C...|22222222222|   M|RUA GEORGE BERNAN...|(21) 3944-5385|            NULL|
|          3|VINICIUS ROCHA RO...|33333333333|   M|                NULL|          NULL|            NULL|
|          4|CAROLINA ROCHA GOMES|44444444444|   F|                NULL|          NULL|            NULL|
|          5| ALINE SANTOS CASTRO|55555555555|   F|RUA ARMANDO PACAG...|(19) 7287-2893|  (19) 7287-2893|
|          6|LEILA CORREIA CAV...|66666666

In [None]:
spark.sql("""
DELETE FROM local.cliente WHERE cod_cliente = 10
""")

spark.sql("""
SELECT * FROM local.cliente
""").show()

++
||
++
++

+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|cod_cliente|                nome|        cpf|sexo|            endereco| telefone_fixo|telefone_celular|
+-----------+--------------------+-----------+----+--------------------+--------------+----------------+
|          1|MARISA MELO OLIVEIRA|11111111111|   F|RUA JOSÉ WOSCH SO...|(41) 5096-4117|  (48) 5096-4117|
|          2|MURILO CARVALHO C...|22222222222|   M|RUA GEORGE BERNAN...|(21) 3944-5385|            NULL|
|          3|VINICIUS ROCHA RO...|33333333333|   M|                NULL|          NULL|            NULL|
|          4|CAROLINA ROCHA GOMES|44444444444|   F|                NULL|          NULL|            NULL|
|          5| ALINE SANTOS CASTRO|55555555555|   F|RUA ARMANDO PACAG...|(19) 7287-2893|  (19) 7287-2893|
|          6|LEILA CORREIA CAV...|66666666666|   F|RUA FRANCISCO D'A...|          NULL|            NULL|
|          7|SOPHIA CORREIA SA...|77777777

In [17]:
print("Apólices com valor de cobertura > 15000:")
spark.sql("""
SELECT a.cod_apolice, c.nome, a.placa, a.valor_cobertura 
FROM local.apolice a
JOIN local.cliente c ON a.cod_cliente = c.cod_cliente
WHERE a.valor_cobertura > 15000
""").show()

Apólices com valor de cobertura > 15000:
+-----------+--------------------+-------+---------------+
|cod_apolice|                nome|  placa|valor_cobertura|
+-----------+--------------------+-------+---------------+
|  202200012|MARISA MELO OLIVEIRA|CCR8096|       19970.84|
|  202200007|MURILO CARVALHO C...|NFT2212|       19509.51|
|  202200003|VINICIUS ROCHA RO...|JIE0952|       19456.46|
|  202200005|VINICIUS ROCHA RO...|LWJ9156|       19130.12|
|  202200016|VINICIUS ROCHA RO...|EEE1056|       15760.31|
|  202200014|CAROLINA ROCHA GOMES|GQY6753|       15040.52|
|  202200015|CAROLINA ROCHA GOMES|DLA3438|       16261.87|
|  202200009|LEILA CORREIA CAV...|FFR1234|       17561.01|
|  202200002|SOPHIA CORREIA SA...|NEM5116|       16081.90|
+-----------+--------------------+-------+---------------+

