In [22]:
# Importa funções do PySpark
import os
import sys
import boto3

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from botocore.exceptions import ClientError

In [24]:
# Inicializa a SparkSession
spark = SparkSession.builder.appName("InicializarDynamo").getOrCreate()

In [25]:
# Lê o CSV 
df = spark.read.csv('../data/dados_importar.csv', header=True, inferSchema=True, sep=';')

In [26]:
# Substitui status de acorco com a descrição e remove o usuário diferente de Jeferson
df = df.withColumn(
    "status",
    when(col("Status Descrição") == "Concluído", "done")
    .when(col("Status Descrição") == "A Fazer", "todo")
    .otherwise(None)
)

In [27]:
# Mantém apenas linhas com status definido (done, todo)
df = df.filter(col("status").isNotNull())

In [28]:
# Mantém apenas linhas onde o usuário é 'Jeferson Klau'
df = df.filter(col("Usuário") == "Jeferson Klau")

In [29]:
# Remove a coluna "Status Descrição" e "Tipo da Tarefa ID"
df = df.drop("Status Descrição", "Tipo da Tarefa ID")

In [30]:
# Renomeia colunas e adiciona colunas "pk" e "sk"
df = df.withColumnRenamed("Nome da Tarefa", "name") \
       .withColumnRenamed("Tipo da Tarefa", "taskType") \
       .withColumnRenamed("Data de Criação", "date") \
       .withColumnRenamed("Data de Conclusão", "dateCompleted") \
       .withColumnRenamed("Usuário", "user") \
       .withColumnRenamed("ID do Usuário", "userID") \
       .withColumnRenamed("Status", "status") 

In [31]:
# Adicionando a coluna PK
df = df.withColumn("PK", concat(lit("LIST#"), date_format(col("date"), "yyyyMMdd")))

In [32]:
# Cria coluna 'date' formatada
df = df.withColumn("date", date_format(col("date"), "yyyy-MM-dd"))

In [33]:
df = df.withColumn("dateCompleted", date_format(col("dateCompleted"), "yyyy-MM-dd"))

In [34]:
# Visualiza os dados
df.show(100, truncate=False)

+----------------------------------------------------------------------------+------------------+----------+-------------+------+-------------+--------------------------------+-------------+
|name                                                                        |taskType          |date      |dateCompleted|status|user         |userID                          |PK           |
+----------------------------------------------------------------------------+------------------+----------+-------------+------+-------------+--------------------------------+-------------+
|Cum iure exercitationem laboriosam                                          |Tarefa a Ser Feita|2024-07-18|2025-02-12   |done  |Jeferson Klau|b4853fc1f03a3a4cec530a98a94d89ad|LIST#20240718|
|Asperiores iusto                                                            |Tarefa a Ser Feita|2025-05-29|NULL         |todo  |Jeferson Klau|b4853fc1f03a3a4cec530a98a94d89ad|LIST#20250529|
|Asperiores pariatur voluptatibus magnam     

In [35]:
df = df.withColumn("row_id", monotonically_increasing_id())

In [36]:
df = df.withColumn("itemId", sha2(col("row_id").cast("string"), 256))

In [37]:
df = df.withColumn("SK", concat(lit("ITEM#"), col("itemId")))

In [38]:
df = df.drop("row_id")

In [39]:
from dotenv import load_dotenv
load_dotenv()
userId = os.getenv("USER_ID")
df = df.withColumn("userID", lit(userId))

In [40]:
df_final = df.select(
    "PK", "SK", "date", "itemId", "name", "status",
    "dateCompleted", "taskType", "user", "userID"
)

In [41]:
df_final.show(100, truncate=False)

+-------------+---------------------------------------------------------------------+----------+----------------------------------------------------------------+----------------------------------------------------------------------------+------+-------------+------------------+-------------+------------------------------------+
|PK           |SK                                                                   |date      |itemId                                                          |name                                                                        |status|dateCompleted|taskType          |user         |userID                              |
+-------------+---------------------------------------------------------------------+----------+----------------------------------------------------------------+----------------------------------------------------------------------------+------+-------------+------------------+-------------+------------------------------------+
|LIST#2024

In [42]:
import time
from botocore.exceptions import ClientError
import boto3

dynamodb = boto3.resource('dynamodb', region_name='sa-east-1')
table = dynamodb.Table("ListaMercado")

TAMANHO_LOTE = 100
DELAY_SEGUNDOS = 2

dados = [row.asDict() for row in df_final.collect()]

for i in range(0, len(dados), TAMANHO_LOTE):
    lote = dados[i:i + TAMANHO_LOTE]
    
    try:
        with table.batch_writer() as batch:
            for item in lote:  
                batch.put_item(Item=item)
        print(f"Lote {i//TAMANHO_LOTE + 1} inserido com sucesso!")
    except ClientError as e:
        print(f"Erro na inserção do lote: {e.response['Error']['Message']}")
    except Exception as e:
        print(f"Erro inesperado: {e}")
    
    time.sleep(DELAY_SEGUNDOS)


Lote 1 inserido com sucesso!
Lote 2 inserido com sucesso!
Lote 3 inserido com sucesso!
Lote 4 inserido com sucesso!
Lote 5 inserido com sucesso!
Lote 6 inserido com sucesso!
Lote 7 inserido com sucesso!
Lote 8 inserido com sucesso!
Lote 9 inserido com sucesso!
Lote 10 inserido com sucesso!
Lote 11 inserido com sucesso!
