# Building a Spark cluster on EC2 (Spot instance)

This note is on how to build a spark cluster on AWS ec2 from your jupyter notebook. Most of the tasks can be finished entirely within this notebook. However there are some dependencies and requirements that you will need to get ready:
* Python 3.3+ on linux for this notebook to run
* An AWS account with aws_access_key_id, and aws_secret_access_key
* Installed aws_cli
* boto3

ref: http://blog.insightdatalabs.com/spark-cluster-step-by-step/

###  1. Install boto3

boto3 is a handy aws resource management tool. We use it to build a simple cluster

In [44]:
# !pip install aws
# run 'aws configure' in a terminal

In [38]:
!pip install boto3

In [1]:
import boto3

** Check your aws account credentials **

In [None]:
# %load ~/.aws/credentials

### 2. Setting up the clusters

First we define two handy functions that we use often: check_status, and terminate

In [2]:
def check_status(ec2, ins_ids):
    """check statuses of instances 
    """
    for instance_id in ins_ids:
        print("{} is {}".format(instance_id, 
                                ec2.Instance(id=instance_id).state['Name']))
        
def terminate(ec2, ins_ids):
    """terminate instances 
    """    
    for instance_id in ins_ids:
        ec2.Instance(id=instance_id).terminate()    

#### Configurations 

In [3]:
# configurations
ubuntu14lts = 'ami-5ac2cd4d'  # image Ubuntu Server 14.04 LTS (HVM), SSD Volume Type
num_nodes = 4
instance_type = 'm4.large'
security_group_name = 'jupyter-spark-all' # security group name
key_pair_name = 'jupyter' # name for your pem file, will store in ~/.aws/
spot_price = '0.02'

In [4]:
ec2 = boto3.resource('ec2')
client = boto3.client('ec2')

#### Security group: open to all

For similicity, we allow all inbound traffic from anywhere. To be safe, worker nodes need to be carefully modified to only accept internal traffic.

In [11]:
# we will create a special security group 
response = ec2.create_security_group(GroupName=security_group_name,
                                     Description='security group for jupyter notebooks traffic')
# allow port 22 for ssh
client.authorize_security_group_ingress(GroupId=response.group_id,
                                     IpProtocol="tcp",
                                     CidrIp="0.0.0.0/0",
                                     FromPort=22,
                                     ToPort=22) 
# allow port 8080  
client.authorize_security_group_ingress(GroupId=response.group_id,
                                     IpProtocol="tcp",
                                     CidrIp="0.0.0.0/0",
                                     FromPort=8080,
                                     ToPort=8080) 

client.authorize_security_group_ingress(GroupId=response.group_id,
                                     IpProtocol='-1',
                                     CidrIp="0.0.0.0/0",
                                     FromPort=1,
                                     ToPort=65535) 

{'ResponseMetadata': {'HTTPHeaders': {'content-type': 'text/xml;charset=UTF-8',
   'date': 'Wed, 21 Dec 2016 23:46:40 GMT',
   'server': 'AmazonEC2',
   'transfer-encoding': 'chunked',
   'vary': 'Accept-Encoding'},
  'HTTPStatusCode': 200,
  'RequestId': 'e8cf1229-4e98-4414-9393-00bfb269b758',
  'RetryAttempts': 0}}

#### Create a new pem file

In [5]:
# create a new key pair (.pem)
from os.path import expanduser
from pathlib import Path
kp_path = expanduser("~") + '/.aws/'+key_pair_name + '.pem'
if not Path(kp_path).is_file():
    key_pair = client.create_key_pair(KeyName=key_pair_name)
    with open(kp_path,'w') as wt:
        wt.write(key_pair['KeyMaterial'])
else:
    print("{} already exists.".format(kp_path))

/home/ddu/.aws/jupyter.pem already exists.


In [11]:
!chmod 600 {kp_path}

#### Commands to run at launch for all nodes

We will let all the nodes run the following commands upon spinning up. <br>
It will perform the following tasks:
* Install java-8
* Download spark-1.6.3 and install
* Create ~/.profile and set PATH

In [6]:
user_data = '''#!/bin/bash
sudo add-apt-repository ppa:openjdk-r/ppa 
sudo apt-get update 
sudo apt-get install -y openjdk-8-jdk
sudo apt-get install scala
wget http://apache.mirrors.tds.net/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.4.tgz -P ~/Downloads
sudo tar zxvf ~/Downloads/spark-* -C /usr/local
sudo mv /usr/local/spark-* /usr/local/spark

sudo cat << EOF >> /home/ubuntu/.profile
export SPARK_HOME=/usr/local/spark  
export PATH=DOLLAR_SIGNPATH:DOLLAR_SIGNSPARK_HOME/bin  
EOF

sed -i -e 's/DOLLAR_SIGN/$/g' /home/ubuntu/.profile
source /home/ubuntu/.profile
sudo chown -R ubuntu $SPARK_HOME  

echo "#!/usr/bin/env bash" >> $SPARK_HOME/conf/spark-env.sh
echo "export JAVA_HOME=/usr" >> $SPARK_HOME/conf/spark-env.sh
echo "export SPARK_PUBLIC_DNS='current_node_public_dns' " >> $SPARK_HOME/conf/spark-env.sh
echo "export SPARK_WORKER_CORES={}" >> $SPARK_HOME/conf/spark-env.sh
'''.format( (num_nodes - 1)*2 )
user_data

'#!/bin/bash\nsudo add-apt-repository ppa:openjdk-r/ppa \nsudo apt-get update \nsudo apt-get install -y openjdk-8-jdk\nsudo apt-get install scala\nwget http://apache.mirrors.tds.net/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.4.tgz -P ~/Downloads\nsudo tar zxvf ~/Downloads/spark-* -C /usr/local\nsudo mv /usr/local/spark-* /usr/local/spark\n\nsudo cat << EOF >> /home/ubuntu/.profile\nexport SPARK_HOME=/usr/local/spark  \nexport PATH=DOLLAR_SIGNPATH:DOLLAR_SIGNSPARK_HOME/bin  \nEOF\n\nsed -i -e \'s/DOLLAR_SIGN/$/g\' /home/ubuntu/.profile\nsource /home/ubuntu/.profile\nsudo chown -R ubuntu $SPARK_HOME  \n\necho "#!/usr/bin/env bash" >> $SPARK_HOME/conf/spark-env.sh\necho "export JAVA_HOME=/usr" >> $SPARK_HOME/conf/spark-env.sh\necho "export SPARK_PUBLIC_DNS=\'current_node_public_dns\' " >> $SPARK_HOME/conf/spark-env.sh\necho "export SPARK_WORKER_CORES=6" >> $SPARK_HOME/conf/spark-env.sh\n'

In [7]:
import base64
user_data_encoded = base64.b64encode(user_data.encode('utf-8'))
user_data_encoded.decode('utf-8')

'IyEvYmluL2Jhc2gKc3VkbyBhZGQtYXB0LXJlcG9zaXRvcnkgcHBhOm9wZW5qZGstci9wcGEgCnN1ZG8gYXB0LWdldCB1cGRhdGUgCnN1ZG8gYXB0LWdldCBpbnN0YWxsIC15IG9wZW5qZGstOC1qZGsKc3VkbyBhcHQtZ2V0IGluc3RhbGwgc2NhbGEKd2dldCBodHRwOi8vYXBhY2hlLm1pcnJvcnMudGRzLm5ldC9zcGFyay9zcGFyay0xLjYuMy9zcGFyay0xLjYuMy1iaW4taGFkb29wMi40LnRneiAtUCB+L0Rvd25sb2FkcwpzdWRvIHRhciB6eHZmIH4vRG93bmxvYWRzL3NwYXJrLSogLUMgL3Vzci9sb2NhbApzdWRvIG12IC91c3IvbG9jYWwvc3BhcmstKiAvdXNyL2xvY2FsL3NwYXJrCgpzdWRvIGNhdCA8PCBFT0YgPj4gL2hvbWUvdWJ1bnR1Ly5wcm9maWxlCmV4cG9ydCBTUEFSS19IT01FPS91c3IvbG9jYWwvc3BhcmsgIApleHBvcnQgUEFUSD1ET0xMQVJfU0lHTlBBVEg6RE9MTEFSX1NJR05TUEFSS19IT01FL2JpbiAgCkVPRgoKc2VkIC1pIC1lICdzL0RPTExBUl9TSUdOLyQvZycgL2hvbWUvdWJ1bnR1Ly5wcm9maWxlCnNvdXJjZSAvaG9tZS91YnVudHUvLnByb2ZpbGUKc3VkbyBjaG93biAtUiB1YnVudHUgJFNQQVJLX0hPTUUgIAoKZWNobyAiIyEvdXNyL2Jpbi9lbnYgYmFzaCIgPj4gJFNQQVJLX0hPTUUvY29uZi9zcGFyay1lbnYuc2gKZWNobyAiZXhwb3J0IEpBVkFfSE9NRT0vdXNyIiA+PiAkU1BBUktfSE9NRS9jb25mL3NwYXJrLWVudi5zaAplY2hvICJleHBvcnQgU1BBUktfUFVCTElDX0ROUz0nY3VycmVudF9

### 3. Send out request to create instances, finally!!

For more details on other options (for example spot instances), pls refer the boto3 documentation linked below

http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances

In [8]:
volumns = { 'DeviceName': '/dev/sda1',
            'Ebs': { 'VolumeSize': 200,
                     'DeleteOnTermination': True,
                     'VolumeType': 'standard', # Magnetic
                    },
          }
volumns

{'DeviceName': '/dev/sda1',
 'Ebs': {'DeleteOnTermination': True,
  'VolumeSize': 200,
  'VolumeType': 'standard'}}

In [33]:
# now create instances
#instances = ec2.create_instances(ImageId = ubuntu14lts,
#                                 SecurityGroups = [ security_group_name ],
#                                 InstanceType = instance_type, 
#                                 KeyName = key_pair_name,
#                                 MinCount = num_nodes,
#                                 MaxCount = num_nodes,
#                                 UserData = user_data,    
#                                 BlockDeviceMappings=[volumns])

In [9]:
# spot instance
response = client.request_spot_instances(
    SpotPrice=spot_price,
    Type='one-time', #|'persistent',
    InstanceCount=num_nodes,
    LaunchSpecification={
        'ImageId': ubuntu14lts,
        'KeyName': key_pair_name,
        'SecurityGroups': [
            security_group_name,
        ],
        'UserData': user_data_encoded.decode('utf-8'),
        'InstanceType': instance_type,

        'BlockDeviceMappings': [ volumns],
    })

In [15]:
def check_spot_instances(response):
    req_ids = []
    ins_ids = []
    for ins in response['SpotInstanceRequests']:
        req_ids.append(ins['SpotInstanceRequestId'])  
    res = client.describe_spot_instance_requests( SpotInstanceRequestIds=req_ids )
    for i,ins in enumerate(res['SpotInstanceRequests']):
        print('{} {}'.format(req_ids[i],ins['Status']['Code']))
        #print(ins)
        if 'InstanceId' in ins:
            ins_ids.append(ins['InstanceId'])
    return ins_ids
    #print(res['SpotInstanceRequests'])

In [16]:
ins_ids = check_spot_instances(response)

sir-3vbg78kk fulfilled
sir-cq184ymk fulfilled
sir-hvrg7zej fulfilled
sir-ww4g7c7j fulfilled


#### Check this a couple of times until all are running/or terminated

In [20]:
check_status(ec2, ins_ids)

i-09dbbb009395ededa is running
i-06e44cd235931c050 is running
i-0e5f40df170d9cdc8 is running
i-0468a79307e26c9a3 is running


In [36]:
# BE CAUTIOUS!! This will kill the whole cluster
terminate_spot_instances(ec2, instances)

In [21]:
# we need a bunch of parameters that will be needed below
# We will set the first node to be the master, and others are slaves
pub_dns = []
pri_dns = []
for idx in ins_ids:
    #print("{} is {}".format(instance.id, ec2.Instance(id=instance.id).state['Name']))
    pub_dns.append(ec2.Instance(id=idx).public_dns_name)
    pri_dns.append(ec2.Instance(id=idx).private_dns_name.split('.')[0])
    print(ec2.Instance(id=idx).public_dns_name)

ec2-54-227-16-198.compute-1.amazonaws.com
ec2-54-161-48-132.compute-1.amazonaws.com
ec2-54-144-23-151.compute-1.amazonaws.com
ec2-54-227-162-108.compute-1.amazonaws.com


### 4. Now, we need to do a bunch of modifications to the spark config files

In [22]:
# first, upload a copy of pem file to master
!scp -i {kp_path} -o 'StrictHostKeyChecking no' {kp_path} ubuntu@{pub_dns[0]}:/home/ubuntu/.ssh

jupyter.pem                                   100% 1670     1.6KB/s   00:00    


In [23]:
# ssh_cmd: cmd to update ssh

pem_key_file = '/home/ubuntu/.ssh/'+key_pair_name+'.pem'
ssh_cmd = '''
cat << EOF | tee /home/ubuntu/.ssh/config
Host namenode
  HostName {}
  User ubuntu
  IdentityFile {}
'''.format( pub_dns[0], pem_key_file)
for i,dns in enumerate(pub_dns[1:]):
    ssh_cmd += '''
Host datanode{}
  HostName {}
  User ubuntu
  IdentityFile {}
    '''.format(i+1, dns, pem_key_file)
ssh_cmd += '''
EOF

ssh-keygen -f /home/ubuntu/.ssh/id_rsa -t rsa -P ''
cat /home/ubuntu/.ssh/id_rsa.pub >> /home/ubuntu/.ssh/authorized_keys
'''
for i,dns in enumerate(pub_dns[1:]):
    ssh_cmd += '''
    cat /home/ubuntu/.ssh/id_rsa.pub | ssh -o 'StrictHostKeyChecking no' datanode{} 'cat >> /home/ubuntu/.ssh/authorized_keys'
    '''.format(i+1)
#ssh_cmd

In [24]:
# update the ssh config at master
!ssh -i {kp_path} -o "StrictHostKeyChecking no" "ubuntu@"{pub_dns[0]} "{ssh_cmd}"

Host namenode
  HostName ec2-54-227-16-198.compute-1.amazonaws.com
  User ubuntu
  IdentityFile /home/ubuntu/.ssh/jupyter.pem

Host datanode1
  HostName ec2-54-161-48-132.compute-1.amazonaws.com
  User ubuntu
  IdentityFile /home/ubuntu/.ssh/jupyter.pem
    
Host datanode2
  HostName ec2-54-144-23-151.compute-1.amazonaws.com
  User ubuntu
  IdentityFile /home/ubuntu/.ssh/jupyter.pem
    
Host datanode3
  HostName ec2-54-227-162-108.compute-1.amazonaws.com
  User ubuntu
  IdentityFile /home/ubuntu/.ssh/jupyter.pem
    
Generating public/private rsa key pair.
Your identification has been saved in /home/ubuntu/.ssh/id_rsa.
Your public key has been saved in /home/ubuntu/.ssh/id_rsa.pub.
The key fingerprint is:
82:be:37:78:77:f4:bf:20:6f:0a:2f:35:94:81:76:9b ubuntu@ip-172-31-8-85
The key's randomart image is:
+--[ RSA 2048]----+
|         .       |
|        o o      |
|       . . =     |
|     .    E      |
|    . . S.       |
|   .   .  +      |
|    ..  .o.o.    |
|    ..+ oo.oo.   |
|   

In [28]:
# update each node (include master)
for everybody in pub_dns: # 
    update_cmd = 'source /home/ubuntu/.profile; '
    update_cmd += 'sudo sed -i "s/current_node_public_dns/{}/g" \$SPARK_HOME/conf/spark-env.sh;'.format(everybody)
    print(everybody)
    !ssh -i {kp_path} -o "StrictHostKeyChecking no" "ubuntu@"{everybody} "{update_cmd}" 

ec2-54-227-16-198.compute-1.amazonaws.com
ec2-54-161-48-132.compute-1.amazonaws.com
ec2-54-144-23-151.compute-1.amazonaws.com
ec2-54-227-162-108.compute-1.amazonaws.com


#### Now update the master node config

In [25]:
# master_cmd: modify master 

master_cmd = ' sudo rm -rf \$SPARK_HOME/conf/slaves; '
master_cmd += ' source /home/ubuntu/.profile; '
for slave in pub_dns[1:]:
    master_cmd += 'sudo echo "{}" | sudo tee --append \$SPARK_HOME/conf/slaves; '.format(slave)
master_cmd += ' sudo sed -i -e "s/current_node_public_dns/{}/g" \$SPARK_HOME/conf/spark-env.sh; '
master_cmd

' sudo rm -rf \\$SPARK_HOME/conf/slaves;  source /home/ubuntu/.profile; sudo echo "ec2-54-161-48-132.compute-1.amazonaws.com" | sudo tee --append \\$SPARK_HOME/conf/slaves; sudo echo "ec2-54-144-23-151.compute-1.amazonaws.com" | sudo tee --append \\$SPARK_HOME/conf/slaves; sudo echo "ec2-54-227-162-108.compute-1.amazonaws.com" | sudo tee --append \\$SPARK_HOME/conf/slaves; '

In [26]:
# modify master
!ssh -i {kp_path} -o "StrictHostKeyChecking no" "ubuntu@"{pub_dns[0]} "{master_cmd}"

ec2-54-161-48-132.compute-1.amazonaws.com
ec2-54-144-23-151.compute-1.amazonaws.com
ec2-54-227-162-108.compute-1.amazonaws.com


## Hurrah! You are ready to run your spark cluster!

It is recommended to use your termininal to start the cluster <br>
Open a terminal, ssh to your master node <br>
And run the commands:

```bash
master$ $SPARK_HOME/sbin/start-all.sh
```

In case to stop,
```bash
master$ $SPARK_HOME/sbin/stop-all.sh
```

For more details, check out Ouyang's post: <br>
http://blog.insightdatalabs.com/spark-cluster-step-by-step/

Now go to this place to find the status

In [35]:
start_cmd = ' source /home/ubuntu/.profile; \$SPARK_HOME/sbin/start-all.sh; '
!ssh -i {kp_path} -o "StrictHostKeyChecking no" "ubuntu@"{pub_dns[0]} "{start_cmd}"

starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark/logs/spark-ubuntu-org.apache.spark.deploy.master.Master-1-ip-172-31-8-85.out
ec2-54-161-48-132.compute-1.amazonaws.com: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-172-31-14-86.out
ec2-54-227-162-108.compute-1.amazonaws.com: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-172-31-1-22.out
ec2-54-144-23-151.compute-1.amazonaws.com: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-172-31-0-127.out


In [36]:
stop_cmd = ' source /home/ubuntu/.profile; \$SPARK_HOME/sbin/stop-all.sh; '
!ssh -i {kp_path} -o "StrictHostKeyChecking no" "ubuntu@"{pub_dns[0]} "{stop_cmd}"

ec2-54-227-162-108.compute-1.amazonaws.com: stopping org.apache.spark.deploy.worker.Worker
ec2-54-161-48-132.compute-1.amazonaws.com: stopping org.apache.spark.deploy.worker.Worker
ec2-54-144-23-151.compute-1.amazonaws.com: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master


In [27]:
'https://{}:8080'.format(pub_dns[0])

'https://ec2-54-227-16-198.compute-1.amazonaws.com:8080'

### Setup to tunnel to local jupyter notebook

In [39]:
# get aws_access_key_id and aws_secret_access_key from your aws credentials
%load /home/ddu/.aws/credentials

In [42]:
from notebook.auth import passwd
notebook_pass = '123456'
encrypted = passwd(notebook_pass, algorithm='sha1')

In [46]:
# install ipython
master_cmd = ''
master_cmd += 'sudo apt-get install -y python-dev python-pip python-numpy python-scipy python-pandas gfortran; '
master_cmd += 'sudo pip install nose "ipython[notebook]" ; '
master_cmd += '''cat << EOF | tee --append /home/ubuntu/.profile
export AWS_ACCESS_KEY_ID={}
export AWS_SECRET_ACCESS_KEY={}
EOF

mkdir -p /home/ubuntu/.jupyter
if [ ! -f "/home/ubuntu/.jupyter/jupyter_notebook_config.py" ]; then
cat << EOF | tee /home/ubuntu/.jupyter/jupyter_notebook_config.py
c.NotebookApp.ip = '*'
c.NotebookApp.password = {}
c.NotebookApp.open_browser = False
# It is a good idea to set a known, fixed port for server access
c.NotebookApp.port = 8888
EOF
fi

PYSPARK_DRIVER_PYTHON=ipython
tmux
jupyter notebook

'''.format( aws_access_key_id,  aws_secret_access_key, encrypted)
master_cmd

'sudo apt-get install -y python-dev python-pip python-numpy python-scipy python-pandas gfortran; sudo pip install nose "ipython[notebook]" ; cat << EOF | tee --append /home/ubuntu/.profile\nexport AWS_ACCESS_KEY_ID=AKIAI374YOEQVFDUFCEA\nexport AWS_SECRET_ACCESS_KEY=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\nEOF\n\nmkdir -p /home/ubuntu/.jupyter\nif [ ! -f "/home/ubuntu/.jupyter/jupyter_notebook_config.py" ]; then\ncat << EOF | tee /home/ubuntu/.jupyter/jupyter_notebook_config.py\nc.NotebookApp.ip = \'*\'\nc.NotebookApp.password = sha1:9f3a012efb18:23c9567caba001c35910efb7d1cea0e7a9f093bb\nc.NotebookApp.open_browser = False\n# It is a good idea to set a known, fixed port for server access\nc.NotebookApp.port = 8888\nEOF\nfi\n\nPYSPARK_DRIVER_PYTHON=ipython\ntmux\njupyter notebook\n\n'

In [49]:
!ssh -i {kp_path} -o "StrictHostKeyChecking no" "ubuntu@"{pub_dns[0]} "{master_cmd}"

## Don't forget to terminate

In [128]:
terminate(ec2, ins_ids)

In [129]:
check_status(ec2, ins_ids)

i-012981a062f8abaa3 is shutting-down
i-090d3d3075ae24f14 is shutting-down
i-095864467acef2fe3 is shutting-down
i-038da90f77029607a is shutting-down
