-
Notifications
You must be signed in to change notification settings - Fork 62
/
epa_historical_air_quality_full_load_dag.py
1152 lines (1114 loc) · 102 KB
/
epa_historical_air_quality_full_load_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.providers.google.cloud.operators import kubernetes_engine
# Shared task arguments; handed to the DAG below through its `default_args`
# parameter.
default_args = dict(
    owner="Google",
    depends_on_past=False,
    start_date="2021-03-01",
)
with DAG(
dag_id="epa_historical_air_quality.epa_historical_air_quality_full_load",
default_args=default_args,
max_active_runs=1,
schedule_interval="@once",
catchup=False,
default_view="graph",
) as dag:
# Task "create_cluster": request a GKE cluster named "epa-hist-air-quality"
# in us-central1-c with 8 initial e2-standard-16 nodes, attached to the VPC
# network named by the `vpc_network` Airflow variable.
create_cluster = kubernetes_engine.GKECreateClusterOperator(
    task_id="create_cluster",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    body=dict(
        name="epa-hist-air-quality",
        initial_node_count=8,
        network="{{ var.value.vpc_network }}",
        node_config=dict(
            machine_type="e2-standard-16",
            # OAuth scopes granted to the nodes.
            oauth_scopes=[
                "https://www.googleapis.com/auth/devstorage.read_write",
                "https://www.googleapis.com/auth/cloud-platform",
            ],
        ),
    ),
)
# Task "annual_summaries": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
annual_summaries = kubernetes_engine.GKEStartPodOperator(
    task_id="annual_summaries",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/annual_conc_by_monitor_YEAR_ITERATOR.zip",
        "START_YEAR": "1980",
        "SOURCE_FILE": "files/annual_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "air_quality_annual_summary",
        "YEAR_FIELD_NAME": "year",
        "YEAR_FIELD_TYPE": "INT",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_annual_summaries_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/annual_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - annual_summaries",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "metric_used", "method_name", "year", "units_of_measure",\n'
            ' "event_type", "observation_count", "observation_percent", "completeness_indicator", "valid_day_count",\n'
            ' "required_day_count", "exceptional_data_count", "null_data_count", "primary_exceedance_count", "secondary_exceedance_count",\n'
            ' "certification_indicator", "num_obs_below_mdl", "arithmetic_mean", "arithmetic_standard_dev", "first_max_value",\n'
            ' "first_max_datetime", "second_max_value", "second_max_datetime", "third_max_value", "third_max_datetime",\n'
            ' "fourth_max_value", "fourth_max_datetime", "first_max_non_overlapping_value", "first_no_max_datetime", "second_max_non_overlapping_value",\n'
            ' "second_no_max_datetime", "ninety_nine_percentile", "ninety_eight_percentile", "ninety_five_percentile", "ninety_percentile",\n'
            ' "seventy_five_percentile", "fifty_percentile", "ten_percentile", "local_site_name", "address",\n'
            ' "state_name", "county_name", "city_name", "cbsa_name", "date_of_last_change"]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "metric_used": "str", "method_name": "str", "year": "int32", "units_of_measure": "str",\n'
            ' "event_type": "str", "observation_count": "int32", "observation_percent": "float64", "completeness_indicator": "str", "valid_day_count": "int32",\n'
            ' "required_day_count": "int32", "exceptional_data_count": "int32", "null_data_count": "int32", "primary_exceedance_count": "str", "secondary_exceedance_count": "str",\n'
            ' "certification_indicator": "str", "num_obs_below_mdl": "int32", "arithmetic_mean": "float64", "arithmetic_standard_dev": "float64", "first_max_value": "float64",\n'
            ' "first_max_datetime": "datetime64[ns]", "second_max_value": "float64", "second_max_datetime": "datetime64[ns]", "third_max_value": "float64", "third_max_datetime": "datetime64[ns]",\n'
            ' "fourth_max_value": "float64", "fourth_max_datetime": "datetime64[ns]", "first_max_non_overlapping_value": "float64", "first_no_max_datetime": "datetime64[ns]", "second_max_non_overlapping_value": "float64",\n'
            ' "second_no_max_datetime": "datetime64[ns]", "ninety_nine_percentile": "float64", "ninety_eight_percentile": "float64", "ninety_five_percentile": "float64", "ninety_percentile": "float64",\n'
            ' "seventy_five_percentile": "float64", "fifty_percentile": "float64", "ten_percentile": "float64", "local_site_name": "str", "address": "str",\n'
            ' "state_name": "str", "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "metric_used", "method_name", "year", "units_of_measure",\n'
            ' "event_type", "observation_count", "observation_percent", "completeness_indicator", "valid_day_count",\n'
            ' "required_day_count", "exceptional_data_count", "null_data_count", "primary_exceedance_count", "secondary_exceedance_count",\n'
            ' "certification_indicator", "num_obs_below_mdl", "arithmetic_mean", "arithmetic_standard_dev", "first_max_value",\n'
            ' "first_max_datetime", "second_max_value", "second_max_datetime", "third_max_value", "third_max_datetime",\n'
            ' "fourth_max_value", "fourth_max_datetime", "first_max_non_overlapping_value", "first_no_max_datetime", "second_max_non_overlapping_value",\n'
            ' "second_no_max_datetime", "ninety_nine_percentile", "ninety_eight_percentile", "ninety_five_percentile", "ninety_percentile",\n'
            ' "seventy_five_percentile", "fifty_percentile", "ten_percentile", "local_site_name", "address",\n'
            ' "state_name", "county_name", "city_name", "cbsa_name", "date_of_last_change"]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "co_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
co_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="co_daily_summary",
    name="load_co_daily_summary",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_42101_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/co_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "co_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_co_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/co_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - co_daily_summary",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        # NOTE(review): "parameter_code" is typed "str" here but "int32" in the
        # other daily-summary tasks of this file -- confirm which is intended.
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "str", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "co_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
co_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="co_hourly_summary",
    startup_timeout_seconds=600,
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image_pull_policy="Always",
    # The image reference is a template over the "epa_historical_air_quality"
    # Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_42101_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/co_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "co_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_co_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/co_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - co_hourly_summaries",
        # JSON list of column names, embedded as a string.
        "INPUT_CSV_HEADERS": '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n "latitude", "longitude", "datum", "parameter_name", "date_local",\n "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n "mdl", "uncertainty", "qualifier", "method_type", "method_code",\n "method_name", "state_name", "county_name", "date_of_last_change" ]',
        # JSON map of column name -> dtype, embedded as a string. Every column
        # listed in INPUT_CSV_HEADERS has an entry, including "county_name"
        # (typed "str", as in the other hourly tasks of this file).
        # NOTE(review): latitude, longitude, sample_measurement, uncertainty and
        # method_code are typed "str" here but numeric in the other hourly
        # tasks -- confirm that is intentional for this dataset.
        "DATA_DTYPES": '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n "latitude": "str", "longitude": "str", "datum": "str", "parameter_name": "str", "date_local": "str", "time_local": "str",\n "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "str", "units_of_measure": "str",\n "mdl": "float64", "uncertainty": "str", "qualifier": "str", "method_type": "str", "method_code": "str",\n "method_name": "str", "state_name": "str", "county_name": "str", "date_of_last_change": "str" }',
        # JSON list of output column names, embedded as a string.
        "OUTPUT_CSV_HEADERS": '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n "latitude", "longitude", "datum", "parameter_name", "date_local",\n "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n "mdl", "uncertainty", "qualifier", "method_type", "method_code",\n "method_name", "state_name", "county_name", "date_of_last_change" ]',
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
)
# Task "hap_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
hap_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="hap_daily_summary",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_HAPS_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/hap_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "hap_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_hap_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/hap_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - hap_daily_summary",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "hap_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
hap_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="hap_hourly_summary",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_HAPS_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/hap_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "hap_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_hap_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/hap_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - hap_hourly_summaries",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "lead_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
lead_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="lead_daily_summary",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_LEAD_YEAR_ITERATOR.zip",
        "START_YEAR": "1980",
        "SOURCE_FILE": "files/lead_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "lead_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_lead_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/lead_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - lead_daily_summaries",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "no2_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
no2_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="no2_daily_summary",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_42602_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/no2_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "no2_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_no2_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/no2_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - no2_daily_summaries",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "no2_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
no2_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="no2_hourly_summary",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_42602_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/no2_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "no2_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_no2_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/no2_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - no2_hourly",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "nonoxnoy_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster running the CSV-transform image; every job setting is handed to the
# container through environment variables.
nonoxnoy_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="nonoxnoy_daily_summary",
    name="load_data",
    namespace="default",
    # Where the pod runs.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # What the pod runs: the image reference is a template over the
    # "epa_historical_air_quality" Airflow JSON variable.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G of memory, 2 CPUs.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR is a placeholder inside the URL; presumably the
        # container substitutes each year from START_YEAR onward -- TODO confirm
        # against the image's script.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_NONOxNOy_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/nonoxnoy_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "nonoxnoy_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_nonoxnoy_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/nonoxnoy_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - nonoxnoy_daily",
        # The next three values are JSON documents embedded as strings. Each is
        # written as adjacent literals that concatenate into one value; the
        # embedded "\n " separators are part of that value.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        # NOTE(review): presumably tells the container to drop the destination
        # table before loading -- confirm against the image's script.
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "nonoxnoy_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars.
nonoxnoy_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="nonoxnoy_hourly_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (hourly NONOxNOy files).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_NONOxNOy_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/nonoxnoy_hourly_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "nonoxnoy_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_nonoxnoy_hourly_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/nonoxnoy_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - nonoxnoy_hourly",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "ozone_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars.
ozone_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="ozone_daily_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (daily files for parameter 44201).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_44201_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/o3_daily_summary_data.csv",
        # Destination identifiers and year-column settings. The table is named
        # "o3_..." while the schema file and output path use "ozone_...".
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "o3_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_ozone_daily_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/ozone_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - ozone_daily_summary",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "ozone_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars.
ozone_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="ozone_hourly_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (hourly files for parameter 44201).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_44201_YEAR_ITERATOR.zip",
        "START_YEAR": "1980",
        "SOURCE_FILE": "files/o3_hourly_summary_data.csv",
        # Destination identifiers and year-column settings. The table is named
        # "o3_..." while the schema file and output path use "ozone_...".
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "o3_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_ozone_hourly_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/ozone_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - ozone_hourly_summary",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm10_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars.
pm10_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm10_daily_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (daily files for parameter 81102).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_81102_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pm10_daily_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm10_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm10_daily_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm10_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm10_daily_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm10_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars.
pm10_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm10_hourly_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (hourly files for parameter 81102).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_81102_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pm10_hourly_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm10_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm10_hourly_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm10_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm10_hourly_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm25_frm_hourly_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars.
pm25_frm_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_frm_hourly_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (hourly files for parameter 88101).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_88101_YEAR_ITERATOR.zip",
        "START_YEAR": "1980",
        "SOURCE_FILE": "files/pm25_frm_hourly_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_frm_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_frm_hourly_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_frm_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_frm_hourly_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm25_frm_daily_summary": start a pod on the "epa-hist-air-quality" GKE
# cluster from the run_csv_transform_kub image. Every job setting is handed to
# the container through env_vars. This task lists the input columns under their
# raw titles ("State Code", ...) and supplies RENAME_HEADERS_LIST to map them to
# snake_case names.
pm25_frm_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_frm_daily_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (daily files for parameter 88101).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_88101_YEAR_ITERATOR.zip",
        "START_YEAR": "1997",
        "SOURCE_FILE": "files/pm25_frm_daily_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_frm_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_frm_daily_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_frm_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_frm_daily_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[\n'
            ' "State Code", "County Code", "Site Num", "Parameter Code", "POC",\n'
            ' "Latitude", "Longitude", "Datum", "Parameter Name", "Sample Duration",\n'
            ' "Pollutant Standard", "Date Local", "Units of Measure", "Event Type", "Observation Count",\n'
            ' "Observation Percent", "Arithmetic Mean", "1st Max Value", "1st Max Hour", "AQI",\n'
            ' "Method Code", "Method Name", "Local Site Name", "Address", "State Name",\n'
            ' "County Name", "City Name", "CBSA Name", "Date of Last Change"\n'
            ']'
        ),
        "DATA_DTYPES": (
            '{\n'
            ' "State Code": "str", "County Code": "str", "Site Num": "str", "Parameter Code": "int32", "POC": "int32",\n'
            ' "Latitude": "float64", "Longitude": "float64", "Datum": "str", "Parameter Name": "str", "Sample Duration": "str",\n'
            ' "Pollutant Standard": "str", "Date Local": "str", "Units of Measure": "str", "Event Type": "str", "Observation Count": "int32",\n'
            ' "Observation Percent": "float64", "Arithmetic Mean": "float64", "1st Max Value": "float64", "1st Max Hour": "int32", "AQI": "str",\n'
            ' "Method Code": "str", "Method Name": "str", "Local Site Name": "str", "Address": "str", "State Name": "str",\n'
            ' "County Name": "str", "City Name": "str", "CBSA Name": "str", "Date of Last Change": "str"\n'
            '}'
        ),
        # Raw column title -> output column name.
        "RENAME_HEADERS_LIST": (
            '{ "State Code": "state_code",\n'
            ' "County Code": "county_code",\n'
            ' "Site Num": "site_num",\n'
            ' "Parameter Code": "parameter_code",\n'
            ' "POC": "poc",\n'
            ' "Latitude": "latitude",\n'
            ' "Longitude": "longitude",\n'
            ' "Datum": "datum",\n'
            ' "Parameter Name": "parameter_name",\n'
            ' "Sample Duration": "sample_duration",\n'
            ' "Pollutant Standard": "pollutant_standard",\n'
            ' "Date Local": "date_local",\n'
            ' "Units of Measure": "units_of_measure",\n'
            ' "Event Type": "event_type",\n'
            ' "Observation Count": "observation_count",\n'
            ' "Observation Percent": "observation_percent",\n'
            ' "Arithmetic Mean": "arithmetic_mean",\n'
            ' "1st Max Value": "first_max_value",\n'
            ' "1st Max Hour": "first_max_hour",\n'
            ' "AQI": "aqi",\n'
            ' "Method Code": "method_code",\n'
            ' "Method Name": "method_name",\n'
            ' "Local Site Name": "local_site_name",\n'
            ' "Address": "address",\n'
            ' "State Name": "state_name",\n'
            ' "County Name": "county_name",\n'
            ' "City Name": "city_name",\n'
            ' "CBSA Name": "cbsa_name",\n'
            ' "Date of Last Change": "date_of_last_change"\n'
            '}'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[\n'
            ' "state_code",\n'
            ' "county_code",\n'
            ' "site_num",\n'
            ' "parameter_code",\n'
            ' "poc",\n'
            ' "latitude",\n'
            ' "longitude",\n'
            ' "datum",\n'
            ' "parameter_name",\n'
            ' "sample_duration",\n'
            ' "pollutant_standard",\n'
            ' "date_local",\n'
            ' "units_of_measure",\n'
            ' "event_type",\n'
            ' "observation_count",\n'
            ' "observation_percent",\n'
            ' "arithmetic_mean",\n'
            ' "first_max_value",\n'
            ' "first_max_hour",\n'
            ' "aqi",\n'
            ' "method_code",\n'
            ' "method_name",\n'
            ' "local_site_name",\n'
            ' "address",\n'
            ' "state_name",\n'
            ' "county_name",\n'
            ' "city_name",\n'
            ' "cbsa_name",\n'
            ' "date_of_last_change"\n'
            ']'
        ),
        # NOTE(review): neighbouring summary tasks in this DAG set this flag to
        # "Y"; here it is "N" -- confirm that keeping the destination table is
        # intended for a full load.
        "DROP_DEST_TABLE": "N",
    },
)
# Task "pm25_nonfrm_daily_summary": start a pod on the "epa-hist-air-quality"
# GKE cluster from the run_csv_transform_kub image. Every job setting is handed
# to the container through env_vars.
pm25_nonfrm_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_nonfrm_daily_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (daily files for parameter 88502).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_88502_YEAR_ITERATOR.zip",
        "START_YEAR": "1980",
        "SOURCE_FILE": "files/pm25_nonfrm_daily_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_nonfrm_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_nonfrm_daily_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_nonfrm_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_nonfrm_daily_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm25_nonfrm_hourly_summary": start a pod on the "epa-hist-air-quality"
# GKE cluster from the run_csv_transform_kub image. Every job setting is handed
# to the container through env_vars.
pm25_nonfrm_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_nonfrm_hourly_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (hourly files for parameter 88502).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_88502_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pm25_nonfrm_hourly_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_nonfrm_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_nonfrm_hourly_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_nonfrm_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_nonfrm_hourly_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm25_speciation_daily_summary": start a pod on the
# "epa-hist-air-quality" GKE cluster from the run_csv_transform_kub image.
# Every job setting is handed to the container through env_vars.
pm25_speciation_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_speciation_daily_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (daily SPEC files).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_SPEC_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pm25_speciation_daily_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_speciation_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_speciation_daily_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_speciation_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_speciation_daily_summaries",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task "pm25_speciation_hourly_summary": start a pod on the
# "epa-hist-air-quality" GKE cluster from the run_csv_transform_kub image.
# Every job setting is handed to the container through env_vars.
pm25_speciation_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_speciation_hourly_summary",
    name="load_data",
    namespace="default",
    # Target cluster.
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    # Container image and start-up behaviour.
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    # Pod limits: 16G memory, 2 CPU.
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # Source archive (hourly SPEC files).
        # NOTE(review): YEAR_ITERATOR looks like a per-year placeholder expanded
        # from START_YEAR onward -- confirm in the transform image.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_SPEC_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pm25_speciation_hourly_summary_data.csv",
        # Destination identifiers and year-column settings.
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_speciation_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_speciation_hourly_summary_schema.json",
        # Chunk size and output object location.
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_speciation_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_speciation_hourly_summary",
        # Column layout as JSON text. Adjacent literals are joined by the
        # parser, so each value is one string with embedded "\n" characters.
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the pressure_daily_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
pressure_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pressure_daily_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_PRESS_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pressure_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pressure_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pressure_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pressure_daily_summary/data_output.csv",
        # NOTE(review): this label ends in "summaries" while TABLE_ID uses
        # "summary" - confirm the difference is intended.
        "PIPELINE_NAME": "epa_historical_air_quality - pressure_daily_summaries",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the pressure_hourly_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
pressure_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pressure_hourly_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_PRESS_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/pressure_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pressure_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pressure_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pressure_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pressure_hourly_summary",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        # NOTE(review): date_gmt is the only date/time column typed
        # datetime64[ns]; date_local, time_local and time_gmt are str.
        # Confirm the transform image accepts that dtype.
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the rh_and_dp_daily_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
rh_and_dp_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="rh_and_dp_daily_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_RH_DP_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/rh_and_dp_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "rh_and_dp_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_rh_and_dp_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/rh_and_dp_daily_summary/data_output.csv",
        # NOTE(review): this label ends in "summaries" while TABLE_ID uses
        # "summary" - confirm the difference is intended.
        "PIPELINE_NAME": "epa_historical_air_quality - rh_and_dp_daily_summaries",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the rh_and_dp_hourly_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
rh_and_dp_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="rh_and_dp_hourly_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_RH_DP_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/rh_and_dp_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "rh_and_dp_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_rh_and_dp_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/rh_and_dp_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - rh_and_dp_hourly_summary",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        # NOTE(review): date_gmt is the only date/time column typed
        # datetime64[ns]; date_local, time_local and time_gmt are str.
        # Confirm the transform image accepts that dtype.
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the so2_daily_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
so2_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="so2_daily_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_42401_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/so2_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "so2_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_so2_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/so2_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - so2_daily_summary",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the so2_hourly_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
so2_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="so2_hourly_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_42401_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/so2_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "so2_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_so2_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/so2_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - so2_hourly_summary",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        # NOTE(review): date_gmt is the only date/time column typed
        # datetime64[ns]; date_local, time_local and time_gmt are str.
        # Confirm the transform image accepts that dtype.
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the temperature_daily_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
temperature_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="temperature_daily_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_TEMP_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/temperature_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "temperature_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_temperature_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/temperature_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - temperature_daily_summary",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "sample_duration": "str",\n'
            ' "pollutant_standard": "str", "date_local": "str", "units_of_measure": "str", "event_type": "str", "observation_count": "int32",\n'
            ' "observation_percent": "float64", "arithmetic_mean": "float64", "first_max_value": "float64", "first_max_hour": "int32", "aqi": "str",\n'
            ' "method_code": "str", "method_name": "str", "local_site_name": "str", "address": "str", "state_name": "str",\n'
            ' "county_name": "str", "city_name": "str", "cbsa_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "sample_duration",\n'
            ' "pollutant_standard", "date_local", "units_of_measure", "event_type", "observation_count",\n'
            ' "observation_percent", "arithmetic_mean", "first_max_value", "first_max_hour", "aqi",\n'
            ' "method_code", "method_name", "local_site_name", "address", "state_name",\n'
            ' "county_name", "city_name", "cbsa_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Task: start a pod on the "epa-hist-air-quality" GKE cluster that runs the
# CSV transform image for the temperature_hourly_summary table.
# Settings reach the container as string-valued environment variables; the
# header lists and the dtype map are JSON-style text with embedded newlines.
temperature_hourly_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="temperature_hourly_summary",
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    image_pull_policy="Always",
    startup_timeout_seconds=600,
    resources={"limit_memory": "16G", "limit_cpu": "2"},
    env_vars={
        # YEAR_ITERATOR looks like a placeholder resolved inside the transform
        # image (presumably once per year from START_YEAR) - TODO confirm.
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/hourly_TEMP_YEAR_ITERATOR.zip",
        "START_YEAR": "1990",
        "SOURCE_FILE": "files/temperature_hourly_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "temperature_hourly_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_temperature_hourly_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/temperature_hourly_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - temperature_hourly_summary",
        "INPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        # NOTE(review): date_gmt is the only date/time column typed
        # datetime64[ns]; date_local, time_local and time_gmt are str.
        # Confirm the transform image accepts that dtype.
        "DATA_DTYPES": (
            '{ "state_code": "str", "county_code": "str", "site_num": "str", "parameter_code": "int32", "poc": "int32",\n'
            ' "latitude": "float64", "longitude": "float64", "datum": "str", "parameter_name": "str", "date_local": "str",\n'
            ' "time_local": "str", "date_gmt": "datetime64[ns]", "time_gmt": "str", "sample_measurement": "float64", "units_of_measure": "str",\n'
            ' "mdl": "float64", "uncertainty": "float64", "qualifier": "str", "method_type": "str", "method_code": "int32", "method_name": "str",\n'
            ' "state_name": "str", "county_name": "str", "date_of_last_change": "str" }'
        ),
        "OUTPUT_CSV_HEADERS": (
            '[ "state_code", "county_code", "site_num", "parameter_code", "poc",\n'
            ' "latitude", "longitude", "datum", "parameter_name", "date_local",\n'
            ' "time_local", "date_gmt", "time_gmt", "sample_measurement", "units_of_measure",\n'
            ' "mdl", "uncertainty", "qualifier", "method_type", "method_code", "method_name",\n'
            ' "state_name", "county_name", "date_of_last_change" ]'
        ),
        "DROP_DEST_TABLE": "Y",
    },
)
# Run CSV transform within kubernetes pod
voc_daily_summary = kubernetes_engine.GKEStartPodOperator(
task_id="voc_daily_summary",
startup_timeout_seconds=600,
name="load_data",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="epa-hist-air-quality",
image_pull_policy="Always",
image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_VOCS_YEAR_ITERATOR.zip",
"START_YEAR": "1990",
"SOURCE_FILE": "files/voc_daily_summary_data.csv",
"PROJECT_ID": "{{ var.value.gcp_project }}",
"DATASET_ID": "epa_historical_air_quality",
"TABLE_ID": "voc_daily_summary",
"YEAR_FIELD_NAME": "date_local",
"YEAR_FIELD_TYPE": "DATE",
"SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_voc_daily_summary_schema.json",
"CHUNKSIZE": "1500000",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/epa_historical_air_quality/voc_daily_summary/data_output.csv",