# Demo de Extracción desde Socrata

En este notebook se muestra paso a paso cómo utilizar la clase `SocrataDatasetLoader` para extraer datos de Socrata, aplicar validaciones de gobernanza y calidad, y registrar la metadata asociada al proceso. 

Cada celda está comentada para que puedas entender el propósito y el funcionamiento de cada paso.

## Librerias

In [1]:
# Importamos las librerías y módulos necesarios.
from sodapy import Socrata
import sys
import yaml

### Configuración del entorno de ejecución

In [2]:
from config.notebook_location import find_project_root

In [3]:
# Definir el nombre del directorio del proyecto
project_name = "personal-library"

# Encontrar la raíz del proyecto
project_root = find_project_root(project_name)

# Agregar la raíz del proyecto al principio de sys.path para facilitar las importaciones
sys.path.insert(0, project_root)
print("Project root added to sys.path:", project_root)

Project root added to sys.path: d:\Espacios de trabajo\personal-library


### Librerías personales

In [5]:
# Importamos las clases definidas en nuestro sistema de ingesta.
from ingestion.sources.from_socrata import SocrataDatasetLoader
from ingestion.governance.engine import GovernanceEngine
from ingestion.governance.engine_policy_autogen import get_or_create_policy
from ingestion.governance.metrics import MetricsEngine
from ingestion.base.metadata_logger import MetadataLogger
from ingestion.governance.remediation import RemediationEngine
from ingestion.governance.intelligent_improvement import IntelligentImprovementEngine

## Proceso de ingesta de los datos

In [21]:
# Inicializar el logger de metadatos
meta_logger = MetadataLogger(report_path="demos/reports/demo_socrata.parquet")

**1. Inicialización del Cliente Socrata**
 
Se crea una instancia del cliente de Socrata apuntando al dominio de datos. En este ejemplo, usamos "www.datos.gov.co". 
Si tu dataset requiere autenticación, provee las credenciales necesarias (aquí se usa `None` como placeholder).

In [7]:
# Inicializamos el cliente de Socrata.
client = Socrata("www.datos.gov.co", None)



**2. Configuración del Loader y Extracción de Datos**
 
Se crea una instancia de `SocrataDatasetLoader` y se utiliza para extraer datos de un dataset específico.
- **dataset_code:** Código del dataset en Socrata.
- **filters:** Filtros de consulta; en este ejemplo, extraemos registros donde `fecha_de_firma` es mayor o igual a "2023-01-01".
- **limit:** Número máximo de registros a extraer.

In [8]:
# Creamos la instancia del loader para Socrata.
loader = SocrataDatasetLoader(client)

In [9]:
# Definimos los parámetros para la extracción.
dataset_code = "jbjy-vk9h"  # Código del dataset en Socrata
filters = {"fecha_de_firma": (">=", "2023-01-01")}
limit = 100

In [22]:
meta_logger.log({
    "step": "data_extraction",
    "source": "Socrata",
    "dataset_code": dataset_code,
    "filters": filters,
    "limit": limit,
    "status": "started"
})

2025-03-29 18:37:19,445 - ingestion.base.metadata_logger - DEBUG - Logged metadata: {'step': 'data_extraction', 'source': 'Socrata', 'dataset_code': 'jbjy-vk9h', 'filters': {'fecha_de_firma': ('>=', '2023-01-01')}, 'limit': 100, 'status': 'started', 'uuid': '922889f7-64ec-4a29-bcea-604a65fb2f9d', 'timestamp': '2025-03-29T22:37:19.444448+00:00'}


In [10]:
# Realizamos la carga de datos.
df = loader.load_data(dataset_code=dataset_code, filters=filters, limit=limit)

# Mostramos las primeras filas del DataFrame extraído.
df.head(2)

2025-03-29 18:30:11,024 - ingestion.sources.from_socrata - DEBUG - Executing query: SELECT * WHERE fecha_de_firma >= '2023-01-01' LIMIT 100
2025-03-29 18:30:11,035 - urllib3.connectionpool - DEBUG - Starting new HTTPS connection (1): www.datos.gov.co:443
2025-03-29 18:30:12,898 - urllib3.connectionpool - DEBUG - https://www.datos.gov.co:443 "GET /resource/jbjy-vk9h.json?%24query=SELECT+%2A+WHERE+fecha_de_firma+%3E%3D+%272023-01-01%27+LIMIT+100 HTTP/1.1" 200 None
2025-03-29 18:30:13,218 - ingestion.sources.from_socrata - INFO - Loaded 100 rows from dataset jbjy-vk9h


Unnamed: 0,nombre_entidad,nit_entidad,departamento,ciudad,localizaci_n,orden,sector,rama,entidad_centralizada,proceso_de_compra,...,nombre_ordenador_del_gasto,tipo_de_documento_ordenador_del_gasto,n_mero_de_documento_ordenador_del_gasto,nombre_supervisor,tipo_de_documento_supervisor,n_mero_de_documento_supervisor,nombre_ordenador_de_pago,tipo_de_documento_ordenador_de_pago,n_mero_de_documento_ordenador_de_pago,fecha_de_notificaci_n_de_prorrogaci_n
0,HOSPITAL SAN JUAN DE DIOS DE HONDA ESE,890700666,Tolima,Honda,"Colombia, Tolima , Honda",Territorial,Salud y Protección Social,Corporación Autónoma,Descentralizada,CO1.BDOS.3681223,...,MANUEL ALFONSO GONZaLEZ CANTOR,Cédula de Ciudadanía,79393172,MARTHA LUCIA OSORIO RAMIREZ,Cédula de Ciudadanía,38281318,No definido,No definido,No definido,
1,HOSPITAL DE CASTILLA LA NUEVA EMPRESA SOCIAL D...,900004059,Meta,Castilla La Nueva,"Colombia, Meta , Castilla La Nueva",Territorial,Salud y Protección Social,Corporación Autónoma,Descentralizada,CO1.BDOS.3690528,...,ROSA MARIA JIMENEZ BAQUERO,Cédula de Ciudadanía,40370893,LUIS ENRIQUE BARON TELLO,Cédula de Ciudadanía,1121827353,No definido,No definido,No definido,


In [23]:
meta_logger.log({
    "step": "data_extraction",
    "record_count": len(df),
    "status": "completed"
})

2025-03-29 18:37:35,488 - ingestion.base.metadata_logger - DEBUG - Logged metadata: {'step': 'data_extraction', 'record_count': 100, 'status': 'completed', 'uuid': '029fa1d4-5aef-4604-8072-93bf29bc0688', 'timestamp': '2025-03-29T22:37:35.488826+00:00'}


**3. Carga de la Política de Gobernanza**

La política define las reglas de calidad y gobernabilidad que se aplicarán al dataset.

Se carga desde un archivo YAML. Asegúrate de que el archivo `s2_contracts.yaml` exista en la ruta indicada.

In [11]:
policy = get_or_create_policy(df, "s2_contracts.yaml")

[INFO] Policy 's2_contracts.yaml' not found. Generating default...


  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return pd.to_datetime(col, errors='raise')
  return p

In [12]:
# Imprimimos la política cargada para verificar su contenido.
print("Política de Gobernanza Cargada:")
print(yaml.dump(policy, sort_keys=False, allow_unicode=True))

Política de Gobernanza Cargada:
dataset_metadata:
  columns: 85
  rows_sampled: 100
  generated_by: default_policy_generator
  generated_on: '2025-03-29T22:30:23.876640Z'
  data_source: inferred
  schema_version: '1.0'
  language: undetected
compliance:
  compliance_frameworks:
  - GDPR
  risk_level: high
enforcement_requirements:
  mandatory_rules:
  - no_nulls
  - valid_date_format
  required_fields: []
  privacy_enforcement: medium
  security_baseline: masked
  allowed_transparency:
  - internal
  - public
  risk_acceptance: medium
  framework_enforcement:
  - GDPR
fields:
- field_name: nombre_entidad
  type: string
  required: true
  rules: []
  privacy_level: high
  security: encrypted
  transparency: internal
  integrity:
    unique: false
    no_nulls: true
    consistent_format: true
    contains_outliers: false
  compliance_tags:
  - GDPR
  data_subject: true
  access_restriction: restricted
  retention_policy: default
  critical_field: false
  justification: Field 'nombre_ent

**4. Validación del Dataset con el Motor de Gobernanza**

Se utiliza la clase `GovernanceEngine` para aplicar las validaciones definidas en la política al DataFrame.

Se generan reportes que incluyen errores y advertencias detectadas en la ingesta.

In [14]:
# Crea una instancia del engine usando el nombre de política deseado
engine = GovernanceEngine(df, "s2_contracts.yaml")
    
# Ejecuta las validaciones y muestra las advertencias
warnings = engine.run_policy_checks()
if warnings:
    print("Warnings generated by GovernanceEngine:")
    for w in warnings:
        print(" -", w)
else:
    print("No warnings. The dataset complies with the policy.")

 - {'field': '__global__', 'issue': "Dataset risk level 'high' exceeds accepted 'medium'.", 'severity': 'error', 'execution_id': '3d56820c-b07e-472c-b0ce-1ba0cacae657', 'timestamp': '2025-03-29T22:31:23.990960'}


In [25]:
meta_logger.log({
    "step": "policy_application",
    "policy_name": "s2_contracts.yaml",
    "status": "applied"
})

2025-03-29 18:38:37,882 - ingestion.base.metadata_logger - DEBUG - Logged metadata: {'step': 'policy_application', 'policy_name': 's2_contracts.yaml', 'status': 'applied', 'uuid': 'b7d0d869-248f-45dd-9565-c3c59bcebf94', 'timestamp': '2025-03-29T22:38:37.882044+00:00'}


In [15]:
metrics_engine = MetricsEngine(df, "s2_contracts.yaml")
metrics_dict = metrics_engine.generate_quality_metrics()

In [None]:
meta_logger.log({
    "step": "governance_metrics",
    "status": "executed"
})

In [16]:
metrics_dict

{'nombre_entidad': {'null_percentage': np.float64(0.0),
  'type_match': True,
  'duplicate_percentage': 69.0,
  'uniqueness_rate': 0.31,
  'contains_outliers': False,
  'mean': None,
  'median': None,
  'std': None,
  'skewness': None,
  'percentiles': {},
  'outlier_percentage': None,
  'temporal_anomaly': None,
  'cardinality_ratio': 0.31,
  'security_compliant': np.False_,
  'relational_compliance': 'n/a',
  'field_quality_score': np.float64(86.2)},
 'nit_entidad': {'null_percentage': np.float64(0.0),
  'type_match': False,
  'duplicate_percentage': 69.0,
  'uniqueness_rate': 0.31,
  'contains_outliers': False,
  'mean': np.float64(1101537680.46),
  'median': np.float64(890751787.5),
  'std': np.float64(1329078600.2603405),
  'percentiles': {'25': np.float64(842000004.0),
   '50': np.float64(890751787.5),
   '75': np.float64(890980093.0)},
  'skewness': np.float64(5.615650057510579),
  'outlier_percentage': np.float64(3.0),
  'temporal_anomaly': None,
  'cardinality_ratio': None,
  

In [17]:
remediation_engine = RemediationEngine(df, "s2_contracts.yaml")
df_procesed = remediation_engine.run_remediation()

In [26]:
df_procesed.head()

Unnamed: 0,nombre_entidad,nit_entidad,departamento,ciudad,localizaci_n,orden,sector,rama,entidad_centralizada,proceso_de_compra,...,nombre_ordenador_del_gasto,tipo_de_documento_ordenador_del_gasto,n_mero_de_documento_ordenador_del_gasto,nombre_supervisor,tipo_de_documento_supervisor,n_mero_de_documento_supervisor,nombre_ordenador_de_pago,tipo_de_documento_ordenador_de_pago,n_mero_de_documento_ordenador_de_pago,fecha_de_notificaci_n_de_prorrogaci_n
0,SE9TUElUQUwgU0FOIEpVQU4gREUgRElPUyBERSBIT05EQS...,ODkwNzAwNjY2,Tolima,Honda,"Colombia, Tolima , Honda",Territorial,Salud y Protección Social,Corporación Autónoma,RGVzY2VudHJhbGl6YWRh,Q08xLkJET1MuMzY4MTIyMw==,...,MANUEL ALFONSO GONZaLEZ CANTOR,Cédula de Ciudadanía,NzkzOTMxNzI=,MARTHA LUCIA OSORIO RAMIREZ,Cédula de Ciudadanía,MzgyODEzMTg=,No definido,No definido,No definido,NaT
1,SE9TUElUQUwgREUgQ0FTVElMTEEgTEEgTlVFVkEgRU1QUk...,OTAwMDA0MDU5,Meta,Castilla La Nueva,"Colombia, Meta , Castilla La Nueva",Territorial,Salud y Protección Social,Corporación Autónoma,RGVzY2VudHJhbGl6YWRh,Q08xLkJET1MuMzY5MDUyOA==,...,ROSA MARIA JIMENEZ BAQUERO,Cédula de Ciudadanía,NDAzNzA4OTM=,LUIS ENRIQUE BARON TELLO,Cédula de Ciudadanía,MTEyMTgyNzM1Mw==,No definido,No definido,No definido,NaT
2,SE9TUElUQUwgU0FOIEpVQU4gREUgRElPUyBERSBIT05EQS...,ODkwNzAwNjY2,Tolima,Honda,"Colombia, Tolima , Honda",Territorial,Salud y Protección Social,Corporación Autónoma,RGVzY2VudHJhbGl6YWRh,Q08xLkJET1MuMzY3NzYyNA==,...,MANUEL ALFONSO GONZaLEZ CANTOR,Cédula de Ciudadanía,NzkzOTMxNzI=,ANGELA MARLEN SALGUERO RAMIREZ,Cédula de Ciudadanía,MTExMDUyNjM2NQ==,No definido,No definido,No definido,NaT
3,RU1QUkVTQSBTT0NJQUwgREVMIEVTVEFETyBIT1NQSVRBTC...,OTY0NDUwMjI2,Antioquia,Amalfi,"Colombia, Antioquia , Amalfi",Territorial,Salud y Protección Social,Ejecutivo,RGVzY2VudHJhbGl6YWRh,Q08xLkJET1MuMzY4ODM2NQ==,...,LICINIA DEL CARMEN RAVE BERMUDEZ,Cédula de Ciudadanía,NDI4NzY1MTY=,ANGEL ERNESTO FRANCO HENAO,Cédula de Ciudadanía,NzAyNTIxOTY=,No definido,No definido,No definido,NaT
4,TVVOSUNJUElPIERFIElUQUdVSQ==,ODkwOTgwMDkz,Antioquia,Itagui,"Colombia, Antioquia , Itagui",Territorial,"Vivienda, Ciudad y Territorio",Ejecutivo,Q2VudHJhbGl6YWRh,Q08xLkJET1MuMzY3MTQ0MQ==,...,No definido,No definido,Tm8gZGVmaW5pZG8=,No definido,No definido,Tm8gZGVmaW5pZG8=,No definido,No definido,No definido,NaT


In [None]:
meta_logger.log({
    "step": "remediation",
    "status": "executed"
})

In [18]:
metrics_engine_procesed = MetricsEngine(df_procesed, "s2_contracts.yaml")
metrics_dict_df_processed = metrics_engine_procesed.generate_quality_metrics()

  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return np.nanmean(a, axis, out=out, keepdims=keepdims)


In [19]:
metrics_dict_df_processed

{'nombre_entidad': {'null_percentage': np.float64(0.0),
  'type_match': True,
  'duplicate_percentage': 69.0,
  'uniqueness_rate': 0.31,
  'contains_outliers': False,
  'mean': None,
  'median': None,
  'std': None,
  'skewness': None,
  'percentiles': {},
  'outlier_percentage': None,
  'temporal_anomaly': None,
  'cardinality_ratio': 0.31,
  'security_compliant': np.True_,
  'relational_compliance': 'n/a',
  'field_quality_score': np.float64(86.2)},
 'nit_entidad': {'null_percentage': np.float64(0.0),
  'type_match': False,
  'duplicate_percentage': 70.0,
  'uniqueness_rate': 0.3,
  'contains_outliers': False,
  'mean': nan,
  'median': np.float64(nan),
  'std': np.float64(nan),
  'percentiles': {'25': np.float64(nan),
   '50': np.float64(nan),
   '75': np.float64(nan)},
  'skewness': nan,
  'outlier_percentage': np.float64(0.0),
  'temporal_anomaly': None,
  'cardinality_ratio': None,
  'security_compliant': True,
  'relational_compliance': 'n/a',
  'field_quality_score': np.float64

In [None]:
meta_logger.log({
    "step": "governance_metrics",
    "status": "executed"
})

In [20]:
engine = IntelligentImprovementEngine(metrics_dict_df_processed, metrics_engine.policy)
recommendations = engine.generate_improvement_recommendations()

print("Global Recommendations:")
for rec in recommendations["global"]:
    print(" -", rec)
for field, recs in recommendations["fields"].items():
    print(f"Field '{field}':")
    for r in recs:
        print("   -", r)

Global Recommendations:
 - La calidad global es baja; se recomienda una revisión integral de la ingesta.
Field 'nombre_entidad':
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 13.8 puntos.
Field 'nit_entidad':
   - Optimizar 'type_match' podría incrementar el score en hasta 20.0 puntos.
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 14.0 puntos.
Field 'departamento':
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 17.4 puntos.
Field 'ciudad':
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 13.8 puntos.
Field 'localizaci_n':
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 13.8 puntos.
Field 'orden':
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 19.6 puntos.
Field 'sector':
   - Optimizar 'duplicate_percentage' podría incrementar el score en hasta 19.4 puntos.
Field 'rama':
   - Optimizar 'duplicate_percentage' podría increm

**5. Registro de Metadata y Auditoría**
 
Utilizamos la clase `MetadataLogger` para registrar la metadata del proceso de ingesta, incluyendo:

- Información del loader (número de filas, estado, filtros aplicados, etc.)

- Reporte de gobernanza (errores y advertencias)

La metadata se guarda en un archivo Parquet para su posterior auditoría y seguimiento.

In [17]:
meta_logger.save()

2025-03-28 08:00:50,830 - ingestion.base.metadata_logger - INFO - Metadata log saved to reports/demo_socrata.parquet


Metadata registrada y audit log guardado.
