In [1]:
from library import isfloat
from library import modify_station_code
import os
import re
import pandas as pd
from pandas import Series
from pandas import DataFrame
import json
from io import StringIO
from pymongo import MongoClient
import pymongo
import time
# MongoDB connection. Defaults to a local server; set MONGO_HOST / MONGO_PORT
# in the environment to point the notebook at another instance.
MONGO_HOST = os.environ.get('MONGO_HOST', '127.0.0.1')
MONGO_PORT = int(os.environ.get('MONGO_PORT', '27017'))
client = MongoClient(MONGO_HOST, MONGO_PORT)

# Air-quality records and the air-quality station collection.
aq_db = client['air_quality_model_hkust']
aq_collection_name = 'air_quality_model_hkust_enrich'
aq_collection = aq_db[aq_collection_name]
aq_station_collection = aq_db['aqi_station']

# Sub-hourly weather records and the weather station collection.
weather_db = client['weather_hkust']
sub_hour_weather_collection_name = 'subhour_weather_hkust'
sub_hour_weather_collection = weather_db[sub_hour_weather_collection_name]
subhour_weather_station = weather_db['subhour_weather_station']

# Database that receives the fused (aggregated) output collections.
fusion_db = client['parise_hk_fusion_db']

# Feature names that are averaged when records are aggregated.
weather_schemas = ['relative_humidity', 'temperature', 'wind_speed', 'wind_direction']
aq_schemas = ['AQHI', 'AQI', 'CO', 'NO2', 'NOX', 'O3', 'PM10', 'PM2_5', 'SO2']

def isNaN(num):
    """Return whether ``num`` is NaN.

    NaN is the only value that compares unequal to itself, so the test needs
    no ``math``/``numpy`` import and works on plain floats as well as numpy scalars.
    """
    not_self_equal = num != num
    return not_self_equal

In [2]:
class FusionDB:
    """
    A class of static helpers that build a fused air-quality / weather database.

    Data sources (module-level collections):
    - Collection(subhour_weather_hkust): weather records collected sub-hourly;
    - Collection(subhour_weather_station): weather stations (station code, location);
    - Collection(air_quality_model_hkust_enrich): air-quality records;
    - Collection(aqi_station): air-quality stations (station code, location).

    For every air-quality record, ``generate_aggregation_collection`` finds the
    air-quality and weather stations near the record's station, averages their
    observations (wind, humidity, temperature, pollutants) over a time window
    around the record's timestamp, and stores the record together with both
    averages in a collection of ``fusion_db``.

    Further improvement: aggregate the data by time (e.g. every five minutes).
    """

    # Number of fused documents sent per insert_many() call.
    _INSERT_BATCH_SIZE = 100

    @staticmethod
    def _station_collection(para):
        """Return the station collection for ``para`` ('weather' or 'aq'), or None if unknown."""
        if para == 'weather':
            return subhour_weather_station
        if para == 'aq':
            return aq_station_collection
        return None

    @staticmethod
    def _record_collection(para):
        """Return the observation collection for ``para`` ('weather' or 'aq'), or None if unknown."""
        if para == 'weather':
            return sub_hour_weather_collection
        if para == 'aq':
            return aq_collection
        return None

    @classmethod
    def find_nearby_stations_by_type(cls, lat, lon, distance, para='weather'):
        """
        Find the codes of stations of one type located near a point.

        :param lat: latitude of the query point
        :param lon: longitude of the query point
        :param distance: maximum distance, passed to MongoDB ``$maxDistance``
        :param para: 'weather' for weather stations, 'aq' for air-quality stations
        :return: list of ``station_code`` values (``$near`` returns them nearest first),
            or None (after printing an error) if ``para`` is not recognised
        """
        station_collection = cls._station_collection(para)
        if station_collection is None:
            print('Error parameter para', para)
            return None
        query = {
            'loc': {
                '$near': {
                    # GeoJSON points are given as [longitude, latitude].
                    '$geometry': {
                        'type': "Point",
                        'coordinates': [lon, lat]
                    },
                    '$maxDistance': distance
                }
            }
        }
        # Only the station code is needed, so project everything else (including _id) away.
        cursor = station_collection.find(query, {'station_code': 1, '_id': 0})
        return [station['station_code'] for station in cursor]

    @classmethod
    def find_nearby_stations(cls, lat, lon, distance):
        """
        Find both air-quality and weather stations near a point.

        :return: {'AQI': [aq station codes], 'weather': [weather station codes]}
        """
        aqi_stations = cls.find_nearby_stations_by_type(lat, lon, distance, 'aq')
        weather_stations = cls.find_nearby_stations_by_type(lat, lon, distance, 'weather')
        return {'AQI': aqi_stations, 'weather': weather_stations}

    @classmethod
    def aggregate_records(cls, records, data_type='weather'):
        """
        Average every known feature over a list of records.

        :param records: iterable of dicts; a feature's value is either stored directly
            or inside a dict under the key 'obs'. Keys that are not known features are ignored.
        :param data_type: 'weather' selects ``weather_schemas``; any other value selects
            ``aq_schemas``
        :return: {feature: mean of its numeric, non-NaN values, or None if there were none}
        """
        all_schemas = weather_schemas if data_type == 'weather' else aq_schemas
        totals = {schema: {'sum': 0, 'num': 0} for schema in all_schemas}
        for record in records:
            for schema, raw in record.items():
                if schema not in totals:
                    continue
                value = raw.get('obs') if isinstance(raw, dict) else raw
                if isfloat(value):
                    # Convert before the NaN test: a string such as 'nan' passes isfloat()
                    # but only becomes NaN after float(), and would otherwise poison the sum.
                    number = float(value)
                    if not isNaN(number):
                        totals[schema]['sum'] += number
                        totals[schema]['num'] += 1
                elif value is not None:
                    print("Error value", value)

        return {
            schema: (acc['sum'] / acc['num'] if acc['num'] != 0 else None)
            for schema, acc in totals.items()
        }

    @classmethod
    def find_weather_aq_records(cls, station_code, start_time, end_time, para="weather"):
        """
        Return the records of one station whose ``time`` lies in [start_time, end_time].

        :param para: 'weather' reads sub-hourly weather records, 'aq' reads air-quality records
        :return: list of record dicts, without MongoDB's ``_id`` field
        :raises ValueError: if ``para`` is not 'weather' or 'aq'
        """
        collection = cls._record_collection(para)
        if collection is None:
            raise ValueError('Error parameter para: %r' % (para,))
        query = {
            'station_code': station_code,
            'time': {
                '$gte': start_time,
                '$lte': end_time
            }
        }
        # Exclude _id in the projection instead of deleting it from each document afterwards.
        return list(collection.find(query, {'_id': 0}))

    @classmethod
    def _collect_records(cls, stations, start_time, end_time, para):
        """Concatenate the time-window records of every station code in ``stations``."""
        records = []
        for station in stations:
            records.extend(cls.find_weather_aq_records(
                station_code=station, start_time=start_time, end_time=end_time, para=para))
        return records

    @classmethod
    def query_spatial_temporal_record(cls, lat, lon, distance, start_time, end_time):
        """
        Average air-quality and weather observations near a point over a time window.

        :param lat: latitude of the query point
        :param lon: longitude of the query point
        :param distance: search radius passed to ``$maxDistance``
        :param start_time: inclusive lower bound on the records' ``time`` field
        :param end_time: inclusive upper bound on the records' ``time`` field
        :return: {'AQI': {feature: mean or None}, 'weather': {feature: mean or None}}
        """
        station_obj = cls.find_nearby_stations(lat=lat, lon=lon, distance=distance)

        weather_records = cls._collect_records(station_obj['weather'], start_time, end_time, 'weather')
        weather_aggregation = cls.aggregate_records(weather_records, data_type='weather')

        aq_records = cls._collect_records(station_obj['AQI'], start_time, end_time, 'aq')
        aq_aggregation = cls.aggregate_records(aq_records, data_type='aqi')

        return {"AQI": aq_aggregation, "weather": weather_aggregation}

    @classmethod
    def query_get_station_config(cls, para='weather'):
        """
        Map normalised station codes to their station documents.

        :param para: 'weather' or 'aq'
        :return: {modify_station_code(station_code): station document without ``_id``};
            when several stations normalise to the same code, the first one returned
            by ``find()`` is kept
        :raises ValueError: if ``para`` is not 'weather' or 'aq'
        """
        stations = cls._station_collection(para)
        if stations is None:
            raise ValueError('Error parameter para: %r' % (para,))
        station_map = {}
        for station in stations.find({}, {'_id': 0}):
            station_map.setdefault(modify_station_code(station['station_code']), station)
        return station_map

    @classmethod
    def query_spatial_temporal_record_by_station_code(cls, station_code, distance, start_time, end_time,
                                                      aq_station_config=None, weather_station_config=None):
        """
        Run ``query_spatial_temporal_record`` around the location of a known station.

        :param station_code: code of an air-quality or weather station
        :param distance: search radius passed to ``$maxDistance``
        :param start_time: inclusive lower bound on the records' ``time`` field
        :param end_time: inclusive upper bound on the records' ``time`` field
        :param aq_station_config: optional preloaded ``query_get_station_config('aq')``
            result; loaded from the database when omitted
        :param weather_station_config: optional preloaded
            ``query_get_station_config('weather')`` result; loaded when omitted
        :return: {'AQI': {...}, 'weather': {...}} as from ``query_spatial_temporal_record``
        :raises ValueError: if ``station_code`` is in neither station configuration
        """
        if aq_station_config is None:
            aq_station_config = cls.query_get_station_config('aq')
        if weather_station_config is None:
            weather_station_config = cls.query_get_station_config('weather')

        # NOTE(review): the configs are keyed by modify_station_code(...), but the lookup
        # uses station_code as given; callers presumably pass already-normalised codes.
        # A weather station takes precedence when the code appears in both configs.
        if station_code in weather_station_config:
            station = weather_station_config[station_code]
        elif station_code in aq_station_config:
            station = aq_station_config[station_code]
        else:
            raise ValueError('Unknown station code: %r' % (station_code,))

        # 'loc' is unpacked as a [longitude, latitude] pair.
        [lon, lat] = station['loc']
        return cls.query_spatial_temporal_record(lat=lat, lon=lon, distance=distance,
                                                 start_time=start_time, end_time=end_time)

    @classmethod
    def generate_aggregation_collection(cls, output_collection_name, pre_time=1800, after_time=1800, distance=5000):
        """
        Rebuild ``fusion_db[output_collection_name]`` with one fused document per AQ record.

        For each record of ``aq_collection`` (in ``time`` order), the AQ and weather
        observations of stations within ``distance`` of the record's station are averaged
        over [time - pre_time, time + after_time]. The stored document holds the record's
        non-pollutant fields, 'station_type': 'AQI', 'aggregation_AQI',
        'aggregation_weather', and the full record under 'station_record'.

        WARNING: all existing documents in the output collection are deleted first.

        :param output_collection_name: name of the collection in ``fusion_db`` to rebuild
        :param pre_time: look-back window, in the units of the records' ``time`` field
        :param after_time: look-ahead window, in the same units
        :param distance: search radius passed to ``$maxDistance``
        """
        output_collection = fusion_db[output_collection_name]
        # Collection.remove() was deprecated in PyMongo 3 and removed in PyMongo 4.
        output_collection.delete_many({})

        # Load both station configurations once rather than twice per AQ record.
        aq_station_config = cls.query_get_station_config('aq')
        weather_station_config = cls.query_get_station_config('weather')

        insert_cache = []
        process_number = 0
        batch_start = time.time()
        for record in aq_collection.find({}, {'_id': 0}).sort('time'):
            current_time = record['time']
            aggregation_result = cls.query_spatial_temporal_record_by_station_code(
                record['station_code'], distance,
                current_time - pre_time, current_time + after_time,
                aq_station_config=aq_station_config,
                weather_station_config=weather_station_config,
            )
            process_number += 1

            new_record = {key: value for key, value in record.items() if key not in aq_schemas}
            new_record['station_type'] = 'AQI'
            new_record['aggregation_AQI'] = aggregation_result['AQI']
            new_record['aggregation_weather'] = aggregation_result['weather']
            new_record['station_record'] = record
            insert_cache.append(new_record)

            if len(insert_cache) >= cls._INSERT_BATCH_SIZE:
                output_collection.insert_many(insert_cache)
                insert_cache = []
                batch_end = time.time()
                # Progress: records processed so far and seconds spent on this batch.
                print(process_number, batch_end - batch_start)
                batch_start = batch_end

        if insert_cache:
            output_collection.insert_many(insert_cache)


In [132]:
if __name__ == "__main__":
    # Fuse each AQ record with observations from the preceding 1800 time units
    # (no look-ahead) within distance 5000; the collection name encodes these settings.
    FusionDB.generate_aggregation_collection(
        'aqi_aggregation_hkust_5000_1800_0_subhour',
        pre_time=1800,
        after_time=0,
        distance=5000,
    )



100
100 1.5012235641479492
200
200 1.56355881690979
300
300 1.2433967590332031


KeyboardInterrupt: 