<h1>Case Pismo - Data Engineering</h1>

<h2>Desafio Proposto</h2>
<body>Com os dados fornecidos, o desafio é construir uma solução que seja capaz de consumir o conjunto de arquivos e separá-los em diretórios para cada tipo de evento.</body>
- Observações importantes:
  - Usar python
  - Os diretórios devem ser particionados em: ano, mês, dia
  - O arquivo final deve estar em parquet
  - Deve ser mantido sempre o arquivo mais atual, em caso de duplicidade

In [0]:
import pyspark.sql.functions as sf

df_to_json = sqlContext\
    .read\
    .json(sc
          .wholeTextFiles("s3://desafio-pismo/*.json")
          .values()
          .flatMap(lambda x: x
                   .replace("\n", "#!#")
                   .replace("{#!#", "{")
                   .replace("#!#}", "}")
                   .replace(",#!#", ",")
                   .replace("}{", "},{")
                   .replace("},{", "},#####")
                   .replace('{"timestamp"','[{"timestamp"')
                   .replace("},#####","#####,#####")
                   .replace("}}}","}}}]")
                   .replace("#####,#####","},{")
                   .split("#!#")
                  )
         )\

display(df_to_json)

data,domain,event_id,event_type,timestamp
"List(null, 13.0, null, null, null, 186295, null, null, null, null, List(UNBLGBYV, 5551969893626), null, CELPHONE_RECHARGE)",transaction,6d7d0c85-94a2-4bf0-a570-72774e4fbc0d,creation,2021-01-01T00:00:29
"List(null, null, null, null, null, 40093, ACTIVE, SUSPENDED, null, Quas., null, null, null)",account,96579a99-7deb-4691-95d3-390178936615,status-change,2021-01-01T00:00:51
"List(null, 4.0, null, null, null, 384341, null, null, null, null, List(HBVVGBE5, 5521944543480), null, CELPHONE_RECHARGE)",transaction,bfa28493-b97c-4ee6-9612-f6422d185229,creation,2021-01-01T00:01:01
"List(null, null, null, null, null, 553807, ACTIVE, SUSPENDED, null, Quidem quidem facilis nostrum pariatur., null, null, null)",account,7b2480cc-50ed-4a57-8bd2-ace0d78da198,status-change,2021-01-01T00:01:37
"List(null, 92.0, null, null, null, 170156, null, null, null, null, List(UTRIGBZE, 5581980129475), null, CELPHONE_RECHARGE)",transaction,abd543ca-ace2-490e-8757-214c69da76ab,creation,2021-01-01T00:04:14
"List(null, 92.0, null, null, null, 170156, null, null, null, null, List(UTRIGBZE, 5581980129475), null, CELPHONE_RECHARGE)",transaction,abd543ca-ace2-490e-8757-214c69da76ab,creation,2021-01-01 00:04:15
"List(null, 9288.81, null, null, List(9971, Minus mollitia., Mirella da Costa), 748292, null, null, null, null, null, null, CASHIN)",transaction,5d743c5e-a967-4118-b4f6-e9963affb3fe,creation,2021-01-01T00:05:43
"List(null, null, null, null, null, 976984, JUDICIAL_BLOCK, ACTIVE, null, Dicta vero., null, null, null)",account,63987632-f9f9-4de6-8256-0bdabbcc3f3b,status-change,2021-01-01T00:06:28
"List(null, 7908.16, null, null, List(2033, Libero., Noah Martins), 947884, null, null, null, null, null, null, CASHIN)",transaction,0fd4c241-c32d-4578-b10e-fd94e5640363,creation,2021-01-01T00:06:53
"List(null, 7908.16, null, null, List(2033, Libero., Noah Martins), 947884, null, null, null, null, null, null, CASHIN)",transaction,0fd4c241-c32d-4578-b10e-fd94e5640363,creation,2021-01-01 00:07:00


<h2>Transform</h2>
- Nesse bloco estamos fazendo a `leitura` e `validação` da estrutura json dos arquivos e aplicando a correção via `replace`

<h3>Dando continuidade ao tratamento dos dados:</h3>
- No primeiro passo foi feito o tratamento do campo `timestamp` por alguns registros virem com o `T` e outros não, assim foi normalizado
- Após isso a mesma tipo de dados da coluna foi convertido de `string` para `timestamp`, para ser feito o tratamento usando um campo de data
- Concatenando os campos, obtivemos os tipos de eventos
- Logo após foi extraído o ano e mês dos registros
- Em seguida está sendo retornado o ultimo event_id de cada registro tendo como base a coluna `timestamp`
- Por fim, foi feita a separação das partições necessários da data, que são: `ano`, `mês` e `dia`

In [0]:
eventsDF = (df_to_json.withColumn("timestamp", sf.regexp_replace("timestamp", "T", " "))
            .withColumn("timestamp", sf.to_timestamp(sf.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
            .withColumn("event_key", sf.concat(sf.col("domain"),sf.lit('_'), sf.col("event_type")))
            .groupBy("event_id", "domain", "event_type", "event_key", "data").agg(sf.max("timestamp").alias("timestamp"))
            .withColumn("date", sf.split(sf.to_date(sf.col("timestamp")),"-"))
            .withColumn("year", sf.col("date").getItem(0))
            .withColumn("month", sf.col("date").getItem(1))
            .withColumn("day", sf.col("date").getItem(2))
            .select("timestamp", "event_id", "domain", "event_type", "event_key", "year","month","day", "data")
        )

display(eventsDF)

timestamp,event_id,domain,event_type,event_key,year,month,day,data
2021-01-01T01:14:40.000+0000,a8a42041-27d0-4332-b716-4948dbdcdcf9,transaction,creation,transaction_creation,2021,1,1,"List(null, 9588.73, null, null, List(1805, Dignissimos quaerat esse perferendis., Mariana Aragão), 556775, null, null, null, null, null, null, CASHIN)"
2021-01-01T08:05:30.000+0000,c0aee4d8-2948-4d49-9123-3101d16678c4,transaction,creation,transaction_creation,2021,1,1,"List(null, 8764.7, null, null, List(9973, Laboriosam voluptatem., Vitória Araújo), 920085, null, null, null, null, null, null, CASHIN)"
2021-01-01T14:38:48.000+0000,8c9654b9-1188-4ad2-9ae1-f6847b177c4a,account,status-change,account_status-change,2021,1,1,"List(null, null, null, null, null, 398434, ACTIVE, SUSPENDED, null, Natus odit nemo aliquam dicta laudantium doloribus impedit., null, null, null)"
2021-01-01T20:41:34.000+0000,c30eb3e8-381e-4fa2-b710-eb17ee1a8249,transaction,creation,transaction_creation,2021,1,1,"List(null, 8816.52, null, null, null, 276055, null, null, null, null, null, List(5933, Nesciunt impedit debitis., Antônio Sales), CASHOUT)"
2021-01-01T21:08:11.000+0000,10088b37-b04f-4aae-b09d-bc75b799bbfe,transaction,creation,transaction_creation,2021,1,1,"List(null, 7659.2, null, null, null, 553591, null, null, null, null, null, List(7287, Tempore illum illum numquam., Marcelo Duarte), CASHOUT)"
2021-01-01T21:59:45.000+0000,c4ccc848-bfcd-4a32-8815-8bafe81a0c04,account,creation,account_creation,2021,1,1,"List(List(List(Freitas da Prata, Aparecida, 30, 86896-979, Goiás, GO, Núcleo Juliana Pinto, Street)), null, List(List(List(Nogueira, Laranjeiras, 49, 70298-093, Rio Grande do Norte, SE, Vale Moura, Street), List(da Cunha dos Dourados, Conjunto Santa Maria, 937, 93002775, Alagoas, PE, Via de da Conceição, Street)), Oliveira), null, null, 333117, null, null, List(1956-01-18, Vicente Barros, Letrista), null, null, null, null)"
2021-01-02T02:49:34.000+0000,d5589db3-2921-4b95-abcd-52f133e15c94,transaction,creation,transaction_creation,2021,1,2,"List(null, 5335.81, null, null, null, 611106, null, null, null, null, null, List(4261, Animi veniam., Yasmin Costela), CASHOUT)"
2021-01-02T03:38:56.000+0000,341e3153-8eef-4aa3-ac8b-6e67f8c2edd5,account,creation,account_creation,2021,1,2,"List(null, null, null, null, null, 434265, null, null, List(1984-08-14, Pedro Lucas Duarte, Judoca), null, null, null, null)"
2021-01-02T04:02:34.000+0000,d13e15d0-6c67-4329-a2d8-d6610b1b55a3,transaction,creation,transaction_creation,2021,1,2,"List(null, 4902.4, null, null, List(7397, Nostrum., Theo Duarte), 510403, null, null, null, null, null, null, CASHIN)"
2021-01-02T06:43:32.000+0000,bece94fa-afa7-4817-96d8-5011b01801ce,transaction,creation,transaction_creation,2021,1,2,"List(null, 6969.8, null, null, List(4249, Impedit modi officiis., Luigi da Costa), 219071, null, null, null, null, null, null, CASHIN)"


<h2>Load</h2>
- No bloco final estamos escrevendo os arquivos em formato `parquet` e em diretórios particionados por: `event_key`, `year`, `month`, `day`

In [0]:
eventsDF.write.mode("overwrite").partitionBy("event_key", "year", "month", "day").parquet("s3://trusted-data-pismo/results")

<h3>Resultado</h3>
Como podemos ver abaixo, o DataFrame final em parquet

In [0]:
df_parquet = spark.read.parquet("s3://trusted-data-pismo/results")
display(df_parquet)

timestamp,event_id,domain,event_type,data,event_key,year,month,day
2021-02-01T12:43:07.000+0000,4c5687c4-e873-482d-808e-d7217c761127,transaction,creation,"List(null, 9436.2, null, null, List(8697, Dolorem nam., Eloah Porto), 300820, null, null, null, null, null, null, CASHIN)",transaction_creation,2021,2,1
2021-02-01T16:16:33.000+0000,8721294c-6e72-47c9-ab73-54d2af887add,transaction,creation,"List(null, 6469.95, null, null, null, 574498, null, null, null, null, null, List(4259, Reiciendis assumenda qui culpa vitae deserunt., Davi Luiz Duarte), CASHOUT)",transaction_creation,2021,2,1
2021-02-01T07:46:25.000+0000,f4cbfacc-dd4f-401d-9c5b-735ade5d76cd,transaction,creation,"List(null, 8430.44, null, List(Residencial Evelyn Moura, 94 Vila Independencia 1ª Seção 19803160 Almeida / RJ, List(47.92526, -97.03285, Grand Forks, US, America/Chicago), Mendes), null, 640069, null, null, null, null, null, null, DEBIT)",transaction_creation,2021,2,1
2021-02-01T07:54:02.000+0000,1848e803-f4c7-4b39-93af-b4fdf56f4c97,transaction,creation,"List(null, 1921.62, null, null, null, 13986, null, null, null, null, null, List(4967, Occaecati illo., Amanda Farias), CASHOUT)",transaction_creation,2021,2,1
2021-02-01T13:52:10.000+0000,c17890fd-15ee-4a0c-b6f1-33fc5dd21c1b,transaction,creation,"List(null, 8228.47, null, null, List(2859, Earum., Larissa Novaes), 503025, null, null, null, null, null, null, CASHIN)",transaction_creation,2021,2,1
2021-02-01T15:11:12.000+0000,024a2957-d4b7-47aa-8d8e-4e0c1f5e63d1,transaction,creation,"List(null, 3852.32, null, List(Estrada de Araújo, 948 Vila Satélite 21982454 Santos / RJ, List(46.32374, -120.00865, Sunnyside, US, America/Los_Angeles), Costela da Mota e Filhos), null, 696144, null, null, null, null, null, null, DEBIT)",transaction_creation,2021,2,1
2021-02-01T17:04:50.000+0000,28aa181b-8bc5-4d8f-aef6-db91bfd6e0f1,transaction,creation,"List(null, 19.0, null, null, null, 609237, null, null, null, null, List(ZUQNGBRX, 5571957702694), null, CELPHONE_RECHARGE)",transaction_creation,2021,2,1
2021-02-01T22:02:57.000+0000,f880a0f6-10dd-4b12-9906-1512ff76b649,transaction,creation,"List(null, 9740.72, null, null, List(6710, Iusto sint., Emilly Nunes), 948287, null, null, null, null, null, null, CASHIN)",transaction_creation,2021,2,1
2021-02-01T01:39:25.000+0000,f2822352-7463-44ea-89b8-a7c69875b29c,transaction,creation,"List(null, 9869.23, null, null, List(6735, Iste expedita porro., Nina Fogaça), 875776, null, null, null, null, null, null, CASHIN)",transaction_creation,2021,2,1
2021-02-01T09:36:53.000+0000,85399c0a-bc88-45bf-912d-c3b8cd4a414a,transaction,creation,"List(null, 7635.96, null, null, List(8014, Illum occaecati molestias aliquam optio., Srta. Gabrielly Moraes), 144261, null, null, null, null, null, null, CASHIN)",transaction_creation,2021,2,1


<h2>Explicação</h2>

Para a resolução do desafio foi usado o Databricks para seu consumo, limpeza e carregamento dos dados, como armazenamento foi usado Amazon S3 Bucket. Inicialmente para a tratativa dos arquivos `JSON` foi usado uma lógica simples de `replace` tendo em vista que os arquivos não estavam com a estrutura padrão de um `JSON`, como por exemplo, os separadores de objetos não possuírem vírgula e os arquivos não começarem e não terminarem com `[]`. Para a validação dos registros duplicados foi aplicado um agrupamento com `MAX()` para buscar sempre o último registro e consequetemente excluindo os duplicado e por fim está sendo carregado no S3 particionado por `event_key` (campo criado com base na regra do contrato) e pelos campos `ano. mes e dia`.