In [28]:
import pandas as pd
from sqlalchemy.sql import text
from sqlalchemy.exc import SQLAlchemyError
import requests
import urllib.parse
from sqlalchemy import create_engine, Table, MetaData, Column, String, DateTime, Integer, Float

In [29]:
def get_database_engine():
  print('EXECUTANDO NO BANCO DE PROD!')
  username = 'svc_p_ito_01'
  password = urllib.parse.quote('_c4u_6_XHE4y1@Cg')
  server = 'SQLPRD0007AG'
  port = 8000
  database = 'dbsgdt1'
  # print(f'DB: Usuário: "{username}", Password: "{password}", Server: "{server}", Porta: "{port}", Database: "{database}"')
  connection_string = f'mssql+pymssql://{username}:{password}@{server}:{port}/{database}?charset=utf8'
  engine = create_engine(connection_string)
  return engine


In [30]:
engine = get_database_engine()
metadata = MetaData()

EXECUTANDO NO BANCO DE PROD!


In [31]:

# Função para obter dados de uma API e transformar em DataFrame
def obter_dados_da_api(url_api):
   response = requests.get(url_api)
   if response.status_code == 200:
       dados_json = response.json()
       df = pd.json_normalize(dados_json)
       return df
   else:
       print(f"Erro ao acessar a API: {response.status_code}")
       return None

In [32]:
# Função para aplicar filtros de data e categoria no DataFrame
def aplicar_filtros(df, filtro_data=None, coluna_data=None, filtro_categoria=None, coluna_categoria=None):
   # Aplicar filtro de data, se fornecido
   if filtro_data and coluna_data:
       df[coluna_data] = pd.to_datetime(df[coluna_data], errors='coerce')  # Garantir que a coluna é datetime
       df = df[df[coluna_data] > filtro_data]
       print(f"Filtro de data aplicado: {filtro_data} - Total de registros: {len(df)}")
   # Aplicar filtro de categoria, se fornecido
   if filtro_categoria and coluna_categoria:
       df = df[df[coluna_categoria] == filtro_categoria]
       print(f"Filtro de categoria aplicado: {filtro_categoria} - Total de registros: {len(df)}")
   return df

In [33]:
# Função para criar um índice nas colunas de chave
def criar_indice(conn, tabela_nome, colunas_indice):
   conn = engine.connect()
   try:
       nome_indice = f"idx_{'_'.join(colunas_indice)}"
       verificar_indice_query = text(f"""
           SELECT COUNT(*)
           FROM sys.indexes
           WHERE name = '{nome_indice}'
           AND object_id = OBJECT_ID('{tabela_nome}');
       """)
       resultado = conn.execute(verificar_indice_query).scalar()
       if resultado == 0:
           criar_indice_query = text(f"""
               CREATE INDEX {nome_indice}
               ON {tabela_nome} ({', '.join([f'[{col}]' for col in colunas_indice])});
           """)
           conn.execute(criar_indice_query)
           print(f"Índice {nome_indice} criado com sucesso.")
       else:
           print(f"Índice {nome_indice} já existe.")
   except SQLAlchemyError as e:
       print(f"Erro ao criar o índice: {e}")

In [34]:
# Função para criar a tabela com tipos de dados definidos, se necessário
def criar_tabela(df, tabela_nome, tipos_colunas):
   columns = []
   for column_name in df.columns:
       if column_name in tipos_colunas:
           tipo = tipos_colunas[column_name]
       else:
           tipo = String  # Tipo padrão como String
       columns.append(Column(column_name, tipo))
   # Aqui criamos a tabela com as colunas definidas
   table = Table(tabela_nome, metadata, *columns, extend_existing=True)
   metadata.create_all(engine)  # Cria a tabela no banco de dados se não existir

In [35]:
# Função para realizar o merge dos dados com inserção e atualização em lote
def merge_dados_em_lote(df, tabela_nome, chave_unica, batch_size=1000, tipos_colunas=None):
   try:
    #    df.columns = df.columns.str.replace(' ', '_')
       conn = engine.connect()
        #criar_indice(conn, tabela_nome, chave_unica)
       # Criar a tabela se ainda não existir e aplicar os tipos de colunas definidos
       if tipos_colunas:
           criar_tabela(df, tabela_nome, tipos_colunas)
       total_registros = len(df)
       for start in range(0, total_registros, batch_size):
           batch_df = df.iloc[start:start + batch_size]
           trans = conn.begin()
           for index, row in batch_df.iterrows():
               chave_clause = " AND ".join([f"target.[{col}] = :{col}" for col in chave_unica])
               update_clause = ", ".join([f"target.[{col}] = source.[{col}]" for col in df.columns if col not in chave_unica])
               insert_columns = ", ".join([f"[{col}]" for col in df.columns])
               insert_values = ", ".join([f"source.[{col}]" for col in df.columns])
               merge_query = text(f"""
                   MERGE INTO {tabela_nome} AS target
                   USING (SELECT :{', :'.join(df.columns)}) AS source({', '.join(df.columns)})
                   ON {chave_clause}
                   WHEN MATCHED THEN
                       UPDATE SET {update_clause}
                   WHEN NOT MATCHED THEN
                       INSERT ({insert_columns}) VALUES ({insert_values});
               """)
               conn.execute(merge_query, row.to_dict())
           trans.commit()
           print(f"Lote {start // batch_size + 1} processado com sucesso ({min(start + batch_size, total_registros)} de {total_registros} registros).")
       print(f"Merge de dados na tabela '{tabela_nome}' concluído com sucesso.")
   except SQLAlchemyError as e:
       trans.rollback()
       print(f"Erro ao realizar o merge de dados: {e}")
   finally:
       conn.close()
# Função principal para processar os dados e enviar ao banco

In [36]:
def processar_dados(origem, tipo_origem, tabela_nome, chave_unica, batch_size=1000, filtro_data=None, coluna_data=None, filtro_categoria=None, coluna_categoria=None, tipos_colunas=None):
   if tipo_origem == 'arquivo':
       df = pd.read_excel(origem)  # Ou pd.read_csv() dependendo do tipo do arquivo
   elif tipo_origem == 'api':
       df = obter_dados_da_api(origem)
   else:
       print("Tipo de origem desconhecido.")
       return
   if df is not None:
       df.dropna(how='all', inplace=True)
       df.columns = df.columns.str.replace(' ', '_')
       df = aplicar_filtros(df, filtro_data=filtro_data, coluna_data=coluna_data, filtro_categoria=filtro_categoria, coluna_categoria=coluna_categoria)
       merge_dados_em_lote(df, tabela_nome, chave_unica, batch_size, tipos_colunas)
# Exemplo de uso com definição de tipos de dados
tipos_colunas = {
    "Hostname": String,
    "Domain": String,
    "OS Version": String,
    "SCCM Last Communnication": DateTime,
    "SCCM Client Version": String,  # Definindo a coluna como datetime
}
processar_dados(
   origem=r"C:\Users\t781646\Downloads\data (8).xlsx",
   tipo_origem="arquivo",
   tabela_nome="cmdb_servidores_windows",
   chave_unica=["Hostname"],
   batch_size=500,
   filtro_data="2024-09-01",
   coluna_data="SCCM Last Communnication",
#    filtro_categoria="categoria_a",
#    coluna_categoria="categoria",
   tipos_colunas=tipos_colunas
)

KeyError: 'SCCM Last Communnication'