## RDD example

This notebook provides a dummy example of a map on a Spark RDD, which can be used to check that parallelisation works fine on the cluster.

It consists of creating an RDD with `n_partitions` partitions and applying a map function that waits for 2 seconds per element (there is one element per partition).

#### General imports

In [None]:
import os 
import sys
import time
import numpy as np
import pandas as pd

#### Start Spark session

A Spark session is created by using the pyspark.sql.SparkSession object. See [here](https://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sparksession) for the API documentation on the SparkSession Object. 


In [None]:
# This is needed to start a Spark session from the notebook.
# The arguments are written on a single line so that no indentation
# whitespace ends up inside the environment variable.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--conf spark.driver.memory=2g pyspark-shell"

# For Yarn, so that Spark knows where it runs
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
# For Yarn, so Spark knows which Python to use (we want Anaconda, so that
# numpy, pandas, and so forth are available)
os.environ['PYSPARK_PYTHON'] = "/usr/local/anaconda3/bin/python3"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/local/anaconda3/bin/python3"


from pyspark.sql import SparkSession

In [None]:
# To recreate the Spark session with other parameters, first uncomment the
# stop() call below.
#spark.stop()
spark = (
    SparkSession.builder
    .master("yarn")
    .config("spark.executor.instances", "5")
    .appName("demoRDD")
    .getOrCreate()
)

# RDD operations go through the SparkContext object.
# See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
sc = spark.sparkContext

#### Start dummy Spark jobs

In [None]:
# Wait function
def wait2s(x, delay=2):
    """Sleep for `delay` seconds, then return `x` unchanged.

    Used as a dummy map function: each call blocks its task for a fixed
    time, so the total job duration shows how many tasks ran in parallel.

    Parameters
    ----------
    x : any
        The RDD element; passed through untouched.
    delay : int or float, optional
        Number of seconds to sleep. Defaults to 2, which is what a plain
        `rdd.map(wait2s)` uses.

    Returns
    -------
    The same object `x` that was passed in.
    """
    time.sleep(delay)
    return x

In [None]:
# Eight elements spread over eight partitions, i.e. one 2-second task each.
n_partitions = 8

data = range(n_partitions)
datardd = sc.parallelize(data, n_partitions)

# collect() triggers the job and returns the (unchanged) elements.
datardd.map(wait2s).collect()

### Open Spark UI and check parallelisation

* Open the Hadoop Web UI at `127.0.0.1:8088`, and click on your running Application. You should land on a URL similar to `http://127.0.0.1:8088/cluster/app/application_1523870291186_0032`
* Change `cluster/app` to `proxy` to get to the Spark UI: `http://127.0.0.1:8088/proxy/application_1523870291186_0032`

### Stop session

In [None]:
# Stop the active Spark session.
spark.stop()

### Scalability

In [None]:
# Ten runs (rows) for each number of executors (columns).
# The matrix width is derived from n_executors so that the two cannot get
# out of sync when the list is edited.
n_executors = [1, 2, 5, 10, 20, 50, 100]
n_runs = 10
results_benchmark = np.zeros((n_runs, len(n_executors)))


In [None]:
# Run the dummy 2-second-per-task job several times for every executor count
# and record the wall-clock duration of each run in results_benchmark[j, i].
for i, n_exec in enumerate(n_executors):

    print("Run benchmark with "+str(n_exec)+" executors")
    spark = (
        SparkSession.builder
        .master("yarn")
        .config("spark.executor.instances", str(n_exec))
        .appName("demoRDD")
        .getOrCreate()
    )

    try:
        sc = spark.sparkContext

        #100 partitions
        n_partitions = 100
        data = range(0, n_partitions)
        datardd = sc.parallelize(data, n_partitions)

        # One row of the result matrix per repetition.
        for j in range(results_benchmark.shape[0]):
            # perf_counter is a monotonic clock, suited to measuring intervals.
            time_start = time.perf_counter()
            datardd.map(wait2s).collect()
            time_end = time.perf_counter()
            execution_time = time_end - time_start
            results_benchmark[j, i] = execution_time
    finally:
        # Always stop the session, even if a job failed: otherwise the next
        # getOrCreate() would return this still-running session instead of
        # one configured with the next executor count.
        spark.stop()
        

In [None]:
# Wrap the timing matrix in a DataFrame: one row per run, one column per
# executor count (in the order of n_executors).
pd_results=pd.DataFrame(results_benchmark)

In [None]:
# Display the timing table.
pd_results

In [None]:
# Save the raw timings to CSV, without index column or header row.
pd_results.to_csv("resultsBenchmark.csv",index=False,header=False)

### Scalability regression



In [None]:
def genData(N,n,random_seed):
    """Generate an artificial, noise-free linear regression dataset.

    Inputs X and weights theta are drawn uniformly at random in [0, 1) and
    the target is the exact linear combination Y = X . theta (no noise term
    is added).

    Parameters
    ----------
    N : int
        Number of observations (rows).
    n : int
        Number of features (columns of X).
    random_seed : int
        Seed given to NumPy's global random generator, so that the same seed
        always yields the same dataset.

    Returns
    -------
    tuple (X, Y, Z, theta)
        X     : array of shape (N, n), the inputs.
        Y     : array of shape (N, 1), the targets.
        Z     : array of shape (N, n+1), X with Y appended as last column.
        theta : array of shape (n,), the true weights.
    """
    start = time.time()

    # Honour the caller's seed (it was previously ignored in favour of a
    # hard-coded 0). Seed 0 still reproduces the same data as before.
    np.random.seed(random_seed)

    # Inputs and the weights of the linear combination are drawn at random
    X = np.random.rand(N, n)
    theta = np.random.rand(n)

    # Noise-free target, reshaped to a column so it can be appended to X
    Y = np.dot(X, theta)
    Y = Y[:, np.newaxis]
    Z = np.concatenate((X, Y), axis=1)

    print("Number of observations :",N)
    print("Number of features :",n)

    print("Dimension of X :",X.shape)
    print("Dimension of theta :",theta.shape)
    print("Dimension of Y :",Y.shape)

    end = time.time()
    print("Time to create artificial data: ",round(end - start,2),"seconds")

    return (X,Y,Z,theta)

In [None]:
# Generate the dataset: 1M rows (N = 1,000,000), 100 features, seed 0
N = 10**6
n = 100
X, Y, Z, theta = genData(N, n, 0)

In [None]:
# Size of Z in bytes as reported by sys.getsizeof.
# NOTE(review): Z.nbytes is the usual NumPy way to get the size of the data
# buffer alone; consider it if only the payload size matters.
sys.getsizeof(Z)

In [None]:
# Larger driver memory for the regression benchmark. The arguments are
# written on a single line so that no indentation whitespace ends up inside
# the environment variable.
# NOTE(review): these arguments are presumably only read when the JVM is
# first launched; if a Spark session was already started in this kernel, a
# kernel restart may be needed for the new driver memory to apply -- confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--conf spark.driver.memory=20g pyspark-shell"

# For Yarn, so that Spark knows where it runs
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
# For Yarn, so Spark knows which Python to use (we want Anaconda, so that
# numpy, pandas, and so forth are available)
# NOTE(review): the first session setup in this notebook points at
# /usr/local/anaconda3/bin/python3 instead -- confirm which interpreter path
# is correct for the cluster.
os.environ['PYSPARK_PYTHON'] = "/etc/anaconda3/bin/python"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/etc/anaconda3/bin/python"


from pyspark.sql import SparkSession

In [None]:
def xtx_xty_row(z):
    """Per-row contribution to the normal equations.

    `z` is one row of the dataset: the feature values followed by the target
    as last entry. Returns the pair (x x^T, x * y) for that row, so that
    summing the pairs over all rows yields X^T X and X^T Y.
    """
    features = np.array(z[:-1])
    target = z[-1]
    return (np.outer(features, features), np.dot(features, target))

In [None]:
# Ten runs (rows) for each number of executors (columns).
# The matrix width is derived from n_executors so that the two cannot get
# out of sync when the list is edited.
n_executors = [1, 2, 5, 10, 20, 50, 100]
n_runs = 10
results_benchmark = np.zeros((n_runs, len(n_executors)))


In [None]:
# Benchmark the distributed least-squares computation for every executor
# count, going from the largest count to the smallest. Each timed run
# computes X^T X and X^T Y with a map/reduce over the rows of Z, then solves
# for theta on the driver; durations are stored in results_benchmark[j, i].
for i in reversed(range(len(n_executors))):
    n_exec = n_executors[i]

    # Split about 10g across the executors, with a floor of 2g each.
    mem_per_exec = str(max([int(np.round(10/n_exec)), 2]))+"g"

    print("Number of executors :"+str(n_exec))
    print("Memory per executor: "+str(mem_per_exec))

    print("Run benchmark with "+str(n_exec)+" executors")
    spark = (
        SparkSession.builder
        .master("yarn")
        # Passed as a string, like in the first benchmark loop.
        .config("spark.executor.instances", str(n_exec))
        .config("spark.executor.memory", mem_per_exec)
        .appName("demoRDD")
        .getOrCreate()
    )

    try:
        sc = spark.sparkContext

        # perf_counter is a monotonic clock, suited to measuring intervals.
        time_start = time.perf_counter()

        # Distribute the rows of Z over B partitions (no .cache(), as before).
        B = 400
        Z_RDD = sc.parallelize(Z, B)

        time_end = time.perf_counter()

        print("Time to load data: "+str(time_end-time_start)+" s")

        print(Z_RDD.count())

        # One row of the result matrix per repetition.
        for j in range(results_benchmark.shape[0]):
            time_start = time.perf_counter()

            # Sum the per-row (x x^T, x y) pairs into (X^T X, X^T Y).
            (XtX, XtY) = Z_RDD.map(xtx_xty_row).reduce(
                lambda acc, cur: (acc[0]+cur[0], acc[1]+cur[1])
            )

            # Normal equations: theta_hat = (X^T X)^-1 X^T Y
            XtX_inverse = np.linalg.inv(XtX)

            theta_hat = np.dot(XtX_inverse, XtY)

            time_end = time.perf_counter()

            execution_time = time_end - time_start
            results_benchmark[j, i] = execution_time
    finally:
        # Always stop the session, even if a job failed: otherwise the next
        # getOrCreate() would return this still-running session instead of
        # one configured with the next executor count and memory.
        spark.stop()
        

In [None]:
# Wrap the timing matrix in a DataFrame: one row per run, one column per
# executor count (in the order of n_executors).
pd_results=pd.DataFrame(results_benchmark)

In [None]:
# Display the timing table.
pd_results

In [None]:
# Save the raw timings to CSV, without index column or header row.
pd_results.to_csv("resultsBenchmarkRegression.csv",index=False,header=False)