<a href="https://colab.research.google.com/github/ignitz/teste_luizalabs/blob/main/notebooks/Desafio_Luizalabs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Desafio Luizalabs

> Yuri Niitsuma

[Colab](https://colab.research.google.com/drive/1MpLpGatDDtAmdPAYawasUkreaNitwKPz#scrollTo=Z8Jyxm79eyjt)

Esse é o desafio para ​Engenharia de Dados ​do Luizalabs. Esse desafio contém 2 partes onde você precisará demonstrar seus conhecimentos em:

- Apache Spark (Scala ou Python)
- Estruturas de Dados
- Programação

Ao final de cada task você deve exportar um CSV para que possamos conferir o seu resultado. Você deve também versionar o seu código no github/bitbucket/gitlab de modo que possamos avaliar o seu desenvolvimento. Inclua no repositório os CSVs gerados.
Devemos conseguir rodar seu código e obter o mesmo resultado dos seus CSVs.


## Configuração do ambiente no Google Colab

O Google Colab foi escolhido pois para o teste será mais fácil de apresentar e replicá-lo.

In [1]:
# Instalação do pyspark no ambiente. Na versão 3 pyspark já vem incluso os binários do Spark deixando de instalar e configurar separadamente.
!pip install -q pyspark

[K     |████████████████████████████████| 212.4 MB 65 kB/s 
[K     |████████████████████████████████| 198 kB 57.9 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
# Inicialização do Spark Single Node
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
# Bibliotecas auxiliriares
import re
from pyspark.sql.functions import *

## Task 1

Leia o arquivo de texto *wordcount.txt*, e conte as palavras que contém até 10 letras. Conte também quantas palavras com mais de 10 letras existem no texto.

Dataset: https://storage.googleapis.com/luizalabs-hiring-test/wordcount.txt

In [4]:
!mkdir -pv data/wordcount && curl -o data/wordcount/data.txt https://storage.googleapis.com/luizalabs-hiring-test/wordcount.txt

mkdir: created directory 'data'
mkdir: created directory 'data/wordcount'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 18436  100 18436    0     0  29592      0 --:--:-- --:--:-- --:--:-- 29544


In [5]:
# Investigando o conteúdo dos dados
!head -n 1 data/wordcount/data.txt

henDRERIt. MoNTEs PURus lUCtUS DIcTuM, eST Mattis ESt phasELLUS DiGNISsiM rHoNcUS cuBIlIa sIT nunC aT inTErDum vIvamUS lUcTuS AntE AC. amet ELEmEnTUM CoNGUE jUsTO eGET, EGEsTas VITaE Per lacus SOCIiS PURus quIsQUE LUCtUs eLeIfeNd INcEPTOs condiMENTuM eTIAM VEl aUGuE VivaMus prOin eget plAcerAt. conUbia OdIo IncePTOS FeugiAT etiAM venEnaTis. cLAsS LaCuS uLTriceS CoNVaLliS AEnEan. DiAm DoNEc In socIis nEQUe ac DignIssIM AT ultRICEs. NiSL. sEMpeR netus cubilIA. NUnC ANtE. pOrttItOR ALIQueT PROIN, EROs vIverRA sOdALeS NeTuS INTEgER aUCToR. TORtOR uLtRICIES, cuBiLiA NEtUS pOsuERE ipSUM qUAM mONtes soCiosqu VIVAMuS. cONDiMentum IN LeCTus. NaSCetUr eU EtIAM nEC tempoR POsUEre SaGIttIs RutRUm. LuCtuS FACiLIsIs FeUgIAT tURPIs PLATeA, uT ErOS PoRTTITOr vIVerRa LIGULA iPSuM ETIAm plACErAT. eleMENtUm seM sEmpeR nec FRiNGILLa DiCtUMST egeT sIT MalesUaDA saGiTTis ArCu. JUsTo cUm ODIo aNte LectuS pOtENTI nIBh aMEt vel Netus. peNaTIBUS, vIVAMUs RUtrum OrCi SOlLicItUDIN COMMODO pLAcErAt NullA mORBI PuL

Vemos que o conteúdo das palavras contém maiúsculos e minúsculos e possuem pontuações ('.', ';', ','). Para garantir que não afete na contagem da palavra parseada faremos uma limpeza filtrando por caracteres alfanuméricos quando o arquivo for lido pelo Spark.


A estratégia utilizada será com RDD para parsear as palavras utilizando Spark. (E é um dos primeiros exemplos que está no site oficial do Spark http://spark.apache.org/examples.html)

In [6]:
text_file = spark.sparkContext.textFile("data/wordcount/data.txt")

In [7]:
words = text_file.flatMap(lambda line: line.split(" "))\
             .map(lambda word: (re.sub("[^0-9a-zA-Z]+", "", word).lower(), 1)) \
             .reduceByKey(lambda a, b: a + b)
words.take(5)

[('purus', 21), ('dictum', 15), ('est', 11), ('mattis', 11), ('phasellus', 8)]

In [8]:
words.toDF()\
    .withColumnRenamed('_1', 'word')\
    .withColumnRenamed('_2', 'count')\
    .show()

+---------+-----+
|     word|count|
+---------+-----+
|    purus|   21|
|   dictum|   15|
|      est|   11|
|   mattis|   11|
|phasellus|    8|
|  rhoncus|   18|
|  cubilia|   15|
|     nunc|   13|
|       at|   14|
| interdum|   18|
|     ante|   18|
|     amet|   12|
|   congue|   12|
|    justo|   11|
|     eget|   16|
|    vitae|   16|
|    lacus|   10|
|   sociis|   17|
|  quisque|   21|
| inceptos|   18|
+---------+-----+
only showing top 20 rows



**Contar as palavras que contém até 10 letras.**


In [9]:
words_until_ten = text_file.flatMap(lambda line: line.split(" "))\
            .filter(lambda word: 0 < len(re.sub("[^0-9a-zA-Z]+", "", word)) <= 10 )\
            .map(lambda word: (re.sub("[^0-9a-zA-Z]+", "", word).lower(), 1)) \
            .reduceByKey(lambda a, b: a + b)
words_until_ten.take(5)

[('purus', 21), ('dictum', 15), ('est', 11), ('mattis', 11), ('phasellus', 8)]

**Conte também quantas palavras com mais de 10 letras existem no texto**


In [10]:
words_greater_ten = text_file.flatMap(lambda line: line.split(" "))\
            .filter(lambda word: len(re.sub("[^0-9a-zA-Z]+", "", word)) > 10 )\
            .map(lambda word: (re.sub("[^0-9a-zA-Z]+", "", word).lower(), 1)) \
            .reduceByKey(lambda a, b: a + b)
words_greater_ten.take(5)

[('condimentum', 22),
 ('sollicitudin', 17),
 ('scelerisque', 19),
 ('ullamcorper', 18),
 ('pellentesque', 9)]

Apesar do exemplo do dataset final mostrar o formato semelhante ao escrever `saveAsTextFile`, irei converter em DataFrame e salvar em CSV como pedido na introdução do teste.

In [11]:
words_until_ten.toDF()\
    .withColumnRenamed('_1', 'word')\
    .withColumnRenamed('_2', 'count')\
    .coalesce(1).write.mode('overwrite').csv('output/task1/words_until_ten', header=True)

In [12]:
words_greater_ten.toDF()\
    .withColumnRenamed('_1', 'word')\
    .withColumnRenamed('_2', 'count')\
    .coalesce(1).write.mode('overwrite').csv('output/task1/words_greater_then_ten', header=True)

## Task 2

Leia o arquivo pedidos.csv e agrupe todos os cliente que fizeram mais de 2 compras nos dias de black friday dos últimos três anos. Filtre todos os clientes que são menores de 30 anos e coloque numa lista TODOS os códigos de pedido e a data em que foram efetuados. Adicione também a idade do cliente.

Dataset: https://storage.googleapis.com/luizalabs-hiring-test/clientes_pedidos.csv

> Nota: Os dados utilizados no desafio são antigos, portanto pode considerar os últimos 5 anos e não 3 como fala no teste 2.


In [13]:
!mkdir -pv data/pedidos && curl -o data/pedidos/data.csv https://storage.googleapis.com/luizalabs-hiring-test/clientes_pedidos.csv

mkdir: created directory 'data/pedidos'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 27.5M  100 27.5M    0     0  24.0M      0  0:00:01  0:00:01 --:--:-- 23.9M


In [14]:
df = spark.read.csv('data/pedidos', header=True)
df.show(truncate=False)

+--------------------------------+--------------------------------+-----------------------+-----------+
|codigo_pedido                   |codigo_cliente                  |data_nascimento_cliente|data_pedido|
+--------------------------------+--------------------------------+-----------------------+-----------+
|bc8b03a005d5bf742fc7290db1b218de|b07af86a4a68707373856bcc3946583f|1985-12-04 00:00:00    |1542974527 |
|19b0583adf75322cc58963f3e8eff362|eaaf6b26ef3b9712e384741d8fcf7555|1979-11-14 00:00:00    |1542998573 |
|58fafb698b6d343e06c242a3254e1b7c|c69f2ab5fc61484d75905fd7a6d4f2ff|1989-07-25 00:00:00    |1543007822 |
|79dd9f6c88ba32c9754bc3bbbb0f3939|b40678455119975170709c9607dbf52e|1953-12-14 00:00:00    |1542966096 |
|968806d40adf6aa8c3768c439d8d82b2|7eecbc06bfec32b80654c7d76b3243c9|1985-05-03 00:00:00    |1543000756 |
|b8c6e74cf1b462489e907a336b7597b0|f240c43e82dfe3ca0e4dca0bf5277d93|1980-04-16 00:00:00    |1542993637 |
|5d91ea3b69a22d55bcfa5a4b536d416e|ae03fddbb707cb7398e3efd3cb0802

In [15]:
df1 = df.withColumn('data_nascimento_cliente', col('data_nascimento_cliente').cast('timestamp'))\
    .withColumn('data_pedido', (col('data_pedido') * 1).cast('timestamp'))\
    .withColumn('idade', floor(datediff(current_date(), 'data_nascimento_cliente')/365))

df1.show(truncate=False)

+--------------------------------+--------------------------------+-----------------------+-------------------+-----+
|codigo_pedido                   |codigo_cliente                  |data_nascimento_cliente|data_pedido        |idade|
+--------------------------------+--------------------------------+-----------------------+-------------------+-----+
|bc8b03a005d5bf742fc7290db1b218de|b07af86a4a68707373856bcc3946583f|1985-12-04 00:00:00    |2018-11-23 12:02:07|35   |
|19b0583adf75322cc58963f3e8eff362|eaaf6b26ef3b9712e384741d8fcf7555|1979-11-14 00:00:00    |2018-11-23 18:42:53|41   |
|58fafb698b6d343e06c242a3254e1b7c|c69f2ab5fc61484d75905fd7a6d4f2ff|1989-07-25 00:00:00    |2018-11-23 21:17:02|32   |
|79dd9f6c88ba32c9754bc3bbbb0f3939|b40678455119975170709c9607dbf52e|1953-12-14 00:00:00    |2018-11-23 09:41:36|67   |
|968806d40adf6aa8c3768c439d8d82b2|7eecbc06bfec32b80654c7d76b3243c9|1985-05-03 00:00:00    |2018-11-23 19:19:16|36   |
|b8c6e74cf1b462489e907a336b7597b0|f240c43e82dfe3ca0e4dca

In [16]:
df2 = df1.groupBy(['codigo_cliente', 'idade'])\
    .agg(count(lit(1)).alias("numero_pedidos"), collect_list(struct('codigo_pedido', to_date(col('data_pedido')).alias('data_pedido')))\
    .alias('lista_pedidos'))\
    .filter('idade < 30 and numero_pedidos > 2')

df2.show(truncate=False)

+--------------------------------+-----+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|codigo_cliente                

In [17]:
# Para abrir corretamente no Excel da versão do macOS
# vamos converter o struct em uma lista contendo o seguinte formato:
#  [[codigo_pedido, data_pedido], ...]
# E transformaremos em string pois o spark tem a limitação de salvar array
# de tipos mais simples em vez de estruturas mais complexas (struct e arrays).

In [18]:
# Salvar o dado CSV
convertStructToArrayUDF = udf(lambda z: str([[x[0], str(x[1])] for x in z]),StringType())

df2.withColumn('lista_pedidos', convertStructToArrayUDF(col('lista_pedidos'))).write.mode('overwrite').csv('output/task2/result', header=True)

In [19]:
# Não usamos coaslesce para não correr o risco de estourar a memória heap. Assim, vamos carregar
# este dado salvo novamente e reparticionar para gravar em apenas um arquivo.

df_csv = spark.read.csv('output/task2/result', header=True)
df_csv.coalesce(1).write.mode('overwrite').csv('output/task2/result_one_file', header=True)

**Testando se o arquivo abre corretamente pelo Pandas**

In [20]:
import os
import glob
import pandas as pd

# Pegar o nome do arquivo CSV pois o pandas não suporta carregar csv pelo folder
csv_files = glob.glob(os.path.join('output/task2/result_one_file', "*.csv"))
data = pd.read_csv(f'{csv_files[0]}', header=0)
data

Unnamed: 0,codigo_cliente,idade,numero_pedidos,lista_pedidos
0,9ba43b1b796103b4f020481d169bfd25,20,7106,"[['ae5f74bbf632921ff465c391d115ec9a', '2018-11..."
1,0fa42f07841dc1ca0a2982cd62934ac2,27,3,"[['6c83d74346019ab0671ea68437678a34', '2018-11..."
2,18d6d2ac14593d9a6570f63a980453f0,28,112,"[['15e668c90e8926cd1655ce0c9db403e2', '2017-11..."
3,2bb58c03967eb8c4ee3df3a3e6516504,20,4,"[['e1af9ebb325a39930d0871fca6500f9f', '2018-11..."
4,52c69e3a57331081823331c4e69d3f2e,27,3390,"[['eedd47e8f3753bbec6fa4b278d88f209', '2017-11..."
...,...,...,...,...
724,9c99541228b2f3cc30b80baccf914dfc,21,3,"[['785f1f1b8edf91078bc96885e32d7d5b', '2018-11..."
725,45d1c732e44375967abf3d7ede5b0187,22,3,"[['d9433dbbe6b7808a03744b9672bdc3ef', '2018-11..."
726,fca65461d6f37510b34a3ea3f6fd211a,26,3,"[['5e4fe79826b6f59850a44869e3f9147f', '2018-11..."
727,197d8c66c65a88d59b82d90ed4e9ce35,26,3,"[['950c5f34fbb963a4993c3f1ca5e62af7', '2017-11..."
