# Exemplo de Integração de Dados


### Funcionalidades demonstradas

1. Mapeamento dos esquemas dos arquivos CSVs presentes no diretório data/input/users através da correlação das colunas correspondentes (seguindo o arquivo schema_mapping.json presente na pasta config);

2. Conversão do formato dos arquivo integrado para um formato colunar de alta performance de leitura (Parquet).

3. Deduplicação dos dados convertidos: No conjunto de dados convertidos há múltiplas entradas para um mesmo registro, variando apenas os valores de alguns dos campos entre elas. Foi necessário realizar um processo de deduplicação destes dados, a fim de apenas manter a última entrada de cada registro, usando como referência o id para identificação dos registros duplicados e a data de atualização (update_date) para definição do registro mais recente;

4. Conversão do tipo dos dados deduplicados: No diretório config há um arquivo JSON de configuração (types_mapping.json), contendo os nomes dos campos e os respectivos tipos desejados de output. Utilizando esse arquivo como input, foi realizado um processo de conversão dos tipos dos campos descritos, no conjunto de dados deduplicados.

### Notas gerais
- Todas as operações foram realizadas utilizando Spark.

- Cada operação utilizou como base o dataframe resultante do passo anterior, sendo persistido em arquivos Parquet.

- Houve a transformação de tipos de dados em alguns campos (id, age, create_date, update_date)

### Referências

[1] PLASE, D.; NIEDRITE, L.; TARANOVS, R. A comparison of HDFS compact data formats: Avro versus Parquet / HDFS glaustųjų duomenų formatų palyginimas: Avro prieš Parquet. Mokslas – Lietuvos ateitis / Science – Future of Lithuania, v. 9, n. 3, p. 267-276, 4 jul. 2017.

## Carregamento dos arquivos de entrada e configuração

In [1]:
!wget -P data/input/users/ https://raw.githubusercontent.com/ifpb/Integracao-dados-overview/main/data/input/users/load.csv
!wget -P data/input/users/ https://raw.githubusercontent.com/ifpb/Integracao-dados-overview/main/data/input/users/load2.csv
!wget -P config/ https://raw.githubusercontent.com/ifpb/Integracao-dados-overview/main/config/schema_mapping.json
!wget -P config/ https://raw.githubusercontent.com/ifpb/Integracao-dados-overview/main/config/types_mapping_full.json

--2023-08-23 00:27:28--  https://raw.githubusercontent.com/ifpb/Integracao-dados-overview/main/data/input/users/load.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 655 [text/plain]
Saving to: 'data/input/users/load.csv.1'


2023-08-23 00:27:29 (20.8 MB/s) - 'data/input/users/load.csv.1' saved [655/655]

--2023-08-23 00:27:29--  https://raw.githubusercontent.com/ifpb/Integracao-dados-overview/main/data/input/users/load2.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 446 [text/plain]
Saving to: 'data/input/u

## Instalação das dependências

In [8]:
!pip install findspark==2.0.1
!pip install numpy==1.21.6
!pip install pandas==1.3.5
!pip install py4j==0.10.9.5
!pip install pyspark==3.3.0
!pip install python-dateutil==2.8.2
!pip install pytz==2022.2.1
!pip install six==1.16.0



## Carregamento da Configuração


In [9]:
CONFIG = {
    'INPUT_FILE_1': 'data/input/users/load.csv',
    'INPUT_FILE_2': 'data/input/users/load2.csv',
    'TYPES_MAPPING': 'config/types_mapping_full.json',
    'SCHEMA_MAPPING': 'config/schema_mapping.json',
    'OUTPUT_PATH': 'data/output/users.parquet',
    'OUTPUT_PATH_CSV': 'data/output/users.csv',
    'OUTPUT_PATH_DEDUPLICATED': 'data/output/users-deduplicated.parquet',
    'APP_NAME': 'Demonstracao-IntegracaoDados'
}

## Inicialização do Spark

In [10]:
import os

import findspark
from pyspark.sql.types import StructType

findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName(CONFIG['APP_NAME']).getOrCreate()

## Etapa 1 - Mapeamento de Esquemas e conversão de formatos

In [11]:

"""
Carrega o arquivo de entrada (load.csv), realiza mapeamento do esquema conforme o arquivo JSON (ex., types_mapping.json)

@see config.py para ter acesso às configurações dos arquivos que são carregados 
"""
def convert_data_types_and_formats():

    ## Obtém a lista de campos na ordem original presente no CSV de entrada
    fields1 = spark.read.csv(CONFIG['INPUT_FILE_1'], header=False).first()
    fields2 = spark.read.csv(CONFIG['INPUT_FILE_2'], header=False).first()
    fields = [f for f in fields1]

    ## Carrega o JSON contendo uma lista indicando o tipo de dado das colunas mapeadas
    type_mapping = spark.read.json(CONFIG['TYPES_MAPPING'], multiLine=True)
    schema_mappping = spark.read.json(CONFIG['SCHEMA_MAPPING'], multiLine=True)

    ## Constrói um dicionário fazendo a mescla com os dados que foram definidos no JSON
    ## Isso é necessário para permitir que elementos do esquema definido no JSON sejam descritos em qualquer ordem
    schema_dict = next(map(lambda row: row.asDict(), type_mapping.collect()))
    schema_dict = _transform_dict(_create_dict(fields, schema_dict))
    schema = StructType.fromJson(schema_dict)

    ## De posse do esquema pronto a ser utilizado, é feito o carregamento dos dados do CSV
    df1 = spark.read.csv(CONFIG['INPUT_FILE_1'], header=True, mode="DROPMALFORMED", schema=schema)
    df2 = spark.read.csv(CONFIG['INPUT_FILE_2'], header=True, inferSchema=True)
    df2 = df2.drop("internal_id")

    df1.show()
    df2.show()

    ## Colunas correspondentes são mapeadas e os dois conjuntos são mesclados
    for source, target in zip(schema_mappping.columns, schema_mappping.collect().pop()):
        df2 = df2.withColumnRenamed(source, target)
    df3 = df1.unionByName(df2)

    ## Exibe o resultado (note que o esquema segue o que foi definido no arquivo JSON)
    df3.show()
    print(df3.printSchema)

    ## Salva os dados carregados como Parquet no diretório indicado
    if not os.path.isdir(CONFIG['OUTPUT_PATH']):
        df3.write.parquet(CONFIG['OUTPUT_PATH'])

def _transform_dict(d):
    """
    Recebe um dicionário e retorna a versão mapeada para ser utilizada como StructField

    :param d: o dicionário a ser transformado, contendo como chave o nome da coluna e como valor o tipo de dado
    :return: versão de dicionário compatível com os campos da StructType
    """
    newdict = {}
    fields = []
    for k,v in d.items():
        item = {}
        item['name'] = k
        item['type'] = v
        item['nullable'] = True
        item['metadata'] = {}
        fields.append(item)
    newdict['fields'] = fields
    newdict['type'] = 'struct'
    return newdict

def _create_dict(fields, schema_dict):
    """"
    Cria um dicionário a partir da lista completa de campos lidos do CSV e dos itens mapeados no JSON
    Caso um elemento presente no CSV não seja mapeado no JSON, o seu tipo de dado é atribuído como string
    """
    newdict = {}
    for f in fields:
        if f in schema_dict:
            newdict[f] = schema_dict[f]
        else:
            newdict[f] = 'string'
    return newdict


convert_data_types_and_formats()

+---+--------------------+---------------+---------------+--------------------+---+--------------------+--------------------+
| id|               email|           name|          phone|             address|age|         create_date|         update_date|
+---+--------------------+---------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@compa...|    David Lynch|(11) 99999-9997|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-03-03 18:47:...|
|  1|david.lynch@compa...|    David Lynch|(11) 99999-9998|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-04-14 17:09:...|
|  2|sherlock.holmes@c...|Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  1|david.lynch@compa...|    David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
+---+--------------------+---------------+---------------+--------------------+---+--------------------+--------------

## Etapa 2 - Deduplicação de Dados

In [12]:
import pyspark.sql.functions as func

"""
Recupera a lista de usuários construída no arquivo (converter.py) e realiza a remoção das instâncias duplicadas
"""
def deduplicate():
    ## Carrega lista de usuários persistida no formato Parquet
    users = spark.read.parquet(CONFIG['OUTPUT_PATH'])

    ## Cria um grupo contendo um id único e a data de última atualização das instâncias vinculadas ao id corrente
    cluster = users.groupBy('id').agg(func.max("update_date").alias('update_date'))

    ## Faz o join do dataframe completo com o grupo, removendo as duplicatas
    users_deduplicated = users.join(cluster, ['id', 'update_date'])\
        .sort(users.id.asc())

    ## Exibe o resultado
    users_deduplicated.show()

    ## Persiste o novo dataframe em um novo Parquet
    if not os.path.isdir(CONFIG['OUTPUT_PATH_DEDUPLICATED']):
        users_deduplicated.write.parquet(CONFIG['OUTPUT_PATH_DEDUPLICATED'])
    return users_deduplicated

deduplicate()

+---+--------------------+--------------------+--------------------+---------------+--------------------+---+--------------------+
| id|         update_date|               email|                name|          phone|             address|age|         create_date|
+---+--------------------+--------------------+--------------------+---------------+--------------------+---+--------------------+
|  1|2018-05-23 10:13:...|david.lynch@compa...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|
|  2|2018-04-21 20:21:...|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|
|  3|2018-05-19 05:08:...|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|
+---+--------------------+--------------------+--------------------+---------------+--------------------+---+--------------------+



DataFrame[id: int, update_date: timestamp, email: string, name: string, phone: string, address: string, age: int, create_date: timestamp]

## Etapa 3 - Operações

In [13]:
def operations():
    users = spark.read.parquet(CONFIG['OUTPUT_PATH_DEDUPLICATED'])
    users = users.sort(users.id.asc())
    users.show()
    print("Total de usuários = ", users.count())
    users_pd = users.toPandas()
    print("Média de idade = ", users_pd['age'].mean())
    print("Usuário mais velho = ", users.select('name', 'email', 'age').sort(users.age.desc()).first())
    print("Usuário mais novo = ", users.select('name', 'email', 'age').sort(users.age.asc()).first())

operations()

+---+--------------------+--------------------+--------------------+---------------+--------------------+---+--------------------+
| id|         update_date|               email|                name|          phone|             address|age|         create_date|
+---+--------------------+--------------------+--------------------+---------------+--------------------+---+--------------------+
|  1|2018-05-23 10:13:...|david.lynch@compa...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|
|  2|2018-04-21 20:21:...|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|
|  3|2018-05-19 05:08:...|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|
+---+--------------------+--------------------+--------------------+---------------+--------------------+---+--------------------+

Total de usuários =  3
Média de idade =  39.666666666666664
Usuário mais velho =  