In [None]:
!pip install pyspark==3.4.2 delta-spark==2.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

In [None]:
# Build a local Spark session configured for Delta Lake, one setting at a time.
session_builder = SparkSession.builder.master("local[*]")

# Delta Lake jar resolved at startup.
session_builder = session_builder.config(
    "spark.jars.packages", "io.delta:delta-core_2.12:2.4.0"
)
# Delta's SQL extension and catalog implementation.
session_builder = session_builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
session_builder = session_builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# Reuses an already-running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()

In [None]:
# Initial car records, one tuple per row:
# (ID_CARRO, NOME_CARRO, ANO, STATUS, VALOR).
data = [
    ("1", "Fusca", "1990", "Quebrou", 100.00),
    ("2", "Uno", "2000", "Quase quebrou", 1500.00),
    ("3", "Celta", "2012", "Funcionando", 20500.00),
]

# Every column is a nullable string except VALOR, which is a nullable float.
schema = (
    StructType()
    .add("ID_CARRO", StringType(), True)
    .add("NOME_CARRO", StringType(), True)
    .add("ANO", StringType(), True)
    .add("STATUS", StringType(), True)
    .add("VALOR", FloatType(), True)
)

df = spark.createDataFrame(data, schema)

# truncate=False prints each value in full instead of shortening long strings.
df.show(truncate=False)

+--------+----------+----+-------------+-------+
|ID_CARRO|NOME_CARRO|ANO |STATUS       |VALOR  |
+--------+----------+----+-------------+-------+
|1       |Fusca     |1990|Quebrou      |100.0  |
|2       |Uno       |2000|Quase quebrou|1500.0 |
|3       |Celta     |2012|Funcionando  |20500.0|
+--------+----------+----+-------------+-------+



In [None]:
# Persist the cars DataFrame as a Delta table at ./RAW/CARRO,
# overwriting whatever is already stored at that path.
df.write.save("./RAW/CARRO", format="delta", mode="overwrite")

In [None]:
# Incoming batch of car records: IDs "1" and "2" carry new VALOR figures,
# and ID "4" is a record not present in the initial data.
new_data = [
    ("1", "Fusca", "1990", "Quebrou", 150.00),
    ("2", "Uno", "2000", "Quase quebrou", 2500.00),
    ("4", "Carrao", "2012", "Funcionando", 30000.00),
]

# Same column layout as the initial DataFrame.
df_new = spark.createDataFrame(new_data, schema)

df_new.show()

+--------+----------+----+-------------+-------+
|ID_CARRO|NOME_CARRO| ANO|       STATUS|  VALOR|
+--------+----------+----+-------------+-------+
|       1|     Fusca|1990|      Quebrou|  150.0|
|       2|       Uno|2000|Quase quebrou| 2500.0|
|       4|    Carrao|2012|  Funcionando|30000.0|
+--------+----------+----+-------------+-------+



In [None]:
# Handle to the Delta table stored at ./RAW/CARRO.
deltaTable = DeltaTable.forPath(spark, "./RAW/CARRO")

# Aliases let the merge condition tell the stored rows from the incoming ones.
current_rows = deltaTable.alias("dados_atuais")
incoming_rows = df_new.alias("novos_dados")

# Upsert keyed on ID_CARRO: matched rows are updated with all incoming
# columns, and incoming rows without a match are inserted.
(
    current_rows
    .merge(incoming_rows, "dados_atuais.ID_CARRO = novos_dados.ID_CARRO")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [None]:
# Delete every row of the cars table whose VALOR is below 200.
# The table handle is resolved from its path here instead of being read from
# the shared `deltaTable` variable, which a later cell rebinds to
# ./RAW/DESTINY; this keeps the delete on the cars table regardless of the
# order in which cells are executed.
DeltaTable.forPath(spark, "./RAW/CARRO").delete("VALOR < 200.00")

In [None]:
# Read the cars Delta table back from disk and print its current rows.
spark.read.format("delta").load("./RAW/CARRO").show()

+--------+----------+----+-------------+-------+
|ID_CARRO|NOME_CARRO| ANO|       STATUS|  VALOR|
+--------+----------+----+-------------+-------+
|       2|       Uno|2000|Quase quebrou| 2500.0|
|       3|     Celta|2012|  Funcionando|20500.0|
|       4|    Carrao|2012|  Funcionando|30000.0|
+--------+----------+----+-------------+-------+



In [None]:
# For CSV files

In [None]:
# Load the CSV data, taking the column names from the file's header line.
df = spark.read.load("guns.csv", format="csv", header=True)

In [None]:
# Preview the loaded CSV rows; the arguments spell out show()'s defaults.
df.show(n=20, truncate=True)

+-----------------+-------------+--------------+--------+-------+---------+-----------+
|         gun_name|gun_archetype|        Source|Element |gun_RoF|   Rarity|weapon_type|
+-----------------+-------------+--------------+--------+-------+---------+-----------+
|     Loud Lullaby|   Aggressive|          Moon| Kinetic|    120|Legendary|hand_cannon|
|        Pribina-D|   Aggressive|      Gunsmith| Kinetic|    120|Legendary|hand_cannon|
|    True Prophecy|   Aggressive|         World| Kinetic|    120|Legendary|hand_cannon|
|   Igneous Hammer|   Aggressive|        Trials|   Solar|    120|Legendary|hand_cannon|
|    Bottom Dollar|   Aggressive|        Gambit|    Void|    120|Legendary|hand_cannon|
|  The Steady Hand|   Aggressive|BL Iron Banner| Kinetic|    120|Legendary|hand_cannon|
| Crimil's Dagger |   Aggressive|   Iron Banner| Kinetic|    120|Legendary|hand_cannon|
|     Duke Mk. 44 |   Aggressive|         World| Kinetic|    120|Legendary|hand_cannon|
|       Thin Line |   Aggressive

In [None]:
# Save the CSV data as a Delta table at ./RAW/DESTINY, overwriting any
# existing contents.
# NOTE(review): name-based column mapping is presumably enabled because the
# data has a column called "Element " (with a trailing space) — confirm.
# NOTE(review): "mapColumnNames" does not look like a documented Delta writer
# option — confirm whether it has any effect before relying on it.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .options(**{
        "delta.columnMapping.mode": "name",
        "mapColumnNames": "true",
    })
    .save("./RAW/DESTINY")
)


In [None]:
# Read the DESTINY Delta table back from disk and print its first rows.
spark.read.load("./RAW/DESTINY", format="delta").show()

In [None]:
# Two incoming rows for the DESTINY table, in the same column order as the
# schema defined below.
new_data = [
    ("novo", "Teste", "Lua", "Kinetic", "120", "Legendary", "hand_cannon"),
    ("21% Delirium", "AAAAA", "Moon", "Kinetic", "120", "Legendary", "hand_cannon"),
]

# All seven columns are nullable strings. The trailing space in "Element "
# matches the column header printed when the table is displayed.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "gun_name",
        "gun_archetype",
        "Source",
        "Element ",
        "gun_RoF",
        "Rarity",
        "weapon_type",
    )
])

df_new = spark.createDataFrame(new_data, schema)

# Handle to the Delta table stored at ./RAW/DESTINY.
deltaTable = DeltaTable.forPath(spark, "./RAW/DESTINY")

# Aliases let the merge condition tell the stored rows from the incoming ones.
current_rows = deltaTable.alias("dados_atuais")
incoming_rows = df_new.alias("novos_dados")

# Upsert keyed on gun_name: matched rows are updated with all incoming
# columns, and incoming rows without a match are inserted.
(
    current_rows
    .merge(incoming_rows, "dados_atuais.gun_name = novos_dados.gun_name")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [None]:
# Print the DESTINY table again to inspect the result of the merge.
spark.read.format("delta").load("./RAW/DESTINY").show()

+------------------+--------------+----------------+--------+-------+---------+---------------+
|          gun_name| gun_archetype|          Source|Element |gun_RoF|   Rarity|    weapon_type|
+------------------+--------------+----------------+--------+-------+---------+---------------+
|      21% Delirium|         AAAAA|            Moon| Kinetic|    120|Legendary|    hand_cannon|
|   A Fine Memorial|      Adaptive|    Moon Essence|     Arc|    450|Legendary|    machine_gun|
|  Abide the Return|      Adaptive|           World|   Solar|      0|Legendary|          sword|
|Accrued Redemption|     Precision|        GoS Raid| Kinetic|    667|Legendary|            bow|
|     Ace of Spades|      Adaptive|           Quest| Kinetic|    140|   Exotic|    hand_cannon|
|       Adhortative|      Adaptive|   Vex Offensive|   Solar|    390|Legendary|    pulse_rifle|
|            Adored|      Adaptive|           Quest|     Arc|     90|Legendary|         sniper|
|     Age-Old Bond |   High Impact|     

In [None]:
# Delete every row of the DESTINY table whose gun_archetype is 'AAAAA'.
# The table handle is resolved from its path here instead of being read from
# the shared `deltaTable` variable, which an earlier cell binds to
# ./RAW/CARRO; this keeps the delete on the DESTINY table regardless of the
# order in which cells are executed.
DeltaTable.forPath(spark, "./RAW/DESTINY").delete("gun_archetype = 'AAAAA'")

In [None]:
# Print the DESTINY table once more to inspect the result of the delete.
spark.read.load("./RAW/DESTINY", format="delta").show()

+------------------+--------------+----------------+--------+-------+---------+---------------+
|          gun_name| gun_archetype|          Source|Element |gun_RoF|   Rarity|    weapon_type|
+------------------+--------------+----------------+--------+-------+---------+---------------+
|   A Fine Memorial|      Adaptive|    Moon Essence|     Arc|    450|Legendary|    machine_gun|
|  Abide the Return|      Adaptive|           World|   Solar|      0|Legendary|          sword|
|Accrued Redemption|     Precision|        GoS Raid| Kinetic|    667|Legendary|            bow|
|     Ace of Spades|      Adaptive|           Quest| Kinetic|    140|   Exotic|    hand_cannon|
|       Adhortative|      Adaptive|   Vex Offensive|   Solar|    390|Legendary|    pulse_rifle|
|            Adored|      Adaptive|           Quest|     Arc|     90|Legendary|         sniper|
|     Age-Old Bond |   High Impact|       Last Wish|    Void|    360|Legendary|     auto_rifle|
|   Ancient Gospel |      Adaptive|     