In [1]:
# Importa pandas e funções do PySpark
import pandas as pd
import os
import sys
import boto3

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from botocore.exceptions import ClientError


In [67]:
# Inicializa a SparkSession
spark = SparkSession.builder.appName("InicializarDynamo").getOrCreate()

In [68]:
# Lê o CSV 
df = spark.read.csv('../data/dados_importar.csv', header=True, inferSchema=True, sep=';')
print("Linhas no arquivo:", df.count())
df.show(5, truncate=False)
df.columns

Linhas no arquivo: 3000
+-------------------------------------------------------+------------------+-----------------+-------------------+-------------------+------+----------------+--------------+--------------------------------+
|Nome da Tarefa                                         |Tipo da Tarefa    |Tipo da Tarefa ID|Data de Criação    |Data de Conclusão  |Status|Status Descrição|Usuário       |ID do Usuário                   |
+-------------------------------------------------------+------------------+-----------------+-------------------+-------------------+------+----------------+--------------+--------------------------------+
|Cum iure exercitationem laboriosam                     |Tarefa a Ser Feita|2                |2024-07-18 22:51:51|2025-02-12 22:51:51|2     |Concluído       |Jeferson Klau |b4853fc1f03a3a4cec530a98a94d89ad|
|Recusandae quasi                                       |Tarefa a Ser Feita|2                |2024-01-09 21:03:53|NULL               |3     |Cancela

['Nome da Tarefa',
 'Tipo da Tarefa',
 'Tipo da Tarefa ID',
 'Data de Criação',
 'Data de Conclusão',
 'Status',
 'Status Descrição',
 'Usuário',
 'ID do Usuário']

In [69]:
# Substitui status de acordo com a descrição e remove o usuário diferente de Jeferson
df = df.withColumn(
    "status",
    when(col("Status Descrição") == "Concluído", "done")
    .when(col("Status Descrição") == "A Fazer", "todo")
    .otherwise(None)
)

In [70]:
# Mantém apenas linhas com status definido (done, todo)
df = df.filter(col("status").isNotNull())
print("Linhas após filtro usuário:", df.count())
df.show(100, truncate=False)

Linhas após filtro usuário: 2027
+------------------------------------------------------------+------------------+-----------------+-------------------+-------------------+------+----------------+--------------+--------------------------------+
|Nome da Tarefa                                              |Tipo da Tarefa    |Tipo da Tarefa ID|Data de Criação    |Data de Conclusão  |status|Status Descrição|Usuário       |ID do Usuário                   |
+------------------------------------------------------------+------------------+-----------------+-------------------+-------------------+------+----------------+--------------+--------------------------------+
|Cum iure exercitationem laboriosam                          |Tarefa a Ser Feita|2                |2024-07-18 22:51:51|2025-02-12 22:51:51|done  |Concluído       |Jeferson Klau |b4853fc1f03a3a4cec530a98a94d89ad|
|Asperiores iusto                                            |Tarefa a Ser Feita|2                |2025-05-29 09:07:49|

In [71]:
# Mantém apenas linhas onde o usuário é 'Jeferson Klau'
df = df.filter(col("Usuário") == "Jeferson Klau")
print("Linhas após filtro usuário:", df.count())
df.show(100, truncate=False)

Linhas após filtro usuário: 1022
+----------------------------------------------------------------------------+------------------+-----------------+-------------------+-------------------+------+----------------+-------------+--------------------------------+
|Nome da Tarefa                                                              |Tipo da Tarefa    |Tipo da Tarefa ID|Data de Criação    |Data de Conclusão  |status|Status Descrição|Usuário      |ID do Usuário                   |
+----------------------------------------------------------------------------+------------------+-----------------+-------------------+-------------------+------+----------------+-------------+--------------------------------+
|Cum iure exercitationem laboriosam                                          |Tarefa a Ser Feita|2                |2024-07-18 22:51:51|2025-02-12 22:51:51|done  |Concluído       |Jeferson Klau|b4853fc1f03a3a4cec530a98a94d89ad|
|Asperiores iusto                                          

In [72]:
# Remove a coluna Status Descrição
df = df.drop("Status Descrição")

# Remove a coluna Tipo da Tarefa ID
df = df.drop("Tipo da Tarefa ID")

In [73]:
# Renomeia colunas 
df = df.withColumnRenamed("Nome da Tarefa", "name") \
       .withColumnRenamed("Tipo da Tarefa", "task_type") \
       .withColumnRenamed("Data de Criação", "created_at") \
       .withColumnRenamed("Data de Conclusão", "completed_at") \
       .withColumnRenamed("ID do Usuário", "user_ID") \
       .withColumnRenamed("Usuário", "user") \
       .withColumnRenamed("Status", "status") 
df.columns

['name',
 'task_type',
 'created_at',
 'completed_at',
 'status',
 'user',
 'user_ID']

In [74]:

df = df.withColumn("created_at", date_format(col("created_at"), "yyyy-MM-dd"))
df = df.withColumn("completed_at", date_format(col("completed_at"), "yyyy-MM-dd"))


# Cria a coluna 'PK' no formato LIST#yyyyMMdd
df = df.withColumn("PK", concat(lit("LIST#"), date_format(col("created_at"), "yyyyMMdd")))

df = df.withColumn("row_id", monotonically_increasing_id())
df = df.withColumn("itemId", sha2(col("row_id").cast("string"), 256))
df = df.withColumn("SK", concat(lit("ITEM#"), col("itemId")))

df = df.drop("row_id", "itemId", "date")

# Visualiza os dados
df.show(1000, truncate=False)

+-------------------------------------------------------------------------------+------------------+----------+------------+------+-------------+--------------------------------+-------------+---------------------------------------------------------------------+
|name                                                                           |task_type         |created_at|completed_at|status|user         |user_ID                         |PK           |SK                                                                   |
+-------------------------------------------------------------------------------+------------------+----------+------------+------+-------------+--------------------------------+-------------+---------------------------------------------------------------------+
|Cum iure exercitationem laboriosam                                             |Tarefa a Ser Feita|2024-07-18|2025-02-12  |done  |Jeferson Klau|b4853fc1f03a3a4cec530a98a94d89ad|LIST#20240718|ITEM#5feceb66ffc86f

In [75]:
from pyspark.sql.functions import col

# Remove valores nulos e força tipo string na coluna "name"
df_nonull = df.filter(col("name").isNotNull()).withColumn("name", col("name").cast("string"))


In [76]:
# Cria a coluna 'PK' no formato LIST#yyyyMMdd
df = df.withColumn("PK", concat(lit("LIST#"), date_format(col("created_at"), "yyyyMMdd")))

In [77]:
df = df.withColumn("row_id", monotonically_increasing_id())

In [78]:
df = df.withColumn("itemId", sha2(col("row_id").cast("string"), 256))

In [79]:
df = df.withColumn("SK", concat(lit("ITEM#"), col("itemId")))

In [80]:
df = df.drop("row_id", "itemId", "date")

In [81]:
# Visualiza os dados
df.show(1000, truncate=False)

+-------------------------------------------------------------------------------+------------------+----------+------------+------+-------------+--------------------------------+-------------+---------------------------------------------------------------------+
|name                                                                           |task_type         |created_at|completed_at|status|user         |user_ID                         |PK           |SK                                                                   |
+-------------------------------------------------------------------------------+------------------+----------+------------+------+-------------+--------------------------------+-------------+---------------------------------------------------------------------+
|Cum iure exercitationem laboriosam                                             |Tarefa a Ser Feita|2024-07-18|2025-02-12  |done  |Jeferson Klau|b4853fc1f03a3a4cec530a98a94d89ad|LIST#20240718|ITEM#5feceb66ffc86f

In [82]:
from dotenv import load_dotenv

load_dotenv()
USER_ID = os.getenv("USER_ID")

df_translated = df.withColumn("user_ID", lit(USER_ID))

In [83]:
# 1. Define a ordem desejada (ajuste para os nomes corretos das colunas)
ordem_desejada = ["PK", "SK", "created_at", "name", "status"]

# 2. Obtém as colunas restantes
colunas_restantes = [c for c in df.columns if c not in ordem_desejada]

# 3. Cria a lista final de colunas
nova_ordem = ordem_desejada + colunas_restantes

# 4. Reordena o DataFrame
df_translated = df.select(*nova_ordem)

# 5. Mostra o DataFrame reordenado
df_translated.show(100, truncate=False)

+-------------+---------------------------------------------------------------------+----------+----------------------------------------------------------------------------+------+------------------+------------+-------------+--------------------------------+
|PK           |SK                                                                   |created_at|name                                                                        |status|task_type         |completed_at|user         |user_ID                         |
+-------------+---------------------------------------------------------------------+----------+----------------------------------------------------------------------------+------+------------------+------------+-------------+--------------------------------+
|LIST#20240718|ITEM#5feceb66ffc86f38d952786c6d696c79c2dbc239dd4e91b46729d73a27fb57e9|2024-07-18|Cum iure exercitationem laboriosam                                          |done  |Tarefa a Ser Feita|2025-02-12  |Jeferson

In [84]:

# 5. Mostra o DataFrame reordenado
df_translated.show(100, truncate=False)
df.columns

+-------------+---------------------------------------------------------------------+----------+----------------------------------------------------------------------------+------+------------------+------------+-------------+--------------------------------+
|PK           |SK                                                                   |created_at|name                                                                        |status|task_type         |completed_at|user         |user_ID                         |
+-------------+---------------------------------------------------------------------+----------+----------------------------------------------------------------------------+------+------------------+------------+-------------+--------------------------------+
|LIST#20240718|ITEM#5feceb66ffc86f38d952786c6d696c79c2dbc239dd4e91b46729d73a27fb57e9|2024-07-18|Cum iure exercitationem laboriosam                                          |done  |Tarefa a Ser Feita|2025-02-12  |Jeferson

['name',
 'task_type',
 'created_at',
 'completed_at',
 'status',
 'user',
 'user_ID',
 'PK',
 'SK']

In [85]:
import time
from botocore.exceptions import ClientError
import boto3

dynamodb = boto3.resource('dynamodb', region_name='sa-east-1')
table = dynamodb.Table("MarketList")

TAMANHO_LOTE = 100
DELAY_SEGUNDOS = 2

dados = [row.asDict() for row in df_translated.collect()]

for i in range(0, len(dados), TAMANHO_LOTE):
    lote = dados[i:i + TAMANHO_LOTE]
    try:
        with table.batch_writer() as batch:
            for item in lote:
                batch.put_item(Item=item)
        print(f"Lote {i//TAMANHO_LOTE + 1} inserido com sucesso!")
    except ClientError as e:
        print(f"Erro na inserção do lote: {e.response['Error']['Message']}")
    except Exception as e:
        print(f"Ocorreu um erro inesperado: {e}")
    time.sleep(DELAY_SEGUNDOS)

Lote 1 inserido com sucesso!
Lote 2 inserido com sucesso!
Lote 3 inserido com sucesso!
Lote 4 inserido com sucesso!
Lote 5 inserido com sucesso!
Lote 6 inserido com sucesso!
Lote 7 inserido com sucesso!
Lote 8 inserido com sucesso!
Lote 9 inserido com sucesso!
Lote 10 inserido com sucesso!
Lote 11 inserido com sucesso!
