Skip to content

Commit

Permalink
feat(ON-683): importer script for climatetrace data
Browse files Browse the repository at this point in the history
  • Loading branch information
lgloege committed Aug 25, 2023
1 parent e4be923 commit 7a89d6b
Showing 1 changed file with 74 additions and 55 deletions.
129 changes: 74 additions & 55 deletions global-api/importer/climatetrace_importer.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,101 @@
import sys
import os

current_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.dirname(current_dir)
sys.path.append(project_dir)

import argparse
from schema.asset import Asset
import csv
import logging
import math
import os
import pandas as pd
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy import create_engine, insert, MetaData, Table
from sqlalchemy.orm import sessionmaker
from lat_lon_to_locode import point_to_locode, point_to_lat_lon

def record_generator(fl):
"""returns a generator for the csv file"""
with open(fl, "r") as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
yield row

def not_nan_or_none(value):
"""return true if value is not nan, none, or empty"""
if isinstance(value, float | int):
return not math.isnan(value)
return value is not None and value != ""


def number_records_in_file(fl):
"""returns number of records in csv file"""
with open(fl, 'r', newline='') as file:
next(file)
row_count = sum(1 for _ in file)
return row_count
def not_zero(value):
"""return true is value is not number or string 0"""
return not (value == 0 or value == "0")


if __name__ == '__main__':
def record_generator(fl):
"""returns a generator for the csv file"""
df = pd.read_csv(fl)
for _, row in df.iterrows():
yield row.to_dict()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--user', help='database user', default=os.environ.get("DB_USER"))
parser.add_argument('--password', help='database password', default=os.environ.get("DB_PASSWORD"))
parser.add_argument('--host', help='database host', default=os.environ.get("DB_HOST"))
parser.add_argument('--port', help='database host', default=os.environ.get("DB_PORT"))
parser.add_argument('--dbname', help='database name', default=os.environ.get("DB_NAME"))
parser.add_argument('--file', help='path to file to import')
parser.add_argument('--refno', help='GPC reference number')
parser.add_argument(
"--database_uri",
help="database URI (e.g. postgresql://ccglobal:@localhost/ccglobal)",
default=os.environ.get("DB_URI"),
)
parser.add_argument("--file", help="path to file to import")
parser.add_argument("--refno", help="GPC reference number")
parser.add_argument(
"--log_file", help="path to log file", default="./climatetrace_importer.log"
)
args = parser.parse_args()

database_uri = f"postgresql://{args.user}:{args.password}@{args.host}:{args.port}/{args.dbname}"
TONNES_TO_KG = 1000 # 1 tonne == 1000 kg

engine = create_engine(database_uri)
logging.basicConfig(
filename=args.log_file,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)

engine = create_engine(args.database_uri)
metadata_obj = MetaData()
Session = sessionmaker(bind=engine)
session = Session()

fields = [col.name for col in Asset.__table__.columns]

refno = args.refno
asset = Table("asset", metadata_obj, autoload_with=engine)
fields = [col.name for col in asset.columns]

fl = os.path.abspath(args.file)
path = Path(fl)
n_records = number_records_in_file(fl)
generator = record_generator(path)

logging.basicConfig(filename=f'progress_{path.stem}.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

record_counter = 0
for record in generator:
record_counter = record_counter + 1
if record_counter % 10000 == 0:
logging.info(f"Processing record {record_counter}/{n_records}")
if record.get('emissions_quantity') != '0':
# remove keys with empty values
record = {key: value for key, value in record.items() if value != ''}

# add keys filename and reno to record
record['filename'] = path.stem
record['reference_number'] = refno
logging.info(f"Processing row num: {record_counter}")

# only keep keys if they are fields in the database
asset_data = {key: record.get(key) for key in record.keys() if key in fields}
if not_zero(record["emissions_quantity"]):
# convert emissions from tonnes to kg
record["emissions_quantity"] *= TONNES_TO_KG
record["emissions_quantity_units"] = "kg"

# create asset object and add/commit it to db
asset = Asset(**asset_data)
session.add(asset)
session.commit()
# get the locode and coordinate points
locode = point_to_locode(session, record["st_astext"])
coord_dic = point_to_lat_lon(record["st_astext"])
lat, lon = coord_dic["lat"], coord_dic["lon"]

session.close
logging.info("Processing completed.")
# add keys filename and reno to record
record["locode"] = locode
record["lat"] = lat
record["lon"] = lon
record["filename"] = path.stem
record["reference_number"] = args.refno

# remove keys with nan, none, and empty values
record = {
key: value for key, value in record.items() if not_nan_or_none(value)
}

table_data = {
key: record.get(key) for key in record.keys() if key in fields
}
ins = asset.insert().values(**table_data)

with engine.begin() as conn:
conn.execute(ins)

logging.info("Done!")
session.close()

0 comments on commit 7a89d6b

Please sign in to comment.