In [0]:
# etl/transformacao.ipynb

# Limpeza e padronização dos dados CRM
cleaned_crm_data 	  = crm_spark_df.dropDuplicates().filter(col('data_base') == current_date())
standardized_crm_data = (
	cleaned_crm_data.withColumn("qtd_dias", datediff(current_date(), col("data_envio")))
					.select("id_cliente", "email", "telefone", "assunto","data_envio", "qtd_dias")
					    )
					

# Limpeza e padronização dos dados de Marketing Cloud utilizando expressão SQL
cleaned_marketing_data 		= marketing_spark_df.dropDuplicates().filter(marketing_spark_df.email.isNotNull())
standardized_marketing_data = (
	cleaned_marketing_data.selectExpr(
		"id_cliente","email", "telefone", "subject as assunto",
		"data_envio", "date_diff(today(), data_envio) as qtd_dias")
			)


# Limpeza e padronização dos dados de suspeita de fraude
cleaned_fraude_data = fraude_data.filter(col('data_exclus').isNull())
standardized_fraude_data = (
	cleaned_fraude_data.withColumnRenamed("dt_inclus", "data_envio")
					   .withColumn("qtd_dias", datediff(current_date(), col("data_envio")))
					   .withColumn("assunto", lit('suspeita de fraude'))
					)


# Limpeza e padronização da lista de restritos
standardized_restritos_data = (
	restritos_data.select('id_cliente'
			,lit("Restritos").alias('canal')
			,col('contatos.email').alias('email')
			,col('contatos.numero_telefone').alias('telefone')
			,col('motivos.descricao').alias('assunto')
			,col('contatos.data_solicitacao').cast('date').alias('data_envio'))
		).distinct()
					

# Aplicação das regras de quarentena
quarentena_crm 		 	= standardized_crm_data.withColumn("saida_prevista", date_add(current_date(), 30))
quarentena_marketing 	= standardized_marketing_data.withColumn("saida_prevista", date_add(current_date(), 7))
quarentena_recomendacao = recomendacao_data.withColumn("saida_prevista", date_add(current_date(), 15))
quarentena_fraude 		= standardized_fraude_data.withColumn("saida_prevista", lit(None)).drop('data_exclus')
quarentena_restritos 	= standardized_restritos_data.withColumn("qtd_dias", datediff(current_date(), col("data_envio")))\
													 .withColumn("saida_prevista", lit(90))


# Unificação dos dados em uma única estrutura
consolidated_data = (
	quarentena_crm.where(col("qtd_dias") <= 30)
				  .union(quarentena_marketing.filter   ( col("qtd_dias") <= 7 ) )
				  .union(quarentena_recomendacao.filter( col("qtd_dias") <= 15) )
				  .union(quarentena_restritos.filter   ( col("qtd_dias") <= 90) )
				  .union(quarentena_ofertas.filter     ( col("qtd_dias") <= 5 ) )
				  .union(quarentena_fraude)
				  
		).withColumn("data_carga" lit(f'{current_date()}').cast('date'))