# Intellibrand - Data Engineer Test Case

In [1]:
# #############################################################################
# Lucas Lukasavicus Silva
# March 03, 2023
# lukasavicus at gmail dot com
# -----------------------------------------------------------------------------
# Code Test 4 Data Engineer @ Intellibrand
# #############################################################################

Ficamos muito felizes com o seu interesse nesta vaga e esse teste é de grande importância para que possamos avaliar o seu conhecimento técnico, criatividade e organização no desenvolvimento da tarefa. Nosso trabalho diário é desenvolver e implementar soluções de dados que aumentam a performance dos nossos clientes nos principais sites de varejo online. Por isso, criamos este desafio técnico para simular uma atividade que é realizada pelos nossos times especialistas. Nessa tarefa, você deverá encontrar **os 10 produtos com a maior variação de preço, entre o valor encontrado no varejo online (Americanas) e o estipulado pelo fabricante**, e **os 10 produtos que apresentam maior indisponibilidade**. _Nosso cliente deseja observar com essa task se existem produtos que estão sendo vendidos com valores muito distantes do ideal_. Para isso, disponibilizamos dados coletados nos varejos em diferentes datas para que possa desenvolver essa atividade. Alguns pontos importantes sobre a estrutura dos nossos dados:

- "retailerPrice" -- é o preço encontrado no varejo;
- "manufacturerPrice" -- é o preço indicado pelo fabricante;
- "priceVariation" -- é a variação do preço, que pode ser negativa ou positiva;
- "available" -- indica se o produto está disponível para compra.

O escopo do teste deve incluir, mas não se restringindo:
- Ingestão dos dados;
- Armazenamento dos arquivos;
- Tratamento dos dados;
- Cálculo das métricas;

Desejáveis:
- Aplicação de PyLint ou similar;
- Aplicação de Clean Code;
- Diagrama da pipeline;
- Readme;
- Coverage e Testes unitários;


Os arquivos para o teste estão em formato .zip no link;

https://drive.google.com/file/d/14nMMC19vpQ8s69TqCkQZh7SxJxlpaOzW/view?usp=sharing

Esperamos que o resultado seja entregue em um repositório GIT.
Boa sorte!


---

In [2]:
import os
from pathlib import Path
import json
import pandas as pd
import numpy as np
from zipfile import ZipFile
import os

In [3]:
# Switch checked by later cells: when True, the raw JSON files are re-read from the zip archive.
READ_FROM_ORIGIN = True

In [4]:
# Dataset directory, relative to the working directory
# (presumably where datatest.zip was extracted - confirm).
base = os.path.join('.', 'datatest', 'root', 'workspace', 'data-eng-test')
# Bare file names (directories stripped) of every .json file found recursively under `base`.
files = [path.name for path in Path(base).rglob('*.json')]

In [5]:
# Check some constraints:
# 1. All files are unique on name;
# 2. We can assess the structure of all data

In [6]:
# Constraint 1: no two JSON files share the same name (a set drops duplicates).
assert len(set(files)) == len(files) # 1

### Read Data

In [7]:
if READ_FROM_ORIGIN:
    # Load every JSON member of the archive into memory.
    #   all_data  - one record per file: its name, parsed content and `sk`
    #               (the member's position in the archive listing).
    #   errors    - details of the member that failed to parse, kept for inspection.
    #   only_data - just the parsed contents, in the same order as all_data.
    all_data = []
    errors = []
    # Safety cap on the number of JSON members loaded.
    max_files = 1000000
    with ZipFile('datatest.zip') as myzip:
        for i, entry in enumerate(myzip.infolist()):
            # Match on the extension: a substring test would also accept names
            # such as 'x.json.bak' or a directory called 'a.json/'.
            if not entry.filename.endswith('.json'):
                continue
            # Stop once exactly `max_files` documents have been loaded.
            if len(all_data) >= max_files:
                break
            try:
                with myzip.open(entry.filename) as myfile:
                    data = json.load(myfile)
            except Exception as e:
                # Record which member failed, then abort the load: a partially
                # read dataset would silently skew the analysis below.
                print(e)
                errors.append({'file' : entry.filename, 'error' : e, 'sk' : i})
                raise
            all_data.append({
                'file_properties' : {'title' : entry.filename},
                'content' : data,
                'sk' : i
            })
    print("Readed", len(all_data))
    only_data = [d['content'] for d in all_data]

Readed 100970


In [8]:
# Hand-written fixtures for extract_signatures. Each entry mimics one parsed JSON
# document whose single key 'table1' holds a list of records; the 'explanation'
# value names the schema variation the record exercises (same / fewer / extra
# fields, one nested dict, two levels of nesting).
# NOTE(review): several 'c' values read '2022-12-041T...', which is not a valid
# date. extract_signatures only inspects keys and container types, so this
# presumably does not matter - confirm it is intentional.
test_cases = [
    {'table1' : [{'explanation' : "Table 1 - same structure", 'a' : 1, 'b' : 'Hello', 'c' : "2022-12-01T00:00:00.000Z"}]},
    {'table1' : [{'explanation' : "Table 1 - same structure", 'a' : 2, 'b' : 'wORLD', 'c' : "2022-12-02T00:00:00.000Z"}]},
    {'table1' : [{'explanation' : "Table 1 - same structure", 'a' : 3, 'b' : 'h-e-l-l-o', 'c' : "2022-12-03T00:00:00.000Z"}]},
    {'table1' : [{'explanation' : "Table 1 - same structure", 'a' : 4, 'b' : 'world', 'c' : "2022-12-041T00:00:00.000Z"}]},
    {'table1' : [{'explanation' : "Table 1 - minus structure", 'a' : 4, 'b' : 'world'}]},
    {'table1' : [{'explanation' : "Table 1 - minus structure", 'a' : 4, 'c' : "2022-12-041T00:00:00.000Z"}]},
    {'table1' : [{'explanation' : "Table 1 - plus structure", 'a' : 4, 'b' : 'world', 'c' : "2022-12-041T00:00:00.000Z", "d" : -456.90}]},
    {'table1' : [{'explanation' : "Composite Table ", 'a' : 4, 'b' : 'world', 'e' : { 'f1' : 1, 'f2' : 2} }]},
    {'table1' : [{'explanation' : "Table 1 - same structure (test1 again)", 'a' : 1, 'b' : 'Hello', 'c' : "2022-12-01T00:00:00.000Z"}]},
    {'table1' : [{'explanation' : "Multi level structure", 'a' : 4, 'b' : 'world', 'e' : { 'f1' : 1, 'f2' : 2, 'f3' : {'c1': None, 'c2' : None} } }]},

]

In [9]:
def extract_signatures(signatures, key, obj):
    """Collect, in place, the field names found under every key of ``obj``.

    ``signatures`` is one flat dict shared by the whole traversal. Every key
    met while walking ``obj`` gets an entry whose value is either the list of
    field names (first-seen order, no duplicates) of the dict(s) found under
    that key, or ``None`` when no dict was found under it. Entries are keyed
    by the bare field name, not by path, so equally named keys at different
    nesting levels share a single entry.

    Args:
        signatures: Accumulator dict, mutated in place. It can be reused
            across calls to merge the signatures of many documents.
        key: Name under which ``obj`` sits in its parent, or ``None`` for the
            root document. When not ``None`` it must already be present in
            ``signatures`` (the recursion guarantees this).
        obj: Dict to inspect.

    Returns:
        None. The result is accumulated in ``signatures``.
    """
    field_names = list(obj.keys())

    if key is not None:
        if signatures[key] is not None:
            # Merge with what earlier records under this key already contributed.
            for name in field_names:
                if name not in signatures[key]:
                    signatures[key].append(name)
        else:
            # Store a copy: the loop below iterates `field_names`, and a nested
            # record reusing the same key name would otherwise append to the
            # very list being iterated (leading to a KeyError on obj[name]).
            signatures[key] = list(field_names)

    # Guard kept from the original: a list accumulator is never descended into.
    if isinstance(signatures, list):
        return

    for name in field_names:
        if name not in signatures:
            signatures[name] = None

        value = obj[name]
        if isinstance(value, list):
            for item in value:
                # Only dict items carry a signature; scalars inside a list
                # (e.g. a list of words) are skipped instead of crashing.
                if isinstance(item, dict):
                    extract_signatures(signatures, name, item)
        elif isinstance(value, dict):
            extract_signatures(signatures, name, value)

---

### Tests:

In [10]:
# Smoke test on a single fixture: index 7 is the "Composite Table" case with one nested dict.
s = {}
o = test_cases[7]
extract_signatures(s, None, o)
# Show only the keys under which at least one dict was found.
print('s', [item for item in s.items() if item[1] is not None])

s [('table1', ['explanation', 'a', 'b', 'e']), ('e', ['f1', 'f2'])]


In [11]:
# Fold every fixture into one shared accumulator, then show the merged signatures.
signatures = {}
for case in test_cases:
    extract_signatures(signatures, None, case.copy())

merged = [(name, fields) for name, fields in signatures.items() if fields is not None]
print('s', merged)

s [('table1', ['explanation', 'a', 'b', 'c', 'd', 'e']), ('e', ['f1', 'f2', 'f3']), ('f3', ['c1', 'c2'])]


---

### Extract Signatures from Data

#### Análise do Schema dos Dados:

Essa análise tem como propósito verificar o schema dos dados e entender como eles se relacionam. A partir dessa análise esperamos poder extrair os dados de forma performática e armazenar os dados em um esquema mais fácil para futuras análises.

In [12]:
# Number of parsed JSON documents available for the schema analysis.
# `only_data` only exists when the loading cell ran with READ_FROM_ORIGIN = True.
len(only_data)

100970

In [13]:
# Accumulate the signatures of every loaded document into a fresh dict.
signatures = {}
for document in only_data:
    extract_signatures(signatures, None, document.copy())

In [14]:
# Assinaturas dos Dados
sgnats = [item for item in signatures.items() if item[1] is not None]
print('s', [item for item in signatures.items() if item[1] is not None])

s [('assortment', ['dateTimeReference', 'idRetailerSKU', 'modifiedDate', 'seller', 'retailerFinalUrl', 'retailerProductCode', 'available', 'unavailable', 'notListed', 'screenshot', 'retailerDescription', 'retailerPrice', 'retailerTitle', 'retailerRatingCount', 'retailerFirstImageType', 'heroImageFlag', 'manufacturerExtraImagesTotal', 'manufacturerImagesTotal', 'retailerExtraImagesTotal', 'retailerImagesTotal', 'retailerDescriptionFoundWords', 'descriptionFlag', 'descriptionPercentage', 'manufacturerDescription', 'manufacturerPrice', 'priceInsideRange', 'priceAboveRange', 'priceBelowRange', 'priceOutOfThePolicy', 'priceVariation', 'useManufacturerPriceVariationRule', 'useProductPriceRangeRule', 'retailerTitleFoundWords', 'titleFlag', 'titlePercentage', 'brandRetailerTitle', 'manufacturerTitle', 'retailerAverageRating', 'retailerReviewCount', 'correctExtraImagesTotal', 'extraImagesPercentage', 'primaryImageFlag', 'retailerFromPrice', 'priceDiscount']), ('images', ['idProductCatalog', 'or

---

In [15]:
# Principais "tabelas" a serem modeladas
for k, v in sgnats:
    print(k)

assortment
images
reviews
variants
marketplace


In [16]:
# Campos da "tabela" Assortment
sgnats[0]

('assortment',
 ['dateTimeReference',
  'idRetailerSKU',
  'modifiedDate',
  'seller',
  'retailerFinalUrl',
  'retailerProductCode',
  'available',
  'unavailable',
  'notListed',
  'screenshot',
  'retailerDescription',
  'retailerPrice',
  'retailerTitle',
  'retailerRatingCount',
  'retailerFirstImageType',
  'heroImageFlag',
  'manufacturerExtraImagesTotal',
  'manufacturerImagesTotal',
  'retailerExtraImagesTotal',
  'retailerImagesTotal',
  'retailerDescriptionFoundWords',
  'descriptionFlag',
  'descriptionPercentage',
  'manufacturerDescription',
  'manufacturerPrice',
  'priceInsideRange',
  'priceAboveRange',
  'priceBelowRange',
  'priceOutOfThePolicy',
  'priceVariation',
  'useManufacturerPriceVariationRule',
  'useProductPriceRangeRule',
  'retailerTitleFoundWords',
  'titleFlag',
  'titlePercentage',
  'brandRetailerTitle',
  'manufacturerTitle',
  'retailerAverageRating',
  'retailerReviewCount',
  'correctExtraImagesTotal',
  'extraImagesPercentage',
  'primaryImageFl

#### Sobre o Schema de Dados:

Pudemos observar que 5 principais tabelas podem ser modeladas a partir dos dados que temos à disposição (assortment, images, reviews, variants e marketplace), sendo que a principal delas é a tabela de _Assortment_, que contém os dados coletados do retailer.

---

In [17]:
def flatten_assortment(registry):
    """Turn one loaded file record into a list of flat assortment rows.

    Each element of ``registry['content']['assortment']`` yields one dict built
    from the file properties, overlaid with the assortment's own fields (which
    win on a name clash), plus the registry's ``sk`` value.

    Args:
        registry: Dict with keys ``'file_properties'``, ``'content'`` and ``'sk'``.

    Returns:
        A new list with one flat dict per assortment entry.
    """
    file_props = registry['file_properties']
    return [
        {**file_props, **entry, 'sk': registry['sk']}
        for entry in registry['content']['assortment']
    ]

In [23]:
# Flag flipped mid-notebook: the next cell loads the DataFrame from the cached
# parquet file instead of rebuilding it from the raw JSON.
# NOTE(review): on a fresh top-to-bottom run this requires 'Datatest.parquet.gzip' to already exist.
READ_FROM_ORIGIN = False

In [30]:
if READ_FROM_ORIGIN:
    # Flatten every loaded JSON file into rows, build the DataFrame once and cache it as parquet.
    flattened_list = [
        row
        for registry in all_data
        for row in flatten_assortment(registry)
    ]
    df = pd.DataFrame(flattened_list).reset_index(drop=True)
    df.to_parquet('Datatest.parquet.gzip', compression='gzip')
else:
    # Reuse the cached copy written by a previous run.
    df = pd.read_parquet('Datatest.parquet.gzip')

In [49]:
# Summary statistics of the numeric columns.
# NOTE(review): the recorded output lists 'availability', 'occurence', 'unavailability' and
# 'priceVar', which are only added to df by later cells, so this cell was last run out of order;
# on a fresh top-to-bottom run those columns will not appear here.
df.describe()

Unnamed: 0,idRetailerSKU,sk,retailerPrice,retailerRatingCount,retailerFirstImageType,manufacturerExtraImagesTotal,manufacturerImagesTotal,retailerExtraImagesTotal,retailerImagesTotal,descriptionPercentage,...,retailerAverageRating,retailerReviewCount,correctExtraImagesTotal,extraImagesPercentage,retailerFromPrice,priceDiscount,availability,occurence,unavailability,priceVar
count,100970.0,100970.0,22114.0,30589.0,18023.0,18096.0,18096.0,18096.0,18096.0,19066.0,...,24457.0,22072.0,17053.0,17053.0,2168.0,2168.0,100970.0,100970.0,100970.0,22114.0
mean,1122299.0,50485.939972,591.492001,965.156233,0.398657,6.362014,7.362622,4.771883,5.769341,60.199725,...,4.497134,4.836988,3.102856,42.480888,1319.819161,72.304405,0.219016,1.0,0.07629,-34.497819
std,585798.8,29148.099452,2133.545231,4598.041848,0.489636,5.078741,5.076059,6.229312,6.229139,35.735667,...,0.465938,1.83403,3.806713,43.058324,3852.285873,235.027508,0.413581,0.0,0.265463,601.246361
min,27284.0,1.0,0.99,0.0,0.0,0.0,1.0,0.0,1.0,0.0,...,1.0,1.0,0.0,0.0,2.29,-935.18,0.0,1.0,0.0,-11788.65
25%,603295.0,25243.25,12.99,4.0,0.0,3.0,4.0,0.0,1.0,26.67,...,4.3,4.0,0.0,0.0,24.99,2.0,0.0,1.0,0.0,-2.73
50%,1008662.0,50485.5,25.99,23.0,0.0,6.0,7.0,3.0,4.0,61.76,...,4.6,6.0,2.0,28.57,149.945,12.99,0.0,1.0,0.0,1.74
75%,1840011.0,75728.75,193.6625,200.0,1.0,8.0,9.0,7.0,8.0,100.0,...,4.8,6.0,6.0,91.67,1641.91,58.49,0.0,1.0,0.0,12.56
max,1981812.0,100971.0,43999.99,33068.0,1.0,37.0,38.0,47.0,48.0,100.0,...,5.0,6.0,23.0,100.0,41999.99,2000.0,1.0,1.0,1.0,4924.28


In [31]:
# Narrow view holding only the title, SKU, price and availability columns.
dfp = df[[
    'manufacturerTitle',
    'idRetailerSKU',
    'retailerPrice',
    'manufacturerPrice',
    'useManufacturerPriceVariationRule',
    'priceBelowRange',
    'priceVariation',
    'priceDiscount',
    'available',
    'unavailable',
]]

In [32]:
# Rows for which a retailer price was collected.
dfp[dfp['retailerPrice'].notna()]

Unnamed: 0,manufacturerTitle,idRetailerSKU,retailerPrice,manufacturerPrice,useManufacturerPriceVariationRule,priceBelowRange,priceVariation,priceDiscount,available,unavailable
17,Absorvente Externo Nortuno SEMPRE LIVRE® ADAPT...,1839998,15.90,17.59,True,False,-9.61,,True,False
20,Protetor Solar Corpo e Rosto SUNDOWN® Praia e ...,1839968,38.99,39.76,True,False,-1.94,,True,False
25,Bebida à Base de Castanha-de-Caju Orgânica Ori...,218550,7.99,6.75,False,False,18.37,,True,False
29,Fralda Huggies Tripla Proteção G - 36 fraldas,553313,44.99,44.99,False,False,0.00,,True,False
38,Fralda Pampers Supersec G - 26 unidades,745209,32.99,1.00,True,False,3199.00,,True,False
...,...,...,...,...,...,...,...,...,...,...
100944,NIVEA Creme Hidratante Lata 56g,487947,12.99,20.50,True,True,-36.63,,True,False
100950,Shampoo Hidratação Glicerinada Dove Baby Frasc...,749363,11.99,1.00,True,False,1099.00,,True,False
100956,Cervejeira Consul Cor Titanium com 82L Display...,498709,2499.99,2499.00,True,False,0.04,-478.08,True,False
100963,Shampoo Johnson`s Baby Regular 400ml,749364,17.99,1.00,True,False,1699.00,,True,False


---

In [33]:
# 1. Produtos com maior variação de preço entre o Retailer e o Manufacturer:

# Product DataFrame
pdf = df[["idRetailerSKU", "retailerPrice", "manufacturerPrice"]].groupby(by="idRetailerSKU").mean().reset_index()
pdf['priceVariation'] = pdf['retailerPrice'] - pdf['manufacturerPrice']
pdf['absPriceVariation'] = abs(pdf['priceVariation'])

pdf.sort_values(by=['absPriceVariation', 'priceVariation'], ascending=False).head(10)

Unnamed: 0,idRetailerSKU,retailerPrice,manufacturerPrice,priceVariation,absPriceVariation
3512,1964334,41931.024483,49999.0,-8067.975517,8067.975517
1318,866361,7947.609048,14732.72,-6785.110952,6785.110952
3123,1907007,4866.262857,0.0,4866.262857,4866.262857
3135,1907784,3729.99,0.0,3729.99,3729.99
2809,1876930,6341.36931,9999.0,-3657.63069,3657.63069
2804,1876924,11399.99,14999.0,-3599.01,3599.01
3138,1917908,5420.679655,8999.0,-3578.320345,3578.320345
2803,1876923,4975.852069,8499.0,-3523.147931,3523.147931
3134,1907680,3499.99,0.0,3499.99,3499.99
901,619475,6920.29,9999.0,-3078.71,3078.71


---

In [50]:
# 2. Produtos com maior indisponibilidade:

# Per-row 0/1 indicators so that a group-by sum yields counts per SKU.
# These columns are added to df itself and are reused by later cells.
df['availability'] = df['available'].astype(int)
df['occurence'] = 1
df['unavailability'] = df['unavailable'].astype(int)

# Unavailability DataFrame: per SKU, how many observations exist and in how
# many of them the product was available / unavailable.
udf = df[['idRetailerSKU', 'availability', 'occurence', 'unavailability']].groupby(by='idRetailerSKU').sum().reset_index()

# Share of observations in which the SKU was unavailable, as a percentage.
# Vectorised instead of a row-wise apply; every group holds at least one row,
# so 'occurence' is never 0 and no zero guard is needed.
udf['unavailability_pctg'] = udf['unavailability'] / udf['occurence'] * 100
# Highest percentage first; ties broken by the absolute number of unavailable observations.
udf.sort_values(by=['unavailability_pctg', 'unavailability'], ascending=False).head(10)

Unnamed: 0,idRetailerSKU,availability,occurence,unavailability,unavailability_pctg
7,28005,0,29,29,100.0
16,28711,0,29,29,100.0
22,28885,0,29,29,100.0
30,29149,0,29,29,100.0
32,29199,0,29,29,100.0
34,29273,0,29,29,100.0
51,39939,0,29,29,100.0
53,40494,0,29,29,100.0
56,40759,0,29,29,100.0
57,40794,0,29,29,100.0


In [61]:
# SKUs that were unavailable in every observation.
itemsUnavailable = list(udf[udf['unavailability_pctg'] == 100]['idRetailerSKU'])
# Boolean mask over df rows belonging to those SKUs. Series.isin does a hashed
# lookup instead of scanning the Python list once per row.
f = df['idRetailerSKU'].isin(itemsUnavailable)

In [62]:
# Distinct manufacturer titles of the rows selected by the mask `f`.
namedItemsUnavailable = np.unique(df.loc[f, 'manufacturerTitle'])
print(namedItemsUnavailable)

['2021 Smart TV LG 43" Full HD 43LM6370 WiFi Bluetooth HDR ThinQAI compatível com Inteligência Artificial'
 '2021 Smart Tv Lg 32" Hd 32Lm627B Wifi Bluetooth Hdr Thinqai Compatível Com Inteligência Artificial'
 '2021 Smart Tv Lg 70" 4K Uhd 70Up7750 Wifi Bluetooth Hdr Inteligência Artificial Thinq Smart Magic Google Alexa'
 '2021 Smart Tv Lg 75" 4K Uhd 75Up8050 Wifi Bluetooth Hdr Inteligência Artificial Thinq Smart Magic Google Alexa'
 '2022 Smart TV LG 77" 4K OLED77C2 Evo 120Hz Mais Brilho G-Sync FreeSync 4x HDMI 2.1 ThinQAI Google Alexa'
 '49" LG Ultra HD 4K TV | 49UJ6525' '49" LG Ultra HD 4K TV | 49UJ6565'
 'A-Oxitive Noite, Cuidado peeling, Avène - 30ml'
 'A-Oxitive Olhos, Cuidado contorno dos olhos suavizante, Avène - 15ml'
 'A-Oxitive Sérum, Sérum protetor antioxidante, Avène - 30ml'
 'Absorvente Always Noites Tranquilas Cobertura Suave com 32 unidades'
 'Absorvente Interno Intimus Mini - 16 unidades'
 'Actine Colors FPS 70 Pele Morena, Darrow - 40g'
 'Actine Proteor Solar FPS 60 C

---

In [None]:
# List every column name, one per line.
for column_name in df.columns:
    print(column_name)

# Column names noted while reading the list (presumably candidates for further analysis):
# notListed, 
# retailerTitle, retailerRatingCount, manufacturerDescription, title, manufacturerTitle

In [None]:
# Futuras análises:

# a. notListed x unavailable
# b. notListed x priceVariation (eu acho q coisas notListed o pessoal joga o preço pra 0 ou pra 1, então dá uma variação maior);
# c. Será que algum tipo de produto específico apresenta variação maior? Ou indisponibilidade? Tentar fazer uma análise com palavras (título) x essas duas variáveis;

In [40]:
# Additional analysis:
# 1. Is notListed the same flag as unavailable?

same_flags = df['unavailable'] == df['notListed']
print("Unavailable == NotListed? ", same_flags.sum() == df.shape[0])

not_listed_rows = df[df['notListed'] == True]
print("notListed is in Unavailable? ", all(not_listed_rows['unavailable']))

# Answer: No. I don't believe this is an error, but perhaps every product that is
# not listed should also be flagged as unavailable.

Unavailable == NotListed?  False


In [35]:
# Additional analysis:
# 2. Per-observation price difference next to the notListed / unavailable / available flags.

# Adds the column to df itself.
df["priceVar"] = df["retailerPrice"] - df["manufacturerPrice"]
(
    df[['idRetailerSKU', 'notListed', 'unavailable', 'available', "retailerPrice", "manufacturerPrice", "priceVar"]]
    .sort_values(by=['priceVar'], ascending=False)
    .head(30)
)

Unnamed: 0,idRetailerSKU,notListed,unavailable,available,retailerPrice,manufacturerPrice,priceVar
53969,1907007,False,False,True,4924.28,0.0,4924.28
52805,1907007,False,False,True,4924.28,0.0,4924.28
17949,1907007,False,False,True,4924.28,0.0,4924.28
15692,1907007,False,False,True,4924.28,0.0,4924.28
24159,1907007,False,False,True,4924.28,0.0,4924.28
69617,1907007,False,False,True,4924.28,0.0,4924.28
25887,1907007,False,False,True,4822.75,0.0,4822.75
96508,1907007,False,False,True,4822.75,0.0,4822.75
8832,1907007,False,False,True,4822.75,0.0,4822.75
48680,1907007,False,False,True,4822.75,0.0,4822.75


In [84]:
# First space-separated word of each manufacturer title; empty string when the title is None.
shortTitle = df['manufacturerTitle'].map(lambda title: '' if title is None else title.split(' ')[0])

In [92]:
# Pair each row's short title with its 0/1 unavailability flag
# (df['unavailability'] is created by the unavailability cell).
productUnavailabilityDf = pd.DataFrame({
    'shortTitle': shortTitle,
    'unavailability': df['unavailability'],
})
# Total unavailable observations per short title, 20 largest first.
(
    productUnavailabilityDf
    .groupby('shortTitle')
    .sum()
    .reset_index()
    .sort_values(by=['unavailability'], ascending=False)
    .head(20)
)

Unnamed: 0,shortTitle,unavailability
124,NIVEA,685
77,Geladeira,498
71,Fralda,405
70,Forno,370
78,Geladeira/Refrigerador,275
98,Lava,273
122,Máquina,238
106,Liquidificador,236
1,2021,185
9,Actine,162


In [99]:
# Pair each row's short title with its price difference; 'item' is a running
# row number that exists only so that rows can be counted per group.
productPriceVarDf = pd.DataFrame({
    'shortTitle': shortTitle,
    'priceVar': df['priceVar'],
    'item': list(range(df.shape[0])),
})

In [104]:
# Mean price difference and number of rows per short title, 20 largest means first.
(
    productPriceVarDf
    .groupby('shortTitle')
    .agg({'priceVar':'mean', 'item': 'count'})
    .reset_index()
    .sort_values(by=['priceVar'], ascending=False)
    .head(20)
)

Unnamed: 0,shortTitle,priceVar,item
85,Home,2357.813529,58
112,Maquina,1844.982,29
100,Lavadora,1442.297692,57
78,Geladeira/Refrigerador,999.418571,366
116,Microondas,682.4,56
67,Fogão,586.388571,395
158,Smart,474.002291,301
68,Fone,431.040345,29
5,AR,360.554483,29
122,Máquina,240.990802,799
