# Build test

* Connect to remote host

In [2]:
import os
from paramiko import SSHClient
from scp import SCPClient

# Connection settings for the remote cluster host.
# Each value can be overridden with an environment variable so that real
# credentials do not have to be committed with the notebook; the fallbacks
# are the values the notebook was originally written with.
USER = os.environ.get('BLOOM_SSH_USER', 'hadoop')
HOST = os.environ.get('BLOOM_SSH_HOST', '172.16.4.135')
PASSWD = os.environ.get('BLOOM_SSH_PASSWORD', 'hadoop')

# Remote layout. REMOTE_WORKING_DIR is a relative path (used as the SCP
# target); the absolute paths below assume the home directory is /home/<USER>.
REMOTE_WORKING_DIR = 'bloom'
REMOTE_PARAMS = f'/home/{USER}/{REMOTE_WORKING_DIR}/params'
REMOTE_LOGS = f'/home/{USER}/{REMOTE_WORKING_DIR}/logs'

# Local directories holding the params and logs on this machine.
PARAMS = 'data/params'
LOGS = 'data/logs'

# Open the SSH session reused by the SCP transfers in the following cells.
# Only hosts already present in the system known_hosts file are accepted.
client = SSHClient()
client.load_system_host_keys()
client.connect(hostname=HOST, username=USER, password=PASSWD)

* Upload compute params script

In [None]:
# Upload the parameter-computation script into the remote working directory.
SCRIPTS = ['scripts/compute_params.sh']

# SCPClient is a context manager: the channel is closed even when the
# transfer raises, instead of leaking until the kernel is restarted.
with SCPClient(client.get_transport()) as scp:
    scp.put(SCRIPTS, remote_path=REMOTE_WORKING_DIR)

* Download params from cluster 

In [None]:
# Fetch the remote params and logs directories (recursively) into data/.
os.makedirs('data/', exist_ok=True)

# The context manager closes the SCP channel even if one of the downloads
# fails part-way through.
with SCPClient(client.get_transport()) as scp:
    scp.get(REMOTE_PARAMS, recursive=True, local_path='data/')
    scp.get(REMOTE_LOGS, recursive=True, local_path='data/')

* Create Hadoop input files (M, K, #Mapper)

In [5]:
import os
import csv
from math import ceil
import json

# Generate the Hadoop input scripts: one per parameter folder found under
# PARAMS, plus one per mapper count for the folders whose name carries K == 0.
# A summary of every generated configuration is written to data/parameters.json.
INPUT = 'scripts/input/hadoop'
os.makedirs(INPUT, exist_ok=True)

# Count the data rows of the dataset; one line is subtracted
# (presumably the header row -- confirm against the file).
with open('data/title.ratings.tsv') as f_in:
    N = num_lines = sum(1 for _ in f_in) - 1

# Mapper count recorded for the default configuration. DEFAULT_MAPPERS is the
# value written into the command line for it: N divided by that count.
DEFAULT_N_MAPPERS = 8
DEFAULT_MAPPERS = ceil(N / DEFAULT_N_MAPPERS)
n_mappers = [4, 6, 8, 10, 12] # 2, 1.33, 1, 0.8, 0.66 map/core
n_splits = [ceil(N / m) for m in n_mappers]
parameters = {}

folders = os.walk(PARAMS)
next(folders, None) # skip first element (the PARAMS directory itself)
for dirpath, _dirnames, filenames in folders:

    # The folder name is '<prefix>_P<value>_K<value>'; accept both path separators.
    tokens = dirpath.replace('\\', '/').split('/')[-1].split('_')

    P = float(tokens[1][1:])
    K = int(tokens[2][1:])
    if K == 1:
        continue
    if not filenames:
        # Nothing to read here; skip instead of failing with an IndexError.
        print(f'Skipping {dirpath}: no params file found')
        continue
    params_file = f'{dirpath}/{filenames[0]}'

    # Read the whole params file before creating any output, so that a parse
    # error does not leave a truncated input script behind.
    m_values = []
    k = 0
    with open(params_file, newline='') as params:
        for row in csv.reader(params, delimiter="\t"):
            m_values.append(row[3])
            k = row[4]  # the value of the last row wins

    m_size = ''.join(f'{m} ' for m in m_values)
    m_list = [int(m) for m in m_values]
    input_filename = f'{tokens[1]}{tokens[2]}'
    # Values at or below 1e-05 are written as the literal 0.00001, because
    # Python would otherwise render them in scientific notation (1e-05).
    key = f'P{P if P > 1e-05 else "0.00001"}K{K}'

    with open(f'{INPUT}/{input_filename}', 'w') as input_file:
        input_file.write(f'echo "title.ratings.tsv $1 {DEFAULT_MAPPERS} {m_size}{k} $2"')
    parameters[key] = {'P': P, 'K': int(k), 'M': list(m_list), 'MAP': DEFAULT_N_MAPPERS}

    if K == 0:
        # Extra variants of the same configuration, one per mapper count.
        for n_mapper, n_split in zip(n_mappers, n_splits):
            with open(f'{INPUT}/{input_filename}MAP{n_mapper}', 'w') as input_mapper_file:
                input_mapper_file.write(f'echo "title.ratings.tsv $1 {n_split} {m_size}{k} $2"')
            parameters[f'{key}MAP{n_mapper}'] = {'P': P, 'K': int(k), 'M': list(m_list), 'MAP': n_mapper}

with open('data/parameters.json', 'w') as out:
    json.dump(parameters, out, indent=2)

* Create Spark input files (M, K, #Mapper)

In [3]:
import os
import csv
from math import ceil
import json
# Generate the Spark input scripts: one per parameter folder found under
# PARAMS, plus one per partition count for the folders whose name carries K == 0.
INPUT = 'scripts/input/spark'
os.makedirs(INPUT, exist_ok=True)

DEFAULT_PARTIONS = 8

# Partition counts used for the extra per-count variants.
n_mappers = [1, 2, 4, 6, 8, 10, 12, 14, 16]

folders = os.walk(PARAMS)
next(folders, None) # skip first element (the PARAMS directory itself)
for dirpath, _dirnames, filenames in folders:

    # The folder name is '<prefix>_P<value>_K<value>'; accept both path separators.
    tokens = dirpath.replace('\\', '/').split('/')[-1].split('_')

    P = float(tokens[1][1:])
    K = int(tokens[2][1:])
    if K == 1:
        continue
    if not filenames:
        # Nothing to read here; skip instead of failing with an IndexError.
        print(f'Skipping {dirpath}: no params file found')
        continue
    params_file = f'{dirpath}/{filenames[0]}'

    # Read the whole params file before creating any output, so that a parse
    # error does not leave a truncated input script behind.
    m_size = ''
    k = 0
    with open(params_file, newline='') as params:
        for row in csv.reader(params, delimiter="\t"):
            m_size += f'{row[3]} '
            k = row[4]  # the value of the last row wins

    input_filename = f'{tokens[1]}{tokens[2]}'

    with open(f'{INPUT}/{input_filename}', 'w') as input_file:
        input_file.write(f'echo "title.ratings.tsv $1 {DEFAULT_PARTIONS} {m_size}{k} {P} $2"')

    if K == 0:
        # Extra variants of the same configuration, one per partition count.
        for n_mapper in n_mappers:
            with open(f'{INPUT}/{input_filename}MAP{n_mapper}', 'w') as input_mapper_file:
                input_mapper_file.write(f'echo "title.ratings.tsv $1 {n_mapper} {m_size}{k} {P} $2"')


* Upload test and input

In [50]:

# Upload the generated input directory and the two test driver scripts to the
# remote working directory, then end the SSH session.
TEST_DIR = 'scripts/input'
SCRIPTS = ['scripts/hadoop_test.sh', 'scripts/spark_test.sh']

# The context manager closes the SCP channel even if an upload fails.
with SCPClient(client.get_transport()) as scp:
    scp.put(TEST_DIR, recursive=True, remote_path=REMOTE_WORKING_DIR)
    scp.put(SCRIPTS, remote_path=REMOTE_WORKING_DIR)

# Closed only after a successful upload, so a failed transfer can be retried
# by re-running this cell without reconnecting first.
client.close()