In [None]:
#!/usr/bin/env python3

# https://wellstar-public.conservation.ca.gov/General/PublicDownloads/Index


import os
import sys
import csv
import json
import datetime
import logging

import requests
import MySQLdb

In [None]:
# Send everything from the root logger (DEBUG and up) to stdout so it shows up
# in the notebook output.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
# Tag the handler so a copy attached by an earlier run of this cell can be found.
handler.set_name("calgem-stdout")

# Re-running this cell (or re-importing the module) must not stack another
# stdout handler on the root logger; otherwise every message is printed once
# per run.
if not any(existing.get_name() == handler.get_name() for existing in logger.handlers):
    logger.addHandler(handler)

In [None]:
# Fetches the password for the MySQL database.
# You can either configure your system with an OS
# ENV VAR called MYSQL_DB_PASSWD or you can create a JSON
# file called `config.json` in the current working directory whose content
# should look like: {"mysql_db_passwd": "my-password"}
# Linux / Mac: (export MYSQL_DB_PASSWD=my-password)

def get_database_password(config_json_file="config.json"):
    """
    Look up the MySQL database password.

    The MYSQL_DB_PASSWD environment variable takes precedence. Otherwise the
    "mysql_db_passwd" key of a JSON config file is used.

    Args = config_json_file<string> # path to the JSON config file; defaults to
           "config.json", resolved against the current working directory.

    Return: db_passwd<string> or None when no password could be found
            (the reason is logged).
    """
    # os.environ.get() returns None for a missing variable and never raises
    # KeyError, so no try/except is needed around it.
    db_passwd = os.environ.get("MYSQL_DB_PASSWD")
    if db_passwd is not None:
        logger.info("Using MYSQL password from OS ENV VAR.")
        return db_passwd
    logger.debug("Didnt find MYSQL password in OS ENV VAR.")

    if not os.path.exists(config_json_file):
        logger.debug("Unable to find configuration file.")
        return None

    try:
        with open(config_json_file, 'r', encoding='utf-8') as fp:
            json_data = json.load(fp)
    except (OSError, ValueError) as err:
        # ValueError covers json.JSONDecodeError (malformed JSON) as well as
        # UnicodeDecodeError (file is not valid UTF-8).
        logger.error(f"Unable to get mysql password from configuration file: {err}")
        return None

    # The document must be a JSON object holding the password under "mysql_db_passwd".
    if not isinstance(json_data, dict) or "mysql_db_passwd" not in json_data:
        logger.debug("JSON schema failure for config.json. use: {'mysql_db_passwd': '<passwordhere>'}")
        return None

    logger.info("Using MYSQL password from config.json file.")
    return json_data["mysql_db_passwd"]
    
# CalGEM Configuration
CALGEM_REPORTING_YEAR = "2022"
CALGEM_HOST = "calgem-pid.conservation.ca.gov"
DOWNLOAD_CALGEM_DATA = True
# NOTE(review): with False, re-running the notebook appends another copy of every
# row, because the tables are created with IF NOT EXISTS and have no unique keys.
DROP_CALGEM_TABLES = False
# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
DROP_LOOKUP_TABLES = True

# File Configuration
# FILENAMES
INJECTION_CSV_FILENAME = f"{CALGEM_REPORTING_YEAR}CaliforniaOilAndGasWellMonthlyInjection.csv"
PRODUCTION_CSV_FILENAME = f"{CALGEM_REPORTING_YEAR}CaliforniaOilAndGasWellMonthlyProduction.csv"
WELLS_CSV_FILENAME = f"{CALGEM_REPORTING_YEAR}CaliforniaOilAndGasWells.csv"

# FILE PATHS (relative to the current working directory)
INJECTION_CSV_FILE_PATH = os.path.join("data", INJECTION_CSV_FILENAME)
PRODUCTION_CSV_FILE_PATH = os.path.join("data", PRODUCTION_CSV_FILENAME)
WELLS_CSV_FILE_PATH = os.path.join("data", WELLS_CSV_FILENAME)

# Find and use MYSQL_DB_PASSWD
DB_PASSWD = get_database_password()
if not DB_PASSWD:
    logger.error("Couldnt successfully find a mysql database password. Quitting.")
    sys.exit(-1)

# Database Configuration
# Each value can be overridden by an environment variable of the same name;
# unset or empty variables fall back to the defaults shown.
DB_CONFIG = {
    "DB_HOST": os.environ.get("DB_HOST") or "localhost",
    # Environment variables are always strings, but MySQLdb.connect() needs an
    # integer port, so convert here (a non-numeric DB_PORT raises ValueError).
    "DB_PORT": int(os.environ.get("DB_PORT") or 3306),
    "DB_USER": os.environ.get("DB_USER") or "appuser",
    "DB_PASSWD": DB_PASSWD,
    "DB_NAME": os.environ.get("DB_NAME") or "calgem"
}

INJECTIONS_TABLE_NAME = "Injections"
PRODUCTION_TABLE_NAME = "Production"
WELLS_TABLE_NAME = "Wells"

In [None]:
# Connect to the MySQL database before we begin any of the data processing.
# Our ultimate goal is to store all of the collected data from CalGEM in
# the MySQL database, so without the connection we shouldn't continue.

def connnect_database(db_config) -> "MySQLdb.connections.Connection | None":
    """
    Open a connection to the MySQL database described by ``db_config``.

    Args = db_config<dict> # must provide the keys DB_HOST, DB_PORT, DB_USER,
           DB_PASSWD and DB_NAME (the shape of DB_CONFIG).

    Return: an open MySQLdb connection, or None if connecting failed
            (the error is logged, not raised).
    """
    try:
        logger.info(" Connecting to MySQL database.")

        mysql_db = MySQLdb.connect(
            host=db_config['DB_HOST'],
            # A port read from the environment arrives as a string; MySQLdb needs an int.
            port=int(db_config['DB_PORT']),
            user=db_config['DB_USER'],
            password=db_config['DB_PASSWD'],
            database=db_config['DB_NAME'],
        )

    except MySQLdb.OperationalError as op_err:
        logger.error(f"Unable to establish a database connection: {op_err}")
        return None

    except Exception as err:
        # Any other failure (missing key, non-numeric port, ...) is reported
        # to the caller as None rather than raised.
        logger.error(f"Unable to establish a database connection: {err}")
        return None

    logger.info(" Database connected.")
    return mysql_db


db = connnect_database(db_config=DB_CONFIG)

# Check the connection before using it: connnect_database() returns None on
# failure, and calling .cursor() on None would raise AttributeError instead of
# quitting with the message below.
if db is None:
    logger.error("Unable to establish database connection. Quitting.")
    sys.exit(-1)

cursor = db.cursor()

In [None]:
# This is the only function that calls the CalGEM webapp.
# It fetches a CSV file from the CalGEM webapp based
# on the name of the CSV file we want to fetch.

def fetch_calgem_csv_file(CALGEM_HOSTNAME: str, FILENAME: str, timeout: float = 300) -> "bool | None":
    """
    Download ``FILENAME`` from the CalGEM webapp and save it as ``data/FILENAME``.

    Args: CALGEM_HOSTNAME<string>, FILENAME<string>,
          timeout<float> # seconds requests waits to connect and between
          received bytes (default 300); not a cap on the total download time.

    Return: True on success, None on any failure (the reason is logged).
    """
    if not CALGEM_HOSTNAME or not isinstance(CALGEM_HOSTNAME, str):
        logger.error("Invalid CALGEM_HOSTNAME configuration value. please update.")
        return None

    if not FILENAME or not isinstance(FILENAME, str):
        logger.error("Invalid FILENAME configuration value. please update.")
        return None

    logger.info("Calling CalGEM webapp..")

    url = f"https://{CALGEM_HOSTNAME}/pid/{FILENAME}"
    logger.info(f"calling: {url}")

    target_dir = 'data'
    target_path = os.path.join(target_dir, FILENAME)
    # Download into a temporary name and only move it into place once the body
    # has fully arrived, so a failed download never leaves a truncated CSV for
    # the parsing step to pick up.
    partial_path = target_path + ".part"

    try:
        os.makedirs(target_dir, exist_ok=True)

        # stream=True avoids holding the whole CSV in memory; the timeout keeps a
        # stalled server from hanging the notebook indefinitely.
        with requests.get(
                url=url,
                headers={"User-Agent": "Mozilla/5.0 (WIndows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101"},
                stream=True,
                timeout=timeout) as r:

            if r.status_code not in (200, 201, 202):
                logger.error(f"Invalid status code received from CalGEM webapp: {r.status_code}")
                return None

            with open(partial_path, 'wb') as fp:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    fp.write(chunk)

        os.replace(partial_path, target_path)

    except (requests.RequestException, OSError) as err:
        logger.error(f"Unable to download {FILENAME} from CalGEM webapp: {err}")
        try:
            os.remove(partial_path)
        except OSError:
            # Best-effort cleanup: the temp file may never have been created.
            pass
        return None

    logger.info(f"Download & save of {FILENAME} csv file complete.")
    return True

In [None]:
# Downloads one CalGEM CSV file (production, injection, or wells data)
# and stores it in the `data/` directory relative to the current working directory.

def fetch_and_store_new_calgem_csv(load_data: bool, calgem_hostname: str, csv_file_to_process: str) -> None:
    """
    Download one CalGEM CSV file into ``data/`` when downloading is enabled.

    Args = load_data<bool> # False skips the download (see DOWNLOAD_CALGEM_DATA),
           calgem_hostname<string>,
           csv_file_to_process<string> # file name on the CalGEM server.

    Return: None (success or failure is only logged).
    """
    # isinstance(..., bool) rather than `in [True, False]`, which also accepts 0 and 1.
    if not isinstance(load_data, bool):
        logger.error("Invalid configuration for load_data argument. Must be: True|False.")
        return

    if not load_data:
        # A download disabled by configuration is expected, not an error.
        logger.info(f"Reloading CALGEM {csv_file_to_process} data disabled. Skipping download of new CSV file.")
        return

    downloaded = fetch_calgem_csv_file(
        CALGEM_HOSTNAME=calgem_hostname, FILENAME=csv_file_to_process)

    if downloaded:
        logger.info(f"{csv_file_to_process} file download complete.")
    else:
        logger.error(f"Unknown error fetching {csv_file_to_process} file.")


# Download the three CalGEM CSV files: production, then injection, then wells.
for _calgem_csv_filename in (PRODUCTION_CSV_FILENAME, INJECTION_CSV_FILENAME, WELLS_CSV_FILENAME):
    fetch_and_store_new_calgem_csv(
        load_data=DOWNLOAD_CALGEM_DATA,
        calgem_hostname=CALGEM_HOST,
        csv_file_to_process=_calgem_csv_filename)

In [None]:
# This function will drop the three mysql tables we are using to store the calgem information.

def drop_data_tables() -> None:
    """
    Drop the injections, production and wells tables so their data can be reloaded.

    Only runs when DROP_CALGEM_TABLES is exactly True; otherwise it just logs the skip.

    Args = None

    Return: None
    """
    if DROP_CALGEM_TABLES is True:
        logger.info("Preparing to drop all of the needed tables, so we can reload data.")

        # Drop order: injections, production, wells; one commit for all three.
        for table_name in (INJECTIONS_TABLE_NAME, PRODUCTION_TABLE_NAME, WELLS_TABLE_NAME):
            cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        db.commit()

        logger.info("Table drops have been completed.")
    else:
        logger.info("Skipping table deletion before we insert data based on configuration.")

drop_data_tables()

In [None]:
# Create a new mysql database table to store the well injection data.

def create_injections_table() -> None:
    """
    Create the injections table (named by INJECTIONS_TABLE_NAME) unless it already exists.

    Args = None

    Return: None
    """
    logger.info(f"Creating new table `{INJECTIONS_TABLE_NAME}` in mysql.")

    ddl = f"""
    CREATE TABLE IF NOT EXISTS {INJECTIONS_TABLE_NAME} (
        RequestedDataYear int,
        CalGEMRecordEntryDate datetime,
        ReportType varchar(255),
        APINumber BIGINT,
        FieldCode int,
        AreaCode int,
        PoolCode int,
        WellTypeCode varchar(10),
        InjectionDate datetime,
        InjectionStatus varchar(255),
        GasAirInjected varchar(10),
        SteamWaterInjected varchar(10),
        DaysInjecting varchar(10),
        SurfaceInjectionPressure varchar(10),
        CasingInjectionPressure varchar(10),
        WaterSource varchar(10),
        ReportedOrEstimated varchar(255)
    );
    """
    # IF NOT EXISTS makes this a no-op when the table is already there.
    cursor.execute(ddl)
    db.commit()

    logger.info("Table successfully created, if it didnt already exist.")

create_injections_table()

In [None]:
# Create a new mysql database table to store the well production data.

def create_production_table() -> None:
    """
    Create the production table (named by PRODUCTION_TABLE_NAME) unless it already exists.

    Args = None

    Return: None
    """
    logger.info(f"Creating new table `{PRODUCTION_TABLE_NAME}` in MySQL.")

    ddl = f"""
    CREATE TABLE IF NOT EXISTS {PRODUCTION_TABLE_NAME} (
        RequestedDataYear int,
        CalGEMRecordEntryDate datetime,
        ReportType varchar(255),
        APINumber BIGINT,
        FieldCode int,
        AreaCode int,
        PoolCode int,
        WellTypeCode varchar(10),   
        ProductionReportDate datetime,
        ProductionStatus varchar(255),
        CasingPressure varchar(50),
        TubingPressure varchar(50),
        BTUofGasProduced varchar(50),
        MethodOfOperation varchar(100),
        APIGravityofOil varchar(50),
        WaterDisposition varchar(50),
        OilorCondensateProduced varchar(50),
        DaysProducing varchar(10),
        GasProduced varchar(50),
        WaterProduced varchar(50),
        ReportedOrEstimated varchar(100)
    );
    """
    # IF NOT EXISTS makes this a no-op when the table is already there.
    cursor.execute(ddl)
    db.commit()

    logger.info("Table successfully created, if it didnt already exist.")

create_production_table()


In [None]:
# Create a new mysql database table to store the well data.

def create_wells_table() -> None:
    """
    Create the wells table (named by WELLS_TABLE_NAME) unless it already exists.

    Args = None

    Return: None
    """
    logger.info(f"Creating new table `{WELLS_TABLE_NAME}` in MySQL.")

    ddl = f"""
    CREATE TABLE IF NOT EXISTS {WELLS_TABLE_NAME} (
        RequestedDataYear int,
        CalGEMRecordEntryDate datetime,
        API BIGINT,
        FieldCode int,
        AreaCode int,
        PoolCode int,
        WellTypeCode varchar(10),
        LeaseName varchar(100),
        FieldName varchar(100),
        AreaName varchar(100),
        WellNumber varchar(50),
        WellStatus varchar(50),
        PoolWellTypeStatus varchar(50),
        County varchar(100),
        District varchar(100),
        Section varchar(10),
        SubSection varchar(10),
        Township varchar(10),
        Ranges varchar(10),
        BM varchar(10),
        Operatorcode varchar(100),
        OperatorName varchar(255),
        OperatorStatus varchar(50),
        SystemEntryDate datetime
    );
    """
    # IF NOT EXISTS makes this a no-op when the table is already there.
    cursor.execute(ddl)
    db.commit()

    logger.info("Table successfully created, if it didnt already exist.")

create_wells_table()

In [None]:
# This function will parse the CSV records from their csv file 
# and construct dict() objects of the rows.

def parse_csv_records(file_to_process: str) -> "list | None":
    """
    Read a CalGEM CSV file into a list of dict rows keyed by the CSV header.

    Args = file_to_process<string> # Relative path to a file
           in data/*.csv.

    Return: list of dict rows, or None if the file could not be read or
            parsed (the error is logged).
    """
    try:
        # newline='' is what the csv module requires so quoted fields that
        # contain line breaks are parsed correctly. utf-8-sig also strips a
        # leading byte-order mark that would otherwise be glued onto the first
        # header name.
        with open(file_to_process, 'r', encoding='utf-8-sig', newline='') as csv_file:
            records = list(csv.DictReader(csv_file, delimiter=','))

    except FileNotFoundError as fnf_err:
        logger.error(f"Unable to read from {file_to_process} file. error: {fnf_err}")
        return None

    except (OSError, UnicodeDecodeError, csv.Error) as err:
        # Name the file actually being parsed; this helper is used for the
        # injection, production and wells files alike.
        logger.error(f"Unable to obtain csv information from {file_to_process}. error: {err}")
        return None

    logger.info(f"Found {len(records)} {file_to_process} records to process.")
    return records

# Parse each CSV (injection, production, wells, in that order); an entry is
# None when its file could not be read.
INJECTION_RECORDS, PRODUCTION_RECORDS, WELL_RECORDS = (
    parse_csv_records(csv_path)
    for csv_path in (INJECTION_CSV_FILE_PATH, PRODUCTION_CSV_FILE_PATH, WELLS_CSV_FILE_PATH)
)

In [None]:
# Insert the injection csv data into the injection table

def insert_injection_records(records: list) -> None:
    """
    Insert parsed injection CSV rows into the injections table and commit.

    Every row is stamped with CALGEM_REPORTING_YEAR and the time of this load.

    Args = records<list> # dict rows as returned by parse_csv_records(); None or
           an empty list (e.g. the CSV could not be read) is logged and skipped.

    Return: None
    """
    if not records:
        logger.warning("No injection records to insert. Skipping the Injections table load.")
        return

    # CSV headers copied as-is into the identically named table columns, in insert order.
    csv_columns = (
        "ReportType",
        "APINumber",
        "FieldCode",
        "AreaCode",
        "PoolCode",
        "WellTypeCode",
        "InjectionDate",
        "InjectionStatus",
        "GasAirInjected",
        "SteamWaterInjected",
        "DaysInjecting",
        "SurfaceInjectionPressure",
        "CasingInjectionPressure",
        "WaterSource",
        "ReportedOrEstimated",
    )
    table_columns = ("RequestedDataYear", "CalGEMRecordEntryDate") + csv_columns
    placeholders = ", ".join(["%s"] * len(table_columns))

    # Values are bound by the driver (%s placeholders) instead of being pasted
    # into the SQL text, so quotes or backslashes in the CSV data can no longer
    # break the statement or inject SQL.
    insert_statement = (
        f"INSERT INTO {INJECTIONS_TABLE_NAME} ({', '.join(table_columns)}) "
        f"VALUES ({placeholders})"
    )

    # One timestamp for the whole load, shared by every row of this run.
    entry_date = datetime.datetime.now()
    rows = [
        (CALGEM_REPORTING_YEAR, entry_date, *(injection[key] for key in csv_columns))
        for injection in records
    ]

    # executemany lets MySQLdb batch the rows into multi-row INSERTs rather than
    # one round trip per row.
    cursor.executemany(insert_statement, rows)
    db.commit()

    logger.info(f"Inserted {len(records)} injection records into the Injections table.")

insert_injection_records(records=INJECTION_RECORDS)

In [None]:
# Insert the production csv data into the production table

def insert_production_records(records: list) -> None:
    """
    Insert parsed production CSV rows into the production table and commit.

    Every row is stamped with CALGEM_REPORTING_YEAR and the time of this load.

    Args = records<list> # dict rows as returned by parse_csv_records(); None or
           an empty list (e.g. the CSV could not be read) is logged and skipped.

    Return = None
    """
    if not records:
        logger.warning("No production records to insert. Skipping the Production table load.")
        return

    # CSV headers copied as-is into the identically named table columns, in insert order.
    csv_columns = (
        "ReportType",
        "APINumber",
        "FieldCode",
        "AreaCode",
        "PoolCode",
        "WellTypeCode",
        "ProductionReportDate",
        "ProductionStatus",
        "CasingPressure",
        "TubingPressure",
        "BTUofGasProduced",
        "MethodOfOperation",
        "APIGravityofOil",
        "WaterDisposition",
        "OilorCondensateProduced",
        "DaysProducing",
        "GasProduced",
        "WaterProduced",
        "ReportedOrEstimated",
    )
    table_columns = ("RequestedDataYear", "CalGEMRecordEntryDate") + csv_columns
    placeholders = ", ".join(["%s"] * len(table_columns))

    # Values are bound by the driver (%s placeholders) instead of being pasted
    # into the SQL text, so quotes or backslashes in the CSV data can no longer
    # break the statement or inject SQL.
    insert_statement = (
        f"INSERT INTO {PRODUCTION_TABLE_NAME} ({', '.join(table_columns)}) "
        f"VALUES ({placeholders})"
    )

    # One timestamp for the whole load, shared by every row of this run.
    entry_date = datetime.datetime.now()
    rows = [
        (CALGEM_REPORTING_YEAR, entry_date, *(production[key] for key in csv_columns))
        for production in records
    ]

    # executemany lets MySQLdb batch the rows into multi-row INSERTs rather than
    # one round trip per row.
    cursor.executemany(insert_statement, rows)
    db.commit()

    logger.info(f"Inserted {len(records)} production records into the Production table.")

insert_production_records(PRODUCTION_RECORDS)

In [None]:
# Insert the well csv data into the well table

def insert_well_records(records: list) -> None:
    """
    Insert parsed well CSV rows into the wells table and commit.

    Every row is stamped with CALGEM_REPORTING_YEAR and the time of this load.

    Args = records<list> # dict rows as returned by parse_csv_records(); None or
           an empty list (e.g. the CSV could not be read) is logged and skipped.

    Return = None
    """
    if not records:
        logger.warning("No well records to insert. Skipping the Wells table load.")
        return

    # CSV headers to load, in insert order.
    csv_columns = (
        "API",
        "FieldCode",
        "AreaCode",
        "PoolCode",
        "WellTypeCode",
        "LeaseName",
        "FieldName",
        "AreaName",
        "WellNumber",
        "WellStatus",
        "PoolWellTypeStatus",
        "County",
        "District",
        "Section",
        "SubSection",
        "Township",
        "Range",
        "BM",
        "Operatorcode",
        "OperatorName",
        "OperatorStatus",
        "SystemEntryDate",
    )
    # The CSV's "Range" header is stored in the "Ranges" column (presumably
    # because RANGE is a reserved word in MySQL); every other header maps to
    # the column of the same name.
    column_renames = {"Range": "Ranges"}
    table_columns = ("RequestedDataYear", "CalGEMRecordEntryDate") + tuple(
        column_renames.get(key, key) for key in csv_columns
    )
    placeholders = ", ".join(["%s"] * len(table_columns))

    # Values are bound by the driver (%s placeholders) instead of being pasted
    # into the SQL text, so quotes or backslashes in the CSV data (e.g. in lease
    # or operator names) can no longer break the statement or inject SQL.
    insert_statement = (
        f"INSERT INTO {WELLS_TABLE_NAME} ({', '.join(table_columns)}) "
        f"VALUES ({placeholders})"
    )

    # One timestamp for the whole load, shared by every row of this run.
    entry_date = datetime.datetime.now()
    rows = [
        (CALGEM_REPORTING_YEAR, entry_date, *(well[key] for key in csv_columns))
        for well in records
    ]

    # executemany lets MySQLdb batch the rows into multi-row INSERTs rather than
    # one round trip per row.
    cursor.executemany(insert_statement, rows)
    db.commit()

    logger.info(f"Inserted {len(records)} well records into the Wells table.")

insert_well_records(WELL_RECORDS)

In [None]:
# Close the connection to the database.

def close_database_connection() -> None:
    """
    Close the module-level cursor and database connection.

    This is the final cleanup step, so failures are logged rather than raised.

    Args = None

    Return = None
    """
    try:
        try:
            cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            db.close()
        logger.info("Database connection closed.")

    except MySQLdb.OperationalError as op_err:
        logger.error(f"Unable to close the database connection: {op_err}")

    except Exception as err:
        # Include the underlying error; without it there is nothing to debug with.
        logger.error(f"Unknown error occurred attempting to close the database connection: {err}")

close_database_connection()