# Teste BelugaDB - Data Engineer
## Rafael Augusto de Oliveira
### Github: https://github.com/OliRafa
### LinkedIn: https://www.linkedin.com/in/rafael-augusto-oliveira-658560171/

###### Maio - 2019

#### Importando bibliotecas e setando algumas configurações iniciais

In [1]:
#import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dateutil import parser, tz

#pd.set_option('display.max_colwidth', -1)
#pd.set_option('display.max_rows', 1000)
#pd.set_option('display.max_columns', -1)

# Build (or reuse) the Spark session for this notebook, with the external
# shuffle service and dynamic allocation flags both set to "true".
spark = (
    SparkSession.builder
    .appName("BelugaDB")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()
)

#### Importando dataset e checando sua estrutura

In [2]:
# Load the product dimension (the first CSV row is the header; no schema is
# inferred, so every column is read as string) and expose it to Spark SQL as
# the temp view "dim".
dim = spark.read.csv('dim.csv', header=True)
# createOrReplaceTempView replaces the deprecated registerTempTable and matches
# the call used for every other view in this notebook.
dim.createOrReplaceTempView("dim")
dim.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- is_banner: string (nullable = true)
 |-- is_tax: string (nullable = true)
 |-- is_market_place: string (nullable = true)
 |-- material_status: string (nullable = true)
 |-- current_price_range: string (nullable = true)
 |-- cmc_division: string (nullable = true)
 |-- cmc_business_unit: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- product: string (nullable = true)



In [3]:
# Load the fact table (the first CSV row is the header; no schema is inferred,
# so every column is read as string) and expose it to Spark SQL as the temp
# view "fact".
fact = spark.read.csv('fact.csv', header=True)
# createOrReplaceTempView replaces the deprecated registerTempTable and matches
# the call used for every other view in this notebook.
fact.createOrReplaceTempView("fact")
fact.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- view_date3: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device: string (nullable = true)
 |-- product: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- store: string (nullable = true)
 |-- company: string (nullable = true)
 |-- view_date10: string (nullable = true)
 |-- gross total volume: string (nullable = true)
 |-- product net cost: string (nullable = true)
 |-- product net revenue: string (nullable = true)
 |-- gross merchandise volume: string (nullable = true)
 |-- item sold: string (nullable = true)
 |-- pageviews: string (nullable = true)



#### Realizando Join das tabelas para criação do flat dataset

Como este notebook é executado em ambiente de testes, DataFrame.cache() e DataFrame.show() serão utilizados aqui para visualizar as transformações, apesar do seu alto uso de memória.

In [4]:
# Build the flat dataset: inner-join fact (alias b) with dim (alias a) on
# `product`, keeping every fact column plus the selected dim attributes,
# ordered by order_date.
sql='''SELECT b.*,
                a.is_banner, 
                a.is_tax, 
                a.is_market_place, 
                a.material_status, 
                a.current_price_range, 
                a.cmc_division, 
                a.cmc_business_unit,
                a.gender
        FROM dim a INNER JOIN fact b 
        ON a.product = b.product
        ORDER BY order_date'''

dataset = spark.sql(sql)
# `b.*` brings along fact's unnamed leading CSV column (_c0); drop it.
dataset = dataset.drop('_c0')
# Register the result as the view "dataset", which the following cells query.
dataset.createOrReplaceTempView("dataset")
# Mark the joined DataFrame for caching, since later cells reuse it.
dataset.cache()
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- view_date3: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device: string (nullable = true)
 |-- product: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- store: string (nullable = true)
 |-- company: string (nullable = true)
 |-- view_date10: string (nullable = true)
 |-- gross total volume: string (nullable = true)
 |-- product net cost: string (nullable = true)
 |-- product net revenue: string (nullable = true)
 |-- gross merchandise volume: string (nullable = true)
 |-- item sold: string (nullable = true)
 |-- pageviews: string (nullable = true)
 |-- is_banner: string (nullable = true)
 |-- is_tax: string (nullable = true)
 |-- is_market_place: string (nullable = true)
 |-- material_status: string (nullable = true)
 |-- current_price_range: string (nullable = true)
 |-- cmc_division: string (nullable = true)
 |-- cmc_business_unit: string (nullable = true

In [5]:
# Free memory by dropping the temp views that are no longer needed.
spark.catalog.dropTempView('dim')
spark.catalog.dropTempView('fact')
# NOTE(review): neither DataFrame is cached in the cells shown, so these
# unpersist() calls probably have nothing to release — confirm.
dim.unpersist()
fact.unpersist()

DataFrame[_c0: string, id: string, order_date: string, view_date3: string, currency: string, device: string, product: string, channel: string, store: string, company: string, view_date10: string, gross total volume: string, product net cost: string, product net revenue: string, gross merchandise volume: string, item sold: string, pageviews: string]

#### Calculando Average Cost Per Item

In [6]:
# Average cost per product, weighted by `item sold`:
#   a: per (product, item sold) pair, the summed `product net cost` (price_sum)
#      with the `item sold` value itself as the weight;
#   t: per product, SUM(price_sum * weights);
#   b: per product, SUM(`item sold`);
#   result: t.total_sum / b.weight_sum.
# Only rows with `item sold` > 0 and `product net cost` > 0 take part.
# NOTE(review): both columns were loaded as strings (see the schema above), so
# the comparisons and SUMs rely on Spark's implicit casting.
# NOTE(review): price_sum is already a sum over all rows sharing one
# `item sold` value and is then multiplied by that value — confirm this is the
# intended weighting.
sql = '''SELECT t.total_sum / b.weight_sum AS `average cost`, t.product
         FROM (SELECT SUM(a.price_sum * a.weights) AS total_sum, a.product
                FROM (SELECT SUM(`product net cost`) AS price_sum, `item sold` AS weights, product
                      FROM dataset
                      WHERE `item sold` > 0.0 AND `product net cost` > 0.0
                      GROUP BY product, `item sold`) AS a
                GROUP BY a.product) AS t
         JOIN (SELECT SUM(`item sold`) AS weight_sum, product
                  FROM dataset
                  WHERE `item sold` > 0.0 AND `product net cost` > 0.0
                  GROUP BY product) AS b
         ON t.product = b.product'''
# Preview up to 100 result rows.
spark.sql(sql).show(n=100)

+------------------+-------+
|      average cost|product|
+------------------+-------+
|23.942538459274743|1002783|
|12.079333333333333| 101122|
| 69247.32821449758|1024606|
|            13.337| 102952|
|             32.96|1036260|
|           42852.0|1046915|
|32.919142856290335| 105344|
|27.154666669230462| 106585|
|             26.86|  11078|
| 30.68066666672093|1111275|
|             12.58| 111710|
| 8.549333339495465| 111982|
|             16.47|1177234|
|            229.59| 120922|
| 4.105052645474192| 123679|
|40.589666666666666|1240595|
| 42.18288889439857|1243371|
| 6.068444445065689|  12529|
|19.059802469517106|1282020|
|             5.198|1297992|
|17.269823548284748|1347366|
|37.788066669679814|  13772|
| 60.08369476238137|1444012|
| 7.180666667654134| 145079|
|             110.2| 154645|
|             6.513|1556400|
|             12.14|1567958|
|             24.01| 159526|
| 27.63216665719813| 159967|
|             18.05|1651826|
| 16.70833333428956|1654122|
| 20.152391965

In [7]:
# Assuming the cells ran in order, `sql` still holds the average-cost query;
# run it and expose the per-product result as the temp view "temp".
temp = spark.sql(sql)
temp.createOrReplaceTempView("temp")
# Attach `average cost` to every dataset row. The LEFT JOIN keeps rows whose
# product has no entry in "temp".
sql = '''SELECT a.*, b.`average cost`
        FROM dataset a LEFT JOIN temp b 
        ON a.product = b.product
        ORDER BY order_date'''
dataset = spark.sql(sql)
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- view_date3: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device: string (nullable = true)
 |-- product: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- store: string (nullable = true)
 |-- company: string (nullable = true)
 |-- view_date10: string (nullable = true)
 |-- gross total volume: string (nullable = true)
 |-- product net cost: string (nullable = true)
 |-- product net revenue: string (nullable = true)
 |-- gross merchandise volume: string (nullable = true)
 |-- item sold: string (nullable = true)
 |-- pageviews: string (nullable = true)
 |-- is_banner: string (nullable = true)
 |-- is_tax: string (nullable = true)
 |-- is_market_place: string (nullable = true)
 |-- material_status: string (nullable = true)
 |-- current_price_range: string (nullable = true)
 |-- cmc_division: string (nullable = true)
 |-- cmc_business_unit: string (nullable = true

In [8]:
# Point the "dataset" view at the DataFrame that now carries `average cost`,
# then discard the helper view.
dataset.createOrReplaceTempView('dataset')
spark.catalog.dropTempView('temp')

#### Calculando Average Price Per Item

In [9]:
# Average price per product, weighted by `item sold`. Same shape as the
# average-cost query, but built on `product net revenue`:
#   a: per (product, item sold) pair, the summed `product net revenue`;
#   t: per product, SUM(price_sum * weights);
#   b: per product, SUM(`item sold`);
#   result: t.total_sum / b.weight_sum.
# Only rows with `item sold` > 0 and `product net revenue` > 0 take part.
# NOTE(review): both columns were loaded as strings, so the comparisons and
# SUMs rely on Spark's implicit casting.
sql = '''SELECT t.total_sum / b.weight_sum AS `average price`, t.product
         FROM (SELECT SUM(a.price_sum * a.weights) AS total_sum, a.product
                FROM (SELECT SUM(`product net revenue`) AS price_sum, `item sold` AS weights, product
                      FROM dataset
                      WHERE `item sold` > 0.0 AND `product net revenue` > 0.0
                      GROUP BY product, `item sold`) AS a
                GROUP BY a.product) AS t
         JOIN (SELECT SUM(`item sold`) AS weight_sum, product
                  FROM dataset
                  WHERE `item sold` > 0.0 AND `product net revenue` > 0.0
                  GROUP BY product) AS b
         ON t.product = b.product'''
# Preview up to 1000 result rows.
spark.sql(sql).show(n=1000)

+------------------+-------+
|     average price|product|
+------------------+-------+
| 47.98561538007844|1002783|
|              31.1|1005483|
| 48.48466666666667| 101122|
| 5.017636358138073|1014942|
|13.459238095567121|1016299|
| 98687.69997308632|1024606|
|             28.74|1026172|
| 5.266432432945643|1028527|
|             37.99|1028799|
|           13.4595| 102952|
|             43.57|1036260|
|             90.93|1036553|
|          63785.46|1046915|
|19.161153846199777|1049200|
| 5.191999999137599|1052326|
|62.089538459916035| 105344|
|13.347999999999999|1055308|
|             41.84|1059049|
|10.756666668145336|1064995|
|5.0417142854448285|1065129|
| 18.67766666842776| 106585|
|             60.68|1065935|
|            202.47|1069730|
|1222.7654555555555|1069791|
|  8.77254273527006|1071238|
|4237.8247139208615|1076191|
|             26.42|1077907|
|             35.81|1077926|
|             36.36|  11078|
| 34.70333333339467|1111275|
|             18.17| 111710|
|12.8337500092

In [10]:
# Assuming the cells ran in order, `sql` still holds the average-price query;
# run it and expose the per-product result as the temp view "temp".
temp = spark.sql(sql)
temp.createOrReplaceTempView("temp")
# Attach `average price` to every dataset row. The LEFT JOIN keeps rows whose
# product has no entry in "temp".
sql = '''SELECT a.*, b.`average price`
       FROM dataset a LEFT JOIN temp b 
       ON a.product = b.product
       ORDER BY order_date'''
dataset = spark.sql(sql)
dataset.createOrReplaceTempView('dataset')
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- view_date3: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device: string (nullable = true)
 |-- product: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- store: string (nullable = true)
 |-- company: string (nullable = true)
 |-- view_date10: string (nullable = true)
 |-- gross total volume: string (nullable = true)
 |-- product net cost: string (nullable = true)
 |-- product net revenue: string (nullable = true)
 |-- gross merchandise volume: string (nullable = true)
 |-- item sold: string (nullable = true)
 |-- pageviews: string (nullable = true)
 |-- is_banner: string (nullable = true)
 |-- is_tax: string (nullable = true)
 |-- is_market_place: string (nullable = true)
 |-- material_status: string (nullable = true)
 |-- current_price_range: string (nullable = true)
 |-- cmc_division: string (nullable = true)
 |-- cmc_business_unit: string (nullable = true

In [11]:
# Re-register "dataset" (the previous cell already did so) and discard the
# helper view.
dataset.createOrReplaceTempView('dataset')
spark.catalog.dropTempView('temp')

#### Calculando Absolute e Percentage Margin

In [12]:
dataset = dataset.withColumn('absolute margin', (dataset['product net revenue'] - dataset['product net cost']) / dataset['product net revenue'])

In [13]:
dataset = dataset.withColumn('percentage margin', dataset['absolute margin'] * 100)

In [14]:
# Publish the DataFrame with the two margin columns under the "dataset" view
# and print its schema.
dataset.createOrReplaceTempView('dataset')
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- view_date3: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device: string (nullable = true)
 |-- product: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- store: string (nullable = true)
 |-- company: string (nullable = true)
 |-- view_date10: string (nullable = true)
 |-- gross total volume: string (nullable = true)
 |-- product net cost: string (nullable = true)
 |-- product net revenue: string (nullable = true)
 |-- gross merchandise volume: string (nullable = true)
 |-- item sold: string (nullable = true)
 |-- pageviews: string (nullable = true)
 |-- is_banner: string (nullable = true)
 |-- is_tax: string (nullable = true)
 |-- is_market_place: string (nullable = true)
 |-- material_status: string (nullable = true)
 |-- current_price_range: string (nullable = true)
 |-- cmc_division: string (nullable = true)
 |-- cmc_business_unit: string (nullable = true

#### Convertendo datas

Checando se todas as datas de uma mesma coluna seguem o mesmo padrão de tamanho, procurando possíveis valores discrepantes (date e timestamp misturados) que precisam ser tratados

In [15]:
# Sanity check before reformatting: list every order_date that is missing or
# is not exactly 8 characters long (the compact YYYYMMDD layout that the
# conversion further down slices by position). The earlier `> 8` test only
# caught over-long values. An empty result means the column is uniform.
sql = 'SELECT order_date FROM dataset WHERE order_date IS NULL OR LENGTH(order_date) <> 8'
spark.sql(sql).show(n=1000, truncate=False)

+----------+
|order_date|
+----------+
+----------+



In [16]:
# Inert string literal holding an experiment ("TESTAR" = "to test") that parsed
# order_date with dateutil inside a UDF. Nothing in it is executed; the cell
# only echoes the string.
'''TESTAR 
utc_zone =  tz.gettz('UTC')

func = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat(),  StringType())

test = fact.withColumn("order_date2",func(col("order_date")))
test.createOrReplaceTempView('test')
test.cache()
spark.sql('select order_date order_date2 from test limit 5').show(truncate=False)
spark.sql('select DATE(order_date) from test ORDER BY order_date').show(truncate=False)'''

'TESTAR \nutc_zone =  tz.gettz(\'UTC\')\n\nfunc = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat(),  StringType())\n\ntest = fact.withColumn("order_date2",func(col("order_date")))\ntest.createOrReplaceTempView(\'test\')\ntest.cache()\nspark.sql(\'select order_date order_date2 from test limit 5\').show(truncate=False)\nspark.sql(\'select DATE(order_date) from test ORDER BY order_date\').show(truncate=False)'

Criando a função de conversão para 'order_date', pois todas as outras formas de conversão testadas anteriormente
retornaram 'null' ou datas errôneas

In [17]:
def date_carai(x):
    """Insert dashes into a compact ``YYYYMMDD`` string, giving ``YYYY-MM-DD``.

    The split is purely positional (first 4 characters, next 2, remainder), so
    the input is not validated as a real date.

    Args:
        x: Date string in ``YYYYMMDD`` layout, or ``None``.

    Returns:
        The dash-separated string, or ``None`` when ``x`` is ``None``. Spark
        hands SQL nulls to Python UDFs as ``None``, which would otherwise raise
        ``TypeError`` on slicing.
    """
    if x is None:
        return None
    return '-'.join((x[:4], x[4:6], x[6:]))

# Wrap the date formatter as a string-returning Spark UDF and rewrite
# 'order_date' in place with the dash-separated form.
squared_udf = udf(date_carai, StringType())

dataset = dataset.withColumn('order_date', squared_udf('order_date'))

In [18]:
# Publish the DataFrame with the reformatted order_date under the "dataset"
# view and print its schema.
dataset.createOrReplaceTempView('dataset')
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- view_date3: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device: string (nullable = true)
 |-- product: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- store: string (nullable = true)
 |-- company: string (nullable = true)
 |-- view_date10: string (nullable = true)
 |-- gross total volume: string (nullable = true)
 |-- product net cost: string (nullable = true)
 |-- product net revenue: string (nullable = true)
 |-- gross merchandise volume: string (nullable = true)
 |-- item sold: string (nullable = true)
 |-- pageviews: string (nullable = true)
 |-- is_banner: string (nullable = true)
 |-- is_tax: string (nullable = true)
 |-- is_market_place: string (nullable = true)
 |-- material_status: string (nullable = true)
 |-- current_price_range: string (nullable = true)
 |-- cmc_division: string (nullable = true)
 |-- cmc_business_unit: string (nullable = true

In [None]:
# Show the first 5 rows without truncating wide values.
dataset.show(n=5, truncate=False)

#### Calculando Variation in Average Price Per Item (% vs. last day)

Realizando cálculo de average cost per item per day para ser utilizado posteriormente

In [None]:
# Per (product, order_date) average of `product net cost`, weighted by
# `item sold`. Same shape as the per-product average-cost query, with
# order_date added to every grouping and to the final join key.
# Only rows with `item sold` > 0 and `product net cost` > 0 take part.
# NOTE(review): the section heading talks about average *price*, while this
# query is built on `product net cost` — confirm which measure is intended.
sql = '''SELECT t.total_sum / b.weight_sum AS `average cost per day`, t.product, t.order_date
         FROM (SELECT SUM(a.price_sum * a.weights) AS total_sum, 
                     a.product, 
                     order_date
                FROM (SELECT SUM(`product net cost`) AS price_sum, `item sold` AS weights, product, order_date
                      FROM dataset
                      WHERE `item sold` > 0.0 AND `product net cost` > 0.0
                      GROUP BY product, order_date, `item sold`) AS a
                GROUP BY a.product, a.order_date) AS t
         JOIN (SELECT SUM(`item sold`) AS weight_sum, 
                     product, 
                     order_date
                  FROM dataset
                  WHERE `item sold` > 0.0 AND `product net cost` > 0.0
                  GROUP BY product, order_date) AS b
         ON t.product = b.product AND t.order_date = b.order_date'''

temp = spark.sql(sql)

In [None]:
# Expose the per-day averages as the temp view "temp" and print their schema.
temp.createOrReplaceTempView('temp')
temp.printSchema()

In [None]:
# Attach `average cost per day` to every dataset row, matching on product and
# order_date. The LEFT JOIN keeps rows that have no entry in "temp".
sql = '''SELECT a.*, b.`average cost per day`
        FROM dataset a LEFT JOIN temp b
        ON a.product = b.product AND a.order_date = b.order_date'''
dataset = spark.sql(sql)

In [None]:
# Publish the DataFrame that now carries `average cost per day` under the
# "dataset" view.
dataset.createOrReplaceTempView('dataset')

Calculando Variation in Average Price Per Item

In [None]:
# Day-over-day variation (in %) of the per-day average, per product.
#   daily_avg: one row per (product, date) with that day's average; the
#              GROUP BY only de-duplicates the repeated row-level values.
#   Self-join: pair each day with the SAME product's PREVIOUS calendar day,
#              i.e. lastday_avg.date = today_avg.date - 1. The predicate used
#              to be written the other way round, which compared each day
#              with the following one.
# The join condition now lives in ON rather than WHERE, so a day with no
# previous-day row is kept with a NULL variation instead of relying on a
# condition-less join being filtered afterwards.
# The output column keeps its existing spelling ("averge") because later cells
# select it by that exact name.
sql = '''WITH daily_avg AS (SELECT DATE(order_date) date, product, `average cost per day` avg
                FROM dataset
                GROUP BY product, date, avg)
        SELECT (((avg - yesterday_avg) / yesterday_avg) * 100) AS `variation in averge price`, 
                date, 
                product
        FROM (SELECT today_avg.avg AS avg, 
                today_avg.date AS date, 
                today_avg.product AS product, 
                lastday_avg.avg AS yesterday_avg
            FROM daily_avg AS today_avg 
            LEFT JOIN daily_avg AS lastday_avg
            ON today_avg.product = lastday_avg.product
            AND lastday_avg.date = DATE_SUB(today_avg.date, 1))'''

temp = spark.sql(sql)
temp.createOrReplaceTempView('temp')

In [None]:
# Attach the variation column to every dataset row, matching on date and
# product. The LEFT JOIN keeps rows that have no entry in "temp".
sql = '''SELECT a.*, b.`variation in averge price`
        FROM dataset a LEFT JOIN temp b
        ON a.order_date = b.date
        AND a.product = b.product'''

dataset = spark.sql(sql)

In [None]:
# The per-day average was only an input to the variation column; remove it.
dataset = dataset.drop('average cost per day')
# Publish the result, mark it for caching, and discard the helper view.
dataset.createOrReplaceTempView('dataset')
dataset.cache()
spark.catalog.dropTempView('temp')
dataset.printSchema()

#### Mapeando gender

In [None]:
# List the distinct raw 'gender' values (up to 1000, untruncated) to see what
# needs mapping.
dataset.select('gender').distinct().show(n=1000, truncate=False)

Devido aos possíveis valores de 'gender', escolhi reduzi-los a apenas quatro possibilidades: Masculino, Feminino, Infantil e Unissex.

In [None]:
def gender(x):
    """Collapse a free-text gender label into one of four categories.

    Matching is case-insensitive and checked in this order:

    1. ``'Infantil'``: the text contains ``beb``, ``niñ``, ``infantil`` or
       ``menina``, or is exactly ``menino`` / ``menina``.
    2. ``'Unissex'``: the text contains ``uni``, or both ``masc`` and ``fem``,
       or both ``boy`` and ``girl``.
    3. ``'Feminino'``: the text contains ``fem`` or is exactly ``girls``.
    4. ``'Masculino'``: the text contains ``masc`` or is exactly ``boys``.

    The exact-match test used to be written with ``is`` (object identity),
    which never holds for a freshly lower-cased string. As a result
    ``'menino'`` skipped the children's branch and was labelled
    ``'Masculino'``, while ``'menina'`` was labelled ``'Infantil'``. It is now
    a real equality test, so both are treated alike.

    Args:
        x: Raw gender label, or ``None``.

    Returns:
        One of ``'Infantil'``, ``'Unissex'``, ``'Feminino'``, ``'Masculino'``,
        or ``None`` when ``x`` is ``None`` or matches no rule.
    """
    # Spark hands SQL nulls to Python UDFs as None; pass them through instead
    # of failing on .lower().
    if x is None:
        return None

    label = x.lower()
    child_fragments = ('beb', 'niñ', 'infantil', 'menina')
    child_exact = ('menino', 'menina')

    if any(fragment in label for fragment in child_fragments) or label in child_exact:
        return 'Infantil'
    if (
        'uni' in label
        or ('masc' in label and 'fem' in label)
        or ('boy' in label and 'girl' in label)
    ):
        return 'Unissex'
    if 'fem' in label or label == 'girls':
        return 'Feminino'
    if 'masc' in label or label == 'boys':
        return 'Masculino'
    # No rule matched: keep the original contract of yielding None.
    return None

# Wrap the normaliser as a string-returning Spark UDF and overwrite the
# 'gender' column with the reduced category.
gender_udf = udf(gender, StringType())

dataset = dataset.withColumn('gender', gender_udf('gender'))

In [None]:
# Show the distinct 'gender' values after mapping.
dataset.select('gender').distinct().show(truncate=False)

In [None]:
# Publish the DataFrame with the mapped 'gender' column under the "dataset"
# view and mark it for caching.
dataset.createOrReplaceTempView('dataset')
dataset.cache()

#### ISO 8601

In [None]:
# Parse the order_date strings into Spark timestamps.
dataset = dataset.withColumn('order_date', to_timestamp('order_date'))

# Render each timestamp as ISO 8601 text with a trailing 'Z'
# (e.g. 2019-05-01T00:00:00Z). Null timestamps are passed through as null;
# before, str(None) turned them into the literal text 'NoneZ'.
# NOTE(review): the 'Z' suffix labels the value as UTC, but no timezone
# conversion happens here — confirm the source dates are meant as UTC.
iso_converter_udf = udf(lambda y: None if y is None else y.isoformat() + 'Z', StringType())

dataset = dataset.withColumn('order_date', iso_converter_udf('order_date'))

In [None]:
# Preview the converted order_date values.
dataset.select('order_date').show()

In [None]:
# Publish the DataFrame with the ISO-formatted order_date under the "dataset"
# view and mark it for caching.
dataset.createOrReplaceTempView('dataset')
dataset.cache()

#### Removendo alguns campos que não estão diretamente ligados a produto

In [None]:
# Columns that are not directly about the product and are removed from the
# final dataset.
columns_to_drop = [
    'id',
    'view_date3',
    'view_date10',
    'device',
    'is_banner',
    'is_tax',
    'is_market_place',
    'material_status',
    'channel',
]
dataset = dataset.drop(*columns_to_drop)

In [None]:
# Preview the dataset after the column removal.
dataset.show()

In [None]:
# Publish the trimmed DataFrame under the "dataset" view and mark it for
# caching.
dataset.createOrReplaceTempView('dataset')
dataset.cache()

#### To JSON

In [None]:
# Serialise every row of the final dataset as a JSON string.
to_disk = dataset.toJSON()

In [None]:
# Inspect the first JSON document.
to_disk.first()

Salvando em disco, para exemplificar saída para ElasticSearch

In [None]:
#to_disk.saveAsTextFile('dataset_final.txt')

#### Questions

1 - 

In [None]:
# Summary statistics for order_date.
dataset.describe('order_date').show()

In [None]:
# Summary statistics for product.
dataset.describe('product').show()

In [None]:
# Count the non-null product values per order_date and bring the result to the
# driver as a pandas DataFrame.
x=spark.sql('SELECT count(product), order_date FROM dataset group by order_date').toPandas()

In [None]:
# Histogram of the per-date counts, to see how evenly rows spread over dates.
x.hist()

O número de produtos relacionados a cada um dos timestamps é assimétrico, o que resulta em uma distribuição ruim dos dados entre as partições. Isso torna a computação ineficiente do ponto de vista de poder computacional: um grupo de partições realizará muito mais operações que as demais, aumentando o tempo necessário para computar determinada transformação.

Isso pode ser corrigido utilizando a técnica de Salting, que adiciona um valor randômico, de 0 até N, de forma a dividir os dados em N partições, fazendo com que esses dados da dimensão sejam melhor distribuídos.

2 - Modificar o Mapping, como segue:

In [None]:
# Elasticsearch index template proposed as the answer to question 2, kept as
# an inert string literal. Three field types were corrected to match what the
# notebook actually produces:
#   - "gender" holds text categories (Masculino, Feminino, Infantil, Unissex),
#     so it is a string, not a float;
#   - "percentage margin" and "variation in averge price" are computed numbers,
#     so they are floats, not boolean / string.
# NOTE(review): "currency", "store", "company" and "cmc_business_unit" are
# declared as float and "current_price_range" / "cmc_division" as integer;
# their real contents are not visible in this notebook — confirm against the
# data before applying.
'''{
  "template": "beluga_timestamp_index-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "strings": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping":   { 
                            "type": "string",  
                            "doc_values": true, 
                            "index": "not_analyzed" 
                        }
          }
        }
      ],
      "_all":            { "enabled": false },
      "_source":         { "enabled": true },
      "properties": {
        "order_date":                  { "type": "date",    "doc_values": true, "format": "yyyy-MM-dd'T'HH:mm:ssZ" },
        "product":                    { "type": "integer", "doc_values": true, "index": "no" },
        "currency":                   { "type": "float",   "doc_values": true, "index": "no" },
        "store":                      { "type": "float",   "doc_values": true, "index": "no" },
        "company":                    { "type": "float",   "doc_values": true, "index": "no" },
        "current_price_range":        { "type": "integer", "doc_values": true, "index": "no" },
        "cmc_division":               { "type": "integer", "doc_values": true, "index": "no" },
        "cmc_business_unit":          { "type": "float",   "doc_values": true, "index": "no" },
        "gender":                     { "type": "string",  "doc_values": true, "index": "no" },
        "gross total volume":         { "type": "float",   "doc_values": true, "index": "no" },
        "product net cost":           { "type": "float",   "doc_values": true, "index": "no" },
        "product net revenue":        { "type": "float",   "doc_values": true, "index": "no" },
        "gross merchandise volume":   { "type": "float",   "doc_values": true, "index": "no" },
        "item sold":                  { "type": "float",   "doc_values": true, "index": "no" },
        "pageviews":                  { "type": "float",   "doc_values": true, "index": "no" },
        "average cost":               { "type": "float",   "doc_values": true, "index": "no" },
        "average price":              { "type": "float",   "doc_values": true, "index": "no" },
        "absolute margin":            { "type": "float",   "doc_values": true, "index": "no" },
        "percentage margin":          { "type": "float",   "doc_values": true, "index": "no" },
        "variation in averge price":  { "type": "float",   "doc_values": true, "index": "no" }
      }
    }
  }
}'''

"order_date" é a única propriedade indexada, pois as pesquisas serão feitas com base nesse campo. Os outros não são indexados pois são dados que serão apenas retornados na query.

"_all" setado em False pelo mesmo motivo acima citado

"doc_values" estão habilitados para contornar o problema de estouro de heap memory de fielddata quando são feitas grandes quantidades de aggregations e sorting.