# Imports e Constantes

In [1]:
import json

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, dense_rank, rank
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, TimestampType

# Input locations: the raw users CSV and the JSON file that maps column names
# to type names.
INPUT_PATH = "../data/input/users/load.csv"
TYPES_MAPPING = "../config/types_mapping.json"

# Output locations for the two Parquet datasets written by this notebook.
DEDUPLICATED_PATH = "../data/output/users/deduplicated.parquet"
OUTPUT_PATH = "../data/output/users/users.parquet"

# Conversão de Tipos

In [2]:
def str_to_dtypes(type: str):
    """Translate a type name into the matching PySpark data type.

    Args:
        type: Type name to convert; "integer" and "timestamp" are recognised.

    Returns:
        IntegerType() for "integer", TimestampType() for "timestamp", and
        StringType() for any other name.
    """
    known_types = {"integer": IntegerType(), "timestamp": TimestampType()}
    # Unrecognised names fall back to a string column instead of raising.
    return known_types.get(type, StringType())

# Spark Session

In [3]:
# Obtain the notebook's Spark session via getOrCreate(), registered under the
# application name below.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .getOrCreate()
)

# Criação do Schema a partir de types_mapping.json

In [4]:
# Load the JSON mapping; its "age", "create_date" and "update_date" entries
# supply the type names for the configurable columns.
with open(TYPES_MAPPING) as mapping_file:
    data = json.load(mapping_file)

# Columns with a type fixed in code come first, followed by the columns whose
# type is resolved from the mapping. Every field is declared nullable.
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("phone", StringType(), True),
        StructField("address", StringType(), True),
    ]
    + [
        StructField(column, str_to_dtypes(data[column]), True)
        for column in ("age", "create_date", "update_date")
    ]
)

# Show the compact one-line form of the schema as the cell output.
schema.simpleString()

'struct<id:int,name:string,email:string,phone:string,address:string,age:int,create_date:timestamp,update_date:timestamp>'

# Criação do DataFrame com Schema

In [5]:
# Read the users CSV with the explicit schema; the first line of the file is
# treated as the header row.
df_raw = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv(INPUT_PATH)
)

In [6]:
# Print the column names, types and nullability of the loaded DataFrame to
# confirm that the explicit schema was applied.
df_raw.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



# Deduplicação dos dados

In [7]:
# Group rows by id and order each group from the newest to the oldest
# update_date, so the most recent version of a record receives rank 1.
window = (
    Window
    .partitionBy("id")
    .orderBy(col("update_date").desc())
)

# Preview the ranking without truncating long column values.
(
    df_raw
    .withColumn("rank", rank().over(window))
    .show(truncate=False)
)

+---+----------------------------------+---------------------+---------------+----------------------------------------------+---+--------------------------+--------------------------+----+
|id |name                              |email                |phone          |address                                       |age|create_date               |update_date               |rank|
+---+----------------------------------+---------------------+---------------+----------------------------------------------+---+--------------------------+--------------------------+----+
|1  |david.lynch@cognitivo.ai          |David Lynch          |(11) 99999-9999|Mulholland Drive, Los Angeles, CA, US         |72 |2018-03-03 18:47:01.954752|2018-05-23 10:13:59.594752|1   |
|1  |david.lynch@cognitivo.ai          |David Lynch          |(11) 99999-9998|Mulholland Drive, Los Angeles, CA, US         |72 |2018-03-03 18:47:01.954752|2018-04-14 17:09:48.558151|2   |
|1  |david.lynch@cognitivo.ai          |David Lynch    

# DataFrame com os registros duplicados removidos na deduplicação (rank != 1)

In [8]:
# Despite its name, this frame holds the rows whose rank within `window` is
# not 1, i.e. the older versions of each id that deduplication sets aside.
df_deduplicated = (
    df_raw
    .withColumn("rank", rank().over(window))
    .where(col("rank") != 1)
)

# DataFrame com o registro mais recente de cada id (rank == 1)

In [9]:
# Despite its name, this frame holds the rows ranked 1 within `window`, i.e.
# the most recently updated version of each id.
# NOTE(review): rank() gives rows with equal update_date the same rank, so an
# id whose newest timestamp is shared by several rows keeps all of them here.
df_no_deduplicated = (
    df_raw
    .withColumn("rank", rank().over(window))
    .where(col("rank") == 1)
)

# Salvando arquivos em formato Parquet

In [10]:
# Write both DataFrames as Parquet without the helper "rank" column,
# overwriting the output of any previous run.
# DataFrameWriter.parquet() returns None, so its result is deliberately not
# assigned: rebinding df_deduplicated / df_no_deduplicated to None would lose
# the DataFrames and make this cell fail with AttributeError when re-run.
df_deduplicated.drop("rank").write.mode("overwrite").parquet(DEDUPLICATED_PATH)
df_no_deduplicated.drop("rank").write.mode("overwrite").parquet(OUTPUT_PATH)

                                                                                

In [11]:
# Read back the dataset stored at OUTPUT_PATH.
df_no_deduplicated_parquet = spark.read.parquet(OUTPUT_PATH)

# Read back the dataset stored at DEDUPLICATED_PATH.
df_deduplicated_parquet = spark.read.parquet(DEDUPLICATED_PATH)

In [12]:
# Display the rows read back from DEDUPLICATED_PATH without truncating long
# column values.
df_deduplicated_parquet.show(truncate=False)

+---+----------------------------------+---------------------+---------------+----------------------------------------------+---+--------------------------+--------------------------+
|id |name                              |email                |phone          |address                                       |age|create_date               |update_date               |
+---+----------------------------------+---------------------+---------------+----------------------------------------------+---+--------------------------+--------------------------+
|3  |spongebob.squarepants@cognitivo.ai|Spongebob Squarepants|(11) 91234-5678|124 Conch Street, Bikini Bottom, Pacific Ocean|13 |2018-05-19 04:07:06.854752|2018-05-19 04:07:06.854752|
|1  |david.lynch@cognitivo.ai          |David Lynch          |(11) 99999-9998|Mulholland Drive, Los Angeles, CA, US         |72 |2018-03-03 18:47:01.954752|2018-04-14 17:09:48.558151|
|1  |david.lynch@cognitivo.ai          |David Lynch          |(11) 99999-9997|Mu

In [13]:
# Display the rows read back from OUTPUT_PATH without truncating long column
# values.
df_no_deduplicated_parquet.show(truncate=False)

+---+----------------------------------+---------------------+---------------+----------------------------------------------+---+--------------------------+--------------------------+
|id |name                              |email                |phone          |address                                       |age|create_date               |update_date               |
+---+----------------------------------+---------------------+---------------+----------------------------------------------+---+--------------------------+--------------------------+
|3  |spongebob.squarepants@cognitivo.ai|Spongebob Squarepants|(11) 98765-4321|122 Conch Street, Bikini Bottom, Pacific Ocean|13 |2018-05-19 04:07:06.854752|2018-05-19 05:08:07.964752|
|2  |sherlock.holmes@cognitivo.ai      |Sherlock Holmes      |(11) 94815-1623|221B Baker Street, London, UK                 |34 |2018-04-21 20:21:24.364752|2018-04-21 20:21:24.364752|
|1  |david.lynch@cognitivo.ai          |David Lynch          |(11) 99999-9999|Mu