# ModOP Création d'un env isolé pour PySpark


Dans l'environnement virtuel on crée un répertoire, dans ce même répertoire il faut descendre les paquets que l'on souhaite 
rendre disponible sur le cluster, par exemple pour le paquet numpy:
pip download numpy tous les paquets qui serons descendu dans le répertoire serons potentiellement de deux types:
- whl (wheel)
- tar.gz

Dans le cas ou nous avons un *__tar.gz__* il faut générer un *__whl__*, pour cela il faut untar l'archive se placer dans le répertoire et lancé la commande  
__python setup.py bdist_wheel__.

Une fois tous les fichiers __Wheel__ générés, il faut dézipper les fichiers avec __unzip package.whl__ .

__ATTENTION !!!__ : certain package nécessitent des fichiers système inhérent à l'architecture cible qui ne peuvent malheureusement pas être portés par le package, cela nécessite l'intervention d'un admin afin d'installer ces dépendances.

Une fois l'environnement crée, mettre ce répertoire sur HDFS par exemple :

__hdfs dfs -put /tmp/envs/py27PySpark/venv /applis/hadd/produits/anaconda2/envs/py27PySpark/venv__

Penser aux droits sur le répertoire !

##### *Remarques*:
Dans un contexte ou du PySpark est executer via jupyter, on est dans une configuration yarn client de ce faite le driver est l'edge sur laquelle vous avez lancez jupyter, vous pouvez tout aussi bien vous passer la partie mise sur HDFS et pointer directement sur le répertoire __/applis/hadd/produits/anaconda2/envs/py27PySpark/venv__ (par exemple).

Par contre si vous lancez votre job via __*spark-submit*__ en yarn cluster le driver n'étant pas l'edge d'exécution, le passage par HDFS est indispensable pour l'instant. Effectivement à terme nous souhaitons faire passer l'environnement dans une archive que nous soumettrons via une option de configuration Spark *--archive*, mais cette méthode n'a pas encore été éprouvée.





In [None]:
# My basic imports
import pandas as pd
import numpy as np
import os
import sys
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
ENV_DIR="/tmp/envs/py27PySpark/venv"

In [3]:
def list_path_names(path):
    from pyspark.context import SparkContext
    """List files and directories in an HDFS path

    Args:
        path (str): HDFS path to directory

    Returns:
        [str]: list of file/directory names
    """
    sc = SparkContext.getOrCreate()
    # low-level access to hdfs driver
    hadoop = sc._gateway.jvm.org.apache.hadoop
    path = hadoop.fs.Path(path)
    config = hadoop.conf.Configuration()

    status = hadoop.fs.FileSystem.get(config).listStatus(path)
    return (path_status.getPath().getName() for path_status in status)

In [4]:
def distribute_hdfs_files(hdfs_path):
    from pyspark.context import SparkContext
    """Distributes recursively a given directory in HDFS to Spark

    Args:
        hdfs_path (str): path to directory
    """
    sc = SparkContext.getOrCreate()

    for path_name in list_path_names(hdfs_path):
        path = os.path.join(hdfs_path, path_name)
        print("Distributing {}...".format(path))
        sc.addFile(path, recursive=True)

In [5]:
def init():

    plt.style.use('ggplot')
    spark_home = "/usr/hdp/current/spark2-client"
    sys.path.insert(0, spark_home + "/python")
    sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
    filename = os.path.join(spark_home, 'python/pyspark/shell.py')  
    return filename

In [6]:
def app(spark):
    from pyspark.context import SparkContext
    from pyspark.sql.functions import udf
    import pyspark.sql.types as T
    from pyspark.sql import Row
    from pyspark.sql.types import FloatType
    
    sc = SparkContext.getOrCreate()
    
    lines =  sc.textFile("hdfs:///tmp/testFilePySpark.txt")
    parts = lines.map(lambda l: l.split(" "))
    p = parts.map(lambda p: Row(name=p[0],value=float(p[1])))
    df = spark.createDataFrame(p)
    udf_var = udf(lambda x: round(np.var(np.array(x)),10), returnType=FloatType()) 
    df = df.withColumn('WF_Var',udf_var('value'))
    print(df.show())

In [7]:
def main():
    
    filename = init()
    
    from pyspark.sql import SparkSession
    from pyspark.context import SparkContext
    spark = SparkSession.\
        builder.\
        config("spark.app.name","testest").\
        config("spark.master","yarn").\
        config("spark.executor.memory","2G").\
        config("spark.executor.instances","2").\
        config("spark.driver.memory","2G").\
        config("spark.executor.cores", "2"). \
        config("spark.driver.maxResultSize", "1G").\
        config("spark.executor.extraJavaOptions","-XX:+UseG1GC").\
        getOrCreate()
    exec(compile(open(filename, "rb").read(), filename, 'exec'))
     
    env_dir = 'hdfs://' + ENV_DIR
    # make sure we have the latest version available on HDFS
    distribute_hdfs_files(env_dir)
    
    
    app(spark)

In [8]:
def run():
     main()

In [9]:
run()

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0.2.6.4.0-91
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.
Distributing hdfs:///tmp/envs/py27PySpark/venv/dateutil...
Distributing hdfs:///tmp/envs/py27PySpark/venv/numpy...
Distributing hdfs:///tmp/envs/py27PySpark/venv/numpy-1.12.1-cp27-cp27mu-manylinux1_x86_64.whl...
Distributing hdfs:///tmp/envs/py27PySpark/venv/numpy-1.12.1.data...
Distributing hdfs:///tmp/envs/py27PySpark/venv/numpy-1.12.1.dist-info...
Distributing hdfs:///tmp/envs/py27PySpark/venv/pandas...
Distributing hdfs:///tmp/envs/py27PySpark/venv/pandas-0.20.3-cp27-cp27mu-linux_x86_64.whl...
Distributing hdfs:///tmp/envs/py27PySpark/venv/pandas-0.20.3.dist-info...
Distributing hdfs:///tmp/envs/py27PySpark/venv/python_dateutil-2.6.0-py2.py3-none-any.whl...
Distributing hdfs:///tmp/envs/py27PySpark/venv/python_dateutil-2.6.0.dist