In [1]:
import findspark
findspark.init("C:\spark\spark")
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.11:0.4.0 pyspark-shell'
import pyspark
from pyspark.sql import SparkSession
#import pandas as pd
#import numpy as np

sc = pyspark.SparkContext.getOrCreate()

sc.version

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello ")

df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [2]:
# explorando os dados
df = spark.read.csv("teste-eng-dados/data/input/users/load.csv", header=True, inferSchema=True)
df.show(10)

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9997|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-03-03 18:47:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9998|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-04-14 17:09:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 91234-5678|124 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 04:07:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...

In [3]:
df.dtypes

[('id', 'int'),
 ('name', 'string'),
 ('email', 'string'),
 ('phone', 'string'),
 ('address', 'string'),
 ('age', 'int'),
 ('create_date', 'timestamp'),
 ('update_date', 'timestamp')]

Como pede a instrução (3) o arquivo types_mapping.json marca os campos e os respectivos tipos desejados de output,
porém quando utilizamos inferSchema=True na leitura do CSV ele já consegue identificar o que pede o JSON.

age:"integer"
create_date:"timestamp"
update_date:"timestamp"

Caso já tenhamos o schema em json podemos importa-lo como schema, porém o problema com o JSON é que realmente não há garantia em relação à ordenação de campos, sem mencionar o tratamento de campos ausentes, tipos inconsistentes etc.

Para resolver isso podemos copiar o schema padrão como string, alteramos e importamos novamente para o dataframe

In [4]:
df2 = spark.read.csv("teste-eng-dados/data/input/users/load.csv", header=True)
df2.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



In [5]:
from pyspark.sql.types import *
import json
# exportando o schema do dataframe 
schema_json = df2.schema.json()
print(schema_json)

{"fields":[{"metadata":{},"name":"id","nullable":true,"type":"string"},{"metadata":{},"name":"name","nullable":true,"type":"string"},{"metadata":{},"name":"email","nullable":true,"type":"string"},{"metadata":{},"name":"phone","nullable":true,"type":"string"},{"metadata":{},"name":"address","nullable":true,"type":"string"},{"metadata":{},"name":"age","nullable":true,"type":"string"},{"metadata":{},"name":"create_date","nullable":true,"type":"string"},{"metadata":{},"name":"update_date","nullable":true,"type":"string"}],"type":"struct"}


In [6]:
schema_json2 = eval(schema_json.replace("true", "True"))
#usando o types_mapping como input
with open('teste-eng-dados/config/types_mapping.json') as f:
    data = json.load(f)
data.update(id = 'integer')
for field in schema_json2['fields']:
    if field['name'] in data.keys():
        field['type'] = data[field['name']]
schema_json2 = json.dumps(schema_json2)
# alterações feitas agora vamos carregar o Python Object string para Json, também coloquei o id com integer para melhor processamento dos dados.
new_schema = StructType.fromJson(json.loads(schema_json2))

In [7]:
#agora podemos carregar novamente o dataframe colocando o argumento schema na leitura
df2 = spark.read.csv("teste-eng-dados/data/input/users/load.csv", header=True, schema=new_schema)
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



In [8]:
"""estes dados possuem um id para cada usuario, então podemos ordernar os dados pela coluna update_date em ordem decrescente.
Spark tem uma função chamada .dropDuplicates que remove linhas duplicadas baseada em uma coluna, """

df_drop = df2.orderBy(df2.update_date.desc()).dropDuplicates(['id'])

In [9]:
# As colunas email e nome estão com os dados trocados
df_renamed = df_drop.withColumnRenamed("name", "e_mail").withColumnRenamed("email", "name").withColumnRenamed("e_mail", "email")
df_renamed.show(5)

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|               email|                name|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



In [10]:
# salvando os dados em um formato colunar de alta performance 'parquet', arquivos CSV em spark são lentos para analisar, com parquet mesmo em datasets pequenos,
# as queries são mais rápidas e usam menos banda da rede (predicate pushdown), o que diminui o custo.
df_renamed.write.parquet("teste-eng-dados/data/output/load.parquet", mode="overwrite")