In [1]:
import json

from pyspark.sql.functions import col, rank, row_number
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window

In [2]:
# Build (or reuse, via getOrCreate) a Hive-enabled Spark session with the
# executor memory setting fixed at 1g.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "1g")
    .enableHiveSupport()
    .getOrCreate()
)

- O ORC tem melhor desempenho na criação de arquivos e melhor compressão (segundo https://medium.com/@dhareshwarganesh/benchmarking-parquet-vs-orc-d52c39849aef).
- Mesmo assim, optei por utilizar o Parquet, por apresentar melhores resultados na execução das queries.

In [3]:
# Read the raw users CSV. The first line is treated as the header and schema
# inference is disabled, so every column is loaded as a string.
df = (
    spark.read.format("csv")
    .option("inferSchema", "false")
    .option("header", "true")
    .load("data/input/users/load.csv")
)

# Persist the raw data as Parquet. coalesce(1) reduces the frame to a single
# partition before writing. The save mode is set explicitly: the default
# ("errorifexists") makes this cell fail whenever the notebook is re-run and
# the output directory is already there.
df.coalesce(1).write.mode("overwrite").parquet("data/output/request_1.parquet")

In [4]:
# Print the schema of the raw frame; because inferSchema is disabled on read,
# every column is reported as a string.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



In [5]:
# Preview the raw rows; the same id can appear several times with different
# update_date values.
# NOTE(review): in the output below the `name` column holds e-mail addresses
# and the `email` column holds names — the CSV header order looks swapped;
# confirm against the source file.
df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9997|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-03-03 18:47:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9998|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-04-14 17:09:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 91234-5678|124 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 04:07:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...

In [6]:
# Keep only the most recent record per id.
#
# Rows are grouped by id and ordered by update_date, newest first.
# update_date is still a string at this point, so the ordering is
# lexicographic.
# NOTE(review): that matches chronological order only while every value uses
# the same zero-padded "yyyy-MM-dd HH:mm:ss..." layout seen in the sample
# output — confirm for the full input.
windowSpec = Window.partitionBy(df["id"]).orderBy(df["update_date"].desc())

# row_number() assigns a unique 1..n position inside each id group. rank()
# would give the same rank to rows that tie on update_date, so several rows
# for one id could pass the filter and the result would not be deduplicated.
# The helper column keeps the name "rank" so df_rank has the same columns as
# before.
df_rank = df.withColumn("rank", row_number().over(windowSpec))
df_deduplicate = df_rank.filter(col("rank") == 1).drop("rank")

# Explicit save mode: the default ("errorifexists") makes this cell fail on
# every re-run once the output directory exists.
df_deduplicate.coalesce(1).write.mode("overwrite").parquet("data/output/request_2.parquet")

In [7]:
# Show the deduplicated rows: the output below has a single row for each id.
df_deduplicate.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



In [8]:
# Row count after deduplication (3 in the recorded output, matching the
# three ids shown by the preview).
df_deduplicate.count()

3

In [9]:
# Load the column -> type mapping from the JSON config. The file handle is
# only needed for parsing, so it is closed before any DataFrame work starts.
# The encoding is given explicitly so decoding does not depend on the
# machine's locale default.
with open("config/types_mapping.json", encoding="utf-8") as mapping_file:
    type_mapping = json.load(mapping_file)

# Cast every configured column to its target type. withColumn with an
# existing column name replaces that column, so names and order are kept.
# NOTE(review): assumes the JSON is a flat object of
# {column name: Spark type string} — confirm against the config file.
df_mapping = df_deduplicate
for column_name, spark_type in type_mapping.items():
    df_mapping = df_mapping.withColumn(column_name, col(column_name).cast(spark_type))

# Explicit save mode: the default ("errorifexists") makes this cell fail on
# every re-run once the output directory exists.
df_mapping.coalesce(1).write.mode("overwrite").parquet("data/output/request_3.parquet")

In [10]:
# Check the result of the casts: in the schema printed below, age is an
# integer and create_date / update_date are timestamps.
df_mapping.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



In [11]:
# Preview the rows after the type casts have been applied.
df_mapping.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+

