<a href="https://colab.research.google.com/github/RayRayKing/blablacar/blob/main/BlablaCar_Case_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
import requests
import json
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from airflow.decorators import dag, task

# Retry logic with exponential backoff.
# The retry wrapper is applied to the plain HTTP helper (not to the Airflow
# task object) and the helper lets request errors propagate, otherwise
# tenacity never sees a failure and never retries.
@retry(
    stop=stop_after_attempt(5),
    # Waits grow exponentially between attempts, clamped to the 2-60 second range.
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(requests.exceptions.RequestException),  # Retry on HTTP request errors
    reraise=True,  # After the last attempt, surface the original request error
)
def _fetch_json(url, timeout=30):
    """Fetch ``url`` and return its decoded JSON body.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up on one attempt.

    Raises:
        requests.exceptions.RequestException: if the request still fails
            (connection error, timeout, HTTP error status) after all attempts.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # Check for HTTP errors
    return response.json()


@task
def get_data() -> str:
    """Fetch the ``line/`` endpoint of v0.ovapi.nl and save it as NDJSON.

    Each top-level entry of the response becomes one line of the output file,
    shaped as ``{"id": <key>, "line_data": <value>}``.

    Returns:
        Name of the local NDJSON file that was written.

    Raises:
        requests.exceptions.RequestException: if the API cannot be reached
            after all retry attempts. The error is re-raised so the task fails
            instead of handing ``None`` to downstream tasks.
    """
    base_url = "http://v0.ovapi.nl/"
    endpoint = "line/"

    try:
        data = _fetch_json(base_url + endpoint)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from the API: {e}")
        raise

    # Convert to NDJSON for easier load to BQ.
    # Keeping the payload as raw JSON also allows handling of SCD in the
    # downstream transformation instead of touching the EL process, with less
    # schema enforcement at load time.
    output_file = 'processed_json_data.njson'
    with open(output_file, 'w') as f:
        for line_id, line_info in data.items():
            # One record per line, using fixed keys that match the id / line_data
            # columns of the raw table rather than one dynamic key per record.
            json.dump({"id": line_id, "line_data": line_info}, f)
            f.write('\n')  # Add newline after each JSON object
    print(f"NDJSON data saved to {output_file}")

    # In a real deployment the file would be uploaded to a GCS bucket and the
    # gs:// URI returned instead of the local name.
    return output_file




In [15]:
## DDL for BQ: creates the raw landing table (if it does not exist yet).
# Columns: `id` (STRING) and `line_data` (JSON).
# The table is partitioned by the date of the _PARTITIONTIME pseudo-column.
# NOTE(review): this cell is SQL, not Python - presumably meant to be run in the
# BigQuery console or through a BigQuery cell magic; confirm how it is executed.
CREATE TABLE IF NOT EXISTS dataset_name.nl_lines_data_raw (
    id STRING,
    line_data JSON
) PARTITION BY DATE(_PARTITIONTIME);


Error fetching data from the API: 504 Server Error: Gateway Time-out for url: http://v0.ovapi.nl/line/


In [16]:
# Helpers for loading into BQ (load job) and for clearing partitions before a reload (DML)

import json
from google.cloud import bigquery

def insert_to_bq(gsi_filepath, dataset_id, table_id):
    """
    Load a newline-delimited JSON file into a BigQuery table (append mode).

    Args:
        gsi_filepath: Source of the data. A ``gs://`` URI (or a sequence of
            URIs) is loaded with ``load_table_from_uri``; any other string is
            treated as a local file path and uploaded with
            ``load_table_from_file``.
        dataset_id: Dataset that holds the destination table.
        table_id: Destination table name.

    Raises:
        ValueError: if ``gsi_filepath`` is empty or ``None``.
    """
    if not gsi_filepath:
        raise ValueError("gsi_filepath is empty; there is no file to load")

    # Initialize BigQuery client
    client = bigquery.Client()

    # Build the table reference explicitly; Client.dataset() is deprecated.
    table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_id)

    # Define job configuration for loading JSON data
    # NOTE(review): autodetect infers the schema from the file; confirm that
    # this is compatible with a destination table that already has a schema.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,  # Automatically detect schema
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # Append to table
    )

    is_local_file = isinstance(gsi_filepath, str) and not gsi_filepath.startswith("gs://")
    if is_local_file:
        # load_table_from_uri only understands gs:// URIs, so a local file has
        # to be streamed to BigQuery as a binary file object instead.
        with open(gsi_filepath, "rb") as source_file:
            job = client.load_table_from_file(
                source_file,
                table_ref,
                job_config=job_config
            )
    else:
        job = client.load_table_from_uri(
            gsi_filepath,
            table_ref,
            job_config=job_config
        )

    # Wait for the job to complete (raises if the load failed)
    job.result()


# IDEMPOTENCY / ACID
# In this particular case, the goal is to delete partitions and replace them for large sets of data. This method is a bit more cost-effective.
# An alternative is to use upsert/MERGE statements to handle the idempotency, but for larger data sets that requires full table/partition scans.
def delete_reloading_data(start_date,end_date,dataset_table,additional_filter=None):
  """Delete the rows of a partition date range so the range can be reloaded.

  Args:
      start_date: First partition date to clear (inclusive), e.g. 'YYYY-MM-DD'.
      end_date: Last partition date to clear (inclusive).
      dataset_table: Target table as ``dataset.table``.
      additional_filter: Optional extra SQL appended to the WHERE clause; it
          must bring its own leading ``AND``. Omitted when None or empty.

  The values are interpolated straight into the statement, so they must come
  from trusted pipeline configuration, never from user input.
  """
  client = bigquery.Client()
  # A missing filter must add nothing; formatting None directly would put the
  # literal text "None" into the statement and break it.
  extra_predicate = additional_filter or ""
  query = (
      f"""
      DELETE FROM {dataset_table}
      WHERE _PARTITIONTIME BETWEEN '{start_date}' AND '{end_date}' {extra_predicate}
      """
  )
  # Wait for the DML job: failures are raised here, and the delete is finished
  # before any downstream load starts appending.
  client.query(query).result()


In [None]:
from airflow import DAG
# PythonOperator lives in airflow.operators.python; there is no
# airflow.providers.python_operator module.
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.dates import days_ago
import os

# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

# Pipeline: fetch + convert to NDJSON -> clear the partitions being reloaded -> load into BQ.
with DAG(
    'bq_load',
    default_args=default_args,
    description='loading data into bq',
    schedule_interval=None  # cron expression goes here; None means manual triggers only
) as dag:


  # Task 1: get data & convert to ndjson & upload file to GCS
  # Task creation via task api. It has to be instantiated before the tasks
  # below, because insert_to_bq_task takes its output as an argument.
  gs_filename = get_data()

  # Task 2: create idempotency by deleting any possible similar timeframe data
  # op_kwargs is templated, so '{{ ds }}' renders to the run's date as YYYY-MM-DD.
  # NOTE(review): this clears the partition of the run date; confirm that this is
  # the partition the load writes to, or widen the range as needed.
  delete_reloading_task = PythonOperator(
      task_id='delete_reloading_data',
      python_callable=delete_reloading_data,
      op_kwargs={
          'start_date': '{{ ds }}',
          'end_date': '{{ ds }}',
          'dataset_table': 'dataset_name.nl_lines_data_raw',
          # Optional filter, e.g. 'AND id = "some_line_id"'; empty means none.
          'additional_filter': ''
      }
  )
  # Task 3: load the file produced by Task 1 into the raw table
  insert_to_bq_task = PythonOperator(
    task_id='insert_to_bq',
    python_callable=insert_to_bq,
    op_kwargs={
        'gsi_filepath': gs_filename,
        'dataset_id': 'dataset_name',
        'table_id': 'nl_lines_data_raw'
    })

  # Define task dependencies
  gs_filename >> delete_reloading_task >> insert_to_bq_task