# Variables

In [20]:
#!pip install cybersource-rest-client-python

## DEV

In [None]:
%%writefile .env

GCP_PROJECT_ID=dev-izipay-advanced-analytics
BUCKET_NAME=dev-ingesta-izipay-fx33
SECRET_TOKEN=izi-cyber-token-7rMVpUvdo8T3BlbkFJvlPEtwcw

DEPLOY_SERVICE_ACCOUNT=dev-izipay-gcai-api-intgr@dev-izipay-advanced-analytics.iam.gserviceaccount.com
DEPLOY_SERVICE_NAME=dev-izi-cyber-api-v1

## PRD

In [2]:
%%writefile .env

GCP_PROJECT_ID=prd-izipay-data-storage
BUCKET_NAME=prd-ingesta-izipay-fx32
SECRET_TOKEN=prd-izipay-token-7rMVpUvdo8T3BlbkFJvlPEtwcw

DEPLOY_SERVICE_ACCOUNT=prd-izipay-gcai-api-intgr@prd-izipay-advanced-analytics.iam.gserviceaccount.com
DEPLOY_SERVICE_NAME=prd-izi-cyber-api-v1

Overwriting .env


# Get Variables

In [None]:
import os
from dotenv import load_dotenv

load_dotenv(".env")

GCP_PROJECT_ID = os.getenv('GCP_PROJECT_ID')
BUCKET_NAME = os.getenv('BUCKET_NAME')
SECRET_TOKEN = os.getenv('SECRET_TOKEN')

DEPLOY_SERVICE_ACCOUNT = os.getenv('DEPLOY_SERVICE_ACCOUNT')
DEPLOY_SERVICE_NAME = os.getenv('DEPLOY_SERVICE_NAME')

#    --service-account $DEPLOY_SERVICE_ACCOUNT \
SECRET_TOKEN, BUCKET_NAME, DEPLOY_SERVICE_ACCOUNT, DEPLOY_SERVICE_NAME

# API Chat

In [None]:
DIRECTORY='service_api'
!mkdir -p $DIRECTORY

In [4]:
%%writefile $DIRECTORY/requirements.txt
numpy==1.26.4
pandas==2.2.2
pyarrow
urllib3==1.26.16
requests
fastapi
pydantic
uvicorn
cybersource-rest-client-python
google-cloud-storage
gunicorn==20.1.0

Overwriting service_api/requirements.txt


In [5]:
%%writefile $DIRECTORY/Dockerfile

# Start from a base image
FROM python:3.11

ENV PYTHONUNBUFFERED True
#ENV PATH="/root/.local/bin:${PATH}"

# Set the working directory
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt requirements.txt

# Install the required packages
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

# Expose the app port
# EXPOSE 80
ENV PORT=80
EXPOSE $PORT

COPY . ./
# Run command
# CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", $PORT]
CMD exec uvicorn main:app --host 0.0.0.0 --port ${PORT}

Overwriting service_api/Dockerfile


In [6]:
%%writefile $DIRECTORY/deploy.sh

export $(xargs < ../.env)

PROJECT_ID=$GCP_PROJECT_ID

DEPLOY_SERVICE_ACCOUNT=$DEPLOY_SERVICE_ACCOUNT
DEPLOY_SERVICE_NAME=$DEPLOY_SERVICE_NAME

BUCKET_NAME=$BUCKET_NAME
SECRET_TOKEN=$SECRET_TOKEN

REGION='us-central1'
CLOUD_RUN_PROJECT_NAME=$DEPLOY_SERVICE_NAME

echo $CLOUD_RUN_PROJECT_NAME
echo $REGION
echo $DEPLOY_SERVICE_ACCOUNT
echo 'project id-> ' $PROJECT_ID

gcloud run deploy ${CLOUD_RUN_PROJECT_NAME} \
    --region $REGION \
    --project $PROJECT_ID \
    --allow-unauthenticated \
    --cpu-boost \
    --cpu=2 \
    --memory=4Gi \
    --min-instances=0 \
    --max-instances=5 \
    --service-account $DEPLOY_SERVICE_ACCOUNT \
    --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID" \
    --set-env-vars="SECRET_TOKEN=$SECRET_TOKEN" \
    --set-env-vars="BUCKET_NAME=$BUCKET_NAME" \
    --verbosity=debug \
    --source .

SERVICE_URL=$(gcloud run services describe $CLOUD_RUN_PROJECT_NAME --format 'value(status.url)' --project $PROJECT_ID --region $REGION)
echo "SERVICE_URL: $SERVICE_URL"

Overwriting service_api/deploy.sh


In [7]:
ORGANIZATION_ID = os.getenv('ORGANIZATION_ID', "my-secret-token")
MERCHANT_KEY_ID = os.getenv('MERCHANT_KEY_ID', "my-secret-token")
MERCHANT_SECREY_KEY_ID = os.getenv('MERCHANT_SECREY_KEY_ID', "my-secret-token")

## Scripts

In [8]:
%%writefile $DIRECTORY/Configuration.py

import os
from CyberSource.logging.log_configuration import LogConfiguration

class Configuration:
    def __init__(self, organization_id, merchant_id, merchant_secret_id):
        self.authentication_type ="http_signature"
        self.merchantid = organization_id
        self.run_environment = "api.cybersource.com"
        self.request_json_path = ""
        # JWT PARAMETERS
        self.key_alias = organization_id
        self.key_pass = organization_id
        self.key_file_name = organization_id
        self.keys_directory = os.path.join(os.getcwd(), "resources")
        # HTTP PARAMETERS
        self.merchant_keyid = merchant_id
        self.merchant_secretkey = merchant_secret_id
        # META KEY PARAMETERS
        self.use_metakey = False
        self.portfolio_id = ''
        # CONNECTION TIMEOUT PARAMETER
        self.timeout = 1000
        # LOG PARAMETERS
        self.enable_log = True
        self.log_file_name = "cybs"
        self.log_maximum_size = 10487560
        self.log_directory = os.path.join(os.getcwd(), "Logs")
        self.log_level = "Debug"
        self.enable_masking = True
        self.log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        self.log_date_format = "%Y-%m-%d %H:%M:%S"
        self.useMLEGlobally = False

    def get_configuration(self):
        configuration_dictionary = {}
        configuration_dictionary["authentication_type"] = self.authentication_type
        configuration_dictionary["merchantid"] = self.merchantid
        configuration_dictionary["run_environment"] = self.run_environment
        configuration_dictionary["request_json_path"] = self.request_json_path
        configuration_dictionary["key_alias"] = self.key_alias
        configuration_dictionary["key_password"] = self.key_pass
        configuration_dictionary["key_file_name"] = self.key_file_name
        configuration_dictionary["keys_directory"] = self.keys_directory
        configuration_dictionary["merchant_keyid"] = self.merchant_keyid
        configuration_dictionary["merchant_secretkey"] = self.merchant_secretkey
        configuration_dictionary["use_metakey"] = self.use_metakey
        configuration_dictionary["portfolio_id"] = self.portfolio_id
        configuration_dictionary["timeout"] = self.timeout
        configuration_dictionary['useMLEGlobally'] = self.useMLEGlobally

        log_config = LogConfiguration()
        log_config.set_enable_log(self.enable_log)
        log_config.set_log_directory(self.log_directory)
        log_config.set_log_file_name(self.log_file_name)
        log_config.set_log_maximum_size(self.log_maximum_size)
        log_config.set_log_level(self.log_level)
        log_config.set_enable_masking(self.enable_masking)
        log_config.set_log_format(self.log_format)
        log_config.set_log_date_format(self.log_date_format)

        configuration_dictionary["log_config"] = log_config
        return configuration_dictionary

Overwriting service_api/Configuration.py


In [9]:
%%writefile $DIRECTORY/main.py

import os
import json
import time
import pandas as pd
import requests
from pydantic import BaseModel, validator

from typing import Optional as OptionalType

from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Header, HTTPException, Request

from starlette.middleware.base import BaseHTTPMiddleware
import logging

import hashlib

from datetime import datetime

#from utils import Configuration
from CyberSource import *
from pathlib import Path
import os
import json
from importlib.machinery import SourceFileLoader
from CyberSource.api.report_downloads_api import ReportDownloadsApi

config_file = os.path.join(os.getcwd(), "Configuration.py")
configuration = SourceFileLoader("module.name", config_file).load_module()

app = FastAPI(title="Cyber API",
              description="""Obtener datos de Cyber y ponerlos en Storage Izipay""",
              version="0.0.1"
             )


origins = ["*"]

# Middlewares
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# app.add_middleware(
#     IPRestrictionMiddleware,
#     authorized_ips=AUTHORIZED_IPS
# )

class RequestCyberModel(BaseModel):
    organization_id: str
    merchant_key_id: str
    merchant_secret_key_id: str
    report_name: str
    fecha: str

# @app.before_request
# def restrict_ip():
#     # Obtén la IP del cliente (usa X-Forwarded-For si estás detrás de un proxy)
#     client_ip = request.headers.get("X-Forwarded-For", request.remote_addr)
#     # Verifica si la IP es autorizada
#     if client_ip != AUTHORIZED_IP:
#         return jsonify({"error": "Acceso denegado"}, 403)


def del_none(d):
    for key, value in list(d.items()):
        if value is None:
            del d[key]
        elif isinstance(value, dict):
            del_none(value)
    return d

def download_report(process_date, report_name, organization_id, merchant_id, merchant_secret_id):
    organizationId = organization_id #PARAMETRIZAR
    reportDate = process_date
    reportName = report_name #PARAMETRIZAR

    try:
        config_obj = configuration.Configuration(organization_id=organizationId, merchant_id=merchant_id, merchant_secret_id=merchant_secret_id)
        
        client_config = config_obj.get_configuration()
        api_instance = ReportDownloadsApi(client_config)
        #api_instance.api_client.download_file_path = os.path.join(os.getcwd(), "resources", "download_report.csv")
        api_instance.api_client.download_file_path = os.path.join(os.getcwd(), csv_path)
        print(f"antes del download report\n {organizationId}, {reportDate}, {reportName}")
        status, headers = api_instance.download_report(reportDate, reportName, organization_id=organizationId)

        print("Download Status : ", status)
        print("Response Headers : ", headers)

        print("Response downloaded at the location : " + api_instance.api_client.download_file_path)
        #write_log_audit(status)
    except Exception as e:
        #write_log_audit(e.status if hasattr(e, 'status') else 999)
        print("\nException when calling ReportDownloadsApi->download_report: %s\n" % e)

def load_cyber_report(csv_path):
    try:
        # Intentar leer el archivo, saltando la primera fila y forzando todo a string
        df = pd.read_csv(csv_path, dtype=str, skiprows=1)

        # Verificar si el DataFrame está vacío o tiene columnas mal formateadas
        if df.empty or any('unnamed' in col.lower() for col in df.columns):
            print("❌ Error en la carga: el archivo está vacío o tiene columnas inválidas ('Unnamed').")
            df_cyber = pd.DataFrame()
        else:
            print("✅ Archivo cargado correctamente.")
            print("Columnas:", df.columns.tolist())
            print("Número de filas:", len(df))
            df_cyber = df

    except Exception as e:
        print(f"❌ Error al intentar leer el archivo: {e}")
        df_cyber = pd.DataFrame()

    return df_cyber

def fn_ingesta(fecha, report_name, var_bucket_name, var_project_id, var_dataset_id, df_cyber):
    from datetime import datetime, timedelta
    import gc
    from google.cloud import storage

    fecha0 = datetime.strptime(fecha, "%Y-%m-%d")
    fecha_d_1 = fecha0 - timedelta(days=1)
    fecha_d_1yyyymmdd = fecha_d_1.strftime("%Y%m%d")

    try:
        df = df_cyber.copy()

        # Eliminar columnas sin cabecera
        df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
        df = df.astype(str).replace('nan', None)

        if df.empty or df.isnull().all().all():
            print(f"⚠️ Archivo creado para {fecha}, pero está vacío. No se cargará.")
        else:
            # Guardar en Parquet
            file_name = f"{report_name}_{fecha_d_1yyyymmdd}.parquet"
            df.to_parquet(file_name, index=False)

            # Subir a GCS
            storage_client = storage.Client()
            bucket = storage_client.bucket(var_bucket_name)
            destination_blob_name = f"Cybersource/Transaccional/{report_name}/{fecha_d_1yyyymmdd[:4]}/{fecha_d_1yyyymmdd[4:6]}/{file_name}"
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_filename(file_name)

            print(f"✅ Archivo '{file_name}' guardado en Cloud Storage en '{destination_blob_name}'")
            
            if os.path.exists(file_name):
                os.remove(file_name)
                print(f"Excel {file_name} temporal eliminado.")

        del df
        gc.collect()

    except Exception as e:
        print(f"❌ Error durante la ingesta para {fecha}: {e}")

# Home page
@app.get("/")
def home( token: str = Header(None)):
    if token is None or token != SECRET_TOKEN:
        raise HTTPException(status_code=401, detail="Token inválido")
    return {"message": "API Cybersource Izipay", "model_version": 0.1}

@app.post("/cyber")
def api_cyber(payload: RequestCyberModel, token: str = Header(None)):

    if token is None or token != SECRET_TOKEN:
        raise HTTPException(status_code=401, detail="Token inválido")

    # Get Data
    data = payload.dict()
    print('Payload:', data)

    #current_date = datetime.now()
    #formatted_date = current_date.strftime("%Y-%m-%d")
    var_bucket_name = BUCKET_NAME
    var_project_id = GCP_PROJECT_ID
    var_dataset_id = ''
    var_file = ''
    var_source =''
    #fecha =  formatted_date

    #data_input = data.get('input')
    fecha = data.get('fecha')
    organization_id = data.get('organization_id')
    merchant_key_id = data.get('merchant_key_id')
    merchant_secret_key_id = data.get('merchant_secret_key_id')
    report_name = data.get('report_name')
        
    try:
        response = {}

        download_report(fecha, report_name, organization_id, merchant_key_id, merchant_secret_key_id)
        df_cyber = load_cyber_report(csv_path=csv_path)
        fn_ingesta(fecha, report_name, var_bucket_name, var_project_id, var_dataset_id, df_cyber)
        response["success"] = True
        #ruta_archivo = "/ruta/al/archivo.txt"

        if os.path.exists(csv_path):
            os.remove(csv_path)
            print(f"Excel {csv_path} temporal eliminado.")
        return response
    except Exception as e:
        print("error ->  ",str(e))
        raise HTTPException(status_code=401, detail=str(e))

# Startup
async def startup_event():
    global SECRET_TOKEN
    global GCP_PROJECT_ID
    global BUCKET_NAME
    global api_logger
    global csv_path
    GCP_PROJECT_ID = os.getenv('GCP_PROJECT_ID', "my-secret-token")
    SECRET_TOKEN = os.getenv('SECRET_TOKEN', "my-secret-token")
    BUCKET_NAME = os.getenv('BUCKET_NAME', "my-secret-token")
        
    # Crear carpeta si no existe
    resources_path = 'resources'
    os.makedirs(resources_path, exist_ok=True)
    
    # Crear CSV vacío
    current_date = datetime.now()
    csv_path = os.path.join(resources_path, f'download_report_{current_date}.csv')
    pd.DataFrame().to_csv(csv_path, index=False)

    print(f"Archivo CSV vacío creado en: {csv_path}")

    #api_logger = FirestoreLogs(project_id=GCP_PROJECT_ID, database_id='logs-hushing-ruc',collection_name="collection-logs-hushing-ruc")

    # Inicializar el cliente
    #client = bigquery.Client(project=GCP_PROJECT_ID)

    # cred = credentials.ApplicationDefault()
    # firebase_admin.initialize_app(cred, {'projectId': GCP_PROJECT_ID,})
    # FS_CLIENT = firestore.client()
    print("La aplicación se está iniciando")

app.add_event_handler("startup", startup_event)

Overwriting service_api/main.py


#### Steps:

```sh
cd ./service_api
export $(xargs < ../.env)
uvicorn main:app --reload
```

# Testing

In [10]:
import os
import json
import requests

secret_token = os.getenv('SECRET_TOKEN')

# Localhost
#url = "http://127.0.0.1:8000"
url = "https://dev-izi-cyber-api-v1-dl7olq7kiq-uc.a.run.app"

In [11]:
secret_token

'izi-cyber-token-7rMVpUvdo8T3BlbkFJvlPEtwcw'

## Ping

In [12]:
endpoint = f"{url}"
headers = {'token': secret_token}
response = requests.get(endpoint, headers=headers)

In [13]:
response

<Response [200]>

In [14]:
response.json()

{'message': 'API Cybersource Izipay', 'model_version': 0.1}

## Cyber

### 1. TRR DAILY

In [15]:
from datetime import datetime

In [16]:
endpoint = f"{url}/cyber"
headers = {'token': secret_token}

In [17]:
ORGANIZATION_ID_TRR_DAILY="izipay_master_acct"
MERCHANT_KEY_ID_TRR_DAILY="98c3b49d-0dc0-4a35-933c-0037d1789462"
MERCHANT_SECRET_KEY_ID_TRR_DAILY="/mxDSoyXaym+HDoVtqPm8zbdQ3Poos3CO3ZHlIEHOxc=="
REPORT_NAME_TRR_DAILY="TRR_daily"

In [18]:
data = {}
data["organization_id"] = ORGANIZATION_ID_TRR_DAILY
data["merchant_key_id"] = MERCHANT_KEY_ID_TRR_DAILY
data["merchant_secret_key_id"] = MERCHANT_SECRET_KEY_ID_TRR_DAILY
data["report_name"] = REPORT_NAME_TRR_DAILY
data["fecha"] = "2025-07-14"

In [19]:
current_date = datetime.now()
formatted_date = current_date.strftime("%Y-%m-%d")
data["fecha"] = formatted_date

In [20]:
type(formatted_date)

str

In [21]:
data["fecha"] = "2025-07-14"

In [22]:
response = requests.post(endpoint, json=data, headers=headers)

In [23]:
response_result = response.json()

In [24]:
response_result

{'success': True}

### 2. TRR LOW DIARIO

In [135]:
ORGANIZATION_ID_TRR_LOW_DIARIO="izipay_low"
MERCHANT_KEY_ID_TRR_LOW_DIARIO="a4a51476-564b-41f8-b95a-50a3374dfd2e"
MERCHANT_SECRET_KEY_ID_TRR_LOW_DIARIO="JnlUrHHtGXf41VBWilrJP74CXx7mDCqrHMMQztMVlsk="
REPORT_NAME_TRR_LOW_DIARIO="TRR-LOW-DIARIO"

In [136]:
data = {}
data["organization_id"] = ORGANIZATION_ID_TRR_DAILY
data["merchant_key_id"] = MERCHANT_KEY_ID_TRR_DAILY
data["merchant_secret_key_id"] = MERCHANT_SECRET_KEY_ID_TRR_DAILY
data["report_name"] = REPORT_NAME_TRR_DAILY
data["fecha"] = "2025-07-14"

In [137]:
response = requests.post(endpoint, json=data, headers=headers)

In [138]:
response_result = response.json()

In [139]:
response_result

{'success': True}

# Scheduling

## TRR Dayli

In [34]:
%%writefile $DIRECTORY/job-cyber-trr-dayli.sh

PROJECT_ID="dev-izipay-advanced-analytics"
JOB_NAME="job_api_cyber_trr_daily_3am"
REGION="us-central1"
CRON_SCHEDULE="10 3 * * *"
TIME_ZONE="America/Lima"
RUN_URL="https://dev-izi-cyber-api-v1-dl7olq7kiq-uc.a.run.app/cyber"
FECHA=$(date +%F)

BODY="{
    \"organization_id\": \"izipay_master_acct\",
    \"merchant_key_id\": \"98c3b49d-0dc0-4a35-933c-0037d1789462\",
    \"merchant_secret_key_id\": \"/mxDSoyXaym+HDoVtqPm8zbdQ3Poos3CO3ZHlIEHOxc==\",
    \"report_name\": \"TRR_daily\",
    \"fecha\": \"$FECHA\"
}"

echo $BODY

HEADERS='Content-Type=application/json, token=izi-cyber-token-7rMVpUvdo8T3BlbkFJvlPEtwcw'

gcloud scheduler jobs create http $JOB_NAME \
  --project=$PROJECT_ID \
  --location=$REGION \
  --schedule="$CRON_SCHEDULE" \
  --time-zone="$TIME_ZONE" \
  --uri="$RUN_URL" \
  --http-method=POST \
  --message-body="$BODY" \
  --headers="$HEADERS"

Overwriting service_api/job-cyber-trr-dayli.sh


In [None]:
  # --retry-count=3 \
  # --min-backoff=20s \
  # --max-backoff=300s \
  # --max-doublings=2

In [10]:
%%writefile $DIRECTORY/job-cyber-dmdr-low-diario.sh

PROJECT_ID="dev-izipay-advanced-analytics"
JOB_NAME="job_api_cyber_dmdr_low_diario_3am"
REGION="us-central1"
CRON_SCHEDULE="20 3 * * *"
TIME_ZONE="America/Lima"
RUN_URL="https://dev-izi-cyber-api-v1-dl7olq7kiq-uc.a.run.app/cyber"
FECHA=$(date +%F)

BODY="{
    \"organization_id\": \"izipay_low\",
    \"merchant_key_id\": \"a8fe761c-6179-487c-863b-ffb2dc03e2b1\",
    \"merchant_secret_key_id\": \"pPudDBsvHEvvzh7LZAj/1td0FwnWLhQnDZsCnik2HbM=\",
    \"report_name\": \"DMDR-LOW-DIARIO\",
    \"fecha\": \"$FECHA\"
}"

echo $BODY

HEADERS='Content-Type=application/json, token=izi-cyber-token-7rMVpUvdo8T3BlbkFJvlPEtwcw'

gcloud scheduler jobs create http $JOB_NAME \
  --project=$PROJECT_ID \
  --location=$REGION \
  --schedule="$CRON_SCHEDULE" \
  --time-zone="$TIME_ZONE" \
  --uri="$RUN_URL" \
  --http-method=POST \
  --message-body="$BODY" \
  --headers="$HEADERS"

Overwriting service_api/job-cyber-trr-low-diario.sh


In [None]:
%%writefile $DIRECTORY/job-cyber-dmdr-high-diario.sh

PROJECT_ID="dev-izipay-advanced-analytics"
JOB_NAME="job_api_cyber_dmdr_high_diario_3am"
REGION="us-central1"
CRON_SCHEDULE="30 3 * * *"
TIME_ZONE="America/Lima"
RUN_URL="https://dev-izi-cyber-api-v1-dl7olq7kiq-uc.a.run.app/cyber"
FECHA=$(date +%F)

BODY="{
    \"organization_id\": \"izipay_high\",
    \"merchant_key_id\": \"a8fe761c-6179-487c-863b-ffb2dc03e2b1\",
    \"merchant_secret_key_id\": \"pPudDBsvHEvvzh7LZAj/1td0FwnWLhQnDZsCnik2HbM=\",
    \"report_name\": \"DMDR-HIGH-DIARIO\",
    \"fecha\": \"$FECHA\"
}"

echo $BODY

HEADERS='Content-Type=application/json, token=izi-cyber-token-7rMVpUvdo8T3BlbkFJvlPEtwcw'

gcloud scheduler jobs create http $JOB_NAME \
  --project=$PROJECT_ID \
  --location=$REGION \
  --schedule="$CRON_SCHEDULE" \
  --time-zone="$TIME_ZONE" \
  --uri="$RUN_URL" \
  --http-method=POST \
  --message-body="$BODY" \
  --headers="$HEADERS"