Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add PM25_FRM_DAILY_SUMMARY Pipeline To Epa_Historical_Air_Quality Dataset #518

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ef51bf1
fix: Resolving issues as yet unresolved.
nlarge-google May 31, 2022
3f1c953
fix: black issue
nlarge-google Jun 1, 2022
1488e2a
fix: resolved datatype date not known
nlarge-google Jun 1, 2022
5f3e058
fix: submit new changes following issues in production after deployment.
nlarge-google Jun 2, 2022
2e13209
fix: miscellaneous fixes.
nlarge-google Jun 2, 2022
4e31a4b
fix: Annual summaries now works.
nlarge-google Jun 2, 2022
478e030
fix: co_hourly_summary now successful.
nlarge-google Jun 2, 2022
b0254aa
fix: black hook issues
nlarge-google Jun 2, 2022
fb4cbfc
fix: data type corrections for some fields.
nlarge-google Jun 2, 2022
c7c3d30
fix: Resolved file naming breaking issue in ozone_hourly pipeline. No…
nlarge-google Jun 3, 2022
2377622
fix: Now fixed annual summaries. Ready for AF test in dev.
nlarge-google Jun 3, 2022
8e8f285
fix: Tested full load all pipelines.
nlarge-google Jun 4, 2022
ebeb91a
fix: Replicate full load fixes into incremental load dag.
nlarge-google Jun 4, 2022
92ef531
fix: Resolved black hooks
nlarge-google Jun 6, 2022
2986dbe
Merge branch 'main' into epa_hist_air_quality
nlarge-google Jun 6, 2022
360f7f2
fix: Change to schedule
nlarge-google Jun 9, 2022
678c9cd
Merge remote-tracking branch 'origin/epa_hist_air_quality' into epa_h…
nlarge-google Jun 9, 2022
f12b18c
fix: Merging with main.
nlarge-google Oct 26, 2022
864152c
feat: Added PM25_FRM_DAILY_SUMMARY pipeline.
nlarge-google Oct 26, 2022
24a16ae
fix: Resolve JSON issue in pipeline.yaml.
nlarge-google Oct 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -42,6 +42,7 @@ def main(
pipeline_name: str,
input_csv_headers: typing.List[str],
data_dtypes: dict,
rename_headers_list: dict,
output_headers: typing.List[str],
drop_dest_table: str,
) -> None:
Expand All @@ -63,6 +64,7 @@ def main(
input_headers=input_csv_headers,
output_headers=output_headers,
data_dtypes=data_dtypes,
rename_headers_list=rename_headers_list,
chunksize=chunksize,
field_delimiter="|",
drop_dest_table=drop_dest_table,
Expand All @@ -85,6 +87,7 @@ def execute_pipeline(
input_headers: typing.List[str],
output_headers: typing.List[str],
data_dtypes: dict,
rename_headers_list: dict,
chunksize: str,
field_delimiter: str,
drop_dest_table: str = "N",
Expand Down Expand Up @@ -112,6 +115,7 @@ def execute_pipeline(
input_headers=input_headers,
output_headers=output_headers,
data_dtypes=data_dtypes,
rename_headers_list=rename_headers_list,
chunksize=chunksize,
field_delimiter=field_delimiter,
target_gcs_bucket=target_gcs_bucket,
Expand All @@ -133,6 +137,7 @@ def execute_pipeline(
input_headers=input_headers,
output_headers=output_headers,
data_dtypes=data_dtypes,
rename_headers_list=rename_headers_list,
chunksize=chunksize,
field_delimiter=field_delimiter,
target_gcs_bucket=target_gcs_bucket,
Expand All @@ -153,6 +158,7 @@ def process_year_data(
input_headers: typing.List[str],
output_headers: typing.List[str],
data_dtypes: dict,
rename_headers_list: dict,
chunksize: str,
field_delimiter: str,
target_gcs_bucket: str,
Expand All @@ -169,14 +175,14 @@ def process_year_data(
)
else:
src_url = source_url.replace("YEAR_ITERATOR", str(year))
url_file = os.path.split(src_url)[1]
url_file_csv = url_file.replace(".zip", ".csv")
url_file = os.path.split(src_url)[1].lower()
url_file_csv = url_file.replace(".zip", ".csv").lower()
source_file = f"{dest_path}/source_{url_file}"
source_file = source_file.lower()
# source_file = source_file.lower()
source_csv_file = f"{dest_path}/{url_file_csv}"
source_csv_file = source_csv_file.lower()
# source_csv_file = source_csv_file.lower()
target_file = f"{dest_path}/target_{url_file_csv}"
target_file = target_file.lower()
# target_file = target_file.lower()
file_exists = download_file_http(
source_url=src_url,
source_file=source_file,
Expand All @@ -193,6 +199,7 @@ def process_year_data(
dtypes=data_dtypes,
chunksize=chunksize,
field_delimiter=field_delimiter,
rename_headers_list=rename_headers_list,
)
load_data_to_bq(
project_id=project_id,
Expand Down Expand Up @@ -310,6 +317,7 @@ def process_source_file(
dtypes: dict,
chunksize: str,
field_delimiter: str,
rename_headers_list: dict,
) -> None:
logging.info(f"Opening batch file {source_file}")
with pd.read_csv(
Expand Down Expand Up @@ -339,6 +347,7 @@ def process_source_file(
truncate_file=(chunk_number == 0),
field_delimiter=field_delimiter,
output_headers=output_headers,
rename_headers_list=rename_headers_list,
)


Expand Down Expand Up @@ -524,7 +533,10 @@ def process_chunk(
truncate_file: bool,
field_delimiter: str,
output_headers: typing.List[str],
rename_headers_list: dict,
) -> None:
if rename_headers_list:
df = df.rename(columns=rename_headers_list)
date_fields = ["date_local", "date_of_last_change"]
df = resolve_date_format(df, date_fields, "%Y-%m-%d %H:%M:%S")
df = truncate_date_field(df, date_fields, "%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -687,6 +699,7 @@ def upload_file_to_gcs(
pipeline_name=os.environ.get("PIPELINE_NAME", ""),
input_csv_headers=json.loads(os.environ.get("INPUT_CSV_HEADERS", r"[]")),
data_dtypes=json.loads(os.environ.get("DATA_DTYPES", r"{}")),
rename_headers_list=json.loads(os.environ.get("RENAME_HEADERS_LIST", r"{}")),
output_headers=json.loads(os.environ.get("OUTPUT_CSV_HEADERS", r"[]")),
drop_dest_table=os.environ.get("DROP_DEST_TABLE", "N"),
)
@@ -0,0 +1,176 @@
[
{
"name": "state_code",
"type": "string",
"description": "The FIPS code of the state in which the monitor resides.",
"mode": "nullable"
},
{
"name": "county_code",
"type": "string",
"description": "The FIPS code of the county in which the monitor resides.",
"mode": "nullable"
},
{
"name": "site_num",
"type": "string",
"description": "A unique number within the county identifying the site.",
"mode": "nullable"
},
{
"name": "parameter_code",
"type": "integer",
"description": "The AQS code corresponding to the parameter measured by the monitor.",
"mode": "nullable"
},
{
"name": "poc",
"type": "integer",
"description": "This is the “Parameter Occurrence Code” used to distinguish different instruments that measure the same parameter at the same site.",
"mode": "nullable"
},
{
"name": "latitude",
"type": "float",
"description": "The monitoring site’s angular distance north of the equator measured in decimal degrees.",
"mode": "nullable"
},
{
"name": "longitude",
"type": "float",
"description": "The monitoring site’s angular distance east of the prime meridian measured in decimal degrees.",
"mode": "nullable"
},
{
"name": "datum",
"type": "string",
"description": "The Datum associated with the Latitude and Longitude measures.",
"mode": "nullable"
},
{
"name": "parameter_name",
"type": "string",
"description": "The name or description assigned in AQS to the parameter measured by the monitor. Parameters may be pollutants or non-pollutants.",
"mode": "nullable"
},
{
"name": "sample_duration",
"type": "string",
"description": "The length of time that air passes through the monitoring device before it is analyzed (measured). So, it represents an averaging period in the atmosphere (for example, a 24-hour sample duration draws ambient air over a collection filter for 24 straight hours). For continuous monitors, it can represent an averaging time of many samples (for example, a 1-hour value may be the average of four one-minute samples collected during each quarter of the hour).",
"mode": "nullable"
},
{
"name": "pollutant_standard",
"type": "string",
"description": "A description of the ambient air quality standard rules used to aggregate statistics. (See description at beginning of document.)",
"mode": "nullable"
},
{
"name": "date_local",
"type": "date",
"description": "The calendar date for the summary. All daily summaries are for the local standard day (midnight to midnight) at the monitor.",
"mode": "nullable"
},
{
"name": "units_of_measure",
"type": "string",
"description": "The unit of measure for the parameter. QAD always returns data in the standard units for the parameter. Submitters are allowed to report data in any unit and EPA converts to a standard unit so that we may use the data in calculations.",
"mode": "nullable"
},
{
"name": "event_type",
"type": "string",
"description": "Indicates whether data measured during exceptional events are included in the summary. A wildfire is an example of an exceptional event; it is something that affects air quality, but the local agency has no control over. No Events means no events occurred. Events Included means events occurred and the data from them is included in the summary. Events Excluded means that events occurred but data form them is excluded from the summary. Concurred Events Excluded means that events occurred but only EPA concurred exclusions are removed from the summary. If an event occurred for the parameter in question, the data will have multiple records for each monitor.",
"mode": "nullable"
},
{
"name": "observation_count",
"type": "integer",
"description": "The number of observations (samples) taken during the day.",
"mode": "nullable"
},
{
"name": "observation_percent",
"type": "float",
"description": "The percent representing the number of observations taken with respect to the number scheduled to be taken during the day. This is only calculated for monitors where measurements are required (e.g., only certain parameters).",
"mode": "nullable"
},
{
"name": "arithmetic_mean",
"type": "float",
"description": "The average (arithmetic mean) value for the day.",
"mode": "nullable"
},
{
"name": "first_max_value",
"type": "float",
"description": "The highest value for the day.",
"mode": "nullable"
},
{
"name": "first_max_hour",
"type": "integer",
"description": "The hour (on a 24-hour clock) when the highest value for the day (the previous field) was taken.",
"mode": "nullable"
},
{
"name": "aqi",
"type": "integer",
"description": "The Air Quality Index for the day for the pollutant, if applicable.",
"mode": "nullable"
},
{
"name": "method_code",
"type": "integer",
"description": "An internal system code indicating the method (processes, equipment, and protocols) used in gathering and measuring the sample. The method name is in the next column.",
"mode": "nullable"
},
{
"name": "method_name",
"type": "string",
"description": "A short description of the processes, equipment, and protocols used in gathering and measuring the sample.",
"mode": "nullable"
},
{
"name": "local_site_name",
"type": "string",
"description": "The name of the site (if any) given by the State, local, or tribal air pollution control agency that operates it.",
"mode": "nullable"
},
{
"name": "address",
"type": "string",
"description": "The approximate street address of the monitoring site.",
"mode": "nullable"
},
{
"name": "state_name",
"type": "string",
"description": "The name of the state where the monitoring site is located.",
"mode": "nullable"
},
{
"name": "county_name",
"type": "string",
"description": "The name of the county where the monitoring site is located.",
"mode": "nullable"
},
{
"name": "city_name",
"type": "string",
"description": "The name of the city where the monitoring site is located. This represents the legal incorporated boundaries of cities and not urban areas.",
"mode": "nullable"
},
{
"name": "cbsa_name",
"type": "string",
"description": "The name of the core bases statistical area (metropolitan area) where the monitoring site is located.",
"mode": "nullable"
},
{
"name": "date_of_last_change",
"type": "date",
"description": "The date the last time any numeric values in this record were updated in the AQS data system.",
"mode": "nullable"
}
]
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,7 @@
dag_id="epa_historical_air_quality.epa_historical_air_quality",
default_args=default_args,
max_active_runs=1,
schedule_interval="@once",
schedule_interval="0 2 * * 6",
catchup=False,
default_view="graph",
) as dag:
Expand Down Expand Up @@ -544,6 +544,40 @@
resources={"limit_memory": "16G", "limit_cpu": "2"},
)

# Run CSV transform within kubernetes pod
# Run CSV transform within a Kubernetes pod for the PM2.5 FRM daily summary data.
# Downloads daily_88101_<year>.zip files from EPA AQS, transforms them, and loads
# the result into the pm25_frm_daily_summary BigQuery table.
pm25_frm_daily_summary = kubernetes_engine.GKEStartPodOperator(
    task_id="pm25_frm_daily_summary",
    startup_timeout_seconds=600,
    name="load_data",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="epa-hist-air-quality",
    image_pull_policy="Always",
    image="{{ var.json.epa_historical_air_quality.container_registry.run_csv_transform_kub }}",
    env_vars={
        "SOURCE_URL": "https://aqs.epa.gov/aqsweb/airdata/daily_88101_YEAR_ITERATOR.zip",
        "START_YEAR": "1997",
        "SOURCE_FILE": "files/pm25_frm_daily_summary_data.csv",
        "PROJECT_ID": "{{ var.value.gcp_project }}",
        "DATASET_ID": "epa_historical_air_quality",
        "TABLE_ID": "pm25_frm_daily_summary",
        "YEAR_FIELD_NAME": "date_local",
        "YEAR_FIELD_TYPE": "DATE",
        "SCHEMA_PATH": "data/epa_historical_air_quality/schemas/epa_pm25_frm_daily_summary_schema.json",
        "CHUNKSIZE": "1500000",
        "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
        "TARGET_GCS_PATH": "data/epa_historical_air_quality/pm25_frm_daily_summary/data_output.csv",
        "PIPELINE_NAME": "epa_historical_air_quality - pm25_frm_daily_summaries",
        "INPUT_CSV_HEADERS": '[\n  "State Code", "County Code", "Site Num", "Parameter Code", "POC",\n  "Latitude", "Longitude", "Datum", "Parameter Name", "Sample Duration",\n  "Pollutant Standard", "Date Local", "Units of Measure", "Event Type", "Observation Count",\n  "Observation Percent", "Arithmetic Mean", "1st Max Value", "1st Max Hour", "AQI",\n  "Method Code", "Method Name", "Local Site Name", "Address", "State Name",\n  "County Name", "City Name", "CBSA Name", "Date of Last Change"\n]',
        "DATA_DTYPES": '{\n  "State Code": "str", "County Code": "str", "Site Num": "str", "Parameter Code": "int32", "POC": "int32",\n  "Latitude": "float64", "Longitude": "float64", "Datum": "str", "Parameter Name": "str", "Sample Duration": "str",\n  "Pollutant Standard": "str", "Date Local": "str", "Units of Measure": "str", "Event Type": "str", "Observation Count": "int32",\n  "Observation Percent": "float64", "Arithmetic Mean": "float64", "1st Max Value": "float64", "1st Max Hour": "int32", "AQI": "str",\n  "Method Code": "str", "Method Name": "str", "Local Site Name": "str", "Address": "str", "State Name": "str",\n  "County Name": "str", "City Name": "str", "CBSA Name": "str", "Date of Last Change": "str"\n}',
        # FIX: the container entrypoint reads os.environ.get("RENAME_HEADERS_LIST", ...);
        # the original key "RENAME_HEADERS" was never read, so the rename mapping
        # silently fell back to {} and no columns were renamed.
        "RENAME_HEADERS_LIST": '{ "State Code": "state_code",\n  "County Code": "county_code",\n  "Site Num": "site_num",\n  "Parameter Code": "parameter_code",\n  "POC": "poc",\n  "Latitude": "latitude",\n  "Longitude": "longitude",\n  "Datum": "datum",\n  "Parameter Name": "parameter_name",\n  "Sample Duration": "sample_duration",\n  "Pollutant Standard": "pollutant_standard",\n  "Date Local": "date_local",\n  "Units of Measure": "units_of_measure",\n  "Event Type": "event_type",\n  "Observation Count": "observation_count",\n  "Observation Percent": "observation_percent",\n  "Arithmetic Mean": "arithmetic_mean",\n  "1st Max Value": "first_max_value",\n  "1st Max Hour": "first_max_hour",\n  "AQI": "aqi",\n  "Method Code": "method_code",\n  "Method Name": "method_name",\n  "Local Site Name": "local_site_name",\n  "Address": "address",\n  "State Name": "state_name",\n  "County Name": "county_name",\n  "City Name": "city_name",\n  "CBSA Name": "cbsa_name",\n  "Date of Last Change": "date_of_last_change"\n}',
        "OUTPUT_CSV_HEADERS": '[\n  "state_code",\n  "county_code",\n  "site_num",\n  "parameter_code",\n  "poc",\n  "latitude",\n  "longitude",\n  "datum",\n  "parameter_name",\n  "sample_duration",\n  "pollutant_standard",\n  "date_local",\n  "units_of_measure",\n  "event_type",\n  "observation_count",\n  "observation_percent",\n  "arithmetic_mean",\n  "first_max_value",\n  "first_max_hour",\n  "aqi",\n  "method_code",\n  "method_name",\n  "local_site_name",\n  "address",\n  "state_name",\n  "county_name",\n  "city_name",\n  "cbsa_name",\n  "date_of_last_change"\n]',
        "DROP_DEST_TABLE": "N",
    },
    resources={"limit_memory": "16G", "limit_cpu": "2"},
)

# Run CSV transform within kubernetes pod
pm25_nonfrm_daily_summary = kubernetes_engine.GKEStartPodOperator(
task_id="pm25_nonfrm_daily_summary",
Expand Down Expand Up @@ -1096,6 +1130,7 @@
pm10_daily_summary,
pm10_hourly_summary,
pm25_frm_hourly_summary,
pm25_frm_daily_summary,
pm25_nonfrm_daily_summary,
pm25_nonfrm_hourly_summary,
pm25_speciation_daily_summary,
Expand Down