diff --git a/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/Dockerfile b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..7265a1b71 --- /dev/null +++ b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,37 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/csv_transform.py b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..775e691fb --- /dev/null +++ b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,423 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
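+
+# This script implements the chunked CSV transform for the New York taxi
+# trips pipeline: for each of the six most recent years it downloads the
+# monthly TLC trip-record CSV files, transforms them in chunks with pandas,
+# uploads the combined yearly output file to GCS, and loads it into a
+# per-year BigQuery table. All configuration (URLs, paths, table names,
+# headers, dtypes) is supplied through environment variables; see the
+# __main__ block at the bottom of this file.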
+
+import json
+import logging
+import os
+import pathlib
+import typing
+from datetime import datetime
+
+import pandas as pd
+import requests
+from google.cloud import bigquery, storage
+from google.cloud.exceptions import NotFound
+
+
+def main(
+    source_url: str,
+    source_file: pathlib.Path,
+    target_file: pathlib.Path,
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    schema_path: str,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    pipeline_name: str,
+    input_headers: typing.List[str],
+    data_dtypes: dict,
+    output_headers: typing.List[str],
+) -> None:
+    logging.info(f"New York taxi trips - {pipeline_name} process started")
+    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
+    execute_pipeline(
+        source_url,
+        str(source_file),
+        str(target_file),
+        project_id,
+        dataset_id,
+        table_id,
+        schema_path,
+        chunksize,
+        target_gcs_bucket,
+        target_gcs_path,
+        pipeline_name,
+        input_headers,
+        data_dtypes,
+        output_headers,
+    )
+    logging.info(f"New York taxi trips - {pipeline_name} process completed")
+
+
+def execute_pipeline(
+    source_url: str,
+    source_file: str,
+    target_file: str,
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    schema_path: str,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    pipeline_name: str,
+    input_headers: typing.List[str],
+    data_dtypes: dict,
+    output_headers: typing.List[str],
+) -> None:
+    # Process the six most recent years of data, newest first
+    for year_number in range(datetime.now().year, (datetime.now().year - 6), -1):
+        target_file_name = target_file.replace(".csv", f"_{year_number}.csv")
+        process_year_data(
+            source_url,
+            int(year_number),
+            source_file,
+            target_file,
+            target_file_name,
+            project_id,
+            dataset_id,
+            table_id,
+            schema_path,
+            chunksize,
+            target_gcs_bucket,
+            target_gcs_path,
+            pipeline_name,
+            input_headers,
+            data_dtypes,
+            output_headers,
+        )
+
+
+def process_year_data(
+    source_url: str,
+    year_number: int,
+    source_file: str,
+    target_file: str,
+    target_file_name: str,
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    schema_path: str,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    pipeline_name: str,
+    input_headers: typing.List[str],
+    data_dtypes: dict,
+    output_headers: typing.List[str],
+) -> None:
+    logging.info(f"Processing year {year_number}")
+    destination_table = f"{table_id}_{year_number}"
+    year_data_available = False
+    for month_number in range(1, 13):
+        month_data_available = process_month(
+            source_url,
+            year_number,
+            month_number,
+            source_file,
+            target_file,
+            target_file_name,
+            chunksize,
+            input_headers,
+            data_dtypes,
+            output_headers,
+            pipeline_name,
+        )
+        if month_data_available:
+            year_data_available = True
+    if os.path.exists(target_file_name) and year_data_available:
+        upload_file_to_gcs(
+            target_file_name,
+            target_gcs_bucket,
+            str(target_gcs_path).replace(".csv", f"_{year_number}.csv"),
+        )
+        create_dest_table(
+            project_id, dataset_id, destination_table, schema_path, target_gcs_bucket
+        )
+        load_data_to_bq(project_id, dataset_id, destination_table, target_file_name)
+    else:
+        logging.info(
+            f"Informational: The data file {target_file_name} was not generated because no data was available for year {year_number}. Continuing."
+        )
+    logging.info(f"Processing year {year_number} completed")
+
+
+def load_data_to_bq(
+    project_id: str, dataset_id: str, table_id: str, file_path: str
+) -> None:
+    logging.info(
+        f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started"
+    )
+    client = bigquery.Client(project=project_id)
+    table_ref = client.dataset(dataset_id).table(table_id)
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.CSV
+    job_config.skip_leading_rows = 1  # ignore the header
+    job_config.autodetect = False
+    with open(file_path, "rb") as source_file:
+        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
+    job.result()
+    logging.info(
+        f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed"
+    )
+
+
+def create_dest_table(
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    schema_filepath: str,
+    bucket_name: str,
+) -> bool:
+    table_ref = f"{project_id}.{dataset_id}.{table_id}"
+    logging.info(f"Attempting to create table {table_ref} if it doesn't already exist")
+    client = bigquery.Client()
+    success = False
+    try:
+        table_exists_id = client.get_table(table_ref).table_id
+        logging.info(f"Table {table_exists_id} currently exists.")
+        success = True
+    except NotFound:
+        logging.info(
+            f"Table {table_ref} currently does not exist. Attempting to create table."
+        )
+        schema = create_table_schema([], bucket_name, schema_filepath)
+        table = bigquery.Table(table_ref, schema=schema)
+        client.create_table(table)
+        logging.info(f"Table {table_ref} was created")
+        success = True
+    return success
+
+
+def create_table_schema(
+    schema_structure: list, bucket_name: str = "", schema_filepath: str = ""
+) -> list:
+    logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}")
+    schema = []
+    if not schema_filepath:
+        schema_struct = schema_structure
+    else:
+        storage_client = storage.Client()
+        bucket = storage_client.get_bucket(bucket_name)
+        blob = bucket.blob(schema_filepath)
+        schema_struct = json.loads(blob.download_as_string(client=None))
+    for schema_field in schema_struct:
+        fld_name = schema_field["name"]
+        fld_type = schema_field["type"]
+        fld_descr = schema_field.get("description", "")
+        fld_mode = schema_field["mode"]
+        schema.append(
+            bigquery.SchemaField(
+                name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr
+            )
+        )
+    return schema
+
+
+def process_month(
+    source_url: str,
+    year_number: int,
+    month_number: int,
+    source_file: str,
+    target_file: str,
+    target_file_name: str,
+    chunksize: str,
+    input_headers: typing.List[str],
+    data_dtypes: dict,
+    output_headers: typing.List[str],
+    pipeline_name: str,
+) -> bool:
+    process_year_month = str(year_number) + "-" + str(month_number).zfill(2)
+    logging.info(f"Processing {process_year_month} started")
+    source_url_to_process = f"{source_url}{process_year_month}.csv"
+    source_file_to_process = str(source_file).replace(
+        ".csv", f"_{process_year_month}.csv"
+    )
+    successful_download = download_file(source_url_to_process, source_file_to_process)
+    if successful_download:
+        with pd.read_csv(
+            source_file_to_process,
+            engine="python",
+            encoding="utf-8",
+            quotechar='"',
+            chunksize=int(chunksize),
+            sep=",",
+            names=input_headers,
+            skiprows=1,
+            dtype=data_dtypes,
+        ) as reader:
+            for chunk_number, chunk in enumerate(reader):
+                logging.info(
+                    f"Processing chunk #{chunk_number} of file {process_year_month} started"
+                )
+                target_file_batch = str(target_file).replace(
+                    ".csv", f"-{process_year_month}-{chunk_number}.csv"
+                )
+                df = chunk
+                process_chunk(
+                    df,
+                    target_file_batch,
+                    target_file_name,
+                    # write the header (and truncate) only for the very first
+                    # chunk of the yearly output file
+                    month_number == 1 and chunk_number == 0,
+                    month_number == 1 and chunk_number == 0,
+                    output_headers,
+                    pipeline_name,
+                )
+                logging.info(
+                    f"Processing chunk #{chunk_number} of file {process_year_month} completed"
+                )
+    logging.info(f"Processing {process_year_month} completed")
+    return successful_download
+
+
+def download_file(source_url: str, source_file: pathlib.Path) -> bool:
+    logging.info(f"Downloading {source_url} into {source_file}")
+    success = True
+    r = requests.get(source_url, stream=True)
+    with open(source_file, "wb") as f:
+        for chunk in r:
+            f.write(chunk)
+    # if the file contains the string "NoSuchKey" then the url returned
+    # that it could not locate the respective file
+    with open(source_file, "rb") as f:
+        if f.read().find(b"NoSuchKey") > -1:
+            success = False
+    if success:
+        logging.info(f"Download {source_url} to {source_file} complete.")
+    else:
+        logging.info(
+            f"Unable to download {source_url} to {source_file} at this time. The URL may not exist."
+        )
+    return success
+
+
+def process_chunk(
+    df: pd.DataFrame,
+    target_file_batch: str,
+    target_file: str,
+    include_header: bool,
+    truncate_file: bool,
+    output_headers: typing.List[str],
+    pipeline_name: str,
+) -> None:
+    if pipeline_name == "tlc_green_trips":
+        df["distance_between_service"] = ""
+        df["time_between_service"] = ""
+    df = format_date_time(df, "pickup_datetime", "strftime", "%Y-%m-%d %H:%M:%S")
+    df = format_date_time(df, "dropoff_datetime", "strftime", "%Y-%m-%d %H:%M:%S")
+    df = remove_null_rows(df)
+    df = df[output_headers]
+    save_to_new_file(df, file_path=str(target_file_batch))
+    append_batch_file(target_file_batch, target_file, include_header, truncate_file)
+    logging.info(f"Processing Batch {target_file_batch} completed")
+
+
+def remove_null_rows(df: pd.DataFrame) -> pd.DataFrame:
+    logging.info("Removing null rows...")
+    df = df.dropna(axis=0, subset=["vendor_id"])
+    return df
+
+
+def format_date_time(
+    df: pd.DataFrame, field_name: str, str_pf_time: str, dt_format: str
+) -> pd.DataFrame:
+    if str_pf_time == "strptime":
+        logging.info(
+            f"Transform: Parsing field {field_name} from {dt_format} strings to datetime"
+        )
+        df[field_name] = df[field_name].apply(lambda x: datetime.strptime(x, dt_format))
+    else:
+        logging.info(
+            f"Transform: Formatting field {field_name} from datetime to {dt_format} strings"
+        )
+        df[field_name] = df[field_name].dt.strftime(dt_format)
+    return df
+
+
+def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None:
+    logging.info(f"Saving to file {file_path} separator='{sep}'")
+    df.to_csv(file_path, sep=sep, index=False)
+
+
+def append_batch_file(
+    batch_file_path: str,
+    target_file_path: str,
+    include_header: bool,
+    truncate_target_file: bool,
+) -> None:
+    logging.info(
+        f"Appending file {batch_file_path} to file {target_file_path} with include_header={include_header} and truncate_target_file={truncate_target_file}"
+    )
+    if truncate_target_file:
+        open(target_file_path, "w").close()
+    with open(batch_file_path, "r") as data_file, open(
+        target_file_path, "a+"
+    ) as target_file:
+        if not include_header:
+            logging.info(
+                f"Appending batch file {batch_file_path} to {target_file_path} without header"
+            )
+            next(data_file)
+        else:
+            logging.info(
+                f"Appending batch file {batch_file_path} to {target_file_path} with header"
+            )
+        target_file.write(data_file.read())
+    if os.path.exists(batch_file_path):
+        os.remove(batch_file_path)
+
+
+def upload_file_to_gcs(
+    file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str
+) -> None:
+    if os.path.exists(file_path):
+        logging.info(
+            f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}"
+        )
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(target_gcs_bucket)
+        blob = bucket.blob(target_gcs_path)
+        blob.upload_from_filename(file_path)
+    else:
+        logging.info(
+            f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist."
+ ) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + project_id=os.environ["PROJECT_ID"], + dataset_id=os.environ["DATASET_ID"], + table_id=os.environ["TABLE_ID"], + schema_path=os.environ["SCHEMA_PATH"], + chunksize=os.environ["CHUNKSIZE"], + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + pipeline_name=os.environ["PIPELINE_NAME"], + input_headers=json.loads(os.environ["INPUT_CSV_HEADERS"]), + data_dtypes=json.loads(os.environ["DATA_DTYPES"]), + output_headers=json.loads(os.environ["OUTPUT_CSV_HEADERS"]), + ) diff --git a/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/green_trips_schema.json b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/green_trips_schema.json new file mode 100644 index 000000000..a2fa3775f --- /dev/null +++ b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/green_trips_schema.json @@ -0,0 +1,128 @@ +[ + { + "name": "vendor_id", + "type": "STRING", + "description": "A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.", + "mode": "REQUIRED" + }, + { + "name": "pickup_datetime", + "type": "TIMESTAMP", + "description": "The date and time when the meter was engaged", + "mode": "NULLABLE" + }, + { + "name": "dropoff_datetime", + "type": "TIMESTAMP", + "description": "The date and time when the meter was disengaged", + "mode": "NULLABLE" + }, + { + "name": "store_and_fwd_flag", + "type": "STRING", + "description": "This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka 'store and forward,' because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip", + "mode": "NULLABLE" + }, + { + "name": "rate_code", + "type": "STRING", + "description": "The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride", + "mode": "NULLABLE" + }, + { + "name": "passenger_count", + "type": "INTEGER", + "description": "The number of passengers in the vehicle. This is a driver-entered value.", + "mode": "NULLABLE" + }, + { + "name": "trip_distance", + "type": "NUMERIC", + "description": "The elapsed trip distance in miles reported by the taximeter.", + "mode": "NULLABLE" + }, + { + "name": "fare_amount", + "type": "NUMERIC", + "description": "The time-and-distance fare calculated by the meter", + "mode": "NULLABLE" + }, + { + "name": "extra", + "type": "NUMERIC", + "description": "Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges", + "mode": "NULLABLE" + }, + { + "name": "mta_tax", + "type": "NUMERIC", + "description": "$0.50 MTA tax that is automatically triggered based on the metered rate in use", + "mode": "NULLABLE" + }, + { + "name": "tip_amount", + "type": "NUMERIC", + "description": "Tip amount. This field is automatically populated for credit card tips. 
Cash tips are not included.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tolls_amount",
+    "type": "NUMERIC",
+    "description": "Total amount of all tolls paid in trip.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "ehail_fee",
+    "type": "NUMERIC",
+    "description": "",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "total_amount",
+    "type": "NUMERIC",
+    "description": "The total amount charged to passengers. Does not include cash tips.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "payment_type",
+    "type": "STRING",
+    "description": "A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "distance_between_service",
+    "type": "NUMERIC",
+    "description": "",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "time_between_service",
+    "type": "INTEGER",
+    "description": "",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "trip_type",
+    "type": "STRING",
+    "description": "A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "imp_surcharge",
+    "type": "NUMERIC",
+    "description": "$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "pickup_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was engaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dropoff_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was disengaged",
+    "mode": "NULLABLE"
+  }
+]
diff --git a/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/requirements.txt b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/requirements.txt
new file mode 100644
index 000000000..f87f393f3
--- /dev/null
+++ b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/requirements.txt
@@ -0,0 +1,4 @@
+google-cloud-storage
+google-cloud-bigquery
+pandas
+requests
diff --git a/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/yellow_trips_schema.json b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/yellow_trips_schema.json
new file mode 100644
index 000000000..aea390c5f
--- /dev/null
+++ b/datasets/new_york_taxi_trips/_images/run_csv_transform_kub/yellow_trips_schema.json
@@ -0,0 +1,104 @@
+[
+  {
+    "name": "vendor_id",
+    "type": "STRING",
+    "description": "A code indicating the TPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "pickup_datetime",
+    "type": "TIMESTAMP",
+    "description": "The date and time when the meter was engaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dropoff_datetime",
+    "type": "TIMESTAMP",
+    "description": "The date and time when the meter was disengaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "passenger_count",
+    "type": "INTEGER",
+    "description": "The number of passengers in the vehicle. This is a driver-entered value.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "trip_distance",
+    "type": "NUMERIC",
+    "description": "The elapsed trip distance in miles reported by the taximeter.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "rate_code",
+    "type": "STRING",
+    "description": "The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "store_and_fwd_flag",
+    "type": "STRING",
+    "description": "This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka 'store and forward,' because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "payment_type",
+    "type": "STRING",
+    "description": "A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "fare_amount",
+    "type": "NUMERIC",
+    "description": "The time-and-distance fare calculated by the meter",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "extra",
+    "type": "NUMERIC",
+    "description": "Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "mta_tax",
+    "type": "NUMERIC",
+    "description": "$0.50 MTA tax that is automatically triggered based on the metered rate in use",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tip_amount",
+    "type": "NUMERIC",
+    "description": "Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tolls_amount",
+    "type": "NUMERIC",
+    "description": "Total amount of all tolls paid in trip.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "imp_surcharge",
+    "type": "NUMERIC",
+    "description": "$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "total_amount",
+    "type": "NUMERIC",
+    "description": "The total amount charged to passengers. Does not include cash tips.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "pickup_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was engaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dropoff_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was disengaged",
+    "mode": "NULLABLE"
+  }
+]
diff --git a/datasets/new_york_taxi_trips/infra/new_york_taxi_trips_dataset.tf b/datasets/new_york_taxi_trips/infra/new_york_taxi_trips_dataset.tf
new file mode 100644
index 000000000..890312890
--- /dev/null
+++ b/datasets/new_york_taxi_trips/infra/new_york_taxi_trips_dataset.tf
@@ -0,0 +1,25 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + + +resource "google_bigquery_dataset" "new_york_taxi_trips" { + dataset_id = "new_york_taxi_trips" + project = var.project_id +} + +output "bigquery_dataset-new_york_taxi_trips-dataset_id" { + value = google_bigquery_dataset.new_york_taxi_trips.dataset_id +} diff --git a/datasets/new_york_taxi_trips/infra/new_york_taxi_trips_pipeline.tf b/datasets/new_york_taxi_trips/infra/new_york_taxi_trips_pipeline.tf new file mode 100644 index 000000000..cf18c92d0 --- /dev/null +++ b/datasets/new_york_taxi_trips/infra/new_york_taxi_trips_pipeline.tf @@ -0,0 +1,52 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "new_york_taxi_trips_tlc_green_trips" { + project = var.project_id + dataset_id = "new_york_taxi_trips" + table_id = "tlc_green_trips" + description = "New York green taxi trips table" + depends_on = [ + google_bigquery_dataset.new_york_taxi_trips + ] +} + +output "bigquery_table-new_york_taxi_trips_tlc_green_trips-table_id" { + value = google_bigquery_table.new_york_taxi_trips_tlc_green_trips.table_id +} + +output "bigquery_table-new_york_taxi_trips_tlc_green_trips-id" { + value = google_bigquery_table.new_york_taxi_trips_tlc_green_trips.id +} + +resource "google_bigquery_table" "new_york_taxi_trips_tlc_yellow_trips" { + project = var.project_id + dataset_id = "new_york_taxi_trips" + table_id = "tlc_yellow_trips" + description = "New York yellow taxi trips table" + depends_on = [ + google_bigquery_dataset.new_york_taxi_trips + ] +} + +output "bigquery_table-new_york_taxi_trips_tlc_yellow_trips-table_id" { + value = google_bigquery_table.new_york_taxi_trips_tlc_yellow_trips.table_id +} + +output "bigquery_table-new_york_taxi_trips_tlc_yellow_trips-id" { + value = google_bigquery_table.new_york_taxi_trips_tlc_yellow_trips.id +} diff --git a/datasets/new_york_taxi_trips/infra/provider.tf b/datasets/new_york_taxi_trips/infra/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/new_york_taxi_trips/infra/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/new_york_taxi_trips/infra/variables.tf b/datasets/new_york_taxi_trips/infra/variables.tf new file mode 100644 index 000000000..53f483735 --- /dev/null +++ b/datasets/new_york_taxi_trips/infra/variables.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} +variable "iam_policies" { + default = {} +} + diff --git a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/Dockerfile b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..7265a1b71 --- /dev/null +++ b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,37 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
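+
+# A hypothetical local smoke test of this image (values are illustrative
+# only; the real configuration is injected by the Airflow DAG):
+#   docker build -t run_csv_transform_kub .
+#   docker run --rm \
+#     -e SOURCE_URL="https://example.com/tlc/green_tripdata_" \
+#     -e SOURCE_FILE="files/data_green_trips.csv" \
+#     -e PIPELINE_NAME="tlc_green_trips" \
+#     ... (remaining env vars read by csv_transform.py) \
+#     run_csv_transform_kub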
+ +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..d4e1358c4 --- /dev/null +++ b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,536 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import os +import pathlib +import typing +from datetime import datetime + +import pandas as pd +import requests +from google.cloud import bigquery, storage +from google.cloud.exceptions import NotFound + + +def main( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + table_id: str, + data_file_year_field: str, + data_file_month_field: str, + schema_path: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, + pipeline_name: str, + input_headers: typing.List[str], + data_dtypes: dict, + output_headers: typing.List[str], +) -> None: + logging.info(f"New York taxi trips - {pipeline_name} process started") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + execute_pipeline( + source_url, + str(source_file), + str(target_file), + project_id, + dataset_id, + table_id, + data_file_year_field, + data_file_month_field, + schema_path, + chunksize, + target_gcs_bucket, + target_gcs_path, + pipeline_name, + input_headers, + data_dtypes, + output_headers, + ) + logging.info(f"New York taxi trips - {pipeline_name} process completed") + + +def execute_pipeline( + source_url: str, + source_file: str, + target_file: str, + project_id: str, + dataset_id: str, + table_id: str, + data_file_year_field: str, + data_file_month_field: str, + schema_path: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, + pipeline_name: str, + input_headers: typing.List[str], + data_dtypes: dict, + output_headers: typing.List[str], +) -> None: + for year_number in range(datetime.now().year, (datetime.now().year - 6), -1): + process_year_data( + source_url=source_url, + year_number=int(year_number), + source_file=source_file, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + data_file_year_field=data_file_year_field, + data_file_month_field=data_file_month_field, + schema_path=schema_path, + chunksize=chunksize, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + pipeline_name=pipeline_name, + input_headers=input_headers, + data_dtypes=data_dtypes, + output_headers=output_headers, + ) + + +def process_year_data( + source_url: str, + year_number: int, + source_file: str, + target_file: str, + project_id: str, + dataset_id: str, + table_id: str, + data_file_year_field: str, + data_file_month_field: str, + schema_path: str, + chunksize: str, + target_gcs_bucket: str, + 
target_gcs_path: str,
+    pipeline_name: str,
+    input_headers: typing.List[str],
+    data_dtypes: dict,
+    output_headers: typing.List[str],
+) -> None:
+    logging.info(f"Processing year {year_number}")
+    destination_table = f"{table_id}_{year_number}"
+    for month_number in range(1, 13):
+        padded_month = str(month_number).zfill(2)
+        process_year_month = f"{year_number}-{padded_month}"
+        logging.info(f"Processing month {process_year_month}")
+        month_data_already_loaded = table_has_month_data(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_name=destination_table,
+            data_file_year_field=data_file_year_field,
+            year_number=year_number,
+            data_file_month_field=data_file_month_field,
+            month_number=month_number,
+        )
+        if month_data_already_loaded:
+            logging.info(f"{process_year_month} data is already loaded. Skipping.")
+        else:
+            target_file_name = target_file.replace(
+                ".csv", f"_{process_year_month}.csv"
+            )
+            process_month(
+                source_url=source_url,
+                year_number=year_number,
+                month_number=month_number,
+                source_file=source_file,
+                target_file=target_file,
+                project_id=project_id,
+                dataset_id=dataset_id,
+                table_id=destination_table,
+                target_file_name=target_file_name,
+                schema_path=schema_path,
+                chunksize=chunksize,
+                target_gcs_bucket=target_gcs_bucket,
+                target_gcs_path=target_gcs_path,
+                input_headers=input_headers,
+                data_dtypes=data_dtypes,
+                output_headers=output_headers,
+                pipeline_name=pipeline_name,
+            )
+    logging.info(f"Processing year {year_number} completed")
+
+
+def table_has_month_data(
+    project_id: str,
+    dataset_id: str,
+    table_name: str,
+    data_file_year_field: str,
+    year_number: int,
+    data_file_month_field: str,
+    month_number: int,
+) -> bool:
+    check_field_exists = field_exists(
+        project_id, dataset_id, table_name, data_file_month_field
+    )
+    if not check_field_exists:
+        # The destination table (or its month field) doesn't exist yet, so no
+        # data can have been loaded for this month.
+        return False
+    client = bigquery.Client(project=project_id)
+    query = f"""
+        SELECT count(1) AS number_of_rows
+        FROM {dataset_id}.{table_name}
+        WHERE {data_file_year_field} = {year_number}
+          AND {data_file_month_field} = {month_number}
+    """
+    job_config = bigquery.QueryJobConfig()
+    query_job = client.query(query, job_config=job_config)
+    count_rows = 0
+    for row in query_job.result():
+        count_rows = row.number_of_rows
+    return int(count_rows) > 0
+
+
+def table_exists(project_id: str, dataset_id: str, table_name: str) -> bool:
+    client = bigquery.Client(project=project_id)
+    tables = client.list_tables(dataset_id)
+    found_table = False
+    for tbl in tables:
+        if tbl.table_id == table_name:
+            found_table = True
+    return found_table
+
+
+def field_exists(
+    project_id: str, dataset_id: str, table_name: str, field_name: str
+) -> bool:
+    if table_exists(project_id, dataset_id, table_name):
+        client = bigquery.Client(project=project_id)
+        table_ref = f"{dataset_id}.{table_name}"
+        tbl_schema = client.get_table(table_ref).schema
+        found_field = False
+        for field in tbl_schema:
+            if field.name == field_name:
+                found_field = True
+        return found_field
+    else:
+        return False
+
+
+def load_data_to_bq(
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    file_path: str,
+    field_delimiter: str,
+) -> None:
+    logging.info(
+        f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started"
+    )
+    client = bigquery.Client(project=project_id)
+    table_ref = client.dataset(dataset_id).table(table_id)
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.CSV
+    job_config.field_delimiter = field_delimiter
+    job_config.skip_leading_rows = 1  # ignore the header
+    job_config.autodetect = False
+    with open(file_path, "rb") as source_file:
+        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
+    job.result()
+    logging.info(
+        f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed"
+    )
+
+
+def create_dest_table(
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    schema_filepath: str,
+    bucket_name: str,
+) -> None:
+    table_ref = f"{project_id}.{dataset_id}.{table_id}"
+    logging.info(f"Attempting to create table {table_ref} if it doesn't already exist")
+    client = bigquery.Client()
+    try:
+        table_exists_id = client.get_table(table_ref).table_id
+        logging.info(f"Table {table_exists_id} currently exists.")
+    except NotFound:
+        logging.info(
+            f"Table {table_ref} currently does not exist. Attempting to create table."
+        )
+        schema = create_table_schema([], bucket_name, schema_filepath)
+        table = bigquery.Table(table_ref, schema=schema)
+        client.create_table(table)
+        logging.info(f"Table {table_ref} was created")
+
+
+def create_table_schema(
+    schema_structure: list, bucket_name: str = "", schema_filepath: str = ""
+) -> list:
+    logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}")
+    schema = []
+    if not schema_filepath:
+        schema_struct = schema_structure
+    else:
+        storage_client = storage.Client()
+        bucket = storage_client.get_bucket(bucket_name)
+        blob = bucket.blob(schema_filepath)
+        schema_struct = json.loads(blob.download_as_string(client=None))
+    for schema_field in schema_struct:
+        fld_name = schema_field["name"]
+        fld_type = schema_field["type"]
+        fld_descr = schema_field.get("description", "")
+        fld_mode = schema_field["mode"]
+        schema.append(
+            bigquery.SchemaField(
+                name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr
+            )
+        )
+    return schema
+
+
+def process_month(
+    source_url: str,
+    year_number: int,
+    month_number: int,
+    source_file: str,
+    target_file: str,
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    target_file_name: str,
+    schema_path: str,
+    chunksize: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    input_headers: typing.List[str],
+    data_dtypes: dict,
+    output_headers: typing.List[str],
+    pipeline_name: str,
+) -> None:
+    padded_month = str(month_number).zfill(2)
+    process_year_month = f"{year_number}-{padded_month}"
+    source_url_to_process = f"{source_url}{process_year_month}.csv"
+    source_file_to_process = str(source_file).replace(
+        ".csv", f"_{process_year_month}.csv"
+    )
+    successful_download = download_file(source_url_to_process, source_file_to_process)
+    if successful_download:
+        with pd.read_csv(
+            source_file_to_process,
+            engine="python",
+            encoding="utf-8",
+            quotechar='"',
+            chunksize=int(chunksize),
+            sep=",",
+            names=input_headers,
+            skiprows=1,
+            dtype=data_dtypes,
+        ) as reader:
+            for chunk_number, chunk in enumerate(reader):
+                logging.info(
+                    f"Processing chunk #{chunk_number} of file {process_year_month} started"
+                )
+                target_file_batch = str(target_file).replace(
+                    ".csv", f"-{process_year_month}-{chunk_number}.csv"
+                )
+                df = chunk
+                process_chunk(
+                    df,
+                    target_file_batch,
+                    target_file_name,
+                    # each monthly file is loaded on its own with
+                    # skip_leading_rows=1, so write the header (and truncate)
+                    # for the first chunk of every month
+                    chunk_number == 0,
+                    chunk_number == 0,
+                    output_headers,
+                    pipeline_name,
+                    year_number,
+                    month_number,
+                )
+                logging.info(
+                    f"Processing chunk #{chunk_number} of file {process_year_month} completed"
+                )
+        if not table_exists(project_id, dataset_id, table_id):
+            # Destination table doesn't exist
+            create_dest_table(
+                project_id=project_id,
+                dataset_id=dataset_id,
+                table_id=table_id,
+                schema_filepath=schema_path,
+                bucket_name=target_gcs_bucket,
+            )
+        load_data_to_bq(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            file_path=target_file_name,
+            field_delimiter="|",
+        )
+        upload_file_to_gcs(
+            file_path=target_file_name,
+            target_gcs_bucket=target_gcs_bucket,
+            target_gcs_path=str(target_gcs_path).replace(
+                ".csv", f"_{process_year_month}.csv"
+            ),
+        )
+    else:
+        logging.info(
+            f"Informational: The data file {target_file_name} was not generated because no data was available for {process_year_month}. Continuing."
+        )
+    logging.info(f"Processing {process_year_month} completed")
+
+
+def download_file(source_url: str, source_file: pathlib.Path) -> bool:
+    logging.info(f"Downloading {source_url} into {source_file}")
+    success = True
+    r = requests.get(source_url, stream=True)
+    with open(source_file, "wb") as f:
+        for chunk in r:
+            f.write(chunk)
+    # if the file contains the string "NoSuchKey" then the url returned
+    # that it could not locate the respective file
+    with open(source_file, "rb") as f:
+        if f.read().find(b"NoSuchKey") > -1:
+            success = False
+    if success:
+        logging.info(f"Download {source_url} to {source_file} complete.")
+    else:
+        logging.info(
+            f"Unable to download {source_url} to {source_file} at this time. The URL may not exist."
+        )
+    return success
+
+
+def process_chunk(
+    df: pd.DataFrame,
+    target_file_batch: str,
+    target_file: str,
+    include_header: bool,
+    truncate_file: bool,
+    output_headers: typing.List[str],
+    pipeline_name: str,
+    year_number: int,
+    month_number: int,
+) -> None:
+    if pipeline_name == "tlc_green_trips":
+        df["distance_between_service"] = ""
+        df["time_between_service"] = ""
+    # both pipelines carry the source data-file year and month in the output
+    df["data_file_year"] = year_number
+    df["data_file_month"] = month_number
+    df = format_date_time(df, "pickup_datetime", "strftime", "%Y-%m-%d %H:%M:%S")
+    df = format_date_time(df, "dropoff_datetime", "strftime", "%Y-%m-%d %H:%M:%S")
+    df = remove_null_rows(df)
+    df = df[output_headers]
+    save_to_new_file(df, file_path=str(target_file_batch))
+    append_batch_file(target_file_batch, target_file, include_header, truncate_file)
+    logging.info(f"Processing Batch {target_file_batch} completed")
+
+
+def remove_null_rows(df: pd.DataFrame) -> pd.DataFrame:
+    logging.info("Removing null rows...")
+    df = df.dropna(axis=0, subset=["vendor_id"])
+    return df
+
+
+def format_date_time(
+    df: pd.DataFrame, field_name: str, str_pf_time: str, dt_format: str
+) -> pd.DataFrame:
+    if str_pf_time == "strptime":
+        logging.info(
+            f"Transform: Parsing field {field_name} from {dt_format} strings to datetime"
+        )
+        df[field_name] = df[field_name].apply(lambda x: datetime.strptime(x, dt_format))
+    else:
+        logging.info(
+            f"Transform: Formatting field {field_name} from datetime to {dt_format} strings"
+        )
+        df[field_name] = df[field_name].dt.strftime(dt_format)
+    return df
+
+
+def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None:
+    logging.info(f"Saving to file {file_path} separator='{sep}'")
+    df.to_csv(file_path, sep=sep, index=False)
+
+
+def append_batch_file(
+    batch_file_path: str,
+    target_file_path: str,
+    include_header: bool,
+    truncate_target_file: bool,
+) -> None:
+    logging.info(
+        f"Appending file {batch_file_path} to file {target_file_path} with include_header={include_header} and truncate_target_file={truncate_target_file}"
+    )
+    if truncate_target_file:
+        open(target_file_path, "w").close()
+    with open(batch_file_path, "r") as data_file, open(
+        target_file_path, "a+"
+    ) as target_file:
+        if not include_header:
+            logging.info(
+                f"Appending batch file {batch_file_path} to {target_file_path} without header"
+            )
+            next(data_file)
+        else:
+            logging.info(
+                f"Appending batch file {batch_file_path} to {target_file_path} with header"
+            )
+        target_file.write(data_file.read())
+    if os.path.exists(batch_file_path):
+        os.remove(batch_file_path)
+
+
+def upload_file_to_gcs(
+    file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str
+) -> None:
+    if os.path.exists(file_path):
+        logging.info(
+            f"Uploading output file {file_path} to gs://{target_gcs_bucket}/{target_gcs_path}"
+        )
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(target_gcs_bucket)
+        blob = bucket.blob(target_gcs_path)
+        blob.upload_from_filename(file_path)
+    else:
+        logging.info(
+            f"Cannot upload file {file_path} to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist."
+ ) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + project_id=os.environ["PROJECT_ID"], + dataset_id=os.environ["DATASET_ID"], + table_id=os.environ["TABLE_ID"], + data_file_year_field=os.environ["DATA_FILE_YEAR_FIELD"], + data_file_month_field=os.environ["DATA_FILE_MONTH_FIELD"], + schema_path=os.environ["SCHEMA_PATH"], + chunksize=os.environ["CHUNKSIZE"], + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + pipeline_name=os.environ["PIPELINE_NAME"], + input_headers=json.loads(os.environ["INPUT_CSV_HEADERS"]), + data_dtypes=json.loads(os.environ["DATA_DTYPES"]), + output_headers=json.loads(os.environ["OUTPUT_CSV_HEADERS"]), + ) diff --git a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/green_trips_schema.json b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/green_trips_schema.json new file mode 100644 index 000000000..562864dac --- /dev/null +++ b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/green_trips_schema.json @@ -0,0 +1,140 @@ +[ + { + "name": "vendor_id", + "type": "STRING", + "description": "A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.", + "mode": "REQUIRED" + }, + { + "name": "pickup_datetime", + "type": "TIMESTAMP", + "description": "The date and time when the meter was engaged", + "mode": "NULLABLE" + }, + { + "name": "dropoff_datetime", + "type": "TIMESTAMP", + "description": "The date and time when the meter was disengaged", + "mode": "NULLABLE" + }, + { + "name": "store_and_fwd_flag", + "type": "STRING", + "description": "This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka 'store and forward,' because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip", + "mode": "NULLABLE" + }, + { + "name": "rate_code", + "type": "STRING", + "description": "The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride", + "mode": "NULLABLE" + }, + { + "name": "passenger_count", + "type": "INTEGER", + "description": "The number of passengers in the vehicle. This is a driver-entered value.", + "mode": "NULLABLE" + }, + { + "name": "trip_distance", + "type": "NUMERIC", + "description": "The elapsed trip distance in miles reported by the taximeter.", + "mode": "NULLABLE" + }, + { + "name": "fare_amount", + "type": "NUMERIC", + "description": "The time-and-distance fare calculated by the meter", + "mode": "NULLABLE" + }, + { + "name": "extra", + "type": "NUMERIC", + "description": "Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges", + "mode": "NULLABLE" + }, + { + "name": "mta_tax", + "type": "NUMERIC", + "description": "$0.50 MTA tax that is automatically triggered based on the metered rate in use", + "mode": "NULLABLE" + }, + { + "name": "tip_amount", + "type": "NUMERIC", + "description": "Tip amount. This field is automatically populated for credit card tips. 
Cash tips are not included.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tolls_amount",
+    "type": "NUMERIC",
+    "description": "Total amount of all tolls paid in trip.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "ehail_fee",
+    "type": "NUMERIC",
+    "description": "",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "total_amount",
+    "type": "NUMERIC",
+    "description": "The total amount charged to passengers. Does not include cash tips.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "payment_type",
+    "type": "STRING",
+    "description": "A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "distance_between_service",
+    "type": "NUMERIC",
+    "description": "",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "time_between_service",
+    "type": "INTEGER",
+    "description": "",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "trip_type",
+    "type": "STRING",
+    "description": "A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "imp_surcharge",
+    "type": "NUMERIC",
+    "description": "$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "pickup_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was engaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dropoff_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was disengaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "data_file_year",
+    "type": "INTEGER",
+    "description": "Datafile timestamp year value",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "data_file_month",
+    "type": "INTEGER",
+    "description": "Datafile timestamp month value",
+    "mode": "NULLABLE"
+  }
+]
diff --git a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/requirements.txt b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/requirements.txt
new file mode 100644
index 000000000..f87f393f3
--- /dev/null
+++ b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/requirements.txt
@@ -0,0 +1,4 @@
+google-cloud-storage
+google-cloud-bigquery
+pandas
+requests
diff --git a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/yellow_trips_schema.json b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/yellow_trips_schema.json
new file mode 100644
index 000000000..da5466cba
--- /dev/null
+++ b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/yellow_trips_schema.json
@@ -0,0 +1,116 @@
+[
+  {
+    "name": "vendor_id",
+    "type": "STRING",
+    "description": "A code indicating the TPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "pickup_datetime",
+    "type": "TIMESTAMP",
+    "description": "The date and time when the meter was engaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dropoff_datetime",
+    "type": "TIMESTAMP",
+    "description": "The date and time when the meter was disengaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "passenger_count",
+    "type": "INTEGER",
+    "description": "The number of passengers in the vehicle. This is a driver-entered value.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "trip_distance",
+    "type": "NUMERIC",
+    "description": "The elapsed trip distance in miles reported by the taximeter.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "rate_code",
+    "type": "STRING",
+    "description": "The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "store_and_fwd_flag",
+    "type": "STRING",
+    "description": "This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka 'store and forward,' because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "payment_type",
+    "type": "STRING",
+    "description": "A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "fare_amount",
+    "type": "NUMERIC",
+    "description": "The time-and-distance fare calculated by the meter",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "extra",
+    "type": "NUMERIC",
+    "description": "Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "mta_tax",
+    "type": "NUMERIC",
+    "description": "$0.50 MTA tax that is automatically triggered based on the metered rate in use",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tip_amount",
+    "type": "NUMERIC",
+    "description": "Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tolls_amount",
+    "type": "NUMERIC",
+    "description": "Total amount of all tolls paid in trip.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "imp_surcharge",
+    "type": "NUMERIC",
+    "description": "$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "total_amount",
+    "type": "NUMERIC",
+    "description": "The total amount charged to passengers. Does not include cash tips.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "pickup_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was engaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dropoff_location_id",
+    "type": "STRING",
+    "description": "TLC Taxi Zone in which the taximeter was disengaged",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "data_file_year",
+    "type": "INTEGER",
+    "description": "Datafile timestamp year value",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "data_file_month",
+    "type": "INTEGER",
+    "description": "Datafile timestamp month value",
+    "mode": "NULLABLE"
+  }
+]
diff --git a/datasets/new_york_taxi_trips/pipelines/dataset.yaml b/datasets/new_york_taxi_trips/pipelines/dataset.yaml
new file mode 100644
index 000000000..46da9892e
--- /dev/null
+++ b/datasets/new_york_taxi_trips/pipelines/dataset.yaml
@@ -0,0 +1,25 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: new_york_taxi_trips + friendly_name: ~ + description: ~ + dataset_sources: ~ + terms_of_use: ~ + +resources: + - type: bigquery_dataset + dataset_id: new_york_taxi_trips + description: ~ diff --git a/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py new file mode 100644 index 000000000..500ac9e96 --- /dev/null +++ b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py @@ -0,0 +1,125 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.providers.google.cloud.operators import kubernetes_engine + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="new_york_taxi_trips.new_york_taxi_trips", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "new-york-taxi-trips", + "initial_node_count": 2, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-4", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) + + # Run CSV transform within kubernetes pod + green_trips = kubernetes_engine.GKEStartPodOperator( + task_id="green_trips", + startup_timeout_seconds=600, + name="load_tlc_green_trips", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="new-york-taxi-trips", + image_pull_policy="Always", + image="{{ var.json.new_york_taxi_trips.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_source_url }}", + "SOURCE_FILE": "files/data_green_trips.csv", + "TARGET_FILE": "files/data_output_green_trips.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_dataset_id }}", + "TABLE_ID": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_table_id }}", + "DATA_FILE_YEAR_FIELD": "data_file_year", + "DATA_FILE_MONTH_FIELD": "data_file_month", + "SCHEMA_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_schema_path }}", + "CHUNKSIZE": "{{ 
var.json.new_york_taxi_trips.container_registry.green_trips_chunk_size }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}", + "PIPELINE_NAME": "tlc_green_trips", + "INPUT_CSV_HEADERS": '["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",\n "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",\n "imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge" ]', + "DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "store_and_fwd_flag": "str",\n "rate_code": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "passenger_count": "str",\n "trip_distance": "float64",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "ehail_fee": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "payment_type": "str",\n "trip_type": "str",\n "congestion_surcharge": "float64" }', + "OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",\n "tip_amount", "tolls_amount", "ehail_fee", "total_amount", "payment_type",\n "distance_between_service", "time_between_service", "trip_type", "imp_surcharge", "pickup_location_id",\n "dropoff_location_id", "data_file_year", "data_file_month" ]', + }, + resources={ + "request_memory": "12G", + "request_cpu": "1", + "request_ephemeral_storage": "16G", + }, + ) + + # Run CSV transform within kubernetes pod + yellow_trips = kubernetes_engine.GKEStartPodOperator( + task_id="yellow_trips", + startup_timeout_seconds=600, + name="load_tlc_yellow_trips", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="new-york-taxi-trips", + image_pull_policy="Always", + image="{{ var.json.new_york_taxi_trips.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_source_url }}", + "SOURCE_FILE": "files/data_yellow_trips.csv", + "TARGET_FILE": "files/data_output_yellow_trips.csv", + "DATA_FILE_YEAR_FIELD": "data_file_year", + "DATA_FILE_MONTH_FIELD": "data_file_month", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_dataset_id }}", + "TABLE_ID": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_table_id }}", + "SCHEMA_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_schema_path }}", + "CHUNKSIZE": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_chunk_size }}", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}", + "PIPELINE_NAME": "tlc_yellow_trips", + "INPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",\n "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",\n "tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge" ]', + "DATA_DTYPES": '{ "vendor_id": "str",\n 
"pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "passenger_count": "str",\n "trip_distance": "float64",\n "rate_code": "str",\n "store_and_fwd_flag": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "payment_type": "str",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "congestion_surcharge": "float64" }', + "OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",\n "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount",\n "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]', + }, + ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="new-york-taxi-trips", + ) + + create_cluster >> [green_trips, yellow_trips] >> delete_cluster diff --git a/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml new file mode 100644 index 000000000..4ee775731 --- /dev/null +++ b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml @@ -0,0 +1,175 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- +resources: + - type: bigquery_table + table_id: "tlc_green_trips" + description: "New York green taxi trips table" + - type: bigquery_table + table_id: "tlc_yellow_trips" + description: "New York yellow taxi trips table" + +dag: + airflow_version: 2 + initialize: + dag_id: new_york_taxi_trips + default_args: + owner: "Google" + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + tasks: + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: new-york-taxi-trips + initial_node_count: 2 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-4 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + - operator: "GKEStartPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "green_trips" + startup_timeout_seconds: 600 + name: "load_tlc_green_trips" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: new-york-taxi-trips + image_pull_policy: "Always" + image: "{{ var.json.new_york_taxi_trips.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_source_url }}" + SOURCE_FILE: "files/data_green_trips.csv" + TARGET_FILE: "files/data_output_green_trips.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_dataset_id }}" + TABLE_ID: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_table_id }}" + DATA_FILE_YEAR_FIELD: "data_file_year" + DATA_FILE_MONTH_FIELD: "data_file_month" + SCHEMA_PATH: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_schema_path }}" + CHUNKSIZE: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_chunk_size }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}" + PIPELINE_NAME: "tlc_green_trips" + INPUT_CSV_HEADERS: >- + ["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code", + "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount", + "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", + "imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge" ] + DATA_DTYPES: >- + { "vendor_id": "str", + "pickup_datetime": "datetime64[ns]", + "dropoff_datetime": "datetime64[ns]", + "store_and_fwd_flag": "str", + "rate_code": "str", + "pickup_location_id": "str", + "dropoff_location_id": "str", + "passenger_count": "str", + "trip_distance": "float64", + "fare_amount": "float64", + "extra": "float64", + "mta_tax": "float64", + "tip_amount": "float64", + "tolls_amount": "float64", + "ehail_fee": "float64", + "imp_surcharge": "float64", + "total_amount": "float64", + "payment_type": "str", + "trip_type": "str", + "congestion_surcharge": "float64" } + OUTPUT_CSV_HEADERS: >- + [ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code", + "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", + "tip_amount", "tolls_amount", "ehail_fee", "total_amount", "payment_type", + "distance_between_service", "time_between_service", "trip_type", "imp_surcharge", 
"pickup_location_id", + "dropoff_location_id", "data_file_year", "data_file_month" ] + resources: + request_memory: "12G" + request_cpu: "1" + request_ephemeral_storage: "16G" + - operator: "GKEStartPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "yellow_trips" + startup_timeout_seconds: 600 + name: "load_tlc_yellow_trips" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: new-york-taxi-trips + image_pull_policy: "Always" + image: "{{ var.json.new_york_taxi_trips.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_source_url }}" + SOURCE_FILE: "files/data_yellow_trips.csv" + TARGET_FILE: "files/data_output_yellow_trips.csv" + DATA_FILE_YEAR_FIELD: "data_file_year" + DATA_FILE_MONTH_FIELD: "data_file_month" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_dataset_id }}" + TABLE_ID: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_table_id }}" + SCHEMA_PATH: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_schema_path }}" + CHUNKSIZE: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_chunk_size }}" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}" + PIPELINE_NAME: "tlc_yellow_trips" + INPUT_CSV_HEADERS: >- + [ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance", + "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id", + "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", + "tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge" ] + DATA_DTYPES: >- + { "vendor_id": "str", + "pickup_datetime": "datetime64[ns]", + "dropoff_datetime": "datetime64[ns]", + "passenger_count": "str", + "trip_distance": "float64", + "rate_code": "str", + "store_and_fwd_flag": "str", + "pickup_location_id": "str", + "dropoff_location_id": "str", + "payment_type": "str", + "fare_amount": "float64", + "extra": "float64", + "mta_tax": "float64", + "tip_amount": "float64", + "tolls_amount": "float64", + "imp_surcharge": "float64", + "total_amount": "float64", + "congestion_surcharge": "float64" } + OUTPUT_CSV_HEADERS: >- + [ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance", + "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra", + "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount", + "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ] + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: new-york-taxi-trips + graph_paths: + - "create_cluster >> [ green_trips, yellow_trips ] >> delete_cluster"