# Notebook para criação da base do volume de vendas (qtd) por:
   - SKU;
   - Seller;
   - Estado (Regional);
   - Departamento;
   - Setor;

## Inicialização da sessão do notebook importando a biblioteca do projeto

In [2]:
# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources, connections, and used by platform APIs.
from project_lib import Project
project = Project(spark.sparkContext, 'de02faf2-fbe4-466f-b8cd-44296f096a9d', 'p-25957a2b4ca812c11c01435cf691ea3068358702')
pc = project.project_context

## Importando as bibliotecas iniciais e credencias do projeto

In [3]:
# Importando algumas bibliotecas
import sys
import types
import pandas as pd
from botocore.client import Config
import ibm_boto3

In [4]:
# @hidden_cell
# The following code contains the credentials for a file in your IBM Cloud Object Storage.
# You might want to remove those credentials before you share your notebook.
credentials_1 = {
    'IAM_SERVICE_ID': 'iam-ServiceId-f8ddc357-a261-42a2-a6f2-44ac38d6cc50',
    'IBM_API_KEY_ID': '5y97tXSIMk-o11IIlG-qhORjxg4zQ6yFprxqB54ntyX9',
    'ENDPOINT': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'IBM_AUTH_ENDPOINT': 'https://iam.cloud.ibm.com/oidc/token',
    'BUCKET': 'projetoviagrupo6-donotdelete-pr-ehmgbn0j3d7dfc',
    'FILE': 'compra.snappy.parquet'
}

In [5]:
# The following code accesses a file in your IBM Cloud Object Storage. It includes your credentials.
# You might want to remove those credentials before you share your notebook.
cgsClient = ibm_boto3.client(service_name='s3',
    ibm_api_key_id = credentials_1['IBM_API_KEY_ID'],
    ibm_auth_endpoint="https://iam.ng.bluemix.net/oidc/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3-api.us-geo.objectstorage.service.networklayer.com')

### Listando as pastas que estão na raiz do projeto

In [6]:
ls

[0m[01;34mconda[0m/  [01;34mlogs[0m/  [01;34mspark-events[0m/  [01;34muser-libs[0m/


### Criando uma pasta temporária: tmp

In [7]:
mkdir tmp

In [8]:
ls

[0m[01;34mconda[0m/  [01;34mlogs[0m/  [01;34mspark-events[0m/  [01;34mtmp[0m/  [01;34muser-libs[0m/


# Importando as credenciais e trazendo as bases

## Primeira base para agregação e avaliação: LOJISTA 

In [9]:
import ibmos2spark, os
# @hidden_cell

if os.environ.get('RUNTIME_ENV_LOCATION_TYPE') == 'external':
    endpoint_f35d990ae08e4675b679f0f50769655c = 'https://s3-api.us-geo.objectstorage.softlayer.net'
else:
    endpoint_f35d990ae08e4675b679f0f50769655c = 'https://s3-api.us-geo.objectstorage.service.networklayer.com'

credentials = {
    'endpoint': endpoint_f35d990ae08e4675b679f0f50769655c,
    'service_id': 'iam-ServiceId-f8ddc357-a261-42a2-a6f2-44ac38d6cc50',
    'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
    'api_key': '5y97tXSIMk-o11IIlG-qhORjxg4zQ6yFprxqB54ntyX9'
}

configuration_name = 'os_f35d990ae08e4675b679f0f50769655c_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
from pyspark.sql import functions as func

## Selecionando 
spark = SparkSession.builder.getOrCreate()
df_lojista = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('lojista.csv', 'projetoviagrupo6-donotdelete-pr-ehmgbn0j3d7dfc'))
df_lojista.take(5)

[Row(idlojista='10822', flagativa='1', flaglojistadefault='0', classificacao='4', porcentagempositivo='89', quantidadereviews='56', estado='SP', flagrestricaovendapj='0', retiroemloja=None, lojistagpa=None, lojistainternacional=None, idgrupoitem='1000', datadesativacao=None, dataatualizacao='2019-03-29 16:58:04.247', idcompanhia=None, idseller='f5619066-5eca-4048-aade-e8a7149dc55e', idsubseller='700003557', idbandeira='7'),
 Row(idlojista='13812', flagativa='1', flaglojistadefault='0', classificacao='3', porcentagempositivo='60', quantidadereviews='5', estado='SP', flagrestricaovendapj='0', retiroemloja=None, lojistagpa=None, lojistainternacional=None, idgrupoitem='1000', datadesativacao=None, dataatualizacao='2019-03-29 16:59:09.853', idcompanhia=None, idseller='90fdcbdd-49ae-4b39-8b2b-fe7ced365e5d', idsubseller='700011193', idbandeira='343'),
 Row(idlojista='14404', flagativa='1', flaglojistadefault='0', classificacao='3', porcentagempositivo='64', quantidadereviews='71', estado='SP'

## Segunda base para agregação e avaliação: COMPRAENTREGA

In [10]:
df_compraentrega = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('compraentrega.csv', 'projetoviagrupo6-donotdelete-pr-ehmgbn0j3d7dfc'))
df_compraentrega.take(5)

[Row(idcompraentrega='82908258', idcompra='166020553', idfreteentregatipo='17', idcompraentregastatus='VAL', identificadorfrete=None, idadministradorstatus='1', datasaida=None, dataprevisao=None, dataentrega=None, datastatus='2019-06-05 17:55:00', prazoentregamaisdisponibilidade=None, prazoestoque=None, gerencialid='16602055302', dataemissaonotafiscal=None, idtransportadora=None, codigocontratotransportadora=None, dataentregacorrigida=None, numctrc=None, idnotafiscal=None, idperiodoentrega=None, dataentregaagendada=None, idregiao='0', flag_chave_nfe='0', dataprometidaoriginal=None, idlojista='14406', flagenviadomarketplace='0', gerencialidmktp=None, prazotransportadora='0', prazocd='0', idfilial='1', origem='TD', ordemdevolucao='0', sequencialcompra='2', flagbloqueado=None, ordemvendaerp='B20139885', flagprazoentregamarcado=None, nomeautorizaretireloja=None, rgautorizaretireloja=None, enviadolojistagpa='0', datalimitesaidacd='2019-06-05 00:00:00', sequencialprocesso='2', dataordemvenda

## Terceira base para agregação e avaliação: COMPRAENTREGASKU

In [11]:
df_compraentregasku = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('compraentregasku.csv', 'projetoviagrupo6-donotdelete-pr-ehmgbn0j3d7dfc'))
df_compraentregasku.take(5)

[Row(idcompraentregasku='61380787', idcompraentregaskupai=None, iddescontocomprejunto=None, iddescontobrindecausa=None, idcompraentrega='74476290', idsku='11728496', idestoqueprovavel='7', valorvendaunidadesemdesconto='169.90', valorvendaunidade='169.90', valorcupomnominal='0.00', valorvendaunidademenoscupomnominal='169.90', valorfretecomdesconto='72.60', valorfrete='72.60', flagestoqueimpacto='1', presentemensagem=None, presentede=None, presentepara=None, idskukitsige=None, tipocombosige=None, datacriacaoregistro='2019-06-01 00:15:56.447', valorbogof='0.00', parceirorecomendacao=None, flagajusteoperacional=None, valorfreteoriginalnaorentavel='72.60', statustdca=None, valorst='0.00', sequencial='1', prazogarantiafornecedor='12', tipoestoque='N', valorjurosunitario=None, idskureferencia='11728496', precoiofvalorvenda='0.00', valorcomissao=None, percentualComissao=None, idbandeira='7'),
 Row(idcompraentregasku='61394653', idcompraentregaskupai=None, iddescontocomprejunto=None, iddesconto

## Quarta base para agregação e avaliação: COMPRA

In [12]:
df_compra = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('compra.csv', 'projetoviagrupo6-donotdelete-pr-ehmgbn0j3d7dfc'))
df_compra.take(5)

[Row(idcompra='165996430', idcliente='11086617', idadministradortelevenda=None, idlistadecompra=None, midia='cpc', parceiro='rtbhouse', campanha=None, palavrachave='rtbhouse', origem=None, data='2019-06-01 10:03:00', valortotalcomdesconto='387.30', valortotalcomdesconto_devido='387.30', testeab=None, flagativa='0', statusintegracao=None, ipcliente='177.67.247.195', codigopromocao=None, infocomplcodigopromocao=None, servidororigem='CARRINHO-EX12', codloja=None, idlojafisica=None, idopcaoentregaexpressa=None, flagrespostapromocao=None, idcanalvenda='SITE', datacriacaoregistro='2019-06-04 06:02:53.520', respostapromocao=None, flag_cupom='0', idenderecolojafisica=None, useragent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36', dtajusteoperacional=None, idtipopedido='1', flaggerapontos=None, dataaceiteclubeextra=None, idtipoimposto=None, cif_fob=None, pedidoexterno=None, sequencial='1', flagcomprasige=None, flagaprovado='

## Quinta base para agregação e avaliação: SKUCATEGORIA

In [13]:
df_skucategoria = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('skucategoria.csv', 'projetoviagrupo6-donotdelete-pr-ehmgbn0j3d7dfc'))
df_skucategoria.take(5)

[Row(idsku='13828044', iddepartamento='1813', nomedepartamento='Malas e Mochilas', idsetor='1830', nomesetor='Mochilas'),
 Row(idsku='1500292229', iddepartamento='102', nomedepartamento='Beleza e Saúde  ', idsetor='2839', nomesetor='Tratamento de Ar'),
 Row(idsku='1500861985', iddepartamento='2471', nomedepartamento='Instrumentos Musicais', idsetor='2093', nomesetor='Cordas'),
 Row(idsku='3758662', iddepartamento='2547', nomedepartamento='Casa e Construção', idsetor='1093', nomesetor='Duchas, Chuveiros e Acessórios'),
 Row(idsku='11686170', iddepartamento='418', nomedepartamento='Esporte e Lazer', idsetor='422', nomesetor='Monitores e Acessórios')]

## Criando as bases temporárias para fazer os relacionamentos necessários 

In [32]:
df_compraentregasku.createOrReplaceTempView("tb_compraentregasku")
df_compraentrega.createOrReplaceTempView("tb_compraentrega")
df_compra.createOrReplaceTempView("tb_compra")
df_lojista.createOrReplaceTempView("tb_lojista")
df_skucategoria.createOrReplaceTempView("tb_skucategoria")

## Relacionando as bases temporárias para o perído de COMPRAS realizadas em até 60 dias

### Observações:
##### 1) A base de COMPRA se encontra com informações até o dia 30-04-2021.
##### 2) Para uma base produtiva "online" é interessante fazer a seguinte substituição no código:                  
######                DE: and cast(CP.data as date) >= cast('2021-04-30' as date) - 60
######           PARA: and cast(CP.data as date) >= current_date() - 60         
##### 3) Foi considerado para este MVP somente os seguintes sellers: 
#####         L.idlojista in ('10012','37450','92893','80333','64189') 
#####        Atenção: recomendado que se tire tal filtro para um ambiente produtivo deste notebook)

In [34]:
df_60=spark.sql("SELECT distinct '60' as pub \
                 ,CS.idskureferencia \
                 ,L.idlojista, L.estado \
                 ,CS.idcompraentrega \
                 ,DP.iddepartamento  \
                 ,DP.nomedepartamento \
                 ,DP.idsetor \
                 ,DP.nomesetor \
             FROM tb_compra CP\
                 ,tb_compraentrega CE\
                 ,tb_compraentregasku CS\
                 ,tb_lojista L\
                 ,tb_skucategoria DP\
            where CP.idcompra = CE.idcompra \
              and CE.idcompraentrega = CS.idcompraentrega \
              and L.idlojista = CE.idlojista\
              and DP.idsku = CS.idsku \
              and L.idlojista in ('10012','37450','92893','80333','64189') \
              and cast(CP.data as date) >= cast('2021-04-30' as date) - 60 \
             ;")

In [16]:
df_60.show()

+---+---------------+---------+------+---------------+--------------+--------------------+-------+--------------------+
|pub|idskureferencia|idlojista|estado|idcompraentrega|iddepartamento|    nomedepartamento|idsetor|           nomesetor|
+---+---------------+---------+------+---------------+--------------+--------------------+-------+--------------------+
| 60|       11653756|    31143|    SP|      117999785|           448|            Calçados|   2246|          Masculinos|
| 60|       11653756|    31133|    SP|      113292829|           448|            Calçados|   2246|          Masculinos|
| 60|     1500708198|    13612|    SP|       96235693|            93|              Móveis|     97|       Sala de Estar|
| 60|     1509159341|    56064|    SP|       96374319|           371|Utilidades Domést...|    396|Acessórios e Uten...|
| 60|     1504248423|    38790|    SP|       95990919|            38|Telefones e Celul...|    326|         Smartphones|
| 60|        9274501|    14471|    SC|  

## Relacionando as bases temporárias para o perído de COMPRAS realizadas em até 30 dias

### Observações:
##### 1) A base de COMPRA se encontra com informações até o dia 30-04-2021.
##### 2) Para uma base produtiva "online" é interessante fazer a seguinte substituição no código:                  
######                DE: and cast(CP.data as date) >= cast('2021-04-30' as date) - 30
######           PARA: and cast(CP.data as date) >= current_date() - 30         
##### 3) Foi considerado para este MVP somente os seguintes sellers: 
#####         L.idlojista in ('10012','37450','92893','80333','64189') 
#####        Atenção: recomendado que se tire tal filtro para um ambiente produtivo deste notebook)

In [35]:
df_30=spark.sql("SELECT distinct '30' as pub \
                 ,CS.idskureferencia \
                 ,L.idlojista, L.estado \
                 ,CS.idcompraentrega \
                 ,DP.iddepartamento  \
                 ,DP.nomedepartamento \
                 ,DP.idsetor \
                 ,DP.nomesetor \
             FROM tb_compra CP\
                 ,tb_compraentrega CE\
                 ,tb_compraentregasku CS\
                 ,tb_lojista L\
                 ,tb_skucategoria DP\
            where CP.idcompra = CE.idcompra \
              and CE.idcompraentrega = CS.idcompraentrega \
              and L.idlojista = CE.idlojista\
              and DP.idsku = CS.idsku \
              and L.idlojista in ('10012','37450','92893','80333','64189') \
              and cast(CP.data as date) >= cast('2021-04-30' as date) - 30 \
             ;")

## Relacionando as bases temporárias para o perído de COMPRAS realizadas em até 7 dias

### Observações:
##### 1) A base de COMPRA se encontra com informações até o dia 30-04-2021.
##### 2) Para uma base produtiva "online" é interessante fazer a seguinte substituição no código:                  
######                DE: and cast(CP.data as date) >= cast('2021-04-30' as date) - 7
######           PARA: and cast(CP.data as date) >= current_date() - 7         
##### 3) Foi considerado para este MVP somente os seguintes sellers: 
#####         L.idlojista in ('10012','37450','92893','80333','64189') 
#####        Atenção: recomendado que se tire tal filtro para um ambiente produtivo deste notebook)

In [36]:
df_7=spark.sql("SELECT distinct ' 7' as pub \
                 ,CS.idskureferencia \
                 ,L.idlojista, L.estado \
                 ,CS.idcompraentrega \
                 ,DP.iddepartamento  \
                 ,DP.nomedepartamento \
                 ,DP.idsetor \
                 ,DP.nomesetor \
             FROM tb_compra CP\
                 ,tb_compraentrega CE\
                 ,tb_compraentregasku CS\
                 ,tb_lojista L\
                 ,tb_skucategoria DP\
            where CP.idcompra = CE.idcompra \
              and CE.idcompraentrega = CS.idcompraentrega \
              and L.idlojista = CE.idlojista\
              and DP.idsku = CS.idsku \
              and L.idlojista in ('10012','37450','92893','80333','64189') \
              and cast(CP.data as date) >= cast('2021-04-30' as date) - 7 \
             ;")

### Concatenando("Empilhando") as informações de 60, 30 e 7 dias

In [37]:
df_conc = df_60.union(df_7.union(df_30))

In [38]:
df_conc.show()

+---+---------------+---------+------+---------------+--------------+--------------------+-------+--------------------+
|pub|idskureferencia|idlojista|estado|idcompraentrega|iddepartamento|    nomedepartamento|idsetor|           nomesetor|
+---+---------------+---------+------+---------------+--------------+--------------------+-------+--------------------+
| 60|        8154506|    37450|    SP|       96382078|            73|     Eletroportáteis|   2963|Chocolateira e Fo...|
| 60|     1502998518|    10012|    GO|      118308184|          3279|               Áudio|   3279|               Áudio|
| 60|     1504540521|    10012|    GO|      114765979|           836|          Automotivo|    990|               Motos|
| 60|       11264860|    37450|    SP|       96573761|           371|Utilidades Domést...|    389|Sobremesa, Chá e ...|
| 60|     1502998701|    10012|    GO|       95977244|           836|          Automotivo|    990|               Motos|
| 60|     1505102152|    10012|    GO|  

### Criando a base temporária: tb_base_volume para o Agrupamento Final: df_final

In [39]:
df_conc.createOrReplaceTempView("tab_base_volume")

In [40]:
df_final=spark.sql("SELECT pub \
                 ,idskureferencia \
                 ,idlojista, estado \
                 ,nomedepartamento \
                 ,nomesetor \
                 ,count(idcompraentrega) as qtd \
             FROM tab_base_volume \
             group by \
               pub \
              ,idskureferencia \
              ,idlojista \
              ,estado \
              ,nomedepartamento \
              ,nomesetor \
              order by \
              qtd desc;")

## Atenção: Veja os comentários acima para considerar a base num ambiente produtivo "online"
### Persistindo a base final com as informações dos 5 sellers

In [41]:
df_final.repartition(1).write.mode("overwrite").csv('/home/spark/shared/tmp/volume_vendas_73060_b5.csv')

### Lista o local de onde se encontra a base particionada para que se possa disponibilizar a mesma para trativas para o FRONTEND vide o seguinte link:

In [42]:
ls /home/spark/shared/tmp/volume_vendas_73060_b5.csv

part-00000-a6318afb-6eb9-4a9c-b46a-208eef2b2a21-c000.csv  _SUCCESS


In [43]:
cgsClient.upload_file(Filename='/home/spark/shared/tmp/volume_vendas_73060_b5.csv/part-00000-a6318afb-6eb9-4a9c-b46a-208eef2b2a21-c000.csv',Bucket=credentials_1['BUCKET'],Key='volume_vendas_73060_b5.csv')

######### Fazendo gravação como Parquet #########

In [29]:
df_final.repartition(1).write.mode("overwrite").parquet('/home/spark/shared/tmp/volume_vendas_73060.parquet')

In [30]:
ls /home/spark/shared/tmp/volume_vendas_73060.parquet

part-00000-65a74389-3dfe-4070-8f41-c71541d59f6e-c000.snappy.parquet  _SUCCESS


In [31]:
cgsClient.upload_file(Filename='/home/spark/shared/tmp/volume_vendas_73060.parquet/part-00000-65a74389-3dfe-4070-8f41-c71541d59f6e-c000.snappy.parquet',Bucket=credentials_1['BUCKET'],Key='volume_vendas_73060.parquet')