Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Increase number of years to back date to 2009 in New York Taxi Trips Dataset #445

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2025e09
fix: reference to container registry
nlarge-google Jun 14, 2022
df4e68b
feat: Onboard New York Taxi Trips Dataset
nlarge-google Jun 14, 2022
dd4f771
Update datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pip…
adlersantos Jun 16, 2022
b2db15e
Update datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pip…
adlersantos Jun 16, 2022
2f297af
Update datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pip…
adlersantos Jun 16, 2022
1e9aa61
Update new_york_taxi_trips_dag.py
adlersantos Jun 16, 2022
86a9bdf
Update tlc_green_trips_pipeline.tf
adlersantos Jun 16, 2022
429088f
Delete new_york_dag.py
adlersantos Jun 16, 2022
8cab590
Delete pipeline.yaml
adlersantos Jun 16, 2022
bb6ce69
added back main branch files from NY pipeline
adlersantos Jun 16, 2022
79140cd
Merge branch 'main' into new_york_taxi_trips
adlersantos Jun 16, 2022
50391c8
revisions to TF files and added dataset.yaml
adlersantos Jun 16, 2022
2e7bfc3
fix: Resolve that files are no longer available from their initial so…
nlarge-google Aug 10, 2022
e74f12a
fix: Resolve flake hooks.
nlarge-google Aug 10, 2022
a136062
fix: Black hook issues.
nlarge-google Aug 10, 2022
a37c32d
fix: resolve data source dating back to (variable START_DATE) instead…
nlarge-google Aug 15, 2022
d7df35f
Fix: Extended start year to resolve date truncation.
nlarge-google Aug 16, 2022
4ef7b6d
Fix: Incorporating dag file for new york taxi trips.
nlarge-google Aug 16, 2022
583b203
fix: Changes to resolve resource issues.
nlarge-google Aug 17, 2022
cc03e0b
fix: Changed start dates for yellow taxi trips to 2011 and green to 2…
nlarge-google Aug 17, 2022
a71d012
Merge branch 'main' into new_york_taxi_trips
nlarge-google Aug 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -39,6 +39,7 @@ def main(
target_gcs_bucket: str,
target_gcs_path: str,
pipeline_name: str,
start_year: str,
input_headers: typing.List[str],
data_dtypes: dict,
output_headers: typing.List[str],
Expand All @@ -59,6 +60,7 @@ def main(
target_gcs_bucket,
target_gcs_path,
pipeline_name,
int(start_year),
input_headers,
data_dtypes,
output_headers,
Expand All @@ -80,11 +82,12 @@ def execute_pipeline(
target_gcs_bucket: str,
target_gcs_path: str,
pipeline_name: str,
start_year: int,
input_headers: typing.List[str],
data_dtypes: dict,
output_headers: typing.List[str],
) -> None:
for year_number in range(datetime.now().year, (datetime.now().year - 6), -1):
for year_number in range(datetime.now().year, (start_year - 1), -1):
process_year_data(
source_url=source_url,
year_number=int(year_number),
Expand Down Expand Up @@ -532,20 +535,21 @@ def upload_file_to_gcs(
logging.getLogger().setLevel(logging.INFO)

main(
source_url=os.environ["SOURCE_URL"],
source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
project_id=os.environ["PROJECT_ID"],
dataset_id=os.environ["DATASET_ID"],
table_id=os.environ["TABLE_ID"],
data_file_year_field=os.environ["DATA_FILE_YEAR_FIELD"],
data_file_month_field=os.environ["DATA_FILE_MONTH_FIELD"],
schema_path=os.environ["SCHEMA_PATH"],
chunksize=os.environ["CHUNKSIZE"],
target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
target_gcs_path=os.environ["TARGET_GCS_PATH"],
pipeline_name=os.environ["PIPELINE_NAME"],
input_headers=json.loads(os.environ["INPUT_CSV_HEADERS"]),
data_dtypes=json.loads(os.environ["DATA_DTYPES"]),
output_headers=json.loads(os.environ["OUTPUT_CSV_HEADERS"]),
source_url=os.environ.get("SOURCE_URL", ""),
source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(),
target_file=pathlib.Path(os.environ.get("TARGET_FILE", "")).expanduser(),
project_id=os.environ.get("PROJECT_ID", ""),
dataset_id=os.environ.get("DATASET_ID", ""),
table_id=os.environ.get("TABLE_ID", ""),
data_file_year_field=os.environ.get("DATA_FILE_YEAR_FIELD", ""),
data_file_month_field=os.environ.get("DATA_FILE_MONTH_FIELD", ""),
schema_path=os.environ.get("SCHEMA_PATH", ""),
chunksize=os.environ.get("CHUNKSIZE", ""),
target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""),
target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""),
pipeline_name=os.environ.get("PIPELINE_NAME", ""),
start_year=os.environ.get("START_YEAR", "2009"),
input_headers=json.loads(os.environ.get("INPUT_CSV_HEADERS", "")),
data_dtypes=json.loads(os.environ.get("DATA_DTYPES", "")),
output_headers=json.loads(os.environ.get("OUTPUT_CSV_HEADERS", "")),
)
Expand Up @@ -37,7 +37,7 @@
location="us-central1-c",
body={
"name": "new-york-taxi-trips",
"initial_node_count": 2,
"initial_node_count": 3,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-4",
Expand Down Expand Up @@ -70,10 +70,11 @@
"DATA_FILE_YEAR_FIELD": "data_file_year",
"DATA_FILE_MONTH_FIELD": "data_file_month",
"SCHEMA_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_schema_path }}",
"CHUNKSIZE": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_chunk_size }}",
"CHUNKSIZE": "500000",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}",
"PIPELINE_NAME": "tlc_green_trips",
"START_YEAR": "2013",
"INPUT_CSV_HEADERS": '["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",\n "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",\n "imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge" ]',
"DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "store_and_fwd_flag": "str",\n "rate_code": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "passenger_count": "str",\n "trip_distance": "float64",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "ehail_fee": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "payment_type": "str",\n "trip_type": "str",\n "congestion_surcharge": "float64" }',
"OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",\n "tip_amount", "tolls_amount", "ehail_fee", "total_amount", "payment_type",\n "distance_between_service", "time_between_service", "trip_type", "imp_surcharge", "pickup_location_id",\n "dropoff_location_id", "data_file_year", "data_file_month" ]',
Expand Down Expand Up @@ -106,14 +107,20 @@
"DATASET_ID": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_dataset_id }}",
"TABLE_ID": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_table_id }}",
"SCHEMA_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_schema_path }}",
"CHUNKSIZE": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_chunk_size }}",
"CHUNKSIZE": "500000",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}",
"PIPELINE_NAME": "tlc_yellow_trips",
"START_YEAR": "2011",
"INPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",\n "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",\n "tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge" ]',
"DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "passenger_count": "str",\n "trip_distance": "float64",\n "rate_code": "str",\n "store_and_fwd_flag": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "payment_type": "str",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "congestion_surcharge": "float64" }',
"OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",\n "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount",\n "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]',
},
resources={
"request_memory": "12G",
"request_cpu": "1",
"request_ephemeral_storage": "16G",
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
Expand Down
Expand Up @@ -41,7 +41,7 @@ dag:
location: "us-central1-c"
body:
name: new-york-taxi-trips
initial_node_count: 2
initial_node_count: 3
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-4
Expand Down Expand Up @@ -70,10 +70,11 @@ dag:
DATA_FILE_YEAR_FIELD: "data_file_year"
DATA_FILE_MONTH_FIELD: "data_file_month"
SCHEMA_PATH: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_schema_path }}"
CHUNKSIZE: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_chunk_size }}"
CHUNKSIZE: "500000"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}"
PIPELINE_NAME: "tlc_green_trips"
START_YEAR: "2013"
INPUT_CSV_HEADERS: >-
["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",
"pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",
Expand Down Expand Up @@ -132,10 +133,11 @@ dag:
DATASET_ID: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_dataset_id }}"
TABLE_ID: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_table_id }}"
SCHEMA_PATH: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_schema_path }}"
CHUNKSIZE: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_chunk_size }}"
CHUNKSIZE: "500000"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}"
PIPELINE_NAME: "tlc_yellow_trips"
START_YEAR: "2011"
INPUT_CSV_HEADERS: >-
[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",
"rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",
Expand Down Expand Up @@ -165,6 +167,10 @@ dag:
"rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",
"mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount",
"pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]
resources:
request_memory: "12G"
request_cpu: "1"
request_ephemeral_storage: "16G"
- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
Expand Down