##### Título: Conexion Spark con MongoDB
##### Autor: Dr. Gabriel Guerrero, saxsa2000@gmail.com
##### Fecha: 20190805
##### Referencia: gg20190805_spark_mongo



### Spark conexión con MongoDB

En este cuaderno jupyter muestra como leer una colección existente de mongodb con SparkSQL.

El mecanismo tiene varios pasos. 

El primer paso es leer una colección de mongoDB y convertirla a un DataFrame de SparkSQL.

El segundo paso es utilizar el DataFrame de Spark y ejecutar los  metódos del dataframe que se requieran en el procesamiento.

El tercer paso es guardar el resultado obtenido en un nuevo DataFrame de Spark.

En el cuarto paso se almacena el DataFrame en una coleccion mongodb.

Para facilitar el acceso con Jupyter al cluster Spark, utilizamos livy server

livy server es un servidor tipo REST para Spark

![mongo_spark](spark_mongo.jpg)

##### Comandos mágicos de Spark Sparkmagic 

Utilizamos comandos magicos de Spark Sparkmagic ("%%") para ejecutar enunciados del shell de linux

Sparkmagic es un conjunto de herramientas para el trabajo interactivo con clusters remotos de Spark por medio del servidor Livy, que es un servidor de tipo REST server para Spark, trabajando por medio de cuadernos Jupyter. 

Caracteristicas comandos mágicos de Spark (Sparkmagic) 

Run Spark code in multiple languages against any remote Spark cluster through Livy

Automatic SparkContext (sc) and HiveContext (sqlContext) creation

Easily execute SparkSQL queries with the %%sql magic

Automatic visualization of SQL queries in the PySpark, Spark and SparkR kernels; use an easy visual interface to interactively construct visualizations, no code required

Easy access to Spark application information and logs (%%info magic)
    
Ability to capture the output of SQL queries as Pandas dataframes to interact with other Python libraries (e.g. matplotlib)



##### ¿Por qué utilizar  livy + sparkmagic?

sparkmagic es un cliente del servidor livy que se utiliza en un cuaderno Jupyter.

Se tiene una arquitectura en donde ejecutamos los enunciados utilizando un cuaderno Jupyter

Cuando se escribe un código Spark en nuestro cliente local Jupyter, el programa sparkmagic lo manda ejecutar al cluster Spark por medio del servidor livy



Using sparkmagic + Jupyter notebook, data scientists can use Spark from their own Jupyter notebook, which is running on their localhost.

We don’t need any Spark configuration getting from the Spark cluster.

So we can execute Spark job in a cluster like running on a local machine.




![livy_spark](livyServer_Spark.png)

##### Análisis de procesos java por medio del enunciado jps

Analizamos los procesos java por medio del enunciado jps

In [1]:
%%bash

jps

11363 SecondaryNameNode
11539 Master
11044 NameNode
18710 Jps
11612 Worker
11725 LivyServer
11167 DataNode


#### Iniciar servicios Hadoop HDFS en caso de no estar activos

En este ejercicio se utiliza Hadoop HDFS, por lo que requerimos iniciar el servicio si este no esta activo


In [2]:
%%bash

start-dfs.sh

Starting namenodes on [hostsaxsa]
hostsaxsa: namenode running as process 11044. Stop it first.
hostsaxsa: datanode running as process 11167. Stop it first.
Starting secondary namenodes [0.0.0.0]
0.0.0.0: secondarynamenode running as process 11363. Stop it first.


#### Iniciar servicios Spark en caso de no estar activos

En este ejercicio se utiliza Spark, por lo que requerimos iniciar el servicio si este no esta activo

In [3]:
%%bash 

start-spark.sh

org.apache.spark.deploy.master.Master running as process 11539.  Stop it first.
hostsaxsa: org.apache.spark.deploy.worker.Worker running as process 11612.  Stop it first.


##### Análisis Status mongoDB

En caso de no estar activo el servidor MongoDB, debemos iniciarlo

In [4]:
%%bash

systemctl status mongod


● mongod.service - High-performance, schema-free document-oriented database
   Loaded: loaded (/usr/lib/systemd/system/mongod.service; enabled; vendor preset: disabled)
   Active: active (running) since lun 2019-08-05 13:17:26 CDT; 8h ago
     Docs: https://docs.mongodb.org/manual
  Process: 1169 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=0/SUCCESS)
  Process: 1122 ExecStartPre=/usr/bin/chmod 0755 /var/run/mongodb (code=exited, status=0/SUCCESS)
  Process: 1098 ExecStartPre=/usr/bin/chown mongod:mongod /var/run/mongodb (code=exited, status=0/SUCCESS)
  Process: 1081 ExecStartPre=/usr/bin/mkdir -p /var/run/mongodb (code=exited, status=0/SUCCESS)
 Main PID: 1496 (mongod)
    Tasks: 23
   CGroup: /system.slice/mongod.service
           └─1496 /usr/bin/mongod -f /etc/mongod.conf


#### Verificación del contenido de la base en mongoDB que deseamos consultar

=== Ejecutar mongodb en una terminal Linux ===

mongo

=== Una vez iniciado el cliente MongoDB, mostrar bases en mongodb ===

show dbs

=== Usar la base de datos llamada bimbodb ===

use bimbodb

=== Mostrar colecciones de la base de datos ===

show collections

=== Mostrar el contenido de las colecciones ===

db.cliente.find()

db.producto.find()

db.town_state.find().pretty()

db.train.find().pretty()

=== Salir de mongodb ===

exit

livy’s Default port number is 8998

In [5]:
%%bash

curl localhost:8998/sessions

{"from":0,"total":0,"sessions":[]}

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100    34  100    34    0     0   3777      0 --:--:-- --:--:-- --:--:--  3777


#### Análisis de datos almacenados en MongoDB de datos abiertos de BIMBO

Las fuentes de información son datos abiertos de BIMBO, los cuales se obtuvieron de un concurso de kaggle 

In [6]:
from pyspark.sql.functions import count

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,,pyspark3,idle,,,✔


SparkSession available as 'spark'.


In [7]:
from datetime import datetime
# current date and time
now = datetime.now()
timestamp = datetime.timestamp(now)
date_time = now.strftime("%Y%m%d%H%M%S")
timestamp
now
date_time 

'20190805221113'

 pyspark.sql.functions.count(col)

    Aggregate function: returns the number of items in a group.

Spark SQL also includes a data source that can read data from other databases using JDBC.

This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. 

To get started you will need to include the JDBC driver for your particular database 

To read from a collection called "cliente" en una base en MongoDB llamada "bimbodb", especificamos "bombodb.cliente" en la entrada de la opcion "URI option".

### Lectura de la base bimbodb y la coleccion cliente

In [8]:
df_cliente = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1/bimbodb.cliente?readPreference=primaryPreferred")\
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/bimbodb.cliente").load()

In [9]:
df_cliente.printSchema()

root
 |-- Cliente_ID: integer (nullable = true)
 |-- NombreCliente: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)

In [10]:
df_cliente

DataFrame[Cliente_ID: int, NombreCliente: string, _id: struct<oid:string>]

In [11]:
df_cliente.select('Cliente_ID').count()

450000

In [12]:
df_cliente_sin_id = df_cliente.drop('_id')
df_cliente_sin_id.printSchema()



root
 |-- Cliente_ID: integer (nullable = true)
 |-- NombreCliente: string (nullable = true)

In [13]:
df_cliente_sin_id.head(5)

[Row(Cliente_ID=4386922, NombreCliente='NO IDENTIFICADO'), Row(Cliente_ID=1982904, NombreCliente='LOS PEQUES'), Row(Cliente_ID=1960682, NombreCliente='MARIA DEL ROSARIO NARANJO NERI'), Row(Cliente_ID=183569, NombreCliente='LARIOS'), Row(Cliente_ID=163632, NombreCliente='CASA ZAMUDIO')]

In [14]:
df_cliente_sin_id.show(5)

+----------+--------------------+
|Cliente_ID|       NombreCliente|
+----------+--------------------+
|   4386922|     NO IDENTIFICADO|
|   1982904|          LOS PEQUES|
|   1960682|MARIA DEL ROSARIO...|
|    183569|              LARIOS|
|    163632|        CASA ZAMUDIO|
+----------+--------------------+
only showing top 5 rows

In [15]:
df_cliente_sin_id.describe().show()

+-------+------------------+--------------------+
|summary|        Cliente_ID|       NombreCliente|
+-------+------------------+--------------------+
|  count|            450000|              450000|
|   mean|2349831.5673133335|   4103.354430379747|
| stddev| 1915668.265326387|  22887.275797071834|
|    min|                 2|056 THE AIRPORT M...|
|    max|          19988629|                ÑEKA|
+-------+------------------+--------------------+

In [16]:
df_cliente_sin_id.select('Cliente_ID').distinct().count()

448852

In [17]:
df_cliente_id_duplicados = df_cliente_sin_id.select('Cliente_ID').groupby('Cliente_ID')\
            .agg(count('Cliente_ID').alias('Apariciones'))\
            .sort('Apariciones', ascending=False)\
            .where("Apariciones != 1")

df_cliente_id_duplicados.count()

1148

In [18]:
df_cliente_id_duplicados.head(10)

[Row(Cliente_ID=6137009, Apariciones=2), Row(Cliente_ID=525200, Apariciones=2), Row(Cliente_ID=149688, Apariciones=2), Row(Cliente_ID=406654, Apariciones=2), Row(Cliente_ID=370466, Apariciones=2), Row(Cliente_ID=101541, Apariciones=2), Row(Cliente_ID=164940, Apariciones=2), Row(Cliente_ID=103066, Apariciones=2), Row(Cliente_ID=496802, Apariciones=2), Row(Cliente_ID=1453247, Apariciones=2)]

In [19]:
df_cliente_id_duplicados.show(10)

+----------+-----------+
|Cliente_ID|Apariciones|
+----------+-----------+
|   6137009|          2|
|    525200|          2|
|    149688|          2|
|    406654|          2|
|    370466|          2|
|    101541|          2|
|    164940|          2|
|    103066|          2|
|    496802|          2|
|   1453247|          2|
+----------+-----------+
only showing top 10 rows

In [20]:

df_cliente_sin_id.filter("Cliente_ID == 6137009").show(2, False)

+----------+-----------------------+
|Cliente_ID|NombreCliente          |
+----------+-----------------------+
|6137009   |F GDL S403 LAS HILAMAS |
|6137009   |F  GDL S403 LAS HILAMAS|
+----------+-----------------------+

In [21]:
df_cliente_sin_id.filter("Cliente_ID == 1453247").show(2, False)

+----------+------------------------+
|Cliente_ID|NombreCliente           |
+----------+------------------------+
|1453247   |NOVEDADES Y V  J  INGRID|
|1453247   |NOVEDADES Y V J INGRID  |
+----------+------------------------+

In [22]:
df_cliente_sin_id.createOrReplaceTempView('cliente')

In [23]:
%%sql
SELECT * FROM cliente WHERE NombreCliente LIKE '%MISCE%' limit 50

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

### Lectura de la base bimbodb y la coleccion producto

In [24]:
df_producto = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1/bimbodb.producto?readPreference=primaryPreferred")\
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/bimbodb.producto").load()

In [25]:
df_producto.printSchema()

root
 |-- NombreProducto: string (nullable = true)
 |-- Producto_ID: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)

In [26]:
df_producto_sin_id = df_producto.drop('_id')
df_producto_sin_id.describe().show()

+-------+--------------------+------------------+
|summary|      NombreProducto|       Producto_ID|
+-------+--------------------+------------------+
|  count|                2592|              2592|
|   mean|                null|32591.095679012345|
| stddev|                null|13004.091023722118|
|    min|100pct Whole Whea...|                 0|
|    max|Wonderbutter 680g...|             49997|
+-------+--------------------+------------------+

In [27]:
df_producto_sin_id.createOrReplaceTempView('producto')

In [28]:
%%sql
SELECT * FROM producto WHERE NombreProducto LIKE '%Pan%' limit 100

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [29]:
df_consulta = spark.sql("SELECT * FROM producto WHERE NombreProducto LIKE '%Pan%'")

In [30]:
df_consulta.count()

280

In [31]:
df_consulta.show()

+--------------------+-----------+
|      NombreProducto|Producto_ID|
+--------------------+-----------+
|Pan Multigrano Li...|         73|
|Pan Blanco 567g W...|         99|
|Super Pan Bco Ajo...|        100|
|Pan Multicereal 4...|        109|
|Pan Multigrano 68...|        713|
|Pan 100pct Integr...|        715|
|Pan Blanco Siluet...|        779|
|Panitos Chocolate...|        357|
|Pan Bolsa 2a 500g...|       1031|
|Pan 12 Granos 680...|       1039|
|Panque Marmol 255...|       1064|
|Pan Blanco Chico ...|       1109|
|Pan Blanco 2Pq 13...|       1111|
|Super Pan Blanco ...|       1112|
|Pan Blanco 680g B...|       1120|
|Pan Integral Gde ...|       1143|
|Pan Integral 480g...|       1144|
|Pan Integral 680g...|       1145|
|Pan Integral 675g...|       1146|
|Panque Pasas 255g...|       1230|
+--------------------+-----------+
only showing top 20 rows

##### Almacenamiento en una nueva colección en  mongoDB del  DataFrame resultado de la consulta

In [32]:
df_consulta.count()

280

In [33]:
date_time

'20190805221113'

In [34]:
CadenaConexionMongo_1 = "mongodb://127.0.0.1/bimbodb.coleccion_" + date_time + "?readPreference=primaryPreferred"

CadenaConexionMongo_2 = "mongodb://127.0.0.1/bimbodb.coleccion_" + date_time 


df_consulta.write.format("com.mongodb.spark.sql.DefaultSource")\
.mode("append")\
.option("spark.mongodb.input.uri", "mongodb://127.0.0.1/bimbodb.coleccion_nueva2?readPreference=primaryPreferred")\
.option("spark.mongodb.output.uri", "mongodb://127.0.0.1/bimbodb.coleccion_nueva2").save()

In [35]:
df_consulta.write.format("com.mongodb.spark.sql.DefaultSource")\
.mode("append")\
.option("spark.mongodb.input.uri", CadenaConexionMongo_1)\
.option("spark.mongodb.output.uri", CadenaConexionMongo_2).save()

#### Lectura de la collección creada

df_colleccion_nueva = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1/bimbodb.coleccion_nueva2?readPreference=primaryPreferred")\
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/bimbodb.coleccion_nueva2").load()

In [36]:
df_colleccion_nueva = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("spark.mongodb.input.uri", CadenaConexionMongo_1)\
    .option("spark.mongodb.output.uri", CadenaConexionMongo_2).load()

In [37]:
df_colleccion_nueva.printSchema()

root
 |-- NombreProducto: string (nullable = true)
 |-- Producto_ID: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)

In [38]:
df_colleccion_nueva.select('NombreProducto').count()

280

In [39]:
df_colleccion_nueva.show()

+--------------------+-----------+--------------------+
|      NombreProducto|Producto_ID|                 _id|
+--------------------+-----------+--------------------+
|Pan Multigrano Li...|         73|[5d48f07f2687dd4b...|
|Pan Blanco 567g W...|         99|[5d48f07f2687dd4b...|
|Super Pan Bco Ajo...|        100|[5d48f07f2687dd4b...|
|Pan Multicereal 4...|        109|[5d48f07f2687dd4b...|
|Pan Multigrano 68...|        713|[5d48f07f2687dd4b...|
|Pan 100pct Integr...|        715|[5d48f07f2687dd4b...|
|Pan Blanco Siluet...|        779|[5d48f07f2687dd4b...|
|Panitos Chocolate...|        357|[5d48f07f2687dd4b...|
|Pan Bolsa 2a 500g...|       1031|[5d48f07f2687dd4b...|
|Pan 12 Granos 680...|       1039|[5d48f07f2687dd4b...|
|Panque Marmol 255...|       1064|[5d48f07f2687dd4b...|
|Pan Blanco Chico ...|       1109|[5d48f07f2687dd4b...|
|Pan Blanco 2Pq 13...|       1111|[5d48f07f2687dd4b...|
|Super Pan Blanco ...|       1112|[5d48f07f2687dd4b...|
|Pan Blanco 680g B...|       1120|[5d48f07f2687d