# Overview

A Spark cluster with access to the previously built HDFS cluster.

(add topology diagram)

Details on the HDFS cluster: 
- [09-hdfs-on-arm-pi/run-book-hdfs-on-pi.ipynb](https://github.com/6za/runbooks-jupyter/blob/master/09-hdfs-on-arm-pi/run-book-hdfs-on-pi.ipynb)


This cluster has:
- 1 Spark Master Node
- 2 Spark Worker Nodes
- 1 Spark Driver/App Node

The solution is based on the docker-compose files at [6za/spark-sample](https://github.com/6za/spark-sample).

The setup steps in this notebook modify these files, adding host entries so that the solution works across multiple Docker hosts without meshing them into a single virtual network. They also use Spark configuration options to pin the ports Spark uses to pre-defined values.
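The port pinning can be sketched as a small set of Spark properties. This is a minimal sketch, not the repository's actual configuration: `pinned_port_conf` is a hypothetical helper, the port numbers are the ones used by the sample driver app in the Test Cluster section, and the optional `spark.port.maxRetries` setting is an addition that the original setup does not use.

```python
def pinned_port_conf(driver_port, block_manager_port, max_retries=None):
    """Return Spark properties that pin the driver-side ports.

    Hypothetical helper for illustration; the property names are
    standard Spark configuration keys.
    """
    conf = {
        "spark.driver.port": str(driver_port),
        "spark.blockManager.port": str(block_manager_port),
    }
    if max_retries is not None:
        # Assumption: limiting retries keeps Spark from drifting to
        # neighbouring ports that the Docker host does not publish.
        conf["spark.port.maxRetries"] = str(max_retries)
    return conf


conf = pinned_port_conf(51816, 51814)
for key, value in sorted(conf.items()):
    print(f"{key}={value}")
```

With fixed values, the same ports can be published by the container and reached from other Docker hosts, which is what removes the need for a shared overlay network.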

This initial deployment uses Intel servers.

In [1]:
import pandas as pd
import yaml
import os
import subprocess

## Key Links
- HDFS name node
- Spark master node
- Spark driver node

In [2]:
hosts = pd.read_csv("../common/hosts.csv")
supressed_columns = ['ip','user']
raspi_name_hosts = hosts[(hosts.hostname == "pi-node8")]
spark_master_hosts = hosts[(hosts.hostname == "kx-Lenovo-H520g")]
spark_driver_hosts = hosts[(hosts.hostname == "nuc-01")]
spark_workers_hosts = hosts[(hosts.hostname == "nuc-02") | (hosts.hostname == "nuc-03")]



In [3]:
raspi_name_hosts.drop(columns=supressed_columns)

| Unnamed: 0 | hostname | arch | gpu |
|---|---|---|---|
| 11 | pi-node8 | armv7l | 0 |


In [4]:
spark_master_hosts.drop(columns=supressed_columns)

| Unnamed: 0 | hostname | arch | gpu |
|---|---|---|---|
| 3 | kx-Lenovo-H520g | x86_64 | 1 |


In [5]:
spark_driver_hosts.drop(columns=supressed_columns)

| Unnamed: 0 | hostname | arch | gpu |
|---|---|---|---|
| 4 | nuc-01 | x86_64 | 0 |


In [6]:
spark_workers_hosts.drop(columns=supressed_columns)

| Unnamed: 0 | hostname | arch | gpu |
|---|---|---|---|
| 5 | nuc-02 | x86_64 | 0 |
| 6 | nuc-03 | x86_64 | 0 |


## Clone Repo

In [7]:
%%bash
mkdir -p ~/repos
cd ~/repos
rm -rf ~/repos/spark-sample
git clone https://github.com/6za/spark-sample.git

Cloning into 'spark-sample'...


## Create host lists

### Driver Host List

In [8]:

namenode_ip = raspi_name_hosts.iloc[0]['ip']
namenode_config = 'hadoop-namenode:%(ip)s' %  {"ip": namenode_ip}
spark_master_ip = spark_master_hosts.iloc[0]['ip']
spark_master_config = 'spark-master:%(ip)s' %  {"ip": spark_master_ip}
driver_hosts_list = [namenode_config,spark_master_config]

len(driver_hosts_list)

2

### Worker Hosts List

In [9]:
spark_driver_ip = spark_driver_hosts.iloc[0]['ip']
spark_driver_config = 'spark-driver:%(ip)s' %  {"ip": spark_driver_ip}

worker_hosts_list = [namenode_config,spark_master_config,spark_driver_config]

len(worker_hosts_list)

3
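The two host lists above share the same `alias:ip` format that docker-compose expects in `extra_hosts`. A minimal sketch of a reusable builder follows; `extra_host_entry` and `build_extra_hosts` are hypothetical names, and the IP addresses are placeholders from the 192.0.2.0/24 documentation range, not the real addresses in `hosts.csv`.

```python
def extra_host_entry(alias, ip):
    """Format one docker-compose extra_hosts entry as 'alias:ip'."""
    if not alias or not ip:
        raise ValueError("alias and ip must both be non-empty")
    return f"{alias}:{ip}"


def build_extra_hosts(mapping):
    """Turn an {alias: ip} mapping into a list of extra_hosts entries."""
    return [extra_host_entry(alias, ip) for alias, ip in mapping.items()]


# Placeholder addresses (assumption), standing in for the values read from hosts.csv.
driver_hosts = build_extra_hosts({
    "hadoop-namenode": "192.0.2.10",
    "spark-master": "192.0.2.11",
})
print(driver_hosts)
```

Validating the inputs up front makes a missing IP fail here rather than later, when the container tries to resolve the alias.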

## Deploy Spark Master

- Deployed as is, with no modifications to the compose file.

In [10]:
for index, row in spark_master_hosts.iterrows():
    print('\x1b[1;35m'+ row['hostname']+'\x1b[0m')
    docker_host = 'source /root/common/env.sh && export DOCKER_HOST=\"tcp://%(ip)s:2376\"' %  {"ip": row['ip']}
    command = "docker-compose -f ~/repos/spark-sample/docker-compose-spark-master.yaml down"  
    result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
    command = "docker-compose -f ~/repos/spark-sample/docker-compose-spark-master.yaml up -d"  
    result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
    print("Spark Master Started")

kx-Lenovo-H520g
Spark Master Started
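The deployment cell above targets a remote Docker daemon by exporting `DOCKER_HOST` inside a shell string. As a hedged alternative, the same idea can be expressed with an environment dictionary and an argument list, which avoids nested quoting. `docker_host_env` and `compose_command` are hypothetical helpers, the IP address is a placeholder, and the sketch does not reproduce whatever `/root/common/env.sh` sets, since its contents are not shown here.

```python
import os


def docker_host_env(ip, port=2376, base_env=None):
    """Return a copy of the environment with DOCKER_HOST pointing at ip:port."""
    env = dict(os.environ if base_env is None else base_env)
    env["DOCKER_HOST"] = f"tcp://{ip}:{port}"
    return env


def compose_command(compose_file, *args):
    """Build the docker-compose argument list for one compose file."""
    return ["docker-compose", "-f", compose_file, *args]


# Placeholder IP (assumption); an empty base_env keeps the demo output small.
env = docker_host_env("192.0.2.11", base_env={})
cmd = compose_command("docker-compose-spark-master.yaml", "up", "-d")
print(env["DOCKER_HOST"])
print(cmd)
# To actually run it: subprocess.run(cmd, env=env, check=True)
```

Note that an argument list is not passed through a shell, so `~` is not expanded; a path such as `~/repos/...` would need `os.path.expanduser` first. Using `check=True` also surfaces failures that the `|| :` in the original command deliberately ignores.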


## Deploy Spark Workers

In [11]:
node_count = 0
with open('/root/repos/spark-sample/docker-compose-spark-worker.yaml',"r") as file:
    # The FullLoader parameter handles the conversion from YAML
    # scalar values to the Python dictionary format
    config = yaml.load(file, Loader=yaml.FullLoader)
    config['services']['spark-worker']['extra_hosts'] = worker_hosts_list
    for index, row in spark_workers_hosts.iterrows():
        node_count = node_count + 1
        config['services']['spark-worker']['hostname'] = 'spark-worker-' + row['hostname']
        with open(r'docker-compose-spark-worker-current.yaml', 'w') as outputfile:
            documents = yaml.dump(config, outputfile)    
        print('\x1b[1;35m'+ row['hostname']+'\x1b[0m')
        docker_host = 'source /root/common/env.sh && export DOCKER_HOST=\"tcp://%(ip)s:2376\"' %  {"ip": row['ip']}
        command = "docker-compose -f docker-compose-spark-worker-current.yaml down"
        result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
        command = "docker-compose -f docker-compose-spark-worker-current.yaml up -d"  
        result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
        print("  Spark Worker started:" + row['hostname'])
        !sleep 5
        command = "docker ps"  
        result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
        print(result)

        
    

nuc-02
  Spark Worker started:nuc-02
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                                                                    NAMES
d130f28621cf        jupyter/all-spark-notebook   "/usr/local/spark/bi…"   7 seconds ago       Up 5 seconds        0.0.0.0:4040->4040/tcp, 0.0.0.0:51814-51815->51814-51815/tcp, 8888/tcp   11-spark-cluster_spark-worker_1
e599fdae0217        amkay/sensor-exporter        "/go/bin/sensor-expo…"   3 weeks ago         Up 30 hours         0.0.0.0:9255->9255/tcp                                                   node-exporter-collectors_tempsensor_1
b2424bf443eb        prom/node-exporter:v0.18.0   "/bin/node_exporter …"   3 weeks ago         Up 30 hours         0.0.0.0:9100->9100/tcp                                                   nodeexporter

nuc-03
  Spark Worker started:nuc-03
CONTAINER ID        IMAGE                        COMMAND          
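The worker cell above rewrites one shared `config` dictionary on every loop iteration. A minimal sketch of a variant that derives a fresh configuration per host is shown below, so that no setting can leak from one worker to the next. `worker_config_for` is a hypothetical helper, the base dictionary is a cut-down stand-in for the real compose file, and the IP address is a placeholder.

```python
import copy


def worker_config_for(base_config, hostname, extra_hosts):
    """Return a per-host copy of the compose config; base_config is left untouched."""
    config = copy.deepcopy(base_config)
    service = config["services"]["spark-worker"]
    service["hostname"] = f"spark-worker-{hostname}"
    service["extra_hosts"] = list(extra_hosts)
    return config


# Cut-down stand-in for docker-compose-spark-worker.yaml (assumption).
base = {"services": {"spark-worker": {"image": "jupyter/all-spark-notebook"}}}

for name in ["nuc-02", "nuc-03"]:
    cfg = worker_config_for(base, name, ["spark-master:192.0.2.11"])
    print(cfg["services"]["spark-worker"]["hostname"])
```

Each returned dictionary can then be written out with `yaml.dump` exactly as in the cell above.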

## Deploy Spark Driver

In [12]:
node_count = 0
with open('/root/repos/spark-sample/docker-compose-spark-jupyter.yaml',"r") as file:
    # The FullLoader parameter handles the conversion from YAML
    # scalar values to the Python dictionary format
    config = yaml.load(file, Loader=yaml.FullLoader)
    config['services']['spark-driver']['extra_hosts'] = driver_hosts_list
    for index, row in spark_driver_hosts.iterrows():
        node_count = node_count + 1
        #fix_env = config['services']['spark-driver']['environment'].append
        config['services']['spark-driver']['environment'].append("SPARK_PUBLIC_DNS=%(ip)s" % {"ip": row['ip']})
        with open(r'docker-compose-spark-jupyter-current.yaml', 'w') as outputfile:
            documents = yaml.dump(config, outputfile)    
        print('\x1b[1;35m'+ row['hostname']+'\x1b[0m')
        docker_host = 'source /root/common/env.sh && export DOCKER_HOST=\"tcp://%(ip)s:2376\"' %  {"ip": row['ip']}
        command = "docker-compose -f docker-compose-spark-jupyter-current.yaml down"
        result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
        command = "docker-compose -f docker-compose-spark-jupyter-current.yaml up -d"  
        result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
        print("  Spark Driver started:" + row['hostname'])
        !sleep 20
        command = "docker logs 11-spark-cluster_spark-driver_1"  
        result = subprocess.check_output("bash -c \"%s && %s  || : \" " %(docker_host,command)  , shell=True, encoding='utf-8')        
        print(result)

nuc-01
  Spark Driver started:nuc-01
Set username to: jovyan
Executing the command: jupyter notebook
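The driver cell waits a fixed 20 seconds before reading the logs. As a sketch of an alternative, the wait can poll until the notebook port accepts TCP connections. `wait_for_port` is a hypothetical helper, and the demo uses a throwaway local listener rather than the real driver on port 8888.

```python
import socket
import time


def wait_for_port(host, port, timeout=20.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Demo against a local listener on an OS-assigned port (stand-in for the driver).
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
print(wait_for_port("127.0.0.1", server.getsockname()[1], timeout=2.0))
server.close()
```

An open port only shows that something is listening; it does not prove that Jupyter has finished starting, so checking the container logs afterwards is still useful.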



## Test Cluster

Sample Driver app:

Create it in a Jupyter notebook at: http://`$spark-driver-ip`:8888/
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Pin the driver and block manager ports to fixed values instead of random ones.
spark1 = (
    SparkSession.builder
    .appName('Sample')
    .config("spark.driver.port", 51816)
    .config("spark.blockManager.port", 51814)
    .master('spark://spark-master:7077')
    .getOrCreate()
)
# Inspect the effective configuration.
spark1.sparkContext.getConf().getAll()

# Build a small test DataFrame with two constant string columns.
range100 = (
    spark1.range(100)
    .withColumn("TEST", lit("1fdgs;dlkfg;lsdkgkf"))
    .withColumn("TEST2", lit("4dsklgdfs;kg;lsdkfl;hk"))
)
range100.show()

# Write to HDFS as Parquet (despite the .txt suffix in the path), then read it back.
range100.write.mode('overwrite').parquet("hdfs://hadoop-namenode:9000/sample1.txt")

sampleRead = spark1.read.parquet("hdfs://hadoop-namenode:9000/sample1.txt")
sampleRead.show()
```

Check cluster health at: http://`$spark-master-ip`:8088/
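The health check can also be scripted. The sketch below rests on assumptions to verify against your Spark version: that the standalone master web UI serves a JSON summary under `/json/`, and that each worker entry carries `host` and `state` fields. `fetch_master_status` and `alive_workers` are hypothetical helpers, and the sample payload is made up for illustration.

```python
import json
from urllib.request import urlopen


def fetch_master_status(base_url, timeout=5.0):
    """Fetch the master's JSON summary (assumed to live at <base_url>/json/)."""
    with urlopen(base_url.rstrip("/") + "/json/", timeout=timeout) as response:
        return json.load(response)


def alive_workers(status):
    """Return the hosts of workers whose state is ALIVE (field names are assumptions)."""
    return [
        worker.get("host", "?")
        for worker in status.get("workers", [])
        if worker.get("state") == "ALIVE"
    ]


# Made-up payload for illustration; a real one would come from fetch_master_status(...).
sample = {
    "status": "ALIVE",
    "workers": [
        {"host": "spark-worker-nuc-02", "state": "ALIVE"},
        {"host": "spark-worker-nuc-03", "state": "DEAD"},
    ],
}
print(alive_workers(sample))
```

For this deployment, two alive workers would be the expected result once both worker nodes have registered with the master.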