In [1]:
import geopandas as gpd
import pandas as pd
import osmnx as ox
from shapely.geometry import LineString, Point, Polygon
#from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import fiona
import rasterio
from rasterio.plot import show, show_hist
from rasterio.merge import merge
from osgeo import gdal, ogr, osr
from rasterio.mask import mask

import os 

idx = pd.IndexSlice

# To get the current working directory use
# os.getcwd()

## Defining a set of functions to both extract and process data

In [None]:
# function that extracts polygon geometries from OSM and recoordinate them to EPSG:25833:
def building_polygon(city):
    
    # extracting Oslo boundary
    boundary = ox.geocode_to_gdf(city)
    
    # Extracting data associated with building and geometry
    building_geom = ox.features_from_polygon(boundary.geometry[0],tags = {'building': True})[['geometry', 'building']]
    
    # filtering only polygon data out from the oslo data
    return building_geom.loc[building_geom.geometry.geom_type=='Polygon'].to_crs('epsg:25833')
    

# Overlay building semantic data to OSM data
def overlay_osm_2_semdata(building_polygon,semdata):
    oslo_bygg = building_polygon.sjoin(semdata, how='left').droplevel(0)
    
    # dropping "Element_type" index and & extracting building types representing residential buildings & drop duplications
    return oslo_bygg[oslo_bygg['NÆRING']=='Bolig'].reset_index().drop_duplicates(subset='osmid').set_index('osmid')

# Loading building height data (DEM) data from Kartverket (https://hoydedata.no/LaserInnsyn2/) and calculating Canopy Height Model (CHM)
def data_coverage(building_polygon,semdata,overlay_osm_2_semdata):
    # doing checks on OSM
    osm_b = len(building_polygon['building'].index) - building_polygon['building'].isnull().sum()
    osm_b_ind = len(building_polygon['building'].index)
    if building_polygon['building'].isnull().sum() == 0:
        print('All buildings in '+ city+ ' are labelled in OSM')
    else:
        print(f'\033[1m{osm_b}\033[0m' + '  buildings out of ' + f'\033[1m{osm_b_ind}\033[0m' + ' in '+ city+ ' are labelled in OSM (missing info = ' + str(osm_b_ind-osm_b)+ ')')
        
    osm_g = len(building_polygon['geometry'].index) - building_polygon['geometry'].isnull().sum()
    osm_g_ind = len(building_polygon['geometry'].index)
    if building_polygon['geometry'].isnull().sum() == 0:
        print('All building geometries in '+ city + ' have geometry in OSM')
    else:
        print(f'\033[1m{osm_g}\033[0m' + ' building geometries out of '+ f'\033[1m{osm_g_ind}\033[0m' + ' in '+ city + ' have geometry in OSM (missing info = ' + str(osm_g_ind-osm_g)+ ')')
        
    # doing check on the attained semantic data
    år = len(semdata['ÅR'].index) - semdata['ÅR'].isnull().sum()
    år_ind = len(semdata['ÅR'].index)
    if semdata['ÅR'].isnull().sum() == 0:
        print('All buildings in '+ city + ' have construction year values')
    else:
        print(f'\033[1m{år}\033[0m' + ' buildings out of ' + f'\033[1m{år_ind}\033[0m' + ' in '+ city + ' have construction year values (missing info = ' + str(år_ind-år)+ ')')
        
    building_type = len(semdata['NÆRING'].index) - semdata['NÆRING'].isnull().sum()
    building_type_ind = len(semdata['NÆRING'].index)
    if semdata['NÆRING'].isnull().sum() == 0:
        print('All buildings in '+ city + ' are annotated by their types')
    else:
        print(f'\033[1m{building_type}\033[0m' + ' buildings out of ' + f'\033[1m{building_type_ind}\033[0m' + ' in '+ city + ' are annotated by their types (missing info = ' + str(building_type_ind-building_type)+ ')')
        
    # finding differences between OSM and the attained semantic data
    osm_semdata = len(building_polygon['geometry'].index) - len(semdata['geometry'].index)
    if osm_semdata == 0:
        print('OSM and the attained building semantic data in' + city + ' cover the same number of buildings')
    else:
        print(f'\033[1m{osm_semdata}\033[0m' + ' buildings are not cover by the attained building semantic data')
        
# Here, the DEM data are extracted manually from the web service. However, in the future, it might ...
# worthwhile finding an API solution to avoid manual data extraction routines.

# Reading and concatination of DEM files
def DEM_file(read_location,write_location):
    # get list of DEM files in the following directory
    fd = os.path.join(upath, read_location)
    files = []
                      
    
    for fn in os.listdir(fd):
        if os.path.isfile(os.path.join(fd, fn)) and fn.split('.')[-1] == 'tif':
            files.append(os.path.join(fd,fn))
                      
    dem_raster_to_mosaic = []
    
    for p in files:
        dem_raster_to_mosaic.append(rasterio.open(p)) 
    
    mosaic, output = merge(dem_raster_to_mosaic)
    
    print('done')

    out_meta = rasterio.open(p).meta.copy()
    out_meta.update({"driver": "GTiff",
                     "height": mosaic.shape[1],
                     "width": mosaic.shape[2],
                     "transform": output,
                     "crs": "+proj=utm +zone=33 +ellps=GRS80 +units=m +no_defs "}
                     )
    
    with rasterio.open(os.path.join(upath,
                                    write_location),
                       "w", **out_meta) as dest:
        dest.write(mosaic)

# calculating Canopy Height Model (CHM)        
def CHM(dsm_read,dtm_read,write_location):
    
    chm = rasterio.open(dsm_read).read(1, masked=True) - rasterio.open(dtm_read).read(1, masked=True)
    
    out_meta = rasterio.open(dtm_read).meta.copy()
    
    with rasterio.open(dtm_read) as src:
        output = src.transform[0:6]
    
    out_meta.update({"driver": "GTiff",
                     "height": chm.shape[0],
                     "width": chm.shape[1],
                     "transform": rasterio.Affine(output[0],output[1],output[2],output[3],output[4],output[5]),
                     "crs": "+proj=utm +zone=33 +ellps=GRS80 +units=m +no_defs "}
                     )

    # writing the CHM tif file and appending DTM metadata to it 
    with rasterio.open(write_location, 
                       "w", **out_meta) as dest:
        dest.write(chm,1)
        
    
# cropping out building from CHM data
def create_mask_from_shp(overlay_osm_2_semdata,read_tif,write_to):
    # open shapefile
    shapfile = overlay_osm_2_semdata['geometry']
    
    # open rasterfile
    with rasterio.open(read_tif, 'r') as src:
        out_image, out_transform = mask(src, shapfile, crop=False)
        out_meta = src.meta
   
    out_meta.update({"drive": "GTiff",
                     "height": out_image.shape[1],
                     "width": out_image.shape[2],
                     "transform": out_transform,
                     "crs": "+proj=utm +zone=33 +ellps=GRS80 +units=m +no_defs "})
    
    # writing the tif file and appending metadata to it 
    out_file = write_to
    
    with rasterio.open(out_file, "w", **out_meta) as dest:
        dest.write(out_image)
        
def building_height(overlay_osm_2_semdata,read_tif,write_shp):
    
    shapfile = overlay_osm_2_semdata
    
    with rasterio.open(read_tif, 'r') as src:
        
        for index, row in shapfile.iterrows():
        # a conditional statement testing if the geometry of each building overlaps with the canapy .tif file
            try:
                out_image, out_transform = mask(src, [shapfile.loc[idx[index],'geometry']], crop=True)
                shapfile.loc[idx[index],'height'] = np.mean(out_image[out_image>0])
                shapfile.loc[idx[index],'2D_area'] = shapfile.loc[idx[index],'geometry'].area
            except ValueError:
                continue
            
        return shapfile[shapfile['height']>0].to_file(write_shp)

# calculating shared walls between buildings
def Shared_wall(read_shp,write_shp):

    building = gpd.read_file(read_shp)
    
    for index, row in building.iterrows():
            
        # get 'not disjoint' buildings
        neighbors = building[~building.geometry.disjoint(row.geometry)].osmid.tolist()
        
        # remove own building ID of the building ID from the list
        neighbors = [builid for builid in neighbors if row.osmid != builid ]
        
        # add building ID of neighbors as NEIGHBORS value
        building.at[index, "NEIGHBORS"] = str(neighbors)
        
        # calculating shared walls with neighboring buildings
        wall_length = [row.geometry.intersection(building[building['osmid'] == i].geometry.iloc[0]).length for i in neighbors]
        
        # add lengths of shared walls of neighbors as WALL_LENGTH values
        building.at[index, "WALL_LENGTH"] = str(wall_length) 
        
        # extracting height of each shared wall with neighboring buildings
        wall_height = [building[building['osmid']==i].height.iloc[0] if row.height>=building[building['osmid']==i].height.iloc[0] else row.height for i in neighbors]
        
        # add heights of shared walls of neighbors as WALL_HEIGHT values
        building.at[index, "WALL_HEIGHT"] = str(wall_height)
        
        
        shared_wall = [wall_length[i]*wall_height[i] for i in range(len(neighbors))]
        building.at[index, "SHARED_WALL"] = sum(shared_wall)

    return building.to_file(write_shp)
        
# calculating of energy needed for heating
def QH_calc(read_file,write_file):
    
    # extracting parts of TABULA spreadsheet
    buil_set = pd.read_excel(os.path.join(upath, r'TABULA/subset_TABULA.xlsx'))
    
    # removing rows that don't represent a country 
    buil_set = buil_set[buil_set['Code_Country'].notna()]
    buil_set = buil_set[buil_set['Description_BuildingVariant'].notna()]
    buil_set = buil_set[buil_set.Code_Country != 'XX']

    # create a multiindex 
    buil_set.set_index(['Code_Country',
                        'Code_DataType_Building',
                        'Name_ClimateRegion',
                        'Description_BuildingVariant',
                        'Code_BuildingSizeClass',
                        'Year1_Building',
                        'Year2_Building'], 
                        inplace = True)

    buil_set = buil_set.sort_index(level=[0,1,2,3,4])
    
    
    bolig = gpd.read_file(read_file).set_index('osmid')

    # considering only building containing building age attribute are filtered out
    bolig = bolig[bolig['ÅR'].notna()]

    # country
    CO = 'NO'
    # Real example extracted from TABULA to enssure that they contain representative ...
    # architectural features
    Typo = 'ReEx'
    # climate
    Cl = 'National'
    # description of building condition
    COND = 'As built'

    # structing representative index by extracting building information
    for index, row in bolig.iterrows():
        # print(index)
        for i,j in bygg_dic.items():
            if int(row['BYGGTYPEKO']) in j:
                B_type = i

        # Building year
        Y = row['ÅR']


        for a,b in buil_set.loc[idx[CO,Typo,Cl,COND,B_type]].index:
            if a <= Y and b >= Y:
                y0 = a
                y1 = b
                i = CO,Typo,Cl,COND,B_type,y0,y1
                # print(i)
            elif a <= Y:
                y0 = a
                y1 = b
                i = CO,Typo,Cl,COND,B_type,y0,y1
                # print(i)
                break

       # estimated air conditioned area of a building is assumed to be 0.85 times of a building blueprint
        A_C_est = 0.85*bolig.loc[idx[index],['2D_area']].values

        # it is assumed that the total area of roof and floor is the building blueprint
        A_roof_est = 0.85*bolig.loc[idx[index],['2D_area']].values
        A_floor_est = 0.85*bolig.loc[idx[index],['2D_area']].values

        # according to Veillette et al. "window-to-wall ratio (WWR)" is assumed to be 0.13.
        # doi: https://doi.org/10.3389/frsc.2021.700794  
        # besides, for the time being, it is assumed that 75% of a building wall is shared with neighboring building
        A_wall_est = (bolig.loc[idx[index]].geometry.length *
                      bolig.loc[idx[index],['height']].values) * (1-0.13) * 0.75
        A_window_est = (bolig.loc[idx[index]].geometry.length *
                      bolig.loc[idx[index],['height']].values) * 0.13 * 0.75

        # for the time being, we neglect the area of enterance doors to a building
        A_door_est = 0

        # calculating overall heat transfer coefficient

        Htr_roof = (buil_set.loc[idx[i],'U_Roof_1'].values* 
                    A_roof_est* 
                    buil_set.loc[idx[i],'b_Transmission_Roof_1'].values)
        Htr_floor = (buil_set.loc[idx[i],'U_Floor_1'].values* 
                     A_floor_est* 
                     buil_set.loc[idx[i],'b_Transmission_Floor_1'].values)
        Htr_wall = (buil_set.loc[idx[i],'U_Wall_1'].values* 
                    A_wall_est* 
                    buil_set.loc[idx[i],'b_Transmission_Wall_1'].values)
        Htr_window = (buil_set.loc[idx[i],'U_Window_1'].values* 
                      A_window_est* 
                      1.00)
        Htr_door = (buil_set.loc[idx[i],'U_Door_1'].values* 
                    A_door_est* 
                    1.00)

        # calculating thermal bridging: surcharge on the U-values

        Htr_tb = (buil_set.loc[idx[i],'delta_U_ThermalBridging_Original'].values*
                  (A_roof_est+ A_floor_est+ A_wall_est+ A_window_est+ A_door_est)*
                  1.00)

        # heat transfer coefficient by transmission
        Htr = Htr_roof + Htr_floor + Htr_wall + Htr_window + Htr_door + Htr_tb

        # heat transfer coefficient by ventilation
        Cp_air = 0.34 #Volume specefic heat capacity air (Wh/m3K)
        Hve = (Cp_air* 
               (buil_set.loc[idx[i],'n_air_use'].values+
               buil_set.loc[idx[i],'n_air_infiltration'].values)*
               A_C_est *
               buil_set.loc[idx[i],'h_room'].values
              )


        # calculating internal temp.
        if buil_set.loc[idx[i],'theta_i'].values >=0:
            int_tmp = buil_set.loc[idx[i],'theta_i'].values

        else:
            int_tmp = (buil_set.loc[idx[i],'theta_i_htr1'].values +
                       (Htr - 1) /
                       (4-1)*
                       (buil_set.loc[idx[i],'theta_i_htr4'].values -
                        buil_set.loc[idx[i],'theta_i_htr1'].values)
                      )

        # accumulated differences between internal and external temperature (Kd/a)
        tmp = (int_tmp - buil_set.loc[idx[i],'Theta_e'].values) * buil_set.loc[idx[i],'HeatingDays'].values

        # calculating temperature reduction factor
        if Htr/A_C_est <= 1:
            Fred = min((buil_set.loc[idx[i],'F_red_htr1'].values + (1-597/159.4)/0.5*(1-buil_set.loc[idx[i],'F_red_htr1'].values),1))
        elif Htr/A_C_est >= 4:
            Fred = buil_set.loc[idx[i],'F_red_htr4'].values
        else:
            Fred = (buil_set.loc[idx[i],'F_red_htr1'].values + (Htr/A_C_est - 1) * 
                    (buil_set.loc[idx[i],'F_red_htr4'].values - buil_set.loc[idx[i],'F_red_htr1'].values) /
                    (4 - 1))

        # total heat transfer (kWh/a)
        Qht = (Htr + Hve) * Fred * (tmp * 0.024) 

        # calculating share of windows per building side

        # at this stage it is assumed that 1/4 of a window is on either North, South, East, or West.

        # Solar heat load during heating season Qsol

        for w in ['Hor','East','South','West','North']:
            Qsol = 0

            if w == 'Hor':
                Qsol += (buil_set.loc[idx[i],'F_sh_hor'].values *
                         (1 - buil_set.loc[idx[i],'F_f'].values) *
                         buil_set.loc[idx[i],'F_w'].values *
                         buil_set.loc[idx[i],'g_gl_n_Window_1'].values *
                         (0) * 
                         buil_set.loc[idx[i],('I_Sol_'+w)].values
                        )

            else:
                Qsol += (buil_set.loc[idx[i],['F_sh_vert']].values *
                         (1 - buil_set.loc[idx[i],['F_f']].values) *
                         buil_set.loc[idx[i],['F_w']].values *
                         buil_set.loc[idx[i],['g_gl_n_Window_1']].values *
                         (A_window_est*0.25) * 
                         buil_set.loc[idx[i],('I_Sol_'+w)].values
                        )

        # Internal heat source
        Qint = 0.024 * buil_set.loc[idx[i],'phi_int'].values * buil_set.loc[idx[i],'HeatingDays'].values * A_C_est

        # constant parameter aH
        aH = 0.8+((buil_set.loc[idx[i],'c_m'].values*A_C_est/(Htr+Hve))/30)

        # heat balance ratio
        lamd = (Qsol+Qint)/Qht

        # Gain utilization factor for heating
        mu = (1-lamd**aH)/(1-lamd**(aH+1))

        # energy needed for heating
        QH = Qht - mu*(Qsol + Qint)
        bolig.loc[idx[index],['QH_(kWh/a)']] = QH[0][0]
        bolig.loc[idx[index],['QH_(kWh/m2a)']] = QH[0][0]/A_C_est
        
    return bolig.to_file(write_file)

### Dictionaries

In [None]:
# dictionary of building types based on GeoNorge code variables
# https://objektkatalog.geonorge.no/Objekttype/Index/EAID_929B49A8_4688_4119_B6BA_D18F870601CC

bygg_dic = {
    'SFH' : [
        111,112,
    ],
    'TH' : [
        131,133,135
    ],
    'AB' : [
        136,121,141,142,143,144,145,146,152,122,124,523
    ]
}

## A list of parameters needed to be inserted first in order to perform the analysis

In [None]:
# Load building semantic data & reproject the coordinate to epsg:25833
semdata = gpd.read_file(os.path.join(upath,r'oslo_kommune/bygg/bygg_140623.shp')).to_crs('epsg:25833')

city = input('Insert a city name: ')

## Running the model based on the inserted parameters

In [None]:
%%time
data_coverage(building_polygon(city),
              semdata,
              overlay_osm_2_semdata(building_polygon(city),semdata))

### no need to re-run the following scripts

In [None]:
%%time
# creating a mosaic out of DSM dataset 
DEM_file(os.path.join(upath,(r'dom/data')),
         os.path.join(upath,(r'dem/oslo_dsm_mosaic.tif')))

In [None]:
%%time
# creating a mosaic out of DTM dataset
DEM_file(os.path.join(upath,(r'dtm/data')),
         os.path.join(upath,(r'dem/oslo_dtm_mosaic.tif')))

In [None]:
%%time
# creating CHM data
CHM(os.path.join(upath,(r'dem/oslo_dsm_mosaic.tif')),
    os.path.join(upath,(r'dem/oslo_dtm_mosaic.tif')),
    os.path.join(upath,(r'dem/oslo_chm_mosaic.tif')))

In [None]:
%%time
create_mask_from_shp(overlay_osm_2_semdata(building_polygon(city),semdata),
                     os.path.join(upath,(r'dem/oslo_chm_mosaic.tif')),
                     os.path.join(upath,(r'dem/oslo_chm_mask-building.tif')))

In [None]:
%%time
building_height(overlay_osm_2_semdata(building_polygon(city),semdata),
                os.path.join(upath,(r'dem/oslo_chm_mask-building.tif')),
                os.path.join(upath,(r'shapefile/oslo_building_height.shp')))

In [None]:
%%time
QH_calc(os.path.join(upath,r'shapefile/oslo_building_height.shp'),
        os.path.join(upath,r'shapefile/oslo_building-QH.shp'))

### Mapping the energy calculation results

In [None]:
gpd.read_file(r'/home/babakebrahimi/s3/data/shapefile/oslo_building-QH.shp').explore(column = 'QH_(kWh/m2',legend = True)

In [None]:
# Domestic hot water system


# energy needed for hot water
q_nd_w = AP31 CV #q_w_nd 
# energy losses due to distribution
q_d_w = AP119 DQ # q_d_w 
# energy losses due to storage
q_s_w = AP108 DP  # q_s_w 

# Heat generator output 
q_g_w_out = q_nd_w + q_d_w + q_s_w

# recoverable heat from hot water for space heating

# heat from distribution
q_d_w_h = AP120  DT # q_d_w_h
# heat from storage
q_s_w_h = AP109 DS # q_s_w_h

# recoverable heat
q_w_h = q_d_w_h + q_s_w_h

# energyware for domestic hot water

# share of energy mix for hot water
alfa_nd_w_i [(1-AP55+AP56),AP55,AP56] Fraction_Predefined_SysW_G_2,Fraction_Predefined_SysW_G_3 BS, BT
# expenditure factor
e_q_w_i = [AP93,AP94,AP95] e_g_w_Heat_1, e_g_w_Heat_2, e_g_w_Heat_3 DJ, DK, DL

# delivered energy
q_del_w_i = []

for i,j in enumerate([AP76,AP77,AP78]): Code_SysW_EC_1, Code_SysW_EC_2, Code_SysW_EC_3 BP, BQ, BR
    q_del_w_i = q_del_w_i.append(alfa_nd_w_i[i] * q_g_w_out * e_q_w_i[i])

# combined heat and power

# expenditure factor electricity generation
e_g_el_w_i = [AP96,AP97,AP98] e_g_w_Electricity_1, e_g_w_Electricity_2, e_g_w_Electricity_3 DM, DN, DO
# electricity production 
q_prod_el_w_i = [j/e_g_el_w_i[i] if e_g_el_w_i[i] > 0 else 0 for i,j in enumerate(e_g_el_w_i)]

q_del_w_aux = [0 if np.isnan(AP130) else AP130] q_del_w_aux DR
