In [1]:
import os
import pandas as pd
import plotly.express as px
import xml.etree.ElementTree as ET
from datetime import datetime
from glob import glob

1. Define the DIGGS Parsing Function

In [2]:
def parse_piezometer_xml(file_path):
    # Define namespaces
    namespaces = {
        'diggs': 'http://diggsml.org/schemas/2.6',
        'gml': 'http://www.opengis.net/gml/3.2',
        'lr': 'http://www.opengis.net/gml/3.3/lr',
        'xlink': 'http://www.w3.org/1999/xlink'
    }
    
    tree = ET.parse(file_path)
    root = tree.getroot()
    
    # Extract Well information
    well = root.find('.//diggs:Well', namespaces)
    if well is None:
        print(f"No <diggs:Well> element found in {file_path}")
        return None
    
    point_loc = well.find('.//diggs:PointLocation', namespaces)
    if point_loc is None:
        print(f"No <diggs:PointLocation> element found in {file_path}")
        return None
    
    pos = point_loc.find('.//gml:pos', namespaces)
    if pos is None:
        print(f"No <gml:pos> element found in {file_path}")
        return None
    
    lat, lon, _ = map(float, pos.text.strip().split())
    
    # Extract measurement data
    monitor = root.find('.//diggs:Monitor', namespaces)
    if monitor is None:
        return None
    
    reading = monitor.find('.//diggs:Reading', namespaces)
    if reading is None:
        return None
    
    outcome = reading.find('.//diggs:outcome', namespaces)
    if outcome is None:
        return None
    
    monitor_result = outcome.find('.//diggs:MonitorResult', namespaces)
    if monitor_result is None:
        return None
    
    # Extract time positions
    time_position_list = monitor_result.find('.//diggs:timeDomain/diggs:TimePositionList/diggs:timePositionList', namespaces)
    if time_position_list is None:
        return None
    
    times = time_position_list.text.strip().split()
    times = [datetime.fromisoformat(t) for t in times]
    
    # Extract data values
    results = monitor_result.find('.//diggs:results/diggs:ResultSet', namespaces)
    if results is None:
        return None
    
    data_values = results.find('.//diggs:dataValues', namespaces)
    if data_values is None:
        return None
    
    data_lines = data_values.text.strip().split('\n')
    data_lines = [line.strip() for line in data_lines if line.strip()]
    
    # Handle cases where timestamps and data values don't match
    data = []
    for i, time in enumerate(times):
        if i < len(data_lines):
            line = data_lines[i]
            values = line.split(',')
            if len(values) >= 2:
                elevation_h2o = float(values[1])
            else:
                elevation_h2o = None  # Missing value
        else:
            elevation_h2o = None  # No data value for this timestamp
        
        data.append({
            'latitude': lat,
            'longitude': lon,
            'time': time,
            'elevation_h2o': elevation_h2o
        })
    
    return data


2. Parse All DIGGS Files and Compile Data

In [3]:
import pandas as pd
import os
import glob
import xml.etree.ElementTree as ET
from datetime import datetime

# Define the directory containing XML files
xml_directory = '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers'  # Replace with your directory path

# Use glob to find all XML files in the directory
xml_files = glob.glob(os.path.join(xml_directory, '*.xml'))
print(xml_files)

# Initialize an empty list to store all data
all_data = []

# Iterate over each XML file and parse data
for file in xml_files:
    print(f"Parsing file: {file}")
    tree = ET.parse(file)
    root = tree.getroot()

    # Define namespaces
    namespaces = {
        'diggs': 'http://diggsml.org/schemas/2.6',
        'gml': 'http://www.opengis.net/gml/3.2',
        'lr': 'http://www.opengis.net/gml/3.3/lr',
        'xlink': 'http://www.w3.org/1999/xlink'
    }

    # Extract Well information
    well = root.find('.//diggs:Well', namespaces)
    if well is None:
        print(f"No <diggs:Well> element found in {file}")
        continue

    point_loc = well.find('.//diggs:PointLocation', namespaces)
    if point_loc is None:
        print(f"No <diggs:PointLocation> element found in {file}")
        continue

    pos = point_loc.find('.//gml:pos', namespaces)
    if pos is None:
        print(f"No <gml:pos> element found in {file}")
        continue

    lat, lon, _ = map(float, pos.text.strip().split())

    # Extract measurement data
    monitor = root.find('.//diggs:Monitor', namespaces)
    if monitor is None:
        continue

    reading = monitor.find('.//diggs:Reading', namespaces)
    if reading is None:
        continue

    outcome = reading.find('.//diggs:outcome', namespaces)
    if outcome is None:
        continue

    monitor_result = outcome.find('.//diggs:MonitorResult', namespaces)
    if monitor_result is None:
        continue

    # Extract time positions
    time_position_list = monitor_result.find('.//diggs:timeDomain/diggs:TimePositionList/diggs:timePositionList', namespaces)
    if time_position_list is None:
        continue

    times = time_position_list.text.strip().split()
    times = [datetime.fromisoformat(t) for t in times]

    # Extract data values
    results = monitor_result.find('.//diggs:results/diggs:ResultSet', namespaces)
    if results is None:
        continue

    data_values = results.find('.//diggs:dataValues', namespaces)
    if data_values is None:
        continue

    # Split dataValues into lines and extract the middle value (elevation_h2o)
    data_lines = data_values.text.strip().split()
    elevation_h2o_values = [float(line.split(',')[1]) for line in data_lines if len(line.split(',')) >= 3]

    # Ensure the number of times matches the number of elevation values
    for i, time in enumerate(times):
        elevation_h2o = elevation_h2o_values[i] if i < len(elevation_h2o_values) else None
        all_data.append({
            'latitude': lat,
            'longitude': lon,
            'time': time,
            'elevation_h2o': elevation_h2o
        })

# Create a Pandas DataFrame
df = pd.DataFrame(all_data)

# Drop rows with missing latitude or longitude
df = df.dropna(subset=['latitude', 'longitude'])

# Count the number of rows in the DataFrame
num_rows = len(df)
print(f"Total number of rows in the DataFrame: {num_rows}")

# Save the DataFrame to a CSV file
output_file = 'output.csv'  # Specify the desired file name
df.to_csv(output_file, index=False)  # Set index=False to exclude the index column
print(f"DataFrame saved to {output_file}")


['/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-10A-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-14A.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-11A-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-05A-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-11B-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-13-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-07-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-06-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-03-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-11-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-08-23.xml', '/workspaces/Diggs_Piezometer_Converter/DIGGS_Piezometers/modified_PZ-B-0

In [4]:
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta

# Ensure 'time' is in datetime format
df['time'] = pd.to_datetime(df['time'], errors='coerce')  # Coerce invalid dates to NaT

# Drop rows with NaN values in required columns
df = df.dropna(subset=['latitude', 'longitude', 'time', 'elevation_h2o'])

# Specify the number of days
num_days = 2  # Adjust this value as needed
end_date = df['time'].max()  # The most recent timestamp in the data
start_date = end_date - timedelta(days=num_days)

# Filter the DataFrame for the specified date range
df = df[(df['time'] >= start_date) & (df['time'] <= end_date)]

# Sort the DataFrame by time in ascending order
df = df.sort_values(by='time', ascending=True)

# Calculate the bounding box
min_lat, max_lat = df['latitude'].min(), df['latitude'].max()
min_lon, max_lon = df['longitude'].min(), df['longitude'].max()

# Calculate the center of the map
center_lat = (min_lat + max_lat) / 2
center_lon = (min_lon + max_lon) / 2

# Estimate a zoom level based on the data's bounding box
lat_range = max_lat - min_lat
lon_range = max_lon - min_lon
max_range = max(lat_range, lon_range)
zoom = max(1, 12 - max_range * 10) + 5

# Define the color scale range
color_min = df['elevation_h2o'].min()
color_max = df['elevation_h2o'].max()

# Create the scatter mapbox using an open-source map style
fig = px.scatter_mapbox(
    df,
    lat='latitude',
    lon='longitude',
    color='elevation_h2o',
    size='elevation_h2o',
    color_continuous_scale='Viridis',
    range_color=[color_min, color_max],  # Lock the color scale
    size_max=15,
    mapbox_style='open-street-map',
    animation_frame=df['time'].dt.strftime('%Y-%m-%d %H:%M:%S'),
    title=f'Piezometer Water Elevation Over the Last {num_days} Days',
    labels={'elevation_h2o': 'Elevation H₂O (ft)'}
)

# Update layout to dynamically center and zoom the map
fig.update_layout(
    margin={"r": 0, "t": 50, "l": 0, "b": 0},
    coloraxis_colorbar=dict(title="Elevation H₂O (ft)"),
    mapbox=dict(center={"lat": center_lat, "lon": center_lon}, zoom=zoom)
)

# Save the figure as an HTML file
fig.write_html(
    "piezometer_visualization.html",
    full_html=True,
    include_plotlyjs=True,
    config={'displayModeBar': True}
)

print("Visualization has been saved as 'piezometer_visualization.html'")

Visualization has been saved as 'piezometer_visualization.html'
