Esse notebook tem por objetivo fazer a relação de empresas que importam somente uma marca, facilitando a relação importador-

In [1]:
# Importing the modules needed
import sys

import pandas as pd

sys.path.append("../src/")

from src.data.dremio_utils import *
# Data Handling
from dotenv import dotenv_values 

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, FloatType, DateType, TimestampType

import pyarrow.parquet as pq

In [2]:
spark = SparkSession.builder.master("local[1]").appName("attributes_dict").getOrCreate()

In [3]:
config = dotenv_values(".env")
bds = BaseDremioService(config)

## 1. Old Data

### 1.1 Extract the data

In [8]:
big_data_di_sql = ''' 
 SELECT 
     dpb.di_number,
     sd.sgl_uf_import as sgl_uf,
     sd.cidade_import,
     sd.anomes,
     dpb.urf_entrada_carga,
     CAST(val_unid_prod_us AS FLOAT) as val_unid_prod_us
 FROM searchx.search_dump sd
 INNER JOIN di_pu.di_pu_bronze dpb
    ON CAST(dpb.di_number AS INT) = CAST(sd.di_number AS INT)
 WHERE anomes <= 202112
    AND anomes >= 201801
    AND ISNUMERIC(val_unid_prod_us)
'''
aux_df = bds.extract_dremio_dataset(big_data_di_sql)
aux_df.to_parquet("../data/raw/unity_price/search_dump.parquet")

[INFO] Authentication was successful
[INFO] Query:   
 SELECT 
     dpb.di_number,
     sd.sgl_uf_import as sgl_uf,
     sd.cidade_import,
     sd.anomes,
     dpb.urf_entrada_carga,
     CAST(val_unid_prod_us AS FLOAT) as val_unid_prod_us
 FROM searchx.search_dump sd
 INNER JOIN di_pu.di_pu_bronze dpb
    ON CAST(dpb.di_number AS INT) = CAST(sd.di_number AS INT)
 WHERE anomes <= 202112
    AND anomes >= 201801
    AND ISNUMERIC(val_unid_prod_us)
[INFO] GetSchema was successful
[INFO] Schema:  <pyarrow.flight.SchemaResult schema=(di_number: int32
sgl_uf: string
cidade_import: string
anomes: string
urf_entrada_carga: string
val_unid_prod_us: float)>
[INFO] GetFlightInfo was successful
[INFO] Ticket:  <pyarrow.flight.Ticket ticket=b'\n\x8f\x03 \n SELECT \n     dpb.di_number,\n     sd.sgl_uf_import as sgl_uf,\n     sd.cidade_import,\n     sd.anomes,\n     dpb.urf_entrada_carga,\n     CAST(val_unid_prod_us AS FLOAT) as val_unid_prod_us\n FROM searchx.search_dump sd\n INNER JOIN di_pu.di_pu

### 1.2 Group By

In [4]:
search_dump = spark.read.parquet("../data/raw/unity_price/search_dump.parquet")

In [5]:
di_pu_add = spark.read.parquet("../data/raw/unity_price/di_pu_add.parquet")

In [6]:
search_dump_comp = di_pu_add.join(search_dump, search_dump.di_number == di_pu_add.di_number)

In [7]:
search_dump_comp.columns

['id',
 'pais',
 'cod_ncm',
 'di_control_pu_addition_id',
 'di_number',
 'di_number',
 'sgl_uf',
 'cidade_import',
 'anomes',
 'urf_entrada_carga',
 'val_unid_prod_us']

In [8]:
search_dump_grouped = search_dump_comp.groupBy([
    'pais',
    'cod_ncm',
    'sgl_uf',
    'cidade_import',
    'urf_entrada_carga',
    'anomes']).avg('val_unid_prod_us')

In [9]:
search_dump_grouped = search_dump_grouped.withColumnRenamed("pais", "id_pais_origem")\
    .withColumnRenamed("cod_ncm", "ncm")\
    .withColumnRenamed("cidade_import", "importador_municipio")\
    .withColumnRenamed("sgl_uf", "importador_uf")\
    .withColumnRenamed("urf_entrada_carga", "urf")\
    .withColumnRenamed("anomes", "anomes")\
    .withColumnRenamed("avg(val_unid_prod_us)", "avg_valor_item")

In [10]:
# search_dump_grouped.show(5)

In [11]:
search_dump_grouped.write.parquet("../data/interim/dump_grouped", mode="overwrite")

## 2. New Data

### 2.1 Extract the data

In [41]:
# Snippet to extract the data
# ```Python
big_data_di_sql = ''' 
SELECT
    dpa.id as id,
    cb.name_pt as pais,
    CAST(cod_ncm as int) as cod_ncm,
    di_control_pu_addition_id,
    di_number
FROM di_pu.di_pu_addition_bronze dpa
LEFT JOIN di_pu.country_bronze cb
    ON dpa.id_pais_origem = cb.id
'''
aux_df = bds.extract_dremio_dataset(big_data_di_sql)
aux_df.to_parquet("../data/raw/unity_price/di_pu_add.parquet")

[INFO] Authentication was successful
[INFO] Query:   
SELECT
    dpa.id as id,
    cb.name_pt as pais,
    CAST(cod_ncm as int) as cod_ncm,
    di_control_pu_addition_id,
    di_number
FROM di_pu.di_pu_addition_bronze dpa
LEFT JOIN di_pu.country_bronze cb
    ON dpa.id_pais_origem = cb.id
[INFO] GetSchema was successful
[INFO] Schema:  <pyarrow.flight.SchemaResult schema=(id: int64
pais: string
cod_ncm: int32
di_control_pu_addition_id: int64
di_number: int32)>
[INFO] GetFlightInfo was successful
[INFO] Ticket:  <pyarrow.flight.Ticket ticket=b'\n\xee\x01 \nSELECT\n    dpa.id as id,\n    cb.name_pt as pais,\n    CAST(cod_ncm as int) as cod_ncm,\n    di_control_pu_addition_id,\n    di_number\nFROM di_pu.di_pu_addition_bronze dpa\nLEFT JOIN di_pu.country_bronze cb\n    ON dpa.id_pais_origem = cb.id\n\x12\x8b\x02\n\x88\x02\n\xee\x01 \nSELECT\n    dpa.id as id,\n    cb.name_pt as pais,\n    CAST(cod_ncm as int) as cod_ncm,\n    di_control_pu_addition_id,\n    di_number\nFROM di_pu.di_pu_addi

big_data_di_sql = ''' 
SELECT 
    valor_item,
    di_pu_addition_id
FROM di_pu.di_pu_addition_itens_bronze
'''
aux_df = bds.extract_dremio_dataset(big_data_di_sql)
aux_df.to_parquet("../data/raw/unity_price/di_pu_add_itens.parquet")

big_data_di_sql = ''' 
SELECT 
    importador_endereco_municipio,
    importador_endereco_uf,
    urf_entrada_carga,
    CONCAT(EXTRACT(YEAR FROM CAST(data_hora_registro AS DATE)), LPAD(EXTRACT(MONTH FROM CAST(data_hora_registro AS DATE)), 2, '0')) as anomes,
    di_number 
FROM di_pu.di_pu_bronze
WHERE
    CAST(data_hora_registro AS DATE) < '2021-12-01' OR 
    CAST(data_hora_registro AS DATE) > '2022-02-28'
'''
aux_df = bds.extract_dremio_dataset(big_data_di_sql)
aux_df.to_parquet("../data/raw/unity_price/di_pu.parquet")

In [42]:
di_pu = spark.read.parquet("../data/raw/unity_price/di_pu.parquet")
di_pu_add = spark.read.parquet("../data/raw/unity_price/di_pu_add.parquet")
di_pu_add_itens = spark.read.parquet("../data/raw/unity_price/di_pu_add_itens.parquet")

### 2.2 Cross the Data

In [34]:
di_pu_add_itens.columns

['valor_item', 'di_pu_addition_id']

In [44]:
di_complete = di_pu_add.join(di_pu_add_itens, di_pu_add_itens.di_pu_addition_id == di_pu_add.id)
di_complete.printSchema()

root
 |-- id: long (nullable = true)
 |-- pais: string (nullable = true)
 |-- cod_ncm: double (nullable = true)
 |-- di_control_pu_addition_id: long (nullable = true)
 |-- di_number: integer (nullable = true)
 |-- valor_item: decimal(17,6) (nullable = true)
 |-- di_pu_addition_id: long (nullable = true)


In [45]:
di_complete = di_complete.join(di_pu, di_complete.di_number == di_pu.di_number)
di_complete.printSchema()

root
 |-- id: long (nullable = true)
 |-- pais: string (nullable = true)
 |-- cod_ncm: double (nullable = true)
 |-- di_control_pu_addition_id: long (nullable = true)
 |-- di_number: integer (nullable = true)
 |-- valor_item: decimal(17,6) (nullable = true)
 |-- di_pu_addition_id: long (nullable = true)
 |-- importador_endereco_municipio: string (nullable = true)
 |-- importador_endereco_uf: string (nullable = true)
 |-- urf_entrada_carga: string (nullable = true)
 |-- anomes: string (nullable = true)
 |-- di_number: integer (nullable = true)


### 2.3 Group by

In [46]:
di_complete = di_complete.groupBy([
                                'pais',
                                 'cod_ncm',
                                 'importador_endereco_municipio',
                                 'importador_endereco_uf',
                                 'urf_entrada_carga',
                                 'anomes']).avg("valor_item")

In [47]:
di_complete = di_complete.withColumnRenamed("pais", "pais")\
    .withColumnRenamed("cod_ncm", "ncm")\
    .withColumnRenamed("importador_endereco_municipio", "importador_municipio")\
    .withColumnRenamed("importador_endereco_uf", "importador_uf")\
    .withColumnRenamed("urf_entrada_carga", "urf")\
    .withColumnRenamed("ano", "ano")\
    .withColumnRenamed("avg(valor_item)", "avg_valor_item")

In [48]:
di_complete = di_complete.withColumn("ncm", di_complete.ncm.cast(DecimalType(18,2)))

In [None]:
di_complete.show()

In [49]:
di_complete.write.parquet("../data/interim/di_completed", mode="overwrite")

## 3. Merge old and new Data

In [12]:
di_complete = spark.read.parquet("../data/interim/di_completed")

In [13]:
historic_completed = search_dump_grouped.unionAll(di_complete)

In [16]:
# df.withColumn('total_sale_volume', df.total_sale_volume.cast(DecimalType(18, 2)))
historic_completed = historic_completed.withColumn("ncm", historic_completed.ncm.cast(DecimalType(18, 2)))

In [None]:
# search_dump_grouped.count()
# historic_completed.count()

In [17]:
historic_completed.write.parquet("../data/interim/unity_price_historic", mode="overwrite")

In [18]:
table = pq.read_table("../data/interim/unity_price_historic")
df = table.to_pandas()

In [19]:
df = df[df["anomes"].astype(int) >= 201800].copy()
df["semestre"] = df.apply(lambda x: 1 if int(str(x["anomes"])[-2:]) <= 6 else 2, axis=1)
df["ano"] = df.apply(lambda x: int(str(x["anomes"])[:4]), axis=1)
df["ano_semestre"] = df.apply(lambda x: x["ano"]*100 + x["semestre"], axis=1)

In [20]:
df.to_parquet("../data/processed/average_unity_price_historic.parquet")