In [9]:
#!pip install pql
import pql

In [10]:
from optimade.filter import Parser
from lark import Transformer
p = Parser(version=(0, 9, 6))

In [16]:
import pql
from optimade.filter import Parser
from lark import Transformer


optiMadeToPQLOperatorSwitch = {
    "=":"==",
    "<=":"<=",
    ">=":">=",
    "=>":">=",
    "!=": "!=",
    "<":"<",
    ">":">",
}

def OptiMadeToPQLOperatorValidator(x):
    return optiMadeToPQLOperatorSwitch[x]

def combineMultiple(string, index):
    firstQuoteIndex = 0
    for firstQuoteIndex in reversed(range(0, index)):
        if(ord(string[firstQuoteIndex]) == 34):
            firstQuoteIndex = firstQuoteIndex + 1
            break
    lastQuoteIndex = index
    for lastQuoteIndex in range(index, len(string)):
        if(ord(string[lastQuoteIndex])== 34):
            break
    insertion = string[firstQuoteIndex:lastQuoteIndex]
    insertion = insertion.split(",")
    return string[:firstQuoteIndex - 1] + 'all({})'.format(insertion) + string[lastQuoteIndex + 1:], lastQuoteIndex + 1

def parseInput(PQL):
    length = len(PQL)
    i = 0
    while(i < length):
        if(PQL[i] == ","):
            PQL, newIndex = combineMultiple(PQL, i)
            i = newIndex 
        i = i + 1 
    return PQL

def cleanMongo(rawMongoDbQuery):
    if(type(rawMongoDbQuery) != dict):
        return
    for k in rawMongoDbQuery:
        value = rawMongoDbQuery[k]
        if(type(value) == list):
            for v in value:
                cleanMongo(v)
        elif(type(value) == dict):
            cleanMongo(value)
        elif(type(value) == str):
            try:
                value = float(value)
                rawMongoDbQuery[k] = float(value)
            except:
                f = value
            
        else:
            print("value", value)


class OptimadeToPQLTransformer(Transformer):
    def comparison(self, args):
        A = str(args[0])
        B = ""
        for b in args[2:]:
            if B == "":
                B = b
            else:
                B = B + ", " + b 
        operator = OptiMadeToPQLOperatorValidator(args[1])
        return A + operator + '"' + B + '"'
    def atom(self, args):
        return args[0]
    def term(self, args):
        result = ""
        for arg in args:
            if arg.lower() == "and" or arg.lower() == "or":
                arg = arg.lower()
            result = result + " " + arg.lower()
        return "(" + result.lstrip() + ")"
    
    def expression(self, args):
        result = ""
        for arg in args:
            lower = arg.lower()
            if(lower == "and" or lower == "or"):
                arg = lower
            result = result + " " + arg
        return result.lstrip()
    def start(self, args):
        return args[1]
    def combined(self, args):
        return args[0]
    

            
def optimadeToMongoDBConverter(optimadeQuery):
    p = Parser(version=(0, 9, 6))
    
    tree = p.parse(optimadeQuery)
    
    rawPQL = OptimadeToPQLTransformer().transform(tree)
    cleanedPQL = parseInput(rawPQL)

    mongoDbQuery = {}
    try:
        mongoDbQuery = pql.find(cleanedPQL)
    except:
        return "ERROR: Unable to PQL to MongoDB execute conversion"
    
    cleanMongo(mongoDbQuery)
    return mongoDbQuery
    

In [45]:
import unittest
class ConverterTest(unittest.TestCase):
    def setup(self):
        pass
    def testAll(self):
        self.test_one_input()
        self.test_two_inputs_with_and()
        self.test_two_inputs_with_or()
        self.test_valid_numbers_with_positive_sign()
        self.test_multiple_entries()
        self.test_mixing_upper_case_and_lower_case()
        self.test_float()
        self.test_scientific_number()
        self.test_negative_number()
    def custom_test(self, testCase, answer):
        message = "custom_test failed with testCase " + testCase
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: custom_test passed")
        
    def test_one_input(self):
        testCase = "filter=a<0" 
        answer = {'a': {'$lt': 0}}
        message = "test_one_input failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_one_input passed")
    def test_two_inputs_with_and(self):
        testCase = "filter=a<0 and b>2" 
        answer = {'$and': [{'a': {'$lt': 0}}, {'b': {'$gt': 2}}]}
        message = "test_two_inputs_with_and failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_two_inputs_with_and")
    def test_two_inputs_with_or(self):
        testCase = "filter=a<0 or b>2" 
        answer = {'$or': [{'a': {'$lt': 0}}, {'b': {'$gt': 2}}]}
        message = "test_two_inputs_with_or failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_two_inputs_with_or")
    def test_valid_numbers_with_positive_sign(self):
        testCase = "filter= a=+1" 
        answer = {'a': +1}
        message = "test_valid_numbers_positive failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_valid_numbers_positive")
    def test_multiple_entries(self):
        testCase = "filter = elements='Si,O'" 
        answer = {'elements': {'$all': ['si', ' o']}}
        message = "test_multiple_entries failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_multiple_entries")
    def test_mixing_upper_case_and_lower_case(self):
        testCase = "filter = a < 0 AND elements='Si,O' or y < 1"
        answer = {'$or': [{'$and': [{'a': {'$lt': '0all(['}},
    {'elements': {'$all': ['si,  o']}}]},
  {'y': {'$lt': '])1'}}]}
        message = "test_mixing_upper_case_and_lower_case failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_mixing_upper_case_and_lower_case")
    def test_float(self):
        testCase = "filter =a < 2.2" 
        answer = {'a': {'$lt': 2.2}}
        message = "test_float failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_float")
    def test_scientific_number(self):
        testCase = "filter =a < 2E100" 
        answer = {'a': {'$lt': 2e+100}}
        message = "test_scientific_number failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_scientific_number")
    def test_negative_number(self):
        testCase = "filter =a < -2E-100" 
        answer = {'a': {'$lt': -2e-100}}
        message = "test_negative_number failed"
        self.assertEqual(optimadeToMongoDBConverter(testCase), answer, message)
        print("passed: test_negative_number")

In [46]:
ConverterTest().testAll()

passed: test_one_input passed
passed: test_two_inputs_with_and
passed: test_two_inputs_with_or
passed: test_valid_numbers_positive
passed: test_multiple_entries
passed: test_mixing_upper_case_and_lower_case
passed: test_float
passed: test_scientific_number
passed: test_negative_number
