Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/novo schema #151

Merged
merged 2 commits into from Jul 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -12,3 +12,4 @@ dist/
docs/_build
.vscode/tags
.cache/
.pytest_cache/
469 changes: 195 additions & 274 deletions pytrustnfe/Servidores.py

Large diffs are not rendered by default.

265 changes: 151 additions & 114 deletions pytrustnfe/nfe/__init__.py
Expand Up @@ -5,37 +5,18 @@

import os
import hashlib
import binascii
from lxml import etree
from .comunicacao import executar_consulta
from .assinatura import Assinatura
from pytrustnfe.xml import render_xml
from pytrustnfe.utils import CabecalhoSoap
from pytrustnfe.xml import render_xml, sanitize_response
from pytrustnfe.utils import gerar_chave, ChaveNFe
from pytrustnfe.Servidores import localizar_url, localizar_qrcode
from pytrustnfe.xml.validate import valida_nfe
from pytrustnfe.exceptions import NFeValidationException


def _build_header(method, **kwargs):
action = {
'NfeAutorizacao': ('NfeAutorizacao', '3.10'),
'NfeRetAutorizacao': ('NfeRetAutorizacao', '3.10'),
'NfeConsultaCadastro': ('CadConsultaCadastro2', '2.00'),
'NfeInutilizacao': ('NfeInutilizacao2', '3.10'),
'RecepcaoEventoCancelamento': ('RecepcaoEvento', '1.00'),
'RecepcaoEventoCarta': ('RecepcaoEvento', '1.00'),
'NFeDistribuicaoDFe': ('NFeDistribuicaoDFe/nfeDistDFeInteresse',
'1.00'),
'RecepcaoEventoManifesto': ('RecepcaoEvento', '1.00'),
'NfeConsulta2': ('NfeConsulta2', '3.10')
}
# Método específico para o estado da Bahia
if kwargs['estado'] == '29':
action['NfeConsulta2'] = ('NfeConsulta', '3.10')
vals = {'estado': kwargs['estado'],
'soap_action': action[method][0],
'versao': action[method][1]}
return CabecalhoSoap(**vals)
from pytrustnfe.certificado import extract_cert_and_key_from_pfx, save_cert_key

# Zeep
from requests import Session
from zeep import Client
from zeep.transports import Transport


def _generate_nfe_id(**kwargs):
Expand All @@ -57,62 +38,39 @@ def _generate_nfe_id(**kwargs):
item['infNFe']['ide']['cDV'] = chave_nfe[len(chave_nfe) - 1:]


def _add_required_node(elemTree):
ns = elemTree.nsmap
if None in ns:
ns['ns'] = ns[None]
ns.pop(None)

prods = elemTree.findall('ns:NFe/ns:infNFe/ns:det/ns:prod', namespaces=ns)
for prod in prods:
element = prod.find('ns:cEAN', namespaces=ns)
if element is None:
cEan = etree.Element('cEAN')
prod.insert(1, cEan)
element = prod.find('ns:cEANTrib', namespaces=ns)
if element is None:
cEANTrib = etree.Element('cEANTrib')
vProd = prod.find('ns:vProd', namespaces=ns)
prod.insert(prod.index(vProd) + 1, cEANTrib)
return elemTree


def _add_qrCode(xml, **kwargs):
xml = etree.fromstring(xml)
inf_nfe = kwargs['NFes'][0]['infNFe']
nfe = xml.find(".//{http://www.portalfiscal.inf.br/nfe}NFe")
infnfesupl = etree.Element('infNFeSupl')
qrcode = etree.Element('qrCode')
chave_nfe = inf_nfe['Id'][3:]
dh_emissao = inf_nfe['ide']['dhEmi'].encode('hex')
dh_emissao = binascii.hexlify(inf_nfe['ide']['dhEmi'].encode()).decode()
versao = '100'
ambiente = kwargs['ambiente']
valor_total = inf_nfe['total']['vNF']
dest_cpf = 'Inexistente'
dest = nfe.find(".//{http://www.portalfiscal.inf.br/nfe}dest")
if dest:
dest_parent = dest.getparent()
dest_parent.remove(dest)

if inf_nfe.get('dest', False):
if inf_nfe['dest'].get('CPF', False):
dest_cpf = inf_nfe['dest']['CPF']
dest = etree.Element('dest')
cpf = etree.Element('CPF')
cpf.text = dest_cpf
dest.append(cpf)
dest_parent.append(dest)

icms_total = inf_nfe['total']['vICMS']
dig_val = xml.find(
".//{http://www.w3.org/2000/09/xmldsig#}DigestValue")\
.text.encode('hex')
dig_val = binascii.hexlify(xml.find(
".//{http://www.w3.org/2000/09/xmldsig#}DigestValue").text.encode()).decode()
cid_token = kwargs['NFes'][0]['infNFe']['codigo_seguranca']['cid_token']
csc = kwargs['NFes'][0]['infNFe']['codigo_seguranca']['csc']

c_hash_QR_code = "chNFe={0}&nVersao={1}&tpAmb={2}&cDest={3}&dhEmi={4}&vNF\
={5}&vICMS={6}&digVal={7}&cIdToken={8}{9}".\
format(chave_nfe, versao, ambiente, dest_cpf, dh_emissao,
valor_total, icms_total, dig_val, cid_token, csc)
c_hash_QR_code = hashlib.sha1(c_hash_QR_code).hexdigest()
c_hash_QR_code = hashlib.sha1(c_hash_QR_code.encode()).hexdigest()

QR_code_url = "?chNFe={0}&nVersao={1}&tpAmb={2}&{3}dhEmi={4}&vNF={5}&vICMS\
={6}&digVal={7}&cIdToken={8}&cHashQRCode={9}".\
Expand All @@ -128,49 +86,23 @@ def _add_qrCode(xml, **kwargs):
return etree.tostring(xml)


def _send(certificado, method, sign, **kwargs):
def _render(certificado, method, sign, **kwargs):
path = os.path.join(os.path.dirname(__file__), 'templates')
xmlElem_send = render_xml(path, '%s.xml' % method, True, **kwargs)

modelo = xmlElem_send.find(".//{http://www.portalfiscal.inf.br/nfe}mod")
modelo = modelo.text if modelo is not None else '55'
if modelo == '65':
pagamento = etree.Element('pag')
tipo_pagamento = etree.Element('tPag')
valor = etree.Element('vPag')
valor_pago = kwargs['NFes'][0]['infNFe']['total']['vNF']
metodo_pagamento = kwargs['NFes'][0]['infNFe']['pagamento']
tipo_pagamento.text, valor.text = metodo_pagamento, valor_pago
pagamento.append(tipo_pagamento)
pagamento.append(valor)
transp = xmlElem_send.find(
".//{http://www.portalfiscal.inf.br/nfe}transp")
transp.addnext(pagamento)

if sign:
# Caso for autorização temos que adicionar algumas tags tipo
# cEan, cEANTrib porque o governo sempre complica e não segue padrão
if method == 'NfeAutorizacao':
xmlElem_send = _add_required_node(xmlElem_send)

signer = Assinatura(certificado.pfx, certificado.password)
if method == 'NfeInutilizacao':
xml_send = signer.assina_xml(xmlElem_send, kwargs['obj']['id'])
if method == 'NfeAutorizacao':
xml_send = signer.assina_xml(
xmlElem_send, kwargs['NFes'][0]['infNFe']['Id'])
if 'validate' in kwargs:
erros = valida_nfe(xml_send)
if erros:
raise NFeValidationException('Erro ao validar NFe',
erros=erros,
sent_xml=xml_send)
elif method == 'RecepcaoEventoCancelamento':
elif method == 'RecepcaoEvento':
xml_send = signer.assina_xml(
xmlElem_send, kwargs['eventos'][0]['Id'])

if method == 'RecepcaoEventoCarta':
xml_send = signer.assina_xml(
xmlElem_send, kwargs['Id'])
elif method == 'RecepcaoEventoManifesto':
xml_send = signer.assina_xml(
xmlElem_send, kwargs['manifesto']['identificador'])
Expand All @@ -180,68 +112,173 @@ def _send(certificado, method, sign, **kwargs):

else:
xml_send = etree.tostring(xmlElem_send)
return xml_send


def _send(certificado, method, **kwargs):
xml_send = kwargs["xml"]
base_url = localizar_url(
method, kwargs['estado'], kwargs['modelo'], kwargs['ambiente'])

cert, key = extract_cert_and_key_from_pfx(
certificado.pfx, certificado.password)
cert, key = save_cert_key(cert, key)

session = Session()
session.cert = (cert, key)
session.verify = False
transport = Transport(session=session)

xml = etree.fromstring(xml_send)
client = Client(base_url, transport=transport)

port = next(iter(client.wsdl.port_types))
first_operation = next(iter(client.wsdl.port_types[port].operations))
with client.options(raw_response=True):
response = client.service[first_operation](xml)
response, obj = sanitize_response(response.text)
return {
'sent_xml': xml_send,
'received_xml': response,
'object': obj.Body.getchildren()[0]
}

url = localizar_url(method, kwargs['estado'], modelo,
kwargs['ambiente'])
cabecalho = _build_header(method, **kwargs)

send_raw = False
if method == 'NFeDistribuicaoDFe':
send_raw = True

response, obj = executar_consulta(certificado, url, cabecalho, xml_send,
send_raw=send_raw)
return {
'sent_xml': xml_send,
'received_xml': response,
'object': obj
}
def xml_autorizar_nfe(certificado, **kwargs):
_generate_nfe_id(**kwargs)
return _render(certificado, 'NfeAutorizacao', True, **kwargs)


def autorizar_nfe(certificado, **kwargs): # Assinar
_generate_nfe_id(**kwargs)
return _send(certificado, 'NfeAutorizacao', True, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_autorizar_nfe(certificado, **kwargs)
return _send(certificado, 'NfeAutorizacao', **kwargs)


def xml_retorno_autorizar_nfe(certificado, **kwargs):
return _render(certificado, 'NfeRetAutorizacao', False, **kwargs)


def retorno_autorizar_nfe(certificado, **kwargs):
return _send(certificado, 'NfeRetAutorizacao', False, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_retorno_autorizar_nfe(certificado, **kwargs)
return _send(certificado, 'NfeRetAutorizacao', **kwargs)


def xml_recepcao_evento_cancelamento(certificado, **kwargs): # Assinar
return _render(certificado, 'RecepcaoEvento', True, **kwargs)


def recepcao_evento_cancelamento(certificado, **kwargs): # Assinar
return _send(certificado, 'RecepcaoEventoCancelamento', True, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_recepcao_evento_cancelamento(certificado, **kwargs)
return _send(certificado, 'RecepcaoEvento', **kwargs)


def xml_inutilizar_nfe(certificado, **kwargs):
return _render(certificado, 'NfeInutilizacao', True, **kwargs)


def inutilizar_nfe(certificado, **kwargs):
if "xml" not in kwargs:
kwargs['xml'] = xml_inutilizar_nfe(certificado, **kwargs)
return _send(certificado, 'NfeInutilizacao', **kwargs)

def inutilizar_nfe(certificado, **kwargs): # Assinar
return _send(certificado, 'NfeInutilizacao', True, **kwargs)

def xml_consultar_protocolo_nfe(certificado, **kwargs):
return _render(certificado, 'NfeConsultaProtocolo', False, **kwargs)


def consultar_protocolo_nfe(certificado, **kwargs):
return _send(certificado, 'NfeConsulta2', False, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_consultar_protocolo_nfe(certificado, **kwargs)
return _send(certificado, 'NfeConsultaProtocolo', **kwargs)


def xml_nfe_status_servico(certificado, **kwargs):
return _render(certificado, 'NfeStatusServico', False, **kwargs)


def nfe_status_servico(certificado, **kwargs):
return _send(certificado, 'NfeStatusServico', False, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_nfe_status_servico(certificado, **kwargs)
return _send(certificado, 'NfeStatusServico', **kwargs)


def xml_consulta_cadastro(certificado, **kwargs):
return _render(certificado, 'NfeConsultaCadastro', False, **kwargs)


def consulta_cadastro(certificado, **kwargs):
return _send(certificado, 'NfeConsultaCadastro', False, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_consulta_cadastro(certificado, **kwargs)
kwargs['modelo'] = '55'
return _send(certificado, 'NfeConsultaCadastro', **kwargs)


def xml_recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar
return _render(certificado, 'RecepcaoEvento', True, **kwargs)


def recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar
return _send(certificado, 'RecepcaoEventoCarta', True, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_recepcao_evento_carta_correcao(
certificado, **kwargs)
return _send(certificado, 'RecepcaoEvento', **kwargs)


def xml_recepcao_evento_manifesto(certificado, **kwargs): # Assinar
return _render(certificado, 'RecepcaoEvento', True, **kwargs)


def recepcao_evento_manifesto(certificado, **kwargs): # Assinar
return _send(certificado, 'RecepcaoEventoManifesto', True, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_recepcao_evento_manifesto(certificado, **kwargs)
return _send(certificado, 'RecepcaoEvento', **kwargs)


def recepcao_evento_epec(certificado, **kwargs): # Assinar
return _send(certificado, 'RecepcaoEventoEPEC', True, **kwargs)
def xml_consulta_distribuicao_nfe(certificado, **kwargs): # Assinar
return _render(certificado, 'NFeDistribuicaoDFe', False, **kwargs)


def consulta_distribuicao_nfe(certificado, **kwargs):
return _send(certificado, 'NFeDistribuicaoDFe', False, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_consulta_distribuicao_nfe(certificado, **kwargs)
xml_send = kwargs["xml"]
base_url = localizar_url(
'NFeDistribuicaoDFe', kwargs['estado'], kwargs['modelo'],
kwargs['ambiente'])

cert, key = extract_cert_and_key_from_pfx(
certificado.pfx, certificado.password)
cert, key = save_cert_key(cert, key)

session = Session()
session.cert = (cert, key)
session.verify = False
transport = Transport(session=session)

xml = etree.fromstring(xml_send)
xml_um = etree.fromstring('<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/"><cUF>AN</cUF><versaoDados>1.00</versaoDados></nfeCabecMsg>')
client = Client(base_url, transport=transport)

port = next(iter(client.wsdl.port_types))
first_operation = next(iter(client.wsdl.port_types[port].operations))
with client.options(raw_response=True):
response = client.service[first_operation](nfeDadosMsg=xml, _soapheaders=[xml_um])
response, obj = sanitize_response(response.text)
return {
'sent_xml': xml_send,
'received_xml': response,
'object': obj.Body.nfeDistDFeInteresseResponse.nfeDistDFeInteresseResult
}


def xml_download_nfe(certificado, **kwargs): # Assinar
return _render(certificado, 'NFeDistribuicaoDFe', False, **kwargs)


def download_nfe(certificado, **kwargs):
return _send(certificado, 'NFeDistribuicaoDFe', False, **kwargs)
if "xml" not in kwargs:
kwargs['xml'] = xml_download_nfe(certificado, **kwargs)
return _send(certificado, 'NFeDistribuicaoDFe', **kwargs)