# Tarefas abandonadas com PySpark

Este projeto utiliza PySpark para identificar e analisar tarefas abandonadas ao longo do tempo, com o objetivo de monitorar padrões de abandono e apoiar a tomada de decisões para melhorias.

**Fluxo do projeto:**

1. **Extração e transformação:** Os dados são extraídos de um arquivo CSV, tratados e transformados em DataFrames Spark.
2. **Persistência:** Os dados processados são enviados e armazenados em uma tabela DynamoDB.
3. **Consulta e análise:** Os dados são recuperados do DynamoDB, processados em DataFrames e utilizados para gerar relatórios mensais dos últimos 6 meses, detalhando o volume de tarefas não concluídas por tipo.



## 1. Extração e transformação

In [2]:
from src.clients.spark_client import get_spark_session
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, trim, when, lit,  current_timestamp, try_to_timestamp, date_format, concat, udf
import uuid


spark = get_spark_session()


base_dataframe = (
    spark
    .read
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('enconding', 'ISO-8859-1')
    .csv('./data/amostragem.csv')
)



df = base_dataframe.withColumnsRenamed(
    {'Nome da Tarefa': 'nome', 
     'Data de Criação': 'data', 
     'Data de Conclusão': 'data_conclusao', 
     'Status': 'status', 
     'Tipo da Tarefa': 'tipo_tarefa', 
     'ID do Usuário': 'user_id',
     'Usuário': 'usuario'
     }
    )

df = df.withColumn('status', df['status'].cast(StringType()))
df = df.withColumn('data', df['data'].cast(StringType()))
df = df.withColumn('data_conclusao', df['data_conclusao'].cast(StringType()))



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/11 15:43:52 WARN Utils: Your hostname, kassia-pc, resolves to a loopback address: 127.0.1.1; using 172.18.227.129 instead (on interface eth0)
25/06/11 15:43:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/11 15:43:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Realiza a limpeza dos dados conforme os critérios abaixo:
- Exclui tarefas cujo nome está nulo ou vazio.
- Atribui a data atual para tarefas sem data ou com data inválida.

In [172]:

df = df.filter((df['usuario'] == 'Jeferson Klau') & (df['status'] != '3'))
df = df.distinct()
df = df.withColumn('nome', trim('nome'))
df = df.withColumn('nome', when(col('nome') == '', lit(None)).otherwise(col('nome')))
df = df.dropna(subset=['nome'])

df = df.withColumn('data', when(try_to_timestamp(col('data')).isNotNull(), col('data')).otherwise(current_timestamp().cast(StringType())))



Faz a transformação dos dados para o padrão esperado da tabela do DynamoDB

In [174]:

def generate_uuid():
  return str(uuid.uuid4())

uuid_udf = udf(generate_uuid, StringType())
user_id = '035c6ada-4091-703a-1837-677cad18d4a5'

df = df.withColumn('status', when(df['status'] == '1', 'TODO').otherwise('DONE'))
df = df.withColumn('user_id', when((df['user_id'] == 'b4853fc1f03a3a4cec530a98a94d89ad') | (df['usuario'] == 'Jeferson Klau'), user_id))
df = df.withColumn('PK', concat(lit('LIST#'), date_format(col('data'), 'yyyyMMdd')))
df = df.withColumn('SK', concat(lit('ITEM#'), uuid_udf()))  
df = df.select(df.PK, df.SK, df.user_id, df.usuario, df.nome, df.data, df.data_conclusao, df.status, df.tipo_tarefa)



## 2. Enviando os dados para a tabela do dynamo

In [176]:
import time
from src.clients.dynamodb_client import DynamoDBClient

TAMANHO_LOTE = 100
DELAY_EM_SEGUNDOS = 2


dynamodb = DynamoDBClient()
table = dynamodb.get_table('shopping-list')

pandas_df = df.toPandas()
rows = pandas_df.to_dict(orient='records')

for i in range(0, len(rows), TAMANHO_LOTE):
  batch_rows = rows[i:i+TAMANHO_LOTE]

  with table.batch_writer() as batch:
    for item in batch_rows:
      
      batch.put_item(Item=item) 

  time.sleep(DELAY_EM_SEGUNDOS)

print('Todos items foram inseridos')

Todos items foram inseridos


## 3. Obtendo volume de tarefas abandonadas 

In [2]:
from src.clients.spark_client import get_spark_session
from src.clients.dynamodb_client import DynamoDBClient
from boto3.dynamodb.conditions import Key
from pyspark.sql.types import StringType, StructField, StructType

spark = get_spark_session()

# Query para obter as tarefas não concluídas
dynamodb = DynamoDBClient()
table = dynamodb.get_table('shopping-list')

response = table.query(
    IndexName='user_id',
    KeyConditionExpression=Key('user_id').eq('035c6ada-4091-703a-1837-677cad18d4a5') & Key('PK').begins_with('LIST#'),
    FilterExpression=Key('status').eq('TODO')
)

items = response['Items']

schema = StructType([
    StructField('PK', StringType(), False),
    StructField('SK', StringType(), False),
    StructField('user_id', StringType(), False),
    StructField('usuario', StringType(), False),
    StructField('nome', StringType(), False),
    StructField('data', StringType(), False),
    StructField('data_conclusao', StringType(), True),
    StructField('status', StringType(), False),
    StructField('tipo_tarefa', StringType(), False)
])

dynamodb_dataframe = spark.createDataFrame(items, schema=schema)




Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/12 09:36:22 WARN Utils: Your hostname, kassia-pc, resolves to a loopback address: 127.0.1.1; using 172.18.227.129 instead (on interface eth0)
25/06/12 09:36:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/12 09:36:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [38]:
import pandas as pd

from datetime import date
from dateutil.relativedelta import relativedelta
from pyspark.sql import functions as F

today = date.today()
datas = [(today - relativedelta(months=i)).strftime('%Y-%m') for i in range(5, -1, -1)]

abandonadas_df = (
        dynamodb_dataframe
        .select(
            F.col('nome'),
            F.col('data'),
            F.col('tipo_tarefa')
        
        )
        .where(
            ((F.col('tipo_tarefa') == 'Tarefa a Ser Feita') & (F.datediff(F.lit(today), F.col('data')) > 15)) |
            ((F.col('tipo_tarefa') == 'Item de Compra') & (F.datediff(F.lit(today), F.col('data')) > 30)) 
        )
        .withColumn('mes', F.date_format(F.col('data'), 'yyyy-MM'))
        .orderBy('tipo_tarefa')
    )



resultados = []

for data in datas:
    df = abandonadas_df\
    .where(F.col('mes').like(data))\
    .groupby(F.col('tipo_tarefa'))\
    .count()\
    .select(F.col('tipo_tarefa'), F.col('count').alias(data))\
    .toPandas()\
    .set_index('tipo_tarefa')
    resultados.append(df)
    

print(len(resultados))
    

abandonadas_por_data = pd.concat(resultados, axis=1)
abandonadas_por_data.to_excel('abandonadas.xlsx', index_label=[""], index=True, na_rep=0, sheet_name='Tarefas abandonadas por data')

   



6
