# Big Data Platform
## Assignment 3: ServerLess

**By:**  

Oren Ben-Eliyahu, 204079453 
<br>Yuval Barkan, 205714447

<br><br>

**The goal of this assignment is to:**
- Understand and practice the details of Serverless

**Instructions:**
- Students will form teams of two people each, and submit a single homework for each team.
- The same score for the homework will be given to each member of your team.
- Your solution is in the form of a Jupyter notebook file (with extension ipynb).
- Images/Graphs/Tables should be submitted inside the notebook.
- The notebook should be runnable and properly documented. 
- Please answer all the questions and include all your code.
- You are expected to submit a clear and pythonic code.
- You can change functions signatures/definitions.

**Submission:**
- Submission of the homework will be done via Moodle by uploading (not Zip):
    - Jupyter Notebook
    - 2 Log files
    - Additional local scripts
- The homework needs to be entirely in English.
- The deadline for submission is on Moodle.
- Late submission won't be allowed.

  
- In case of identical code submissions, both groups will get a zero. 
- Some groups might be selected randomly to present their code.

**Requirements:**  
- Python 3.6 should be used.  
- You should implement the algorithms by yourself using only basic Python libraries (such as numpy, pandas, etc.)

<br><br><br><br>

**Grading:**
- Q0 - 10 points - Setup
- Q1 - 40 points - Serverless MapReduceEngine
- Q2 - 20 points - MapReduce job to calculate inverted index
- Q3 - 30 points - Shuffle

`Total: 100`

<br><br>

# Question 0
## Setup

1. Navigate to IBM Cloud and open a trial account (no credit card is required).
2. Choose IBM Cloud Object Storage service from the catalog
3. Create a new bucket in IBM Cloud Object Storage
4. Create credentials for the bucket with HMAC (access key and secret key)
5. Choose IBM Cloud Functions service from the catalog and create a service


#### Lithops setup
1. Using git, install the master branch of the Lithops project from
https://github.com/lithops-cloud/lithops
2. Follow Lithops documentation and configure Lithops against IBM Cloud Functions and IBM Cloud Object Storage
3. Configure Lithops log level to be in DEBUG mode
4. Run Hello World example by using Futures API and verify all is working properly.


#### IBM Cloud Object Storage setup
1. Upload all the input CSV files that you used in homework 2 into the bucket you created in IBM Cloud Object Storage


<br><br><br>

In [1]:
!pip install git+https://github.com/lithops-cloud/lithops.git

Collecting git+https://github.com/lithops-cloud/lithops.git
  Cloning https://github.com/lithops-cloud/lithops.git to /private/var/folders/lg/pbb4ypp52kx1shc2q76nnnlc0000gn/T/pip-req-build-hq6wsr1f


Building wheels for collected packages: lithops
  Building wheel for lithops (setup.py) ... done
  Created wheel for lithops: filename=lithops-2.5.9.dev0-py3-none-any.whl size=313590 sha256=4b5718c6cb068658adf9d6b2667492e511987a5ed80b15a17f1daffba3219f7d
  Stored in directory: /private/var/folders/lg/pbb4ypp52kx1shc2q76nnnlc0000gn/T/pip-ephem-wheel-cache-odav6kbn/wheels/f2/7a/47/12c1c1acee8bccb6b8e4b06ee26d13fe6f304831832bb2a33d
Successfully built lithops


In [2]:
!lithops test

2022-01-07 14:29:45,505 [INFO] lithops.config -- Lithops v2.5.9.dev0
2022-01-07 14:29:45,509 [INFO] lithops.storage.backends.localhost.localhost -- Localhost storage client created
2022-01-07 14:29:45,509 [INFO] lithops.localhost.localhost -- Localhost compute client created
2022-01-07 14:29:45,509 [INFO] lithops.invokers -- ExecutorID be84c7-0 | JobID A000 - Selected Runtime: python 
2022-01-07 14:29:45,512 [INFO] lithops.invokers -- ExecutorID be84c7-0 | JobID A000 - Starting function invocation: hello() - Total: 1 activations
2022-01-07 14:29:45,881 [INFO] lithops.invokers -- ExecutorID be84c7-0 | JobID A000 - View execution logs at /private/var/folders/lg/pbb4ypp52kx1shc2q76nnnlc0000gn/T/lithops/logs/be84c7-0-A000.log
2022-01-07 14:29:45,881 [INFO] lithops.wait -- ExecutorID be84c7-0 - Getting results from functions

  100%|██████████████████████████████████████████████████████████████████| 1/1  

2022-01-07 14:29:47,930 [INFO] lithops.executors -- ExecutorID be84c7-0 - Cleaning te

In [3]:
!pip install lithops aws



In [4]:
!pip install boto3



In [9]:
import lithops
import os
import boto3
from io import StringIO
import pandas as pd

In [6]:
AWSAccessKeyId = os.environ.get("AWSAccessKeyId")
AWSSecretKey = os.environ.get("AWSSecretKey")

In [7]:
config = {'lithops': {'backend': 'aws_lambda', 'storage': 'aws_s3', 'log_level':'DEBUG'},

          'aws':  {'access_key_id': AWSAccessKeyId,
                   'secret_access_key': AWSSecretKey},

          'aws_s3': {'storage_bucket': 'idc-big-data',
                     'region_name': 'eu-central-1'},
          
          'aws_lambda': {'execution_role': 'arn:aws:iam::981299761374:role/lithops-execution-role',
                         'region_name': 'eu-central-1'}}

In [8]:
# def hello(name):
#     return 'Hello {}!'.format(name)

# with lithops.FunctionExecutor(config=config) as fexec:
#     fut = fexec.call_async(hello, 'World')
#     print(fut.result())

# Question 1
## Serverless MapReduceEngine

Modify the MapReduceEngine from homework 2 into a MapReduceServerlessEngine in which map and reduce tasks are executed as serverless actions instead of local threads. In particular:
1. Deploy all map tasks as serverless actions by using Lithops against IBM Cloud Functions.
2. Collect the results from all map tasks, store them in the same SQLite database you used in MapReduceEngine, and use the same code for the sort and shuffle phase.
3. Deploy the reduce tasks by using Lithops against IBM Cloud Functions. Instead of persisting the results of the reduce tasks, return them to the MapReduceServerlessEngine and proceed with the same workflow as in MapReduceEngine.
4. Return the results of the reduce tasks to the user.
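The four steps above can be sketched locally with the standard library alone. In this sketch a `ThreadPoolExecutor` stands in for the serverless invocations (in the notebook these would be Lithops calls such as `fexec.map`), an in-memory SQLite table plays the role of `temp_results`, and the names `run_mapreduce`, `count_map` and `count_reduce` are hypothetical. Grouping uses `ORDER BY key` plus `itertools.groupby` instead of `GROUP_CONCAT`, so values that contain commas survive the round trip.

```python
import itertools
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def run_mapreduce(input_data, map_function, reduce_function):
    """Map -> SQLite sort/shuffle -> reduce; threads stand in for serverless actions."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE temp_results (key TEXT, value TEXT)")

    with ThreadPoolExecutor() as executor:
        # 1. Map phase: one task per input item, each returning (key, value) pairs.
        map_outputs = list(executor.map(map_function, input_data))

        # 2. Collect all map results on the client and store them in SQLite.
        for pairs in map_outputs:
            conn.executemany("INSERT INTO temp_results VALUES (?, ?)", pairs)

        # Sort and shuffle: ordering by key lets groupby collect each key's values.
        rows = conn.execute("SELECT key, value FROM temp_results ORDER BY key")
        grouped = [(key, [value for _, value in group])
                   for key, group in itertools.groupby(rows, key=lambda row: row[0])]

        # 3. Reduce phase: one task per unique key; results come back to the client.
        results = list(executor.map(lambda kv: reduce_function(*kv), grouped))

    conn.close()
    # 4. Return the reduce results to the user.
    return results


def count_map(line):
    # Hypothetical map function: emit (word, "1") for every word in a line.
    return [(word, "1") for word in line.split()]


def count_reduce(key, values):
    # Hypothetical reduce function: count how many values arrived for the key.
    return key, len(values)


print(run_mapreduce(["a b a", "b c"], count_map, count_reduce))
# prints [('a', 2), ('b', 2), ('c', 1)]
```

Keeping the client-side SQLite step mirrors the workflow the assignment prescribes; it is also exactly the part Question 3 asks you to reconsider.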

**Please attach:**  
Text file with all log messages Lithops printed to the console during the execution. Make sure the log level is set to DEBUG mode.
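One way to produce the requested log file, sketched with the standard `logging` module only: Lithops logs through loggers whose names start with `lithops.` (visible in the outputs in this notebook, e.g. `lithops.invokers`), so a `FileHandler` attached to the parent `lithops` logger should also receive those records. The helper `attach_log_file` and the file name `lithops_q1.log` are hypothetical, and whether Lithops later reconfigures its own logger is an assumption worth checking against the saved file.

```python
import logging


def attach_log_file(path, logger_name="lithops", level=logging.DEBUG):
    """Copy every record emitted by `logger_name` (and its children) to `path`."""
    handler = logging.FileHandler(path, mode="w", encoding="utf-8")
    handler.setLevel(level)
    # Mirrors the "time [LEVEL] logger -- message" layout of the console output.
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s -- %(message)s"))
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    logger.addHandler(handler)
    return handler


handler = attach_log_file("lithops_q1.log")
try:
    # Run the MapReduce job here; this record stands in for Lithops output.
    logging.getLogger("lithops.invokers").debug("example record")
finally:
    # Detach and close so later cells do not keep writing to the file.
    logging.getLogger("lithops").removeHandler(handler)
    handler.close()
```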

#### Code:

In [10]:
import sqlite3
from sqlite3 import Error

In [24]:
kwargs = {"oren":1}

In [51]:
def my_map_function(id, x):
    print("I'm activation number {}".format(id))
    return x[1]['oren']

In [55]:
from lithops import Storage
storage = Storage(config=config)

2022-01-07 14:52:30,647 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1


In [72]:
# list_of_values = ['csv_files/myCSV1.csv', {'column':2}]

def my_map_function(list_of_values):
    file = pd.read_csv("/Users/ah11500/Documents/GitHub/IDC/big_data/sec_ex/csv_files/myCSV1.csv")
    return file.values

In [73]:
my_map_function(['yuval','oren'])

array([['Steven', 'Dana', 'London'],
       ['Marc', 'Michael', 'Haifa'],
       ['Johanna', 'Marc', 'New York'],
       ['Steven', 'Marc', 'Kiel'],
       ['Michael', 'John', 'London'],
       ['John', 'Albert', 'London'],
       ['Michael', 'Marc', 'Kiel'],
       ['Marc', 'Johanna', 'Palo Alto'],
       ['John', 'Dana', 'London'],
       ['Steven', 'Albert', 'New York']], dtype=object)

In [74]:
fexec = lithops.FunctionExecutor()
fexec.call_async(my_map_function, data=['csv_files/myCSV1.csv', {'column':2}])
print(fexec.get_result())
fexec.clean()

2022-01-07 14:58:38,792 [INFO] lithops.config -- Lithops v2.5.9.dev0
2022-01-07 14:58:38,793 [INFO] lithops.storage.backends.localhost.localhost -- Localhost storage client created
2022-01-07 14:58:38,794 [INFO] lithops.localhost.localhost -- Localhost compute client created
2022-01-07 14:58:38,794 [INFO] lithops.invokers -- ExecutorID 835ef1-30 | JobID A000 - Selected Runtime: python 
2022-01-07 14:58:38,798 [INFO] lithops.invokers -- ExecutorID 835ef1-30 | JobID A000 - Starting function invocation: my_map_function() - Total: 1 activations
2022-01-07 14:58:39,077 [INFO] lithops.invokers -- ExecutorID 835ef1-30 | JobID A000 - View execution logs at /private/var/folders/lg/pbb4ypp52kx1shc2q76nnnlc0000gn/T/lithops/logs/835ef1-30-A000.log
2022-01-07 14:58:39,082 [INFO] lithops.wait -- ExecutorID 835ef1-30 - Getting results from functions



2022-01-07 14:58:41,120 [INFO] lithops.executors -- ExecutorID 835ef1-30 - Cleaning temporary data



[['Steven' 'Dana' 'London']
 ['Marc' 'Michael' 'Haifa']
 ['Johanna' 'Marc' 'New York']
 ['Steven' 'Marc' 'Kiel']
 ['Michael' 'John' 'London']
 ['John' 'Albert' 'London']
 ['Michael' 'Marc' 'Kiel']
 ['Marc' 'Johanna' 'Palo Alto']
 ['John' 'Dana' 'London']
 ['Steven' 'Albert' 'New York']]


In [99]:
def create_connection(db_data):
    # Initialise conn so a failed connect returns None instead of raising UnboundLocalError
    conn = None
    try:
        conn = sqlite3.connect(db_data)
        print("Establish connection")
    except Error as e:
        print(e)
    return conn


def create_table(conn, create_table_query):
    try:
        c = conn.cursor()
        c.execute(create_table_query)
    except Error as e:
        print(e)
        
def query(conn, select_query):
    try:
        c = conn.cursor()
        # Return all matching rows to the caller
        return c.execute(select_query).fetchall()

    except Error as e:
        print(e)

In [100]:
MYDATA_DB = 'temp_results.db'

create_table_temp_results = '''CREATE TABLE IF NOT EXISTS temp_results(
                                key TEXT,
                                value TEXT)
                            '''
conn = create_connection(MYDATA_DB)
create_table(conn,create_table_temp_results)

Establish connection


In [None]:
class MapReduceEngine():

    def execute(self, input_data, map_function, reduce_function, params):

        # Start from an empty table so rows from earlier runs do not leak into this one
        conn.execute("DELETE FROM temp_results")

        with lithops.FunctionExecutor(config=config) as fexec:
            # Map phase: one serverless invocation per CSV file.
            # Each tuple is unpacked into map_function(document_name, column_index).
            map_futures = fexec.map(map_function, [(csv_file, params) for csv_file in input_data])
            map_results = fexec.get_result(fs=map_futures)

            # Load the DataFrames returned by the map tasks into the temp_results table
            for df in map_results:
                df.to_sql('temp_results', conn, if_exists='append', index=False)

            # Sort and shuffle: group all values of each key in SQLite
            grouping_query = "SELECT key, GROUP_CONCAT(value, ',') FROM temp_results GROUP BY key"
            reduce_input_values = query(conn, grouping_query)

            # Reduce phase: one serverless invocation per unique key
            reduce_futures = fexec.map(reduce_function,
                                       [(key, values.split(',')) for key, values in reduce_input_values])
            results = fexec.get_result(fs=reduce_futures)

        # Check that every key was reduced
        if len(results) != len(reduce_input_values):
            print("MapReduce Failed")
            return None

        print("MapReduce Completed")
        return results

In [104]:
from lithops import Storage

In [106]:
storage = Storage(config=config)

2022-01-07 13:18:42,087 [DEBUG] lithops.config -- Loading Storage backend module: aws_s3
2022-01-07 13:18:42,088 [DEBUG] lithops.storage.backends.aws_s3.aws_s3 -- Creating S3 client
2022-01-07 13:18:42,094 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1


In [115]:
files_list = storage.list_objects('idc-big-data', prefix='csv_files/')

In [207]:
input_data

['csv_files/myCSV1.csv',
 'csv_files/myCSV10.csv',
 'csv_files/myCSV11.csv',
 'csv_files/myCSV12.csv',
 'csv_files/myCSV13.csv',
 'csv_files/myCSV14.csv',
 'csv_files/myCSV15.csv',
 'csv_files/myCSV16.csv',
 'csv_files/myCSV17.csv',
 'csv_files/myCSV18.csv',
 'csv_files/myCSV19.csv',
 'csv_files/myCSV2.csv',
 'csv_files/myCSV20.csv',
 'csv_files/myCSV3.csv',
 'csv_files/myCSV4.csv',
 'csv_files/myCSV5.csv',
 'csv_files/myCSV6.csv',
 'csv_files/myCSV7.csv',
 'csv_files/myCSV8.csv',
 'csv_files/myCSV9.csv']

In [119]:
input_data =[]
for file in files_list:
    input_data.append(file['Key'])

In [212]:
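from io import StringIO
params = {'column': 2}

def inverted_map(document_name, column_index, storage):
    # Lithops fills the reserved `storage` parameter with a Storage instance inside the
    # worker, so the function no longer captures the notebook's global (unpicklable) client.
    s = str(storage.get_object('idc-big-data', document_name), 'utf-8')
    file = pd.read_csv(StringIO(s))
    # Build a new list on every call instead of appending to a shared global list
    return [(value, document_name) for value in file.iloc[:, column_index['column']].values]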

In [213]:
#inverted_map('csv_files/myCSV1.csv',params)

In [218]:
map_collector=[]
fexec = lithops.FunctionExecutor(config=config)
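# map() expects an iterable of inputs; each tuple is unpacked into the function's arguments
fexec.map(inverted_map, [('csv_files/myCSV9.csv', params)])
print(fexec.get_result())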
fexec.clean()
#map_collector.append(map_result)

2022-01-07 14:28:56,141 [INFO] lithops.config -- Lithops v2.5.9.dev0
2022-01-07 14:28:56,142 [DEBUG] lithops.config -- Loading Serverless backend module: aws_lambda
2022-01-07 14:28:56,142 [DEBUG] lithops.config -- Loading Storage backend module: aws_s3
2022-01-07 14:28:56,143 [DEBUG] lithops.storage.backends.aws_s3.aws_s3 -- Creating S3 client
2022-01-07 14:28:56,148 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1
2022-01-07 14:28:56,149 [DEBUG] lithops.serverless.backends.aws_lambda.aws_lambda -- Creating AWS Lambda client
2022-01-07 14:28:56,149 [DEBUG] lithops.serverless.backends.aws_lambda.aws_lambda -- Creating Boto3 AWS Session and Lambda Client
2022-01-07 14:28:57,129 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-01-07 14:28:57,131 [DEBUG] lithops.invokers -- ExecutorID 1f1011-26 - Invoker initialized. Max workers: 1000
2022-01-07 14:28:57,131 [DEBUG] lithops.invokers -

TypeError: cannot pickle '_hashlib.HASH' object

In [211]:
with lithops.FunctionExecutor(config=config) as fexec:
    # Invoke all map tasks in parallel instead of one blocking call per file
    futures = fexec.map(inverted_map, [(csv_file, params) for csv_file in input_data])
    map_collector = fexec.get_result(fs=futures)

2022-01-07 14:24:21,663 [INFO] lithops.config -- Lithops v2.5.9.dev0
2022-01-07 14:24:21,664 [DEBUG] lithops.config -- Loading Serverless backend module: aws_lambda
2022-01-07 14:24:21,664 [DEBUG] lithops.config -- Loading Storage backend module: aws_s3
2022-01-07 14:24:21,665 [DEBUG] lithops.storage.backends.aws_s3.aws_s3 -- Creating S3 client
2022-01-07 14:24:21,669 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1
2022-01-07 14:24:21,669 [DEBUG] lithops.serverless.backends.aws_lambda.aws_lambda -- Creating AWS Lambda client
2022-01-07 14:24:21,670 [DEBUG] lithops.serverless.backends.aws_lambda.aws_lambda -- Creating Boto3 AWS Session and Lambda Client
2022-01-07 14:24:22,401 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-01-07 14:24:22,403 [DEBUG] lithops.invokers -- ExecutorID 1f1011-23 - Invoker initialized. Max workers: 1000
2022-01-07 14:24:22,404 [DEBUG] lithops.invokers -

TypeError: cannot pickle '_hashlib.HASH' object

In [178]:
params

{'column': 2}

In [70]:
session = boto3.Session(aws_access_key_id = os.environ.get("AWSAccessKeyId"),
                    aws_secret_access_key = os.environ.get("AWSSecretKey"))

s3 = session.resource('s3')

my_bucket = s3.Bucket('idc-big-data')
input_data = []

for file in my_bucket.objects.filter(Prefix = 'csv_files/'):
    input_data.append(file.key)

In [88]:
#input_data

In [73]:
bucket = 'idc-big-data'
s3 = boto3.client('s3')

In [81]:
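# The map function emits one (key, value) row per record: the value of the selected
# column is the key and the document name is the value. We return a DataFrame directly
# instead of building intermediate variables such as lists.

def inverted_map(document_name: str, column_index: dict):
    # Create the S3 client inside the function: a client captured from the notebook's
    # globals cannot be pickled ("cannot pickle '_hashlib.HASH' object").
    s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=document_name)
    file = pd.read_csv(obj['Body'])
    df = pd.DataFrame({'key': file.iloc[:, column_index['column']], 'value': document_name})
    return df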


In [86]:
# params = {'column':2}
# s3_resource = boto3.resource('s3')
# for count, csv_file in enumerate(input_data):
#     fexec = lithops.FunctionExecutor(config=config)
#     fexec.map(inverted_map, csv_file, extra_args=params)
#     df = fexec.get_result()
#     csv_buffer = StringIO()
#     df.to_csv(csv_buffer)
#     s3_resource.Object('idc-big-data', f'mapreducetemp/part-tmp-{count+1}.csv').put(Body=csv_buffer.getvalue())

In [94]:
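fexec = lithops.FunctionExecutor(config=config)
# map() expects an iterable; the tuple becomes inverted_map(document_name, column_index)
fexec.map(inverted_map, [('csv_files/myCSV1.csv', params)])
# get_result() after map() returns one result per invocation, so take the single DataFrame
df = fexec.get_result()[0]
csv_buffer = StringIO()
# index=False keeps the file to the (key, value) columns expected by temp_results
df.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource('s3')
s3_resource.Object('idc-big-data', 'mapreducetemp/part-tmp-1.csv').put(Body=csv_buffer.getvalue())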

2022-01-05 21:19:59,449 [INFO] lithops.config -- Lithops v2.5.9.dev0
2022-01-05 21:19:59,450 [DEBUG] lithops.config -- Loading Serverless backend module: aws_lambda
2022-01-05 21:19:59,451 [DEBUG] lithops.config -- Loading Storage backend module: aws_s3
2022-01-05 21:19:59,452 [DEBUG] lithops.storage.backends.aws_s3.aws_s3 -- Creating S3 client
2022-01-05 21:19:59,456 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1
2022-01-05 21:19:59,457 [DEBUG] lithops.serverless.backends.aws_lambda.aws_lambda -- Creating AWS Lambda client
2022-01-05 21:19:59,457 [DEBUG] lithops.serverless.backends.aws_lambda.aws_lambda -- Creating Boto3 AWS Session and Lambda Client
2022-01-05 21:20:00,268 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-01-05 21:20:00,270 [DEBUG] lithops.invokers -- ExecutorID 1f1011-10 - Invoker initialized. Max workers: 1000
2022-01-05 21:20:00,271 [DEBUG] lithops.invokers -

TypeError: cannot pickle '_hashlib.HASH' object

In [None]:
# The reduce function returns one row per (key, document) pair with duplicate documents
# removed. We return a DataFrame directly instead of building intermediate variables.

def inverted_reduce(key,documents):
    df = pd.DataFrame({'key':key,'value':list(set(documents))})
    return df



In [53]:
import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client('s3', region_name='eu-central-1', 
                         aws_access_key_id = os.environ.get("AWSAccessKeyId"),
                         aws_secret_access_key = os.environ.get("AWSSecretKey"))

def upload_my_file(bucket, folder, file_to_upload, file_name):

    key = folder+"/"+file_name
    try:
        response = s3_client.upload_file(file_to_upload, bucket, key)
    except ClientError as e:
        print(e)
        return False
    except FileNotFoundError as e:
        print(e)
        return False
    return True

In [None]:
mapreduce = MapReduceEngine()
params = {'column':2}
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params)

# Question 2
## Submit MapReduce job to calculate inverted index
1. Use input_data: `cos://bucket/<path to CSV data>`
2. Submit MapReduce job with reduce and map functions as you used in homework 2, as follows

    `mapreduce = MapReduceServerlessEngine()`  
    `results = mapreduce.execute(input_data, inverted_map, inverted_index)`   
    `print(results)`
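For reference, the inverted-index job pairs a map function that emits `(value, document_name)` for one column of a CSV file with a reduce function that de-duplicates the documents per value. Below is a stdlib-only sketch of that pair operating on CSV text already downloaded from the bucket. The names `inverted_map_rows` and `inverted_reduce_docs`, the header `first,last,city`, and the contents of the second file are hypothetical; the sketch assumes each file has a header row, as `pd.read_csv` does by default.

```python
import csv
import io


def inverted_map_rows(document_name, csv_text, column_index):
    """Emit (value, document_name) for every row of the chosen column."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader, None)  # skip the header row, as pd.read_csv does by default
    return [(row[column_index], document_name) for row in reader if row]


def inverted_reduce_docs(key, documents):
    """Return the key with its sorted, de-duplicated list of documents."""
    return key, sorted(set(documents))


sample_1 = "first,last,city\nSteven,Dana,London\nMarc,Michael,Haifa\nJohn,Dana,London\n"
sample_2 = "first,last,city\nMichael,John,London\n"
pairs = inverted_map_rows("csv_files/myCSV1.csv", sample_1, 2)
pairs += inverted_map_rows("csv_files/myCSV2.csv", sample_2, 2)

# Local stand-in for the shuffle: group documents by key.
groups = {}
for key, doc in pairs:
    groups.setdefault(key, []).append(doc)

index = dict(inverted_reduce_docs(k, v) for k, v in groups.items())
print(index)
```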

**Please attach:**  
Text file with all log messages Lithops printed to the console during the execution. Make sure the log level is set to DEBUG mode.

#### Code:

# Question 3
## Shuffle

MapReduceServerlessEngine deploys both map and reduce tasks as serverless invocations.   
However, once the map stage has completed, the results are transferred from the map tasks to the SQLite database located on the client machine (your laptop, in this case), which then performs a local shuffle and invokes the reduce tasks, passing them the relevant parameters.
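To make the data flow concrete: shuffling without a central collector typically relies on a deterministic partition function, so every map task can decide on its own which reduce task owns each key and write that partition straight to object storage. The sketch below is stdlib-only; a plain dict stands in for the bucket, and the object names `shuffle/map-{m}/part-{r}` and all function names are hypothetical. It uses `zlib.crc32` rather than the built-in `hash()`, because string hashing is randomized per Python process and separate serverless workers would otherwise disagree on the partition.

```python
import json
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    # crc32 is stable across processes, unlike hash() on str.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_and_partition(map_id, pairs, num_reducers, store):
    """Write one object per reducer containing this mapper's pairs for it."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[partition(key, num_reducers)].append((key, value))
    for r in range(num_reducers):
        store[f"shuffle/map-{map_id}/part-{r}"] = json.dumps(buckets[r])


def reduce_partition(reducer_id, num_mappers, store):
    """Read this reducer's object from every mapper and group values by key."""
    grouped = defaultdict(list)
    for m in range(num_mappers):
        for key, value in json.loads(store[f"shuffle/map-{m}/part-{reducer_id}"]):
            grouped[key].append(value)
    return dict(grouped)


store = {}  # stands in for the object storage bucket
map_outputs = [[("London", "myCSV1"), ("Haifa", "myCSV1")],
               [("London", "myCSV2"), ("Kiel", "myCSV2")]]
for m, pairs in enumerate(map_outputs):
    map_and_partition(m, pairs, num_reducers=2, store=store)
reduced = [reduce_partition(r, len(map_outputs), store) for r in range(2)]
```

In a real deployment the dict reads and writes would become object storage calls (for example Lithops `storage.put_object` / `storage.get_object`), at the cost of one object per mapper-reducer pair.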

(To support your answers, feel free to use examples, images, etc.)
<br><br>

**1. Explain why this approach is not efficient, and what the pros and cons of such an architecture are in general. In a broader scope, you may assume that MapReduceServerlessEngine is executed on some powerful machine and not just a laptop.**

\<your answer here>

<br><br>
**2. Suggest how you can improve the shuffle so that intermediate data is not downloaded to the client at all and the shuffle is performed in the cloud as well. Explain the pros and cons of the approaches you suggest.**


\<your answer here>

<br><br>
**3. Can you make the shuffle serverless?**


\<your answer here>

<br><br><br><br>
Good Luck :) 