In [74]:
import glob
import numpy as np
import datetime
import copy

In [75]:
try:
    filename = glob.glob('in_out/*_in.csv')[0]
    file = open(filename, 'r')
    obs_list = list(file)
    
except IndexError:
    print('Please make sure that the file exist and the path is ok')
finally:
    all_points = {}
    keys = []
    values = []

In [76]:
i = 0
for item in range(len(obs_list)):
    obs_list[i] = obs_list[i].replace(',,', '')
    obs_list[i] = obs_list[i].strip()
    if obs_list[i].endswith(','):
        obs_list[i] = obs_list[i][0:-1]
    i += 1

## Gather points to adjust

In [77]:
points_to_adjust = []
to_copy = False
for line in obs_list:
    if line == "A,B,d,e_d" or line == "C,L,P,alpha,e_alpha" or line == "tolerance":
        to_copy = False
    elif line == "Nr,X,Y":
        to_copy = True
    if to_copy:
        line = line.split(',')
        if not line[2].isalpha():
            values.append([float(line[1]), float(line[2])])
            keys.append(line[0])
            if not line[-1] == "$":
                points_to_adjust.append(line[0])

all_points = dict(zip(keys, values))
initial_points = copy.deepcopy(all_points)

## Gather distances

In [78]:
to_copy = False
obs_distances = []

for line in obs_list:
    if line == "Nr,X,Y" or line == "C,L,P,alpha,e_alpha" or line == "tolerance":
        to_copy = False
    elif line == "A,B,d,e_d":
        to_copy = True
    if to_copy:
        line = line.split(',')
        if not line[2].isalpha():
            obs_distances.append([line[0], line[1], float(line[2]), float(line[3])])

## Gather angles

In [79]:
to_copy = False
obs_angles = []

for line in obs_list:
    if line == "A,B,d,e_d" or line == "Nr,X,Y" or line == "tolerance":
        to_copy = False
    elif line == "C,L,P,alpha,e_alpha":
        to_copy = True
    if to_copy:
        line = line.split(',')
        if not line[3].isalpha():
            obs_angles.append([line[0], line[1], line[2], float(line[3]), float(line[4])])

## Set tolerance

In [80]:
to_copy = False
for line in obs_list:
    if line == "A,B,d,e_d" or line == "Nr,X,Y" or line == "C,L,P,alpha,e_alpha":
        to_copy = False
    elif line == "tolerance":
        to_copy = True
    if to_copy:
        if not line.isalpha():
            tolerance = float(line)

In [81]:
report_path = 'in_out/report.txt'

In [82]:
def write_report(array2d):
    array2d2 = []
    for item in array2d:
        item2 = []
        for i in range(len(item)):
            if type(item[i]) != r"<class 'str'>":
                item2.append(str(item[i]))
            else:
                item2.append(item[i])
            i += 1
        array2d2.append(item2)
    f = open(report_path, 'a')
    for item in array2d2:
        f.write(', '.join(item) + "\n")
    f.close()

In [83]:
keys = points_to_adjust
values = []

for item in all_points:
    if item in keys:
        values.append(all_points.get(item))
adjusted_points = dict(zip(keys, values))

In [84]:
def adjustment(nr_iter):
    if nr_iter == 1:
        f = open(report_path, 'w')
        f.write("Input data:\nDistances\n(FROM, TO, distance [m], error [m]): \n")
        f.close()
        write_report(obs_distances)
        f = open(report_path, 'a')
        f.write("\n")
        f.write("Angles\n(C, L, P, angle [g], error [cc]): \n")
        f.close()
        write_report(obs_angles)

    # creating a system of observational equations
    A = []
    L = []
    
    for rekord in obs_distances:  # insert the coefficients from distance observations into the matrix A
        P = rekord[0]  # distance from
        K = rekord[1]  # distance to
        d = rekord[2]  # value
        dx = all_points[K][0] - all_points[P][0]  # length increase by X coordinate
        dy = all_points[K][1] - all_points[P][1]  # length increase by Y coordinate
        d_approx = np.sqrt(dx ** 2 + dy ** 2)  # approximate distance calculated from coordinates

        A_row = []
        for x in range(2 * len(points_to_adjust)): # initially filling the row with zeros
            A_row.append(0)

        dXP = -dx / (np.sqrt(dx ** 2 + dy ** 2))
        dYP = -dy / (np.sqrt(dx ** 2 + dy ** 2))
        dXK = dx / (np.sqrt(dx ** 2 + dy ** 2))
        dYK = dy / (np.sqrt(dx ** 2 + dy ** 2))

        L_row = d - d_approx  # creating a constant term vector simultaneously

        # the following loops put the coefficient in the right place
        odl_p = 0
        for w in points_to_adjust:
            if P == w:
                A_row[2 * odl_p] = dXP
                A_row[2 * odl_p + 1] = dYP
            odl_p += 1

        odl_k = 0
        for w in points_to_adjust:
            if K == w:
                A_row[2 * odl_k] = dXK
                A_row[2 * odl_k + 1] = dYK
            odl_k += 1

        A.append(A_row)
        L.append(L_row)

    for rekord in obs_angles:  # insert the coefficients from angle observations into the matrix 
        C = rekord[0]
        Le = rekord[1]
        P = rekord[2]
        alpha = rekord[3]

        A_row = []
        for x in range(2 * len(points_to_adjust)):
            A_row.append(0)

        pXL = all_points[Le][0] - all_points[C][0]
        pYL = all_points[Le][1] - all_points[C][1]
        pXP = all_points[P][0] - all_points[C][0]
        pYP = all_points[P][1] - all_points[C][1]
        dL = (pXL ** 2 + pYL ** 2) ** (1 / 2)  # distance to the left point
        dP = (pXP ** 2 + pYP ** 2) ** (1 / 2)  # distance to the right point

        az_cp = np.arctan2(pYP, pXP)
        if az_cp < 0:
            az_cp += 2 * np.pi

        az_cl = np.arctan2(pYL, pXL)
        if az_cl < 0:
            az_cl += 2 * np.pi

        alpha_approx = (az_cp - az_cl) * 200 / np.pi  # angle counted as a difference of azimuths

        if alpha_approx < 0:
            alpha_approx += 400

        L_row = (alpha - alpha_approx) * 10000  # value in centicentigradians [cc]

        AL = pXL * 636620 / dL ** 2  # [cc/m]
        AP = pXP * 636620 / dP ** 2
        BL = pYL * 636620 / dL ** 2
        BP = pYP * 636620 / dP ** 2

        odl_c = 0
        for w in points_to_adjust:
            if C == w:
                A_row[2 * odl_c] = -(BL - BP)
                A_row[2 * odl_c + 1] = AL - AP
            odl_c += 1

        odl_l = 0
        for w in points_to_adjust:
            if Le == w:
                A_row[2 * odl_l] = BL
                A_row[2 * odl_l + 1] = -AL
            odl_l += 1

        odl_p = 0
        for w in points_to_adjust:
            if P == w:
                A_row[2 * odl_p] = -BP
                A_row[2 * odl_p + 1] = AP
            odl_p += 1

        A.append(A_row)
        L.append(L_row)

    # create the weighing matrix
    size = len(obs_distances) + len(obs_angles)

    P = np.zeros((size, size))
    for i in range(len(obs_distances) + len(obs_angles)):
        if i < len(obs_distances):
            P[i][i] = 1 / obs_distances[i][3] ** 2
        elif i < len(obs_angles) + len(obs_distances):
            P[i][i] = 1 / obs_angles[i - len(obs_distances)][4] ** 2
        i += 1
    A = np.array(A)  # convertion to numpy arrays
    L = np.array(L)
    
    ATP = A.T @ P
    X = ATP @ A  # AtPA
    X = np.linalg.inv(X)  # AtPA -1
    X = X @ (ATP @ L)
    X = X.tolist()

    i = 0
    for point in adjusted_points:  # adjust the coordinates
        adjusted_points[point][0] += X[i]
        adjusted_points[point][1] += X[i + 1]
        i += 2

    V = A @ X - L  # corrections vector

    def matrix_print(matrix, places):
        matrix = matrix.tolist()
        for w in range(len(matrix)):
            k = 0
            for k in range(len(matrix[w])):
                if k < len(matrix[w]) - 1:
                    f.write(str(round(matrix[w][k] * 1000000, places)) + " ")
                    k += 1
                elif k == len(matrix[w]) - 1:
                    f.write(str(round(matrix[w][k] * 1000000, places)) + "\n")
                    k += 1
            w += 1

    for item in points_to_adjust:
        all_points[item] = adjusted_points[item]

    if nr_iter == -1:
        f = open(report_path, 'a')
        vtpv = (V.T @ P) @ V
        f.write("__________________________________________________\n")
        f.write("Residual variance estimator [-]\n")
        m02 = vtpv / (len(obs_angles) + len(obs_distances) - 2 * len(points_to_adjust))
        f.write("m0^2 = " + str(round(m02, 4)) + '\n\n')

        f.write("__________________________________________________\n")
        f.write("Root mean square error [-]\n")
        m0 = m02 ** (1 / 2)
        f.write("m0 = " + str(round(m0, 4)) + '\n\n')

        f.write("__________________________________________________\n")
        f.write("Covariance matrix for the adjusted coordinates [m2] \n")
        COVH = m02 * np.linalg.inv(ATP @ A)
        matrix_print(COVH, 4)

        m0_H = []
        COVH = COVH.tolist()

        w = 0
        for row in COVH:
            k = 0
            for col in row:
                if w == k:
                    m0_H.append((COVH[w][k]) ** (1 / 2))
                k += 1
            w += 1

        COVdh = (A @ COVH) @ A.T  # covariance matrix for observations

        m0_dh = []
        w = 0
        for row in COVdh:
            k = 0
            for col in row:
                if w == k:
                    m0_dh.append((COVdh[w][k]) ** (1 / 2))
                k += 1
            w += 1

        f.write("__________________________________________________\n")
        i = 0
        if obs_distances:
            f.write("\nStandard deviations of distances:\nFrom    To    m_d_adj [mm]\n")
            for obs in obs_distances:
                f.write(
                    str(obs[0]) + "    " + str(obs[1]) + "    " + str("%.1f" % round(m0_dh[i] * 1000, 1)) + "\n")
                i += 1
        if obs_angles:
            f.write("\nStandard deviations of angles:\nC    L    P    m_k_wyr [cc]\n")
            for obs in obs_angles:
                f.write(str(obs[0]) + "    " + str(obs[1]) + "    " + str(obs[2]) + "    " + str(
                    "%.1f" % round(m0_dh[i], 1)) + "\n")
                i += 1

        f.write("__________________________________________________\n")
        f.write("Error of adjusted coordinates and summary horizontal point error [mm]\n")

        i = 0
        j = 0
        f.write('Nr    mX    mY    mP\n')
        errors = []
        for item in range(int(0.5 * len(m0_H))):
            blad = np.sqrt((m0_H[i] * 1000) ** 2 + (m0_H[i + 1] * 1000) ** 2)
            f.write(str(points_to_adjust[j]) + "    " + str("%.1f" % round(m0_H[i] * 1000, 1)) + "    " + str(
                "%.1f" % round(m0_H[i + 1] * 1000, 1)) + "    " + str(
                round(blad, 1)) + '\n')
            errors.append(blad)
            j += 1
            i += 2

        f.write("__________________________________________________\n")
        f.write("Average point position error [mm]\n")
        f.write(str(round(np.average(errors), 1)) + "\n")
     
        f.write("__________________________________________________\n")
        f.write("Adjusted coordinates (X,Y) [m] (the \"$\" symbol marks the control points)\n")
        for i in all_points:
            if i not in points_to_adjust:
                f.write(str(i) + "$" + "    " + str("%.3f" % round(all_points[i][0], 3)) + "    " + str(
                    "%.3f" % round(all_points[i][1], 3)) + "\n")
            else:
                f.write(str(i) + "    " + str("%.3f" % round(all_points[i][0], 3)) + "    " + str(
                    "%.3f" % round(all_points[i][1], 3)) + "\n")
        f.close()

    return adjusted_points

In [85]:
result = adjustment(1).copy()
i = 1
while True:
    finished = False
    array = []
    for item in result:
        row = [result[item][0], result[item][1]]
    array.append(row)
    array2 = []
    result = adjustment(i).copy()
    i += 1

    for item in result:
        row = [result[item][0], result[item][1]]
    array2.append(row)
    j = 0
    for item in range(len(array)):
        if abs(array[j][0] - array2[j][0]) < tolerance and abs(array[j][1] - array2[j][1]) < tolerance:
            finished = True
        j += 1
    if i == 100:
        print("Error: After 100 repetitions the differences between iterations are still too high")
        break

    if finished:
        f = open(report_path, 'a')
        f.write("__________________________________________________\n")
        f.write("    Iteration number " + str(i + 1) + "\n")
        f.write("    Tolerance parameter (maximal difference between coordinates from consecutive iterations)[m]: " + str(
            tolerance) + "\n")
        f.close()
        result = (adjustment(-1))
        
        f = open(report_path, 'a')
        now = datetime.datetime.now()
        f.write(now.strftime("\nDone: %d-%m-%Y %H:%M \n"))
        f.close()
        print("Calculations done. Please check the report at /" + report_path)
        break

Calculations done. Please check the report at /in_out/report.txt
