<a href="https://colab.research.google.com/github/KawiraSharon/.io/blob/main/Project_flow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Kestra flow: extract NYC yellow-taxi trips from a Kaggle dataset, clean and
# sample them, then load the result into PostgreSQL.
# Each later task reads the previous task's output file through
# `outputs.<task_id>.outputFiles[...]`.
id: nycTaxi
namespace: kestra-task

tasks:
  - id: extracting_nyc_taxi
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    beforeCommands:
      - python3 -m venv .venv
      - . .venv/bin/activate
      # pyarrow is not imported by the script; pandas uses it as its Parquet engine.
      - pip install pandas requests pyarrow
    # Kaggle credentials come from Kestra secrets instead of being hard-coded in
    # the script. Any API key previously committed to this file must be revoked.
    env:
      KAGGLE_USERNAME: "{{ secret('KAGGLE_USERNAME') }}"
      KAGGLE_KEY: "{{ secret('KAGGLE_KEY') }}"
    outputFiles:
      - nyc_taxi_data.csv
    warningOnStdErr: false
    script: |
      import os
      from zipfile import ZipFile

      import pandas as pd
      import requests

      # Kaggle dataset archive, and the monthly Parquet file extracted from it.
      DATASET_URL = "https://www.kaggle.com/api/v1/datasets/download/albertjavier/yellow-tripdata-2023"
      PARQUET_MEMBER = "yellow_tripdata_2023-01.parquet"
      ZIP_PATH = "yellow_tripdata_2023.zip"

      # The Kaggle API authenticates with HTTP basic auth: (username, API key).
      auth = (os.environ["KAGGLE_USERNAME"], os.environ["KAGGLE_KEY"])

      # Stream the ZIP archive to disk rather than buffering it all in memory.
      # Fail fast on HTTP errors instead of with a confusing BadZipFile later.
      with requests.get(DATASET_URL, auth=auth, stream=True, timeout=60) as response:
          response.raise_for_status()
          with open(ZIP_PATH, "wb") as zip_out:
              for chunk in response.iter_content(chunk_size=1024 * 1024):
                  zip_out.write(chunk)

      # Kaggle serves datasets as ZIP archives; read the Parquet file inside it.
      with ZipFile(ZIP_PATH) as archive:
          with archive.open(PARQUET_MEMBER) as parquet_file:
              df = pd.read_parquet(parquet_file)

      # Save the extracted trips as CSV for the transformation task.
      df.to_csv('nyc_taxi_data.csv', index=False)
  - id: data_transformation
    type: io.kestra.plugin.scripts.python.Script
    outputFiles:
      - transformed_data.csv
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    beforeCommands:
      - python3 -m venv .venv
      - . .venv/bin/activate
      - pip install pandas
    warningOnStdErr: false
    script: |
      import pandas as pd

      # Longest plausible trip in minutes, and the number of rows kept for
      # loading/visualization.
      MAX_TRIP_MINUTES = 180
      SAMPLE_SIZE = 20000

      # Load the NYC taxi trips written by the extraction task.
      df = pd.read_csv('{{ outputs.extracting_nyc_taxi.outputFiles["nyc_taxi_data.csv"] }}')

      # Remove ghost trips (zero distance) and trips without passengers.
      # NaN values compare False, so rows missing either field are dropped too.
      new_df = df[(df['passenger_count'] > 0) & (df['trip_distance'] > 0)].copy()

      # read_csv leaves the timestamps as strings; parse them as datetimes.
      for col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
          new_df[col] = pd.to_datetime(new_df[col])

      # Trip duration in minutes, used for visualization.
      new_df['trip_duration'] = (
          new_df['tpep_dropoff_datetime'] - new_df['tpep_pickup_datetime']
      ).dt.total_seconds() / 60

      # Keep trips lasting more than 0 and at most 180 minutes (3 hours).
      # Zero or negative durations (drop-off not after pick-up) are recording errors.
      new_df = new_df[(new_df['trip_duration'] > 0) & (new_df['trip_duration'] <= MAX_TRIP_MINUTES)]

      # DataFrame.sample raises ValueError when n exceeds the row count, so cap n.
      new_df = new_df.sample(n=min(SAMPLE_SIZE, len(new_df)), random_state=42)
      new_df.to_csv('transformed_data.csv', index=False)
  - id: loading_nyc_taxi_data
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    beforeCommands:
      - python3 -m venv .venv
      - . .venv/bin/activate
      - pip install pandas psycopg2-binary
    # Connection settings are supplied through the environment. The password
    # comes from a Kestra secret rather than being hard-coded in the script.
    env:
      PGHOST: postgres
      PGDATABASE: kestra
      PGUSER: kestra
      PGPASSWORD: "{{ secret('POSTGRES_PASSWORD') }}"
    warningOnStdErr: false
    script: |
      import os

      import pandas as pd
      import psycopg2
      from psycopg2.extras import execute_batch

      # Columns loaded into nyc_taxi_trips, in INSERT order. This one list drives
      # both the INSERT statement and the row tuples, so they cannot drift apart.
      COLUMNS = [
          "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
          "passenger_count", "trip_distance", "RatecodeID",
          "store_and_fwd_flag", "PULocationID", "DOLocationID",
          "payment_type", "fare_amount", "extra", "mta_tax",
          "tip_amount", "tolls_amount", "improvement_surcharge",
          "total_amount", "congestion_surcharge", "airport_fee",
          "trip_duration",
      ]

      # trip_duration holds fractional minutes, so it is FLOAT rather than INT.
      # NOTE(review): IF NOT EXISTS leaves a table created by an earlier run
      # unchanged (trip_duration INT); alter that column manually if needed.
      CREATE_TABLE_SQL = """
      CREATE TABLE IF NOT EXISTS nyc_taxi_trips (
          VendorID INT,
          tpep_pickup_datetime TIMESTAMP WITH TIME ZONE,
          tpep_dropoff_datetime TIMESTAMP WITH TIME ZONE,
          passenger_count INT,
          trip_distance FLOAT,
          RatecodeID INT,
          store_and_fwd_flag VARCHAR,
          PULocationID INT,
          DOLocationID INT,
          payment_type INT,
          fare_amount FLOAT,
          extra FLOAT,
          mta_tax FLOAT,
          tip_amount FLOAT,
          tolls_amount FLOAT,
          improvement_surcharge FLOAT,
          total_amount FLOAT,
          congestion_surcharge FLOAT,
          airport_fee FLOAT,
          trip_duration FLOAT
      )
      """

      # Column names are fixed constants, so joining them into the SQL is safe;
      # row values still go through %s placeholders.
      column_list = ", ".join(COLUMNS)
      placeholders = ", ".join(["%s"] * len(COLUMNS))
      INSERT_SQL = f"INSERT INTO nyc_taxi_trips ({column_list}) VALUES ({placeholders})"

      # Load the transformed data produced by the transformation task.
      df = pd.read_csv('{{ outputs.data_transformation.outputFiles["transformed_data.csv"] }}')

      # Convert missing values to None so psycopg2 sends SQL NULL. A float NaN
      # would otherwise be sent as 'NaN'::float, which INT columns reject.
      records = df[COLUMNS].astype(object)
      records = records.where(records.notna(), None)
      rows = list(records.itertuples(index=False, name=None))

      conn = psycopg2.connect(
          host=os.environ["PGHOST"],
          dbname=os.environ["PGDATABASE"],
          user=os.environ["PGUSER"],
          password=os.environ["PGPASSWORD"],
      )
      try:
          # `with conn` commits on success and rolls back on error. It does not
          # close the connection, which the finally clause does.
          with conn, conn.cursor() as cursor:
              cursor.execute(CREATE_TABLE_SQL)
              # NOTE(review): rows are appended on every run, so re-running the
              # flow duplicates data unless the table is truncated first.
              execute_batch(cursor, INSERT_SQL, rows)
      finally:
          conn.close()

