In [None]:
%%capture
# Install the third-party packages this notebook imports; %%capture hides pip's output.
# Azure Computer Vision SDK (provides the ComputerVisionClient imported below).
!pip install --upgrade azure-cognitiveservices-vision-computervision
# Pillow provides the PIL package imported below.
!pip install pillow
# Provides the SentenceTransformer class imported later in the notebook.
!pip install sentence_transformers


In [None]:
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
from azure.cognitiveservices.vision.computervision.models import VisualFeatureTypes
from msrest.authentication import CognitiveServicesCredentials

from array import array
import os
from PIL import Image
import sys
import time

# Authenticate
# Credentials and endpoint used to create the Computer Vision client.
#
# The values are read from the environment so the real key never has to be
# saved inside the notebook. The original placeholder key and endpoint remain
# as fallbacks, so the cell still runs unchanged when the variables are unset.
# TODO: export VISION_KEY (and optionally VISION_ENDPOINT) before running.
subscription_key = os.environ.get("VISION_KEY", "FIND_IT_FROM_KEEPER")
endpoint = os.environ.get(
    "VISION_ENDPOINT",
    "https://ocr-read-image-resource.cognitiveservices.azure.com/",
)

In [None]:
import re

In [None]:
# Create the Computer Vision client from `endpoint` and `subscription_key`;
# extract_text() uses this module-level client for every OCR request.
computervision_client = ComputerVisionClient(endpoint, CognitiveServicesCredentials(subscription_key))

In [None]:
# Note: retry (re-run the call) if a name resolution failure occurs.

def extract_text(url, poll_interval=1, timeout=60):
    """OCR a remote image with the Read API and return its text lines.

    Submits ``url`` to the module-level ``computervision_client``, polls the
    resulting read operation until it leaves the 'notStarted'/'running'
    states, and collects the text of every recognised line.

    Args:
        url: Publicly reachable URL of the image to read.
        poll_interval: Seconds to sleep between two status polls.
        timeout: Maximum number of seconds to keep polling. ``None`` disables
            the limit (the original, unbounded behaviour).

    Returns:
        list[str]: Recognised lines in the order the service returned them
        (page by page). Empty when the operation finished in any status other
        than ``OperationStatusCodes.succeeded``.

    Raises:
        TimeoutError: If the operation is still pending after ``timeout``
            seconds.
    """
    print("===== Read File - remote =====")

    # raw=True exposes the response headers, which carry the operation location.
    read_response = computervision_client.read(url, raw=True)

    # The operation location is a URL whose last path segment is the operation id.
    operation_id = read_response.headers["Operation-Location"].split("/")[-1]

    # Poll the "GET" API until the service reports a final status, but never
    # longer than `timeout` so a stuck operation cannot hang the notebook.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        read_result = computervision_client.get_read_result(operation_id)
        if read_result.status not in ['notStarted', 'running']:
            break
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                "Read operation %s did not finish within %s seconds" % (operation_id, timeout)
            )
        time.sleep(poll_interval)

    if read_result.status != OperationStatusCodes.succeeded:
        return []

    # Flatten pages -> lines, keeping only the text of each line.
    return [
        line.text
        for page in read_result.analyze_result.read_results
        for line in page.lines
    ]

In [None]:
class ScreenshotType:
    """Guess which app produced a screenshot from the text found in it.

    Calling an instance checks ``text_str`` against the signatures below, in
    order, and returns the label of the first one that applies. Presence is
    tested with the ``in`` operator, so when ``text_str`` is a list of OCR
    lines a marker has to equal a whole line.
    """

    # (label, alternatives). A label applies as soon as every marker of at
    # least one alternative is present in the text.
    _SIGNATURES = (
        ("Google Fit", (("Heart Pts Steps",),)),
        ("Fitbit", (("fitbit",), ("fitbit premium",))),
        ("Apple Health", (("Move", "Exercise", "Stand"),)),
        ("Germin", (("TIME", "PACE", "ELEV. GAIN"),)),
        ("Sleep App", (("In bed",),)),
        ("Calm App", (("breathe in",), ("breathe out",))),
        ("Peloton App", (("CADENCE", "RESISTANCE"),)),
        ("Headspace App", (("Average Duration",),)),
    )

    def __init__(self):
        pass

    def __call__(self, text_str):
        """Return the matching app label, or the 'NOT matched' message."""
        for label, alternatives in self._SIGNATURES:
            if any(all(marker in text_str for marker in group) for group in alternatives):
                return label
        return "Screenshot Type NOT matched!"
            

In [None]:
class IsNumber:
    """Callable that tells whether a value looks like a number or a duration.

    A value counts as a number when its string form starts with an
    hours/minutes duration ("1h30m" or "1h 30m"), or when it parses as an int
    or a float once thousands separators (commas) are removed.
    """

    def __init__(self):
        # Last comma-stripped candidate examined by __call__ (None until then).
        self.s = None
        self.google_fit_time_format1_re = re.compile(r"\d+h\d+m")
        self.google_fit_time_format2_re = re.compile(r"\d+h \d+m")

    def isInt(self, s):
        """Return True when ``int(s)`` succeeds, False on ValueError."""
        try:
            int(s)
        except ValueError:
            return False
        return True

    def isFloat(self, s):
        """Return True when ``float(s)`` succeeds, False on ValueError."""
        try:
            float(s)
        except ValueError:
            return False
        return True

    def __call__(self, s):
        """Return True if ``s`` is a duration or a (comma-separated) number."""
        candidate = str(s)
        for duration_re in (self.google_fit_time_format1_re, self.google_fit_time_format2_re):
            if duration_re.match(candidate):
                return True
        # Only reached for non-durations: remember the comma-free form.
        self.s = candidate.replace(',', '')
        return self.isInt(self.s) or self.isFloat(self.s)


In [None]:
# Sanity checks applied to the dictionaries produced by the extractors
class Validation:
    """Validates extracted screenshot information."""

    def __init__(self):
        # Raw string: '\d' in a normal string literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        self.at_least_one_digit_re = re.compile(r'\d')

    def at_least_one_digit_test_case(self, info):
        """Check that every value in ``info`` starts with a digit.

        Args:
            info: Mapping of field name to extracted value. Values are
                converted with ``str()`` before matching, so non-string
                values no longer raise ``TypeError``.

        Returns:
            bool: True when all values begin with a digit (vacuously True for
            an empty mapping), False otherwise.
        """
        # re.match anchors at the start: the digit has to be the first character.
        return all(
            self.at_least_one_digit_re.match(str(value)) is not None
            for value in info.values()
        )

In [None]:
class ExtractInformationFitBit:
    """Extract activity figures from the OCR lines of a Fitbit screenshot.

    Layout expected by the parser: the line before the 'steps' marker is the
    step count; the marker is followed by N numeric lines and then by the N
    title lines that name those numbers.
    """

    _STEP_MARKERS = ('steps', 'Steps')
    _PARSING_ERROR = "Screenshot was not parsed correctly. Upload a matching screenshot!"

    def __init__(self):
        self.is_number = IsNumber()
        self.n_steps_taken = 0
        self.validator = Validation()

    def get_digits_start_and_end_idx(self, text_str):
        """Locate the run of numeric lines that follows the steps marker.

        Also stores the line preceding the marker in ``self.n_steps_taken``.

        Args:
            text_str: List of OCR lines.

        Returns:
            tuple[int, int]: Inclusive (start, end) indices of the numeric
            run. ``end == start - 1`` when no numeric line follows the marker.
        """
        digits_start_idx = 0
        digits_started = False
        n_digits = 0

        for i, elm in enumerate(text_str):
            if elm in self._STEP_MARKERS:
                digits_start_idx = i + 1
                digits_started = True
                # A repeated marker restarts the run, so the count must restart too.
                n_digits = 0
                # With the marker on the very first line there is no preceding
                # value; text_str[-1] would silently wrap to the last line.
                self.n_steps_taken = text_str[i - 1] if i > 0 else ''
                continue

            if digits_started:
                if self.is_number(elm):
                    n_digits += 1
                else:
                    break

        digits_end_idx = digits_start_idx + (n_digits - 1)
        return (digits_start_idx, digits_end_idx)

    def __call__(self, text_str):
        """Return ``{title: number}`` for the screenshot, or a parsing-error dict.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: Lower-cased titles (always including 'steps') mapped to the
            cleaned numeric strings, or ``{"Parsing Error": ...}`` when the
            marker is missing or a value does not start with a digit.
        """
        # Without a marker the helper would leave the int default (or a stale
        # value from a previous call) in n_steps_taken; bail out explicitly.
        if not any(elm in self._STEP_MARKERS for elm in text_str):
            return {"Parsing Error": self._PARSING_ERROR}

        digits_st, digits_end = self.get_digits_start_and_end_idx(text_str)
        n_values = digits_end - digits_st + 1
        titles_st_idx = digits_end + 1

        digits = text_str[digits_st:digits_end + 1]
        titles = text_str[titles_st_idx:titles_st_idx + n_values]

        # prepend steps
        titles = ['steps'] + titles
        digits = [self.n_steps_taken] + digits

        # clean digits: drop thousands separators, then everything except
        # digits and the decimal point
        digits = [re.sub('[^0-9.]', '', elm.replace(',', '')) for elm in digits]

        # lower case titles
        titles = [elm.lower() for elm in titles]

        extracted_info = dict(zip(titles, digits))
        if self.validator.at_least_one_digit_test_case(extracted_info):
            return extracted_info
        return {"Parsing Error": self._PARSING_ERROR}

In [None]:
class ExtractInformationGoogleFit:
    """Extract activity figures from the OCR lines of a Google Fit screenshot.

    Layout expected by the parser: the two lines before the
    'Heart Pts Steps' marker are the heart points and the step count; the
    marker is followed by N numeric lines and then by the N title lines that
    name those numbers.
    """

    _MARKERS = ('Heart Pts Steps', 'heart pts steps')
    _PARSING_ERROR = "Screenshot was not parsed correctly. Upload a matching screenshot!"

    def __init__(self):
        self.is_number = IsNumber()
        self.n_steps_taken = 0
        self.heart_pts = 0
        self.google_fit_time_format1_re = re.compile(r"\d+h\d+m")
        self.google_fit_time_format2_re = re.compile(r"\d+h \d+m")
        self.validator = Validation()

    def get_digits_start_and_end_idx(self, text_str):
        """Locate the run of numeric lines that follows the marker line.

        Also stores the two lines preceding the marker in ``self.heart_pts``
        and ``self.n_steps_taken``.

        Args:
            text_str: List of OCR lines.

        Returns:
            tuple[int, int]: Inclusive (start, end) indices of the numeric
            run. ``end == start - 1`` when no numeric line follows the marker.
        """
        digits_start_idx = 0
        digits_started = False
        n_digits = 0

        for i, elm in enumerate(text_str):
            if elm in self._MARKERS:
                digits_start_idx = i + 1
                digits_started = True
                # A repeated marker restarts the run, so the count must restart too.
                n_digits = 0
                # Guard the look-behind: negative indices would silently wrap
                # around to the end of the list.
                self.n_steps_taken = text_str[i - 1] if i >= 1 else ''
                self.heart_pts = text_str[i - 2] if i >= 2 else ''
                continue

            if digits_started:
                if self.is_number(elm):
                    n_digits += 1
                else:
                    break

        digits_end_idx = digits_start_idx + (n_digits - 1)
        return (digits_start_idx, digits_end_idx)

    def _clean_value(self, raw):
        """Strip commas; keep durations verbatim, reduce others to digits and '.'."""
        value = raw.replace(',', '')
        if self.google_fit_time_format1_re.match(value) or self.google_fit_time_format2_re.match(value):
            return value
        return re.sub('[^0-9.]', '', value)

    def __call__(self, text_str):
        """Return ``{title: value}`` for the screenshot, or a parsing-error dict.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: 'heart_pts', 'steps' and the lower-cased titles mapped to
            the cleaned values, or ``{"Parsing Error": ...}`` when the marker
            is missing or a value does not start with a digit.
        """
        # Without a marker the helper would leave the int defaults (or stale
        # values from a previous call) in place; bail out explicitly.
        if not any(elm in self._MARKERS for elm in text_str):
            return {"Parsing Error": self._PARSING_ERROR}

        digits_st, digits_end = self.get_digits_start_and_end_idx(text_str)
        n_values = digits_end - digits_st + 1
        titles_st_idx = digits_end + 1

        digits = text_str[digits_st:digits_end + 1]
        titles = text_str[titles_st_idx:titles_st_idx + n_values]

        # prepend heart_pts and steps
        titles = ['heart_pts', 'steps'] + titles
        digits = [self.heart_pts, self.n_steps_taken] + digits

        digits = [self._clean_value(elm) for elm in digits]

        # lower case titles
        titles = [elm.lower() for elm in titles]

        extracted_info = dict(zip(titles, digits))
        if self.validator.at_least_one_digit_test_case(extracted_info):
            return extracted_info
        return {"Parsing Error": self._PARSING_ERROR}

In [None]:
class ExtractInformationAppleHealth:
    """Extract activity figures from the OCR lines of an Apple Health screenshot.

    Layout expected by the parser: an 'Activity' header line followed by a
    line containing 'AM' or 'PM', then three title lines, then three value
    lines. A 'steps' line, when present, is preceded by the step count.
    """

    _PARSING_ERROR = "Screenshot was not parsed correctly. Upload a matching screenshot!"

    def __init__(self):
        self.is_number = IsNumber()
        self.n_steps_taken = 0
        self.heart_pts = 0
        self.google_fit_time_format1_re = re.compile(r"\d+h\d+m")
        self.google_fit_time_format2_re = re.compile(r"\d+h \d+m")
        self.validator = Validation()

    def __call__(self, text_str):
        """Return ``{title: value}`` for the screenshot, or a parsing-error dict.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: Lower-cased titles (plus 'steps' when found) mapped to the
            comma-stripped values, or ``{"Parsing Error": ...}`` when the
            header is missing or a value does not start with a digit.
        """
        info_titles_start_idx = None
        # Stop one short of the end: each iteration also inspects line i + 1.
        for i in range(len(text_str) - 1):
            clean_elm = re.sub(r'\W+', '', text_str[i])
            next_elm = text_str[i + 1]
            if clean_elm == 'Activity' and ('AM' in next_elm or 'PM' in next_elm):
                info_titles_start_idx = i + 2
                break

        # No header means nothing below can be located reliably.
        if info_titles_start_idx is None:
            return {"Parsing Error": self._PARSING_ERROR}

        are_steps_found = False
        n_steps_taken = None
        for i, elm in enumerate(text_str):
            if elm == 'steps':
                # A 'steps' line at index 0 has no preceding count; do not
                # wrap around to the last line.
                if i > 0:
                    n_steps_taken = text_str[i - 1]
                    are_steps_found = True
                break

        titles = text_str[info_titles_start_idx:info_titles_start_idx + 3]
        digits_start_idx = info_titles_start_idx + 3
        digits = text_str[digits_start_idx:digits_start_idx + 3]

        if are_steps_found:
            titles = ['steps'] + titles
            digits = [str(n_steps_taken)] + digits

        # Clean
        digits = [elm.replace(',', '') for elm in digits]
        titles = [elm.lower() for elm in titles]

        extracted_info = dict(zip(titles, digits))
        if self.validator.at_least_one_digit_test_case(extracted_info):
            return extracted_info
        return {"Parsing Error": self._PARSING_ERROR}


In [None]:
class ExtractInformationGermin:
    """Extract run figures from the OCR lines of a screenshot typed 'Germin'.

    Values are read at fixed offsets from the 'TIME' line (+1 time, +3 pace,
    +6 elevation gain, +8 calories) and from the line after 'Steps'.
    """

    _PARSING_ERROR = "Screenshot was not parsed correctly. Upload a matching screenshot!"

    def __init__(self):
        self.validator = Validation()

    def __call__(self, text_str):
        """Return the extracted figures, or a parsing-error dict.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: Keys 'time', 'pace', 'elev. gain', 'calories' and 'steps',
            or ``{"Parsing Error": ...}`` when an anchor line is missing, an
            offset falls outside the list, or a value does not start with a
            digit.
        """
        try:
            time_index = text_str.index('TIME')
            info = {
                'time': text_str[time_index + 1],
                'pace': text_str[time_index + 3],
                'elev. gain': text_str[time_index + 6],
                'calories': text_str[time_index + 8],
            }
            steps_indx = text_str.index('Steps')
            info['steps'] = text_str[steps_indx + 1].replace(',', '')
        except (ValueError, IndexError):
            # ValueError: anchor line absent; IndexError: OCR output too short.
            return {"Parsing Error": self._PARSING_ERROR}

        if self.validator.at_least_one_digit_test_case(info):
            return info
        return {"Parsing Error": self._PARSING_ERROR}


In [None]:
class ExtractInformationSleepApp:
    """Extract time in bed and sleep quality from a sleep-app screenshot."""

    # Fallback patterns, used only when the notebook does not define
    # module-level `sleep_duration_re` / `quality_re` (none are defined in the
    # cells visible here, which made this class raise NameError).
    # The duration pattern covers the two hours/minutes formats used elsewhere
    # in this notebook ("7h30m" and "7h 30m").
    # NOTE(review): the quality pattern assumes a percentage such as "85%" -
    # confirm against real screenshots.
    _DEFAULT_SLEEP_DURATION_RE = re.compile(r"\d+h ?\d+m")
    _DEFAULT_QUALITY_RE = re.compile(r"\d+%")

    _PARSING_ERROR = "Screenshot was not parsed correctly. Upload a matching screenshot!"

    def __init__(self):
        self.validator = Validation()

    def __call__(self, text_str):
        """Return the first duration and first quality found in ``text_str``.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: Up to two keys, 'In Bed Time' and 'Quality', each holding
            the first matching line; or ``{"Parsing Error": ...}`` when a
            value fails validation.
        """
        # Module-level patterns win when present, so a notebook that defines
        # them keeps its own behaviour.
        duration_re = globals().get('sleep_duration_re', self._DEFAULT_SLEEP_DURATION_RE)
        grade_re = globals().get('quality_re', self._DEFAULT_QUALITY_RE)

        info = {}
        for elm in text_str:
            if 'In Bed Time' not in info and duration_re.match(elm):
                info['In Bed Time'] = elm
            elif 'Quality' not in info and grade_re.match(elm):
                info['Quality'] = elm

        if self.validator.at_least_one_digit_test_case(info):
            return info
        return {"Parsing Error": self._PARSING_ERROR}


In [None]:
class ExtractInformationCalmApp:
    """Extract meditation details from the OCR lines of a Calm screenshot."""

    def __init__(self):
        pass

    def __call__(self, text_str):
        """Return the meditation duration and type found in ``text_str``.

        Lines matching the module-level ``time_re`` become
        'meditation duration'; a 'breathe in' / 'breathe out' line becomes
        'meditation type'. When several lines qualify, the last one wins.
        """
        info = {}
        for elm in text_str:
            if time_re.match(elm):
                info['meditation duration'] = elm
                continue
            if elm in ('breathe in', 'breathe out'):
                info['meditation type'] = elm
        return info

In [None]:
class ExtractInformationPelotonApp:
    """Extract ride figures from the OCR lines of a Peloton screenshot.

    The parser expects the last five lines to be the 'CALORIES (kcal)' label
    followed by four numeric values.
    """

    # Compiled once; raw string avoids invalid-escape warnings for \d and \.
    _NUMBER_RE = re.compile(r"(\d+\.*\d*)")
    # Output keys, in the order of the four trailing values.
    _FIELDS = ('speed (mi/h)', 'distance (mi)', 'total output (kj)', 'calories (kcal)')

    def __init__(self):
        pass

    def __call__(self, text_str):
        """Return the four trailing figures, or ``{}`` when the layout differs.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: The keys in ``_FIELDS`` mapped to the last four lines when
            all of them start with a number and the fifth-from-last line is
            'CALORIES (kcal)'; otherwise an empty dict.
        """
        info = {}
        # Fewer than five lines cannot hold the label plus four values
        # (indexing text_str[-5] used to raise IndexError here).
        if len(text_str) < 5:
            return info

        values = text_str[-4:]
        all_numeric = all(self._NUMBER_RE.match(value) for value in values)
        if all_numeric and text_str[-5] == "CALORIES (kcal)":
            info.update(zip(self._FIELDS, values))
        return info

In [None]:
class ExtractInformationHeadspaceApp:
    """Extract meditation statistics from the OCR lines of a Headspace screenshot."""

    # (anchor line, offset from the anchor to the value line, output key)
    _FIELDS = (
        ('Average Duration', -1, 'average duration (mins)'),
        ('Total number', 1, 'total number of sessions'),
        ('meditating', -1, 'total time meditating (hrs)'),
    )

    def __init__(self):
        pass

    def __call__(self, text_str):
        """Return the statistics that could be located in ``text_str``.

        Args:
            text_str: List of OCR lines.

        Returns:
            dict: One entry per anchor line that is present and has a
            neighbouring value line. Missing anchors are skipped instead of
            raising ``ValueError``, so the result may be partial or empty.
        """
        info = {}
        for anchor, offset, key in self._FIELDS:
            if anchor not in text_str:
                continue
            value_idx = text_str.index(anchor) + offset
            # Bounds check: an anchor on the first/last line has no neighbour,
            # and a negative index would wrap around to the end of the list.
            if 0 <= value_idx < len(text_str):
                info[key] = text_str[value_idx]
        return info

In [None]:
# Matches text that starts with a clock-style value such as "10:30".
# Raw string: '\d' in a normal string literal is an invalid escape sequence.
time_re = re.compile(r'\d+:\d+')

In [None]:
# End-to-end demo: OCR one screenshot, detect the app, run the matching extractor.
get_screenshot_Type = ScreenshotType()
# invoke the OCR
image_url = "https://miro.medium.com/max/750/1*4WUSd34gHRAjhp20u9qpZA.png"
text_str = extract_text(image_url)
_type = get_screenshot_Type(text_str)
print('Detected App', _type)

# Label returned by ScreenshotType -> extractor class handling that app.
extractor_classes = {
    "Apple Health": ExtractInformationAppleHealth,
    "Fitbit": ExtractInformationFitBit,
    "Google Fit": ExtractInformationGoogleFit,
    "Germin": ExtractInformationGermin,
    "Sleep App": ExtractInformationSleepApp,
    "Calm App": ExtractInformationCalmApp,
    "Peloton App": ExtractInformationPelotonApp,
    "Headspace App": ExtractInformationHeadspaceApp,
}

extractor_class = extractor_classes.get(_type)
if extractor_class is None:
    print('no extractor found!')
    info_extractor = None
else:
    info_extractor = extractor_class()

# Displayed as the cell output. Guarded because calling None raised TypeError
# whenever the screenshot type was not recognised.
info_extractor(text_str) if info_extractor is not None else None

===== Read File - remote =====
Detected App Headspace App


{'average duration (mins)': '17',
 'total number of sessions': '153',
 'total time meditating (hrs)': '43'}

## Construct Data


In [None]:
from sentence_transformers import SentenceTransformer
# Load the sentence-embedding model by name; extract_features() calls model.encode().
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# The sentences to encode are built from the OCR text of each screenshot (see extract_features).


In [None]:
# Example screenshots labelled as "movement" when the dataset is built.
movement_urls = [
    'https://pbs.twimg.com/media/DuHMDcPW0AEm6dy.jpg',
    'https://pageflows.com/media/videos/thumbnail_75684d6b-b930-478a-88dc-5ff9fd36cdc6.jpg',
    'https://play-lh.googleusercontent.com/n0OJCLgqxNGh1CkTRc884n979zGVuJ3c-jWH_KaNB_YxhwzKl3xqub5h-zxhTb-U4A',
    'http://www.davidleeking.com/wp-content/uploads/2017/08/IMG_0955-169x300.png',
    'https://storage.googleapis.com/gweb-uniblog-publish-prod/images/Fit_update.max-1000x1000.png',
    'https://wearos.google.com/static/images/phones/pixel5-fit_1x.png',
    'https://i.insider.com/5d236b85a17d6c44c3476e73?width=750&format=jpeg&auto=webp',
    'https://connect.garmin.com/static/screen-phone-myday-717fec02a4d8eea2690241774363b357.png',
]

# Example screenshots labelled as "mindfulness" when the dataset is built.
mindfulness_urls = [
    'https://thesweetsetup.com/wp-content/uploads/2019/11/sleepcycle.jpg',
    'https://i1.wp.com/articles.mytennights.com/wp-content/uploads/2020/05/Sleep-Cycle-Image.jpg?fit=960%2C635&ssl=1',
    'https://pbs.twimg.com/media/C3dM8nmWIAIery5.jpg',
    'https://www.gannett-cdn.com/presto/2019/11/14/PUPP/97bace99-07b8-44eb-a5fe-00e4fea80362-1119_UP_Calm_app1_copy.png?width=640',
    'https://miro.medium.com/max/750/1*4WUSd34gHRAjhp20u9qpZA.png',
]

In [None]:
def extract_features(url):
    """Turn the text of a remote screenshot into an embedding vector.

    OCRs the image with ``extract_text``, keeps only the lines made purely of
    letters/underscores (no digits, whitespace or punctuation), joins them
    with commas and encodes the result with the module-level ``model``.

    Returns:
        The ``tolist()`` form of the embedding returned by ``model.encode``.
    """
    # feature dim: 384
    letters_only = re.compile(r'[^\W\d]*$')
    ocr_lines = extract_text(url)
    kept_lines = filter(letters_only.match, ocr_lines)
    joined_text = ','.join(kept_lines)
    return model.encode(joined_text).tolist()

In [None]:
# Build the feature matrix and label vector from the example screenshots.
labels_map = {'movement': 0, 'mindfulness': 1}
# Insertion order matters: movement samples come first, mindfulness samples last.
urls_by_category = {'movement': movement_urls, 'mindfulness': mindfulness_urls}

Features = []
Labels = []
for category, category_urls in urls_by_category.items():
    for image_url in category_urls:
        # invoke the OCR and embed the recognised text
        sentence_embeddings = extract_features(image_url)
        Features.append(sentence_embeddings)
        Labels.append(labels_map[category])

===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====
===== Read File - remote =====


In [None]:
# Every feature vector must have exactly one label.
assert len(Features) == len(Labels)

In [None]:
# The embedding length must equal the feature dimension noted in extract_features (384).
assert len(Features[0]) == 384

In [None]:
# Hold out the first and the last sample for validation; every sample in
# between is used for training.
TrainingData = {'Features': Features[1:-1], 'Labels': Labels[1:-1]}
ValidationData = {'Features': [Features[0], Features[-1]], 'Labels': [Labels[0], Labels[-1]]}

In [None]:
import json

In [None]:
# Bundle both splits into the single object that is serialized to JSON.
data = {
    "TrainingData": TrainingData,
    "ValidationData": ValidationData
}

In [None]:
# Write the dataset to data.json in the current working directory (overwrites an existing file).
with open('data.json', 'w') as f:
    json.dump(data, f)