# Script ETL para a dimensão CONCORRENTE (e BAIRRO)
*autor: André Costa (2019-03-09)* [https://www.linkedin.com/in/a-l-costa]

- Este script executa o processo ETL para gerar a dimensão CONCORRENTE, e também a mini-dimensão BAIRRO, diretamente associada
- *AVISO:* Este script assume que os dados para gerar a dimensão (provenientes de diversos arquivos separados) contenham todos os registros necessários
- *NOTA:* BAIRRO é uma mini-dimensão que faz parte da dimensão CONCORRENTE. Assim sendo, registro de bairros não associados aos concorrentes são ignorados

In [1]:
from pyspark.sql.types import *
spark.sql("use geodata")

DataFrame[]

## Extração do arquivo *concorrentes.csv*
- O arquivo lido gera um Spark Dataframe, que é imediatamente registrado no Hive com nome **raw_conc**

In [2]:
rawConcSchema = StructType([
             StructField('codigo', LongType()),
             StructField('nome', StringType()),
             StructField('categoria', StringType()),
             StructField('faixa_preco', IntegerType()),
             StructField('endereco', StringType()),
             StructField('municipio', StringType()),
             StructField('uf', StringType()),
             StructField('bairro_id', LongType()),
            ])
rawConc = spark.read.csv("file:/home/hadoop/data/concorrentes.csv", schema = rawConcSchema, header = True)
rawConc.registerTempTable('raw_conc')
rawConc.take(3)

[Row(codigo=431962533652067, nome='Boizão Lanches', categoria='Bar', faixa_preco=2, endereco='13190-000 Monte Mor', municipio='Monte Mor', uf='SP', bairro_id=None),
 Row(codigo=1663855903830869, nome='Bar do Serjão', categoria='Bar', faixa_preco=0, endereco='Rua das Dracenas, Americana', municipio='Americana', uf='SP', bairro_id=None),
 Row(codigo=567824576564110, nome='Recanto Do Kuca', categoria='Restaurant', faixa_preco=0, endereco='Jarinu', municipio='Jarinu', uf='SP', bairro_id=None)]

## Extração do arquivo *bairro.csv*
- O arquivo lido gera um Spark Dataframe, que é imediatamente registrado no Hive com nome **raw_bairro**

In [3]:
rawBairroSchema = StructType([
             StructField('codigo', LongType()),
             StructField('nome', StringType()),
             StructField('municipio', StringType()),
             StructField('uf', StringType()),
             StructField('area', FloatType()),
            ])
rawBairro = spark.read.csv("file:/home/hadoop/data/bairros.csv", schema = rawBairroSchema, header = True)
rawBairro.registerTempTable('raw_bairro')
rawBairro.take(3)

[Row(codigo=355620110, nome='Observatório', municipio='Valinhos', uf='SP', area=68.00090026855469),
 Row(codigo=3519071024, nome='Rp 6-24', municipio='Hortolândia', uf='SP', area=0.981768012046814),
 Row(codigo=3536505002, nome='Jardim De Itapoan', municipio='Paulínia', uf='SP', area=0.8085370063781738)]

## Extração do arquivo *populacao.json*
- O arquivo lido gera um Spark Dataframe, que é imediatamente registrado no Hive com nome **raw_pop**

In [4]:
#from pyspark.sql.functions import udf

rawPopSchema = StructType([
             StructField('codigo', LongType()),
             StructField('populacao', IntegerType()),
            ])
rawPop = spark.read.json("file:/home/hadoop/data/populacao.json", schema = rawPopSchema)
#rawPop = rawPop.withColumn("dens_demo", 0)
rawPop.registerTempTable('raw_pop')
rawPop.take(3)

[Row(codigo=355620110, populacao=8717),
 Row(codigo=3519071024, populacao=5764),
 Row(codigo=3536505002, populacao=1195)]

## Extração do arquivo *potencial.csv*
- O arquivo lido gera um Spark Dataframe, que é imediatamente registrado no Hive com nome **raw_pot**

In [5]:
rawPotSchema = StructType([
             StructField('codigo', LongType()),
             StructField('agencias', IntegerType()),
             StructField('empresas', IntegerType()),
             StructField('empregados', IntegerType()),
             StructField('renda', StringType()),
             StructField('faturamento', StringType()),
            ])
rawPot = spark.read.csv("file:/home/hadoop/data/potencial.csv", schema = rawPotSchema, header = True)
rawPot.registerTempTable('raw_pot')
rawPot.take(3)

[Row(codigo=355620110, agencias=2, empresas=73, empregados=772, renda='D', faturamento='28,467.00'),
 Row(codigo=3519071024, agencias=3, empresas=429, empregados=1004, renda='E', faturamento='2,707.20'),
 Row(codigo=3536505002, agencias=1, empresas=176, empregados=1663, renda='C', faturamento='28,580.00')]

## Expansão da mini-dimensão BAIRRO
- Nesta etapa, os dados provenientes de três arquivos diretamente associados à informação de localidade são combinados em uma única tabela
- Adicionalmente, nesta etapa também é realizada a seleção dos registros de bairro diretamente associados aos estabelecimentos sendo processados

In [6]:
spark.sql('create TEMPORARY view exp_bairro AS select a.codigo, \
    a.nome, a.area, b.populacao, c.agencias, c.empresas, c.empregados, \
    c.renda, CAST(REPLACE(c.faturamento, ",", "") as FLOAT) as faturamento FROM \
    (select a.codigo, a.nome, a.area FROM raw_bairro as a WHERE a.codigo in (select bairro_id FROM \
    raw_conc WHERE isnotnull(bairro_id))) as a \
    LEFT join raw_pop as b ON a.codigo = b.codigo \
    LEFT join raw_pot as c ON a.codigo = c.codigo')

DataFrame[]

## Verificação de alterações dos registros da mini-dimensão BAIRRO
- Neste etapa, os registros comuns existentes no warehouse são obtidos e verificados quanto à atualização de atributos considerados de tipo 2

In [7]:
spark.sql('create TEMPORARY view old_bairro AS \
    select a.*, IF(a.populacao=b.populacao and a.agencias=b.agencias and \
    a.empresas=b.empresas and a.empregados=b.empregados and a.renda=b.renda \
    and a.faturamento=b.faturamento, 0, 1) as has_changed FROM bairro as a \
    INNER join exp_bairro as b ON a.codigo = b.codigo and isnull(a.end_date)')

DataFrame[]

## Preparação dos dados para inserção (BAIRRO)
- Nesta etapa, os registros existentes no banco de dados expirados são atualizados com o preenchimento do atributo *end_date*
- Adicionalmente, são atribuídas as chaves SK à todos os registros novos e os que foram atualizados, por meio do uso de uma função hash combinando a chave natural e a hora corrente
- Finalmente, os registros de ambos subsets são combinados

In [8]:
#spark.sql('truncate table bairro')
expired_bairro = spark.sql('select id, codigo, nome, area, populacao, dens_demo, agencias, \
    empresas, empregados, renda, faturamento, start_date, from_unixtime(unix_timestamp()) as end_date \
    from old_bairro WHERE has_changed=1')

new_bairro = spark.sql('select IF(NVL(b.has_changed, 1)==1,hash(a.codigo, unix_timestamp()),b.id) as id, \
    a.nome, a.codigo, a.area, a.populacao, \
    IF(isnull(a.area) or a.area=0, null, a.populacao / a.area) as dens_demo, \
    a.agencias, a.empresas, a.empregados, a.renda, a.faturamento, \
    IF(NVL(b.has_changed, 1)==1, from_unixtime(unix_timestamp()), b.start_date) as start_date, \
    null as end_date FROM exp_bairro as a LEFT join old_bairro as b ON a.codigo=b.codigo')

new_bairro = new_bairro.union(expired_bairro)
print('Contagem de registros:', new_bairro.count())
new_bairro.show(5)
new_bairro.registerTempTable('new_bairro')

Contagem de registros: 106
+-----------+-----------------+----------+--------+---------+------------------+--------+--------+----------+-----+-----------+-------------------+--------+
|         id|             nome|    codigo|    area|populacao|         dens_demo|agencias|empresas|empregados|renda|faturamento|         start_date|end_date|
+-----------+-----------------+----------+--------+---------+------------------+--------+--------+----------+-----+-----------+-------------------+--------+
| 1606669759|        Indústria| 355620112| 19.6054|     9315|475.12419840457267|       4|     191|       476|    C|   244775.0|2019-03-11 22:16:33|    null|
|-1841388482|          Rp 6-25|3519071025| 2.16498|    21943|10135.428808543591|       3|     406|      2600|    C|   490860.0|2019-03-11 22:16:33|    null|
| 1873609668|   Alto Pinheiros|3536505022| 1.91767|     6820|  3556.39915054671|       4|     100|       522|    C|   183550.0|2019-03-11 22:16:33|    null|
| 1598048694|Jardim De Itapoan|

## Carregamento da mini-dimensão BAIRRO
- Nesta etapa, todos os registros do warehouse são combinados com os novos registros em uma operação de *FULL JOIN*
- Então, para cada atributo, é escolhido o primeiro valor não nulo, iniciando pelos novos registros
- Finalmente, os dados são carregados em uma tabela *buffer*, a tabela original é removida, e a tabela buffer é renomeada como definitiva

In [9]:
final_bairro = spark.sql('select NVL(a.id, b.id) as id, NVL(a.codigo, b.codigo) as codigo, \
    NVL(a.nome, b.nome) as nome, NVL(a.area, b.area) as area, \
    NVL(a.populacao, b.populacao) as populacao, NVL(a.dens_demo, b.dens_demo) as dens_demo, \
    NVL(a.agencias, b.agencias) as agencias, NVL(a.empresas, b.empresas) as empresas, \
    NVL(a.empregados, b.empregados) as empregados, NVL(a.renda, b.renda) as renda, \
    NVL(a.faturamento, b.faturamento) as faturamento, \
    NVL(a.start_date, b.start_date) as start_date, NVL(a.end_date, b.end_date) as end_date \
    FROM new_bairro as a FULL join bairro as b ON a.id=b.id')
final_bairro.registerTempTable('final_bairro')
print('Contagem de registros:', final_bairro.count())

spark.sql('create table bairro_buff like bairro')
spark.sql('insert into table bairro_buff SELECT * from final_bairro')
spark.sql('drop table bairro')
spark.sql('alter table bairro_buff RENAME TO bairro')
spark.sql('select * from bairro limit 5').show()

Contagem de registros: 106
+----------+----------+--------------------+--------+---------+---------+--------+--------+----------+-----+-----------+-------------------+--------+
|        id|    codigo|                nome|    area|populacao|dens_demo|agencias|empresas|empregados|renda|faturamento|         start_date|end_date|
+----------+----------+--------------------+--------+---------+---------+--------+--------+----------+-----+-----------+-------------------+--------+
| 139131066|3552403003|Ar3 - Administraç...| 11.3204|    41304|3648.6343|       1|     177|       234|    B|  1661520.0|2019-03-11 22:16:42|    null|
| 305391958| 355620112|           Indústria| 19.6054|     9315| 475.1242|       4|     191|       476|    C|   244775.0|2019-03-11 22:16:42|    null|
| 478593782|3536505015|          Bela Vista|0.924061|      667|721.81384|       4|     548|      3687|    E|     2177.0|2019-03-11 22:16:42|    null|
| 796954586|  35095076|   PUCCamp (Roseira)| 15.6622|   112644|7192.0933|

## Verificação de alterações dos registros da dimensão CONCORRENTE
- Neste etapa, os registros comuns existentes no warehouse são obtidos e verificados quanto à atualização de atributos considerados de tipo 2

In [10]:
spark.sql('create TEMPORARY view old_conc AS \
    select a.*, IF((isnull(a.faixa_preco) or a.faixa_preco=b.faixa_preco) and (isnull(a.bairro_id) or \
    a.bairro_id=b.bairro_id), 0, 1) as has_changed \
    FROM concorrente as a \
    INNER join raw_conc as b ON a.codigo = b.codigo and isnull(a.end_date)')

DataFrame[]

## Preparação dos dados para inserção (CONCORRENTE)
- Nesta etapa, os registros existentes no banco de dados expirados são atualizados com o preenchimento do atributo *end_date*
- Adicionalmente, são atribuídas as chaves SK à todos os registros novos e os que foram atualizados, por meio do uso de uma função hash combinando a chave natural e a hora corrente
- Finalmente, os registros de ambos subsets são combinados

In [11]:
#spark.sql('truncate table bairro')
expired_conc = spark.sql('select id, codigo, nome, categoria, faixa_preco, endereco, municipio, uf, \
    bairro_id, start_date, from_unixtime(unix_timestamp()) as end_date \
    from old_conc WHERE has_changed=1')

new_conc = spark.sql('select IF(NVL(b.has_changed, 1)==1,hash(a.codigo, unix_timestamp()),b.id) as id, \
    a.codigo, a.nome, a.categoria, a.faixa_preco, a.endereco, a.municipio, a.uf, a.bairro_id, \
    IF(NVL(b.has_changed, 1)==1, from_unixtime(unix_timestamp()), b.start_date) as start_date, \
    null as end_date FROM raw_conc as a LEFT join old_conc as b ON a.codigo=b.codigo')

new_conc = new_conc.union(expired_conc)
print('Contagem de registros:', new_conc.count())
new_conc.show(5)
new_conc.registerTempTable('new_conc')

Contagem de registros: 4202
+----------+---------------+--------------------+--------------------+-----------+--------------------+-----------+---+----------+-------------------+--------+
|        id|         codigo|                nome|           categoria|faixa_preco|            endereco|  municipio| uf| bairro_id|         start_date|end_date|
+----------+---------------+--------------------+--------------------+-----------+--------------------+-----------+---+----------+-------------------+--------+
|-102351746|108804285864355|Restaurante Tratt...|   Buffet Restaurant|          3|Rua Camélias 317,...|   Holambra| SP|      null|2019-03-11 22:16:57|    null|
|1264435566|133298243462499|Sorveteria Mr. Fr...|Ice Cream Shop, F...|          2|Rua Nove de Julho...|    Vinhedo| SP| 355670116|2019-03-11 22:16:57|    null|
|-222226030|176326925911652|Churrascaria Da F...|Cafeteria, Barbec...|          2|Rodovia do Açúcar...|      Salto| SP|      null|2019-03-11 22:16:57|    null|
|-485482073|

## Carregamento da dimensão CONCORRENTE
- Nesta etapa, todos os registros do warehouse são combinados com os novos registros em uma operação de *FULL JOIN*
- Então, para cada atributo, é escolhido o primeiro valor não nulo, iniciando pelos novos registros
- Finalmente, os dados são carregados em uma tabela *buffer*, a tabela original é removida, e a tabela buffer é renomeada como definitiva

In [12]:
final_conc = spark.sql('select NVL(a.id, b.id) as id, NVL(a.codigo, b.codigo) as codigo, \
    NVL(a.nome, b.nome) as nome, NVL(a.categoria, b.categoria) as categoria, \
    NVL(a.faixa_preco, b.faixa_preco) as faixa_preco, \
    NVL(a.endereco, b.endereco) as endereco, NVL(a.municipio, b.municipio) as municipio, \
    NVL(a.uf, b.uf) as uf, NVL(a.bairro_id, b.bairro_id) as bairro_id, \
    NVL(a.start_date, b.start_date) as start_date, NVL(a.end_date, b.end_date) as end_date \
    FROM new_conc as a FULL join concorrente as b ON a.id=b.id')
final_conc.registerTempTable('final_conc')
print('Contagem de registros:', final_conc.count())

spark.sql('create table conc_buff like concorrente')
spark.sql('insert into table conc_buff SELECT * from final_conc')
spark.sql('drop table concorrente')
spark.sql('alter table conc_buff RENAME TO concorrente')
spark.sql('select * from concorrente limit 5').show()

Contagem de registros: 4202
+-----------+----------------+--------------------+--------------------+-----------+--------------------+--------------------+---+---------+-------------------+--------+
|         id|          codigo|                nome|           categoria|faixa_preco|            endereco|           municipio| uf|bairro_id|         start_date|end_date|
+-----------+----------------+--------------------+--------------------+-----------+--------------------+--------------------+---+---------+-------------------+--------+
|-2111193495|1504385383178890|       Espaço Akalum|Caterer, Event Pl...|          0|Avenida Brasil , ...|             Vinhedo| SP|355670116|2019-03-11 22:17:02|    null|
|-2017896669| 351811404939666|     Gelateria Conti|Ice Cream Shop, R...|          3|Rua Capitão Jose ...| Monte Alegre do Sul| SP|     null|2019-03-11 22:17:02|    null|
|-1720757415| 153896648098901|     Sabor & Encanto|     Food & Beverage|          0|Rua Gustavo Beck ...|      Artur Nogue