## Notebook de ejemplo uso de pySpark

### Prerequisitos y consideraciones
- Instalar las extensiones de Python y Jupyter en Visual Studio Code.
- Tener el entorno Python configurado: 
    * Conda env con Python 3.7 (máxima compatible con pyspark:2.4) y librerías instaladas (en principio pyspark:2.4 y sus dependencias). 
    * Se encuentra disponible un entorno conda en la ruta */data/DWRM/pyenv/.conda/envs/spark24* del servidor *dpyserver02*.
- Activar el entorno en la terminal ejecutando: *conda activate spark24*. Si esto no funciona, chequear que existan las siguientes variables de entorno en el perfil de su usuario (archivos *$HOME/.profile* y *$HOME/.bash_profile*) y abrir una nueva terminal:
    ```
        export CONDA_PKGS_DIRS=/data/DWRM/pyenv/.conda/pkgs
        export CONDA_ENVS_PATH=/data/DWRM/pyenv/.conda/envs
    ```
- Tener el kernel de Jupyter instalado. Se instala ejecutando:
    ```
        python -m ipykernel install --user --name spark24 --display-name "Python_spark2.4_kernel"
    ```
- Abrir la notebook y elegir el kernel desde Visual Studio Code en la parte superior derecha según el display-name que se eligió en el paso anterior, en caso de no verlo .
- Para conectarse al clúster pedir previamente el ticket de Kerberos ejecutando el comando *kinit* desde la terminal del VS Code. Cuestiones a tener en cuenta: 
    * El comando por defecto los identifica con el ID de usuario de sistema operativo (legajo) y al pedirles la contraseña deberían ingresar la que corresponde a la cuenta de LDAP.
    * Los tickets (token) tienen un vencimiento, duran apróximadamente 24hs y luego de ese tiempo deberían generar uno nuevo con el comando *kinit*. Cada vez que hagan esto van a tener que reiniciar el kernel de la notebook para que lo reconozca.
    * **Extra**: Una opción alternativa para pedir el ticket es usar un archivo de clave encriptado o *keytab*, éste lo deberían actualizar cada vez que hagan un cambio de contraseña. Para crearlo o modificarlo:
        1. Abrir la consola de administración de keytabs:
        ```
            ktutil
        ```
        2. Agregamos la cuenta que vamos a usar para el keytab:
        ```
            addent -password -p {idUsuario}@BGCMZ.BANCOGALICIA.COM.AR -k 1 -e RC4-HMAC
        ```
        3. Al presionar enter nos va a pedir la contraseña:
        ```
            - enter password for username -
        ```
        4. Usamos la siguiente instrucción para guardar el keytab:
        ```
            wkt {idUsuario}.keytab
        ```
        > *Nota: si no especificamos una ruta se va a usar el directorio actual, por seguridad lo recomendable es que quede en el home de su usurio ($HOME), más allá de que el archivo se va a generar con permisos para que solo pueda leerse y modificarse con su cuenta.*
        5. Salimos de la consola:
        ```
            q
        ```
        6. Probamos de solicitar el ticket usando el keytab:
        ```
            kinit -kt $HOME/{idUsuario}.keytab {idUsuario}
        ```
- Para trabajar desde un script, recordar activar el entorno previamente a ejecutarlo y, si van a usar Hadoop, tener un ticket de Kerberos activo y prestarle atención, tanto a las variables de entorno, como a los parámetros que se le van a pasar a la sesión de Spark (mencionados a continuación).


### Importamos las librerias y paquetes a utilizar

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

### Ejecución en el cluster
Creando una sesión que utilice **yarn** como master, las ejecuciones las realizará el cluster Hadoop.

#### Definición de variables de entorno
Primero debemos definir las siguientes variables

In [2]:
os.environ['SPARK_HOME'] = '/data/DWRM/pyenv/spark/spark-2.4.0-bin-hadoop2.7'
os.environ['HADOOP_CONF_DIR'] = '/data/DWRM/pyenv/conf/hadoop'
os.environ['PATH'] = os.environ['SPARK_HOME'] + '/bin' + ':' + os.environ['PATH']
os.environ['JAVA_HOME'] = '/data/jdk1.8.0_172'
os.environ['HOSTALIASES'] = '/data/DWRM/pyenv/conf/hadoop/hosts'

#### Luego construimos la sesión

In [3]:
spark = SparkSession.builder \
.master('yarn') \
.appName('Test hive') \
.config('spark.hadoop.hive.exec.stagingdir', '/tmp/.hive-staging') \
.enableHiveSupport() \
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2022-08-24 11:44:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2022-08-24 11:44:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2022-08-24 11:44:12 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-08-24 11:44:25 WARN  Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2022-08-24 11:44:34 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.use.checked.expressions does not exist
2022-08-24 11:44:34 WARN  HiveConf:2753 - HiveConf of name hive.strict.checks.no.partition.filter does not exist
2022-08-24 11:44:34 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.use.vector.serde.deserialize does not exist
2022-08-24 11:44:34 WARN  HiveConf:2753 - HiveConf of name hive.strict.checks.orderby.no.limit does not exist
2022-08-24 11:44:34 WARN  HiveConf:

Se debe seleccionar BD

In [4]:
spark.sql('use academia_lnd').show()
spark.sql('show tables').show()

2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.use.checked.expressions does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.strict.checks.no.partition.filter does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.use.vector.serde.deserialize does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.strict.checks.orderby.no.limit does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.adaptor.usage.mode does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.use.vectorized.input.format does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.vectorized.input.format.excludes does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.strict.checks.bucketing does not exist
2022-08-24 11:47:13 WARN  HiveConf:2753 - HiveConf of name hive.strict.checks.type.safety does not exi

se cargan las tablas en Dataframe

In [5]:
fact_df = spark.table("academia_lnd.l0507199_fact")
productos_df = spark.table("academia_lnd.l0507199_productos")
sucursales_df = spark.table("academia_lnd.l0507199_sucursales")
empleados_df = spark.table("academia_lnd.l0507199_empleados")
fact_df.show(10)
productos_df.show(10)
sucursales_df.show(10)
empleados_df.show(10)

                                                                                

+----------+----+--------+--------+
|     fecha| sku|vendedor|cantidad|
+----------+----+--------+--------+
| timestamp|null|    null|    null|
| 11/1/2019|  63|      42|       5|
| 16/4/2019| 126|      32|       2|
|  8/3/2019|   6|      57|       8|
| 11/4/2019|  72|      41|       2|
|31/12/2019|  37|      50|       9|
|  2/7/2019| 117|      37|       6|
| 25/9/2019|  44|      31|       6|
| 28/5/2019|  65|      21|       6|
| 22/9/2019|  36|      17|       6|
+----------+----+--------+--------+
only showing top 10 rows



                                                                                

+-----------+-------+----------+---------------+
|id_producto|familia|    nombre|precio_unitario|
+-----------+-------+----------+---------------+
|       null|familia|    nombre|           null|
|          1|  Leche|     Leche|             62|
|          2|  Leche|      Nido|            177|
|          3|  Leche|      Klim|             36|
|          4|  Leche|   Nan Pro|            156|
|          5|  Leche| Nestogeno|            134|
|          6|  Leche|La lechera|             68|
|          7|  Leche| Canprolac|             36|
|          8|  Leche|Sativa 2,3|             22|
|          9|  Leche|    Nidina|            247|
+-----------+-------+----------+---------------+
only showing top 10 rows

+-----------+--------------------+--------------------+
|id_sucursal|              nombre|                tipo|
+-----------+--------------------+--------------------+
|       null|              nombre|                tipo|
|          1|      Éxito Aranjuez|        Supermercado|
|       

se tranforman Dataframe a vistas temporales

In [6]:
empleados_df.createOrReplaceTempView('empleados')
fact_df.createOrReplaceTempView('fact')
productos_df.createOrReplaceTempView('productos')
sucursales_df.createOrReplaceTempView('sucursales')


##### Se generan tablones para reporteria

In [7]:
Tablon1_df = spark.sql('''select a.familia, SUM(b.cantidad * a.precio_unitario) AS Monto_Facturado from productos a inner join fact b on a.id_producto = b.sku group by a.familia''')

In [8]:
Tablon1_df.show(10)

                                                                                

+--------------------+---------------+
|             familia|Monto_Facturado|
+--------------------+---------------+
|             Helados|        6809166|
|            Papillas|        3044895|
|               Leche|        8478184|
|            Cereales|       15703271|
|                Café|        5799239|
|Barritas de Cereales|        3302855|
|          Culinarios|        2989699|
|          Chocolates|       13582196|
|Comidas para anim...|        4070014|
|       Otras bebidas|        6275111|
+--------------------+---------------+



In [9]:
Tablon2_df = spark.sql('''SELECT 
a.sku as id_producto,
a.fecha as cd_periodo,
b.nombre as nombre_producto,
b.precio_unitario,
c.id_sucursal,
c.nombre as nombre_sucursal,
sum(a.cantidad) as total_ventas
FROM fact a
join productos b on a.sku=b.id_producto
join empleados d on a.vendedor=d.id_vendedor
join sucursales c on d.sucursal=c.id_sucursal
group by a.sku, a.fecha, b.nombre, b.precio_unitario, c.id_sucursal, c.nombre''')

In [10]:
Tablon2_df.show(10)



+-----------+----------+------------------+---------------+-----------+----------------+------------+
|id_producto|cd_periodo|   nombre_producto|precio_unitario|id_sucursal| nombre_sucursal|total_ventas|
+-----------+----------+------------------+---------------+-----------+----------------+------------+
|         84| 24/2/2019|          Pela-Pop|             98|          3|     Éxito Belén|           4|
|         88|15/11/2019|             Maggi|            162|          1|  Éxito Aranjuez|           4|
|         95| 12/3/2019|    Nestlé Postres|            216|          8|  Éxito Gran Vía|           1|
|         46| 10/7/2019|   Zucosos (Chile)|            174|         14|     Éxito La 33|           9|
|         91| 23/2/2019|           Litoral|            216|         11|Éxito La Central|           1|
|         94|18/10/2019|  Nestlé Extrafino|             65|          6|  Éxito Envigado|           8|
|         28|21/12/2019|Fitness Choc White|            176|          1|  Éxito Ara

                                                                                

##### Guardamos el resultado en HDFS sobrescribiendo los datos
Al guardar un DataFrame podemos usar distintos [modos](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes) de escritura (errorifexists, append, overwrite e ignore), formatos (los más conocidos: json, parquet, orc, csv y text) y [opciones extra](https://dbmstutorials.com/pyspark/spark-read-write-dataframe-options.html).

In [14]:
Tablon1_df.coalesce(1).write.mode("overwrite").csv("hdfs://GALICIAHADOOP/galicia/d/landing_files/academia_de/Squad_Financiera_Tablon1/")
Tablon2_df.coalesce(1).write.mode("overwrite").csv("hdfs://GALICIAHADOOP/galicia/d/landing_files/academia_de/Squad_Financiera_Tablon2/")

                                                                                

### Cerramos la sesión
Como estamos trabajando de manera interactiva, debemos cerrar explícitamente la sesión para no dejar el Job corriendo en YARN y liberar los recursos asignados

In [None]:
spark.stop()