## Configuração

In [1]:
import os

from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql import Window
from IPython.core.display import HTML
# Keep <pre> output unwrapped so wide DataFrame.show() tables are not line-wrapped.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# JARs handed to the session through spark.jars (AWS SDK pieces + hadoop-aws).
# Built as a clean comma-separated list instead of a multi-line string with
# embedded newlines and indentation.
JARS_DIR = "/home/jovyan/jars"
S3A_JARS = ",".join(
    JARS_DIR + "/" + jar_name
    for jar_name in (
        "aws-java-sdk-core-1.11.534.jar",
        "aws-java-sdk-dynamodb-1.11.534.jar",
        "aws-java-sdk-s3-1.11.534.jar",
        "hadoop-aws-3.2.2.jar",
    )
)

# Connection settings are taken from the environment when available so that
# credentials do not have to live in the notebook. The fallbacks are the values
# this notebook used before, so it still runs unchanged in the original setup.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "aulafia")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "aulafia@123")

spark = (SparkSession.builder
         .config("spark.jars", S3A_JARS)
         .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
         .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
         .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
         .config("spark.hadoop.fs.s3a.path.style.access", True)
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
         .getOrCreate()
        )

# ETL - Raw

## Lendo Camada Landing

In [12]:
# Load every CSV file under the Landing bucket; the first line of each file is the header.
path = 's3a://landing/'

df_landing = spark.read.csv(path, header=True)

df_landing.show(10, False)

+----------+----------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Refdate   |Link_Apto                                                                                                                   |Endereco                                                                                                                                                                                                 |Bairro       |Valor       |Informacoes                                                                                    

In [13]:
# List the distinct Refdate values present in the Landing data.
print('Lendo todas as datas com dados armazenados no Minio\n')

landing_refdates = df_landing.select('Refdate').distinct()
landing_refdates.show()

Lendo todas as datas com dados armazenados no Minio

+----------+
|   Refdate|
+----------+
|17/06/2023|
|02/05/2023|
|03/06/2023|
|17/04/2023|
|17/05/2023|
|02/07/2023|
+----------+



In [14]:
# Print the schema of the Landing DataFrame.
df_landing.printSchema()

root
 |-- Refdate: string (nullable = true)
 |-- Link_Apto: string (nullable = true)
 |-- Endereco: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Valor: string (nullable = true)
 |-- Informacoes: string (nullable = true)



In [15]:
# Raw layer: same columns as Landing, with Refdate parsed from dd/MM/yyyy text into a date.
refdate_as_date = fn.to_date('Refdate', 'dd/MM/yyyy')
df_raw = df_landing.withColumn('Refdate', refdate_as_date)

df_raw.show(10, False)

+----------+----------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Refdate   |Link_Apto                                                                                                                   |Endereco                                                                                                                                                                                                 |Bairro       |Valor       |Informacoes                                                                                    

In [16]:
# Print the schema of the Raw DataFrame (Refdate should now be a date column).
df_raw.printSchema()

root
 |-- Refdate: date (nullable = true)
 |-- Link_Apto: string (nullable = true)
 |-- Endereco: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Valor: string (nullable = true)
 |-- Informacoes: string (nullable = true)



## Salvando Camada Raw

In [9]:
# Write the Raw layer as Parquet partitioned by Refdate, overwriting what is at the target path.
(df_raw
 .write
 .mode('overwrite')
 .partitionBy('Refdate')
 .parquet('s3a://raw/loft/'))

In [18]:
# Preview the first 10 rows of the Raw DataFrame without truncating column values.
df_raw.show(10, False)

+----------+----------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Refdate   |Link_Apto                                                                                                                   |Endereco                                                                                                                                                                                                 |Bairro       |Valor       |Informacoes                                                                                    

# ETL - Context

## Lendo Camada Raw

In [45]:
# Read the Raw layer back from the same location the "Salvando Camada Raw" cell
# writes to ('s3a://raw/loft/'). The previous path ('s3a://raw/data') is not
# written anywhere in this notebook, so a fresh run would read stale or missing data.
path = 's3a://raw/loft/'

df_raw = spark.read.format('parquet').load(path)

df_raw.show(10, False)

+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|Link_Apto                                                                                                           |Endereco                                                                                                                                                                         |Bairro       |Valor       |Informacoes                                                                                                                                                               |

## Funções Auxiliares - Regex

In [46]:
import re

In [49]:
# Clean the Valor column: remove the "R$ " prefix and the "." separators
def valor(str):
    """Return the text with every "R$ " and every "." removed.

    The result is still a string; conversion to a number happens elsewhere.
    """
    without_currency = str.replace("R$ ", "")
    return without_currency.replace(".", "")

# Extract the address fragment from the Endereco column
def endereco(str):
    """Return the first address-like fragment ending in "SP", or None.

    The literal 'Condomínio à venda' is removed from the input first. The
    patterns are then tried in the order listed, and the first pattern that
    matches wins (even if another pattern would match earlier in the text).
    The matched fragment is cleaned with a fixed sequence of substring
    replacements before being returned.
    """
    padroes = [
        r'Cond[a-z].+SP',
        r'Ru[a-z].+SP',
        r'Av[a-z].+SP',
        r'Praç[a-z].+SP',
        r'Alamed[a-z].+SP',
        r'Travess[a-z].+SP',
        r'Parqu[a-z].+SP',
        r'Viel[a-z].+SP',
        r'Vil[a-z].+SP',
        r'Lar[a-z].+SP',
        r'Estr[a-z].+SP',
        r'Jar[a-z].+SP',
        r'Viad[a-z].+SP'
    ]

    # Replacements are applied strictly in this order; later ones see the
    # output of earlier ones (e.g. '-' is padded before ", " becomes " - ").
    cleanup_steps = (
        ("', ' ", "-"),
        ('-', ' - '),
        ('" ', ''),
        ('"', ''),
        ("' ", ''),
        (", ", " - "),
    )

    # The cleaned text does not depend on the pattern, so compute it once.
    text = str.replace('Condomínio à venda', '')

    for padrao in padroes:
        found = re.search(padrao, text)
        if found is None:
            continue
        fragment = found.group(0)
        for old, new in cleanup_steps:
            fragment = fragment.replace(old, new)
        return fragment

    # No pattern matched.
    return None

# Bairro
def bairro(str):
    """Insert a space before every capital letter except the first character.

    Turns run-together names such as "VilaNovaConceição" into
    "Vila Nova Conceição". Accented Latin-1 capitals (À-Ö, Ø-Þ) are treated as
    word starts too, so "JardimÂngela" becomes "Jardim Ângela"; the previous
    ASCII-only class [A-Z] left such names glued together.
    """
    return re.sub(r'(?<!^)(?=[A-ZÀ-ÖØ-Þ])', ' ', str)

## Helpers that split the Informacoes text into individual fields
# Floors
def andares(str):
    """Return the floor description found in the text, or 'Sem info'.

    Three patterns are tried in order and the first one that matches supplies
    the returned text:
      1. "<digit>º" followed by three lowercase words,
      2. "Entre <digit>º a <digit>º <word>",
      3. "<digit>º andar".
    """
    floor_patterns = (
        r'[0-9]º\s[a-z]+\s[a-z]+\s[a-z]+',
        r'Entre\s[0-9]º\sa\s[0-9]º\s\w+',
        r'[0-9]º\sandar',
    )

    for pattern in floor_patterns:
        found = re.search(pattern, str)
        if found:
            return found.group(0)

    return 'Sem info'

# Size
def tamanho(str):
    """Return the integer in the first "<digits> m²" fragment, or 0 if none is found."""
    found = re.search(r'[0-9]+\sm²', str)
    if found is None:
        return 0

    # Keep only what precedes " m²" in the matched fragment.
    digits = found.group(0).split(' m²')[0]
    return int(digits)

# Price per m²
def preco(str):
    """Return the integer from the first "R$ <number> /m²" fragment, or 0 if none is found.

    The number may contain one "." separator (e.g. "R$ 10.500 /m²"), which is
    removed before conversion.
    """
    found = re.search(r'R\$\s[0-9]+\.[0-9]+\s\/m²|R\$\s[0-9]+\s\/m²', str)
    if not found:
        return 0

    after_prefix = found.group(0).split('R$ ')[1]
    number_text = after_prefix.split(' /m²')[0]
    return int(number_text.replace('.', ''))

# Bedrooms
# A studio is counted as having 1 bedroom
def quartos(str):
    """Return the bedroom count as text, or 'Sem info'.

    For "<digit> quarto(s)" the text before "qua" is returned as-is, so it
    keeps its trailing whitespace (e.g. "2 "). If the text instead contains
    "studio", '1' is returned.
    """
    found = re.search(r'[0-9]\squarto[s]?', str)
    if found:
        return found.group(0).split('qua')[0]

    if re.search(r'studio', str):
        return '1'

    return 'Sem info'

# Suites
def suite(str):
    """Return the digit from the first "<digit> suíte" fragment, or 0.

    Texts such as "sem suíte" and texts with no suite information both yield 0.
    The separate "sem suíte" lookup was removed because it returned the same
    value as the fall-through and could never change the result.
    """
    found = re.search(r'[0-9]\ssuíte', str)
    if found is None:
        return 0

    return int(found.group(0).split(' su')[0])

# Parking
def garagem(str):
    """Return the number of parking spots as an int.

    "Não - Vag..." anywhere in the text takes precedence and yields 0.
    Otherwise the digit from the first "<digit> vag..." fragment is returned,
    or 0 when there is none.
    """
    if re.search(r'Não\s\-\sVag\w+', str):
        return 0

    spots = re.search(r'[0-9]\svag\w+', str)
    return int(spots.group(0).split(' va')[0]) if spots else 0
    
# Bathrooms
def banheiros(str):
    """Return the digit from the first "<digit> Ban..." fragment, or 0 if none is found."""
    found = re.search(r'[0-9]\sBan\w+', str)
    if found is None:
        return 0

    return int(found.group(0).split(' Ba')[0])

# Balcony
def varanda(str):
    """Return the number of balconies as an int.

    A "<digit> Var...s" fragment gives that digit; otherwise a "Var...a"
    fragment counts as 1; otherwise 0.
    """
    counted = re.search(r'[0-9]\sVar\w+s', str)
    if counted:
        return int(counted.group(0).split(' Va')[0])

    return 1 if re.search(r'Var\w+a', str) else 0

# Furnished
def mobiliado(str):
    """Return 'Não', 'Sim' or None depending on the furnished marker in the text.

    "Não - Mob..." yields 'Não'; any other "Mob..." yields 'Sim'. When the text
    has no such marker the function returns None (unlike the sibling helpers,
    which return a placeholder string or 0).
    """
    if re.search(r'Não\s\-\sMob\w+', str):
        return 'Não'

    if re.search(r'Mob\w+', str):
        return 'Sim'

    return None
    
# Concierge
def portaria(str):
    """Classify the first "Por..." word in the text.

    'Portaria24h' maps to '24h' and 'Portariaparcial' maps to 'Parcial'. Any
    other first match, or no match at all, yields 'Sem descrição'.
    """
    labels = {
        'Portaria24h': '24h',
        'Portariaparcial': 'Parcial',
    }

    found = re.search(r'Por\w+', str)
    if found is None:
        return 'Sem descrição'

    return labels.get(found.group(0), 'Sem descrição')

# Metro
def metro(str):
    """Return the distance to the metro as an int, or a large sentinel value.

    When the text mentions "Metrô", the number from the first "a <digits>m"
    fragment is returned (presumably metres - confirm against the source data).
    The sentinel 100000000 is returned when "Metrô" is not mentioned, and also
    when it is mentioned without a distance fragment; that second case
    previously raised IndexError.
    """
    no_metro_distance = 100000000

    if not re.search(r'Metrô', str):
        return no_metro_distance

    distance = re.search(r'a\s([0-9]+)m', str)
    if distance is None:
        return no_metro_distance

    return int(distance.group(1))

## Aplicando Regex

In [50]:
# Wrap each parsing helper as a Spark UDF. No return type is passed, so every
# UDF uses fn.udf's default return type.
valor_UDF = fn.udf(valor)
endereco_UDF = fn.udf(endereco)
bairro_UDF = fn.udf(bairro)
andares_UDF = fn.udf(andares)
tamanho_UDF = fn.udf(tamanho)
preco_UDF = fn.udf(preco)
quartos_UDF = fn.udf(quartos)
suite_UDF = fn.udf(suite)
garagem_UDF = fn.udf(garagem)
banheiros_UDF = fn.udf(banheiros)
varanda_UDF = fn.udf(varanda)
mobiliado_UDF = fn.udf(mobiliado)
portaria_UDF = fn.udf(portaria)
metro_UDF = fn.udf(metro)

In [51]:
# Clean the three columns that are rewritten in place.
df_context = (df_raw
              .withColumn('Valor', valor_UDF(fn.col('Valor')))
              .withColumn('Endereco', endereco_UDF(fn.col('Endereco')))
              .withColumn('Bairro', bairro_UDF(fn.col('Bairro'))))

# Every remaining field is parsed out of the free-text Informacoes column.
info_extractors = [
    ('Andares', andares_UDF),
    ('Tamanho_m2', tamanho_UDF),
    ('Valor_RS/m2', preco_UDF),
    ('Quartos', quartos_UDF),
    ('Suites', suite_UDF),
    ('Garagem', garagem_UDF),
    ('Banheiros', banheiros_UDF),
    ('Varanda', varanda_UDF),
    ('Mobiliado', mobiliado_UDF),
    ('Portaria', portaria_UDF),
    ('Metro', metro_UDF),
]
for column_name, extractor in info_extractors:
    df_context = df_context.withColumn(column_name, extractor(fn.col('Informacoes')))

# Informacoes is no longer needed once it has been split; Valor gets its final name.
df_context = df_context.drop('Informacoes').withColumnRenamed("Valor", "Valor_RS")

# Final column order, with the numeric columns cast to integer.
integer_columns = {'Valor_RS', 'Tamanho_m2', 'Valor_RS/m2', 'Quartos', 'Suites',
                   'Garagem', 'Banheiros', 'Varanda', 'Metro'}
output_columns = ['Link_Apto', 'Endereco', 'Bairro', 'Valor_RS', 'Andares',
                  'Tamanho_m2', 'Valor_RS/m2', 'Quartos', 'Suites', 'Garagem',
                  'Banheiros', 'Varanda', 'Mobiliado', 'Portaria', 'Metro', 'Refdate']
df_context = df_context.select(
    *[fn.col(name).cast('integer') if name in integer_columns else fn.col(name)
      for name in output_columns]
)

# Remove "dirty" Raw rows: those whose Valor_RS is null after the integer cast.
df_context = df_context.where(fn.col('Valor_RS').isNotNull())

df_context.show(10, False)

+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------+--------+------------------+----------+-----------+-------+------+-------+---------+-------+---------+-------------+---------+----------+
|Link_Apto                                                                                                           |Endereco                                                                                  |Bairro        |Valor_RS|Andares           |Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria     |Metro    |Refdate   |
+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------+--------+------------------+----------+-----------+-

## Salvando Camada Context

In [29]:
# Write the Context layer as Parquet partitioned by Refdate, overwriting what is at the target path.
(df_context
 .write
 .mode('overwrite')
 .partitionBy('Refdate')
 .parquet('s3a://context/loft/'))

# ETL - Trust

### Lendo a Camada Context

In [3]:
# Read the Context layer back from the same location the "Salvando Camada Context"
# cell writes to ('s3a://context/loft/'). The previous path ('s3a://context/data')
# is not written anywhere in this notebook, so a fresh run would read stale or
# missing data.
path = 's3a://context/loft/'

df_context = spark.read.format('parquet').load(path)

df_context.show(10, False)

+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------+--------+------------------+----------+-----------+-------+------+-------+---------+-------+---------+-------------+---------+----------+
|Link_Apto                                                                                                           |Endereco                                                                                  |Bairro        |Valor_RS|Andares           |Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria     |Metro    |Refdate   |
+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------+--------+------------------+----------+-----------+-

### Tratamento 

In [4]:
# Keep one row per (Link_Apto, Endereco, Valor_RS): the one with the latest Refdate.
# Sorting and then calling dropDuplicates does not guarantee which duplicate
# survives, so the row to keep is chosen explicitly with row_number over a window
# ordered by Refdate descending.
dedup_window = (Window
                .partitionBy("Link_Apto", "Endereco", "Valor_RS")
                .orderBy(fn.col("Refdate").desc()))

df_trust = (df_context
            .withColumn("_row_rank", fn.row_number().over(dedup_window))
            .filter(fn.col("_row_rank") == 1)
            .drop("_row_rank")
            # Drop every row that still has a null in any column.
            .na.drop()
            .orderBy("Refdate", ascending=False)
            # Generated ids are unique and increasing, but not necessarily consecutive.
            .withColumn("id", fn.monotonically_increasing_id())
           )

# Put id and Refdate first; all other columns keep their relative order.
df_trust = (df_trust
            .select("id", "Refdate", "Link_Apto", "Endereco", "Bairro", "Valor_RS", 
                    "Andares", "Tamanho_m2", "Valor_RS/m2", "Quartos", "Suites", 
                    "Garagem", "Banheiros", "Varanda", "Mobiliado", "Portaria", 
                    "Metro")
           )

df_trust.show(10, False)

+---+----------+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+------------------+----------+-----------+-------+------+-------+---------+-------+---------+-------------+---------+
|id |Refdate   |Link_Apto                                                                                                      |Endereco                                                                                           |Bairro           |Valor_RS|Andares           |Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria     |Metro    |
+---+----------+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------

### Salvando Camada Trust

In [5]:
# Write the Trust layer as Parquet partitioned by Refdate, overwriting what is at the target path.
(df_trust
 .write
 .mode('overwrite')
 .partitionBy('Refdate')
 .parquet('s3a://trust/particionado')
)

In [41]:
# Write the Trust layer as a single, non-partitioned Parquet dataset, overwriting what is at the target path.
(df_trust
 .write
 .mode('overwrite')
 .parquet('s3a://trust/unificado')
)

### Lendo Camada Trust

In [6]:
# Read the partitioned Trust layer back to check what was written.
path = 's3a://trust/particionado'

df_trust = spark.read.parquet(path)

df_trust.show(10, False)

+---+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+------------------+----------+-----------+-------+------+-------+---------+-------+---------+-------------+---------+----------+
|id |Link_Apto                                                                                                      |Endereco                                                                                           |Bairro           |Valor_RS|Andares           |Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria     |Metro    |Refdate   |
+---+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+----------

# Fazendo algumas análises

In [2]:
# Load the partitioned Trust layer as the input for the analyses below.
path = 's3a://trust/particionado'

df_trust = spark.read.parquet(path)

df_trust.show(10, False)

+---+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+------------------+----------+-----------+-------+------+-------+---------+-------+---------+-------------+---------+----------+
|id |Link_Apto                                                                                                      |Endereco                                                                                           |Bairro           |Valor_RS|Andares           |Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria     |Metro    |Refdate   |
+---+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+----------

In [3]:
# Print the schema of the Trust DataFrame.
df_trust.printSchema()

root
 |-- id: long (nullable = true)
 |-- Link_Apto: string (nullable = true)
 |-- Endereco: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Valor_RS: integer (nullable = true)
 |-- Andares: string (nullable = true)
 |-- Tamanho_m2: integer (nullable = true)
 |-- Valor_RS/m2: integer (nullable = true)
 |-- Quartos: integer (nullable = true)
 |-- Suites: integer (nullable = true)
 |-- Garagem: integer (nullable = true)
 |-- Banheiros: integer (nullable = true)
 |-- Varanda: integer (nullable = true)
 |-- Mobiliado: string (nullable = true)
 |-- Portaria: string (nullable = true)
 |-- Metro: integer (nullable = true)
 |-- Refdate: date (nullable = true)



In [9]:
# Sanity check: list any row whose Endereco still contains the "à venda" text.
df_trust.filter(fn.col("Endereco").contains("à venda")).show(10, False)

+---+---------+--------+------+--------+-------+----------+-----------+-------+------+-------+---------+-------+---------+--------+-----+-------+
|id |Link_Apto|Endereco|Bairro|Valor_RS|Andares|Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria|Metro|Refdate|
+---+---------+--------+------+--------+-------+----------+-----------+-------+------+-------+---------+-------+---------+--------+-----+-------+
+---+---------+--------+------+--------+-------+----------+-----------+-------+------+-------+---------+-------+---------+--------+-----+-------+



In [10]:
# Average Valor_RS per Bairro, rounded to 2 decimals, highest average first.
(df_trust
 .groupBy('Bairro')
 .agg(fn.round(fn.avg('Valor_RS'), 2).alias('Media_Valor_Bairro'))
 .orderBy(fn.col('Media_Valor_Bairro').desc())
).show(10, False)

+-------------------+------------------+
|Bairro             |Media_Valor_Bairro|
+-------------------+------------------+
|Jardim Europa      |5685555.56        |
|Vila Nova Conceição|5014809.52        |
|Altode Pinheiros   |3758200.0         |
|Sumaré             |2738583.24        |
|Jardim Paulistano  |2541073.28        |
|Higienópolis       |2423269.19        |
|Jardim América     |2392785.68        |
|Itaim Bibi         |2062979.21        |
|Moema Pássaros     |1954678.92        |
|Paraíso            |1935208.29        |
+-------------------+------------------+
only showing top 10 rows



In [11]:
# Highest Valor_RS per Bairro, highest value first.
(df_trust
 .groupBy('Bairro')
 .agg(fn.max('Valor_RS').alias('Max_Valor_Bairro'))
 .orderBy(fn.col('Max_Valor_Bairro').desc())
).show(10, False)

+-------------------+----------------+
|Bairro             |Max_Valor_Bairro|
+-------------------+----------------+
|Vila Matilde       |50000000        |
|Vila Nova Conceição|31990000        |
|Jardim Europa      |23000000        |
|Jardim América     |19150000        |
|Altode Pinheiros   |18937000        |
|Vila Andrade       |18500000        |
|Jardim Paulistano  |17800000        |
|Sumaré             |15000000        |
|Itaim Bibi         |14500000        |
|Vila Olímpia       |14000000        |
+-------------------+----------------+
only showing top 10 rows



In [25]:
# Preview the first 10 rows of the Trust DataFrame without truncating column values.
df_trust.show(10, False)

+---+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+------------------+----------+-----------+-------+------+-------+---------+-------+---------+-------------+---------+----------+
|id |Link_Apto                                                                                                      |Endereco                                                                                           |Bairro           |Valor_RS|Andares           |Tamanho_m2|Valor_RS/m2|Quartos|Suites|Garagem|Banheiros|Varanda|Mobiliado|Portaria     |Metro    |Refdate   |
+---+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+-----------------+--------+----------

In [None]:
# Expose the Trust DataFrame to Spark SQL as the "imoveis" view.
# The original cell used `df`, a name that is never defined in this notebook,
# so it failed with NameError on a fresh kernel; df_trust is the DataFrame the
# surrounding analysis cells work with.
df_trust.createOrReplaceTempView("imoveis")

# One row per Bairro with its average Valor_RS rounded to 2 decimals.
spark.sql('''SELECT 
             distinct(Bairro), 
             round(avg(Valor_RS) over (partition by Bairro), 2) as avg_bairro
             FROM imoveis''').show(10, False)

In [45]:
# Label each listing 'Caro' when its Valor_RS is above its Bairro's rounded
# average and 'Barato' otherwise. DISTINCT applies to the whole selected row,
# not only to Bairro.
caro_barato_query = '''SELECT 
             distinct(Bairro), 
             Valor_RS, 
             round(avg(Valor_RS) over (partition by Bairro), 2) as avg_bairro,
             CASE
                 WHEN (Valor_RS - (round(avg(Valor_RS) over (partition by Bairro), 2))) > 0 THEN 'Caro'
                 ELSE 'Barato'
             END AS Caro_Barato
             FROM imoveis'''

df_caro_barato = spark.sql(caro_barato_query)
df_caro_barato.show(10, False)

+---------+--------+----------+-----------+
|Bairro   |Valor_RS|avg_bairro|Caro_Barato|
+---------+--------+----------+-----------+
|Aclimação|432000  |1307484.76|Barato     |
|Aclimação|405000  |1307484.76|Barato     |
|Aclimação|740000  |1307484.76|Barato     |
|Aclimação|1400000 |1307484.76|Caro       |
|Aclimação|1280000 |1307484.76|Barato     |
|Aclimação|621000  |1307484.76|Barato     |
|Aclimação|1472000 |1307484.76|Caro       |
|Aclimação|380000  |1307484.76|Barato     |
|Aclimação|330000  |1307484.76|Barato     |
|Aclimação|460000  |1307484.76|Barato     |
+---------+--------+----------+-----------+
only showing top 10 rows

