# Teste lib PyCarol

## Preparando o ambiente

In [15]:
from pycarol import Staging, Carol, BQ, CarolHandler, Connectors
from pycarol.bigquery import TokenManager
from dotenv import load_dotenv
import pandas as pd
import logging
import os
import json

In [2]:
# Load environment variables from the local ".env" file. The call is left as
# the last expression so its boolean result is shown as the cell output.
load_dotenv(dotenv_path=".env")

True

In [3]:
# Shared Carol client for the notebook. It is built with no arguments —
# presumably it reads its settings from the environment loaded from ".env"
# (TODO confirm against the pyCarol docs).
carol = Carol()

In [4]:
# Notebook logger that forwards records to Carol through a CarolHandler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Re-running this cell must not stack handlers: drop any CarolHandler attached
# by a previous run, otherwise every record would be emitted more than once.
for stale_handler in [h for h in logger.handlers if isinstance(h, CarolHandler)]:
    logger.removeHandler(stale_handler)

# Reuse the shared `carol` client instead of constructing a second one.
carolHandler = CarolHandler(carol)
carolHandler.setLevel(logging.INFO)  # handler only forwards INFO and above
logger.addHandler(carolHandler)

# Expose the handler's task id (when it has one) through the environment.
if carolHandler.task_id is not None:
    os.environ['LONGTASKID'] = carolHandler.task_id

In [12]:
# Emit through the notebook's `logger`, which is the one the CarolHandler is
# attached to. The module-level logging.warning() targets the root logger, so
# the record would never reach that handler.
logger.warning("this is a warning message")



In [9]:
# Explicitly keep pandas' legacy silent-downcasting behaviour.
# NOTE(review): False is the documented default for this option, so this call
# only makes the setting explicit — it presumably does not silence the
# downcasting warning raised by `fillna` inside fetch_parquet; confirm whether
# True was intended.
pd.set_option('future.no_silent_downcasting', False)

### Buscar Schema

Retorna o schema da tabela junto com seus respectivos metadados

In [18]:
# Fetch the schema of one staging table and keep a JSON-serialized copy.
# Reuses the shared `carol` client rather than constructing another Carol().
staging = Staging(carol)
schema = staging.get_schema(
    staging_name="unique_hlpartnumberslot",
    connector_name="study_connector",
)
# Serialized once here; the following cell writes this string to disk.
jSchema = json.dumps(schema)

In [20]:
# Persist the serialized schema as UTF-8 text, relative to the working directory.
schema_path = 'stagging_unique_hlpartnumberslot_schema.json'
with open(schema_path, 'w+', encoding='utf-8') as schema_file:
    schema_file.write(jSchema)

### Parquet Staging

Busca os dados dos arquivos Parquet da staging. O método está com algumas partes desatualizadas em relação ao pandas e apresentou erros dependendo da chamada; parece haver um problema na forma como o pandas é usado. Além disso, não permite usar outras libs, como o polars.

In [11]:
# Read the "users" staging of the "lscloud" connector through the pandas
# backend, requesting only two columns and capping the result with max_hits.
users_columns = ['idProductLine', 'packageId']
df = staging.fetch_parquet(
    staging_name="users",
    connector_name="lscloud",
    backend='pandas',
    columns=users_columns,
    max_hits=100,
    return_metadata=False,
)
df

  0%|          | 67/45626 [03:33<40:15:26,  3.18s/it]
  d['mdmDeleted'] = d['mdmDeleted'].fillna(False)


Unnamed: 0,idProductLine,packageId
0,XX,1617889565933
1,02,1617889565539
2,02,1617889714473
3,XX,1617890466045
4,XX,1617890532805
...,...,...
95,01,1617965584631
96,XX,1617966491236
97,01,1617966788236
98,XX,1617966788236


### Chamando direto o BigQuery

In [26]:
# Query BigQuery directly. Reuses the shared `carol` client rather than
# constructing another Carol().
bq = BQ(carol)
# QUALIFY keeps one row per (partnumberid, slotid): the first one when ordered
# by mdmCounterForEntity and then mdmLastUpdated, both descending.
query_str = """select trim(partnumberid) as partnumberid, slotid      
    from stg_hlcloud_hlpartnumberslot
    qualify ROW_NUMBER() OVER (PARTITION BY partnumberid, slotid ORDER BY mdmCounterForEntity DESC, mdmLastUpdated DESC) = 1  """
results = bq.query(query_str)
results

Unnamed: 0,partnumberid,slotid
0,0111022000-2,4120
1,0113301170,4088
2,0118501008-6,0408
3,103600042-7,4003
4,103600054-0,4000
...,...,...
7016,CML.002499,3137
7017,CMV.002537,4185
7018,CMV.003499,3137
7019,CONSL.253,3516


### Token manager
É possível gerar um service account para conectar diretamente ao BQ.

In [None]:
# Obtain the service account used to connect straight to BigQuery.
# Reuses the shared `carol` client rather than constructing another Carol().
tm = TokenManager(carol)
service_account = tm.get_token().service_account
# Do not render the service account itself: it is a credential and would be
# stored in the notebook's saved output. Show only whether one was obtained.
service_account is not None

In [7]:
# Turn the query result into a list with one dict per row, then show the first
# record as a sample of the payload shape.
unique_hlpartnumberslot = results.to_dict("records")
unique_hlpartnumberslot[0]

{'partnumberid': '0111022000-2', 'slotid': '4120'}

In [31]:
# Create the connector used by the staging cells below and report its id.
connector_name = "study_connector"
connectors_api = Connectors(carol)
connector_id = connectors_api.create(
    name=connector_name,
    label="study_connector_label",
    group_name="StudyConnectorGroup",
)
print(f"This is the connector id: {connector_id}")

This is the connector id: 447f4ae9d5594e66996e492d98388033


In [10]:
# Target staging table for the cells below.
staging_name = 'unique_hlpartnumberslot'
# Connector id pinned as a literal; it is the same value printed by the
# connector-creation cell's recorded output.
new_connector_id = '447f4ae9d5594e66996e492d98388033'
staging = Staging(carol)

In [41]:
# Create the staging schema from the first record as the sample payload,
# using 'partnumberid' as the crosswalk field.
staging.create_schema(
    staging_name=staging_name,
    data=unique_hlpartnumberslot[0],
    connector_id=new_connector_id,
    connector_name=connector_name,
    crosswalk_name=staging_name,
    crosswalk_list=['partnumberid'],
)

  staging.create_schema(staging_name=staging_name, data = unique_hlpartnumberslot[0], connector_id=connector_id,


In [11]:
# Send every record to the staging table with step_size=10, printing the
# send statistics as it goes.
staging.send_data(
    staging_name=staging_name,
    data=unique_hlpartnumberslot,
    step_size=10,
    connector_id=new_connector_id,
    print_stats=True,
)

7021/7021 sent

In [None]:
# Read back the records sent to the new staging table.
# Reuses the shared `carol` client rather than constructing another Carol().
bq = BQ(carol)
# NOTE(review): the table name follows the stg_<connector>_<staging> pattern
# used by the working query earlier in this notebook
# (stg_hlcloud_hlpartnumberslot); the bare staging name presumably does not
# resolve to a table — confirm against the tenant's BigQuery dataset.
query_str = f"select * from stg_{connector_name}_{staging_name}"
results = bq.query(query_str)
results