# Create, enrich, and clean station metadata from three different sources:
- https://cloford.com/resources/codes/index.htm
- https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt
- https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt


#### 1. scrape data from [https://cloford.com/resources/codes/index.htm](https://cloford.com/resources/codes/index.htm)

In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np

In [None]:
def translate_table_to_dataframe(table):
    """Convert one parsed HTML table into a pandas DataFrame.

    Args:
        table: A parsed table element exposing ``find_all(tag)``. Every
            element it returns for ``'th'`` and ``'td'`` must expose a
            ``.text`` attribute, and every ``'tr'`` element must itself
            expose ``find_all`` (e.g. a BeautifulSoup tag).

    Returns:
        A DataFrame whose columns are the text of every ``th`` cell found in
        the table and whose rows are the ``td`` texts of each ``tr``. Rows
        without any ``td`` cell (such as a header row) are left out. The
        index holds each kept row's position among the table's ``tr``
        elements, so a table with a leading header row is indexed from 1.
    """
    dataframe_ready_headers = [header.text for header in table.find_all('th')]

    dataframe_friendly_array = []
    row_positions = []
    for position, row in enumerate(table.find_all('tr')):
        row_data = [col.text for col in row.find_all('td')]
        # Rows without <td> cells carry no data. Skipping them replaces the
        # old unconditional drop of index 0, which failed on tables with no
        # rows and threw away real data when the first row was not a header.
        if not row_data:
            continue
        dataframe_friendly_array.append(row_data)
        row_positions.append(position)

    return pd.DataFrame(
        data=dataframe_friendly_array,
        columns=dataframe_ready_headers,
        index=row_positions,
    )

In [None]:
list_of_dataframes = []

# get the web page with the data; the timeout keeps a stalled connection from
# hanging the notebook, and raise_for_status() stops an HTTP error page from
# being parsed as if it were the real content
page = requests.get("https://cloford.com/resources/codes/index.htm", timeout=30)
page.raise_for_status()
soup = BeautifulSoup(page.text, 'html.parser')

# locate the tables within the page
tables = soup.find_all('table', class_='outlinetable')
print(f'There were {len(tables)} tables found:\n')

# convert every table into its own dataframe
for each_table in tables:
    list_of_dataframes.append(translate_table_to_dataframe(each_table))

# merge the tables into one dataframe with a single concat; pd.concat rejects
# an empty list, so fall back to an empty dataframe when no table was found
if list_of_dataframes:
    merged_tables_df = pd.concat(list_of_dataframes)
else:
    merged_tables_df = pd.DataFrame()

names = ["Country Codes", "Additional Codes", "Additional FIPS 10-4 Codes"]

# label each per-table dataframe (zip stops at the shorter of the two lists)
for one_dataframe, one_name in zip(list_of_dataframes, names):
    one_dataframe.name = one_name
    print(one_dataframe.name)

In [None]:
# cell values that stand in for "nothing recorded" in the scraped tables
remove_list = ['-', '--', '\xa0', np.nan]

# overwrite every placeholder with 'No Record', working column by column
for column in merged_tables_df.columns:
    placeholder_rows = merged_tables_df[column].isin(remove_list)
    merged_tables_df.loc[placeholder_rows, column] = 'No Record'

# Namibia's 'ISO (2)' and 'Internet' values are set explicitly to the string 'NA'
namibia_rows = merged_tables_df['Country'] == 'Namibia'
merged_tables_df.loc[namibia_rows, ['ISO (2)', 'Internet']] = 'NA'

# per-column null count; every entry is expected to be 0 at this point
merged_tables_df.isna().sum()

#### 2. process ghcnd-stations text file from URL https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt

In [None]:
import urllib.request

# location of the station inventory text file
URL_station = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt"

# download the file, keeping each record as a decoded, right-stripped line
with urllib.request.urlopen(URL_station) as file:
    lines = [raw_line.decode('utf-8').rstrip() for raw_line in file]

# cut the fixed character ranges out of every record, one list per field
ID_list = []
lat_list = []
lon_list = []
state_list = []
station_name_list = []
for record in lines:
    ID_list.append(record[:11])
    lat_list.append(record[12:20].lstrip())
    lon_list.append(record[21:30].lstrip())
    state_list.append(record[38:40].lstrip())
    station_name_list.append(record[41:71].strip())

# assemble the per-field lists into one dataframe (values stay as text)
metadata_station = pd.DataFrame(
    dict(
        ID=ID_list,
        Latitude=lat_list,
        Longitude=lon_list,
        State=state_list,
        StationName=station_name_list,
    )
)

# an empty 'State' string means no state was given; store it as NaN
state_column = metadata_station['State']
metadata_station['State'] = state_column.replace('', np.nan)

# the first two characters of 'ID' become the 'FIPS' column
metadata_station['FIPS'] = metadata_station['ID'].str[:2]

#### 3. process ghcnd-countries text file from URL https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt

In [None]:
# location of the country code text file
URL_countries = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-countries.txt"

# split every record at its first space: the code comes first, the name follows
lines = []
with urllib.request.urlopen(URL_countries) as file:
    for raw_line in file:
        record = raw_line.decode("utf-8").rstrip()
        lines.append(record.split(" ", 1))

metadata_countries = pd.DataFrame(data=lines, columns=['FIPS', 'Country'])

#### 3. merge all three dataframes

In [None]:
# 'Country' is already provided by metadata_countries, so remove the scraped copy
merged_tables_df.drop(columns='Country', inplace=True)

# attach the country name to every station (default inner join on 'FIPS'),
# then add the scraped code table while keeping every station row (left join)
middle_metadata = metadata_station.merge(metadata_countries, on='FIPS')
final_metadata = middle_metadata.merge(merged_tables_df, how='left', on='FIPS')

#### 4. perform data cleansing

In [None]:
# assign missing continent and region value to specific FIPS
# values are hard-coded and searched on google and wikipedia
# NOTE(review): the hard-coded lists below are matched to the FIPS codes purely
# by position, in the order the codes first appear in final_metadata - confirm
# the pairing whenever the upstream data changes.
missing_data_fips_list = final_metadata[final_metadata["Region"].isnull()]["FIPS"].unique()
missing_data_continent_list = ["Asia", "Atlantic Ocean", "Oceania", "Europe", "Americas", "Europe", "Americas"]
missing_data_region_list = [
    "South West Asia",
    "South Atlantic Ocean",
    "North Pacific Ocean",
    "South East Europe",
    "North America",
    "South East Europe",
    "South America",
]

# zip() silently stops at the shortest input, so report a count mismatch
# instead of letting codes go unfilled or be paired with the wrong entry
if not (
    len(missing_data_fips_list)
    == len(missing_data_continent_list)
    == len(missing_data_region_list)
):
    print(
        f"WARNING: {len(missing_data_fips_list)} FIPS codes have no Region, but "
        f"{len(missing_data_continent_list)} continents and "
        f"{len(missing_data_region_list)} regions are hard-coded; "
        "the positional pairing may be wrong."
    )

missing_data_dict = {
    fips: [missing_continent, missing_region]
    for fips, missing_continent, missing_region in zip(
        missing_data_fips_list, missing_data_continent_list, missing_data_region_list
    )
}

for fips, missing_data in missing_data_dict.items():
    # select the rows of this FIPS code once and fill both columns
    rows_for_fips = final_metadata["FIPS"] == fips
    final_metadata.loc[rows_for_fips, "Continent"] = missing_data[0]
    final_metadata.loc[rows_for_fips, "Region"] = missing_data[1]

# replace null value in 'State' column with 'Out of States'
final_metadata.loc[final_metadata["State"].isna(), "State"] = "Out of States"
# assign Capital value of FIPS equals to 'AE' to be 'Abu Dhabi'
final_metadata.loc[final_metadata["FIPS"] == "AE", "Capital"] = "Abu Dhabi"
# rename column format so that databricks environment can recognize
final_metadata.rename({"ISO (2)": "ISO-2", "ISO (3)": "ISO-3", "ISO (No)": "ISO-No"}, axis=1, inplace=True)
# replace null values with 'No Record'
final_metadata.fillna("No Record", inplace=True)

# Make sure there is no null values in any column
final_metadata.isna().sum()

#### 5. create schema `ghcn`

In [None]:
%sql
-- Create the ghcn schema; IF NOT EXISTS makes re-running this cell a no-op.
CREATE SCHEMA IF NOT EXISTS ghcn;

#### 6. write the final result into `ghcn.station_metadata` table

In [None]:
# convert the pandas dataframe to a Spark dataframe, then save it as the
# ghcn.station_metadata table in overwrite mode
station_metadata_sdf = spark.createDataFrame(final_metadata)
station_metadata_writer = station_metadata_sdf.write.mode("overwrite")
station_metadata_writer.saveAsTable("ghcn.station_metadata")