# DataWarehouse com dados de Eccomerce

Os scripts abaixo realizam a modelagem de dados para criar um data warehouse.

Primeiro, é feito uma analise bem preliminar dos dados, junto a testes de criação de outras colunas. Por último, os dados são modelados para a criação de um data warehouse.

## Importando Bibliotecas

In [0]:
from pyspark.sql.functions import col,isnan, when, count, sum, udf, to_timestamp, datediff, max, min, lit, avg, row_number
from pyspark.sql.functions import year, month, hour, dayofmonth, dayofweek, date_format
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType, StructType
from pyspark.ml.feature import QuantileDiscretizer

import datetime
import pandas as pd
import math




### Conectando a pasta de dados no Azure

In [0]:
dbutils.fs.mkdirs("/mnt")
dbutils.fs.mkdirs("/mnt/eccommerce")
dbutils.fs.mkdirs("/mnt/eccommerce/dados")
dbutils.fs.mkdirs("/mnt/eccommerce/dados/raw_data")
dbutils.fs.mkdirs("/mnt/eccommerce/dados/dw_data")

if dbutils.fs.ls("/mnt/eccommerce/dados/raw_data")[0].size > 0:
    dbutils.fs.unmount("/mnt/eccommerce/dados/") 

/mnt/eccommerce/dados/ has been unmounted.


In [0]:
client_id = dbutils.secrets.get("eccomerce_secrets", "client_id")
client_secret = dbutils.secrets.get("eccomerce_secrets", "client_secret")
client_endpoint = dbutils.secrets.get("eccomerce_secrets", "client_endpoint")

configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": client_id,
       "fs.azure.account.oauth2.client.secret": client_secret,
       "fs.azure.account.oauth2.client.endpoint": client_endpoint,
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

dbutils.fs.mount(
source = "abfss://c-eccommerce@saeccommerce.dfs.core.windows.net/",
mount_point = "/mnt/eccommerce/dados/",
extra_configs = configs)

Out[113]: True

### Importando os dados

In [0]:
path = "/mnt/eccommerce/dados/raw_data/"

# CSV
orders_items_df = spark.read.csv(path+'olist_order_items_dataset.csv',header=True)
orders_df = spark.read.csv(path+'olist_orders_dataset.csv',header=True)
orders_payments_df = spark.read.csv(path+'olist_order_payments_dataset.csv',header=True)
orders_reviews_df = spark.read.option("multiLine",True).csv(path+'olist_order_reviews_dataset.csv',header=True, escape="\"")

customers_df = spark.read.csv(path+'olist_customers_dataset.csv',header=True)
geolocations_df = spark.read.csv(path+'olist_geolocation_dataset.csv',header=True)
sellers_df = spark.read.csv(path+'olist_sellers_dataset.csv',header=True)
products_df = spark.read.csv(path+'olist_products_dataset.csv',header=True)

### Verificando características dos dados

Verifica primeiras linhas de cada tabela

In [0]:
orders_items_df.limit(10).toPandas()

Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87
3,00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15 10:10:18,12.99,12.79
4,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51,199.9,18.14
5,00048cc3ae777c65dbb7d2a0634bc1ea,1,ef92defde845ab8450f9d70c526ef70f,6426d21aca402a131fc0a5d0960a3c90,2017-05-23 03:55:27,21.9,12.69
6,00054e8431b9d7675808bcb819fb4a32,1,8d4f2bb7e93e6710a28f34fa83ee7d28,7040e82f899a04d1b434b795a43b4617,2017-12-14 12:10:31,19.9,11.85
7,000576fe39319847cbb9d288c5617fa6,1,557d850972a7d6f792fd18ae1400d9b6,5996cddab893a4652a15592fb58ab8db,2018-07-10 12:30:45,810.0,70.75
8,0005a1a1728c9d785b8e2b08b904576c,1,310ae3c140ff94b03219ad0adc3c778f,a416b6a846a11724393025641d4edd5e,2018-03-26 18:31:29,145.95,11.65
9,0005f50442cb953dcd1d21e1fb923495,1,4535b0e1091c278dfd193e5a1d63b39f,ba143b05f0110f0dc71ad71b4466ce92,2018-07-06 14:10:56,53.99,11.4


In [0]:
print((orders_items_df.count(), len(orders_items_df.columns)))

(112650, 7)


In [0]:
orders_df.limit(10).toPandas()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00
5,a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00
6,136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
7,6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00
8,76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00
9,e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00


In [0]:
print((orders_df.count(), len(orders_df.columns)))

(99441, 8)


In [0]:
orders_payments_df.limit(10).toPandas()

Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
3,ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
4,42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45
5,298fcdf1f73eb413e4d26d01b25bc1cd,1,credit_card,2,96.12
6,771ee386b001f06208a7419e4fc1bbd7,1,credit_card,1,81.16
7,3d7239c394a212faae122962df514ac7,1,credit_card,3,51.84
8,1f78449c87a54faf9e96e88ba1491fa9,1,credit_card,6,341.09
9,0573b5e23cbd798006520e1d5b4c6714,1,boleto,1,51.95


In [0]:
print((orders_payments_df.count(), len(orders_payments_df.columns)))

(103886, 5)


In [0]:
orders_reviews_df.limit(10).toPandas()

Unnamed: 0,review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
0,7bc2406110b926393aa56f80a40eba40,73fc7af87114b39712e6da79b0a377eb,4,,,2018-01-18 00:00:00,2018-01-18 21:46:59
1,80e641a11e56f04c1ad469d5645fdfde,a548910a1c6147796b98fdf73dbeba33,5,,,2018-03-10 00:00:00,2018-03-11 03:05:13
2,228ce5500dc1d8e020d8d1322874b6f0,f9e4b658b201a9f2ecdecbb34bed034b,5,,,2018-02-17 00:00:00,2018-02-18 14:36:24
3,e64fb393e7b32834bb789ff8bb30750e,658677c97b385a9be170737859d3511b,5,,Recebi bem antes do prazo estipulado.,2017-04-21 00:00:00,2017-04-21 22:02:06
4,f7c4243c7fe1938f181bec41a392bdeb,8e6bfb81e283fa7e4f11123a3fb894f1,5,,Parabéns lojas lannister adorei comprar pela I...,2018-03-01 00:00:00,2018-03-02 10:26:53
5,15197aa66ff4d0650b5434f1b46cda19,b18dcdf73be66366873cd26c5724d1dc,1,,,2018-04-13 00:00:00,2018-04-16 00:39:37
6,07f9bee5d1b850860defd761afa7ff16,e48aa0d2dcec3a2e87348811bcfdf22b,5,,,2017-07-16 00:00:00,2017-07-18 19:30:34
7,7c6400515c67679fbee952a7525281ef,c31a859e34e3adac22f376954e19b39d,5,,,2018-08-14 00:00:00,2018-08-14 21:36:06
8,a3f6f7f6f433de0aefbb97da197c554c,9c214ac970e84273583ab523dfafd09b,5,,,2017-05-17 00:00:00,2017-05-18 12:05:37
9,8670d52e15e00043ae7de4c01cc2fe06,b9bf720beb4ab3728760088589c62129,4,recomendo,aparelho eficiente. no site a marca do aparelh...,2018-05-22 00:00:00,2018-05-23 16:45:47


In [0]:
print((orders_reviews_df.count(), len(orders_reviews_df.columns)))

(99223, 7)


In [0]:
customers_df.limit(10).toPandas()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
5,879864dab9bc3047522c92c82e1212b8,4c93744516667ad3b8f1fb645a3116a4,89254,jaragua do sul,SC
6,fd826e7cf63160e536e0908c76c3f441,addec96d2e059c80c30fe6871d30d177,4534,sao paulo,SP
7,5e274e7a0c3809e14aba7ad5aae0d407,57b2a98a409812fe9618067b6b8ebe4f,35182,timoteo,MG
8,5adf08e34b2e993982a47070956c5c65,1175e95fb47ddff9de6b2b06188f7e0d,81560,curitiba,PR
9,4b7139f34592b3a31687243a302fa75b,9afe194fb833f79e300e37e580171f22,30575,belo horizonte,MG


In [0]:
print((customers_df.count(), len(customers_df.columns)))

(99441, 5)


In [0]:
geolocations_df.limit(10).toPandas()

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
0,1037,-23.54562128115268,-46.63929204800168,sao paulo,SP
1,1046,-23.54608112703553,-46.64482029837157,sao paulo,SP
2,1046,-23.54612896641469,-46.64295148361138,sao paulo,SP
3,1041,-23.5443921648681,-46.63949930627844,sao paulo,SP
4,1035,-23.541577961711493,-46.64160722329613,sao paulo,SP
5,1012,-23.547762303364262,-46.63536053788448,são paulo,SP
6,1047,-23.54627311241268,-46.64122516971552,sao paulo,SP
7,1013,-23.546923208436723,-46.6342636964915,sao paulo,SP
8,1029,-23.543769055769133,-46.63427784085132,sao paulo,SP
9,1011,-23.547639550320632,-46.63603162315495,sao paulo,SP


In [0]:
print((geolocations_df.count(), len(geolocations_df.columns)))

(1000163, 5)


In [0]:
sellers_df.limit(10).toPandas()

Unnamed: 0,seller_id,seller_zip_code_prefix,seller_city,seller_state
0,3442f8959a84dea7ee197c632cb2df15,13023,campinas,SP
1,d1b65fc7debc3361ea86b5f14c68d2e2,13844,mogi guacu,SP
2,ce3ad9de960102d0677a81f5d0bb7b2d,20031,rio de janeiro,RJ
3,c0f3eea2e14555b6faeea3dd58c1b1c3,4195,sao paulo,SP
4,51a04a8a6bdcb23deccc82b0b80742cf,12914,braganca paulista,SP
5,c240c4061717ac1806ae6ee72be3533b,20920,rio de janeiro,RJ
6,e49c26c3edfa46d227d5121a6b6e4d37,55325,brejao,PE
7,1b938a7ec6ac5061a66a3766e0e75f90,16304,penapolis,SP
8,768a86e36ad6aae3d03ee3c6433d61df,1529,sao paulo,SP
9,ccc4bbb5f32a6ab2b7066a4130f114e3,80310,curitiba,PR


In [0]:
print((sellers_df.count(), len(sellers_df.columns)))

(3095, 4)


In [0]:
products_df.limit(10).toPandas()

Unnamed: 0,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0,1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14
1,3aa071139cb16b67ca9e5dea641aaa2f,artes,44,276,1,1000,30,18,20
2,96bd76ec8810374ed1b65e291975717f,esporte_lazer,46,250,1,154,18,9,15
3,cef67bcfe19066a932b7673e239eb23d,bebes,27,261,1,371,26,4,26
4,9dc1a7de274444849c219cff195d0b71,utilidades_domesticas,37,402,4,625,20,17,13
5,41d3672d4792049fa1779bb35283ed13,instrumentos_musicais,60,745,1,200,38,5,11
6,732bd381ad09e530fe0a5f457d81becb,cool_stuff,56,1272,4,18350,70,24,44
7,2548af3e6e77a690cf3eb6368e9ab61e,moveis_decoracao,56,184,2,900,40,8,40
8,37cc742be07708b53a98702e77a21a02,eletrodomesticos,57,163,1,400,27,13,17
9,8c92109888e8cdf9d66dc7e463025574,brinquedos,36,1156,1,600,17,10,12


In [0]:
print((products_df.count(), len(products_df.columns)))

(32951, 9)


### Cria tabelas que serão usadas no Azure Synapse

In [0]:
%sql
create database if not exists db_eccommerce
location '/mnt/eccommerce/dados/dw_data'

In [0]:
%sql
create table if not exists db_eccommerce.dw_time(
  IDSK int
  , full_date timestamp
  , year int
  , month int
  , day int
  , hour int
  , year_month VARCHAR(7)
  , month_name VARCHAR(20)
  , is_weekend int
  , day_of_week VARCHAR(20) 
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_time'

In [0]:
%sql
create table if not exists db_eccommerce.dw_products(
  IDSK int
  , product_id VARCHAR(32)
  , product_category VARCHAR(50)
  , product_name_lenght int
  , product_description_lenght int
  , product_photos_qty int
  , product_weight_g double
  , product_length_cm double
  , product_height_cm double
  , product_width_cm double
  , INICIO VARCHAR(10)
  , FIM VARCHAR(10)
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_products'

In [0]:
%sql
create table if not exists db_eccommerce.dw_sellers(
  IDSK int
  , seller_id VARCHAR(32) 
  , seller_city VARCHAR(50)
  , seller_state VARCHAR(2)
  , seller_zip_code_prefix VARCHAR(5)
  , geolocation_lat double
  , geolocation_lng double
  , INICIO VARCHAR(10)
  , FIM VARCHAR(10)
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_sellers'

In [0]:
%sql
create table if not exists db_eccommerce.dw_customers(
  IDSK int
  , customer_id VARCHAR(32) 
  , customer_unique_id VARCHAR(32)
  , customer_zip_code_prefix VARCHAR(5)
  , customer_city VARCHAR(50)
  , customer_state VARCHAR(2)
  , geolocation_lat double
  , geolocation_lng double
  , customer_segmantation VARCHAR(50)
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_customers'

In [0]:
%sql
create table if not exists db_eccommerce.dw_orders(
  IDSK int
  , order_id VARCHAR(32)
  , arrival_status VARCHAR(50)
  , estimated_delivery_rate VARCHAR(50)
  , arrival_delivery_rate VARCHAR(50)
  , shipping_delivery_rate VARCHAR(50)  
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_orders'

In [0]:
%sql
create table if not exists db_eccommerce.dw_payments(
  customer_id int
  , order_id int
  , payment_sequential int
  , payment_type VARCHAR(50)  
  , payment_installments int
  , payment_value double
  , order_purchase_timestamp_id int
  , order_approved_at_id int
  , order_delivered_carrier_date_id int
  , order_delivered_customer_date_id int
  , order_estimated_delivery_date_id int
  , estimated_days int
  , arrival_days int
  , shipping_days int
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_payments'

In [0]:
%sql
create table if not exists db_eccommerce.dw_reviews(
  customer_id int
  , order_id int
  , review_id VARCHAR(32)
  , review_score int
  , review_comment_title VARCHAR(80)
  , review_comment_message VARCHAR(2000)
  , order_purchase_timestamp_id int
  , order_approved_at_id int
  , order_delivered_carrier_date_id int
  , order_delivered_customer_date_id int
  , order_estimated_delivery_date_id int
  , review_creation_date_id int
  , review_answer_timestamp_id int
  , estimated_days int
  , arrival_days int
  , shipping_days int
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_reviews'

In [0]:
%sql
create table if not exists db_eccommerce.dw_itens(
  order_item_id int
  , customer_id int
  , order_id int
  , product_id int
  , seller_id int
  , price double
  , freight_value double
  , order_purchase_timestamp_id int
  , order_approved_at_id int
  , order_delivered_carrier_date_id int
  , order_delivered_customer_date_id int
  , order_estimated_delivery_date_id int
  , shipping_limit_date_id int
  , estimated_days int
  , arrival_days int
  , shipping_days int
  , seller_to_carrier_status VARCHAR(50)
)using delta
location '/mnt/eccommerce/dados/dw_data/dw_itens'

In [0]:
dw_products_old = sqlContext.table("db_eccommerce.dw_products")
dw_sellers_old = sqlContext.table("db_eccommerce.dw_sellers")
dw_customers_old = sqlContext.table("db_eccommerce.dw_customers")

dw_orders_old = sqlContext.table("db_eccommerce.dw_orders")
dw_payments_old = sqlContext.table("db_eccommerce.dw_payments")
dw_reviews_old = sqlContext.table("db_eccommerce.dw_reviews")
dw_itens_old = sqlContext.table("db_eccommerce.dw_itens")

In [0]:
print(dw_products_old.count(), len(dw_products_old.columns))
print(dw_sellers_old.count(), len(dw_sellers_old.columns))
print(dw_customers_old.count(), len(dw_customers_old.columns))

print(dw_orders_old.count(), len(dw_orders_old.columns))
print(dw_payments_old.count(), len(dw_payments_old.columns))
print(dw_reviews_old.count(), len(dw_reviews_old.columns))
print(dw_itens_old.count(), len(dw_itens_old.columns))

32951 12
3095 9
99441 9
99441 6
103886 14
99223 16
112650 17


In [0]:
if dw_products_old.count() > 0:
    dw_products_old = dw_products_old.withColumn("IDSK", col("IDSK").cast(IntegerType()) )
    dw_products_old = dw_products_old.withColumn("product_name_lenght", col("product_name_lenght").cast(IntegerType()))
    dw_products_old = dw_products_old.withColumn("product_description_lenght", col("product_description_lenght").cast(IntegerType()))
    dw_products_old = dw_products_old.withColumn("product_photos_qty", col("product_photos_qty").cast(IntegerType()))
    dw_products_old = dw_products_old.withColumn("product_weight_g", col("product_weight_g").cast(DoubleType()))
    dw_products_old = dw_products_old.withColumn("product_length_cm", col("product_length_cm").cast(DoubleType()))
    dw_products_old = dw_products_old.withColumn("product_height_cm", col("product_height_cm").cast(DoubleType()))
    dw_products_old = dw_products_old.withColumn("product_width_cm", col("product_width_cm").cast(DoubleType()))

if dw_sellers_old.count() > 0:
    dw_sellers_old = dw_sellers_old.withColumn("IDSK", col("IDSK").cast(IntegerType()) )
    dw_sellers_old = dw_sellers_old.withColumn("geolocation_lat", col("geolocation_lat").cast(DoubleType()) )
    dw_sellers_old = dw_sellers_old.withColumn("geolocation_lng", col("geolocation_lng").cast(DoubleType()) )

if dw_customers_old.count() > 0:
    dw_customers_old = dw_customers_old.withColumn("IDSK", col("IDSK").cast(IntegerType()) )
    dw_customers_old = dw_customers_old.withColumn("geolocation_lat", col("geolocation_lat").cast(DoubleType()) )
    dw_customers_old = dw_customers_old.withColumn("geolocation_lng", col("geolocation_lng").cast(DoubleType()) )

if dw_orders_old.count() > 0:
    dw_orders_old = dw_orders_old.withColumn("IDSK", col("IDSK").cast(IntegerType()) )

if dw_payments_old.count() > 0:
    dw_payments_old = dw_payments_old.withColumn("customer_id", col("customer_id").cast(IntegerType()) )
    dw_payments_old = dw_payments_old.withColumn("order_id", col("order_id").cast(IntegerType()) )
    dw_payments_old = dw_payments_old.withColumn("payment_sequential", col("payment_sequential").cast(IntegerType()) )
    dw_payments_old = dw_payments_old.withColumn("payment_installments", col("payment_installments").cast(IntegerType()) )
    dw_payments_old = dw_payments_old.withColumn("payment_value", col("payment_value").cast(DoubleType()) )

    dw_payments_old = dw_payments_old.withColumn('order_purchase_timestamp_id', col('order_purchase_timestamp_id').cast(IntegerType())) 
    dw_payments_old = dw_payments_old.withColumn('order_approved_at_id', col('order_approved_at_id').cast(IntegerType())) 
    dw_payments_old = dw_payments_old.withColumn('order_delivered_carrier_date_id', col('order_delivered_carrier_date_id').cast(IntegerType())) 
    dw_payments_old = dw_payments_old.withColumn('order_delivered_customer_date_id', col('order_delivered_customer_date_id').cast(IntegerType())) 
    dw_payments_old = dw_payments_old.withColumn('order_estimated_delivery_date_id', col('order_estimated_delivery_date_id').cast(IntegerType()))

    dw_payments_old = dw_payments_old.withColumn('estimated_days', col('estimated_days').cast(IntegerType()))
    dw_payments_old = dw_payments_old.withColumn('arrival_days', col('arrival_days').cast(IntegerType()))
    dw_payments_old = dw_payments_old.withColumn('shipping_days', col('shipping_days').cast(IntegerType()))

if dw_reviews_old.count() > 0:
    dw_reviews_old = dw_reviews_old.withColumn("customer_id", col("customer_id").cast(IntegerType()) )
    dw_reviews_old = dw_reviews_old.withColumn("order_id", col("order_id").cast(IntegerType()) )
    dw_reviews_old = dw_reviews_old.withColumn("review_score", col("review_score").cast(IntegerType()) )
    dw_reviews_old = dw_reviews_old.withColumn("review_creation_date_id", col("review_creation_date_id").cast(IntegerType()) )
    dw_reviews_old = dw_reviews_old.withColumn("review_answer_timestamp_id" , col("review_answer_timestamp_id" ).cast(IntegerType()) )

    dw_reviews_old = dw_reviews_old.withColumn('order_purchase_timestamp_id', col('order_purchase_timestamp_id').cast(IntegerType())) 
    dw_reviews_old = dw_reviews_old.withColumn('order_approved_at_id', col('order_approved_at_id').cast(IntegerType())) 
    dw_reviews_old = dw_reviews_old.withColumn('order_delivered_carrier_date_id', col('order_delivered_carrier_date_id').cast(IntegerType())) 
    dw_reviews_old = dw_reviews_old.withColumn('order_delivered_customer_date_id', col('order_delivered_customer_date_id').cast(IntegerType())) 
    dw_reviews_old = dw_reviews_old.withColumn('order_estimated_delivery_date_id', col('order_estimated_delivery_date_id').cast(IntegerType()))

    dw_reviews_old = dw_reviews_old.withColumn('estimated_days', col('estimated_days').cast(IntegerType()))
    dw_reviews_old = dw_reviews_old.withColumn('arrival_days', col('arrival_days').cast(IntegerType()))
    dw_reviews_old = dw_reviews_old.withColumn('shipping_days', col('shipping_days').cast(IntegerType()))

if dw_itens_old.count() > 0:
    dw_itens_old = dw_itens_old.withColumn("order_item_id", col("order_item_id").cast(IntegerType()) )
    dw_itens_old = dw_itens_old.withColumn("customer_id", col("customer_id").cast(IntegerType()) )
    dw_itens_old = dw_itens_old.withColumn("order_id", col("order_id").cast(IntegerType()) )
    dw_itens_old = dw_itens_old.withColumn("seller_id", col("order_id").cast(IntegerType()) )
    dw_itens_old = dw_itens_old.withColumn("product_id", col("order_id").cast(IntegerType()) )

    dw_itens_old  = dw_itens_old.withColumn("price", col("price").cast(DoubleType()) )
    dw_itens_old  = dw_itens_old.withColumn("freight_value", col("freight_value").cast(DoubleType()) )
    
    dw_itens_old = dw_itens_old.withColumn('order_purchase_timestamp_id', col('order_purchase_timestamp_id').cast(IntegerType())) 
    dw_itens_old = dw_itens_old.withColumn('order_approved_at_id', col('order_approved_at_id').cast(IntegerType())) 
    dw_itens_old = dw_itens_old.withColumn('order_delivered_carrier_date_id', col('order_delivered_carrier_date_id').cast(IntegerType())) 
    dw_itens_old = dw_itens_old.withColumn('order_delivered_customer_date_id', col('order_delivered_customer_date_id').cast(IntegerType())) 
    dw_itens_old = dw_itens_old.withColumn('order_estimated_delivery_date_id', col('order_estimated_delivery_date_id').cast(IntegerType()))
    dw_itens_old = dw_itens_old.withColumn('shipping_limit_date_id', col('shipping_limit_date_id').cast(IntegerType()))

    dw_itens_old = dw_itens_old.withColumn('estimated_days', col('estimated_days').cast(IntegerType()))
    dw_itens_old = dw_itens_old.withColumn('arrival_days', col('arrival_days').cast(IntegerType()))
    dw_itens_old = dw_itens_old.withColumn('shipping_days', col('shipping_days').cast(IntegerType()))

### Cria funções para auxiliar na criação de tabelas do data warehouse

Reduz o número de categorias para facilitar as analises

In [0]:
def reduce_categories(x):
    if x in ['moveis_escritorio', 'moveis_decoracao', 'moveis_sala', 'moveis_cozinha_area_de_servico_jantar_e_jardim', 'cama_mesa_banho', 'casa_conforto', 'casa_conforto_2', 'casa_construcao', 'ferramentas_jardim', 'moveis_quarto', 'moveis_colchao_e_estofado']:
        return 'Mobília'
    
    elif x in ['automotivo', 'informatica_acessorios', 'instrumentos_musicais', 'consoles_games', 'relogios_presentes', 'climatizacao', 'telefonia', 'eletronicos', 'telefonia_fixa', 'tablets_impressao_imagem', 'pcs', 'portateis_casa_forno_e_cafe', 'eletroportateis', 'audio', 'sinalizacao_e_seguranca', 'seguros_e_servicos']:
        return 'Eletrônicos'
    
    elif x in ['fashion_roupa_feminina', 'fashion_roupa_masculina', 'fashion_bolsas_e_acessorios', 'fashion_calcados', 'fashion_esporte', 'fashion_underwear_e_moda_praia', 'fashion_roupa_infanto_juvenil', 'bebes', 'cool_stuff', ]:
        return 'Fashion'
    
    elif x in ['utilidades_domesticas', 'casa_conforto', 'eletrodomesticos', 'eletrodomesticos_2', 'flores', 'construcao_ferramentas_jardim', 'ferramentas_jardim', 'construcao_ferramentas_iluminacao', 'construcao_ferramentas_ferramentas', 'malas_acessorios', 'la_cuisine', 'pet_shop', 'market_place']:
        return 'Acessórios Doméstico'
    
    elif x in ['esporte_lazer', 'brinquedos', 'cds_dvds_musicais', 'musica', 'dvds_blu_ray', 'cine_foto', 'artigos_de_festas', 'artigos_de_natal', 'artes_e_artesanato', 'artes']:
        return 'Entreterimento'
    
    elif x in ['beleza_saude', 'perfumaria', 'fraldas_higiene']:
        return 'Produtos de Beleza e Higiene'
    
    elif x in ['alimentos_bebidas', 'bebidas', 'alimentos']:
        return 'Comidas e Bebidas'
    
    elif x in ['livros_interesse_geral', 'livros_tecnicos', 'livros_importados', 'papelaria']:
        return 'Livros e artigos de papelaria'
    
    elif x in ['construcao_ferramentas_construcao', 'construcao_ferramentas_seguranca', 'industria_comercio_e_negocios', 'agro_industria_e_comercio']:
        return 'Industria e Construção'
    else:
        return x
    
    
reduce_categories_udf = udf(reduce_categories, StringType())

Cria colunas para verificar o status da entregas

In [0]:
def get_arrival_status(x):
    if x == 0:
        return 'Atrasado'
    if x:
        if x >= 0:
            return 'Em tempo'
        else:
            return 'Atrasado'
    else:
        return None
    
get_arrival_status_udf = udf(get_arrival_status, StringType())


In [0]:
def get_duration_status(x):
    if x:
        if x in range(0, 8):
            return 'Muito Rápido'
        
        elif x in range(8, 16):
            return 'Rápido'
        
        elif x in range(16, 25):
            return 'Duração OK'
        
        elif x > 24:
            return 'Devagar'
        
        else:
            return 'Muito Devagar'
    else:
        return None

get_duration_status_udf = udf(get_duration_status, StringType())


Cria colunas para facilitar analise das notas

In [0]:
get_review_status_udf = udf(lambda x: 'Satisfeito' if x >= 4 else 'Não Satisfeito', StringType())

Realiza segmentação de clientes

In [0]:
df= customers_df.join( orders_df,["customer_id"], how='inner')
df= df.join(orders_reviews_df, ["order_id"], how='inner')
df= df.join(orders_items_df, ["order_id"], how='inner')
df= df.join(products_df, ["product_id"], how='inner')
df= df.join(orders_payments_df, ["order_id"], how='inner')
df= df.join(sellers_df, ['seller_id'], how='inner')

print((df.count(), len(df.columns)))

(117328, 39)


In [0]:
max_date_df = df.select( max("order_purchase_timestamp")).first()
max_date = max_date_df.asDict()["max(order_purchase_timestamp)"]
print(max_date)

2018-09-03 09:06:57


In [0]:
rfm_table = df.groupby('customer_unique_id').agg(max('order_purchase_timestamp').alias('Recency'),
                                                 count('product_id').alias('Frequancy'),
                                                 sum('payment_value').alias('Monetary'))
rfm_table = rfm_table.withColumn('Recency', datediff( lit(max_date),col('Recency'))) 
rfm_table.limit(10).toPandas()

Unnamed: 0,customer_unique_id,Recency,Frequancy,Monetary
0,0005e1862207bf6ccc02e4228effd9a0,548,1,150.12
1,0006fdc98a402fceb4eb0ee528f6a8d4,412,1,29.0
2,00090324bbad0e9342388303bb71ba0a,163,1,63.66
3,000c8bdb58a29e7115cfc257230fb21b,265,1,29.0
4,00115fc7123b5310cf6d3a3aa932699e,590,1,76.11
5,0011c98589159d6149979563c504cb21,394,1,117.94
6,001a3a8e11d76c9a366c31a4aa2cc529,105,1,24.23
7,001deb796b28a3a128d6113857569aa4,73,1,63.33
8,001f3c4211216384d5fe59b041ce1461,533,1,35.84
9,0023557a94bef0038066b5d1b3dc763e,159,1,107.44


In [0]:
qd =  QuantileDiscretizer(numBuckets=4, inputCols=["Recency", "Frequancy", "Monetary"],
                          outputCols=["r_score", "f_score", "m_score"])
rfm_table = qd.fit(rfm_table).transform(rfm_table)
rfm_table.limit(10).toPandas()

Unnamed: 0,customer_unique_id,Recency,Frequancy,Monetary,r_score,f_score,m_score
0,0005e1862207bf6ccc02e4228effd9a0,548,1,150.12,3.0,1.0,2.0
1,0006fdc98a402fceb4eb0ee528f6a8d4,412,1,29.0,3.0,1.0,0.0
2,00090324bbad0e9342388303bb71ba0a,163,1,63.66,1.0,1.0,0.0
3,000c8bdb58a29e7115cfc257230fb21b,265,1,29.0,2.0,1.0,0.0
4,00115fc7123b5310cf6d3a3aa932699e,590,1,76.11,3.0,1.0,1.0
5,0011c98589159d6149979563c504cb21,394,1,117.94,3.0,1.0,2.0
6,001a3a8e11d76c9a366c31a4aa2cc529,105,1,24.23,0.0,1.0,0.0
7,001deb796b28a3a128d6113857569aa4,73,1,63.33,0.0,1.0,0.0
8,001f3c4211216384d5fe59b041ce1461,533,1,35.84,3.0,1.0,0.0
9,0023557a94bef0038066b5d1b3dc763e,159,1,107.44,1.0,1.0,1.0


In [0]:
rfm_table = rfm_table.withColumn('r_score', (-1)*(col("r_score") - 4))
rfm_table = rfm_table.withColumn('f_score', col('f_score') + 1 )
rfm_table = rfm_table.withColumn('m_score', col('m_score') + 1 )

rfm_table.limit(10).toPandas()

Unnamed: 0,customer_unique_id,Recency,Frequancy,Monetary,r_score,f_score,m_score
0,0005e1862207bf6ccc02e4228effd9a0,548,1,150.12,1.0,2.0,3.0
1,0006fdc98a402fceb4eb0ee528f6a8d4,412,1,29.0,1.0,2.0,1.0
2,00090324bbad0e9342388303bb71ba0a,163,1,63.66,3.0,2.0,1.0
3,000c8bdb58a29e7115cfc257230fb21b,265,1,29.0,2.0,2.0,1.0
4,00115fc7123b5310cf6d3a3aa932699e,590,1,76.11,1.0,2.0,2.0
5,0011c98589159d6149979563c504cb21,394,1,117.94,1.0,2.0,3.0
6,001a3a8e11d76c9a366c31a4aa2cc529,105,1,24.23,4.0,2.0,1.0
7,001deb796b28a3a128d6113857569aa4,73,1,63.33,4.0,2.0,1.0
8,001f3c4211216384d5fe59b041ce1461,533,1,35.84,1.0,2.0,1.0
9,0023557a94bef0038066b5d1b3dc763e,159,1,107.44,3.0,2.0,2.0


In [0]:
def customer_segmantation(rfm_score):
    
    if rfm_score == 444:
        return 'VIP'
    
    elif rfm_score >= 433 and rfm_score < 444:
        return 'Muito Leal'

    elif  rfm_score >=421 and rfm_score< 433:
        return 'Lealdade em Potencial'

    elif rfm_score>=344 and rfm_score < 421:
        return 'Novo Cliente'

    elif rfm_score>=323 and rfm_score<344:
        return 'Cliente em Potencial'

    elif rfm_score>=224 and rfm_score<311:
        return 'Alto Risco de Rotatividade' 

    else:
        return 'Cliente Perdido' 

In [0]:
customer_segmantation_udf = udf(customer_segmantation,  StringType())

rfm_table = rfm_table.withColumn('rfm_score', (100 * col("r_score") + 10 * col("f_score") + col("m_score")) )
rfm_table = rfm_table.withColumn('customer_segmantation', customer_segmantation_udf(col("rfm_score")) )

rfm_table.limit(20).toPandas()

Unnamed: 0,customer_unique_id,Recency,Frequancy,Monetary,r_score,f_score,m_score,rfm_score,customer_segmantation
0,0005e1862207bf6ccc02e4228effd9a0,548,1,150.12,1.0,2.0,3.0,123.0,Cliente Perdido
1,0006fdc98a402fceb4eb0ee528f6a8d4,412,1,29.0,1.0,2.0,1.0,121.0,Cliente Perdido
2,00090324bbad0e9342388303bb71ba0a,163,1,63.66,3.0,2.0,1.0,321.0,Cliente Perdido
3,000c8bdb58a29e7115cfc257230fb21b,265,1,29.0,2.0,2.0,1.0,221.0,Cliente Perdido
4,00115fc7123b5310cf6d3a3aa932699e,590,1,76.11,1.0,2.0,2.0,122.0,Cliente Perdido
5,0011c98589159d6149979563c504cb21,394,1,117.94,1.0,2.0,3.0,123.0,Cliente Perdido
6,001a3a8e11d76c9a366c31a4aa2cc529,105,1,24.23,4.0,2.0,1.0,421.0,Lealdade em Potencial
7,001deb796b28a3a128d6113857569aa4,73,1,63.33,4.0,2.0,1.0,421.0,Lealdade em Potencial
8,001f3c4211216384d5fe59b041ce1461,533,1,35.84,1.0,2.0,1.0,121.0,Cliente Perdido
9,0023557a94bef0038066b5d1b3dc763e,159,1,107.44,3.0,2.0,2.0,322.0,Cliente Perdido


Funções para criação da dimensão tempo

In [0]:
def get_month_name (x):
    if x == 1:
        return "Janeiro"
    elif x == 2:
        return "Fevereiro"
    elif x == 3:
        return "Março"
    elif x == 4:
        return "Abril"
    elif x == 5:
        return "Maio"
    elif x == 6:
        return "Junho"
    elif x == 7:
        return "Julho"
    elif x == 8:
        return "Agosto"
    elif x == 9:
        return "Setembro"
    elif x == 10:
        return "Outubro"
    elif x == 11:
        return "Novembro"
    elif x == 12:
        return "Dezembro"

get_month_name_udf = udf(get_month_name, StringType())

def get_year_month (year, month):
    month = str(month) if month >= 10 else "0" + str(month) 
    return str(year) + '-' + month

get_year_month_udf = udf(get_year_month, StringType())

def get_is_weekend (x):
    if x in [1,7]:
        return 1
    else:
        return 0
    
get_is_weekend_udf = udf(get_is_weekend, IntegerType())

def get_day_of_week (x):
    if x == 1:
        return 'Domingo'
    elif x == 2:
        return 'Segunda-Feira'
    elif x == 3:
        return 'Terça-Feira'
    elif x == 4:
        return 'Quarta-Feira'
    elif x == 5:
        return 'Quinta-feira'
    elif x == 6:
        return 'Sexta-Feira'
    elif x == 7:
        return 'Sábado'
    
get_day_of_week_udf = udf(get_day_of_week, StringType())

Funções para dimensões com "slowly dimension change"

In [0]:
def verify_new_str_value (idsk, value, new_value):
    value = str(value)
    new_value = str(new_value)

    if not idsk:
        return new_value
    else:
        return value

def verify_new_double_value (idsk, value, new_value):
    try:
        value = float(value)
    except:
        value = None
    
    try:
        new_value = float(new_value)
    except:
        new_value = None

    if not idsk:
        return new_value
    else:
        return value
    
def verify_new_int_value (idsk, value, new_value):
    try:
        value = int(value)
    except:
        value = None
    
    try:
        new_value = int(new_value)
    except:
        new_value = None

    if not idsk:
        return new_value
    else:
        return value

verify_new_value_str_udf = udf(verify_new_str_value, StringType())
verify_new_value_int_udf = udf(verify_new_int_value, IntegerType())
verify_new_value_double_udf = udf(verify_new_double_value, DoubleType())

def verify_INICIO (inicio):
    if not inicio:
        return datetime.date.today().strftime("%Y-%m-%d")
    else:
        return inicio
    
verify_INICIO_udf = udf(verify_INICIO, StringType())

In [0]:
def verify_new_product_FIM (fim, idsk, product_category, product_category_new, product_name_lenght, product_name_lenght_new,
                            product_description_lenght, product_description_lenght_new, product_photos_qty, product_photos_qty_new,
                            product_weight_g, product_weight_g_new, product_length_cm, product_length_cm_new, product_height_cm,
                            product_height_cm_new, product_width_cm, product_width_cm_new):
    
    product_category = product_category if product_category and product_category != 'None' else None
    product_category_new = product_category_new if product_category_new and product_category_new != 'None' else None

    product_name_lenght = int(product_name_lenght) if product_name_lenght else None
    product_name_lenght_new = int(product_name_lenght_new) if product_name_lenght_new else None
    product_description_lenght = int(product_description_lenght) if product_description_lenght else None
    product_description_lenght_new = int(product_description_lenght_new) if product_description_lenght_new else None
    product_photos_qty = int(product_photos_qty) if product_photos_qty else None
    product_photos_qty_new = int(product_photos_qty_new) if product_photos_qty_new else None

    product_weight_g = float(product_weight_g) if product_weight_g else None
    product_weight_g_new = float(product_weight_g_new) if product_weight_g_new else None
    product_length_cm= float(product_length_cm) if product_length_cm else None
    product_length_cm_new = float(product_length_cm_new) if product_length_cm_new else None
    product_height_cm = float(product_height_cm) if product_height_cm else None
    product_height_cm_new = float(product_height_cm_new) if product_height_cm_new else None
    product_width_cm = float(product_width_cm) if product_width_cm else None
    product_width_cm_new = float(product_width_cm_new) if product_width_cm_new else None
    
    if ( product_category != product_category_new or product_name_lenght != product_name_lenght_new or product_description_lenght != product_description_lenght_new or product_photos_qty != product_photos_qty_new or product_weight_g != product_weight_g_new or product_length_cm != product_length_cm_new or product_height_cm != product_height_cm_new or product_width_cm != product_width_cm_new) and idsk:
        return datetime.date.today().strftime("%Y-%m-%d")
    else:
        return None


verify_new_product_FIM_udf = udf(verify_new_product_FIM, StringType())

In [0]:
def verify_new_seller_FIM (fim, idsk, seller_city, seller_city_new, seller_state, seller_state_new, seller_zip_code_prefix, seller_zip_code_prefix_new):

    seller_city = seller_city if seller_city and seller_city != 'None' else None
    seller_city_new = seller_city_new if seller_city_new and seller_city_new != 'None' else None

    seller_state = seller_state if seller_state and seller_state != 'None' else None
    seller_state_new = seller_state_new if seller_state_new and seller_state_new != 'None' else None

    seller_zip_code_prefix = seller_zip_code_prefix if seller_zip_code_prefix and seller_zip_code_prefix != 'None' else None
    seller_zip_code_prefix_new = seller_zip_code_prefix_new if seller_zip_code_prefix_new and seller_zip_code_prefix_new != 'None' else None

    if (seller_city != seller_city_new or seller_state != seller_state_new or seller_zip_code_prefix != seller_zip_code_prefix_new) and idsk:
        return datetime.date.today().strftime("%Y-%m-%d")
    else:
        return None


verify_new_seller_FIM_udf = udf(verify_new_seller_FIM, StringType())

Função para lidar com códigos postais com zero a esquerda

In [0]:
def convert_zip_code (x):
    if x:
        return str(int(float(x))).rjust(5,'0')
    else:
        return x

convert_zip_code_udf = udf(convert_zip_code, StringType())

def verify_string_number (x):
    if x:
        if x.isnumeric():
            return 'STRINGNUMBER' + x
        else:
            return x
    else:
        return x

verify_string_number_udf = udf(verify_string_number, StringType())

def remove_STRINGNUMBER (x):
    if x:
        if 'STRINGNUMBER' in x:
            return x.replace('STRINGNUMBER','')
        else:
            return x
    else:
        return x

remove_STRINGNUMBER_udf = udf(remove_STRINGNUMBER, StringType())

Função para realizar operação com datas

In [0]:
def datediff_cond (x,y):
    if not x or not y:
        return None
    else:
        date1 = datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S") if type(x) == str else x
        date2 = datetime.datetime.strptime(y, "%Y-%m-%d %H:%M:%S") if type(y) == str else y
        delta = date1 - date2
        return delta.days
    
datediff_cond_udf = udf(datediff_cond, IntegerType())

### Modelagem de dados para DataWarehouse

Esse datawarehouse usam as colunas novas testadas. Além disso, diferente do que foi feito nos testes, nenhuma linha foi excluída

#### Cria dimensão Tempo

In [0]:
start_date = '2016-01-01'
end_date = datetime.date.today().strftime("%Y-%m-%d")

dates = pd.date_range(start=start_date, end=end_date, freq = '1H')
datetimes = [date.to_pydatetime() for date in dates]

dw_time = spark.createDataFrame(datetimes, TimestampType())
dw_time = dw_time.withColumnRenamed("value", "full_date")

dw_time.limit(20).toPandas()

Unnamed: 0,full_date
0,2016-01-01 00:00:00
1,2016-01-01 01:00:00
2,2016-01-01 02:00:00
3,2016-01-01 03:00:00
4,2016-01-01 04:00:00
5,2016-01-01 05:00:00
6,2016-01-01 06:00:00
7,2016-01-01 07:00:00
8,2016-01-01 08:00:00
9,2016-01-01 09:00:00


In [0]:
dw_time = dw_time.select( "full_date", year("full_date").alias('year'), month("full_date").alias('month'), 
                         dayofmonth("full_date").alias('day'), hour("full_date").alias("hour") ,
                         dayofweek("full_date").alias('dayofweek'))

dw_time = dw_time.withColumn("year_month", get_year_month_udf(col("year"),col("month")))
dw_time = dw_time.withColumn("is_weekend", get_is_weekend_udf(col('dayofweek')))
dw_time = dw_time.withColumn("day_of_week", get_day_of_week_udf(col('dayofweek')))
dw_time = dw_time.withColumn("month_name", get_month_name_udf(col("month")))

windowSpec  = Window.partitionBy("full_date").orderBy("full_date")
dw_time = dw_time.withColumn("IDSK1", row_number().over(windowSpec))
windowSpec  = Window.partitionBy("IDSK1").orderBy("full_date")
dw_time = dw_time.withColumn("IDSK", row_number().over(windowSpec))

dw_time = dw_time.select(["IDSK", "full_date", "year", "month", "day", "hour", "year_month", "month_name" , "is_weekend",
                          "day_of_week"])

dw_time = dw_time.withColumn("IDSK", col("IDSK").cast(IntegerType()))
dw_time = dw_time.withColumn("year", col("year").cast(IntegerType()))
dw_time = dw_time.withColumn("month", col("month").cast(IntegerType()))
dw_time = dw_time.withColumn("day", col("day").cast(IntegerType()))
dw_time = dw_time.withColumn("hour", col("hour").cast(IntegerType()))
dw_time = dw_time.withColumn("is_weekend", col("is_weekend").cast(IntegerType()))
dw_time = dw_time.withColumn("full_date", col("full_date").cast(TimestampType()))

dw_time.limit(27).toPandas()

dw_time.write.mode('overwrite').saveAsTable("db_eccommerce.dw_time")

#### Cria dimensão Produto com a característica de "slowly changing dimensions"

In [0]:
dw_products = products_df.select(products_df.columns)

dw_products = dw_products.withColumn("product_category", reduce_categories_udf(col("product_category_name")))
dw_products = dw_products.select(['product_id', "product_category", 'product_name_lenght', 'product_description_lenght', 
                                  'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 
                                  'product_width_cm']) 

if (dw_products_old.count() > 0):
    cols = dw_products_old.columns
    
    dw_products = dw_products.withColumn("product_name_lenght", col("product_name_lenght").cast(IntegerType()))
    dw_products = dw_products.withColumn("product_description_lenght", col("product_description_lenght").cast(IntegerType()))
    dw_products = dw_products.withColumn("product_photos_qty", col("product_photos_qty").cast(IntegerType()))
    dw_products = dw_products.withColumn("product_weight_g", col("product_weight_g").cast(DoubleType()))
    dw_products = dw_products.withColumn("product_length_cm", col("product_length_cm").cast(DoubleType()))
    dw_products = dw_products.withColumn("product_height_cm", col("product_height_cm").cast(DoubleType()))
    dw_products = dw_products.withColumn("product_width_cm", col("product_width_cm").cast(DoubleType()))

    dw_products_new = dw_products.select(dw_products.columns)
    dw_products = dw_products_old.select(dw_products_old.columns)

    dw_products_new = dw_products_new.withColumn("INICIO", lit(datetime.date.today().strftime("%Y-%m-%d")).cast(StringType()))
    dw_products_new = dw_products_new.withColumn("FIM",lit(None).cast(StringType()) )

    dw_products.write.mode('overwrite').saveAsTable("db_eccommerce.dw_products")
    dw_products_new.createOrReplaceTempView("dw_products_new")

    dw_products = spark.sql("""insert into db_eccommerce.dw_products (product_id,product_category,product_name_lenght,
                            product_description_lenght, product_photos_qty,product_weight_g,product_length_cm,product_height_cm,
                            product_width_cm, INICIO, FIM ,IDSK)
                            select pn.product_id, pn.product_category, pn.product_name_lenght, pn.product_description_lenght,
                            pn.product_photos_qty, pn.product_weight_g, pn.product_length_cm, pn.product_height_cm, pn.product_width_cm, pn.INICIO, pn.FIM ,
                            ( (select max((cast(IDSK as INT))) from db_eccommerce.dw_products) + ROW_NUMBER() OVER (
                                ORDER BY pn.product_id
                            ) ) as IDSK
                            from dw_products_new pn
                            inner join db_eccommerce.dw_products p on p.product_id = pn.product_id and p.FIM is null
                            where
                            pn.product_category <> p.product_category
                            or pn.product_name_lenght <> p.product_name_lenght
                            or pn.product_description_lenght <> p.product_description_lenght
                            or pn.product_photos_qty <> p.product_photos_qty
                            or pn.product_weight_g <> p.product_weight_g
                            or pn.product_length_cm <> p.product_length_cm
                            or pn.product_height_cm <> p.product_height_cm
                            or pn.product_width_cm <>  p.product_width_cm               
                            """)
    
    dw_products = spark.sql("""insert into db_eccommerce.dw_products (product_id,product_category,product_name_lenght,
                            product_description_lenght, product_photos_qty,product_weight_g,product_length_cm,product_height_cm,
                            product_width_cm, INICIO, FIM ,IDSK)
                            select pn.product_id, pn.product_category, pn.product_name_lenght, pn.product_description_lenght,
                            pn.product_photos_qty, pn.product_weight_g, pn.product_length_cm, pn.product_height_cm, pn.product_width_cm, pn.INICIO, pn.FIM ,
                            ( (select max((cast(IDSK as INT))) from db_eccommerce.dw_products) + ROW_NUMBER() OVER (
                                ORDER BY pn.product_id
                            ) ) as IDSK
                            from dw_products_new pn
                            where pn.product_id  not in (select product_id from db_eccommerce.dw_products)                  
                            """)

    dw_products = sqlContext.table("db_eccommerce.dw_products")

    dw_products_new = dw_products_new.select(col('product_id'),
                                     col('product_category').alias('product_category_new'),
                                     col('product_name_lenght').alias('product_name_lenght_new'),
                                     col('product_description_lenght').alias('product_description_lenght_new'),
                                     col('product_photos_qty').alias('product_photos_qty_new'),
                                     col('product_weight_g').alias('product_weight_g_new'),
                                     col('product_length_cm').alias('product_length_cm_new'),
                                     col('product_height_cm').alias('product_height_cm_new'),
                                     col('product_width_cm').alias('product_width_cm_new'))
    
    
    dw_products = dw_products_new.join(dw_products, ['product_id'], how = 'right')

    dw_products = dw_products.withColumn( "INICIO", verify_INICIO_udf(col("INICIO")) )
    dw_products = dw_products.withColumn( "FIM", verify_new_product_FIM_udf(  col("FIM"), col("IDSK"),
                                                                            col("product_category"), col("product_category_new"), 
                                                                            col("product_name_lenght"), col("product_name_lenght_new"),
                                                                            col("product_description_lenght"), col("product_description_lenght_new"), 
                                                                            col("product_photos_qty"), col("product_photos_qty_new"),
                                                                            col("product_weight_g"), col("product_weight_g_new"), 
                                                                            col("product_length_cm"), col("product_length_cm_new"), 
                                                                            col("product_height_cm"), col("product_height_cm_new"), 
                                                                            col("product_width_cm"), col("product_width_cm_new") ) )

    dw_products = dw_products.withColumn("product_category", verify_new_value_str_udf(col("IDSK"), col("product_category"), col("product_category_new")) )
    dw_products = dw_products.withColumn("product_name_lenght", verify_new_value_int_udf(col("IDSK"), col("product_name_lenght"), col("product_name_lenght_new")) )
    dw_products = dw_products.withColumn("product_description_lenght", verify_new_value_int_udf(col("IDSK"), col("product_description_lenght"), col("product_description_lenght_new")) )
    dw_products = dw_products.withColumn("product_photos_qty", verify_new_value_int_udf(col("IDSK"), col("product_photos_qty"), col("product_photos_qty_new")) )
    dw_products = dw_products.withColumn("product_weight_g", verify_new_value_double_udf(col("IDSK"), col("product_weight_g"), col("product_weight_g_new")) )
    dw_products = dw_products.withColumn("product_length_cm", verify_new_value_double_udf(col("IDSK"), col("product_length_cm"), col("product_length_cm_new")) )
    dw_products = dw_products.withColumn("product_height_cm", verify_new_value_double_udf(col("IDSK"), col("product_height_cm"), col("product_height_cm_new")) )
    dw_products = dw_products.withColumn("product_width_cm", verify_new_value_double_udf(col("IDSK"), col("product_width_cm"), col("product_width_cm_new")) )
    
    dw_products = dw_products.select(cols)

else:
    dw_products = dw_products.withColumn("INICIO", lit(datetime.date(2016,1,1)).cast("string"))
    dw_products = dw_products.withColumn("FIM",lit(None).cast(StringType()) )

    windowSpec  = Window.orderBy("product_id")
    dw_products = dw_products.withColumn("IDSK", row_number().over(windowSpec))

dw_products = dw_products.withColumn("product_name_lenght", col("product_name_lenght").cast(IntegerType()))
dw_products = dw_products.withColumn("product_description_lenght", col("product_description_lenght").cast(IntegerType()))
dw_products = dw_products.withColumn("product_photos_qty", col("product_photos_qty").cast(IntegerType()))
dw_products = dw_products.withColumn("product_weight_g", col("product_weight_g").cast(DoubleType()))
dw_products = dw_products.withColumn("product_length_cm", col("product_length_cm").cast(DoubleType()))
dw_products = dw_products.withColumn("product_height_cm", col("product_height_cm").cast(DoubleType()))
dw_products = dw_products.withColumn("product_width_cm", col("product_width_cm").cast(DoubleType()))

cols = dw_products.columns
dw_products = dw_products.select(cols)  

dw_products.limit(10).toPandas()

Unnamed: 0,IDSK,product_id,product_category,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,INICIO,FIM
0,1,00066f42aeeb9f3007548bb9d3f33c38,Produtos de Beleza e Higiene,53,596,6,300.0,20.0,16.0,16.0,2016-01-01,
1,2,00088930e925c41fd95ebfe695fd2655,Eletrônicos,56,752,4,1225.0,55.0,10.0,26.0,2016-01-01,
2,3,0009406fd7479715e4bef61dd91f2462,Mobília,50,266,2,300.0,45.0,15.0,35.0,2016-01-01,
3,4,000b8f95fcb9e0096488278317764d19,Acessórios Doméstico,25,364,3,550.0,19.0,24.0,12.0,2016-01-01,
4,5,000d9be29b5207b54e86aa1b1ac54872,Eletrônicos,48,613,4,250.0,22.0,11.0,15.0,2016-01-01,
5,6,0011c512eb256aa0dbbb544d8dffcf6e,Eletrônicos,58,177,1,100.0,16.0,15.0,16.0,2016-01-01,
6,7,00126f27c813603687e6ce486d909d01,Fashion,42,2461,1,700.0,25.0,5.0,15.0,2016-01-01,
7,8,001795ec6f1b187d37335e1c4704762e,Eletrônicos,53,274,1,600.0,30.0,20.0,20.0,2016-01-01,
8,9,001b237c0e9bb435f2e54071129237e9,Mobília,42,253,1,6000.0,40.0,4.0,30.0,2016-01-01,
9,10,001b72dfd63e9833e8c02742adf472e3,Mobília,45,520,3,600.0,26.0,8.0,22.0,2016-01-01,


In [0]:
print((dw_products.count(), len(dw_products.columns)))
dw_products.write.mode('overwrite').saveAsTable("db_eccommerce.dw_products")

(32951, 12)


In [0]:
%sql
select * from db_eccommerce.dw_products

IDSK,product_id,product_category,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,INICIO,FIM
1,00066f42aeeb9f3007548bb9d3f33c38,Produtos de Beleza e Higiene,53.0,596.0,6.0,300.0,20.0,16.0,16.0,2016-01-01,
2,00088930e925c41fd95ebfe695fd2655,Eletrônicos,56.0,752.0,4.0,1225.0,55.0,10.0,26.0,2016-01-01,
3,0009406fd7479715e4bef61dd91f2462,Mobília,50.0,266.0,2.0,300.0,45.0,15.0,35.0,2016-01-01,
4,000b8f95fcb9e0096488278317764d19,Acessórios Doméstico,25.0,364.0,3.0,550.0,19.0,24.0,12.0,2016-01-01,
5,000d9be29b5207b54e86aa1b1ac54872,Eletrônicos,48.0,613.0,4.0,250.0,22.0,11.0,15.0,2016-01-01,
6,0011c512eb256aa0dbbb544d8dffcf6e,Eletrônicos,58.0,177.0,1.0,100.0,16.0,15.0,16.0,2016-01-01,
7,00126f27c813603687e6ce486d909d01,Fashion,42.0,2461.0,1.0,700.0,25.0,5.0,15.0,2016-01-01,
8,001795ec6f1b187d37335e1c4704762e,Eletrônicos,53.0,274.0,1.0,600.0,30.0,20.0,20.0,2016-01-01,
9,001b237c0e9bb435f2e54071129237e9,Mobília,42.0,253.0,1.0,6000.0,40.0,4.0,30.0,2016-01-01,
10,001b72dfd63e9833e8c02742adf472e3,Mobília,45.0,520.0,3.0,600.0,26.0,8.0,22.0,2016-01-01,


Faz a média entre latitude e longitude para cada código postal. Esses dados serão usados na dimensão Vendedor e Cliente

In [0]:
geolocations_df = geolocations_df.groupby('geolocation_zip_code_prefix').agg(avg('geolocation_lat').alias('geolocation_lat'),
                                                                             avg('geolocation_lng').alias('geolocation_lng'))
geolocations_df.limit(10).toPandas()

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng
0,2053,-23.513228,-46.602793
1,2943,-23.476741,-46.724994
2,3442,-23.543788,-46.5399
3,3511,-23.534921,-46.527164
4,3904,-23.578208,-46.518503
5,4319,-23.647404,-46.63657
6,4438,-23.67657,-46.669897
7,5176,-23.477063,-46.711912
8,5163,-23.481597,-46.764806
9,5422,-23.566922,-46.687446


#### Cria dimensão Vendedor com a característica de "slowly changing dimensions"

In [0]:
dw_sellers = sellers_df.select(sellers_df.columns)
geolocations_df = geolocations_df.withColumnRenamed("geolocation_zip_code_prefix", "seller_zip_code_prefix")

dw_sellers = dw_sellers.join(geolocations_df, ["seller_zip_code_prefix"], how='left')
dw_sellers = dw_sellers.select(["seller_id","seller_zip_code_prefix","seller_city","seller_state", "geolocation_lat","geolocation_lng"])

if dw_sellers_old.count() > 0:
    cols = dw_sellers_old.columns
    
    dw_sellers = dw_sellers.withColumn("geolocation_lat", col("geolocation_lat").cast(DoubleType()) )
    dw_sellers = dw_sellers.withColumn("geolocation_lng", col("geolocation_lng").cast(DoubleType()) )

    dw_sellers_new = dw_sellers.select(dw_sellers.columns)

    dw_sellers = dw_sellers_old.select(dw_sellers_old.columns)

    dw_sellers_new = dw_sellers_new.withColumn("INICIO", lit(datetime.date.today().strftime("%Y-%m-%d")).cast(StringType()))
    dw_sellers_new = dw_sellers_new.withColumn("FIM",lit(None).cast(StringType()) )

    dw_sellers = dw_sellers.withColumn("seller_city",verify_string_number_udf(col("seller_city")) )
    dw_sellers = dw_sellers.withColumn("seller_state",verify_string_number_udf(col("seller_state")) )
    dw_sellers_new = dw_sellers_new.withColumn("seller_city",verify_string_number_udf(col("seller_city")) )
    dw_sellers_new = dw_sellers_new.withColumn("seller_state",verify_string_number_udf(col("seller_state")) )

    dw_sellers.write.mode("overwrite").saveAsTable("db_eccommerce.dw_sellers")
    dw_sellers_new.createOrReplaceTempView("dw_sellers_new")

    dw_sellers = spark.sql("""insert into db_eccommerce.dw_sellers (seller_id, seller_city, seller_state, seller_zip_code_prefix, 
                           geolocation_lat, geolocation_lng, INICIO, FIM ,IDSK)
                            select sn.seller_id, sn.seller_city, sn.seller_state, sn.seller_zip_code_prefix, sn.geolocation_lat, sn.geolocation_lng, sn.INICIO, sn.FIM , 
                            ( (select max((cast(IDSK as INT))) from db_eccommerce.dw_sellers) + ROW_NUMBER() OVER (
                                ORDER BY sn.seller_id
                            ) ) as IDSK
                            from dw_sellers_new sn
                            inner join db_eccommerce.dw_sellers s on s.seller_id = sn.seller_id and s.FIM is null
                            where
                            sn.seller_city <> s.seller_city
                            or sn.seller_state <> s.seller_state
                            or sn.seller_zip_code_prefix <> lpad( cast( cast( s.seller_zip_code_prefix as INT) as STRING), 5, '0');              
                            """)
    
    dw_sellers = spark.sql("""insert into db_eccommerce.dw_sellers (seller_id, seller_city, seller_state, seller_zip_code_prefix, 
                           geolocation_lat, geolocation_lng, INICIO, FIM ,IDSK)
                            select sn.seller_id, sn.seller_city, sn.seller_state, sn.seller_zip_code_prefix, sn.geolocation_lat, sn.geolocation_lng, sn.INICIO, sn.FIM ,
                            ( (select max((cast(IDSK as INT))) from db_eccommerce.dw_sellers) + ROW_NUMBER() OVER (
                                ORDER BY sn.seller_id
                            ) ) as IDSK
                            from dw_sellers_new sn
                            where sn.seller_id  not in (select seller_id from db_eccommerce.dw_sellers)                    
                           """)
    
    dw_sellers = sqlContext.table("db_eccommerce.dw_sellers")

    dw_sellers = dw_sellers.withColumn("seller_city", remove_STRINGNUMBER_udf(col("seller_city")))
    dw_sellers = dw_sellers.withColumn("seller_state", remove_STRINGNUMBER_udf(col("seller_state")))
    dw_sellers = dw_sellers.withColumn("seller_zip_code_prefix", convert_zip_code_udf(col("seller_zip_code_prefix")))

    dw_sellers_new = dw_sellers_new.withColumn("seller_city", remove_STRINGNUMBER_udf(col("seller_city")))
    dw_sellers_new = dw_sellers_new.withColumn("seller_state", remove_STRINGNUMBER_udf(col("seller_state")))
    dw_sellers_new = dw_sellers_new.withColumn("seller_zip_code_prefix", convert_zip_code_udf(col("seller_zip_code_prefix")))

    dw_sellers_new = dw_sellers_new.select(col('seller_id'),
                                     col('seller_zip_code_prefix').alias('seller_zip_code_prefix_new'),
                                     col('seller_city').alias('seller_city_new'),
                                     col('seller_state').alias('seller_state_new'),
                                     col('geolocation_lat').alias('geolocation_lat_new'),
                                     col('geolocation_lng').alias('geolocation_lng_new'))

    dw_sellers = dw_sellers_new.join(dw_sellers, ['seller_id'], how = 'right')

    dw_sellers = dw_sellers.withColumn( "INICIO", verify_INICIO_udf(col("INICIO")) )
    dw_sellers = dw_sellers.withColumn( "FIM", verify_new_seller_FIM_udf(  col("FIM"), col("IDSK"),
                                                                            col("seller_zip_code_prefix"), col("seller_zip_code_prefix_new"), 
                                                                            col("seller_city"), col("seller_city_new"),
                                                                            col("seller_state"), col("seller_state_new") ) )

    dw_sellers = dw_sellers.withColumn("seller_zip_code_prefix", verify_new_value_str_udf(col("IDSK"), col("seller_zip_code_prefix"), col("seller_zip_code_prefix_new")) )
    dw_sellers = dw_sellers.withColumn("seller_city", verify_new_value_str_udf(col("IDSK"), col("seller_city"), col("seller_city_new")) )
    dw_sellers = dw_sellers.withColumn("seller_state", verify_new_value_str_udf(col("IDSK"), col("seller_state"), col("seller_state_new")) )
    dw_sellers = dw_sellers.withColumn("geolocation_lat", verify_new_value_double_udf(col("IDSK"), col("geolocation_lat"), col("geolocation_lat_new")) )
    dw_sellers = dw_sellers.withColumn("geolocation_lng", verify_new_value_double_udf(col("IDSK"), col("geolocation_lng"), col("geolocation_lng_new")) )
    
    dw_sellers = dw_sellers.select(cols)

else:
    dw_sellers = dw_sellers.withColumn("INICIO", lit(datetime.date(2016,1,1)).cast("string"))
    dw_sellers = dw_sellers.withColumn("FIM",lit(None).cast(StringType()) )

    windowSpec  = Window.orderBy("seller_id")
    dw_sellers = dw_sellers.withColumn("IDSK", row_number().over(windowSpec))

dw_sellers = dw_sellers.withColumn("geolocation_lat", col("geolocation_lat").cast(DoubleType()) )
dw_sellers = dw_sellers.withColumn("geolocation_lng", col("geolocation_lng").cast(DoubleType()) )

cols = dw_sellers.columns
dw_sellers = dw_sellers.select(cols)

dw_sellers.limit(10).toPandas()

(3095, 6)
(3095, 14)
(3095, 9)


Unnamed: 0,IDSK,seller_id,seller_city,seller_state,seller_zip_code_prefix,geolocation_lat,geolocation_lng,INICIO,FIM
0,76,062ce95fa2ad4dfaedfc79260130565f,lajeado,RS,95913,-29.44658,-51.961202,2016-01-01,
1,147,0b64bcdb0784abc139af04077d49a20e,canoas,RS,92420,-29.88533,-51.178237,2016-01-01,
2,174,0ea22c1cfbdc755f86b9b54b39c16043,sete lagoas,MG,35700,-19.457995,-44.248128,2016-01-01,
3,380,2009a095de2a2a41626f6c6d7722678d,sao jose do rio preto,SP,15025,-20.806707,-49.389165,2016-01-01,
4,488,297d5eccd19fa9a83b2630071ff105e4,curitiba,PR,80710,-25.428323,-49.299818,2016-01-01,
5,923,4d600e08ecbe08258c79e536c5a42fee,entre rios do oeste,PR,85988,-24.702966,-54.237448,2016-01-01,
6,1357,6eeed17989b0ae47c9f11ece6f38ea90,sao paulo,SP,4123,-23.610422,-46.626594,2016-01-01,
7,1490,791cfcfe22fe4a771ece27f90017da92,ribeirao preto,SP,14010,-21.17954,-47.808598,2016-01-01,
8,1747,8e6cc767478edae941d9bd9eb778d77a,araguari,MG,38442,-18.645099,-48.204993,2016-01-01,
9,1856,9803a40e82e45418ab7fb84091af5231,rio verde,GO,75901,-17.804501,-50.916495,2016-01-01,


In [0]:
print((dw_sellers.count(), len(dw_sellers.columns)))
dw_sellers.write.mode("overwrite").saveAsTable("db_eccommerce.dw_sellers")

(3095, 9)


#### Cria dimensão Cliente

In [0]:
dw_customers = customers_df.select(customers_df.columns)
geolocations_df = geolocations_df.withColumnRenamed("seller_zip_code_prefix", "customer_zip_code_prefix")

dw_customers = dw_customers.join(geolocations_df, ["customer_zip_code_prefix"], how='left')
dw_customers = dw_customers.join(rfm_table, ["customer_unique_id"], how='left')

dw_customers = dw_customers.select(['customer_id', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 
                                    'customer_state', 'geolocation_lat', 'geolocation_lng', 'customer_segmantation'])

if dw_customers_old.count() > 0:
    cols = dw_customers_old.columns
    dw_customers_new = dw_customers.select(dw_customers.columns)
    dw_customers = dw_customers.select(col('customer_id'),
                                       col('customer_unique_id').alias('customer_unique_id_new'),
                                       col('customer_zip_code_prefix').alias('customer_zip_code_prefix_new'),
                                       col('customer_city').alias('customer_city_new'),
                                       col('customer_state').alias('customer_state_new'),
                                       col('geolocation_lat').alias('geolocation_lat_new'),
                                       col('geolocation_lng').alias('geolocation_lng_new'),
                                       col('customer_segmantation').alias('customer_segmantation_new'))
    
    dw_customers = dw_customers.join(dw_customers_old, ['customer_id'], how = 'right')
    
    dw_customers = dw_customers.withColumn("IDSK", col("IDSK").cast(IntegerType()) )

    dw_customers = dw_customers.withColumn("customer_unique_id", verify_new_value_str_udf(col("IDSK"), col("customer_unique_id"), col("customer_unique_id_new")) )
    dw_customers = dw_customers.withColumn("customer_zip_code_prefix", verify_new_value_str_udf(col("IDSK"), col("customer_zip_code_prefix"), col("customer_zip_code_prefix_new")) )
    dw_customers = dw_customers.withColumn("customer_city", verify_new_value_str_udf(col("IDSK"), col("customer_city"), col("customer_city_new")) )
    dw_customers = dw_customers.withColumn("customer_state", verify_new_value_str_udf(col("IDSK"), col("customer_state"), col("customer_state_new")) )
    dw_customers = dw_customers.withColumn("geolocation_lat", verify_new_value_double_udf(col("IDSK"), col("geolocation_lat"), col("geolocation_lat_new")) )
    dw_customers = dw_customers.withColumn("geolocation_lng", verify_new_value_double_udf(col("IDSK"), col("geolocation_lng"), col("geolocation_lng_new")) )
    dw_customers = dw_customers.withColumn("customer_segmantation", verify_new_value_str_udf(col("IDSK"), col("customer_segmantation"), col("customer_segmantation_new")) )

    dw_customers = dw_customers.select(cols)

    dw_customers = dw_customers.withColumn("customer_city",verify_string_number_udf(col("customer_city")) )
    dw_customers = dw_customers.withColumn("customer_state",verify_string_number_udf(col("customer_state")) )
    dw_customers_new = dw_customers_new.withColumn("customer_city",verify_string_number_udf(col("customer_city")) )
    dw_customers_new = dw_customers_new.withColumn("customer_state",verify_string_number_udf(col("customer_state")) )

    dw_customers_new = dw_customers_new.withColumn("geolocation_lat", col("geolocation_lat").cast(DoubleType()) )
    dw_customers_new = dw_customers_new.withColumn("geolocation_lng", col("geolocation_lng").cast(DoubleType()) )

    dw_customers.write.mode("overwrite").saveAsTable("db_eccommerce.dw_customers")
    dw_customers_new.createOrReplaceTempView("dw_customers_new")

    dw_customers = spark.sql("""insert into db_eccommerce.dw_customers (customer_id, customer_unique_id, customer_zip_code_prefix, 
                             customer_city, customer_state, geolocation_lat, geolocation_lng, customer_segmantation , IDSK)
                            select cn.customer_id, cn.customer_unique_id, cn.customer_zip_code_prefix, cn.customer_city,
                            cn.customer_state, cn.geolocation_lat, cn.geolocation_lng, cn.customer_segmantation ,
                            ( (select max((cast(IDSK as INT))) from db_eccommerce.dw_customers) + ROW_NUMBER() OVER (
                                ORDER BY cn.customer_id
                            ) ) as IDSK
                            from dw_customers_new cn
                            where cn.customer_id  not in (select customer_id from db_eccommerce.dw_customers)                  
                            """)

    dw_customers = sqlContext.table("db_eccommerce.dw_customers")

    dw_customers = dw_customers.withColumn("customer_city", remove_STRINGNUMBER_udf(col("customer_city")))
    dw_customers = dw_customers.withColumn("customer_state", remove_STRINGNUMBER_udf(col("customer_state")))
    dw_customers = dw_customers.withColumn("customer_zip_code_prefix", convert_zip_code_udf(col("customer_zip_code_prefix")))
else:
    windowSpec  = Window.orderBy("customer_id")
    dw_customers = dw_customers.withColumn("IDSK", row_number().over(windowSpec))

cols = dw_customers.columns
dw_customers = dw_customers.select(cols)

dw_customers = dw_customers.withColumn("geolocation_lat", col("geolocation_lat").cast(DoubleType()) )
dw_customers = dw_customers.withColumn("geolocation_lng", col("geolocation_lng").cast(DoubleType()) )

dw_customers.limit(10).toPandas()

Unnamed: 0,IDSK,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,geolocation_lat,geolocation_lng,customer_segmantation
0,710,01d190d14b00073f76e0a5ec46166352,2e5dcf79b225e8d1673671db21933168,2925,sao paulo,SP,-23.501947,-46.6985,Alto Risco de Rotatividade
1,1459,03a7750fc7a7bfbd7a84b2f4f26b92f1,ae7e471f70f6fb521a3dc8770cefa369,83430,campina grande do sul,PR,-25.342398,-49.086043,Lealdade em Potencial
2,1689,04495037fc6899faffa41ba3bc4272b4,c611b2ddcec5427603ec76ba4c117373,15953,botelho,SP,-21.975891,-47.67509,Cliente em Potencial
3,1847,04b7d26bde4f2d2fee0043ef81e664b1,9ef6d1d9fdc6511e44eb6b7c68a765e9,38067,uberaba,MG,-19.728707,-47.935534,Cliente Perdido
4,1890,04cef6b920c0d8f16702cab269b59044,6d0b86c615a3aa7ef4a13555ea965ef7,3183,sao paulo,SP,-23.560357,-46.59086,Lealdade em Potencial
5,2373,06199a7981ec145069afeb0baf66a49b,70f023b13072d262c2d27b022dd6e9c4,6653,itapevi,SP,-23.547054,-46.929721,Lealdade em Potencial
6,2421,0632cb63610a5d7d5da9a4fb595dd101,df63b769c1d1fdecc40cfc343cea50c6,86047,londrina,PR,-23.346966,-51.171659,Lealdade em Potencial
7,2558,068d6956b4f2f9a39cfb807516f55ecb,8423f43e0049483163143ee6950c2cb7,21010,rio de janeiro,RJ,-22.823839,-43.306302,Cliente Perdido
8,2644,06cedf45bb3fda13ad8ac53fa73c39a3,95e4e31eda671dfa6983a83d7b5522c7,46430,guanambi,BA,-14.223207,-42.783398,Cliente Perdido
9,2750,0721e1c4b91bc6ded6579edace8659d3,575523447cb7272877b2840e4a4efb59,28695,papucaia,RJ,-22.602307,-42.761426,Cliente Perdido


In [0]:
print((dw_customers.count(), len(dw_customers.columns)))
dw_customers.write.mode("overwrite").saveAsTable("db_eccommerce.dw_customers")

(99441, 9)


#### Cria dimensão de Pedidos

In [0]:
dw_orders = orders_df.select(orders_df.columns)

dw_orders = dw_orders.withColumn('estimated_days', datediff_cond_udf(col('order_estimated_delivery_date'),col('order_purchase_timestamp'))) 
dw_orders = dw_orders.withColumn('arrival_days', datediff_cond_udf(col('order_delivered_customer_date'),col('order_purchase_timestamp'))) 
dw_orders = dw_orders.withColumn('shipping_days', datediff_cond_udf(col('order_delivered_customer_date'),col('order_delivered_carrier_date'))) 
dw_orders = dw_orders.withColumn('arrival_status',datediff_cond_udf(col('order_estimated_delivery_date'),col('order_delivered_customer_date'))) 

dw_orders = dw_orders.withColumn('arrival_status',get_arrival_status_udf(col('arrival_status'))) 

dw_orders = dw_orders.withColumn('estimated_delivery_rate', get_duration_status_udf(col('estimated_days')))
dw_orders = dw_orders.withColumn('arrival_delivery_rate', get_duration_status_udf(col('arrival_days'))) 
dw_orders = dw_orders.withColumn('shipping_delivery_rate', get_duration_status_udf(col('shipping_days')))


dw_orders.limit(20).toPandas()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,estimated_days,arrival_days,shipping_days,arrival_status,estimated_delivery_rate,arrival_delivery_rate,shipping_delivery_rate
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,15,8.0,6.0,Em tempo,Rápido,Rápido,Muito Rápido
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,19,13.0,12.0,Em tempo,Duração OK,Rápido,Rápido
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,26,9.0,9.0,Em tempo,Devagar,Rápido,Rápido
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,26,13.0,9.0,Em tempo,Devagar,Rápido,Rápido
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,12,2.0,1.0,Em tempo,Rápido,Muito Rápido,Muito Rápido
5,a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00,22,16.0,14.0,Em tempo,Duração OK,Duração OK,Rápido
6,136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00,27,,,,Devagar,,
7,6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00,21,9.0,4.0,Em tempo,Duração OK,Rápido,Muito Rápido
8,76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00,41,9.0,6.0,Em tempo,Devagar,Rápido,Muito Rápido
9,e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00,24,18.0,5.0,Em tempo,Duração OK,Duração OK,Muito Rápido


In [0]:
dw_orders = dw_orders.withColumn('order_purchase_timestamp', to_timestamp(date_format(col("order_purchase_timestamp"), "yyyy-MM-dd HH") ))
dw_orders = dw_orders.withColumn('order_approved_at', to_timestamp(date_format(col('order_approved_at'), "yyyy-MM-dd HH") ))
dw_orders = dw_orders.withColumn('order_delivered_carrier_date', to_timestamp(date_format(col('order_delivered_carrier_date'), "yyyy-MM-dd HH") ))
dw_orders = dw_orders.withColumn('order_delivered_customer_date', to_timestamp(date_format(col('order_delivered_customer_date'), "yyyy-MM-dd HH") ))
dw_orders = dw_orders.withColumn('order_estimated_delivery_date', to_timestamp(date_format(col('order_estimated_delivery_date'), "yyyy-MM-dd HH") ))

dw_orders = dw_orders.join(dw_time, dw_orders.order_purchase_timestamp == dw_time.full_date , how='left')
dw_orders = dw_orders.select(['order_id', 'customer_id', 'order_status',  'IDSK', 'order_approved_at', 
                              'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date',
                              'estimated_days', 'arrival_days', 'shipping_days', 'arrival_status', 'estimated_delivery_rate',
                              'arrival_delivery_rate', 'shipping_delivery_rate' ])
dw_orders = dw_orders.withColumnRenamed("IDSK", "order_purchase_timestamp_id")

dw_orders = dw_orders.join(dw_time, dw_orders.order_approved_at == dw_time.full_date , how='left')
dw_orders = dw_orders.select(['order_id', 'customer_id', 'order_status',  'order_purchase_timestamp_id', 'IDSK', 
                              'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date',
                              'estimated_days', 'arrival_days', 'shipping_days', 'arrival_status', 'estimated_delivery_rate',
                              'arrival_delivery_rate', 'shipping_delivery_rate' ])
dw_orders = dw_orders.withColumnRenamed("IDSK", "order_approved_at_id")

dw_orders = dw_orders.join(dw_time, dw_orders.order_delivered_carrier_date == dw_time.full_date , how='left')
dw_orders = dw_orders.select(['order_id', 'customer_id', 'order_status',  'order_purchase_timestamp_id', 'order_approved_at_id', 
                              'IDSK', 'order_delivered_customer_date', 'order_estimated_delivery_date',
                              'estimated_days', 'arrival_days', 'shipping_days', 'arrival_status', 'estimated_delivery_rate',
                              'arrival_delivery_rate', 'shipping_delivery_rate', 'order_delivered_carrier_date' ])
dw_orders = dw_orders.withColumnRenamed("IDSK", "order_delivered_carrier_date_id")

dw_orders = dw_orders.join(dw_time, dw_orders.order_delivered_customer_date == dw_time.full_date , how='left')
dw_orders = dw_orders.select(['order_id', 'customer_id', 'order_status',  'order_purchase_timestamp_id', 'order_approved_at_id', 
                              'order_delivered_carrier_date_id', 'IDSK', 'order_estimated_delivery_date',
                              'estimated_days', 'arrival_days', 'shipping_days', 'arrival_status', 'estimated_delivery_rate',
                              'arrival_delivery_rate', 'shipping_delivery_rate', 'order_delivered_carrier_date' ])
dw_orders = dw_orders.withColumnRenamed("IDSK", "order_delivered_customer_date_id")

dw_orders = dw_orders.join(dw_time, dw_orders.order_estimated_delivery_date == dw_time.full_date , how='left')
dw_orders = dw_orders.select(['order_id', 'customer_id', 'order_status',  'order_purchase_timestamp_id', 'order_approved_at_id', 
                              'order_delivered_carrier_date_id', 'order_delivered_customer_date_id', 'IDSK',
                              'estimated_days', 'arrival_days', 'shipping_days', 'arrival_status', 'estimated_delivery_rate',
                              'arrival_delivery_rate', 'shipping_delivery_rate', 'order_delivered_carrier_date' ])
dw_orders = dw_orders.withColumnRenamed("IDSK", "order_estimated_delivery_date_id")

if dw_orders_old.count() > 0:
    cols = dw_orders.columns
    dw_orders_new = dw_orders.select(dw_orders.columns)
    dw_orders = dw_orders.select(col('order_id'),
                                 col('customer_id'), 
                                 col('order_status'),  
                                 col('order_purchase_timestamp_id'), 
                                 col('order_approved_at_id'), 
                                 col('order_delivered_carrier_date_id'), 
                                 col('order_delivered_customer_date_id'), 
                                 col('order_estimated_delivery_date_id'),
                                 col('estimated_days'), 
                                 col('arrival_days'), 
                                 col('shipping_days'),
                                 col('order_delivered_carrier_date'),
                                 col('arrival_status').alias('arrival_status_new'),
                                 col('estimated_delivery_rate').alias('estimated_delivery_rate_new'),
                                 col('arrival_delivery_rate').alias('arrival_delivery_rate_new'),
                                 col('shipping_delivery_rate').alias('shipping_delivery_rate_new'))
    
    dw_orders = dw_orders.join(dw_orders_old, ['order_id'], how = 'right')

    dw_orders = dw_orders.withColumn("IDSK", col("IDSK").cast(IntegerType()) )
    
    dw_orders = dw_orders.withColumn("arrival_status", verify_new_value_str_udf(col("IDSK"), col("arrival_status"), col("arrival_status_new")) )
    dw_orders = dw_orders.withColumn("estimated_delivery_rate", verify_new_value_str_udf(col("IDSK"), col("estimated_delivery_rate"), col("estimated_delivery_rate_new")) )
    dw_orders = dw_orders.withColumn("arrival_delivery_rate", verify_new_value_str_udf(col("IDSK"), col("arrival_delivery_rate"), col("arrival_delivery_rate_new")) )
    dw_orders = dw_orders.withColumn("shipping_delivery_rate", verify_new_value_str_udf(col("IDSK"), col("shipping_delivery_rate"), col("shipping_delivery_rate_new")) )

    dw_orders.write.mode("overwrite").saveAsTable("dw_orders")
    dw_orders_new.createOrReplaceTempView("dw_orders_new")

    dw_orders = spark.sql("""insert into dw_orders (order_id, customer_id, order_status,  order_purchase_timestamp_id,
                        order_approved_at_id, order_delivered_carrier_date_id, order_delivered_customer_date_id,
                        order_estimated_delivery_date_id, estimated_days, arrival_days, shipping_days, arrival_status, 
                        estimated_delivery_rate, arrival_delivery_rate, shipping_delivery_rate, order_delivered_carrier_date , IDSK)
                        select odn.order_id, odn.customer_id, odn.order_status,  odn.order_purchase_timestamp_id,
                        odn.order_approved_at_id, odn.order_delivered_carrier_date_id, odn.order_delivered_customer_date_id,
                        odn.order_estimated_delivery_date_id, odn.estimated_days, odn.arrival_days, odn.shipping_days, 
                        odn.arrival_status, odn.estimated_delivery_rate, odn.arrival_delivery_rate, odn.shipping_delivery_rate, 
                        odn.order_delivered_carrier_date ,
                        ( (select max((cast(IDSK as INT))) from dw_orders) + ROW_NUMBER() OVER (
                            ORDER BY odn.order_id
                        ) ) as IDSK
                        from dw_orders_new odn
                        where odn.order_id  not in (select order_id from dw_orders)                  
                        """)
     
    dw_orders = sqlContext.table("dw_orders")
else:
    windowSpec  = Window.orderBy("order_id")
    dw_orders = dw_orders.withColumn("IDSK", row_number().over(windowSpec))

cols = dw_orders.columns
dw_orders = dw_orders.select(cols)


dw_orders.limit(20).toPandas()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp_id,order_approved_at_id,order_delivered_carrier_date_id,order_delivered_customer_date_id,order_estimated_delivery_date_id,estimated_days,arrival_days,...,order_delivered_carrier_date,arrival_status_new,estimated_delivery_rate_new,arrival_delivery_rate_new,shipping_delivery_rate_new,IDSK,arrival_status,estimated_delivery_rate,arrival_delivery_rate,shipping_delivery_rate
0,003324c70b19a16798817b2b3640e721,43696894b5bf8fbe1a40b2148ea505a0,delivered,12092,12092,12113,12369,12553,19,11,...,2017-05-19 16:00:00,Em tempo,Duração OK,Rápido,Rápido,81,Em tempo,Duração OK,Rápido,Rápido
1,018cb6a10649393055c272ba2984e8bb,169c3ca987a830baef97320592292dae,delivered,19771,19771,19796,20298,20401,26,21,...,2018-04-04 19:00:00,Em tempo,Devagar,Duração OK,Duração OK,588,Em tempo,Devagar,Duração OK,Duração OK
2,01908071e35fdc75bc0d7764f153abc2,e30bdf332b6c38f5f22d50dec12282cb,delivered,15933,15963,16272,16389,16705,32,18,...,2017-11-08 23:00:00,Em tempo,Devagar,Duração OK,Muito Rápido,590,Em tempo,Devagar,Duração OK,Muito Rápido
3,024a19cc498cc1a9efcf675d1ca465c3,54ae421973032c25067cdf03e04be610,delivered,19769,19770,19845,20418,20305,22,27,...,2018-04-06 20:00:00,Atrasado,Duração OK,Devagar,Duração OK,873,Atrasado,Duração OK,Devagar,Duração OK
4,024d4903890a56c2e98df63a32206bd9,a1aca6bcf003c08a08097758ff8cf96c,delivered,16162,16186,16235,16413,16729,23,10,...,2017-11-07 10:00:00,Em tempo,Duração OK,Rápido,Muito Rápido,878,Em tempo,Duração OK,Rápido,Muito Rápido
5,02c13bb9b84866055ea2074cc6a83a16,bc9802f1680047c23503920524d21e8b,delivered,21793,21795,21802,22123,22513,29,13,...,2018-06-27 09:00:00,Em tempo,Devagar,Rápido,Rápido,1059,Em tempo,Devagar,Rápido,Rápido
6,03a85a6adb776ffefc4a8d7f845b10a4,641f77a2bb1d1380cff2e60afd691c07,delivered,21433,21444,21687,21980,22513,44,22,...,2018-06-22 14:00:00,Em tempo,Devagar,Duração OK,Rápido,1422,Em tempo,Devagar,Duração OK,Rápido
7,04836fc7917dc0bfcfbd4456ec4156ac,136d20997016db0a1a8016f0fa7b9ff5,delivered,22381,22382,22446,22533,23089,29,6,...,2018-07-24 05:00:00,Em tempo,Devagar,Muito Rápido,Muito Rápido,1768,Em tempo,Devagar,Muito Rápido,Muito Rápido
8,04e2e5f16cad2f04bb156a4bcc034520,139a718c01cce276519bede2cbe5cae5,delivered,16984,16984,17063,17228,17641,27,10,...,2017-12-11 22:00:00,Em tempo,Devagar,Rápido,Muito Rápido,1913,Em tempo,Devagar,Rápido,Muito Rápido
9,0562291f2b37f55cc259053d2230fdc5,24824d3120ac876328bf46760d493969,delivered,20411,20411,20416,20638,21289,36,9,...,2018-04-30 15:00:00,Em tempo,Devagar,Rápido,Rápido,2117,Em tempo,Devagar,Rápido,Rápido


In [0]:
print((dw_orders.count(), len(dw_orders.columns)))

(99441, 21)


#### Cria tabela Fato de Pagamentos

In [0]:
dw_payments = orders_payments_df.select(orders_payments_df.columns)
dw_payments = dw_payments.select(['payment_sequential', 'payment_type', 'payment_installments', 'payment_value', "order_id" ])
dw_payments = dw_payments.join(dw_orders, ["order_id"], how='left')
cols = ['payment_sequential', 'payment_type', 'payment_installments', 'payment_value', 'customer_id', 'order_status', 
        'order_purchase_timestamp_id', 'order_approved_at_id', 'order_delivered_carrier_date_id', 
        'order_delivered_customer_date_id', 'order_estimated_delivery_date_id', 'IDSK', 'estimated_days', 'arrival_days', 
        'shipping_days']
dw_payments = dw_payments.select(cols)
dw_payments = dw_payments.withColumnRenamed("IDSK", "order_id")

cols = dw_payments.columns
cols.remove("customer_id")
dw_payments = dw_payments.join(dw_customers, ["customer_id"], how='left')
cols = [*cols, *["IDSK"]]
dw_payments = dw_payments.select(cols)
dw_payments = dw_payments.withColumnRenamed("IDSK", "customer_id")

dw_payments = dw_payments.select(['customer_id','order_id', 'payment_sequential', 'payment_type', 'payment_installments', 
                                  'payment_value', 'order_purchase_timestamp_id', 'order_approved_at_id', 
                                  'order_delivered_carrier_date_id', 'order_delivered_customer_date_id',
                                  'order_estimated_delivery_date_id', 'estimated_days', 'arrival_days', 
                                  'shipping_days'])

dw_payments = dw_payments.withColumn("payment_sequential", col("payment_sequential").cast(IntegerType()) )
dw_payments = dw_payments.withColumn("payment_installments", col("payment_installments").cast(IntegerType()) )
dw_payments = dw_payments.withColumn("payment_value", col("payment_value").cast(DoubleType()) )

if dw_reviews_old.count() > 0:
    dw_payments_old.write.mode("overwrite").saveAsTable("db_eccommerce.dw_payments")
    dw_payments.createOrReplaceTempView("dw_payments_new")

    dw_payments = spark.sql("""insert into  db_eccommerce.dw_payments (customer_id,order_id, payment_sequential, payment_type, 
                            payment_installments, payment_value, order_purchase_timestamp_id, order_approved_at_id, 
                            order_delivered_carrier_date_id, order_delivered_customer_date_id, order_estimated_delivery_date_id, 
                            estimated_days, arrival_days, shipping_days)
                        select customer_id,order_id, payment_sequential, payment_type, payment_installments, payment_value, 
                                  order_purchase_timestamp_id, order_approved_at_id, order_delivered_carrier_date_id, 
                                  order_delivered_customer_date_id, order_estimated_delivery_date_id, estimated_days, arrival_days, 
                                  shipping_days
                        from dw_payments_new 
                        where (order_id, payment_sequential) not in (select order_id, payment_sequential 
                        from db_eccommerce.dw_payments);                  
                        """)
     
    dw_payments = sqlContext.table("db_eccommerce.dw_payments")

dw_payments.limit(20).toPandas()

Unnamed: 0,customer_id,order_id,payment_sequential,payment_type,payment_installments,payment_value,order_purchase_timestamp_id,order_approved_at_id,order_delivered_carrier_date_id,order_delivered_customer_date_id,order_estimated_delivery_date_id,estimated_days,arrival_days,shipping_days
0,53963,72675,1,credit_card,1,37.15,22841,22842,22858,,23161,13,,
1,32808,51800,1,credit_card,1,72.75,15119,15119,15138,15236.0,15625,21,4.0,4.0
2,8241,54409,3,voucher,1,15.0,19391,19415,19434,19504.0,19801,17,4.0,2.0
3,41004,32641,1,credit_card,2,61.19,12420,12425,12442,13385.0,13057,26,40.0,39.0
4,98810,36281,1,credit_card,2,136.26,10243,10267,10332,10572.0,10921,28,13.0,9.0
5,1228,25314,1,boleto,1,89.27,17054,17069,17230,17274.0,17449,16,9.0,1.0
6,9636,74075,1,credit_card,2,239.5,18252,18252,18400,18786.0,19249,41,22.0,16.0
7,93214,20116,1,credit_card,3,248.51,18973,18974,19071,19849.0,19441,19,36.0,32.0
8,92651,24663,1,boleto,1,91.05,17830,17908,17948,18067.0,18433,25,9.0,4.0
9,55527,88057,1,credit_card,3,102.03,18979,18979,19009,19412.0,19729,31,18.0,16.0


In [0]:
print((dw_payments.count(), len(dw_payments.columns)))
dw_payments.write.mode("overwrite").saveAsTable("db_eccommerce.dw_payments")

(103886, 14)


#### Cria tabela Fato de Avaliações

In [0]:
dw_reviews = orders_reviews_df.select(orders_reviews_df.columns)
dw_reviews = dw_reviews.select(["review_id","order_id","review_score","review_comment_title","review_comment_message",
                                "review_creation_date","review_answer_timestamp"])
dw_reviews = dw_reviews.join(dw_orders, ["order_id"], how= 'left')
cols = dw_reviews.columns
cols.remove("order_id")
cols.remove("order_status")
dw_reviews = dw_reviews.select(cols)
dw_reviews = dw_reviews.withColumnRenamed("IDSK", "order_id")

cols = dw_reviews.columns
dw_reviews = dw_reviews.join(dw_customers, ["customer_id"], how='left')
cols.remove("customer_id")
cols = [*cols, *["IDSK"]]
dw_reviews = dw_reviews.select(cols)
dw_reviews = dw_reviews.withColumnRenamed("IDSK", "customer_id")

dw_reviews = dw_reviews.withColumn('review_creation_date', to_timestamp(date_format(col('review_creation_date'), "yyyy-MM-dd HH") ))
dw_reviews = dw_reviews.withColumn('review_answer_timestamp', to_timestamp(date_format(col('review_answer_timestamp'), "yyyy-MM-dd HH") ))

dw_reviews = dw_reviews.join(dw_time, dw_reviews.review_creation_date == dw_time.full_date , how='left')
dw_reviews = dw_reviews.select(['customer_id','order_id', "review_id", "review_score","review_comment_title","review_comment_message",
                                'order_purchase_timestamp_id', 'order_approved_at_id', 'order_delivered_carrier_date_id', 
                                'order_delivered_customer_date_id', 'order_estimated_delivery_date_id', 'IDSK', 'review_answer_timestamp', 
                                'estimated_days', 'arrival_days', 'shipping_days'])
dw_reviews = dw_reviews.withColumnRenamed("IDSK", "review_creation_date_id")

dw_reviews = dw_reviews.join(dw_time, dw_reviews.review_answer_timestamp == dw_time.full_date , how='left')
dw_reviews = dw_reviews.select([ 'customer_id','order_id', "review_id", "review_score","review_comment_title","review_comment_message",
                                'order_purchase_timestamp_id', 'order_approved_at_id', 'order_delivered_carrier_date_id', 
                                'order_delivered_customer_date_id', 'order_estimated_delivery_date_id', 'review_creation_date_id',
                                'IDSK', 'estimated_days', 'arrival_days', 'shipping_days'])
dw_reviews = dw_reviews.withColumnRenamed("IDSK", "review_answer_timestamp_id")

if dw_reviews_old.count() > 0:
    dw_reviews_old.write.option("multiLine", "true").mode("overwrite").saveAsTable("db_eccommerce.dw_reviews")
    dw_reviews.createOrReplaceTempView("dw_reviews_new")

    dw_reviews = spark.sql("""insert into db_eccommerce.dw_reviews ( customer_id,order_id, review_id, review_score,review_comment_title, 
                                review_comment_message, order_purchase_timestamp_id, order_approved_at_id, 
                                order_delivered_carrier_date_id, order_delivered_customer_date_id, order_estimated_delivery_date_id, 
                                review_creation_date_id, review_answer_timestamp_id, estimated_days, arrival_days, shipping_days)
                        select  customer_id, order_id, review_id, review_score,review_comment_title,review_comment_message,
                                order_purchase_timestamp_id, order_approved_at_id, order_delivered_carrier_date_id, 
                                order_delivered_customer_date_id, order_estimated_delivery_date_id, review_creation_date_id,
                                review_answer_timestamp_id, estimated_days, arrival_days, shipping_days
                        from dw_reviews_new 
                        where (order_id, review_id) not in (select order_id, review_id from db_eccommerce.dw_reviews);                  
                        """)
     
    dw_reviews = sqlContext.table("db_eccommerce.dw_reviews")

dw_reviews = dw_reviews.withColumn("review_score", col("review_score").cast(IntegerType()) )

dw_reviews.limit(20).toPandas()

Unnamed: 0,customer_id,order_id,review_id,review_score,review_comment_title,review_comment_message,order_purchase_timestamp_id,order_approved_at_id,order_delivered_carrier_date_id,order_delivered_customer_date_id,order_estimated_delivery_date_id,review_creation_date_id,review_answer_timestamp_id,estimated_days,arrival_days,shipping_days
0,55527,88057,28642ce6250b94cc72bc85960aec6c62,5,,,18979,18979,19009,19412,19729,19537,19558,31,18,16
1,12517,82641,8ef0597f9b1870cf46c81358425f0e24,5,,nada a declarar,12066,12075,12086,12111,12409,12121,12180,14,1,1
2,59175,5420,b66f1f980a4832698b613bcce72bb779,4,,Ótima compra. Ótimo produto. Só senti falta de...,14086,14086,14107,14537,15193,14545,14570,46,18,17
3,71107,14375,9d0d0a431b7fd6db8137f0c9f942b633,5,,,20683,20700,20750,20921,21265,20929,20949,24,9,7
4,74461,95620,d5afe15f58c18e891b2050e743e6be31,5,,,17489,17489,17588,17781,18481,17737,17811,41,12,8
5,46964,49662,584d557d42a363433e67b35b21630880,3,,,13287,13287,13363,13502,13849,13513,13586,23,8,5
6,17880,14781,d47aa070748c52a219d77f7f33087c8a,5,,Excelente,19905,19905,19914,19939,20305,19945,20015,16,1,1
7,16921,11067,cd8283bde32ad44883579ba439f5ea44,5,Recomendo,,20005,20006,20317,20348,20305,20353,20375,12,14,1
8,20336,32364,6bd75b3bd8efb88e88d205c1add1f51e,3,parcial,Entregas separadas...ainda não chegou todo o p...,20944,20945,20990,21209,21289,21217,21251,14,11,9
9,29960,4615,ecd4114eb921e79ccc5e14ff101e86c5,5,,,16910,16910,16988,17277,17641,17281,17304,30,15,12


In [0]:
print((dw_reviews.count(), len(dw_reviews.columns)))
dw_reviews.write.option("multiLine", "true").mode("overwrite").saveAsTable("db_eccommerce.dw_reviews")

(99223, 16)


#### Cria tabela Fato de Itens

In [0]:
dw_itens = orders_items_df.select(orders_items_df.columns)
dw_itens = dw_itens.select(["order_item_id","order_id","order_item_id","product_id","seller_id","shipping_limit_date","price","freight_value"])
dw_itens = dw_itens.join(dw_orders, ["order_id"], how='left')
cols = dw_itens.columns
cols.remove("order_id")
dw_itens = dw_itens.select(cols)
dw_itens = dw_itens.withColumnRenamed("IDSK", "order_id")

cols = dw_itens.columns
dw_itens = dw_itens.join(dw_customers, ["customer_id"], how='left')
cols.remove("customer_id")
cols = [*cols, *["IDSK"]]
dw_itens = dw_itens.select(cols)
dw_itens = dw_itens.withColumnRenamed("IDSK", "customer_id")

dw_itens = dw_itens.withColumn('shipping_limit_date', to_timestamp(date_format(col('shipping_limit_date'), "yyyy-MM-dd HH") ))

dw_itens = dw_itens.withColumn('seller_to_carrier_status', datediff_cond_udf(col('shipping_limit_date'),col('order_delivered_carrier_date'))) 
dw_itens = dw_itens.withColumn('seller_to_carrier_status', get_arrival_status_udf(col('seller_to_carrier_status'))) 

dw_itens = dw_itens.join(dw_time, dw_itens.shipping_limit_date == dw_time.full_date , how='left')
dw_itens = dw_itens.select(["order_item_id", 'customer_id','order_id', "seller_id" , "product_id" , "price", "freight_value",
                            'order_purchase_timestamp_id', 'order_approved_at_id', 'order_delivered_carrier_date_id', 
                            'order_delivered_customer_date_id', 'order_estimated_delivery_date_id', 'IDSK', 'estimated_days', 
                            'arrival_days', 'shipping_days', 'seller_to_carrier_status'])
dw_itens = dw_itens.withColumnRenamed("IDSK", "shipping_limit_date_id")

if dw_itens_old.count() > 0:
    cols = dw_itens_old.columns
    dw_itens_new = dw_itens.select(dw_itens.columns)

    dw_itens_new  = dw_itens_new.withColumn("order_item_id", col("order_item_id").cast(IntegerType()) )
    dw_itens_new  = dw_itens_new.withColumn("price", col("price").cast(DoubleType()) )
    dw_itens_new  = dw_itens_new.withColumn("freight_value", col("freight_value").cast(DoubleType()) )

    dw_itens_old = dw_itens_old.select(["order_item_id", 'order_id', 'customer_id',"seller_id" , "product_id" , "price", "freight_value", 
                                        'shipping_limit_date_id', 'seller_to_carrier_status'])
    dw_orders_temp = dw_orders.select(['IDSK', 'order_purchase_timestamp_id', 'order_approved_at_id', 
                                       'order_delivered_carrier_date_id', 
                                        'order_delivered_customer_date_id', 'order_estimated_delivery_date_id', 'estimated_days', 
                                        'arrival_days', 'shipping_days'])
    dw_itens_old = dw_itens_old.join(dw_orders_temp, dw_itens_old.order_id == dw_orders.IDSK, how='left')
    dw_itens_old = dw_itens_old.select(["order_item_id", 'customer_id','order_id', "seller_id" , "product_id" , "price", "freight_value",
                                        'order_purchase_timestamp_id', 'order_approved_at_id', 'order_delivered_carrier_date_id', 
                                        'order_delivered_customer_date_id', 'order_estimated_delivery_date_id', 'shipping_limit_date_id', 
                                        'estimated_days', 'arrival_days', 'shipping_days', 'seller_to_carrier_status'])

    dw_itens_old.write.mode("overwrite").saveAsTable("db_eccommerce.dw_itens")
    dw_itens_new.createOrReplaceTempView("dw_itens_new")
    dw_products.createOrReplaceTempView("dw_products")
    dw_sellers.createOrReplaceTempView("dw_sellers")

    dw_itens = spark.sql("""insert into  db_eccommerce.dw_itens (order_item_id, order_id, customer_id, product_id, seller_id, 
                         order_purchase_timestamp_id, order_approved_at_id, order_delivered_carrier_date_id, 
                         order_delivered_customer_date_id, order_estimated_delivery_date_id, shipping_limit_date_id, price, 
                         freight_value, estimated_days, arrival_days, shipping_days, seller_to_carrier_status)
                        select itn.order_item_id, itn.order_id, itn.customer_id, p.IDSK, s.IDSK, itn.order_purchase_timestamp_id, 
                        itn.order_approved_at_id, itn.order_delivered_carrier_date_id, itn.order_delivered_customer_date_id, 
                        itn.order_estimated_delivery_date_id, itn.shipping_limit_date_id, itn.price, itn.freight_value, 
                        itn.estimated_days, itn.arrival_days, itn.shipping_days, itn.seller_to_carrier_status
                        from dw_itens_new itn
                        inner join db_eccommerce.dw_products p on p.product_id = itn.product_id and p.FIM is null
                        inner join db_eccommerce.dw_sellers s on s.seller_id = itn.seller_id and s.FIM is null
                        where (itn.order_id, itn.order_item_id) not in ( select order_id, order_item_id 
                        from db_eccommerce.dw_itens);                  
                        """)
     
    dw_itens = sqlContext.table("db_eccommerce.dw_itens")
    dw_itens = dw_itens.select(cols)

else:
    cols = dw_itens.columns
    dw_itens = dw_itens.join(dw_sellers, (dw_sellers.seller_id == dw_itens.seller_id), how='left')
    cols.remove("seller_id")
    cols = [*cols, *["IDSK"]]
    dw_itens = dw_itens.select(cols)
    dw_itens = dw_itens.withColumnRenamed("IDSK", "seller_id")
    
    cols = dw_itens.columns
    dw_itens = dw_itens.join(dw_products, (dw_products.product_id == dw_itens.product_id) , how='left')
    cols.remove("product_id")
    cols = [*cols, *["IDSK"]]
    dw_itens = dw_itens.select(cols)
    dw_itens = dw_itens.withColumnRenamed("IDSK", "product_id")

dw_itens  = dw_itens.withColumn("order_item_id", col("order_item_id").cast(IntegerType()) )
dw_itens  = dw_itens.withColumn("price", col("price").cast(DoubleType()) )
dw_itens  = dw_itens.withColumn("freight_value", col("freight_value").cast(DoubleType()) )

dw_itens.limit(20).toPandas()

Unnamed: 0,order_item_id,customer_id,order_id,product_id,seller_id,price,freight_value,order_purchase_timestamp_id,order_approved_at_id,order_delivered_carrier_date_id,order_delivered_customer_date_id,order_estimated_delivery_date_id,shipping_limit_date_id,estimated_days,arrival_days,shipping_days,seller_to_carrier_status
0,1,17784,493,493,493,27.9,3.81,13746,13746,13772,13864,14257,13890,21,4,3,Em tempo
1,2,17784,493,493,493,21.33,25.39,13746,13746,13772,13864,14257,13890,21,4,3,Em tempo
2,1,54626,606,606,606,159.9,28.5,18517,18518,18616,18819,19273,18638,31,12,8,Atrasado
3,1,43963,632,632,632,34.99,15.1,17963,17963,18071,18310,18745,18107,32,14,9,Em tempo
4,1,81124,715,715,715,151.99,17.77,14081,14099,14132,14255,14545,14243,19,7,5,Em tempo
5,1,54474,980,980,980,49.99,11.73,17242,17243,17273,17295,17713,17411,19,2,0,Em tempo
6,1,365,1334,1334,1334,55.0,14.14,14711,14711,14733,14925,15241,14879,22,8,7,Em tempo
7,1,24886,1526,1526,1526,46.9,8.88,19500,19507,19582,19600,19801,19651,12,4,0,Em tempo
8,1,3582,1624,1624,1624,59.9,14.73,22841,22842,22933,23012,23113,22938,11,7,3,Atrasado
9,1,16107,2240,2240,2240,27.99,22.67,13715,13716,13864,14374,14689,13860,40,27,21,Atrasado


In [0]:
print((dw_itens.count(), len(dw_itens.columns)))
dw_itens.write.mode("overwrite").saveAsTable("db_eccommerce.dw_itens")

(112650, 17)


In [0]:
dw_orders = dw_orders.select(["IDSK", "order_id", 'arrival_status', 'estimated_delivery_rate', 'arrival_delivery_rate',
                              'shipping_delivery_rate'])
dw_orders.write.mode("overwrite").saveAsTable("db_eccommerce.dw_orders")

In [0]:
%sql
drop table IF EXISTS dw_orders

### Salvando dados no Azure Synapse

In [0]:
access_key = dbutils.secrets.get("eccomerce_secrets", "access_key")
sql_password = dbutils.secrets.get("eccomerce_secrets", "sql_password")

storage_account_name = "saeccommerce"
storage_account_key = access_key
storage_container_name = "c-eccommerce"
storage_folder_data = "temp_data"

temp_dir_url = "wasbs://{}@{}.blob.core.windows.net/{}".format(storage_container_name, storage_account_name,storage_folder_data)

spark_config_key = "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account_name)
spark_config_value = storage_account_key

spark_config_storage_account_access = "fs.azure.account.key.{}.dfs.core.windows.net={}".format(storage_account_name, spark_config_value)

spark.conf.set(spark_config_key, spark_config_value)
spark.conf.set( "spark.sql.parquet.writeLegacyFormat", "true")

servername = "snpeccomerce"
databasename = "snpdedicated"
username = "sqladminuser"
password = sql_password

sql_dw_connection_string = "jdbc:sqlserver://{}.database.windows.net:1433;database={};user={}@{};password={};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;".format(servername, databasename, username, servername, password)


In [0]:
new_table_name = "bi.dw_time"

if dw_time.count() > 0:
    dw_time.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sql_dw_connection_string) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbTable", new_table_name) \
    .option("tempDir", temp_dir_url) \
    .mode("overwrite") \
    .save()



In [0]:
new_table_name = "bi.dw_products"


if dw_products.count() > 0:
    dw_products = dw_products.withColumn("product_weight_g", col("product_weight_g").cast("decimal(20,18)") )
    dw_products = dw_products.withColumn("product_length_cm", col("product_length_cm").cast("decimal(20,18)") )
    dw_products = dw_products.withColumn("product_height_cm", col("product_height_cm").cast("decimal(20,18)") )
    dw_products = dw_products.withColumn("product_width_cm", col("product_width_cm").cast("decimal(20,18)") )

    dw_products.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()

In [0]:
new_table_name = "bi.dw_sellers"

if dw_sellers.count() > 0:
    dw_sellers = dw_sellers.withColumn("geolocation_lat", col("geolocation_lat").cast("decimal(20,18)") )
    dw_sellers = dw_sellers.withColumn("geolocation_lng", col("geolocation_lng").cast("decimal(20,18)") )

    dw_sellers.write \
    .format('com.databricks.spark.sqldw') \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()

In [0]:
new_table_name = "bi.dw_customers"

if dw_customers.count() > 0:
    dw_customers = dw_customers.withColumn("geolocation_lat", col("geolocation_lat").cast("decimal(20,18)") )
    dw_customers = dw_customers.withColumn("geolocation_lng", col("geolocation_lng").cast("decimal(20,18)") )

    dw_customers.write \
    .format('com.databricks.spark.sqldw') \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()

In [0]:
new_table_name = "bi.dw_orders"

if dw_orders.count() > 0:
    dw_orders.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()

In [0]:
new_table_name = "bi.dw_payments"


if dw_payments.count() > 0:
    dw_payments = dw_payments.withColumn("payment_value", col("payment_value").cast("decimal(9,2)") )

    dw_payments.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()

In [0]:
new_table_name = "bi.dw_reviews"

if dw_reviews.count() > 0:
    dw_reviews.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()

In [0]:
new_table_name = "bi.dw_itens"

if dw_itens.count() > 0:
    dw_itens = dw_itens.withColumn("price", col("price").cast("decimal(9,2)") )
    dw_itens = dw_itens.withColumn("freight_value", col("freight_value").cast("decimal(9,2)") )

    dw_itens.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sql_dw_connection_string) \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", new_table_name) \
    .option("tempdir", temp_dir_url) \
    .mode("overwrite") \
    .save()