diff --git a/datasets/noaa/pipelines/_images/noaa_historic_severe_storms_schema.json b/datasets/noaa/pipelines/_images/noaa_historic_severe_storms_schema.json new file mode 100644 index 000000000..ced1e6c88 --- /dev/null +++ b/datasets/noaa/pipelines/_images/noaa_historic_severe_storms_schema.json @@ -0,0 +1,200 @@ +[ + { + "name":"episode_id", + "type":"string", + "description":"ID assigned by NWS to denote the storm episode; links the event details file with the information within the location file", + "mode":"nullable" + }, + { + "name":"event_id", + "type":"string", + "description":"ID assigned by NWS to note a single, small part that goes into a specific storm episode; links the storm episode between the three files downloaded from SPC’s website", + "mode":"nullable" + }, + { + "name":"state", + "type":"string", + "description":"The full text state name where the event occurred", + "mode":"nullable" + }, + { + "name":"state_fips_code", + "type":"string", + "description":"Unique FIPS code identifier assigned to each state.   State names and their corresponding FIPS codes are available as a BigQuery Public Dataset: bigquery-public-data.census_fips_codes.states_2016  The geographic polygons that define the perimeter of each state are available as a BigQuery Public Dataset: bigquery-public-data.geo_us_boundaries.us_states", + "mode":"nullable" + }, + { + "name":"event_type", + "type":"string", + "description":"The only events permitted in Storm Data are listed in Table 1 of Section 2.1.1 of NWS Directive 10-1605 at http://www.nws.noaa.gov/directives/sym/pd01016005curr.pdf. The chosen event type is the one that most accurately describes the meteorological event leading to fatalities, injuries, damage, etc. However, significant events, such as tornadoes, having no impact or causing no damage, are also included in Storm Data.", + "mode":"nullable" + }, + { + "name":"cz_type", + "type":"string", + "description":"Indicates whether the event happened in   - C: County/Parish  - Z: NWS zone  - M: Marine", + "mode":"nullable" + }, + { + "name":"cz_fips_code", + "type":"string", + "description":"Unique FIPS code identifier assigned to each county.   County names and their corresponding FIPS codes are available as a BigQuery Public Dataset: bigquery-public-data.census_fips_codes.counties_2016  The geographic polygons that define the perimeter of each county are available as a BigQuery Public Dataset: bigquery-public-data.geo_us_boundaries.us_counties", + "mode":"nullable" + }, + { + "name":"cz_name", + "type":"string", + "description":"County/Parish, Zone or Marine Name assigned to the county FIPS number or NWS Forecast Zone  NWS Forecast Zones are available as a BigQuery Public Dataset: bigquery-public-data.noaa_historic_severe_storms.nws_forecast_zones", + "mode":"nullable" + }, + { + "name":"wfo", + "type":"string", + "description":"National Weather Service Forecast Office’s area of responsibility (County Warning Area) in which the event occurred", + "mode":"nullable" + }, + { + "name":"event_begin_time", + "type":"datetime", + "description":"The date and time that the event began. Note that episodes and events may have different start and end times if multiple events occurred in the same episode", + "mode":"nullable" + }, + { + "name":"event_timezone", + "type":"string", + "description":"The time zone in which the event_begin_time and the event_end_time are recorded.", + "mode":"nullable" + }, + { + "name":"event_end_time", + "type":"datetime", + "description":"The date and time that the event ended.
Note that episodes and events may have different start and end times if multiple events occurred in the same episode", + "mode":"nullable" + }, + { + "name":"injuries_direct", + "type":"integer", + "description":"The number of injuries directly related to the weather event", + "mode":"nullable" + }, + { + "name":"injuries_indirect", + "type":"integer", + "description":"The number of injuries indirectly related to the weather event", + "mode":"nullable" + }, + { + "name":"deaths_direct", + "type":"integer", + "description":"The number of deaths directly related to the weather event", + "mode":"nullable" + }, + { + "name":"deaths_indirect", + "type":"integer", + "description":"The number of deaths indirectly related to the weather event", + "mode":"nullable" + }, + { + "name":"damage_property", + "type":"integer", + "description":"The estimated amount of damage to property incurred by the weather event, in USD at the time of the event. Values are not adjusted for inflation  Note: Values listed as 0 do not necessarily mean that no property damage occurred as a result of the event", + "mode":"nullable" + }, + { + "name":"damage_crops", + "type":"integer", + "description":"The estimated amount of damage to crops incurred by the weather event, in USD at the time of the storm. Values are not adjusted for inflation  Note: Values listed as 0 do not necessarily mean that no crop damage occurred as a result of the event", + "mode":"nullable" + }, + { + "name":"source", + "type":"string", + "description":"Source reporting the weather event  Note: This can be any entry. Values are not restricted to specific categories", + "mode":"nullable" + }, + { + "name":"magnitude", + "type":"float", + "description":"Measured extent of the magnitude type. This is only used for wind speeds and hail size.   Wind speeds are in MPH; Hail sizes are in inches", + "mode":"nullable" + }, + { + "name":"magnitude_type", + "type":"string", + "description":"Differentiates between the type of magnitude measured.    - EG = Wind Estimated Gust  - ES = Estimated Sustained Wind  - MS = Measured Sustained Wind  - MG = Measured Wind Gust  No magnitude type is included for hail", + "mode":"nullable" + }, + { + "name":"flood_cause", + "type":"string", + "description":"Reported or estimated cause of the flood", + "mode":"nullable" + }, + { + "name":"tor_f_scale", + "type":"string", + "description":"Enhanced Fujita Scale describes the strength of the tornado based on the amount and type of damage caused by the tornado. The F-scale of damage will vary in the destruction area; therefore, the highest value of the F-scale is recorded for each event.    - EF0 – Light Damage (40 – 72 mph)   - EF1 – Moderate Damage (73 – 112 mph)   - EF2 – Significant Damage (113 – 157 mph)   - EF3 – Severe Damage (158 – 206 mph)   - EF4 – Devastating Damage (207 – 260 mph)   - EF5 – Incredible Damage (261 – 318 mph)", + "mode":"nullable" + }, + { + "name":"tor_length", + "type":"string", + "description":"Length of the tornado or tornado segment while on the ground (in tenths of miles)", + "mode":"nullable" + }, + { + "name":"tor_width", + "type":"string", + "description":"Width of the tornado or tornado segment while on the ground (in feet)", + "mode":"nullable" + }, + { + "name":"tor_other_wfo", + "type":"string", + "description":"Indicates the continuation of a tornado segment as it crossed from one National Weather Service Forecast Office to another.
The subsequent WFO identifier is provided within this field.", + "mode":"nullable" + }, + { + "name":"location_index", + "type":"string", + "description":"Number assigned by NWS to specific locations within the same Storm event. Each event’s sequentially increasing location index number will have a corresponding lat/lon point", + "mode":"nullable" + }, + { + "name":"event_range", + "type":"float", + "description":"A hydro-meteorological event will be referenced, minimally, to the nearest tenth of a mile, to the geographical center (not from the village/city boundaries or limits) of a particular village/city, airport, or inland lake, providing that the reference point is documented in the Storm Data software location database.", + "mode":"nullable" + }, + { + "name":"event_azimuth", + "type":"string", + "description":"16-point compass direction from a particular village/city, airport, or inland lake, providing that the reference point is documented in the Storm Data software location database of > 130,000 locations.", + "mode":"nullable" + }, + { + "name":"reference_location", + "type":"string", + "description":"Reference location of the center from which the range is calculated and the azimuth is determined", + "mode":"nullable" + }, + { + "name":"event_latitude", + "type":"float", + "description":"The latitude where the event occurred (rounded to the hundredths in decimal degrees; includes an ‘-‘ if it’s S of the Equator)", + "mode":"nullable" + }, + { + "name":"event_longitude", + "type":"float", + "description":"The longitude where the event occurred (rounded to the hundredths in decimal degrees; includes an ‘-‘ if it’s W of the Prime Meridian)", + "mode":"nullable" + }, + { + "name":"event_point", + "type":"geography", + "description":"Geographic representation of the event_longitude and latitude", + "mode":"nullable" + } +] diff --git a/datasets/noaa/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/noaa/pipelines/_images/run_csv_transform_kub/csv_transform.py index 852be8a07..cb44d9c58 100644 --- a/datasets/noaa/pipelines/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/noaa/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -25,16 +25,18 @@ import typing from urllib.request import Request, urlopen +import numpy as np import pandas as pd import requests from bs4 import BeautifulSoup from google.api_core.exceptions import NotFound from google.cloud import bigquery, storage +from sh import sed def main( pipeline_name: str, - source_url: str, + source_url: dict, source_file: pathlib.Path, target_file: pathlib.Path, chunksize: str, @@ -107,7 +109,7 @@ def main( def execute_pipeline( pipeline_name: str, - source_url: str, + source_url: dict, source_file: pathlib.Path, target_file: pathlib.Path, chunksize: str, @@ -151,7 +153,9 @@ def execute_pipeline( source_file_unzipped = str.replace(str(source_zipfile), ".csv.gz", ".csv") target_file_year = str.replace(str(target_file), ".csv", f"_{yr_str}.csv") destination_table_year = f"{destination_table}_{yr_str}" - source_url_year = str.replace(source_url, ".csv.gz", f"{yr_str}.csv.gz") + source_url_year = str.replace( + source_url["ghcnd_by_year"], ".csv.gz", f"{yr_str}.csv.gz" + ) target_gcs_path_year = str.replace( target_gcs_path, ".csv", f"_{yr_str}.csv" ) @@ -198,6 +202,7 @@ def execute_pipeline( int_date_list=int_date_list, gen_location_list=gen_location_list, ) + return None if pipeline_name in [ "GHCND countries", "GHCND inventory", @@ -205,8 +210,9 @@ def execute_pipeline( "GHCND stations", "GSOD 
stations", ]: - ftp_filename = os.path.split(source_url)[1] - download_file_ftp(ftp_host, ftp_dir, ftp_filename, source_file, source_url) + src_url = source_url[pipeline_name.replace(" ", "_").lower()] + ftp_filename = os.path.split(src_url)[1] + download_file_ftp(ftp_host, ftp_dir, ftp_filename, source_file, src_url) if number_of_header_rows > 0: remove_header_rows(source_file, number_of_header_rows=number_of_header_rows) else: @@ -215,7 +221,7 @@ def execute_pipeline( source_file=source_file, target_file=target_file, pipeline_name=pipeline_name, - source_url=source_url, + source_url=src_url, chunksize=chunksize, project_id=project_id, dataset_id=dataset_id, @@ -238,8 +244,10 @@ def execute_pipeline( int_date_list=int_date_list, gen_location_list=gen_location_list, ) + return None if pipeline_name == "GHCND hurricanes": - download_file(source_url, source_file) + src_url = source_url[pipeline_name.replace(" ", "_").lower()] + download_file(src_url, source_file) if number_of_header_rows > 0: remove_header_rows(source_file, number_of_header_rows=number_of_header_rows) else: @@ -248,7 +256,7 @@ def execute_pipeline( source_file=source_file, target_file=target_file, pipeline_name=pipeline_name, - source_url=source_url, + source_url=src_url, chunksize=chunksize, project_id=project_id, dataset_id=dataset_id, @@ -271,73 +279,458 @@ def execute_pipeline( int_date_list=int_date_list, gen_location_list=gen_location_list, ) + return None if pipeline_name == "NOAA lightning strikes by year": - url_path = os.path.split(source_url)[0] - file_pattern = str.split(os.path.split(source_url)[1], "*")[0] - url_list = url_directory_list(f"{url_path}/", file_pattern) - if full_data_load == "N": - start = datetime.datetime.now().year - 6 + src_url = source_url["lightning_strikes_by_year"] + process_lightning_strikes_by_year( + source_file=source_file, + target_file=target_file, + pipeline_name=pipeline_name, + source_url=src_url, + chunksize=chunksize, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + schema_path=schema_path, + drop_dest_table=drop_dest_table, + input_field_delimiter=input_field_delimiter, + full_data_load=full_data_load, + start_year=start_year, + input_csv_headers=input_csv_headers, + data_dtypes=data_dtypes, + reorder_headers_list=reorder_headers_list, + null_rows_list=null_rows_list, + date_format_list=date_format_list, + slice_column_list=slice_column_list, + regex_list=regex_list, + rename_headers_list=rename_headers_list, + remove_source_file=remove_source_file, + delete_target_file=delete_target_file, + number_of_header_rows=number_of_header_rows, + int_date_list=int_date_list, + gen_location_list=gen_location_list, + ) + return None + if pipeline_name == "NOAA Storms database by year": + process_storms_database_by_year( + source_url=source_url, + source_file=source_file, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + schema_path=schema_path, + drop_dest_table=drop_dest_table, + start_year=start_year, + reorder_headers_list=reorder_headers_list, + date_format_list=date_format_list, + rename_headers_list=rename_headers_list, + gen_location_list=gen_location_list, + ) + return None + + +def process_storms_database_by_year( + source_url: dict, + source_file: pathlib.Path, + target_file: pathlib.Path, + project_id: str, + dataset_id: 
str, + destination_table: str, + target_gcs_bucket: str, + target_gcs_path: str, + schema_path: str, + drop_dest_table: str, + start_year: str, + reorder_headers_list: typing.List[str], + date_format_list: typing.List[str], + rename_headers_list: dict, + gen_location_list: dict, +) -> None: + host = source_url["root"].split("ftp://")[1].split("/")[0] + cwd = source_url["root"].split("ftp://")[1][len(host) :] + list_of_details_files = sorted( + ftp_list_of_files(host=host, cwd=cwd, filter_expr="StormEvents_details") + ) + list_of_locations_files = sorted( + ftp_list_of_files(host=host, cwd=cwd, filter_expr="StormEvents_locations") + ) + for year_to_process in range(int(start_year), datetime.date.today().year + 1): + locations_file = list( + filter( + lambda x: x.startswith( + f"StormEvents_locations-ftp_v1.0_d{str(year_to_process)}" + ), + list_of_locations_files, + ) + ) + details_file = list( + filter( + lambda x: x.startswith( + f"StormEvents_details-ftp_v1.0_d{str(year_to_process)}" + ), + list_of_details_files, + ) + ) + if locations_file: + ftp_filename = locations_file[0] + local_file = str(source_file).replace( + ".csv", f"_{str(year_to_process)}_locations.csv" + ) + local_zipfile = f"{os.path.dirname(local_file)}/{ftp_filename}" + ftp_zipfile_path = f'{source_url["root"]}/{ftp_filename}' + logging.info("Processing Storms Locations File ...") + logging.info( + f" host={host} cwd={cwd} ftp_filename={ftp_filename} local_file={local_file} local_zipfile={local_zipfile} source_url={ftp_zipfile_path} " + ) + df_locations = FTP_to_DF( + host=host, + cwd=cwd, + ftp_filename=ftp_filename, + local_file=local_zipfile, + source_url=ftp_zipfile_path, + ) else: - start = int(start_year) - for yr in range(start, datetime.datetime.now().year): - for url in url_list: - url_file_name = os.path.split(url)[1] - if str(url_file_name).find(f"{file_pattern}{yr}") >= 0: - source_file_path = os.path.split(source_file)[0] - source_file_zipped = f"{source_file_path}/{url_file_name}" - source_file_year = str.replace( - str(source_file), ".csv", f"_{yr}.csv" - ) - target_file_year = str.replace( - str(target_file), ".csv", f"_{yr}.csv" - ) - download_file(url, source_file_zipped) - gz_decompress( - infile=source_file_zipped, - tofile=source_file_year, - delete_zipfile=True, + logging.info("Storms Locations File does not exist!") + df_locations = create_storms_locations_df() + ftp_filename = details_file[0] + local_file = str(source_file).replace( + ".csv", f"_{str(year_to_process)}_detail.csv" + ) + local_zipfile = f"{os.path.dirname(local_file)}/{ftp_filename}" + ftp_zipfile_path = f'{source_url["root"]}/{ftp_filename}' + logging.info("Processing Storms Detail File ...") + logging.info( + f" host={host} cwd={cwd} ftp_filename={ftp_filename} local_file={local_file} local_zipfile={local_zipfile} source_url={ftp_zipfile_path} " + ) + df_details = FTP_to_DF( + host=host, + cwd=cwd, + ftp_filename=ftp_filename, + local_file=local_zipfile, + source_url=ftp_zipfile_path, + ) + logging.info("Merging Details and Locations files") + df = pd.merge( + df_details, + df_locations, + left_on="EVENT_ID", + right_on="EVENT_ID", + how="left", + ) + df = rename_headers(df=df, rename_headers_list=rename_headers_list) + df["event_latitude"] = df["event_latitude"].apply( + lambda x: x - 60 if x > 90 else x + ) + df = generate_location(df, gen_location_list) + df = reorder_headers(df, reorder_headers_list=reorder_headers_list) + for dt_fld in date_format_list.items(): + logging.info(f"Resolving date formats in field {dt_fld}") + 
df[dt_fld[0]] = df[dt_fld[0]].apply( + lambda x: pd.to_datetime(str(x), format="%d-%b-%y %H:%M:%S") + ) + df[dt_fld[0]] = df[dt_fld[0]].apply( + lambda x: f"{year_to_process}-{str(x)[5:]}" + ) + df = fix_data_anomolies_storms(df) + targ_file_yr = str.replace(str(target_file), ".csv", f"_{year_to_process}.csv") + save_to_new_file(df=df, file_path=targ_file_yr, sep="|", quotechar="^") + sed(["-i", "s/|nan|/||/g", targ_file_yr]) + sed(["-i", "s/|/|/g", targ_file_yr]) + upload_file_to_gcs( + file_path=targ_file_yr, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + drop_table = drop_dest_table == "Y" + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=f"{destination_table}_{str(year_to_process)}", + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=drop_table, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=f"{destination_table}_{str(year_to_process)}", + file_path=targ_file_yr, + truncate_table=True, + field_delimiter="|", + quotechar="^", + ) + + +def clean_source_file(source_file: str) -> None: + logging.info("Cleaning source file") + sed(["-i", 's/,\\"\\"\\"/,\\"\\|\\\'\\|\\\'/g;', source_file]) + sed(["-i", "s/\\\"\\\" /\\|\\'\\|\\' /g;", source_file]) + sed(["-i", "s/ \\\"\\\"/ \\|\\'\\|\\'/g;", source_file]) + sed(["-i", "s/ \\\"/ \\|\\'/g;", source_file]) + sed(["-i", "s/\\\" /\\|\\' /g;", source_file]) + + +def fix_data_anomolies_storms(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Cleansing data") + df["damage_property"] = ( + df["damage_property"] + .apply(lambda x: shorthand_to_number(x)) + .fillna(0) + .astype(np.int64) + ) + df["damage_crops"] = ( + df["damage_crops"] + .apply(lambda x: shorthand_to_number(x)) + .fillna(0) + .astype(np.int64) + ) + df["event_type"] = df["event_type"].apply(lambda x: str(x).lower()) + df["state"] = df["state"].apply( + lambda x: f"{str.capitalize(x)[0]}{str.lower(x)[1]}" + ) + df["event_point"] = df["event_point"].apply( + lambda x: str(x).replace("POINT(nan nan)", "") + ) + return df + + +def shorthand_to_number(x) -> float: + if type(x) == float or type(x) == int: + return x + if "K" in x: + if len(x) > 1: + return float(x.replace("K", "")) * 10**3 + return 10**3 + if "M" in x: + if len(x) > 1: + return float(x.replace("M", "")) * 10**6 + return 10**6 + if "B" in x: + if len(x) > 1: + return float(x.replace("B", "")) * 10**9 + return 10**9 + if "T" in x: + if len(x) > 1: + return float(x.replace("T", "")) * 10**12 + return 10**12 + if "Q" in x: + if len(x) > 1: + return float(x.replace("Q", "")) * 10**15 + return 10**15 + return 0.0 + + +def generate_location(df: pd.DataFrame, gen_location_list: dict) -> pd.DataFrame: + logging.info("Generating location data") + for key, values in gen_location_list.items(): + logging.info(f"Generating location data for field {key}") + df[key] = ( + "POINT(" + + df[values[0]][:].astype("string") + + " " + + df[values[1]][:].astype("string") + + ")" + ) + return df + + +def FTP_to_DF( + host: str, + cwd: str, + ftp_filename: str, + local_file: str, + source_url: str, + sep: str = ",", +) -> pd.DataFrame: + download_file_ftp( + ftp_host=host, + ftp_dir=cwd, + ftp_filename=ftp_filename, + local_file=local_file, + source_url=source_url, + ) + logging.info(f"Loading file {local_file} into DataFrame") + decompressed_source_file = local_file.replace(".gz", "") + gz_decompress( + infile=local_file, + tofile=decompressed_source_file, + delete_zipfile=False, + ) + if 
"locations" in decompressed_source_file: + df = pd.read_csv( + decompressed_source_file, + engine="python", + encoding="utf-8", + quotechar='"', + sep=sep, + quoting=csv.QUOTE_ALL, + header=0, + keep_default_na=True, + na_values=[" "], + ) + else: + clean_source_file(decompressed_source_file) + # import pdb; pdb.set_trace() + df = pd.read_csv( + decompressed_source_file, + engine="python", + encoding="utf-8", + quotechar='"', + sep=sep, + header=0, + keep_default_na=True, + na_values=[" "], + ) + for col in df: + if str(df[col].dtype) == "object": + logging.info(f"Replacing values in column {col}") + df[col] = df[col].apply(lambda x: str(x).replace("|'", '"')) + else: + pass + return df + + +def create_storms_locations_df() -> pd.DataFrame: + df_loc = pd.DataFrame( + columns=[ + "YEARMONTH", + "EPISODE_ID", + "EVENT_ID", + "LOCATION_INDEX", + "RANGE", + "AZIMUTH", + "LOCATION", + "LATITUDE", + "LONGITUDE", + "LAT2", + "LON2", + ] + ) + return df_loc + + +def ftp_list_of_files(host: str, cwd: str, filter_expr: str = "") -> typing.List[str]: + try_count = 0 + while True: + try: + ftp = ftplib.FTP(host) + ftp.login() + ftp.cwd(cwd) + file_list = ftp.nlst() + if filter != "": + file_list = list( + filter(lambda x: str(x).find(filter_expr) >= 0, file_list) + ) + ftp.quit() + return file_list + except TimeoutError as e: + try_count += 1 + if try_count > 3: + raise e + else: + logging.info(f"{e}, Retrying ...") + time.sleep(try_count * 30) + + +def process_lightning_strikes_by_year( + pipeline_name: str, + source_url: dict, + source_file: pathlib.Path, + target_file: pathlib.Path, + chunksize: str, + project_id: str, + dataset_id: str, + destination_table: str, + target_gcs_bucket: str, + target_gcs_path: str, + schema_path: str, + drop_dest_table: str, + input_field_delimiter: str, + full_data_load: str, + start_year: str, + input_csv_headers: typing.List[str], + data_dtypes: dict, + reorder_headers_list: typing.List[str], + null_rows_list: typing.List[str], + date_format_list: typing.List[str], + slice_column_list: dict, + regex_list: dict, + remove_source_file: bool, + rename_headers_list: dict, + delete_target_file: bool, + number_of_header_rows: int, + int_date_list: typing.List[str], + gen_location_list: dict, +) -> None: + url_path = os.path.split(source_url)[0] + file_pattern = str.split(os.path.split(source_url)[1], "*")[0] + url_list = url_directory_list(f"{url_path}/", file_pattern) + if full_data_load == "N": + start = datetime.datetime.now().year - 6 + else: + start = int(start_year) + for yr in range(start, datetime.datetime.now().year): + for url in url_list: + url_file_name = os.path.split(url)[1] + if str(url_file_name).find(f"{file_pattern}{yr}") >= 0: + source_file_path = os.path.split(source_file)[0] + source_file_zipped = f"{source_file_path}/{url_file_name}" + source_file_year = str.replace(str(source_file), ".csv", f"_{yr}.csv") + target_file_year = str.replace(str(target_file), ".csv", f"_{yr}.csv") + download_file(url, source_file_zipped) + gz_decompress( + infile=source_file_zipped, + tofile=source_file_year, + delete_zipfile=True, + ) + if number_of_header_rows > 0: + remove_header_rows( + source_file_year, + number_of_header_rows=number_of_header_rows, ) - if number_of_header_rows > 0: - remove_header_rows( - source_file_year, - number_of_header_rows=number_of_header_rows, - ) - else: - pass - if not full_data_load: - delete_source_file_data_from_bq( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - source_url=url, - ) - 
process_and_load_table( - source_file=source_file_year, - target_file=target_file_year, - pipeline_name=pipeline_name, - source_url=url, - chunksize=chunksize, + else: + pass + if not full_data_load: + delete_source_file_data_from_bq( project_id=project_id, dataset_id=dataset_id, - destination_table=destination_table, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - schema_path=schema_path, - drop_dest_table=drop_dest_table, - input_field_delimiter=input_field_delimiter, - input_csv_headers=input_csv_headers, - data_dtypes=data_dtypes, - reorder_headers_list=reorder_headers_list, - null_rows_list=null_rows_list, - date_format_list=date_format_list, - slice_column_list=slice_column_list, - regex_list=regex_list, - rename_headers_list=rename_headers_list, - remove_source_file=remove_source_file, - delete_target_file=delete_target_file, - int_date_list=int_date_list, - gen_location_list=gen_location_list, + table_id=destination_table, + source_url=url, ) + process_and_load_table( + source_file=source_file_year, + target_file=target_file_year, + pipeline_name=pipeline_name, + source_url=url, + chunksize=chunksize, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + schema_path=schema_path, + drop_dest_table=drop_dest_table, + input_field_delimiter=input_field_delimiter, + input_csv_headers=input_csv_headers, + data_dtypes=data_dtypes, + reorder_headers_list=reorder_headers_list, + null_rows_list=null_rows_list, + date_format_list=date_format_list, + slice_column_list=slice_column_list, + regex_list=regex_list, + rename_headers_list=rename_headers_list, + remove_source_file=remove_source_file, + delete_target_file=delete_target_file, + int_date_list=int_date_list, + gen_location_list=gen_location_list, + truncate_table=False, + ) def process_and_load_table( @@ -366,6 +759,7 @@ def process_and_load_table( delete_target_file: bool, int_date_list: typing.List[str], gen_location_list: dict, + truncate_table: bool = True, encoding: str = "utf-8", ) -> None: process_source_file( @@ -412,7 +806,8 @@ def process_and_load_table( dataset_id=dataset_id, table_id=destination_table, file_path=target_file, - truncate_table=True, + truncate_table=truncate_table, + source_url=source_url, field_delimiter="|", ) else: @@ -618,15 +1013,6 @@ def convert_date_from_int(df: pd.DataFrame, int_date_list: dict) -> pd.DataFrame return df -def generate_location(df: pd.DataFrame, gen_location_list: dict) -> pd.DataFrame: - logging.info("Generating location data") - for key, values in gen_location_list.items(): - df[key] = df[[values[0], values[1]]].apply( - lambda x: f"POINT({x[0]} {x[1]})", axis=1 - ) - return df - - def url_directory_list( source_url_path: str, file_pattern: str = "" ) -> typing.List[str]: @@ -778,7 +1164,9 @@ def load_data_to_bq( table_id: str, file_path: str, truncate_table: bool, + source_url: str = "", field_delimiter: str = "|", + quotechar: str = '"', ) -> None: logging.info( f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" @@ -791,9 +1179,20 @@ def load_data_to_bq( if truncate_table: job_config.write_disposition = "WRITE_TRUNCATE" else: + if source_url == "": + pass + else: + delete_source_file_data_from_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + source_url=source_url, + ) job_config.write_disposition = "WRITE_APPEND" job_config.skip_leading_rows = 1 # ignore the header job_config.autodetect = 
False + job_config.allow_quoted_newlines = True + job_config.quote_character = quotechar with open(file_path, "rb") as source_file: job = client.load_table_from_file(source_file, table_ref, job_config=job_config) job.result() @@ -907,9 +1306,11 @@ def create_table_schema( return schema -def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: +def save_to_new_file( + df: pd.DataFrame, file_path: str, sep: str = "|", quotechar: str = '"' +) -> None: logging.info(f"Saving data to target file.. {file_path} ...") - df.to_csv(file_path, index=False, sep=sep) + df.to_csv(file_path, index=False, sep=sep, quotechar=quotechar) def append_batch_file( @@ -965,17 +1366,24 @@ def download_file_ftp( def download_file_ftp_single_try( ftp_host: str, ftp_dir: str, ftp_filename: str, local_file: pathlib.Path ) -> bool: - # try: - with ftplib.FTP(ftp_host, timeout=60) as ftp_conn: - ftp_conn.login("", "") - ftp_conn.cwd(ftp_dir) - ftp_conn.encoding = "utf-8" - with open(local_file, "wb") as dest_file: - ftp_conn.retrbinary("RETR %s" % ftp_filename, dest_file.write) - ftp_conn.quit() - return True - # except: - # return True + try_count = 0 + while True: + try: + with ftplib.FTP(ftp_host, timeout=60) as ftp_conn: + ftp_conn.login("", "") + ftp_conn.cwd(ftp_dir) + ftp_conn.encoding = "utf-8" + with open(local_file, "wb") as dest_file: + ftp_conn.retrbinary("RETR %s" % ftp_filename, dest_file.write) + ftp_conn.quit() + return True + except TimeoutError as e: + try_count += 1 + if try_count > 3: + raise e + else: + logging.info(f"{e}, Retrying ...") + time.sleep(try_count * 30) def upload_file_to_gcs( @@ -1000,7 +1408,7 @@ def upload_file_to_gcs( main( pipeline_name=os.environ.get("PIPELINE_NAME", ""), - source_url=os.environ.get("SOURCE_URL", ""), + source_url=json.loads(os.environ.get("SOURCE_URL", r"{}")), source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(), target_file=pathlib.Path(os.environ.get("TARGET_FILE", "")).expanduser(), chunksize=os.environ.get("CHUNKSIZE", "100000"), diff --git a/datasets/noaa/pipelines/_images/run_csv_transform_kub/requirements.txt b/datasets/noaa/pipelines/_images/run_csv_transform_kub/requirements.txt index 4c7f177b1..a8d9af4e9 100644 --- a/datasets/noaa/pipelines/_images/run_csv_transform_kub/requirements.txt +++ b/datasets/noaa/pipelines/_images/run_csv_transform_kub/requirements.txt @@ -3,3 +3,4 @@ google-cloud-bigquery google-cloud-storage numpy pandas +sh diff --git a/datasets/noaa/pipelines/noaa/noaa_dag.py b/datasets/noaa/pipelines/noaa/noaa_dag.py index d4d19ac47..d28f363a9 100644 --- a/datasets/noaa/pipelines/noaa/noaa_dag.py +++ b/datasets/noaa/pipelines/noaa/noaa_dag.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -61,7 +61,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GHCND by year", - "SOURCE_URL": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/.csv.gz", + "SOURCE_URL": '{\n "ghcnd_by_year": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/.csv.gz"\n}', "SOURCE_FILE": "files/data_ghcnd_by_year.csv", "TARGET_FILE": "files/data_output_ghcnd_by_year.csv", "CHUNKSIZE": "750000", @@ -102,7 +102,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GHCND countries", - "SOURCE_URL": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt", + "SOURCE_URL": '{\n "ghcnd_countries": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt"\n}', "SOURCE_FILE": "files/data_ghcnd_countries.csv", "TARGET_FILE": "files/data_output_ghcnd_countries.csv", "CHUNKSIZE": "750000", @@ -138,7 +138,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GHCND inventory", - "SOURCE_URL": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-inventory.txt", + "SOURCE_URL": '{\n "ghcnd_inventory": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-inventory.txt"\n}', "SOURCE_FILE": "files/data_ghcnd_inventory.csv", "TARGET_FILE": "files/data_output_ghcnd_inventory.csv", "CHUNKSIZE": "750000", @@ -174,7 +174,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GHCND states", - "SOURCE_URL": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt", + "SOURCE_URL": '{\n "ghcnd_states": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt"\n}', "SOURCE_FILE": "files/data_ghcnd_states.csv", "TARGET_FILE": "files/data_output_ghcnd_states.csv", "CHUNKSIZE": "750000", @@ -210,7 +210,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GHCND stations", - "SOURCE_URL": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt", + "SOURCE_URL": '{\n "ghcnd_stations": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt"\n}', "SOURCE_FILE": "files/data_ghcnd_stations.csv", "TARGET_FILE": "files/data_output_ghcnd_stations.csv", "CHUNKSIZE": "750000", @@ -246,7 +246,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GSOD stations", - "SOURCE_URL": "ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-history.txt", + "SOURCE_URL": '{\n "gsod_stations": "ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-history.txt"\n}', "SOURCE_FILE": "files/data_gsod_stations.csv", "TARGET_FILE": "files/data_output_gsod_stations.csv", "CHUNKSIZE": "750000", @@ -285,7 +285,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "GHCND hurricanes", - "SOURCE_URL": "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r00/access/csv/ibtracs.ALL.list.v04r00.csv", + "SOURCE_URL": '{\n "ghcnd_hurricanes": "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r00/access/csv/ibtracs.ALL.list.v04r00.csv"\n}', "SOURCE_FILE": "files/data_ghcnd_hurricanes.csv", "TARGET_FILE": "files/data_output_ghcnd_hurricanes.csv", "CHUNKSIZE": "750000", @@ -319,7 +319,7 @@ image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", env_vars={ "PIPELINE_NAME": "NOAA lightning strikes by year", - "SOURCE_URL": "https://www1.ncdc.noaa.gov/pub/data/swdi/database-csv/v2/nldn-tiles-*.csv.gz", + 
"SOURCE_URL": '{\n "lightning_strikes_by_year": "https://www1.ncdc.noaa.gov/pub/data/swdi/database-csv/v2/nldn-tiles-*.csv.gz"\n}', "SOURCE_FILE": "files/data_lightning_strikes.csv", "TARGET_FILE": "files/data_output_lightning_strikes.csv", "CHUNKSIZE": "1000000", @@ -347,6 +347,42 @@ }, resources={"request_ephemeral_storage": "16G", "limit_cpu": "3"}, ) + + # Run NOAA load processes - Storms Database + storms_database_by_year = kubernetes_engine.GKEStartPodOperator( + task_id="storms_database_by_year", + name="noaa.storms_database_by_year", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="noaa", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.noaa.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "NOAA Storms database by year", + "SOURCE_URL": '{\n "root": "ftp://ftp.ncdc.noaa.gov/pub/data/swdi/stormevents/csvfiles",\n "storms_details": "StormEvents_details-ftp_v1.0_d",\n "storms_locations": "StormEvents_locations-ftp_v1.0_d"\n}', + "SOURCE_FILE": "files/data_storms_database.csv", + "TARGET_FILE": "files/data_output_storms_database.csv", + "CHUNKSIZE": "500000", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "noaa_historic_severe_storms", + "TABLE_ID": "storms", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/noaa/storms_db/data_output.csv", + "SCHEMA_PATH": "data/noaa/schema/noaa_historic_severe_storms_schema.json", + "DROP_DEST_TABLE": "N", + "INPUT_FIELD_DELIMITER": ",", + "FULL_DATA_LOAD": "N", + "REMOVE_SOURCE_FILE": "Y", + "DELETE_TARGET_FILE": "Y", + "START_YEAR": "1950", + "RENAME_HEADERS_LIST": '{\n "EPISODE_ID_x": "episode_id",\n "EVENT_ID": "event_id",\n "STATE": "state",\n "STATE_FIPS": "state_fips_code",\n "EVENT_TYPE": "event_type",\n "CZ_TYPE": "cz_type",\n "CZ_FIPS": "cz_fips_code",\n "CZ_NAME": "cz_name",\n "WFO": "wfo",\n "BEGIN_DATE_TIME": "event_begin_time",\n "CZ_TIMEZONE": "event_timezone",\n "END_DATE_TIME": "event_end_time",\n "INJURIES_DIRECT": "injuries_direct",\n "INJURIES_INDIRECT": "injuries_indirect",\n "DEATHS_DIRECT": "deaths_direct",\n "DEATHS_INDIRECT": "deaths_indirect",\n "DAMAGE_PROPERTY": "damage_property",\n "DAMAGE_CROPS": "damage_crops",\n "SOURCE": "source",\n "MAGNITUDE": "magnitude",\n "MAGNITUDE_TYPE": "magnitude_type",\n "FLOOD_CAUSE": "flood_cause",\n "TOR_F_SCALE": "tor_f_scale",\n "TOR_LENGTH": "tor_length",\n "TOR_WIDTH": "tor_width",\n "TOR_OTHER_WFO": "tor_other_wfo",\n "LOCATION_INDEX": "location_index",\n "RANGE": "event_range",\n "AZIMUTH": "event_azimuth",\n "LOCATION": "reference_location",\n "LATITUDE": "event_latitude",\n "LONGITUDE": "event_longitude"\n}', + "DATE_FORMAT_LIST": '{\n "event_begin_time": "%Y-%m-%d %H:%M:%S",\n "event_end_time": "%Y-%m-%d %H:%M:%S"\n}', + "GEN_LOCATION_LIST": '{\n "event_point": ["event_longitude", "event_latitude"]\n}', + "REORDER_HEADERS_LIST": '[\n "episode_id",\n "event_id",\n "state",\n "state_fips_code",\n "event_type",\n "cz_type",\n "cz_fips_code",\n "cz_name",\n "wfo",\n "event_begin_time",\n "event_timezone",\n "event_end_time",\n "injuries_direct",\n "injuries_indirect",\n "deaths_direct",\n "deaths_indirect",\n "damage_property",\n "damage_crops",\n "source",\n "magnitude",\n "magnitude_type",\n "flood_cause",\n "tor_f_scale",\n "tor_length",\n "tor_width",\n "tor_other_wfo",\n "location_index",\n "event_range",\n "event_azimuth",\n "reference_location",\n "event_latitude",\n "event_longitude",\n "event_point"\n]', + }, + 
resources={"request_ephemeral_storage": "16G", "limit_cpu": "3"}, + ) delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( task_id="delete_cluster", project_id="{{ var.value.gcp_project }}", @@ -357,14 +393,15 @@ ( create_cluster >> [ - ghcnd_by_year, - ghcnd_countries, - ghcnd_inventory, ghcnd_states, ghcnd_stations, gsod_stations, - ghcnd_hurricanes, - lightning_strikes_by_year, + ghcnd_countries, + ghcnd_inventory, ] + >> storms_database_by_year + >> ghcnd_by_year + >> ghcnd_hurricanes + >> lightning_strikes_by_year >> delete_cluster ) diff --git a/datasets/noaa/pipelines/noaa/pipeline.yaml b/datasets/noaa/pipelines/noaa/pipeline.yaml index d98ec82cb..b2fd0b6a4 100644 --- a/datasets/noaa/pipelines/noaa/pipeline.yaml +++ b/datasets/noaa/pipelines/noaa/pipeline.yaml @@ -80,7 +80,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GHCND by year" - SOURCE_URL: "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/.csv.gz" + SOURCE_URL: >- + { + "ghcnd_by_year": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/.csv.gz" + } SOURCE_FILE: "files/data_ghcnd_by_year.csv" TARGET_FILE: "files/data_output_ghcnd_by_year.csv" CHUNKSIZE: "750000" @@ -159,7 +162,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GHCND countries" - SOURCE_URL: "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt" + SOURCE_URL: >- + { + "ghcnd_countries": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt" + } SOURCE_FILE: "files/data_ghcnd_countries.csv" TARGET_FILE: "files/data_output_ghcnd_countries.csv" CHUNKSIZE: "750000" @@ -211,7 +217,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GHCND inventory" - SOURCE_URL: "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-inventory.txt" + SOURCE_URL: >- + { + "ghcnd_inventory": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-inventory.txt" + } SOURCE_FILE: "files/data_ghcnd_inventory.csv" TARGET_FILE: "files/data_output_ghcnd_inventory.csv" CHUNKSIZE: "750000" @@ -271,7 +280,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GHCND states" - SOURCE_URL: "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt" + SOURCE_URL: >- + { + "ghcnd_states": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-states.txt" + } SOURCE_FILE: "files/data_ghcnd_states.csv" TARGET_FILE: "files/data_output_ghcnd_states.csv" CHUNKSIZE: "750000" @@ -323,7 +335,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GHCND stations" - SOURCE_URL: "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt" + SOURCE_URL: >- + { + "ghcnd_stations": "ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt" + } SOURCE_FILE: "files/data_ghcnd_stations.csv" TARGET_FILE: "files/data_output_ghcnd_stations.csv" CHUNKSIZE: "750000" @@ -389,7 +404,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GSOD stations" - SOURCE_URL: "ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-history.txt" + SOURCE_URL: >- + { + "gsod_stations": "ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-history.txt" + } SOURCE_FILE: "files/data_gsod_stations.csv" TARGET_FILE: "files/data_output_gsod_stations.csv" CHUNKSIZE: "750000" @@ -486,7 +504,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "GHCND 
hurricanes" - SOURCE_URL: "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r00/access/csv/ibtracs.ALL.list.v04r00.csv" + SOURCE_URL: >- + { + "ghcnd_hurricanes": "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r00/access/csv/ibtracs.ALL.list.v04r00.csv" + } SOURCE_FILE: "files/data_ghcnd_hurricanes.csv" TARGET_FILE: "files/data_output_ghcnd_hurricanes.csv" CHUNKSIZE: "750000" @@ -889,7 +910,10 @@ dag: image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" env_vars: PIPELINE_NAME: "NOAA lightning strikes by year" - SOURCE_URL: "https://www1.ncdc.noaa.gov/pub/data/swdi/database-csv/v2/nldn-tiles-*.csv.gz" + SOURCE_URL: >- + { + "lightning_strikes_by_year": "https://www1.ncdc.noaa.gov/pub/data/swdi/database-csv/v2/nldn-tiles-*.csv.gz" + } SOURCE_FILE: "files/data_lightning_strikes.csv" TARGET_FILE: "files/data_output_lightning_strikes.csv" CHUNKSIZE: "1000000" @@ -946,6 +970,123 @@ dag: resources: request_ephemeral_storage: "16G" limit_cpu: "3" + - operator: "GKEStartPodOperator" + description: "Run NOAA load processes - Storms Database" + args: + task_id: "storms_database_by_year" + name: "noaa.storms_database_by_year" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: noaa + namespace: "default" + image_pull_policy: "Always" + image: "{{ var.json.noaa.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "NOAA Storms database by year" + SOURCE_URL: >- + { + "root": "ftp://ftp.ncdc.noaa.gov/pub/data/swdi/stormevents/csvfiles", + "storms_details": "StormEvents_details-ftp_v1.0_d", + "storms_locations": "StormEvents_locations-ftp_v1.0_d" + } + SOURCE_FILE: "files/data_storms_database.csv" + TARGET_FILE: "files/data_output_storms_database.csv" + CHUNKSIZE: "500000" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "noaa_historic_severe_storms" + TABLE_ID: "storms" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/noaa/storms_db/data_output.csv" + SCHEMA_PATH: "data/noaa/schema/noaa_historic_severe_storms_schema.json" + DROP_DEST_TABLE: "N" + INPUT_FIELD_DELIMITER: "," + FULL_DATA_LOAD: "N" + REMOVE_SOURCE_FILE: "Y" + DELETE_TARGET_FILE: "Y" + START_YEAR: "1950" + RENAME_HEADERS_LIST: >- + { + "EPISODE_ID_x": "episode_id", + "EVENT_ID": "event_id", + "STATE": "state", + "STATE_FIPS": "state_fips_code", + "EVENT_TYPE": "event_type", + "CZ_TYPE": "cz_type", + "CZ_FIPS": "cz_fips_code", + "CZ_NAME": "cz_name", + "WFO": "wfo", + "BEGIN_DATE_TIME": "event_begin_time", + "CZ_TIMEZONE": "event_timezone", + "END_DATE_TIME": "event_end_time", + "INJURIES_DIRECT": "injuries_direct", + "INJURIES_INDIRECT": "injuries_indirect", + "DEATHS_DIRECT": "deaths_direct", + "DEATHS_INDIRECT": "deaths_indirect", + "DAMAGE_PROPERTY": "damage_property", + "DAMAGE_CROPS": "damage_crops", + "SOURCE": "source", + "MAGNITUDE": "magnitude", + "MAGNITUDE_TYPE": "magnitude_type", + "FLOOD_CAUSE": "flood_cause", + "TOR_F_SCALE": "tor_f_scale", + "TOR_LENGTH": "tor_length", + "TOR_WIDTH": "tor_width", + "TOR_OTHER_WFO": "tor_other_wfo", + "LOCATION_INDEX": "location_index", + "RANGE": "event_range", + "AZIMUTH": "event_azimuth", + "LOCATION": "reference_location", + "LATITUDE": "event_latitude", + "LONGITUDE": "event_longitude" + } + DATE_FORMAT_LIST: >- + { + "event_begin_time": "%Y-%m-%d %H:%M:%S", + "event_end_time": "%Y-%m-%d %H:%M:%S" + } + GEN_LOCATION_LIST: >- + { + "event_point": ["event_longitude", 
"event_latitude"] + } + REORDER_HEADERS_LIST: >- + [ + "episode_id", + "event_id", + "state", + "state_fips_code", + "event_type", + "cz_type", + "cz_fips_code", + "cz_name", + "wfo", + "event_begin_time", + "event_timezone", + "event_end_time", + "injuries_direct", + "injuries_indirect", + "deaths_direct", + "deaths_indirect", + "damage_property", + "damage_crops", + "source", + "magnitude", + "magnitude_type", + "flood_cause", + "tor_f_scale", + "tor_length", + "tor_width", + "tor_other_wfo", + "location_index", + "event_range", + "event_azimuth", + "reference_location", + "event_latitude", + "event_longitude", + "event_point" + ] + resources: + request_ephemeral_storage: "16G" + limit_cpu: "3" - operator: "GKEDeleteClusterOperator" args: task_id: "delete_cluster" @@ -954,4 +1095,4 @@ dag: name: noaa graph_paths: - - "create_cluster >> [ghcnd_by_year, ghcnd_countries, ghcnd_inventory, ghcnd_states, ghcnd_stations, gsod_stations, ghcnd_hurricanes, lightning_strikes_by_year] >> delete_cluster" + - "create_cluster >> [ghcnd_states, ghcnd_stations, gsod_stations, ghcnd_countries, ghcnd_inventory] >> storms_database_by_year >> ghcnd_by_year >> ghcnd_hurricanes >> lightning_strikes_by_year >> delete_cluster"