# Parallel programming code

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=7a2c77ee69f59e652aa09ccc5abe04d13dc12c73798ed3689cf1a7afb537bdf4
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# SparkContext is the connection to Spark used to read the operator price lists as RDDs.
from pyspark import SparkContext

In [None]:
class CheapestOperator():
  """Find which operator to use for one telephone number.

  Price lists are text files with one whitespace-separated "prefix price"
  pair per line, one file per operator. All lists are merged into a single
  table that keeps, for every prefix, the cheapest price and the operator
  offering it. A lookup then returns the entry stored under the longest
  prefix of the number.

  NOTE(review): the lookup picks the longest prefix over the merged table,
  not the cheapest among each operator's own longest match - confirm this is
  the intended rule.
  """

  def __init__(self, number:str, sc: "SparkContext"):
    """Remember the number to look up and the context used to read files.

    Args:
      number: telephone number as a string of digits.
      sc: object exposing ``textFile(url)`` (a SparkContext).
    """
    self.number = number
    self.sc = sc
    # prefix -> (price, operator). Created here so a lookup or an update done
    # before setup_dataset() works on an empty table instead of raising
    # AttributeError.
    self.prefix_prices_op = {}

  def setup_dataset(self, operators_url:dict):
    """(Re)build the prefix table from the given price lists.

    Args:
      operators_url: mapping of operator name -> path/URL of its price list.

    Raises:
      ValueError: if a non-blank line is not a "prefix price" pair or the
        price is not a number.
    """
    # Start from an empty table so calling setup twice never mixes datasets.
    self.prefix_prices_op = {}

    for operator, url in operators_url.items():
      # Tag every row with its operator. The default argument freezes the
      # current operator name inside the lambda.
      tagged_rows = self.sc.textFile(url).map(
          lambda line, operator=operator: line.split() + [operator])
      # A blank line splits to just [operator]; there is nothing to load.
      operator_data_file = tagged_rows.filter(lambda fields: len(fields) > 1)
      # Every operator, including the first, goes through the same merge so
      # duplicate prefixes inside a single file also keep the cheapest price.
      self.update_prefix_prices(operator_data_file)

  def update_prefix_prices(self, operator_data_file):
    """Merge one operator's rows into the prefix table.

    New prefixes are added; for a prefix already present the stored entry is
    replaced only when the new price is strictly lower, so on equal prices
    the operator seen first is kept.

    Args:
      operator_data_file: object whose ``collect()`` returns rows of
        ``[prefix, price, operator]``.

    Raises:
      ValueError: if a row does not have exactly three fields or its price
        cannot be converted to float.
    """
    for row in operator_data_file.collect():
      if len(row) != 3:
        raise ValueError(
            "Malformed price list row, expected 'prefix price': {!r}".format(row))
      prefix, raw_price, operator = row
      price = float(raw_price)
      known = self.prefix_prices_op.get(prefix)
      if known is None or price < known[0]:
        self.prefix_prices_op[prefix] = (price, operator)

  def cheapest_operator(self):
    """Return the entry stored under the longest prefix of the number.

    Returns:
      ``(operator, price)`` for the longest matching prefix, or the string
      "No operator found for this number" when no stored prefix matches.
    """
    # Probe the number's own prefixes from longest to shortest instead of
    # scanning every stored prefix: the first hit is the longest match.
    # Length 0 is included so an empty-string key still matches any number.
    for length in range(len(self.number), -1, -1):
      entry = self.prefix_prices_op.get(self.number[:length])
      if entry is not None:
        price, operator = entry
        return operator, price

    return "No operator found for this number"

In [None]:
# Example run: look up one number against the three operator price lists.
if __name__ == '__main__':
  # create a SparkContext
  sc = SparkContext("local", "CheapestOperator")

  try:
    # test the function
    number = "4673212345"
    operators_url = {
        'OperatorA':'./op_a.txt',
        'OperatorB':'./op_b.txt',
        'OperatorC':'./op_c.txt',
    }
    obj = CheapestOperator(number, sc)
    obj.setup_dataset(operators_url)
    # prints an (operator, price) tuple such as ('OperatorA', 1.1),
    # or the "No operator found for this number" message
    print(obj.cheapest_operator())
  finally:
    # Always stop the SparkContext, even if a price list is missing or
    # malformed; a context left running makes the next SparkContext(...)
    # call in this kernel fail.
    sc.stop()

# Test parallel programming code


In [None]:
import unittest
from pyspark import SparkContext

In [None]:
class TestCheapestOperator(unittest.TestCase):
    """Lookups of CheapestOperator against the op_a/op_b/op_c price lists.

    The expected values depend on the contents of ./op_a.txt, ./op_b.txt and
    ./op_c.txt, which must be present in the working directory.
    """

    @classmethod
    def setUpClass(cls):
        # One SparkContext for the whole class: starting and stopping a
        # context for every test dominates the run time, and the tests only
        # read files through it.
        cls.sc = SparkContext("local", "CheapestOperator")

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

    def setUp(self):
        # Fresh mapping per test so no test can affect another through it.
        self.operators_url = {
            'OperatorA':'./op_a.txt',
            'OperatorB':'./op_b.txt',
            'OperatorC':'./op_c.txt',
        }

    def _cheapest_for(self, number, operators_url=None):
        """Load the given price lists (default: all three) and look up number."""
        if operators_url is None:
            operators_url = self.operators_url
        obj = CheapestOperator(number, self.sc)
        obj.setup_dataset(operators_url)
        return obj.cheapest_operator()

    def test_cheapest_operator(self):
        """Lookup over all three price lists returns an (operator, price) tuple."""
        self.assertEqual(self._cheapest_for("4673212345"), ('OperatorA', 1.1))

    def test_invalid_number(self):
        """A number with no matching prefix yields the not-found message."""
        self.assertEqual(self._cheapest_for("987654321"), "No operator found for this number")

    def test_operator_B_only(self):
        """Only OperatorB's list is loaded, so the result comes from it."""
        self.assertEqual(
            self._cheapest_for("123456789", {'OperatorB':'./op_b.txt'}),
            ('OperatorB', 0.92))

    def test_logest_prefix(self):
        """Another number expected to resolve to the same entry as 4673212345."""
        self.assertEqual(self._cheapest_for("4673210000"), ('OperatorA', 1.1))

    def test_multiple_operators_same_price(self):
        """With all lists loaded, 1234567890 resolves to OperatorA at 0.9."""
        self.assertEqual(self._cheapest_for("1234567890"), ('OperatorA', 0.9))

    def test_single_operator_single_prefix(self):
        """Only OperatorA's list is loaded."""
        self.assertEqual(
            self._cheapest_for("123456789", {'OperatorA':'./op_a.txt'}),
            ('OperatorA', 0.9))

    def test_single_operator_multiple_prefixes(self):
        """Only OperatorA's list is loaded; 4692114455 resolves to 0.17."""
        self.assertEqual(
            self._cheapest_for("4692114455", {'OperatorA':'./op_a.txt'}),
            ('OperatorA', 0.17))

    def test_same_price_different_operator(self):
        """With all lists loaded, 2682114466 resolves to OperatorA at 5.1."""
        self.assertEqual(self._cheapest_for("2682114466"), ('OperatorA', 5.1))

    def test_case_insensitive_dict(self):
        """The operator label is returned exactly as written in the mapping key."""
        self.assertEqual(
            self._cheapest_for("4673212345", {'operatora':'./op_a.txt'}),
            ('operatora', 1.1))
   

if __name__ == '__main__':
    # Run the tests inside the notebook kernel:
    # - argv is overridden so unittest does not try to parse the kernel's own
    #   command-line arguments (the first element stands in for the program name);
    # - exit=False stops unittest.main from calling sys.exit() when it finishes.
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

.........
----------------------------------------------------------------------
Ran 9 tests in 16.147s

OK
