# In [3]:
import requests
from datetime import datetime
import requests
import json
import logging

# In [None]:
from google.cloud import storage
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.transfers.http_to_gcs import HttpToGcsOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GoogleCloudStorageToLocaFileSystemOperator
from datetime import datetime
import requests
import json
import logging

# Configura las credenciales de Google Cloud Platform en Airflow antes de ejecutar este DAG

def check_for_new_data(year, timeout=30):
    """Query the OCDS search API for *year* and report how many pages it has.

    Args:
        year: Year to query. It is interpolated verbatim into the query
            string, so both an int and a rendered Airflow template string
            (e.g. ``'2023'``) work.
        timeout: Seconds to wait for the HTTP response. Without a timeout
            ``requests`` may block indefinitely and hang the task.

    Returns:
        The ``pages`` value reported by the API for page 1, or ``None`` when
        the HTTP request fails. Returning it (rather than discarding it)
        makes the count available to downstream tasks via XCom, since
        Airflow's PythonOperator pushes the callable's return value by
        default.
    """
    url = f'https://datosabiertos.compraspublicas.gob.ec/PLATAFORMA/api/search_ocds?page=1&year={year}'
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Treat 4xx/5xx responses as errors
        total_pages = response.json()['pages']
    except requests.exceptions.RequestException as e:
        # Best-effort check: the error is logged and the task still succeeds.
        # NOTE(review): consider re-raising so Airflow marks the task failed
        # and applies its retry policy.
        logging.error(f'Error en la solicitud HTTP: {e}')
        return None

    # TODO: compare total_pages with the value previously stored for this
    # year; a change means new pages of data are available. Update the
    # stored value when needed.
    logging.info(f'Year: {year}, Total Pages: {total_pages}')
    return total_pages

def download_data(year, task_instance, batch_size=1000, timeout=60):
    """Download every OCDS search page for *year* and upload it to GCS in batches.

    Pages are fetched one by one from the compraspublicas.gob.ec search API.
    Their ``data`` records are accumulated. Once at least *batch_size*
    records have been collected, or the last page has been read, the batch
    is written as a single JSON file to the ``airflow_iackathon`` bucket as
    ``data/{year}_pages_{first}_to_{last}.json``. Here ``first`` and ``last``
    are the page numbers the file covers.

    Args:
        year: Year to download; interpolated verbatim into the query string.
        task_instance: Airflow task instance supplied from the task context.
            It is not used by this function; it is kept so the callable's
            signature is unchanged.
        batch_size: Minimum number of records collected before a file is
            uploaded (the last file may be smaller).
        timeout: Seconds to wait for each HTTP response before giving up.

    Returns:
        List of the GCS object names that were uploaded. If an HTTP error
        interrupts the download, the error is logged and the names uploaded
        so far are returned. Records already fetched for the unfinished
        batch are not uploaded.
    """
    bucket_name = 'airflow_iackathon'
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    url_base = 'https://datosabiertos.compraspublicas.gob.ec/PLATAFORMA/api/search_ocds?page={page}&year={year}'
    uploaded = []

    def fetch_page(page):
        # One GET per page; raises requests' exceptions on HTTP/transport errors.
        response = requests.get(url_base.format(page=page, year=year), timeout=timeout)
        response.raise_for_status()
        return response.json()

    try:
        # Page 1 carries both the total page count and the first records, so
        # it is fetched once and reused instead of being requested twice.
        first_page = fetch_page(1)
        total_pages = first_page['pages']

        current_batch = []      # Records accumulated for the pending file
        batch_start_page = 1    # First page number contained in current_batch
        for page in range(1, total_pages + 1):
            page_data = first_page if page == 1 else fetch_page(page)
            current_batch.extend(page_data['data'])

            if len(current_batch) >= batch_size or page == total_pages:
                # Name the file by the page range it covers. The range must
                # not be derived from len(current_batch), which counts records.
                object_name = f'data/{year}_pages_{batch_start_page}_to_{page}.json'
                blob = bucket.blob(object_name)
                blob.upload_from_string(json.dumps(current_batch), content_type='application/json')
                logging.info(f'Uploaded batch {batch_start_page} to {page} to GCS: {object_name}')
                uploaded.append(object_name)
                current_batch = []
                batch_start_page = page + 1

        logging.info(f'Downloaded and uploaded data in batches for year {year}')
    except requests.exceptions.RequestException as e:
        # Errors are logged and swallowed, so the Airflow task still succeeds.
        # NOTE(review): consider re-raising so Airflow retries the download.
        logging.error(f'Error en la solicitud HTTP: {e}')
    return uploaded


dag = DAG(
    'data_download_dag',
    schedule_interval='@daily',  # Adjust the schedule to your needs
    start_date=datetime(2023, 1, 1),  # First logical date of the DAG
    catchup=False,  # Do not backfill the runs between start_date and now
)

# The year passed to both tasks is the year of the run's logical
# (execution) date, rendered as a string. It is not necessarily "today's"
# year: for @daily runs the logical date is the start of the interval, so the
# run executed on Jan 1 still refers to the previous year.
check_for_new_data_task = PythonOperator(
    task_id='check_for_new_data',
    python_callable=check_for_new_data,
    op_args=['{{ execution_date.year }}'],
    dag=dag,
)

download_data_task = PythonOperator(
    task_id='download_data',
    python_callable=download_data,
    op_args=['{{ execution_date.year }}'],
    provide_context=True,  # Supplies task_instance (needed on Airflow 1.x; a deprecated no-op on 2.x)
    dag=dag,
)

# NOTE: the former `upload_to_gcs` HttpToGcsOperator task was removed for
# several reasons:
# - It passed `task_id` twice, which is a SyntaxError that prevented the
#   whole DAG file from parsing.
# - It used arguments that operator does not accept
#   (`google_cloud_storage_conn_id`, `download_path`).
# - It pulled XCom keys that `download_data` never pushes.
# It was also redundant, because `download_data` already uploads each batch
# directly to GCS.
check_for_new_data_task >> download_data_task
