In [0]:
import numpy as np
from PIL import Image


In [0]:
def bytes_to_int(byte_data):
    """Interpret `byte_data` as an unsigned big-endian integer.

    Args:
        byte_data: A bytes-like object; an empty one yields 0.

    Returns:
        The integer encoded by the bytes, most significant byte first.
    """
    # `int.from_bytes` is the built-in decoder; the previous attribute name
    # did not exist on `int` and raised AttributeError on every call.
    return int.from_bytes(byte_data, 'big')


In [0]:
def read_images(filename, n_max_images=None):
    """Read images from a binary file with a 16-byte big-endian header.

    The header is read as four 4-byte fields: a magic number (ignored), the
    image count, the row count and the column count. Pixel data follows, one
    byte per pixel, row by row.

    Args:
        filename: Path of the file to open in binary mode.
        n_max_images: Optional cap on how many images to read. A falsy value
            (None or 0) means "read every image the header declares".

    Returns:
        A list of images; each image is a list of rows and each row is a list
        of length-1 `bytes` objects (one per pixel).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    images = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_images = bytes_to_int(f.read(4))
        if n_max_images:
            # Clamp to what the file actually holds; asking for more used to
            # read past EOF and append phantom images made of b'' pixels.
            n_images = min(n_images, n_max_images)
        n_rows = bytes_to_int(f.read(4))
        n_columns = bytes_to_int(f.read(4))
        for _image_idx in range(n_images):
            image = []
            for _row_idx in range(n_rows):
                # One read per row instead of one per pixel.
                row_bytes = f.read(n_columns)
                # Slice (not index) so each pixel stays a 1-byte `bytes`
                # object, which is what `dist` feeds to `bytes_to_int`.
                row = [row_bytes[i:i + 1] for i in range(n_columns)]
                image.append(row)
            images.append(image)
    return images


In [0]:
def read_labels(filename, n_max_labels=None):
    """Read labels from a binary file with an 8-byte big-endian header.

    The header is read as a 4-byte magic number (ignored) followed by a
    4-byte label count; each label is then one unsigned byte.

    Args:
        filename: Path of the file to open in binary mode.
        n_max_labels: Optional cap on how many labels to read. A falsy value
            (None or 0) means "read every label the header declares".

    Returns:
        A list of integer labels.

    Raises:
        OSError: If the file cannot be opened or read. Errors are allowed to
            propagate: the previous handler referenced an unbound name `e`,
            so it replaced every real error with a NameError.
    """
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_labels = bytes_to_int(f.read(4))
        if n_max_labels:
            # Never request more labels than the header declares.
            n_labels = min(n_labels, n_max_labels)
        # Iterating a bytes object yields ints, i.e. one label per byte.
        return list(f.read(n_labels))

In [0]:
def flatten_list(l):
    """Concatenate the items of every sub-iterable in `l` into one flat list.

    Only one level of nesting is removed; order is preserved.
    """
    flat = []
    for inner in l:
        flat.extend(inner)
    return flat

In [0]:
def extract_features(X):
    """Turn each sample in `X` into a flat feature vector.

    Every sample is passed through `flatten_list`; the result has one entry
    per sample, in the same order.
    """
    features = []
    for sample in X:
        features.append(flatten_list(sample))
    return features

In [0]:
def dist(x, y):
    """
    Euclidean distance between the vectors `x` and `y`.

    Each component is decoded with `bytes_to_int` before subtracting.
    Components are paired with `zip`, so any surplus entries in the longer
    vector are ignored.
    """
    squared_total = 0
    for x_i, y_i in zip(x, y):
        delta = bytes_to_int(x_i) - bytes_to_int(y_i)
        squared_total += delta ** 2
    return squared_total ** (0.5)

In [0]:
def get_training_distances_for_test_sample(X_train, test_sample):
    """Return `dist(train_sample, test_sample)` for every sample in `X_train`.

    The output list is aligned with `X_train`: entry i is the distance from
    training sample i to `test_sample`.
    """
    distances = []
    for train_sample in X_train:
        distances.append(dist(train_sample, test_sample))
    return distances

In [0]:
def get_most_frequent_element(l):
    """Return the element that occurs most often in `l`.

    When several elements share the highest count, the one that appears
    first in `l` wins. An empty `l` raises ValueError (from `max`).
    """
    occurrences = [l.count(item) for item in l]
    winner_position = occurrences.index(max(occurrences))
    return l[winner_position]

In [0]:
def knn(X_train, y_train, X_test, k=3):
    """Predict a label for every sample in `X_test` by k-nearest-neighbour vote.

    For each test sample, the distances to all training samples are computed,
    the `k` closest training indices are selected, and the most frequent of
    their `y_train` labels becomes the prediction. The index of each test
    sample is printed as a progress indicator, followed by a final newline.

    Returns:
        A list of predicted labels, one per test sample, in input order.
    """
    predictions = []
    for position, query in enumerate(X_test):
        print(position, end=' ', flush=True)
        distances = get_training_distances_for_test_sample(X_train, query)
        # `sorted` is stable, so equal distances keep ascending index order.
        ranked_indices = sorted(
            range(len(distances)),
            key=lambda train_idx: distances[train_idx],
        )
        neighbour_labels = []
        for train_idx in ranked_indices[:k]:
            neighbour_labels.append(y_train[train_idx])
        predictions.append(get_most_frequent_element(neighbour_labels))
    print()
    return predictions

In [0]:
def write_image(image, path):
    """Save `image` to `path` as an 8-bit greyscale picture.

    `image` is converted to a NumPy array, wrapped in a PIL image using
    mode 'L', and written with `Image.save`.
    """
    pixel_array = np.array(image)
    picture = Image.fromarray(pixel_array, 'L')
    picture.save(path)

In [0]:
def train_model(img_url):
    """Run the k-NN classifier on the configured test file and export the result.

    File locations are read from the module-level names TRAIN_DATA_FILENAME,
    TRAIN_LABELS_FILENAME, TEST_DATA_FILENAME, TEST_LABELS_FILENAME and
    TEST_DIR, and the Spark session from the global `spark`; all of them must
    exist before this function is called.

    Steps: load up to 1000 training and 10 test samples, dump each test image
    as a PNG under TEST_DIR, flatten the images into feature vectors, predict
    with k = 7, and write one CSV row per prediction.

    Args:
        img_url: Kept for interface compatibility; the body does not read it
            (the test file comes from TEST_DATA_FILENAME).

    Returns:
        The list of predicted labels, one per test sample.
    """
    n_train = 1000
    n_test = 10
    k = 7
    X_train = read_images(TRAIN_DATA_FILENAME, n_train)
    y_train = read_labels(TRAIN_LABELS_FILENAME, n_train)
    X_test = read_images(TEST_DATA_FILENAME, n_test)
    # The labels file, not this function object, is what must be opened here.
    y_test = read_labels(TEST_LABELS_FILENAME, n_test)
    for idx, test_sample in enumerate(X_test):
        write_image(test_sample, f'{TEST_DIR}{idx}.png')
    X_train = extract_features(X_train)
    X_test = extract_features(X_test)
    y_pred = knn(X_train, y_train, X_test, k)

    # Share of predictions that match the reference labels; guarded so an
    # empty label list cannot cause a ZeroDivisionError.
    n_scored = len(y_test)
    n_correct = sum(
        int(y_pred_i == y_test_i)
        for y_pred_i, y_test_i in zip(y_pred, y_test)
    )
    accuracy = n_correct / n_scored if n_scored else 0.0

    result_text = y_pred

    # `y_pred` is a plain list of labels, so the former page/line/word
    # traversal could never run. Emit one row per prediction instead and keep
    # the same three column names for whatever consumes the CSV.
    # NOTE(review): `knn` yields no per-sample score, so `confidence` carries
    # the batch accuracy as a stand-in -- confirm this is the wanted meaning.
    data = [
        {
            'content': str(label),
            'confidence': float(accuracy),
            'line_number': position + 1,
        }
        for position, label in enumerate(y_pred)
    ]
    if data:
        # Spark cannot infer a schema from an empty list, so skip the write
        # when there is nothing to export.
        df = spark.createDataFrame(data)
        df.write.mode('overwrite').csv("abfss://wmu-fm@wmu.dfs.core.windows.net/output")

    return result_text
    


In [0]:
async def get_text_form_img(imgUrl):
    """Configure the dataset paths for `imgUrl` and run `train_model` on it.

    `train_model` looks its file locations up as module-level names, so the
    paths are published with `global` here; plain local assignments would be
    invisible to it. `imgUrl` is used as the test-image file.

    Args:
        imgUrl: Location of the image data file to classify.

    Returns:
        Whatever `train_model(imgUrl)` returns.
    """
    import inspect  # local import: only this coroutine needs it

    global TEST_DIR, TEST_DATA_FILENAME, TEST_LABELS_FILENAME
    global TRAIN_DATA_FILENAME, TRAIN_LABELS_FILENAME

    TEST_DIR = "abfss://wmu-fm@wmu.dfs.core.windows.net/ocr_train/"
    DATASET = 'mnist'
    TEST_DATA_FILENAME = imgUrl
    TEST_LABELS_FILENAME = TEST_DIR + DATASET + '/t10k-labels-idx1-ubyte'
    TRAIN_DATA_FILENAME = TEST_DIR + DATASET + '/train-images-idx3-ubyte'
    TRAIN_LABELS_FILENAME = TEST_DIR + DATASET + '/train-labels-idx1-ubyte'

    # No await happens between setting the globals and calling train_model,
    # so another task on the same event loop cannot change them in between.
    result = train_model(imgUrl)
    # train_model is a regular function; awaiting its plain return value
    # raises TypeError. Await only if it actually hands back an awaitable.
    if inspect.isawaitable(result):
        result = await result
    return result