In [59]:
import os, glob, csv, statistics, math, numpy as np, exifread, geopandas as gpd, pandas as pd, cameratransform as ct, pathlib
from osgeo import gdal, osr
from shapely.geometry import Point, Polygon
from PIL import Image, ExifTags
from scipy.spatial.transform import Rotation

# set the base directory where everything lives, including all photos
base_dir = pathlib.Path(r"D:\Example_datasets\UAS_survey-JHI_WE_S2\F1")
out_dir = base_dir

# set the subdirectory where the images live
img_dir = base_dir.joinpath("OUTPUT")

# set the subdirectory where the flight log lives
log_dir = base_dir.joinpath("FLIGHT RECORD")

# set the subdirectories where the georeferenced images will be output and where the footprints feature will be output
imgout_dir = out_dir.joinpath("Georeferenced")
shpout_dir = out_dir.joinpath("Shapefiles")

# create output directories, if necessary
os.makedirs(shpout_dir, exist_ok = True)
os.makedirs(imgout_dir, exist_ok = True)

# pull all images into a single list from the directory
img_list = list(img_dir.glob("**/*.jpg"))

# this is a dictionary that I manually look up online and maintain for camera models we might use so far
sensor_width_dictionary = {"SONY ILCE-6100": 23.5,
                      "Canon EOS 5DS R": 36,
                     "NIKON D810": 35.9}

In [78]:
# This section sets up metadata scraping functions and reads in a sample image for camera parameters
# There's almost certainly a more efficient way to pull all exif tags at once and sort them
# but it would be a lot of work and harder to interpret, and exif handling is a whole beast of its own

sample_image = img_list[0]

def DMS_to_DD(degrees, minutes, seconds, direction):
    dd = float(degrees) + float(minutes)/60 + float(seconds)/(60*60);
    if direction == 'W' or direction == 'S': dd *= -1
    return dd;

def pull_exif(img, remove_unresolved_tags = True, clean_make_model = True):
    with Image.open(img) as im:
        if remove_unresolved_tags == True:
            exif_pull = {ExifTags.TAGS[k]: v for k, v in im._getexif().items() if k in ExifTags.TAGS if r"\x" not in str(v)}
        else:
            exif_pull = {ExifTags.TAGS[k]: v for k, v in im._getexif().items() if k in ExifTags.TAGS}
    if clean_make_model == True: # elimate redundancy between make and model
        if exif_pull['Make'] in exif_pull['Model']: exif_pull['Model'] = exif_pull['Model'].replace(exif_pull['Make'], "").strip()
    return exif_pull

def pull_GPS(img, remove_unresolved_tags = True, dd = True):
    with Image.open(img) as im:
        if remove_unresolved_tags == True:
            GPS_pull = {ExifTags.GPSTAGS.get(k): v for k, v in im.getexif().get_ifd(ExifTags.IFD.GPSInfo).items() if r"\x" not in str(v)}
        else:
            GPS_pull = {ExifTags.GPSTAGS.get(k): v for k, v in im.getexif().get_ifd(ExifTags.IFD.GPSInfo).items()}
        if dd == True:
            GPS_pull["GPSLatitude"], GPS_pull["GPSLongitude"] = \
                DMS_to_DD(*GPS_pull["GPSLatitude"], GPS_pull["GPSLatitudeRef"]), DMS_to_DD(*GPS_pull["GPSLongitude"], GPS_pull["GPSLongitudeRef"])
            del GPS_pull["GPSLatitudeRef"], GPS_pull["GPSLongitudeRef"]
    return GPS_pull

def pull_XMP(img):
    with Image.open(img) as im:
        XMP_pull = im.getxmp()
        if 'xmpmeta' in XMP_pull.keys():
            XMP_pull = XMP_pull['xmpmeta']
            if 'RDF' in XMP_pull.keys():
                XMP_pull = XMP_pull['RDF']
                if 'Description' in XMP_pull.keys():
                    XMP_pull = XMP_pull['Description']
    return XMP_pull

# pull exif data and set up camera parameters
exif_data = pull_exif(sample_image)

# clean up any redundancy in the make and model, e.g. [Canon] [Canon EOS 5DS R]
make_model = f"{exif_data['Make']} {exif_data['Model']}"

# This section creates a class that stores important photogrammetry parameters
class Camera:
    def __init__(self, name, sensorW, focalL, imageW, imageH):
        # name, focal length in mm
        self.name, self.fl = name, focalL
        # sensor width, height, diagonal in mm
        self.sw, self.sh = sensorW, (imageH/imageW)*sensorW
        self.sd = math.hypot(sensorW, self.sh)
        # image width, height, diagonal in pixels
        self.imw, self.imh, self.imd = imageW, imageH, math.hypot(imageW, imageH)
        # angle of view width, heigh, diagonal in degrees
        self.aovw, self.aovh, self.aovd = math.degrees(2*math.atan(sensorW/(2*focalL))), math.degrees(2*math.atan(self.sh/(2*focalL))), math.degrees(2*math.atan(self.sd/(2*focalL)))
        # image and sensor dimensions in single variables, for convenience
        self.imsz = self.imw, self.imh
        self.ssz = self.sw, self.sh

if make_model in sensor_width_dictionary:
    sensor_width = sensor_width_dictionary[make_model]
else:
    print("Model name not found in dictionary, estimating from focal lengths (may include roundoff error)")
    sensor_width = float(36*exif_data['FocalLength']/exif_data['FocalLengthIn35mmFilm'])

camobj = Camera(make_model, sensor_width, exif_data['FocalLength'], exif_data['ExifImageWidth'], exif_data['ExifImageHeight'])
# print(vars(camobj)) # uncomment if you want to check the camera parametrs

# set up adjustments (based on time offset) to get datetime into UTC
if (exif_data['OffsetTime'][0] == "+") or (exif_data['OffsetTime'][0] == "-"): offset_sign = int(f"{exif_data['OffsetTime'][0]}1")
else: offset_sign = 1
hoffset, moffset = exif_data['OffsetTime'].replace('+', '').replace('-', '').split(":")
# can simply use GPSTimeStamp if available, but GPS data tend to be less consistently available than Exif among different camera systems

In [None]:
# start building the dataframe for all photos
df = pd.DataFrame({'ImagePath':img_list})
df['ImageName'] = [img.name for img in img_list]
df['DateTime(UTC)'] = pd.to_datetime([pull_exif(img)['DateTime'] for img in img_list], format = "%Y:%m:%d %H:%M:%S")\
    -offset_sign*(pd.offsets.Hour(int(hoffset)) + pd.offsets.Minute(int(moffset)))
df = pd.merge(df, pd.DataFrame([pull_GPS(img) for img in img_list])[['GPSLatitude', 'GPSLongitude', 'GPSAltitude']], left_index=True, right_index=True)
df = pd.merge(df, pd.DataFrame([pull_XMP(img) for img in img_list])[['Pitch', 'Yaw', 'Roll']], left_index=True, right_index=True)

# this converts our drone PYR orientation streams (in ZXY format) to camera HTR orientation (in ZXZ format) for camera projection
# the (1, -1, -1) conversion array was determined using trial and error with toy datasets and the results look reasonable
# I expect this concerns the conversion from drone-based perspective to camera-based perspective, but could be wrong.
# similarly, inverting the pitch seems to be appropriate for this system, based on observed values coming off the drone
# (predominantly negative) and trial and error with toy datasets
df['Pitch'] = -1*df['Pitch'].astype(float)
rotated_PYR = pd.DataFrame([(np.array([1, -1, -1]) * Rotation.from_euler('ZXY' , [i.Yaw, i.Pitch, i.Roll], degrees=True).as_euler('ZXZ', degrees=True)).tolist() for i in df.itertuples()]).rename(columns= {0:'Heading_ZXZ', 1:'Tilt_ZXZ', 2:'Roll_ZXZ'})
df = pd.merge(df, rotated_PYR, left_index=True, right_index=True)

# Altitude data often shouldn't be taken at face value. Barometric altitude measurements are often "good enough" but imprecise
# can drift over long flight-times, and is measured relative to the point of take-off, which isn't always sea-level.
# GPS altitude is measured relative to a vertical datum, which doesn't always correspond to mean sea level and as a static value
# definitely doesn't correspond to sea level at the time of survey. With tide data and a vertical datum reference on MSL
# this can be calculated, but sometimes manual offsets are needed to massage the data into its correct altitude.

# For this script, we use barometric altitude as "good enough"

# read in the CSV flight log and rename the columns
csv_list = list(log_dir.glob("*.csv"))
print(f"Reading log file {csv_list[0]}")
log_df = pd.read_csv(csv_list[0]).rename(columns={'time(epoch)':'DateTime(Epoch)', 'time(UTC)':'DateTime(UTC)', 'lat':'DroneLatitude', 'lon':'DroneLongitude', 'alt':'DroneAltitude'})
# format and merge the flight log table into our dataframe
log_df['DateTime(UTC)']=pd.to_datetime(log_df['DateTime(UTC)'], format = "%Y-%m-%d %H:%M:%S")
df = pd.merge(df, log_df[['DateTime(UTC)', 'DroneAltitude']], on='DateTime(UTC)', how='left')
if df.isnull().values.any():
    print("Not all image timestamps have exact matches for DroneAltitude. This often indicates an error, but could reflect some imperfect alignment. \
     Possible solutions include relaxing the timestamp merge conditions or applying a approximate offset to GPSAltitude.")

# these are some alternative options for altitude

# this one uses drone altitude to figure out the offset between GPS altitude (which is more precise) and barometric altitude (which is more accurate)
# then applies the accuracy offset to the GPS measurements
use_offset = False
if use_offset:
     altitude_offset = statistics.mode(log_df['DroneAltitude']) - statistics.mode(df['GPSAltitude'].astype(float))
     df['DroneAltitude'] = df['GPSAltitude'] + altitude_offset

# I don't recommend using an unaltered GPS altitude. At the very least, convert from WGS84 to a more accurate EGM value
use_GPSAltitude = False
if use_GPSAltitude:
     altitude_offset = 0.0
     df['DroneAltitude'] = df['GPSAltitude'].astype('float64') + altitude_offset

# The following sections involve imagery projection. They continue to use the same dataframe for data keeping.

# set the rectilinear projection variable for our camera
rlp = ct.RectilinearProjection(focallength_mm = camobj.fl, sensor = camobj.ssz, image = camobj.imsz)

# create the camera perspectives based on orientation data
df['CameraPerspective'] = [ct.Camera(rlp, 
                                      ct.SpatialOrientation(elevation_m = i.DroneAltitude,
                                                            tilt_deg = i.Tilt_ZXZ,
                                                            roll_deg = i.Roll_ZXZ,
                                                            heading_deg = i.Heading_ZXZ, 
                                                            pos_x_m = 0, pos_y_m = 0
                                                           )
                                     ) for i in df.itertuples()
                           ]

# assign the GPS positions. For whatever reason, this works in a loop but not in list comprehension.
for i in df.itertuples():
    i.CameraPerspective.setGPSpos(i.GPSLatitude, i.GPSLongitude, i.DroneAltitude)
    
# define the edge points that we're going to anchor/warp using GCPs
img_keypoints = [[0, 0], # top left
                  [camobj.imw-1, 0], # top right
                  [camobj.imw-1, camobj.imh-1], # bottom right
                  [0, camobj.imh-1], # bottom left
                  
                  [int((camobj.imw)/2), 0], # top midpoint
                  [camobj.imw-1, int((camobj.imh)/2)], # right midpoint
                  [int((camobj.imw)/2), camobj.imh-1], # bottom midpoint
                  [0, int((camobj.imh)/2)], # left midpoint

                  [int((camobj.imw)/2), int((camobj.imh)/2)], # middle midpoint

                  [int((camobj.imw)*0.75), int((camobj.imh)/2)], # h midline midpoint
                  [int((camobj.imw)*0.25), int((camobj.imh)/2)], # h midline midpoint
                  [int((camobj.imw)/2), int((camobj.imh)*0.75)], # v midline midpoint
                  [int((camobj.imw)/2), int((camobj.imh)*0.25)] # v midline midpoint
                  ]

# create the keypoints based on camera perspectives
df['Keypoints'] = [
    [i.CameraPerspective.gpsFromImage(j).tolist()[0:2][::-1] + [0] + j for j in img_keypoints]
    for i in df.itertuples()
]

# create quadrangles from the first 4 GCPs
df['geometry'] = [[Polygon([j[0:2]for j in i.Keypoints][0:4])] for i in df.itertuples()]
df['geometry'] = df.geometry.explode() # necessary because the iterator returns lists

# This section outputs the geodataframe to a shapefile with all metadata as attributes.
# Note that attribute field names get truncated to 10 characters

# This section tries to suppress some of those warnings
import warnings
warnings.filterwarnings("ignore", message="Normalized/laundered field name")

out_path = f"{shpout_dir}\\Image_footprints.shp"
df['DateTime(UTC)'] = df['DateTime(UTC)'].astype('string')
gpd.GeoDataFrame(df, crs="EPSG:4326").drop(["CameraPerspective", "Keypoints"], axis=1).to_file(out_path)
print(f'Shapefile produced at {shpout_dir}')

In [None]:
# This section reads in each image and its GCPs, references and warps the image, then saves it out as a JPEG
# It loops through the entire directory, which can take a long time. This can be rewritten to only call target
# files or segments of the directory, but I haven't implemented that in this script.

# alternative format "GTiff" is much larger per file
out_format = "JPEG"

gdal.UseExceptions()

count = 0
for index, record in df.iterrows():

    # Read in the image file:
    ds = gdal.Open(record['ImagePath'])
    if ds is None:
        print(f"Could not open image: {record['ImagePath']}")
    # Set spatial reference:
    sr = osr.SpatialReference()
    # if GPS data are a different spatial reference you'll have to change this manually
    sr.ImportFromEPSG(4326)

    # Import GCPs
    gcps = [gdal.GCP(*i) for i in record['Keypoints']]

    # Apply the GCPs to the open output file then warp it
    ds.SetGCPs(gcps, sr.ExportToWkt())

    if out_format=="JPEG":
        kwargs = {'format': 'JPEG', 'polynomialOrder':3, 'srcNodata': '0,0,0', 'dstNodata': 'nodata'}
        ds = gdal.Warp(imgout_dir.joinpath(f"{record['ImageName']}_geoloc.jpg"), ds, **kwargs)
    elif out_format=="GTiff":
        kwargs = {'format': 'GTiff', 'polynomialOrder':3, 'srcNodata': '0,0,0', 'dstNodata': 'nodata'}
        ds = gdal.Warp(imgout_dir.joinpath(f"{record['ImageName']}_geoloc.tif"), ds, **kwargs)
    else:
        print(fr"format '{out_format}' not currently supported, code it yourself from https://gdal.org/en/latest/drivers/raster/index.html")
        break
    # Clear the variable (not sure if necessary, but good form)
    ds = None
    # counter, just because it can be a long process
    count+=1
    if (index+1) % 10 == 0:
        print(f"Completed file {count} out of {len(df)}, record {index+1}")
print("finished processing images")