# Teste engenharia de dados Cognitivo.AI ##

In [2]:
from pyspark.sql import functions as f
from pyspark.sql import  *
from pyspark.sql.types import *
import json


In [3]:
# Build the Spark session used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Teste Eng Dados")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

<h3> Ingestão do volume de dados </h3>
Extração dos dados a partir do arquivo CSV no diretório de entrada.

In [26]:
# Ingest the raw users CSV: comma-separated, first line is the header, and
# column types are inferred by Spark.
# NOTE(review): in the displayed sample the `name` column holds e-mail
# addresses and `email` holds names — the header of the input file looks
# swapped; confirm with the data owner.
df = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(r"/home/jovyan/work/input/users/load.csv")
)

df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9997|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-03-03 18:47:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9998|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-04-14 17:09:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 91234-5678|124 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 04:07:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...

<h3> Transformação </h3>
* Deduplicação dos dados convertidos
* Conversão do tipo dos dados


In [27]:

# Load the column-name -> Spark type-name mapping. The context manager closes
# the file handle as soon as the JSON has been parsed.
with open("/home/jovyan/work/config/types_mapping.json", "r", encoding="utf-8") as mapping_file:
    types_mapping = json.load(mapping_file)

# Cast each configured column to the type declared under its OWN key.
# (Previously `create_date` was cast with the `update_date` entry, which only
# worked while both entries happened to name the same type.)
# NOTE(review): assumes the mapping file has a "create_date" key — confirm.
for column_name in ("age", "create_date", "update_date"):
    df = df.withColumn(column_name, f.col(column_name).cast(types_mapping[column_name]))

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



In [28]:
# Keep only the most recent version of each user (highest update_date per id).
# orderBy() followed by dropDuplicates() does not guarantee which duplicate
# survives, so the rows are ranked explicitly inside a per-id window and only
# rank 1 is kept. Rows tied on update_date are still picked arbitrarily.
# `Window` is in scope through the notebook's `from pyspark.sql import *`.
latest_first = Window.partitionBy("id").orderBy(f.col("update_date").desc())

df = (
    df
    .withColumn("_recency_rank", f.row_number().over(latest_first))
    .filter(f.col("_recency_rank") == 1)
    .drop("_recency_rank")  # helper column must not leak into the output
)

df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



<h3> Conversão para um formato de alta performance de leitura </h3>
O formato Parquet se propõe a ser uma importante ferramenta para tornar mais eficiente o armazenamento e processamento de grandes volumes de dados.



In [29]:
# Persist the deduplicated users as Parquet, replacing any previous output.
# repartition(1) collapses the data into a single partition before writing.
parquet_path = r"/home/jovyan/work/output/load.parquet"

(
    df
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(parquet_path)
)

# Read the files back and print every row to check the round trip.
df = spark.read.parquet(parquet_path)
print(df.collect())


[Row(id=1, name='david.lynch@cognitivo.ai', email='David Lynch', phone='(11) 99999-9999', address='Mulholland Drive, Los Angeles, CA, US', age=72, create_date=datetime.datetime(2018, 3, 3, 18, 47, 1, 954000), update_date=datetime.datetime(2018, 5, 23, 10, 13, 59, 594000)), Row(id=3, name='spongebob.squarepants@cognitivo.ai', email='Spongebob Squarepants', phone='(11) 98765-4321', address='122 Conch Street, Bikini Bottom, Pacific Ocean', age=13, create_date=datetime.datetime(2018, 5, 19, 4, 7, 6, 854000), update_date=datetime.datetime(2018, 5, 19, 5, 8, 7, 964000)), Row(id=2, name='sherlock.holmes@cognitivo.ai', email='Sherlock Holmes', phone='(11) 94815-1623', address='221B Baker Street, London, UK', age=34, create_date=datetime.datetime(2018, 4, 21, 20, 21, 24, 364000), update_date=datetime.datetime(2018, 4, 21, 20, 21, 24, 364000))]
