# Variables

## DEV

In [1]:
%%writefile .env

GCP_PROJECT_ID=dev-izipay-data-storage

SECRET_TOKEN=izi-token-7rMVpUvdo8T3BlbkFJvlPEtwcw

DEPLOY_SERVICE_ACCOUNT=dev-data-storage-onemarketer@dev-izipay-data-storage.iam.gserviceaccount.com
DEPLOY_SERVICE_NAME=dev-izi-hashing-api-v1

Overwriting .env-dev


## PRD

In [1]:
%%writefile .env

GCP_PROJECT_ID=prd-izipay-advanced-analytics

SECRET_TOKEN=prd-izipay-token-7rMVpUvdo8T3BlbkFJvlPEtwcw

DEPLOY_SERVICE_ACCOUNT=prd-dataops-apis-onemarketer@prd-izipay-advanced-analytics.iam.gserviceaccount.com
DEPLOY_SERVICE_NAME=prd-izipay-hashing-api-v0.1

Overwriting .env


# Get Variables

In [2]:
import os
from dotenv import load_dotenv

load_dotenv(".env")

GCP_PROJECT_ID = os.getenv('GCP_PROJECT_ID')
SECRET_TOKEN = os.getenv('SECRET_TOKEN')

DEPLOY_SERVICE_ACCOUNT = os.getenv('DEPLOY_SERVICE_ACCOUNT')
DEPLOY_SERVICE_NAME = os.getenv('DEPLOY_SERVICE_NAME')

#    --service-account $DEPLOY_SERVICE_ACCOUNT \
SECRET_TOKEN, DEPLOY_SERVICE_ACCOUNT, DEPLOY_SERVICE_NAME

('prd-izipay-token-7rMVpUvdo8T3BlbkFJvlPEtwcw',
 'prd-dataops-apis-onemarketer@prd-izipay-advanced-analytics.iam.gserviceaccount.com',
 'prd-izipay-hashing-api-v0.1')

# API Chat

In [3]:
DIRECTORY='service_api'
!mkdir -p $DIRECTORY

In [4]:
%%writefile $DIRECTORY/requirements.txt
numpy==1.26.4
pandas==2.2.2
urllib3==1.26.16
requests
fastapi
pydantic
uvicorn
google-cloud-bigquery
google-cloud-firestore
gunicorn==20.1.0

Writing service_api/requirements.txt


In [16]:
%%writefile $DIRECTORY/Dockerfile

# Start from a base image
FROM python:3.11

ENV PYTHONUNBUFFERED True
#ENV PATH="/root/.local/bin:${PATH}"

# Set the working directory
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt requirements.txt

# Install the required packages
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

# Expose the app port
# EXPOSE 80
ENV PORT=80
EXPOSE $PORT

COPY . ./
# Run command
# CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", $PORT]
CMD exec uvicorn main:app --host 0.0.0.0 --port ${PORT}

Overwriting service_api/Dockerfile


In [17]:
%%writefile $DIRECTORY/deploy.sh

export $(xargs < ../.env)

PROJECT_ID=$GCP_PROJECT_ID

DEPLOY_SERVICE_ACCOUNT=$DEPLOY_SERVICE_ACCOUNT
DEPLOY_SERVICE_NAME=$DEPLOY_SERVICE_NAME

REGION='us-central1'
CLOUD_RUN_PROJECT_NAME=$DEPLOY_SERVICE_NAME

echo $CLOUD_RUN_PROJECT_NAME
echo $REGION
echo $DEPLOY_SERVICE_ACCOUNT
echo 'project id-> ' $PROJECT_ID

gcloud run deploy ${CLOUD_RUN_PROJECT_NAME} \
    --region $REGION \
    --project $PROJECT_ID \
    --allow-unauthenticated \
    --cpu-boost \
    --cpu=1 \
    --memory=0.5G \
    --min-instances=0 \
    --max-instances=5 \
    --service-account $DEPLOY_SERVICE_ACCOUNT \
    --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID" \
    --set-env-vars="SECRET_TOKEN=$SECRET_TOKEN" \
    --verbosity=debug \
    --source .

SERVICE_URL=$(gcloud run services describe $CLOUD_RUN_PROJECT_NAME --format 'value(status.url)' --project $PROJECT_ID --region $REGION)
echo "SERVICE_URL: $SERVICE_URL"

Overwriting service_api/deploy.sh


## Scripts

In [18]:
%%writefile $DIRECTORY/utils.py
from google.cloud import firestore
import hashlib
from datetime import datetime, date
from typing import Optional, Dict, List
import uuid

class FirestoreLogs:
    def __init__(self, project_id:str, database_id:str, collection_name: str = "logs"):
        """
        Inicializa la conexión con Firestore.
        
        Args:
            collection_name (str): Nombre de la colección en Firestore
        """
        self.db = firestore.Client(project=project_id, database=database_id)
        self.collection = self.db.collection(collection_name)
    
    def _hash_ruc(self, ruc: str) -> str:
        """
        Procesa el RUC para determinar el código del tipo de RUC (COD_TIP_RUC)
        y calcula el hash final (INTER_HASH_URC).

        Parámetros:
            ruc (str): Número de RUC.

        Retorna:
            dict: Un diccionario con el RUC original, COD_TIP_RUC e INTER_HASH_URC.
        """
        # Convertir a cadena y eliminar espacios
        ruc = str(ruc).strip()

        # Determinar el código del tipo de RUC (COD_TIP_RUC)
        if len(ruc) == 8:
            cod_tip_ruc = '01'
        elif len(ruc) == 9 or (10 <= len(ruc) <= 12 and ruc[0] == '0'):
            cod_tip_ruc = '02'
        elif len(ruc) == 11 and ruc[:2] in ['20', '17', '15', '10', '12', '11']:
            cod_tip_ruc = '03'
        else:
            cod_tip_ruc = '06'

        # Rellenar el RUC con ceros a la izquierda hasta 15 caracteres
        padded_ruc = cod_tip_ruc + ruc.zfill(15)

        # Calcular el hash SHA256 anidado
        inner_hash = hashlib.sha256(padded_ruc.encode()).hexdigest().upper()
        inter_hash_urc = hashlib.sha256(inner_hash.encode()).hexdigest().upper()

        # Retornar los resultados
        # Calcular y retornar el hash SHA256
        return inter_hash_urc

    def store_log(self, ruc: str) -> Dict[str, str]:
        """
        Almacena un nuevo log en Firestore usando un ID único.
        
        Args:
            ruc (str): RUC a almacenar
            
        Returns:
            Dict[str, str]: Datos almacenados incluyendo el hash generado
        """
        
        ruc_hash = self._hash_ruc(ruc)
        
        log_data = {
            'timestamp_consulta': datetime.now(),
            'ruc': ruc,
            'ruc_hasheado': ruc_hash,
            'process_date': date.today().strftime("%d-%m-%Y"),
            'record_source': 'API_IZIPAY',
            'load_date': datetime.now(),
            'creation_user': 'ETL_USER'
        }
        
        # Generar ID único para cada log
        doc_id = str(uuid.uuid4())
        self.collection.document(doc_id).set(log_data)
        
        return ruc_hash, log_data
    
    def get_ruc_by_hash(self, ruc_hash: str) -> List[Dict]:
        """
        Busca todos los registros asociados a un hash de RUC.
        
        Args:
            ruc_hash (str): Hash del RUC a buscar
            
        Returns:
            List[Dict]: Lista de todos los registros encontrados, ordenados por fecha
        """
        docs = (self.collection
                .where('ruc_hasheado', '==', ruc_hash)
                .order_by('fecha_process', direction=firestore.Query.DESCENDING)
                .stream())
        
        return [doc.to_dict() for doc in docs]
    
    def get_latest_ruc_by_hash(self, ruc_hash: str) -> Optional[str]:
        """
        Obtiene el RUC más reciente asociado a un hash.
        
        Args:
            ruc_hash (str): Hash del RUC a buscar
            
        Returns:
            Optional[str]: RUC más reciente si se encuentra, None si no existe
        """
        docs = (self.collection
                .where('ruc_hasheado', '==', ruc_hash)
                .order_by('load_date', direction=firestore.Query.DESCENDING)
                .limit(1)
                .stream())
        
        for doc in docs:  # Solo tomará el primer documento
            return doc.to_dict().get('ruc')
        return None

    def get_ruc_history(self, ruc: str) -> List[Dict]:
        """
        Obtiene todo el historial de logs para un RUC específico.
        
        Args:
            ruc (str): RUC del cual obtener el historial
            
        Returns:
            List[Dict]: Lista de todos los logs asociados al RUC
        """
        ruc_hash = self._hash_ruc(ruc)
        return self.get_ruc_by_hash(ruc_hash)

Overwriting service_api/utils.py


In [19]:
%%writefile $DIRECTORY/main.py
import os
import json
import time
import requests
from pydantic import BaseModel, validator

from typing import Optional as OptionalType

from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Header, HTTPException, Request

from starlette.middleware.base import BaseHTTPMiddleware
import logging

import hashlib

from google.cloud import bigquery
from datetime import datetime

from utils import FirestoreLogs

app = FastAPI(title="Hashing API",
              description="""Convertir a hash un RUC""",
              version="0.0.1"
             )

# Reemplaza con tu IP fija.
class IPRestrictionMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: FastAPI, authorized_ips: list[str]):
        super().__init__(app)
        self.authorized_ips = authorized_ips

    async def dispatch(self, request: Request, call_next):
        # Imprimimos todas las posibles fuentes de IP
        print("X-Forwarded-For:", request.headers.get("X-Forwarded-For"))
        print("X-Real-IP:", request.headers.get("X-Real-IP"))
        print("client.host:", request.client.host)
        
        client_ip = request.headers.get("X-Forwarded-For", request.client.host)
        if isinstance(client_ip, str) and "," in client_ip:
            client_ip = client_ip.split(",")[0].strip()
        
        print("IP detectada:", client_ip)
        print("IPs autorizadas:", self.authorized_ips)

        if client_ip not in self.authorized_ips:
            raise HTTPException(
                status_code=403,
                detail=f"Acceso denegado para IP: {client_ip}"
            )

        return await call_next(request)

# Uso con múltiples IPs

# IPs de prueba en local
#AUTHORIZED_IPS = ["127.0.0.1", "190.239.73.213"]

# AUTHORIZED_IPS = ["PRD OM", "OM DEV CHATBOT"]
AUTHORIZED_IPS = ["35.232.156.102", "190.113.29.10"]
origins = ["*"]

# Middlewares
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(
    IPRestrictionMiddleware,
    authorized_ips=AUTHORIZED_IPS
)

class RequestChatModel(BaseModel):
    input: str 

# @app.before_request
# def restrict_ip():
#     # Obtén la IP del cliente (usa X-Forwarded-For si estás detrás de un proxy)
#     client_ip = request.headers.get("X-Forwarded-For", request.remote_addr)
#     # Verifica si la IP es autorizada
#     if client_ip != AUTHORIZED_IP:
#         return jsonify({"error": "Acceso denegado"}, 403)

# Home page
@app.get("/")
def home( token: str = Header(None)):
    if token is None or token != SECRET_TOKEN:
        raise HTTPException(status_code=401, detail="Token inválido")
    return {"message": "API Hashing Ruc Izipay", "model_version": 0.1}

@app.post("/hashing")
def api_hashing(payload: RequestChatModel, token: str = Header(None)):
    
    if token is None or token != SECRET_TOKEN:
        raise HTTPException(status_code=401, detail="Token inválido")
    
    # Get Data
    data = payload.dict()
    # print('Payload:', data)
    
    data_input = data.get('input')
    
    try:
        response = {}
        
        ruc_hasheado, _ = api_logger.store_log(ruc=data_input)        
        
        response["ruc_hash"] = ruc_hasheado
        
        return response
    except Exception as e:
        print("error-> ",str(e))
        raise HTTPException(status_code=401, detail=str(e))

@app.post("/unhashing")
def api_unhushing(payload: RequestChatModel, token: str = Header(None)):
    
    if token is None or token != SECRET_TOKEN:
        raise HTTPException(status_code=401, detail="Token inválido")
    
    # Get Data
    data = payload.dict()
    # print('Payload:', data)
    
    data_input = data.get('input')
    
    try:
        response = {}
        
        ruc = api_logger.get_latest_ruc_by_hash(ruc_hash=data_input)        
        
        response["ruc"] = ruc
        
        return response
    except Exception as e:
        print("error-> ",str(e))
        raise HTTPException(status_code=401, detail=str(e))

# Startup
async def startup_event():
    global SECRET_TOKEN    
    global GCP_PROJECT_ID
    global api_logger
    
    GCP_PROJECT_ID = os.getenv('GCP_PROJECT_ID', "my-secret-token")
    SECRET_TOKEN = os.getenv('SECRET_TOKEN', "my-secret-token")
    
    api_logger = FirestoreLogs(project_id=GCP_PROJECT_ID, database_id='logs-hushing-ruc',collection_name="collection-logs-hushing-ruc")
    
    # Inicializar el cliente
    #client = bigquery.Client(project=GCP_PROJECT_ID)

    # cred = credentials.ApplicationDefault()
    # firebase_admin.initialize_app(cred, {'projectId': GCP_PROJECT_ID,})
    # FS_CLIENT = firestore.client()
    print("La aplicación se está iniciando")    
    
app.add_event_handler("startup", startup_event)

Overwriting service_api/main.py


#### Steps:

```sh
cd ./service_api
export $(xargs < ../.env-dev)
uvicorn main:app --reload
```

# Testing

In [20]:
import os
import json
import requests

secret_token = os.getenv('SECRET_TOKEN')

# Localhost
#url = "http://127.0.0.1:8000"
url = "https://dev-izi-hashing-api-v1-3cvhiueqqa-uc.a.run.app"

In [21]:
secret_token

'izi-token-7rMVpUvdo8T3BlbkFJvlPEtwcw'

## Ping

In [22]:
endpoint = f"{url}"
headers = {'token': secret_token}
response = requests.get(endpoint, headers=headers)

In [23]:
response

<Response [500]>

In [24]:
response.json()

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

## Hashing

In [53]:
endpoint = f"{url}/hashing"
headers = {'token': secret_token}

In [54]:
ruc = '10474840552'

In [55]:
data = {}
data["input"] = ruc

In [56]:
response = requests.post(endpoint, json=data, headers=headers)

In [57]:
response_result = response.json()

In [58]:
response_result

{'ruc_hash': '327D350D0D82F81F03C6E5629E031F2541609FF9218A5B6AE7196A4794551535'}

In [59]:
print("ruc: ", data["input"])
print("ruc hasheado: ", response_result["ruc_hash"])

ruc:  10474840552
ruc hasheado:  327D350D0D82F81F03C6E5629E031F2541609FF9218A5B6AE7196A4794551535


## UnHashing

In [41]:
endpoint = f"{url}/unhashing"
headers = {'token': secret_token}

In [42]:
hush_ruc = '327D350D0D82F81F03C6E5629E031F2541609FF9218A5B6AE7196A4794551535'

In [43]:
data = {}
data["input"] = hush_ruc

In [44]:
response = requests.post(endpoint, json=data, headers=headers)

In [45]:
response_result = response.json()

In [46]:
response_result

{'ruc': '10474840552'}

In [47]:
print("ruc husheado: ", data["input"])
print("ruc: ", response_result["ruc"])

ruc husheado:  327D350D0D82F81F03C6E5629E031F2541609FF9218A5B6AE7196A4794551535
ruc:  10474840552
