# About
Processing environmental data

# Libraries

In [1]:
import glob
import json
from datetime import datetime, timedelta

import pandas as pd

# NOTE(review): `sqldf` is called below but never imported in this file;
# presumably it comes from the `pandasql` package - confirm and import it.

# User-Defined Functions

In [3]:
class raw_data_to_time_series(object):
    """
    Sequential processing of data to obtain time series.

    Every step calls the previous one, so calling `time_series_df`
    runs the whole chain on the raw dataset.

    Activities:
    - Drop column `created`
    - Format column `date`
    - Split wind into two columns
    - Split gas into multiple columns (one for each gas)
    - Format columns
    - Deal with duplicates
    - Rearrange sequentially every datapoint. Highlighting missing values.
    """

    def __init__(self, df):
        """
        df: Raw dataset. Column `date` must hold strings starting with `YYYY-MM-DD`.
        """
        self.df = df
        # Only the first 10 characters (the calendar day) are parsed.
        self.min_date = datetime.strptime(min(self.df["date"])[:10], '%Y-%m-%d')
        self.max_date = datetime.strptime(max(self.df["date"])[:10], '%Y-%m-%d')
        self.diff_days = (self.max_date - self.min_date).days

    # Processing `created` column
    def process_created_col(self):
        """Return a copy of the raw dataset without the `created` column."""
        return self.df.drop("created", axis = 1)

    # Processing `date` column
    def process_date_col(self):
        """
        Parse `date` into a datetime and add the string columns `day` and `time`.

        `date` is expected to look like `2021-01-01T00:00:00.000Z`.
        """
        df = self.process_created_col()
        # Split rows into two columns by string "T"
        tmp_date = df["date"].str.split("T", n = 2, expand = True)

        # Split rows of second column into two columns by string "Z"
        tmp_time = tmp_date[1].str.split("Z", n = 0, expand = True)

        df["date"] = pd.to_datetime(df["date"], format='%Y-%m-%dT%H:%M:%S.%fZ')
        df["day"] = tmp_date[0]
        df["time"] = tmp_time[0]

        return df

    # Processing `wind` column
    def process_wind_col(self):
        """Split the two-element arrays of column `wind` into two columns."""
        df = self.process_date_col()
        # Split arrays in dataframe column "wind" into two columns.
        # The helper frame reuses df's index so the assignments below stay
        # aligned even when df does not have a default 0..n-1 index.
        tmp_wind = pd.DataFrame(df["wind"].to_list(), columns=['c0','c1'], index=df.index)
        df["wind_northward"] = tmp_wind["c0"]
        df['wind_eastward'] = tmp_wind["c1"]
        return df

    # Processing `gas` column
    def process_gas_col(self):
        """
        Split the dictionaries of column `gas` into `O3`, `NO2` and `PM25`.

        Returns a copy restricted to the columns used by the following steps.
        """
        df = self.process_wind_col()

        gas_cols = list(df["gas"])
        # Split dictionary values in column "gas" into three columns
        # (same index as df, for the same alignment reason as `wind`).
        tmp_gas = pd.DataFrame(gas_cols, columns=["o3", "no2", "pm25_gcc"], index=df.index)
        df["O3"] = tmp_gas["o3"]
        df["NO2"] = tmp_gas["no2"]
        df["PM25"] = tmp_gas["pm25_gcc"]

        selected_cols = ['file_name', 'date', 'day', 'time', 'station', 'pressure', 'temp', 'precip', 'wind_northward', 'wind_eastward', 'O3', 'NO2', 'PM25']
        # Explicit copy: the next step assigns columns on the result.
        return df[selected_cols].copy()

    # Formatting data columns
    def data_formatting(self):
        """
        Cast every column to its final type and reset the index.

        `day` becomes a datetime, `time` a `datetime.time`, `station` a string
        and every measurement a (downcast) float.
        """
        df = self.process_gas_col()

        df['day'] = pd.to_datetime(df['day'], format='%Y-%m-%d')
        df['time'] = pd.to_datetime(df['time'], format='%H:%M:%S.%f').dt.time
        df['station'] = df["station"].astype(str)

        numeric_cols = ['pressure', 'temp', 'precip', 'wind_northward', 'wind_eastward', 'O3', 'NO2', 'PM25']
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], downcast='float')

        df = df.reset_index(drop = True)

        return df

    # Process for similar duplicate rows
    def duplicate_rows_test(self):
        """
        Remove duplicated datapoints.

        Exact duplicate rows are dropped first. When several different rows
        share the same (`day`, `time`), only the one whose `file_name` sorts
        first is kept. The result is sorted by `file_name`, `day`, `time`.
        """
        # NOTE: the SQL below refers to the table `processed_df`; presumably
        # `sqldf` resolves it from this method's local variables, so the
        # variable must keep this name - confirm against the sqldf provider.
        processed_df = self.data_formatting()
        processed_df= processed_df.drop_duplicates()

        sql0 = "Select day, time, count(*) as conteo from processed_df group by day, time order by conteo desc"
        sql1 = "Select * from ({}) s1 where conteo>1".format(sql0)

        # Run the query once and reuse the result.
        rows_to_clean = sqldf(sql1)

        if rows_to_clean.shape[0] > 0:
            # Single rows to keep, one per duplicated (day, time)
            single_rows = []

            # Backup the row to keep
            for d, t in zip(rows_to_clean["day"], rows_to_clean["time"]):
                # Subset of duplicate rows (time is compared at second precision)
                tmp_rows = processed_df[(processed_df["day"]==d) & (processed_df["time"]==datetime.strptime(str(t)[:8], '%H:%M:%S').time())]

                # Saving a single row from duplicates
                single_rows.append(tmp_rows.sort_values("file_name", ascending = True)[:1])

                # Delete duplicate rows from original dataframe
                processed_df = processed_df.drop(index = list(tmp_rows.index))

            # `DataFrame.append` no longer exists in pandas 2.x; concatenate once.
            processed_df = pd.concat([processed_df] + single_rows)

        processed_df = processed_df.sort_values(["file_name", "day", "time"], ascending = True)

        return processed_df.reset_index(drop = True)

    def time_series_df(self):
        """
        Arrange the datapoints on a complete hourly grid.

        The grid starts at `min_date` (midnight) and has one row per hour for
        `diff_days + 1` days. Datapoints are attached with a left join on the
        exact timestamp, so hours without a datapoint stay empty. Empty rows
        on the first and on the last day are removed.
        """
        # NOTE: `df` and `date_df` are table names inside the SQL query below,
        # so these variables must keep their names.
        df = self.duplicate_rows_test()

        date_list = pd.date_range(self.min_date, freq="60min", periods=(self.diff_days+1)*24).tolist()
        date_df = pd.DataFrame(date_list, columns = ["full_date"])

        # This function transforms datetime into object types which facilitate column creation
        sqlq = """Select r1.file_name, l1.full_date as datetime, r1.day as date, r1.time, r1.station, r1.pressure, r1.temp, r1.precip, r1.wind_northward, r1.wind_eastward, r1.O3, r1.NO2, r1.PM25
        from date_df l1 
        left join df r1 
        ON l1.full_date  = r1.date 
        """

        arranged_df = sqldf(sqlq)

        # Calendar day of every grid row (text before the first space)
        tmp_time = arranged_df["datetime"].str.split(" ", n = 2, expand = True)
        arranged_df["t_day"] = tmp_time[0]

        # Trimming empty rows near min_date
        tmp_rows = list(arranged_df[(arranged_df["t_day"] == str(self.min_date.date())) & (arranged_df["date"].isna())].index)
        arranged_df = arranged_df.drop(index = tmp_rows)

        # Trimming empty rows near max_date
        tmp_rows = list(arranged_df[(arranged_df["t_day"] == str(self.max_date.date())) & (arranged_df["date"].isna())].index)
        arranged_df = arranged_df.drop(index = tmp_rows)

        arranged_df = arranged_df.drop(["t_day"], axis = 1)
        arranged_df = arranged_df.drop(["file_name"], axis = 1)

        # The query edited the datetime format, so it is formatted again.
        # `datetime` is deliberately left as returned by the query.
        # NOTE(review): confirm that `date` as returned by the query matches
        # '%Y-%m-%d' under the pandas version in use.
        arranged_df['date'] = pd.to_datetime(arranged_df['date'], format='%Y-%m-%d')
        arranged_df['time'] = pd.to_datetime(arranged_df['time'], format='%H:%M:%S.%f')

        return arranged_df.reset_index(drop = True)

In [4]:
def mean_without_None(arr):
    """
    Average the numeric values of `arr`, ignoring missing entries.

    Both `None` and NaN count as missing: values read from a numeric
    pandas/numpy column are NaN rather than `None` when they are absent.

    Returns NaN when `arr` holds no valid value.
    """
    from numpy import average, isnan

    valid_values = [x for x in arr if x is not None and not isnan(x)]
    if not valid_values:
        # Nothing to average: report the value as still missing.
        return float("nan")
    return average(valid_values)

In [5]:
def missing_data_dummy_means(df):
    """
    A dummy process is defined to fill in empty values.

    A row is considered empty when its `PM25` is missing. For such a row every
    measurement column is replaced by the mean of the values found at the same
    hour one week before and one week after. A neighbour that is missing, or
    that falls outside the series, is ignored; when neither neighbour is
    available the value stays missing.

    df: DataFrame with a string column `datetime` formatted as
        `%Y-%m-%d %H:%M:%S.%f`, a column `station` and the measurement
        columns. It is modified in place.

    Returns the same DataFrame.
    """

    # Filling in: `station` (first identifier that is not missing)
    known_stations = df["station"].dropna()
    if not known_stations.empty:
        df["station"] = known_stations.iloc[0]

    # Filling in: `date`, `time`
    tmp_date = df["datetime"].str.split(" ", n = 2, expand = True)
    df["date"] = pd.to_datetime(tmp_date[0], format='%Y-%m-%d')
    df["time"] = tmp_date[1]

    # After splitting datetime string, it is formatted back to datetime type to use timedelta.
    df["datetime"] = pd.to_datetime(df["datetime"], format='%Y-%m-%d %H:%M:%S.%f')

    # Columns to fill in
    cols_to_fillup = ['pressure', 'temp', 'precip', 'wind_northward', 'wind_eastward', 'O3', 'NO2', 'PM25']

    # Rows to fill in
    rows_to_fillup = df["PM25"].isnull()
    if not rows_to_fillup.any():
        return df

    one_week = timedelta(days=7)
    dates_to_fillup = df.loc[rows_to_fillup, "datetime"]

    # Measurements indexed by timestamp (first row wins on repeated timestamps).
    # Built before anything is filled, so estimates only use observed values.
    lookup = df.drop_duplicates(subset="datetime", keep="first").set_index("datetime")[cols_to_fillup]

    # Timestamps absent from the series simply yield missing values here.
    week_before = lookup.reindex((dates_to_fillup - one_week).to_numpy()).reset_index(drop = True)
    week_after = lookup.reindex((dates_to_fillup + one_week).to_numpy()).reset_index(drop = True)

    # Mean of both neighbours; fall back to the only available one.
    fill_values = ((week_before + week_after) / 2).fillna(week_before).fillna(week_after)

    # Update (positional assignment: same row order as `dates_to_fillup`)
    df.loc[rows_to_fillup, cols_to_fillup] = fill_values.to_numpy()

    return df

In [11]:
def full_ts_data_preparation(folder_raw_data, station_number):
    """
    Build the filled-in time series of one station from its raw JSON files.

    folder_raw_data: Folder prefix of the raw files, including the trailing
        separator (e.g. 'RawData/'). It is concatenated as-is with
        `station_number` to build the search pattern.
    station_number: File name prefix of the station (e.g. 's71').

    Returns the processed DataFrame with a fresh index.
    Raises IndexError when no raw record is found for the station.
    """
    # ------------------
    # Importing Data
    # ------------------
    print("Data preparation process for {}".format(station_number))

    # Concatenating raw data from multiple JSON files.
    # Sorted so the result does not depend on the order the OS lists files in.
    pattern = '{}{}*.json'.format(folder_raw_data, station_number)
    files_list = sorted(glob.iglob(pattern))

    # List of dictionaries, one per raw datapoint
    raw_data = []

    # Extending list of daily dictionaries
    for file_path in files_list:
        with open(file_path) as json_file:
            daily_records = json.load(json_file)
        # Adding column "file_name" (path without the folder prefix)
        file_name = file_path[len(folder_raw_data):]
        raw_data.extend(dict(item, file_name=file_name) for item in daily_records)

    if not raw_data:
        raise IndexError("No raw records found for pattern '{}'".format(pattern))

    col_names = list(raw_data[0].keys())

    raw_df = pd.DataFrame(raw_data, columns=col_names)

    # ------------------
    # Data Processing
    # ------------------

    # Initialization of processing class
    main_processed_df = raw_data_to_time_series(raw_df)

    # Execution of processing functions
    processed_df = main_processed_df.time_series_df()

    # ------------------
    # Missing data 
    # ------------------
    update_df = missing_data_dummy_means(processed_df)

    return update_df.reset_index(drop = True)

# Data

The raw station data contains the following columns (units in parentheses):

|Columns|Description|
|---	|---	|
|created|Data point creation date|
|date   |Measurement time|
|station|Meteorological station number|
|pressure|Surface Pressure (hPa)|
|temp|Temperature (K)|
|precip|Precipitation (mm)|
|wind|Wind (ms^-1)|
|gas|Including O3, NO2, PM2.5 (ug/m3)|

# Process

In [34]:
# Station to process and folder holding its raw JSON files
station_number = 's71'
folder_raw_data = 'RawData/'

# Build the time series, then store it as a CSV file named after the station
df = full_ts_data_preparation(folder_raw_data, station_number)
output_file = 'TimeSeriesFiles/{}_ts.csv'.format(station_number)
df.to_csv(output_file, encoding='utf-8', index=False)

Data preparation process for s71


In [28]:
# Summary statistics (count, mean, std, quartiles) of the numeric columns
df.describe()

Unnamed: 0,pressure,temp,precip,wind_northward,wind_eastward,O3,NO2,PM25
count,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0
mean,893.285809,293.671323,41.921143,-1.243791,-0.197175,41.282197,15.109253,0.0
std,4.065992,6.689914,318.338548,1.58507,0.978695,22.505106,4.448817,0.0
min,879.658386,270.919312,0.0,-5.9367,-4.0243,-0.3168,-0.1301,0.0
25%,890.664337,289.613647,0.0,-2.257325,-0.864475,23.285376,12.300625,0.0
50%,893.267792,294.030991,0.0,-1.29895,0.043,36.388149,15.39645,0.0
75%,895.803528,298.560028,3.2857,-0.288625,0.551,57.51965,17.876976,0.0
max,908.806091,309.545593,8224.487305,7.2621,1.9575,121.299202,33.250801,0.0


In [17]:
# Column types and non-null counts, to check how many values are still missing
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8880 entries, 0 to 8879
Data columns (total 12 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   datetime        8880 non-null   datetime64[ns]
 1   date            8880 non-null   datetime64[ns]
 2   time            8880 non-null   object        
 3   station         8880 non-null   object        
 4   pressure        8784 non-null   float64       
 5   temp            8784 non-null   float64       
 6   precip          8784 non-null   float64       
 7   wind_northward  8784 non-null   float64       
 8   wind_eastward   8784 non-null   float64       
 9   O3              8784 non-null   float64       
 10  NO2             8784 non-null   float64       
 11  PM25            8784 non-null   float64       
dtypes: datetime64[ns](2), float64(8), object(2)
memory usage: 832.6+ KB


# Relevant sources

Monterrey Raw Data:
* Obispado: http://gmao-aq-prod-1707436367.us-east-1.elb.amazonaws.com/station/32
* San Pedro: http://gmao-aq-prod-1707436367.us-east-1.elb.amazonaws.com/station/71
* La Pastora: http://gmao-aq-prod-1707436367.us-east-1.elb.amazonaws.com/station/31

Raw data information:
* https://resourcewatch.org/data/explore/cit004rw0-City-AQ-Forecasts?hash=further_information&section=Discover&selectedCollection=&zoom=0.8217739819705677&lat=0&lng=120.5424929385674&pitch=0&bearing=0&basemap=dark&labels=light&layers=%255B%257B%2522dataset%2522%253A%2522f5599d62-7f3d-41c7-b3fd-9f8e08ee7b2a%2522%252C%2522opacity%2522%253A1%252C%2522layer%2522%253A%252249ecb5d5-c9b0-4d4e-b049-aa872dd1a084%2522%257D%255D&aoi=&page=1&sort=most-viewed&sortDirection=-1
* https://airquality.gsfc.nasa.gov/cityaq-pilot-combine-local-monitoring-data-geos-cf-model-outputs-develop-city-scale-operational

Python Documentation:
* https://www.geeksforgeeks.org/read-json-file-using-python/
* https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior

Papers:
* https://www.intechopen.com/chapters/74059
* https://www.sciencedirect.com/science/article/pii/S0167739X21003794
* https://www.researchgate.net/publication/335752400_Filling_the_gaps_of_in-situ_hourly_PM25_concentration_data_with_the_aid_of_empirical_orthogonal_function_constrained_by_diurnal_cycles


Others (books, forums):
* https://www.researchgate.net/post/How-can-I-fill-the-missing-climatological-data
* https://www.fao.org/3/x0490e/x0490e07.htm