# Desafio: Engenharia de Dados

## Task1

Leia o arquivo de texto ​ wordcount.txt ​ e conte as palavras que contém até 10 letras. Conte
também quantas palavras com mais de 10 letras existem no texto.
Dataset: ​ https://storage.googleapis.com/luizalabs-hiring-test/wordcount.txt

Exemplo do dataset final:
[('two', 2),
('behold', 1),
('itself', 3),
(‘MAIORES QUE 10’, 3)]

In [1]:
# Parte a ser adaptada com a referência do pacote pyspark

import findspark
findspark.init('/home/lucas/spark-3.0.0-preview-bin-hadoop2.7')

In [2]:
# Biblioteca Task1

import pyspark
from pyspark.sql import SparkSession

from pyspark import SparkFiles

from pyspark.ml.feature import Tokenizer, RegexTokenizer

from pyspark.sql.functions import udf,col
from pyspark.sql.types import (StructType,
                               IntegerType, 
                               ArrayType, 
                               StringType,
                               StructField)
import pandas as pd

In [3]:
# Criando sessão
spark = SparkSession.builder.appName('desafio').getOrCreate()

In [4]:
# Leitura do texto

url1 = 'https://storage.googleapis.com/luizalabs-hiring-test/wordcount.txt'

spark.sparkContext.addFile(url1)

text_task_1 = spark.read.text("file://" + SparkFiles.get('wordcount.txt'), wholetext=True, lineSep=None)

print(text_task_1.schema)
#print(text_task_1.head())
print('\n')
print(text_task_1.columns)

StructType(List(StructField(value,StringType,true)))


['value']


In [5]:
# Pretendo selecionar todas as palavras eliminando todo o tipo de pontuação, espaço ou símbolo especial.

# Para isso usarei o padrão '\W' de Expressão Regular, que corresponde a caracteres diferentes de letras e algarismos, 
# ou seja, corresponde a espaços, pontuações.

regex_tokenizer = RegexTokenizer(inputCol='value', outputCol='words',
                                pattern='\\W')

In [6]:
rg_tokenized = regex_tokenizer.transform(text_task_1)

In [7]:
rg_tokenized.show()

+--------------------+--------------------+
|               value|               words|
+--------------------+--------------------+
|henDRERIt. MoNTEs...|[hendrerit, monte...|
+--------------------+--------------------+



In [8]:
rg_tokenized.schema

StructType(List(StructField(value,StringType,true),StructField(words,ArrayType(StringType,true),true)))

In [9]:
# definindo meu contador de palavras, a ser utilizado em uma udf.

def word_counter(lista,num_max=10):
    #dicionário para contagem de palavras:
    count_words = {}
    
    #variável para registrar número máximo de caracteres considerado na contagem:
    key_max = 'MAIORES QUE ' + str(num_max)
    
    #contagem de palavras com mais de 'num_max' inicializada com 0
    count_words = {key_max:0}
    
    #Para cada nova palavra encontrada, adiciono nova referência ao dicionário
    #Para cada palavra repetida que aparece adiciono 1 ao contador:
    for word in lista:
        if len(word) <=num_max:
            if word in count_words:
                count_words[word] +=1
                print(word)
            else:
                count_words[word] =1
        else:
            count_words[key_max] +=1
    return [(key,value) for (key,value) in count_words.items()]


In [10]:
count_words = udf(word_counter, ArrayType(StructType([StructField("palavra",StringType(),True),StructField("contagem",IntegerType(), True)])))

In [11]:
task1 = rg_tokenized.withColumn("palavras_contadas", count_words(col("words"))).select("palavras_contadas")

In [12]:
#task1.show(1, False)

# Forma de visualizar melhor o resultado.
from pyspark.sql.functions import explode
task1.withColumn("palavras",
                 explode(task1.palavras_contadas)
                ).select('palavras').show(5)

+--------------------+
|            palavras|
+--------------------+
|[MAIORES QUE 10, ...|
|     [hendrerit, 21]|
|        [montes, 22]|
|         [purus, 21]|
|        [luctus, 13]|
+--------------------+
only showing top 5 rows



In [13]:
#task1.write.json('palavras_contadas.json')

In [14]:
pandas_task1 = task1.select("palavras_contadas").toPandas()

In [15]:
pandas_task1.head()

Unnamed: 0,palavras_contadas
0,"[(MAIORES QUE 10, 115), (hendrerit, 21), (mont..."


In [16]:
pandas_task1.to_csv('task1.csv', index=False)

In [17]:
#https://stackoverflow.com/questions/57381557/pyspark-converting-an-array-of-struct-into-string

In [18]:
# tentativa fracassada de transformar a lista em uma string para contornar um problema
# de exportar tipo Array diretamente para CSV.
#def stringfier(lista):
#    resultado = "["
#    for element in lista:
#        resultado.append("["+str(element[0])+','+str(element[1])+']'+',')
#    resultado.rstrip(',').append(']')
#    return resultado

#stringfy = udf(stringfier, StringType())
#task1.withColumn('stringificado',stringfy(col("palavras_contadas"))).show(1, False)

# Task 2

- Leia o arquivo pedidos.csv 
- Agrupe todos os cliente que fizeram mais de 2 compras nos dias de black friday dos últimos três anos. 
- Filtre todos os clientes que são menores de 30 anos e coloque numa lista TODOS os códigos de pedido e a data em que foram efetuados. 
- Adicione também a idade do cliente. 

Dataset: ​ https://storage.googleapis.com/luizalabs-hiring-test/clientes_pedidos.csv 


In [19]:
# Biblioteca Task2

from pyspark import SparkFiles

from pyspark.sql.types import (StructType,
                               StructField,
                               StringType,
                               IntegerType,
                               DateType)

from pyspark.sql.functions import (to_date,
                                   to_timestamp,
                                   unix_timestamp,
                                   from_unixtime,
                                   lit,
                                   current_date,
                                   datediff,
                                   collect_list,
                                   arrays_zip,
                                   concat_ws)

In [20]:
# Leitura do Arquivo:
url2 = 'https://storage.googleapis.com/luizalabs-hiring-test/clientes_pedidos.csv'

spark.sparkContext.addFile(url2)

table = spark.read.csv("file://" + SparkFiles.get('clientes_pedidos.csv'), header=True)

print(table.schema)

StructType(List(StructField(codigo_pedido,StringType,true),StructField(codigo_cliente,StringType,true),StructField(data_nascimento_cliente,StringType,true),StructField(data_pedido,StringType,true)))


https://medium.com/luizalabs/as-armadilhas-do-spark-101-d13a3296dcd9

In [21]:
#Configuração do Schema
schema = StructType([
    StructField("codigo_pedido",StringType(),True),
    StructField("codigo_cliente",StringType(),True),
    StructField("data_nascimento_cliente",StringType(),True),
    StructField("data_pedido",StringType(),True)])

#Nova leitura com o schema correto.
table = spark.read.csv("file://" + SparkFiles.get('clientes_pedidos.csv'), header=True, schema=schema)

In [22]:
table.printSchema()

root
 |-- codigo_pedido: string (nullable = true)
 |-- codigo_cliente: string (nullable = true)
 |-- data_nascimento_cliente: string (nullable = true)
 |-- data_pedido: string (nullable = true)



### Verificando o aspecto dos dados.

In [23]:
table.select("data_nascimento_cliente").head()

Row(data_nascimento_cliente='1985-12-04 00:00:00')

In [24]:
table.select("data_pedido").head()

Row(data_pedido='1542974527')

#### Criarei uma nova coluna "d_nascimento" através da transformação da coluna "data_nascimento": transformando a informação do tipo string para o tipo data.

In [25]:
new_table = table.withColumn('d_nascimento', to_date(table.data_nascimento_cliente).alias("to_date"))


In [26]:
new_table.select("d_nascimento").show(5)

+------------+
|d_nascimento|
+------------+
|  1985-12-04|
|  1979-11-14|
|  1989-07-25|
|  1953-12-14|
|  1985-05-03|
+------------+
only showing top 5 rows



#### Agora vou verificar o formato de "data_pedido" e transformar em um formato que permita a análise desejada.

In [27]:
table.select('data_pedido').describe().show()

+-------+--------------------+
|summary|         data_pedido|
+-------+--------------------+
|  count|              297309|
|   mean|1.5287494219447646E9|
| stddev|1.5643343589912213E7|
|    min|          1509503453|
|    max|          1543622000|
+-------+--------------------+



Aparentemente "data_pedido" o tempo Unix: contagem em segundos com relação ao marco UNIX (1970-01-01 00:00:00 UTC).

Referência: https://sparkbyexamples.com/spark/spark-sql-date-and-time-functions/

#### Criarei uma nova coluna "d_pedido" através da transformação da coluna "data_pedido": transformando a informação unixtime (registrado como inteiro) para o tipo data.

In [28]:
new_table = new_table.withColumn('d_pedido',to_date(from_unixtime(table.data_pedido)))

In [29]:
new_table.select('d_pedido').show(5)

+----------+
|  d_pedido|
+----------+
|2018-11-23|
|2018-11-23|
|2018-11-23|
|2018-11-23|
|2018-11-23|
+----------+
only showing top 5 rows



In [30]:
new_table.head()

Row(codigo_pedido='bc8b03a005d5bf742fc7290db1b218de', codigo_cliente='b07af86a4a68707373856bcc3946583f', data_nascimento_cliente='1985-12-04 00:00:00', data_pedido='1542974527', d_nascimento=datetime.date(1985, 12, 4), d_pedido=datetime.date(2018, 11, 23))

In [31]:
new_table.printSchema()

root
 |-- codigo_pedido: string (nullable = true)
 |-- codigo_cliente: string (nullable = true)
 |-- data_nascimento_cliente: string (nullable = true)
 |-- data_pedido: string (nullable = true)
 |-- d_nascimento: date (nullable = true)
 |-- d_pedido: date (nullable = true)



### Agora que já temos um formato adequado para tratar as datas dos pedidos precisamos filtrar os clientes que realizaram mais de 2 compras considerando as três últimas datas de Black Friday.

Black friday dos últimos três anos:
- 2018: 23 de novembro
- 2017: 24 de novembro
- 2016: 25 de novembro

In [32]:
black_fridays = ("2018-11-23", "2017-11-24", "2016-11-25")

##### Contando pedidos feitos nas datas de Black Friday:

In [33]:
contagem_black = new_table.filter((new_table.d_pedido == (lit(black_fridays[0])))|(new_table.d_pedido ==lit(black_fridays[1]))|(new_table.d_pedido ==lit(black_fridays[2]))).groupBy('codigo_cliente').agg({'codigo_pedido':'count'})

In [34]:
contagem_black.printSchema()

root
 |-- codigo_cliente: string (nullable = true)
 |-- count(codigo_pedido): long (nullable = false)



In [35]:
contagem_black.columns

['codigo_cliente', 'count(codigo_pedido)']

#### Filtrando clientes que realizaram mais de duas compras considerando as três últimas Black Fridays:

In [36]:
clientes_black = contagem_black.filter(contagem_black['count(codigo_pedido)']>2).select('codigo_cliente')

In [37]:
clientes_black.show(3)

+--------------------+
|      codigo_cliente|
+--------------------+
|b67ef7abecc0a8e88...|
|28688f66084a7f1de...|
|3f89d915a06a3d01e...|
+--------------------+
only showing top 3 rows



#### Tendo a lista de clientes compradores que satisfazem a condição de ter feito mais de duas compras ao longo das últimas três Black Fridays, conseguimos recuperar a totalidade de compras realizadas por eles (tanto dentro como fora do período de Black Friday).

In [38]:
#realizando a junção interna da tabela de clientes com a tabela contendo o registro geral de pedidos para filtrar 
# apenas os clientes que satisfazem a condição e resgatar as demais compras feitas por eles.
compras_clientes_black = clientes_black.join(new_table, clientes_black.codigo_cliente == new_table.codigo_cliente, "inner")

#retirando coluna duplicada vinda do join
compras_clientes_black = compras_clientes_black.drop(new_table["codigo_cliente"])

#retirando registros duplicados vindos do join
compras_clientes_black.dropDuplicates()

#contando o número de pedidos feitos pelos clientes (tanto na Black Friday como fora dela)
# e armazenando em compra_geral_clientes_black
compra_geral_clientes_black = compras_clientes_black.groupBy('codigo_cliente').agg({'codigo_pedido':'count'})


### Devemos agora construir uma nova coluna "idade" a partir da coluna "d_nascimento" para selecionar apenas os clientes de menos de 30 anos.

In [39]:
#A coluna "idade" é construída pela diferença entre a data atual e a data
# do aniversário registrada em 'd_nascimento'.
compras_clientes_black = compras_clientes_black.withColumn('idade', ((datediff(current_date(),compras_clientes_black.d_nascimento))/365).cast('integer'))

In [40]:
compras_clientes_black.select('idade').show(3)

+-----+
|idade|
+-----+
|   49|
|   49|
|    8|
+-----+
only showing top 3 rows



#### Filtrando clientes que satisfazem a condição de compra e que têm menos de 30 anos:

In [41]:
compras_clientes_black_under30 = compras_clientes_black.filter(compras_clientes_black['idade']<30)

https://stackoverflow.com/questions/37440373/spark-dataframe-aggregate-column-values-by-key-into-list

In [42]:
#Aqui construirei a tabela "pedido" que registra para cada cliente uma lista com TODOS os pedidos no 
# formato ["código","data"] dos clientes filtrados.

pedido = (compras_clientes_black_under30.groupBy(compras_clientes_black_under30["codigo_cliente"])
     .agg(collect_list("codigo_pedido").alias("pedidos"),
        collect_list("d_pedido").alias("data"))
     .withColumn("pedidos", arrays_zip("pedidos","data")).drop("data"))

In [43]:
pedido.show(5)

+--------------------+--------------------+
|      codigo_cliente|             pedidos|
+--------------------+--------------------+
|3bfcd49a281054bbf...|[[a071669a5000ce0...|
|18d6c857acbfb21b5...|[[aede83e21d3683a...|
|3860f681456fae15d...|[[66c1ea9f1aa3b25...|
|a3c3bc3ebac103447...|[[07089f306713afa...|
|d221905b7af7b56c0...|[[82c2a7a825ac040...|
+--------------------+--------------------+
only showing top 5 rows



In [44]:
# Agora construirei a visualização solicitada:

# Unindo a tabela contendo informações gerais com a contendo a lista de pedidos de cada cliente filtrado:
visual = compras_clientes_black_under30.join(pedido, compras_clientes_black_under30.codigo_cliente == pedido.codigo_cliente,"inner").drop(pedido["codigo_cliente"])
# Unindo agora com o registro de contagem de compras gerais feitas pelo cliente filtrado
visual = visual.join(compra_geral_clientes_black, visual.codigo_cliente == compra_geral_clientes_black.codigo_cliente).drop(compra_geral_clientes_black["codigo_cliente"])
# Selecionando apenas as colunas solicitadas para a visualização
visual = visual.select(['codigo_cliente','count(codigo_pedido)','idade','pedidos'])
# deduplicando os registros duplicados advindos do join
visual = visual.dropDuplicates()

# apenas para verificação:
visual.show(2)

+--------------------+--------------------+-----+--------------------+
|      codigo_cliente|count(codigo_pedido)|idade|             pedidos|
+--------------------+--------------------+-----+--------------------+
|3bfcd49a281054bbf...|                   3|   19|[[a071669a5000ce0...|
|18d6c857acbfb21b5...|                   4|   29|[[aede83e21d3683a...|
+--------------------+--------------------+-----+--------------------+
only showing top 2 rows



In [45]:
visual.select('pedidos').head()

Row(pedidos=[Row(pedidos='a071669a5000ce07ad709886c8109906', data=datetime.date(2018, 11, 23)), Row(pedidos='ca2360f9f20482f1f0d851a15ae67498', data=datetime.date(2018, 11, 23)), Row(pedidos='cb422c641730320669dc49b88c069a00', data=datetime.date(2018, 11, 23))])

In [46]:
print(visual.schema)

StructType(List(StructField(codigo_cliente,StringType,true),StructField(count(codigo_pedido),LongType,false),StructField(idade,IntegerType,true),StructField(pedidos,ArrayType(StructType(List(StructField(pedidos,StringType,true),StructField(data,DateType,true))),false),true)))


In [47]:
#visual.write.json('pedidos.json')

In [48]:
visual_pandas = visual.toPandas()
visual_pandas.to_csv('task2.csv', index=False)