# Importações

In [94]:
from pyspark.sql import SparkSession, Row
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import col, trim
from pathlib import Path
from geopy.geocoders import Nominatim
import os, time

# Analise exploratória e tratamentos para camada silver

In [2]:
src_path = os.path.expanduser('~/Documents/PySpark-Projects/Sales-project/data/{stage}')

## Criando a sessão spark

In [3]:
builder = SparkSession.builder \
    .appName("etl-app") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

25/05/19 16:03:57 WARN Utils: Your hostname, gabriel-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.16.128 instead (on interface ens33)
25/05/19 16:03:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/gabriel/Documents/PySpark-Projects/Sales-project/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/gabriel/.ivy2/cache
The jars for the packages stored in: /home/gabriel/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e751f44f-c931-4cad-a6a6-54297739db09;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.1 in central
	found io.delta#delta-storage;3.3.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 2588ms :: artifacts dl 29ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.3.1 from central in [default]
	io.delta#delta-storage;3.3.1 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   | 

Versão do spark usada

In [4]:
print(spark.version)

3.5.5


In [5]:
print(spark.sparkContext)

<SparkContext master=local[*] appName=etl-app>


## Analise de customers

### Lendo o arquivo

In [6]:
df_customers = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_customers_dataset.csv'), header=True, sep=",", inferSchema=True)
df_customers.show(10)

                                                                                

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

Verificando a existencia de dados inconsistentes (nulos, vazios e etc...) em colunas principais como customer_id e customer_unique_id

In [7]:
df_customers.filter(
    col('customer_id').isNull() | 
    (trim(col('customer_id')) == '')
).show()

[Stage 4:>                                                          (0 + 1) / 1]

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
+-----------+------------------+------------------------+-------------+--------------+



                                                                                

In [8]:
df_customers.filter(
    col('customer_unique_id').isNull() |
    (trim(col('customer_unique_id')) == '')
).show()

                                                                                

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
+-----------+------------------+------------------------+-------------+--------------+



                                                                                

Decisão: As colunas principais não contem dados nulos, nenhuma ação deve ser tomada

Verificando a tipagem das colunas (Schema)

In [9]:
df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



Decisão: A tipagem dos dados parece adequada

Analisando dataset

In [10]:
df_customers.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

Analise de dados duplicados

In [11]:
df_customers.groupBy('customer_id')\
    .count()\
        .filter(col('count')>1)\
            .show()

[Stage 12:>                                                         (0 + 1) / 1]

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



                                                                                

In [12]:
df_customers.groupBy('customer_unique_id')\
    .count()\
        .filter(col('count')>1)\
            .show(truncate=False)



+--------------------------------+-----+
|customer_unique_id              |count|
+--------------------------------+-----+
|4b384b778ebc0449d0244902bfce7beb|2    |
|c85df1c6ef6f7bb605e1c49d34f172ee|2    |
|5b71ca52f13f79c2be72b2aad8755c0a|2    |
|cbe71e4cb0b82b97bbec5cd51e0fa3ea|2    |
|f31d2c22ddcad145e224e6dc48f349b9|2    |
|4011b7579e894fa929d2de97b4928f41|2    |
|7e7301841ddb4064c2f3a31e4c154932|2    |
|7b0eaf68a16e4808e5388c67345033c9|2    |
|5a2e847dd085d36e3ba8916b75e794ed|2    |
|1d2435aa3b858d45c707c9fc25e18779|2    |
|f00e99f043c33da533ebc107f5a68c4e|2    |
|bce006e903be688f122a39f45b210090|2    |
|26e025af2347c3968f6a578f853a9da2|2    |
|4e3b6c25502ef69e5c5e54616bf67ed7|2    |
|a35e6ad969a4298593c504f69c03efb4|2    |
|4702ba5faa8283e0f6b6a545cdaf8a9f|2    |
|7e4bebe20140a71b34263a659ba1ce11|2    |
|98a8081d6f922b47f0b48d0b10665047|2    |
|453c2895f29b6d9a47f8cb48e00b024b|2    |
|216ab90e27f18940cfac8f9f02fb4239|2    |
+--------------------------------+-----+
only showing top

                                                                                

In [13]:
df_customers.filter(col('customer_unique_id')=='4b384b778ebc0449d0244902bfce7beb').show(truncate=False)

                                                                                

+--------------------------------+--------------------------------+------------------------+--------------+--------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city |customer_state|
+--------------------------------+--------------------------------+------------------------+--------------+--------------+
|ce990e6331cacfbe30d267a2f847276a|4b384b778ebc0449d0244902bfce7beb|30170                   |belo horizonte|MG            |
|da81a00964085d84df193ee32dc97d20|4b384b778ebc0449d0244902bfce7beb|30170                   |belo horizonte|MG            |
+--------------------------------+--------------------------------+------------------------+--------------+--------------+



Decisão: Avaliando a estrutura recebida, identificou-se que a coluna customer_id relaciona-se com o arquivo de ordens - olist_orders_dataset.csv e que podemos gerar uma dimensão de clientes 1 x m com as clunas customer_unique_id, customer_zip_code_prefix, customer_xity, customer_state

### Separando um dataset 1 x m de customers

In [14]:
df_customers_unique = df_customers.select('customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state').distinct()
df_customers_unique.show(truncate=True)

[Stage 20:>                                                         (0 + 1) / 1]

+--------------------+------------------------+--------------------+--------------+
|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+------------------------+--------------------+--------------+
|a5844ba4bfc8d0cc6...|                    8225|           sao paulo|            SP|
|fa7e6f153b7724fda...|                   25850|      paraiba do sul|            RJ|
|ca87392503c5a503f...|                   78580|       alta floresta|            MT|
|c1a56ed1e45dde683...|                   27913|               macae|            RJ|
|f48a9bdc7d38b43bb...|                   36080|        juiz de fora|            MG|
|9e1a43ff4b344d862...|                   23059|      rio de janeiro|            RJ|
|3f9d987156fddc08e...|                   13840|          mogi-guacu|            SP|
|76967eb50da869a31...|                    2410|           sao paulo|            SP|
|728bcc9495f951828...|                    4295|           sao paulo|        

                                                                                

### Salvando arquivos na silver

Salvando df Dm_customers_order

In [15]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_dm_customers_order')).mkdir(parents=True, exist_ok=True)
df_customers = df_customers.select('customer_id', 'customer_unique_id')
df_customers.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olit_dm_customers_order'))

25/05/19 16:05:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [16]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_dm_customers')).mkdir(parents=True, exist_ok=True)
df_customers_unique.write.mode('overwrite')\
    .format('delta')\
    .option("compression", "snappy")\
        .save(os.path.join(src_path.format(stage='silver'), 'olit_dm_customers'))

                                                                                

# Analise geolocation dataset

In [17]:
df_geolocation = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_geolocation_dataset.csv'), header=True, sep=",", inferSchema=True)
df_geolocation.show()

                                                                                

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6

Vamos se há algum padrão que devemos seguir

In [18]:
df_geolocation.groupBy(df_geolocation.columns).count().filter(col('count') > 1).show(truncate=True)

[Stage 52:>                                                         (0 + 1) / 1]

+---------------------------+-------------------+-------------------+----------------+-----------------+-----+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|count|
+---------------------------+-------------------+-------------------+----------------+-----------------+-----+
|                       1026| -23.53925652899767|-46.633440525535235|       sao paulo|               SP|    2|
|                       1106|-23.531062064948905|-46.629505278925556|       sao paulo|               SP|    2|
|                       1241|-23.545792634196992|  -46.6567234730798|       sao paulo|               SP|    3|
|                       1226|-23.538190850683794|-46.651323227306854|       são paulo|               SP|    4|
|                       1223|-23.543735510964265| -46.65415427699233|       sao paulo|               SP|    2|
|                       1303|-23.551504721622702| -46.64945164585375|       sao paulo|               SP|    3|
|

                                                                                

In [19]:
df_geolocation.filter(col('geolocation_zip_code_prefix') == '1026').show(truncate=True)

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1026|-23.539494799586286|-46.632843914149746|       são paulo|               SP|
|                       1026| -23.53984025062037|  -46.6302639919499|       sao paulo|               SP|
|                       1026| -23.53984025062037|  -46.6302639919499|       sao paulo|               SP|
|                       1026|-23.539439503750902| -46.63306386663973|       sao paulo|               SP|
|                       1026|-23.539167607276784| -46.63524162031853|       sao paulo|               SP|
|                       1026|-23.539167607276784| -46.63524162031853|       sao paulo|               SP|
|                       1026|-23.538895356079482| -46.6

Ha dados duplicados e não há um padrão. Isso acontece devido ao range de de endereços dentro de um mesmo cep (representado pela chave geolocation zip code). Para sequência e nosso objetivo, vamos tornar unicos nossos cep, pois a ideia é ter o range e o cep suprirá nossa necessidade

In [20]:
df_geolocation = df_geolocation.select('geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng', 'geolocation_city', 'geolocation_state')\
    .dropDuplicates(['geolocation_zip_code_prefix'])

In [21]:
df_geolocation.filter(col('geolocation_zip_code_prefix') == '1026').show(truncate=True)



+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1026|-23.539494799586286|-46.632843914149746|       são paulo|               SP|
+---------------------------+-------------------+-------------------+----------------+-----------------+



                                                                                

Agora, vamos verificar se há dados vazios ou nulos

In [22]:
df_geolocation.filter(col('geolocation_zip_code_prefix').isNull()).show()

[Stage 57:>                                                         (0 + 2) / 2]

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
+---------------------------+---------------+---------------+----------------+-----------------+



                                                                                

In [23]:
df_geolocation.filter(col('geolocation_zip_code_prefix')=='').show()

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
+---------------------------+---------------+---------------+----------------+-----------------+



Agora antes de salvar, mas aguardar pelo arquivo de sellers que também possui geolocalização para completarmos

# Analise do arquivo Olist Order Items 

In [25]:
df_order_itens = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_order_items_dataset.csv'), header=True, sep=",", inferSchema=True)
df_order_itens.show(truncate=False, n=10)

                                                                                

+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+------+-------------+
|order_id                        |order_item_id|product_id                      |seller_id                       |shipping_limit_date|price |freight_value|
+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+------+-------------+
|00010242fe8c5a6d1ba2dd792cb16214|1            |4244733e06e7ecb4970a6e2683c13e61|48436dade18ac8b2bce089ec2a041202|2017-09-19 09:45:35|58.9  |13.29        |
|00018f77f2f0320c557190d7a144bdd3|1            |e5f2d52b802189ee658865ca93d83a8f|dd7ddc04e1b6c2c614352b383efe2d36|2017-05-03 11:05:13|239.9 |19.93        |
|000229ec398224ef6ca0657da4fc703e|1            |c777355d18b72b67abbeef9df44fd0fd|5b51032eddd242adc84c38acab88f23d|2018-01-18 14:48:30|199.0 |17.87        |
|00024acbcdf0a6daa1e931b038114c75|1            |7634da152a4610f1

In [26]:
df_order_itens.filter(col('order_id')=='00010242fe8c5a6d1ba2dd792cb16214').show(truncate=True)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+



Entendendo o dataset, vê-se que que a coluna order_id é o numero do pagamento, o order_item_id é o numero da linha da nota, e os damais são produtos, vendedor e entregas.

Vamos verificar se há dados vazios ou nulos em colunas relevantes como a producto, seller e order_id

In [27]:
df_order_itens.filter((col('order_id').isNull()) | (col('order_id')=='')).show()

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
+--------+-------------+----------+---------+-------------------+-----+-------------+



In [28]:
df_order_itens.filter((col('product_id').isNull()) | (col('product_id')=='')).show()

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
+--------+-------------+----------+---------+-------------------+-----+-------------+



In [29]:
df_order_itens.filter((col('seller_id').isNull()) | (col('seller_id')=='')).show()

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
+--------+-------------+----------+---------+-------------------+-----+-------------+



Não foi identificado dados vazios ou nulos, então vamos prosseguir com a tipagem dos dados

In [30]:
df_order_itens.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



Vamos salvar o parquet

In [31]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_dm_order_itens')).mkdir(parents=True, exist_ok=True)
df_order_itens.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olit_dm_order_itens'))

                                                                                

# Analise do arquivo order payments dataset

In [32]:
df_order_payments = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_order_payments_dataset.csv'), header=True, sep=",", inferSchema=True)
df_order_payments.show(truncate=False, n=10)

+--------------------------------+------------------+------------+--------------------+-------------+
|order_id                        |payment_sequential|payment_type|payment_installments|payment_value|
+--------------------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b1e8b2acac839d17|1                 |credit_card |8                   |99.33        |
|a9810da82917af2d9aefd1278f1dcfa0|1                 |credit_card |1                   |24.39        |
|25e8ea4e93396b6fa0d3dd708e76c1bd|1                 |credit_card |1                   |65.71        |
|ba78997921bbcdc1373bb41e913ab953|1                 |credit_card |8                   |107.78       |
|42fdf880ba16b47b59251dd489d4441a|1                 |credit_card |2                   |128.45       |
|298fcdf1f73eb413e4d26d01b25bc1cd|1                 |credit_card |2                   |96.12        |
|771ee386b001f06208a7419e4fc1bbd7|1                 |credit_card |1               

A primeira analiase, indentifico que esses dados são referentes à caracterisca de cada pagamento

Vamos validar se há duplicados ou nulos na coluna de order_id

In [33]:
df_order_payments.filter(col('order_id').isNull()).show()

+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
+--------+------------------+------------+--------------------+-------------+



Não há dados nulos

In [34]:
df_order_payments.groupBy(df_order_payments.columns).count().filter(col('count')>1).show()



+--------+------------------+------------+--------------------+-------------+-----+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|count|
+--------+------------------+------------+--------------------+-------------+-----+
+--------+------------------+------------+--------------------+-------------+-----+



                                                                                

Não há dados duplicados, vamos verificar a tipagem dos dados

In [35]:
df_order_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



Vamos salvar o dataset em parquet

In [36]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_order_payment')).mkdir(parents=True, exist_ok=True)
df_order_payments.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olit_order_payment'))

                                                                                

# Analisando o arquivo order reviews

In [37]:
df_orders_reviews = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_order_reviews_dataset.csv'), header=True, sep=",", inferSchema=True)
df_orders_reviews.show(truncate=False, n=10)

                                                                                

+--------------------------------+--------------------------------+------------+--------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|review_id                       |order_id                        |review_score|review_comment_title|review_comment_message                                                                                                                                                        |review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+--------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|7bc2406110b926393aa56f80a40eba40|

vamos verificar se há duplicações na review_id

In [38]:
df_orders_reviews.groupBy(df_orders_reviews.columns).count().filter(col('count')>1).show()



+--------------------+--------------------+-------------------+--------------------+----------------------+--------------------+-----------------------+-----+
|           review_id|            order_id|       review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|count|
+--------------------+--------------------+-------------------+--------------------+----------------------+--------------------+-----------------------+-----+
|Bem antes do praz...|                NULL|               NULL|                NULL|                  NULL|                NULL|                   NULL|    2|
|A única coisa que...| 2017-11-22 00:00:00|2017-11-23 14:53:55|                NULL|                  NULL|                NULL|                   NULL|    2|
|POREM NAO RECEBEM...| SOMENTE OS PROTE...|2018-02-22 00:00:00| 2018-02-23 20:04:41|                  NULL|                NULL|                   NULL|    2|
|           Obrigada"| 2016-10-27 00:00:00|201

                                                                                

Além de duplicação, identificados que os dados estão bagunçadoos entre as colunas.

vamos começar identificando o que não é uma order_id válida e retirar da nossa base

In [39]:
df_orders_reviews_orders = df_orders_reviews.join(df_order_payments, on='order_id', how='inner')
df_orders_reviews_orders.show(truncate=False)

                                                                                

+--------------------------------+--------------------------------+------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+
|order_id                        |review_id                       |review_score|review_comment_title   |review_comment_message                                                                                                                                                        |review_creation_date|review_answer_timestamp|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------------------+--------------------------------+------------+-----------------------+----------------------------------------------------------------------------------------------------------------

Agora que só temos ordens validas, vamos selecionar apenas as colunas referentes ao review

In [40]:
df_orders_reviews = df_orders_reviews_orders.select(
    'order_id', 
    'review_id', 
    'review_score', 
    'review_comment_title',
    'review_comment_message',
    'review_creation_date',
    'review_answer_timestamp'
)

df_orders_reviews.show(truncate=False)

                                                                                

+--------------------------------+--------------------------------+------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|order_id                        |review_id                       |review_score|review_comment_title   |review_comment_message                                                                                                                                                        |review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|73fc7af87114b39712e6da79

Verificando duplicadas na coluna de review_id

In [41]:
df_orders_reviews.groupBy(col('review_id')).count().filter(col('count')>1).show(truncate=False)



+--------------------------------+-----+
|review_id                       |count|
+--------------------------------+-----+
|f144ac1998474653203c861be02fd31f|2    |
|e4f5fbbbf2fa8f259e02e0e529eeae26|2    |
|e5ed97ed48e1884fafa20d5e842045c5|3    |
|c2abd74d4d97c50f515d66ddcbd1a59b|2    |
|868c4827b9ab7272338ae1c95d192505|2    |
|a8ffb30c74397c7cf37ab595d3c1662e|2    |
|952786cedfae3b1743d3f2d67bd12e01|3    |
|dcb6c0dc808bb7101f3717be34f7e8f5|2    |
|be6033aa21c31ae127c6d3c7f1a83528|3    |
|37471542321c6dca40aa4813d53e322f|2    |
|2aaed1788eac6c8d9688d00d390c3c74|2    |
|55f5fbc835cbc7401c388e07d827abb1|2    |
|f6e05a3f6dbc3bdb1ec3068725c67677|2    |
|153c7691f094e2b2d8c1c21036773975|2    |
|eb76b6dc2f62d548b9823eab120860e7|4    |
|a6d3a3c6028f51bd2342a1c32d9c7750|4    |
|06612663a9f70fe8fac1d6575cc02df6|2    |
|e4d4b6477ff4777b725e5d5682fa785d|2    |
|0f502163b90bab787e2ddfb009bf761b|2    |
|32b4a2152df0e53c5635141e6fda6058|2    |
+--------------------------------+-----+
only showing top

                                                                                

In [42]:
df_orders_reviews.filter(col('review_id')=='f144ac1998474653203c861be02fd31f').show(truncate=False)

+--------------------------------+--------------------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|order_id                        |review_id                       |review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|ebe8e5e35f85b19e3b8132a054bbf523|f144ac1998474653203c861be02fd31f|5           |NULL                |NULL                  |2018-02-06 00:00:00 |2018-02-07 14:37:49    |
|df3e4e0056f83d44b57664f39f3dba9d|f144ac1998474653203c861be02fd31f|5           |NULL                |NULL                  |2018-02-06 00:00:00 |2018-02-07 14:37:49    |
+--------------------------------+--------------------------------+------------+--------------------+----------------------+--------------------+-----

Identificamos que possuimos review_id duplicado, o que faz sentido quando entendemos que o mesmo revisor pode ter feito avaliação em duas compras diferentes no mesmo dia.
Com isso, vamos manter os dados da forma que ele está.


Vamos verificar se há valores errados nas colunas

Para verificar a coluna de Id, teremos que utilizar expressão regex, tendo em vista que a coluna Id é uma criptografia de 32 bits

In [43]:
df_orders_reviews_invalidos = df_orders_reviews.filter(~col('review_id').rlike("^[a-f0-9]{32}$")).show(truncate=False)

                                                                                

+--------+---------+------------+--------------------+----------------------+--------------------+-----------------------+
|order_id|review_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------+---------+------------+--------------------+----------------------+--------------------+-----------------------+
+--------+---------+------------+--------------------+----------------------+--------------------+-----------------------+



In [44]:
df_orders_reviews_invalidos = df_orders_reviews.filter(col('review_id').rlike("^[a-f0-9]{32}$")).show(truncate=False)

+--------------------------------+--------------------------------+------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|order_id                        |review_id                       |review_score|review_comment_title   |review_comment_message                                                                                                                                                        |review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+-----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|73fc7af87114b39712e6da79

Ficou evidente que não há dados inválidos na coluna de review_id

Também ficou visivel dados nulos nas colunas temporais, vamos fazer a limpeza dos dados nulos

In [45]:
df_orders_reviews = df_orders_reviews.dropna(subset=['review_creation_date'])
df_orders_reviews.show()

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|            order_id|           review_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|73fc7af87114b3971...|7bc2406110b926393...|           4|                NULL|                  NULL| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|a548910a1c6147796...|80e641a11e56f04c1...|           5|                NULL|                  NULL| 2018-03-10 00:00:00|    2018-03-11 03:05:13|
|f9e4b658b201a9f2e...|228ce5500dc1d8e02...|           5|                NULL|                  NULL| 2018-02-17 00:00:00|    2018-02-18 14:36:24|
|658677c97b385a9be...|e64fb393e7b32834b...|           5|                NULL|  Recebi bem antes ...| 2017-04-21 00:00:00|   

In [46]:
df_orders_reviews.filter(col('review_creation_date').isNull()).show()

                                                                                

+--------+---------+------------+--------------------+----------------------+--------------------+-----------------------+
|order_id|review_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------+---------+------------+--------------------+----------------------+--------------------+-----------------------+
+--------+---------+------------+--------------------+----------------------+--------------------+-----------------------+



Vamos verificar agora se há dados duplicados entre order id e review id

In [47]:
df_orders_reviews.groupBy(
    col('order_id'),
    col('review_id')
).count().filter(col('count')>1).show(truncate=False)

+--------------------------------+--------------------------------+-----+
|order_id                        |review_id                       |count|
+--------------------------------+--------------------------------+-----+
|9f9993914a8e2fda5081619a7a994853|32e3b541c2d80e1b430f9e2e88876788|2    |
|6ac559b49cbb0d6d7c3eb071891785c7|84e0c9399929ec8171ef318b89af7fb5|4    |
|ae940b7edae45de05e43817ff84efbfc|9ca5de8784e79d34283e0fdfe9531cce|2    |
|8f3a46cc0e9d362bf43dd339e4379615|3639e59f95a0241360cc0c52933c5493|2    |
|b485e99d1c3896c44eb56886216fcd01|3c588b9fec099991b945b3779973284b|3    |
|72c3f657842e35f5d5279cc0fd06826b|93dfdec4d64a188b5e64aa5ecfc3b22d|2    |
|c177bd75cb8bbff3f8ce6842201c7b6f|b01b9d62b9ae868ea704b2411dd32ecd|2    |
|f409f5ae35c08a9135d9e5d15ec6ab67|8ba43c59f383d1df29f569800bfacbd2|2    |
|79b49127db2c9bc718a161183671505c|068d46134e84c6eb27b45859217cfba6|3    |
|4e81a6ed1cdc31e84c13a74ec121798c|1bdc875a8e71947c899a581d4f10a8f4|2    |
|3ba1f632cf89a08caf9837999bf827ef|38f1

                                                                                

In [48]:
df_orders_reviews.filter(col('review_id')=="e136151cc080d267cbeceba07763279d").show(truncate=False)

[Stage 138:>                                                        (0 + 1) / 1]

+--------------------------------+--------------------------------+------------+--------------------+-----------------------------------------------------+--------------------+-----------------------+
|order_id                        |review_id                       |review_score|review_comment_title|review_comment_message                               |review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+--------------------+-----------------------------------------------------+--------------------+-----------------------+
|99eae747b45fb42516e46311533b3064|e136151cc080d267cbeceba07763279d|5           |NULL                |Gostei do produto muito bem embalado. super recomendo|2017-10-25 00:00:00 |2017-10-25 19:32:47    |
|99eae747b45fb42516e46311533b3064|e136151cc080d267cbeceba07763279d|5           |NULL                |Gostei do produto muito bem embalado. super recomendo|2017-10-25 00:00:00 |2017-10-25 19:32:47 

                                                                                

In [49]:
df_orders_reviews = df_orders_reviews.dropDuplicates()
df_orders_reviews.groupBy(
    col('order_id'),
    col('review_id')
).count().filter(col('count')>1).show(truncate=False)

                                                                                

+--------+---------+-----+
|order_id|review_id|count|
+--------+---------+-----+
+--------+---------+-----+



Agora os dados estão limpos, vamos verificar a tipagem dos dados

In [50]:
df_orders_reviews.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



In [51]:
df_orders_reviews.show(truncate=False, n=5)

                                                                                



+--------------------------------+--------------------------------+------------+-------------------------+---------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|order_id                        |review_id                       |review_score|review_comment_title     |review_comment_message                                                                                   |review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+-------------------------+---------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|d14460d3fa8f54b8f0f7befd11820202|ad9c2612cc5ea378a403f3ba94cd22de|5           |Agilidade                |Chegou super rápido e bem embalado.                                                                      |2018-07-14 00:

                                                                                

In [52]:
df_orders_reviews = df_orders_reviews.withColumn('review_score', col('review_score').cast('int'))\
    .withColumn('review_creation_date', col('review_creation_date').cast('date'))\
    .withColumn('review_answer_timestamp', col('review_answer_timestamp').cast('date'))
df_orders_reviews.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: date (nullable = true)
 |-- review_answer_timestamp: date (nullable = true)



In [53]:
df_orders_reviews.show()



+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|            order_id|           review_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|d14460d3fa8f54b8f...|ad9c2612cc5ea378a...|           5|           Agilidade|  Chegou super rápi...|          2018-07-14|             2018-07-15|
|f2b00200fa3b427fe...|26ccedab64af303bf...|           4|              Ótimo |           Tudo certo!|          2018-06-08|             2018-06-11|
|104d88e666e2ec697...|f67200897bebcf89f...|           1|Recebi o produto ...|  Recebi um produto...|          2018-06-09|             2018-06-12|
|0ea7c4f19490b1328...|be478d4dcd43d78a8...|           1|Não recebi a colc...|  Foi me enviado ap...|          2018-05-22|   

                                                                                

Agora que temos a tipagem feita, vamos salvar o arquivo final em parquet

In [54]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_order_reviews')).mkdir(parents=True, exist_ok=True)
df_orders_reviews.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olit_order_reviews'))

                                                                                

# Analise do arquivo olist orders dataset

In [55]:
df_orders = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_orders_dataset.csv'), header=True, sep=",", inferSchema=True)
df_orders.show(truncate=False, n=10)

                                                                                

+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b7cc49136f2d6af7|9ef432eb6251297304e76186b10a928d|delivered   |2017-10-02 10:56:33     |2017-10-02 11:07:15|2017-10-04 19:55:00         |2017-10-10 21:25:13          |2017-10-18 00:00:00          |
|53cdb2fc8bc7dce0b6741e2150273451|b0830fb4747a6c6d20dea0b8c802d7ef|delivered   |2018-07-24 20:41:37     |2018-07-26 03:24:27|2018-07-26 14:3

Vamos verificar se há dados nulos nas colunas principais que são Order_id e customer_id

In [56]:
df_orders.filter((col('order_id').isNull()) | (col('order_id')=='')).show()

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [57]:
df_orders.filter((col('customer_id').isNull()) | (col('customer_id')=='')).show()

[Stage 177:>                                                        (0 + 1) / 1]

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



                                                                                

Não possuimos dados nulos nas colunas principais, aogra vamos verificar se essa coluna possui uma coluna de identificador único

In [58]:
df_orders.groupBy(col('order_id')).count().filter(col('count')>1).show()



+--------+-----+
|order_id|count|
+--------+-----+
+--------+-----+



                                                                                

A coluna de order_id parece ser nossa coluna ID, vamos verificar a coluna de customer

In [59]:
df_orders.groupBy(col('customer_id')).count().filter(col('count')>1).show()

[Stage 184:>                                                        (0 + 2) / 2]

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



                                                                                

A coluna customer id também não possui dados duplicados

Analisando a estrutura do sistema proposto, entende-se que essa tabela deve ter como identificador único a ordem, pois a proposta é identificar o status do pedido do cliente.

Agora, vamos analisar a tipagem dos dados

In [60]:
df_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



Ao fazer a leitura do dataset com Spark, a tipagem dos dados já foi realizada com sucesso, não sendo necessario qualquer alteração

vamos verificar agora os valores da coluna de status para entender as possibildiades

In [61]:
df_orders.select('order_status').distinct().show()

+------------+
|order_status|
+------------+
|     shipped|
|    canceled|
|    approved|
|    invoiced|
|     created|
|   delivered|
| unavailable|
|  processing|
+------------+



Vamos salvar nosso dataset em parquet

In [62]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_orders')).mkdir(parents=True, exist_ok=True)
df_orders.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olit_orders'))

                                                                                

# Analisando arquivo olist products dataset

In [63]:
df_products = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_products_dataset.csv'), header=True, sep=",", inferSchema=True)
df_products.show(truncate=False, n=10)

+--------------------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id                      |product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff4541ed26657ea517e5|perfumaria           |40                 |287                       |1                 |225             |16               |10               |14              |
|3aa071139cb16b67ca9e5dea641aaa2f|artes                |44                 |276                       |1                 |1000            |30               |18               |20              |
|96bd76ec8810374ed1b65e291975717f|e

A primeira vista, o dataset deve ter a leitura de que o product_id é o identificador único, categoria deve ser mais multiplos e os demais são caracteristas de cada produto sendo que podem ser parecidos ou iguais, mas com Id diferentes

Vamos verificar se há nulos ou vazios na coluna product_id

In [64]:
df_products.filter((col('product_id').isNull()) | (col('product_id')=='')).show()

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



Não foi identificado valores nulos ou vazios, vamos agora verificar se há duplicados

In [65]:
df_products.groupBy(col('product_id')).count().filter(col('count') > 1).show()

+----------+-----+
|product_id|count|
+----------+-----+
+----------+-----+



Não possuimos nulos na nossa coluna de identificador

In [66]:
df_products.groupBy(df_products.columns).count().filter(col('count') > 1).show()

[Stage 211:>                                                        (0 + 1) / 1]

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|count|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----+
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----+



                                                                                

Vamos verificar agora a tipagem dos dados

In [67]:
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



A tipagem dos dados está coerente com os dados do dataset, não sendo necessário fazer demais alterações. Vamos salvar nosso dataset em parquet

In [68]:
Path(os.path.join(src_path.format(stage='silver'), 'olist_products')).mkdir(parents=True, exist_ok=True)
df_products.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olist_products'))

                                                                                

# Analisando o arquivo olist sellers

In [69]:
df_sellers = spark.read.csv(os.path.join(src_path.format(stage='bronze'), 'olist_sellers_dataset.csv'), header=True, sep=",", inferSchema=True)
df_sellers.show(truncate=False, n=10)

+--------------------------------+----------------------+-----------------+------------+
|seller_id                       |seller_zip_code_prefix|seller_city      |seller_state|
+--------------------------------+----------------------+-----------------+------------+
|3442f8959a84dea7ee197c632cb2df15|13023                 |campinas         |SP          |
|d1b65fc7debc3361ea86b5f14c68d2e2|13844                 |mogi guacu       |SP          |
|ce3ad9de960102d0677a81f5d0bb7b2d|20031                 |rio de janeiro   |RJ          |
|c0f3eea2e14555b6faeea3dd58c1b1c3|4195                  |sao paulo        |SP          |
|51a04a8a6bdcb23deccc82b0b80742cf|12914                 |braganca paulista|SP          |
|c240c4061717ac1806ae6ee72be3533b|20920                 |rio de janeiro   |RJ          |
|e49c26c3edfa46d227d5121a6b6e4d37|55325                 |brejao           |PE          |
|1b938a7ec6ac5061a66a3766e0e75f90|16304                 |penapolis        |SP          |
|768a86e36ad6aae3d03e

A ideia proposta para esse arquivo é diferente do dataset do customers. Enquanto no customer a ideia é ter o customer_id que é relacionado à um customer_unique_id, aqui a ideia é que a coluna sellers_id já seja a coluna de identificador único, vamos seguir com a analise come essa regra.

In [70]:
df_sellers.filter((col('seller_id').isNull()) | (col('seller_id')=='')).show()

+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
+---------+----------------------+-----------+------------+



não possuimos valores nulos ou vazios na nossa coluna de identificador único. Vamos verificar a tipagem dos dados

In [71]:
df_sellers.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



A tipagem dos dados faz sentido com os dados no dataset. Vamos salvar nosso dataset em parquet

In [76]:
df_sellers_ = df_sellers.select(
    col('seller_id'),
    col('seller_zip_code_prefix'),    
)

df_geo = df_sellers.select(
    col('seller_zip_code_prefix'),
    col('seller_city'),
    col('seller_state')
)

df_sellers_.show(1)

+--------------------+----------------------+
|           seller_id|seller_zip_code_prefix|
+--------------------+----------------------+
|3442f8959a84dea7e...|                 13023|
+--------------------+----------------------+
only showing top 1 row



                                                                                

In [77]:
Path(os.path.join(src_path.format(stage='silver'), 'olist_sellers')).mkdir(parents=True, exist_ok=True)
df_sellers.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olist_sellers'))

                                                                                

Agora vamos concluir o arquivo de geolocalização

In [78]:
df_geo.show(5)

+----------------------+-----------------+------------+
|seller_zip_code_prefix|      seller_city|seller_state|
+----------------------+-----------------+------------+
|                 13023|         campinas|          SP|
|                 13844|       mogi guacu|          SP|
|                 20031|   rio de janeiro|          RJ|
|                  4195|        sao paulo|          SP|
|                 12914|braganca paulista|          SP|
+----------------------+-----------------+------------+
only showing top 5 rows



Agora, vamos verificar apenas os zip_codes que não estão na tabela de geolocalização e fazer uma lista dessas cidades
a fim de coletar a latitude e longitude da cidade. Vamos utilizar localização da cidade para os casos em que não possui latitude e longitude

In [None]:
df_insert_geo = df_geo.join(df_geolocation, on=df_geolocation.geolocation_zip_code_prefix == df_geo.seller_zip_code_prefix, how="left_anti")
unique_cities = df_insert_geo.select("seller_city", "seller_state").distinct().collect()
unique_cities

                                                                                

[Row(seller_city='brasilia', seller_state='DF'),
 Row(seller_city='pocos de caldas', seller_state='MG'),
 Row(seller_city='porto alegre', seller_state='RS'),
 Row(seller_city='aruja', seller_state='SP'),
 Row(seller_city='sao paulo', seller_state='SP'),
 Row(seller_city='curitiba', seller_state='PR')]

Vamos intanciar a biblioteca GeoPy e fazer um looping pelas listas de cidades

In [95]:
geolocator = Nominatim(user_agent="geoapi")
city_coords = {}

for row in unique_cities:
    cidade = row['seller_city']
    estado = row['seller_state']
    local_str = f"{cidade}, {estado}, Brasil"
    
    try:
        location = geolocator.geocode(local_str)
        if location:
            city_coords[(cidade.lower(), estado)] = (location.latitude, location.longitude)
    except Exception as e:
        print(f"Erro em {local_str}: {e}")
    
    time.sleep(1)

Agora vamos montar um dataset na padrão que queremos

In [96]:
city_coord_rows = [
    Row(
        seller_city=city,
        seller_state=state,
        geolocation_lat=lat,
        geolocation_lng=lng
    )
    for (city, state), (lat, lng) in city_coords.items()
]

df_coords = spark.createDataFrame(city_coord_rows)

In [None]:
df_result = df_insert_geo.join(
    df_coords,
    on=["seller_city", "seller_state"],
    how="left"
)

df_result = df_result.withColumnRenamed("seller_zip_code_prefix", "geolocation_zip_code_prefix") \
                     .withColumnRenamed("seller_city", "geolocation_city") \
                     .withColumnRenamed("seller_state", "geolocation_state")

df_result.show(5)

                                                                                

+----------------+-----------------+---------------------------+---------------+---------------+
|geolocation_city|geolocation_state|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|
+----------------+-----------------+---------------------------+---------------+---------------+
|        brasilia|               DF|                      72580|    -15.7939869|       -47.8828|
|    porto alegre|               RS|                      91901|    -30.0324999|    -51.2303767|
| pocos de caldas|               MG|                      37708|    -21.7900318|    -46.5647928|
|       sao paulo|               SP|                       2285|    -23.5506507|    -46.6333824|
|           aruja|               SP|                       7412|     -23.396266|    -46.3175449|
+----------------+-----------------+---------------------------+---------------+---------------+
only showing top 5 rows



In [101]:
df_insert = df_result.select(
    col('geolocation_zip_code_prefix'),
    col('geolocation_lat'),
    col('geolocation_lng'),
    col('geolocation_city'),
    col('geolocation_state')
)

df_insert.show(5)

                                                                                

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                      72580|    -15.7939869|       -47.8828|        brasilia|               DF|
|                      91901|    -30.0324999|    -51.2303767|    porto alegre|               RS|
|                      37708|    -21.7900318|    -46.5647928| pocos de caldas|               MG|
|                       2285|    -23.5506507|    -46.6333824|       sao paulo|               SP|
|                       7412|     -23.396266|    -46.3175449|           aruja|               SP|
+---------------------------+---------------+---------------+----------------+-----------------+
only showing top 5 rows



Agora vamos fazer um union com o dataset preexistente de geolocalização

In [102]:
df_geolocation_ = df_geolocation.union(df_insert)
df_geolocation_.filter(col('geolocation_zip_code_prefix')=='72580').show()

                                                                                

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                      72580|    -15.7939869|       -47.8828|        brasilia|               DF|
+---------------------------+---------------+---------------+----------------+-----------------+



Agora vamos salvar o arquivo final da dimensão geolocation

<!-- # Path(os.path.join(src_path.format(stage='silver'), 'olit_dm_geolocation')).mkdir(parents=True, exist_ok=True)
# df_geolocation.write.mode('overwrite')\
#     .format('delta')\
#         .option("compression", "snappy")\
#             .save(os.path.join(src_path.format(stage='silver'), 'olit_dm_geolocation')) -->

In [103]:
Path(os.path.join(src_path.format(stage='silver'), 'olit_dm_geolocation')).mkdir(parents=True, exist_ok=True)
df_geolocation_.write.mode('overwrite')\
    .format('delta')\
        .option("compression", "snappy")\
            .save(os.path.join(src_path.format(stage='silver'), 'olit_dm_geolocation'))

                                                                                