# Download Chicago Taxi Trips

## Install Google Cloud CLI

```bash
# install the client: download the Google Cloud CLI archive, unpack it,
# and run the installer script that ships inside the archive
curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-linux-x86_64.tar.gz
tar -xf google-cloud-cli-linux-x86_64.tar.gz
./google-cloud-sdk/install.sh

# authenticate: initialise gcloud, then log in for application-default credentials
./google-cloud-sdk/bin/gcloud init
./google-cloud-sdk/bin/gcloud auth application-default login
```

In [1]:
!pip install --upgrade "google-cloud-bigquery[pandas,storage]" google-cloud-bigquery-storage pandas db-dtypes



In [2]:
import time
from datetime import datetime

import pandas as pd
import numpy as np 
from google.cloud import bigquery

## 0. BigQuery Helper

In [3]:
import time
import pandas as pd
import numpy as np  # FIX: Added missing numpy import
from google.cloud import bigquery

class BigQueryHelper:
    """
    Helper class to simplify common BigQuery tasks.

    It can interact with datasets in any project, while billing and job
    execution are handled by a specified billing project.

    Attributes:
        billing_project (str): Project the client is created for.
        data_project (str): Project that owns the dataset.
        dataset_name (str): Name of the dataset.
        max_wait_seconds (int): Timeout applied by ``query_to_pandas``.
        client: The ``bigquery.Client`` used for every request.
        dataset: Dataset object, fetched lazily on first use (``None`` before).
        tables (dict): Cache of ``{table name: table object}``.
        total_gb_used_net_cache (float): Running total of billed GB, as
            reported by completed jobs run through ``query_to_pandas``.
    """

    # Divisor used to convert byte counts to gigabytes (2**30 bytes).
    BYTES_PER_GB = 2**30

    # Column order of the frame returned by ``table_schema``.
    _SCHEMA_COLUMNS = ['name', 'type', 'mode', 'description']

    def __init__(self, billing_project, data_project, dataset_name, max_wait_seconds=180):
        """
        Args:
            billing_project (str): Your Google Cloud Project ID that will be billed for queries.
            data_project (str): The project ID where the dataset you want to query resides.
                                For public data, this is often 'bigquery-public-data'.
            dataset_name (str): The name of the dataset.
            max_wait_seconds (int): Max time to wait for a query to complete.
        """
        self.billing_project = billing_project
        self.data_project = data_project
        self.dataset_name = dataset_name
        self.max_wait_seconds = max_wait_seconds

        # The client always uses the billing project for job execution.
        self.client = bigquery.Client(project=self.billing_project)

        # The dataset reference points at the project where the data lives.
        # Built directly because ``Client.dataset()`` is deprecated.
        self.__dataset_ref = bigquery.DatasetReference(self.data_project, self.dataset_name)

        self.dataset = None
        self.tables = {}  # {table name (str): table object}
        self.__table_refs = {}  # {table name (str): table reference}
        self.total_gb_used_net_cache = 0

    def __fetch_dataset(self):
        """Lazy loading of dataset."""
        if self.dataset is None:
            self.dataset = self.client.get_dataset(self.__dataset_ref)

    def __fetch_table(self, table_name):
        """Lazy loading of table; results are cached in ``self.tables``."""
        self.__fetch_dataset()
        if table_name not in self.__table_refs:
            self.__table_refs[table_name] = self.dataset.table(table_name)
        if table_name not in self.tables:
            self.tables[table_name] = self.client.get_table(self.__table_refs[table_name])

    def __handle_record_field(self, row, schema_details, top_level_name=''):
        """Append one schema field, and recursively its nested fields.

        Args:
            row: Mapping-like field description (a pandas Series for top-level
                fields, a plain dict for nested ones) with at least ``name``
                and ``type``.
            schema_details (list): Output list; one dict per field is appended.
            top_level_name (str): Dotted name of the parent field, or '' at
                the top level.
        """
        name = row['name']
        if top_level_name != '':
            name = top_level_name + '.' + name
        schema_details.append({
            'name': name,
            'type': row['type'],
            # ``.get`` because a field representation may omit optional keys
            # (a nested field without a description has no 'description' key).
            'mode': row.get('mode'),
            'description': row.get('description'),
        })
        nested_fields = row.get('fields')
        # Non-record fields carry NaN / nothing here rather than a list.
        if not isinstance(nested_fields, list):
            return
        for entry in nested_fields:
            self.__handle_record_field(entry, schema_details, name)

    def __unpack_all_schema_fields(self, schema):
        """Unroll nested schemas to a flat DataFrame, one row per field."""
        schema_details = []
        for _, row in schema.iterrows():
            self.__handle_record_field(row, schema_details)
        return pd.DataFrame(schema_details, columns=self._SCHEMA_COLUMNS)

    def table_schema(self, table_name):
        """Get the schema for a specific table, unrolling nested fields.

        Args:
            table_name (str): Name of a table in the dataset.

        Returns:
            pandas.DataFrame: Columns ``name``, ``type``, ``mode`` and
            ``description``; nested fields appear as ``parent.child``.
        """
        self.__fetch_table(table_name)
        raw_schema = self.tables[table_name].schema
        schema = pd.DataFrame([field.to_api_repr() for field in raw_schema])
        if 'fields' in schema.columns:
            schema = self.__unpack_all_schema_fields(schema)
        if 'description' not in schema.columns:
            schema['description'] = None
        # reindex (rather than plain selection) also copes with an empty schema.
        return schema.reindex(columns=self._SCHEMA_COLUMNS)

    def list_tables(self):
        """List the names of the tables in the dataset."""
        self.__fetch_dataset()
        return [x.table_id for x in self.client.list_tables(self.dataset)]

    def estimate_query_size(self, query):
        """Estimate gigabytes scanned by a query without running it.

        Args:
            query (str): SQL text.

        Returns:
            float: ``total_bytes_processed`` of a dry-run job, in GB.
        """
        dry_run_config = bigquery.QueryJobConfig(dry_run=True)
        dry_run_job = self.client.query(query, job_config=dry_run_config)
        return dry_run_job.total_bytes_processed / self.BYTES_PER_GB

    def query_to_pandas(self, query):
        """Execute a SQL query and return a pandas DataFrame.

        Args:
            query (str): SQL text.

        Returns:
            pandas.DataFrame, or ``None`` if the job did not finish within
            ``max_wait_seconds`` (the job is then cancelled).

        Raises:
            Whatever the job raises when it failed; surfaced by
            ``to_dataframe()``.
        """
        my_job = self.client.query(query)
        start_time = time.monotonic()
        # Custom wait loop to provide a timeout
        while not my_job.done():
            if (time.monotonic() - start_time) > self.max_wait_seconds:
                print("Max wait time elapsed, query cancelled.")
                # Cancel through the job object so its own project and
                # location are used for the request.
                my_job.cancel()
                return None
            time.sleep(0.1)

        # This will raise the exception if the job failed.
        df = my_job.to_dataframe()

        if my_job.total_bytes_billed:
            self.total_gb_used_net_cache += my_job.total_bytes_billed / self.BYTES_PER_GB
        return df

    def query_to_pandas_safe(self, query, max_gb_scanned=1):
        """Run a query only if its estimated scan size is within a limit.

        Args:
            query (str): SQL text.
            max_gb_scanned (float): Upper bound, in GB, for the dry-run estimate.

        Returns:
            pandas.DataFrame, or ``None`` when the estimate exceeds the limit
            (or when ``query_to_pandas`` itself returns ``None``).
        """
        query_size = self.estimate_query_size(query)
        if query_size <= max_gb_scanned:
            return self.query_to_pandas(query)
        msg = f"Query cancelled; estimated size of {query_size:.4f} GB exceeds limit of {max_gb_scanned} GB"
        print(msg)
        return None

    def head(self, table_name, num_rows=5, start_index=None, selected_columns=None):
        """Get the first n rows of a table as a DataFrame.

        Args:
            table_name (str): Name of a table in the dataset.
            num_rows (int): Maximum number of rows to return.
            start_index (int | None): Passed through to ``list_rows``.
            selected_columns: Optional column name, or collection of column
                names, to restrict the result to.

        Returns:
            pandas.DataFrame: The requested rows.
        """
        self.__fetch_table(table_name)
        active_table = self.tables[table_name]

        schema_subset = None
        if selected_columns:
            # A bare string would otherwise be matched by substring.
            if isinstance(selected_columns, str):
                selected_columns = [selected_columns]
            schema_subset = [col for col in active_table.schema if col.name in selected_columns]

        rows = self.client.list_rows(
            active_table,
            selected_fields=schema_subset,
            max_results=num_rows,
            start_index=start_index,
        )

        # Convert the row iterator to a DataFrame
        return rows.to_dataframe()

## 1. Configuration

In [4]:
# Local directory that receives the monthly parquet files and the manifest.
output_path = "/data/data/taxi_trips"
project_id = "mlrun-trino"  # replace with your GCP project ID

## 2. BigQuery Connection

In [5]:
# Bill queries to the configured project_id (instead of repeating the literal)
# and read from the public dataset project.
bq_assistant = BigQueryHelper(
    billing_project=project_id,
    data_project="bigquery-public-data",
    dataset_name="chicago_taxi_trips",
)

In [6]:
# confirm the connection is up by listing the tables
# (returns the table IDs found in the configured dataset)
bq_assistant.list_tables()

['taxi_trips']

## 3. Preview Data

In [7]:
# Preview the first three rows of the table as a DataFrame.
bq_assistant.head("taxi_trips", num_rows=3)

Unnamed: 0,unique_key,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,pickup_community_area,dropoff_community_area,...,extras,trip_total,payment_type,company,pickup_latitude,pickup_longitude,pickup_location,dropoff_latitude,dropoff_longitude,dropoff_location
0,9f4d762848799be329e007ae695297ddf2ec9e68,53b9f89457dbe76abd2c5d0d7f00e0c2bf48a1e1035122...,2019-09-09 15:00:00+00:00,2019-09-09 15:15:00+00:00,1279,14.39,,,,,...,24.0,60.75,Credit Card,Flash Cab,,,,,,
1,9f28593d41d91fcf46c81ab9395c2dc8a6a26c9a,a7108907c04f278905344870b79e80244259d14f1335b5...,2019-09-04 07:15:00+00:00,2019-09-04 07:45:00+00:00,1920,14.4,,,,,...,0.0,44.2,Credit Card,Star North Management LLC,,,,,,
2,9dd67852ff43243a3121d24ea21ac5f84dd798a1,81d652b4d1a83430b2ecbdd8f126b135f9f37b6115b7a5...,2019-09-05 07:15:00+00:00,2019-09-05 07:30:00+00:00,459,1.0,,,,,...,0.0,6.5,Cash,Taxi Affiliation Service Yellow,,,,,,


In [8]:
# Show each column's name, type, mode and description (nested fields unrolled).
bq_assistant.table_schema("taxi_trips")

Unnamed: 0,name,type,mode,description
0,unique_key,STRING,REQUIRED,Unique identifier for the trip.
1,taxi_id,STRING,REQUIRED,A unique identifier for the taxi.
2,trip_start_timestamp,TIMESTAMP,NULLABLE,"When the trip started, rounded to the nearest ..."
3,trip_end_timestamp,TIMESTAMP,NULLABLE,"When the trip ended, rounded to the nearest 15..."
4,trip_seconds,INTEGER,NULLABLE,Time of the trip in seconds.
5,trip_miles,FLOAT,NULLABLE,Distance of the trip in miles.
6,pickup_census_tract,INTEGER,NULLABLE,The Census Tract where the trip began. For pri...
7,dropoff_census_tract,INTEGER,NULLABLE,The Census Tract where the trip ended. For pri...
8,pickup_community_area,INTEGER,NULLABLE,The Community Area where the trip began.
9,dropoff_community_area,INTEGER,NULLABLE,The Community Area where the trip ended.


## 4. Download Chicago Taxi Trips

In [9]:
# ---- parameters you can tweak ----
# Fully qualified table name, backtick-quoted because it is interpolated into the SQL text.
TABLE_FQN = "`bigquery-public-data.chicago_taxi_trips.taxi_trips`"
# First day of the range to download; months are generated from here to END.
START = pd.Timestamp("2023-01-01")
END   = pd.Timestamp.today().normalize()  # up to today; change if you want a fixed cutoff
# Directory the monthly parquet files are written to.
OUTDIR = output_path
# Guard on the dry-run scan estimate per chunk (not on billed bytes); raise/lower if needed
MAX_GB_SCANNED = 100   # per month-chunk limit for query_to_pandas_safe
# ---------------------------------

In [10]:
import os

# to_parquet does not create missing directories, so make sure OUTDIR exists.
os.makedirs(OUTDIR, exist_ok=True)

months = pd.period_range(START, END, freq="M")
saved = []

# NOTE(review): every chunk is a separate `SELECT *` query. If the table is not
# partitioned on trip_start_timestamp, each chunk may scan the whole table, which
# adds up quickly against a scan quota -- confirm with estimate_query_size().
for p in months:
    month_start = p.start_time
    month_end = (p + 1).start_time  # exclusive upper bound

    # File name like: chicago_taxi_2023_01.parquet
    out_path = f"{OUTDIR}/chicago_taxi_{p.year:04d}_{p.month:02d}.parquet"

    # Resume support: a month that lies entirely in the past and already has a
    # file is not queried again, so a re-run after a failure (e.g. an exceeded
    # quota) only spends scan bytes on what is still missing. The month that
    # contains END may still be growing and is therefore always refreshed.
    if month_end <= END and os.path.exists(out_path):
        print(f"[skip] {p} -> already saved at {out_path}")
        saved.append(out_path)
        continue

    query = f"""
    SELECT *
    FROM {TABLE_FQN}
    WHERE trip_start_timestamp >= TIMESTAMP('{month_start.strftime("%Y-%m-%d")} 00:00:00')
      AND trip_start_timestamp <  TIMESTAMP('{month_end.strftime("%Y-%m-%d")} 00:00:00')
    """

    # Pull this month's rows into a DataFrame (safeguarded by MAX_GB_SCANNED)
    df = bq_assistant.query_to_pandas_safe(query, max_gb_scanned=MAX_GB_SCANNED)

    # Skip empty months gracefully (None means the query was not run or timed out)
    if df is None or df.empty:
        print(f"[skip] {p} -> no rows returned")
        continue

    # Write snappy-compressed parquet into OUTDIR. Write to a temporary name and
    # rename afterwards so an interrupted write never leaves a truncated file
    # that the resume check above would mistake for a finished month.
    tmp_path = f"{out_path}.tmp"
    df.to_parquet(tmp_path, compression="snappy")
    os.replace(tmp_path, out_path)
    print(f"[ok] saved {out_path} (rows={len(df)})")
    saved.append(out_path)

[ok] saved /data/data/taxi_trips/chicago_taxi_2023_01.parquet (rows=414457)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_02.parquet (rows=446724)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_03.parquet (rows=585160)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_04.parquet (rows=570241)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_05.parquet (rows=592701)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_06.parquet (rows=625645)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_07.parquet (rows=548629)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_08.parquet (rows=553854)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_09.parquet (rows=558682)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_10.parquet (rows=606474)
[ok] saved /data/data/taxi_trips/chicago_taxi_2023_11.parquet (rows=527795)


Forbidden: 403 Quota exceeded: Your project exceeded quota for free query bytes scanned. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas; reason: quotaExceeded, location: unbilled.analysis, message: Quota exceeded: Your project exceeded quota for free query bytes scanned. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas

Location: US
Job ID: 14522ada-301d-4b2c-8459-5c8b5e65ad6f


In [None]:
# Optional: manifest of all part files, one path per line.
# Written as plain text: Series.to_csv(index=False) would also emit the Series'
# default column label ("0") as a header line at the top of the manifest.
manifest_path = f"{OUTDIR}/chicago_taxi_2023_onward_manifest.txt"
with open(manifest_path, "w", encoding="utf-8") as manifest_file:
    manifest_file.writelines(f"{path}\n" for path in saved)
print(f"\nWrote manifest with {len(saved)} files to {manifest_path}")