## Desafio de Deduplicação dos dados

In [1]:
# Import e inicialização do Spark
import findspark
findspark.init()

### Import dependencias

In [2]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import collect_list
from pyspark.sql.types import (
    IntegerType,
    TimestampType,
    StringType,
    StructType,
    StructField,
)

spark = (
    SparkSession.builder.master("local").appName("challenge-dedup-data").getOrCreate()
)

### Criando o Schema e carregando o DataSet

In [3]:
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("phone", StringType(), True),
        StructField("address", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("create_date", TimestampType(), True),
        StructField("update_date", TimestampType(), True),
    ]
)

dataset = (
    spark.read.option("header", True)
    .option("inferSchema", "true")
    .schema(schema)
    .csv("./data/input/users/")
)

In [4]:
dataset.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



### Realizando o Dedup

In [5]:
dataset_dedup = (
    dataset.orderBy("update_date", ascending=False)
    .groupBy("id", "create_date")
    .agg(
        collect_list("name").alias("l_name"),
        collect_list("email").alias("l_email"),
        collect_list("phone").alias("l_phone"),
        collect_list("address").alias("l_address"),
        collect_list("age").alias("l_age"),
        collect_list("update_date").alias("l_update_date"),
    )
)

In [6]:
dataset_dedup.show(n=5)

+---+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+
| id|         create_date|              l_name|             l_email|             l_phone|           l_address|       l_age|       l_update_date|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+
|  1|2018-03-03 18:47:...|[david.lynch@cogn...|[David Lynch, Dav...|[(11) 99999-9999,...|[Mulholland Drive...|[72, 72, 72]|[2018-05-23 10:13...|
|  3|2018-05-19 04:07:...|[spongebob.square...|[Spongebob Square...|[(11) 98765-4321,...|[122 Conch Street...|    [13, 13]|[2018-05-19 05:08...|
|  2|2018-04-21 20:21:...|[sherlock.holmes@...|   [Sherlock Holmes]|   [(11) 94815-1623]|[221B Baker Stree...|        [34]|[2018-04-21 20:21...|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------

In [7]:
dataset_dedup = spark.createDataFrame(
    data=[
        Row(
            **dict(
                [
                    (key.replace("l_", ""), value[0])
                    for key, value in row.asDict().items()
                    if "l_" in key
                ],
                id=row["id"],
                create_date=row["create_date"],
            )
        )
        for row in dataset_dedup.collect()
    ]
)

### Persistindo o DataSet final em parquet

In [8]:
dataset_dedup.write.mode('overwrite').parquet('./data/output/users')