# <b>Migración de MySQL a Cassandra empleando Spark</b>

# <b>MySQL</b>

Se va a partir de una base de datos en MySQL que contendrá 3 tablas: una tabla llamada <b>sucursales</b>, que contendrá información relativa a cada oficina, otra tabla <b>empleados</b> en la que se almacenará información de cada trabajador y una tabla <b>op_clientes</b> donde cada línea hará referencia a una operación realizada por un cliente y que es atendida por un empleado concreto en una oficina determinada.

### <b>Instalación</b>

Abir una terminal y ejecutar:

<b><i>$ sudo apt-get install mysql-server</i></b>

Durante la instalación se pedirá introducir la contraseña del usuario root de MySQL. El siguiente comando permitirá establecer una serie de opciones de seguridad como validar la fortaleza de la contraseña de root, cambiar esa contraseña, eliminar el acceso remoto a dicho usuario, eliminar el acceso de usuarios anónimos, eliminar la base de datos de test, etc.

<b><i>$ sudo mysql_secure_installation utility</i></b>

Una vez hecho esto se arrancará el servicio de MySQL:

<b><i>$ sudo systemctl start mysql</i></b>

Para asegurarnos de que el servidor de MySQL se arranca al inicio de sesión ejecutaremos:

<b><i>$ sudo systemctl enable mysql</i></b>

Por defecto MySQL no estará escuchando en ningún interfaz de red que sea accesible remótamente. Para modificar esto habrá que editar el fichero <b><i>/etc/mysql/mysql.conf.d/mysqld.cnf</i></b> añadiendo las siguientes líneas para permitir que el servidor de BBDD esté escuchando en más interfaces de red:

bind-address    = 127.0.0.1 (valor por defecto)<br><br>
bind-address    = xxx.xxx.xxx.xxx (dirección IP del interfaz público de red)<br><br>
bind-address    = 0.0.0.0 (escuchar en todos los interfaces de red)<br><br>

Se reiniciará el servicio de MySQL para que tengan efectos esos cambios.

<b><i>$ sudo systemctl restart mysql</i></b>

### <b>Shell de MySQL</b>

#### <b>Login</b>

Para acceder a la shell de MySQL escribir en la terminal:

<b><i>$ mysql -u root -p"datahack"</i></b>

Alternativamente se podría haber escrito:

<b><i>$ mysql -u root -p</i></b>

De forma que al ejecutarlo se nos pedirá introducir la contraseña del usuario root de MySQL (datahack). De cualquiera de las 2 formas podremos ver que cambiará el prompt indicando que nos encontramos dentro de la shell de MySQL:

<b><i>mysql> </i></b>

Para cambiar la contraseña del usuario root habrá que ejecutar:

<b><i>mysql> UPDATE mysql.user SET authentication_string = PASSWORD('santander') WHERE User = 'root';</i></b>

Para hacer efectivo el cambio habrá que ejecutar:

<b><i>mysql> FLUSH PRIVILEGES;</i></b>

Para salir de la shell de MySQL:

<b><i>mysql> exit</i></b>

#### <b>Creación de Base de Datos</b>

Crearemos una base de datos llamada <b>banco</b>:

<b><i>mysql> CREATE DATABASE banco;</i></b>

Listaremos todas las bases de datos:

<b><i>mysql> SHOW DATABASES;</i></b>

#### <b>Usuarios de MySQL</b>

La información de los usuarios de MySQL se almacena en la tabla <b>user</b> de la base de datos <b>mysql</b>. Se pueden consultar ejecutando:

<b><i>mysql> SELECT User, Host, Authentication_string FROM mysql.user;</i></b>

Cada usuario estará asociado a un host desde el que se conecta al servidor de BBDD. Si el cliente desde el que se conecta al servidor de MySQL está en la misma máquina que di ho servidor, el host será localhost. En caso de que se conecte de forma remota, el campo host contendrá la IP o el DNS de la máquina remota desde la que se realiza la conexión.

Un usuario anónimo será aquel en el que aparezca un host en dicha tabla user pero que no se disponga de user ni authentication_string para esa entrada. Este tipo de usuarios suponen un riesgo por lo que habrá que eliminarlo o asginale una contraseña haciendo referencia a él mediante ''.

Si se quiere crear un nuevo usuario al que llamaremos usu_test se puede hacer añadiéndolo a mysql.user:

<b><i>mysql> INSERT INTO mysql.user (User, Host, Authentication_string, ssl_cipher, x509_issuer, x509_subject) VALUES ('usu_test', 'localhost', PASSWORD('datahack'), '', '', '');</i></b>

Se forzará a que el servidor de BBDD lea los cambios realizados mediante:

<b><i>mysql> FLUSH PRIVILEGES;</i></b>

Comprobaremos que se ha creado correctamente:

<b><i>mysql> SELECT User, Host, Authentication_string FROM mysql.user;</i></b>

Asignaremos permisos totales a este usuario sobre la base de datos banco:

<b><i>mysql> GRANT ALL PRIVILEGES ON banco.* TO usu_test@localhost;</i></b>

Haremos efectivo este cambio:

<b><i>mysql> FLUSH PRIVILEGES;</i></b>

Comprobaremos los privilegios del usuario usu_test:

<b><i>mysql> SHOW GRANTS FOR 'usu_test'@'localhost';</i></b>

MySQL devuelve la lista de comandos necesarios para reproducir los permisos de dicho usuario en caso de que hubiera que partir desde el principio. Usage on \*.\* significa que el usuario no consigue privilegios por defecto. Esto será sobreescrito por la siguiente cláusula:

GRANT USAGE ON \*.\* TO 'usu_test'@'localhost'<br><br>
GRANT ALL PRIVILEGES ON `banco`.\* TO 'usu_test'@'localhost' 

### <b>Creación de tablas</b>

Como se comentó anteriormente, se van a crear 3 tablas dentro de la base de datos banco. A continuación se muestran las cláusulas SQL que habrá que ejecutar desde dentro de la shell de MySQL:

<b><i>mysql> USE banco;</i></b>

<b><i>mysql> CREATE TABLE sucursales (cod_s VARCHAR(8) PRIMARY KEY, direc VARCHAR(80));</i></b>

<b><i>mysql> CREATE TABLE empleados (cod_e VARCHAR(14) PRIMARY KEY, nombre_e VARCHAR(40), cargo VARCHAR(30));</i></b> 

<b><i>mysql> CREATE TABLE op_clientes (id_op INT NOT NULL PRIMARY KEY, nombre_c VARCHAR(40), tipo_op VARCHAR(40), cods VARCHAR(8), code VARCHAR(14), FOREIGN KEY fk_cods(cods) REFERENCES sucursales(cod_s), FOREIGN KEY fk_code(code) REFERENCES empleados(cod_e));</i></b>

Ahora vamos a insertar una serie de filas de datos en dichas tablas: 

<b><i>mysql> INSERT INTO sucursales (cod_s, direc) VALUES ('1A', 'Atocha 41');</i></b>

<b><i>mysql> INSERT INTO sucursales (cod_s, direc) VALUES ('2A', 'Arenal 56');</i></b> 

<b><i>mysql> INSERT INTO empleados (cod_e, nombre_e, cargo) VALUES ('A1', 'Alberto Salgado', 'contable');</i></b>

<b><i>mysql> INSERT INTO empleados (cod_e, nombre_e, cargo) VALUES ('A2', 'Silvia Bueno', 'gerente');</i></b>

<b><i>mysql> INSERT INTO op_clientes (id_op, nombre_c, tipo_op, cods, code) VALUES ('3', 'Carlos Mateos', 'transferencia bancaria', '1A', 'A2');</i></b>

<b><i>mysql> INSERT INTO op_clientes (id_op, nombre_c, tipo_op, cods, code) VALUES ('8', 'Sara Gallego', 'ingreso', '2A', 'A1');</i></b>

<b><i>mysql> INSERT INTO op_clientes (id_op, nombre_c, tipo_op, cods, code) VALUES ('9', 'Sara Gallego', 'Compra', '2A', 'A1');</i></b>

<b><i>mysql> INSERT INTO op_clientes (id_op, nombre_c, tipo_op, cods, code) VALUES ('4', 'Sara Gallego', 'ingreso', '2A', 'A1');</i></b>

<b><i>mysql> INSERT INTO op_clientes (id_op, nombre_c, tipo_op, cods, code) VALUES ('1', 'Roberto Morgado', 'ingreso', '1A', 'A2');</i></b>

# <b>Spark</b>

Spark dispone tanto de un driver para interactuar con MySQL como otro driver para trabajar con Cassandra. La idea es emplear Spark para leer el contenido de las tablas de MySQL, almacenar dicho contenido en unas estrucuturas de datos llamadas Dataframes y finalmente volcar su contenido en Cassandra.

### <b>Instalación</b>

Para el objetivo de mostrar cómo emplear Spark para hacer de intermediario entre MySQL y Cassandra se puede optar por descargar Apache Spark de https://spark.apache.org/downloads.html y realizar una ejecución standalone.

Otras posibilidad sería realizar el despliegue de un cluster de Spark para aprovechar realmente el paralelismo y las optimizaciones de ejecución que proporciona este framework.

Una vez descargado el fichero se descomprimirá y se establecerán una serie de variables de entorno editando con gedit el fichero que está en /home/osboxes/.bashrc:

<b><i>$ gedit .bashrc</i></b>

<i>\# Se añadirán al final del fichero las siguientes líneas y se guardarán los cambios<br><br>
export SPARK_HOME=/home/osboxes/Downloads/spark-2.4.4-bin-hadoop2.7/<br><br>
export PYTHONPATH=/usr/lib/python2.7/dist-packages/<br><br>
export PYTHONPATH=\\$SPARK_HOME/python:\\$PYTHONPATH<br><br>
export PYSPARK_DRIVER_PYTHON="jupyter"<br><br>
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"<br><br>
export PATH=\\$SPARK_HOME/bin:\\$SPARK_HOME/sbin:\\$PATH<br><br>
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64<br><br></i>

Para hacer efectivos los cambios se forzará a leer dicho fichero ejecutando en la terminal:

<b><i>$ source .bashrc</i></b>

Como vamos a acceder a MySQL necesitaremos descargar el driver correspondiente. Lo haremos ejecutando:

<b><i>$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz</i></b>

Descomprimiremos dicho fichero y nos quedaremos con la ruta de <b><i>mysql-connector-java-5.1.45-bin.jar</i></b> ya que nos hará falta a la hora de configurar pyspark.

El paquete del conector de Spark con Cassandra se puede encontrar en https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md. La información que nos interesa es <b><i>datastax:spark-cassandra-connector:2.4.0-s_2.11</i></b>.

Anteriormente se estableció a través de las variables de entorno relativas a <b>PYSPARK_DRIVER_PYTHON</b> y <b>PYSPARK_DRIVER_PYTHON_OPTS</b> que el driver de pyspark emplease los notebooks de jupyter como intérprete. Además habrá que indicar, por ejemplo en una celda del notebook, mediante la variable de entorno <b>PYSPARK_SUBMIT_ARGS</b> los paquetes y/o jars que se sean necesarios para cada caso particular. En el nuestro pasaremos el paquete del conector de Cassandra con Spark (se descargará de internet) y se añadirá el jar del conector de MySQL con Spark. Por último, se añadirá a PYSPARK_SUBMIT_ARGS pyspark-shell.

Una librería interesante es <b>https://github.com/minrk/findspark</b>. Como pyspark no se añade al path del sistema (sys.path) automáticamente, mediante esta librería se puede añadir en tiempo de ejecución. Dispone del método init() que si no recibe ningún argumento tomará el valor de $SPARK_HOME.

In [None]:
import os
import findspark
findspark.init()

In [None]:
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages datastax:spark-cassandra-connector:2.4.0-s_2.11 --jars /home/osboxes/mysql-connector-java-5.1.45-bin.jar --conf spark.cassandra.connection.host='localhost' pyspark-shell"

### <b>Lectura de MySQL desde Spark</b>

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Creamos SparkContext y SQLContext
appName = "PySpark"
conf = SparkConf().setAppName(appName).set("spark.cassandra.auth.username", "cassandra").set("spark.cassandra.auth.password", "cassandra")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [None]:
props = {
    'user': 'root',
    'password': 'santander',
    'driver': 'com.mysql.jdbc.Driver'
}

En este punto se puede realizar la lectura desde MySQL empleando la siguiente sintaxis:

In [None]:
host = 'localhost'
db = 'banco'
tabla = 'sucursales'
url='jdbc:mysql://{}/{}'.format(host, db)

df_sucursales = sqlContext.read.jdbc(url=url, table=tabla, properties=props)

In [None]:
df_sucursales

In [None]:
df_sucursales.show()

O también alternativamente se puede emplear esta otra:

In [None]:
df_sucursales = sqlContext.read.format("jdbc").option("url", url).option("dbtable", tabla).option('user', 'root').option('password', 'santander').option('driver', 'com.mysql.jdbc.Driver')

In [None]:
df_sucursales

In [None]:
df_sucursales = df_sucursales.load()

In [None]:
df_sucursales

In [None]:
df_sucursales.show()

En vez de consultar la tabla entera también se pueden incluir filtros de forma que sólo se recuperarán aquellas filas que coincidan con ellos empleando en la option dbtable una subquery entre paréntesis.

In [None]:
df_sucursal_1A = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "(select * from sucursales where cod_s='1A') as sucursales").option("user", "root").option("password", "santander").option("driver", "com.mysql.jdbc.Driver") 

In [None]:
df_sucursal_1A

In [None]:
df_sucursal_1A = df_sucursal_1A.load()

In [None]:
df_sucursal_1A

In [None]:
df_sucursal_1A.show()

De forma alternativa a la option dbtable se puede emplear la option query para recuperar una consulta:

In [None]:
df_sucursal_1A = sqlContext.read.format("jdbc").option("url", url).option("query", "select * from sucursales where cod_s='1A'").option("user", "root").option("password", "santander").option("driver", "com.mysql.jdbc.Driver")

In [None]:
df_sucursal_1A = df_sucursal_1A.load()

In [None]:
df_sucursal_1A.show()

También se puede registrar un Dataframe de Spark como una tabla temporal dentro de Spark que sólo existirá durante el tiempo de vida de sqlContext y emplear el método sql de la siguiente forma:

In [None]:
# Suponiendo que df_sucursales está creado
sqlContext.registerDataFrameAsTable(df_sucursales, "t_sucursales")

In [None]:
df_sucursal_2A = sqlContext.sql("select * from t_sucursales where cod_s='2A'")

In [None]:
df_sucursal_2A

In [None]:
df_sucursal_2A.collect()

Para eliminar la tabla creada en Spark: 

In [None]:
sqlContext.dropTempTable("t_sucursales")

Si se vuelve a intentar consultar la tabla t_sucursales se lanazará una excepción indicando que no existe:

In [None]:
df_sucursal_2A = sqlContext.sql("select * from t_sucursales where cod_s='2A'")

### <b>Cache en Spark</b>

En caso de que el contenido de un mismo DataFrame se vaya a utilizar varias veces en el futuro se puede pensar en cachearlo. Hay que remarcar que las transformaciones en Spark son lazy y no se ejecutan realmente hasta que se ejecuta alguna acción sobre ellas. 

Si no se cachea un DataFrame será reevaluado cada vez que se aplique una acción sobre él.

Si un DataFrame invoca al método <b>cache()</b> es como si llamara al método <b>persist()</b> con el storageLevel a <b>MEMORY_AND_DISK</b>. Hay que tener presente que también son lazy por lo que hasta que si se cachea un DataFrame, dicha operación no será llevada a cabo hasta que no se ejecute una acción posteriormente sobre dicho DataFrame.

Para comprobar si un DataFrame está cacheado se puede invocar a su propiedad <b>storageLevel</b> que devolverá 5 valores correspondientes a StorageLevel(java_storage_level.useDisk(), java_storage_level.useMemory(), java_storage_level.useOffHeap(), java_storage_level.deserialized(), java_storage_level.replication()):

In [None]:
df_sucursales.storageLevel

Si ahora establecemos con el método persist() el nivel de MEMORY_AND_DISK (que sería equivalente a llamar directamente a df_sucursales.cache()) con replicación de las particiones del DataFrame 1:

In [None]:
import pyspark
df_sucursales.persist(storageLevel=pyspark.StorageLevel(True, True, False, False, 1))

In [None]:
df_sucursales.storageLevel

Si se quiere quitar de la cache a un DataFrame habrá que emplear el método <b>unpersist()</b>:

In [None]:
df_sucursales.unpersist()

Comprobaremos el estado de este DataFrame:

In [None]:
df_sucursales.storageLevel

### <b>Join en Spark</b>

Una herramienta que emplearemos a la hora de realizar la migración de MySQL a Cassandra serán los joins y emplearemos Spark como intermediario para realizarlos de forma distribuida en caso de contar con un cluster con múltiples workers.

Para hacer un recorrido por distintos tipos de joins vamos a definir 2 DataFrames de Spark:

In [None]:
dataf1 = sqlContext.createDataFrame([("sd3", "Alberto"), ("cv4", "Sara"), ("gw7", "Emilio"), ("sr6", "Lara")], ["cod", "nombre"])
dataf2 = sqlContext.createDataFrame([("cv4", 27), ("gw7", 32), ("sd3", 34), ("sd4", 28)], ["cod", "edad"])

Asignaremos un alias a cada uno de esos 2 dataframes:

In [None]:
d1 = dataf1.alias('d_1')
d2 = dataf2.alias('d_2')

El método join de Spark lo invoca un DataFrame (cuya posición a la hora de realizar la operación será el DF de la izquierda) y dicho método recibe 3 parámetros: el DF de la derecha, los campos que se emplean para buscar la coincidencia y el tipo de join. 

#### <b>Inner Join</b>

Este tipo de join devuelve las filas en las que el valor del campo de búsqueda existe en ambas tablas. Se trata del join por defecto.

In [None]:
df_j = d1.join(d2, d1.cod == d2.cod)

El mismo resultado se habría conseguido escrbiendo:

In [None]:
df_j = d1.join(d2, d1.cod == d2.cod, how="inner")

Esa línea de código sería equivalente a escribir en SQL:<br><br><b><i>SELECT * FROM d1 INNER JOIN d2 ON d1.cod = d2.cod;</i></b>

In [None]:
df_j.show()

Si posteriormente se quieren realizar filtros sobre el resultado del join se podrán emplear la transformación <b>filter</b> y a continuación la función <b>col</b> pudiendo hacer uso de los alias de las tablas para los campos que se llamen igual en ambas tablas del join:

In [None]:
from pyspark.sql.functions import col

In [None]:
res = df_j.filter(col('edad') > 27)

In [None]:
res

In [None]:
res.show()

Empleo del alias:

In [None]:
res2 = df_j.filter(col('d_1.cod') == 'sd3')

In [None]:
res2.show()

#### <b>Left Join</b>

El Left Join devuelve todos todos los registros de la tabla izquierda y aquellos registros de la tabla derecha en los que el valor del campo de búsqueda está presente tanto en la tabla izquierda como en la derecha. Aquellos casos en los que el valor del campo de búsqueda en la tabla izquierda no esté en la tabla derecha, los campos de la tabla derecha para dichas filas tomarán el valor NULL.

In [None]:
df_lj = d1.join(d2, d1.cod == d2.cod, 'left')

In [None]:
df_lj.show()

Es equivalente a escribir en SQL:<br><br><b><i>SELECT * FROM d1 LEFT JOIN d2 ON d1.cod = d2.cod;</i></b>

También se podría haber indicado en <b>how='left_outer'</b> obteniendo el mismo resultado:

In [None]:
df_loj = d1.join(d2, d1.cod == d2.cod, 'left_outer')

In [None]:
df_loj.show()

#### <b>Right Join</b>

El Right Join devuelve todos los registros de la tabla derecha y aquellos registros de la tabla izquierda en los que el valor del campo de búsqueda esté presente tanto en la tabla izquierda como en la derecha. Aquellos casos en los que el valor del campo de búsqueda en la tabla derecha no esté en la tabla izquierda, los campos de la tabla izquierda para dichas filas tomarán el valor NULL.

In [None]:
df_rj = d1.join(d2, d1.cod == d2.cod, 'right')

In [None]:
df_rj.show()

Es equivalente a escribir en SQL:

<b><i>SELECT * FROM d1 RIGHT JOIN d2 ON d1.cod = d2.cod;</i></b>

También se podría haber indicado en <b>how='right_outer'</b> obteniendo el mismo resultado:

In [None]:
df_roj = d1.join(d2, d1.cod == d2.cod, 'right_outer')

In [None]:
df_roj.show()

#### <b>Full Outer Join</b>

Full Outer Join combinará el left join con el right join devolviendo un resultado que es una combinación de ambos.

In [None]:
df_fj = d1.join(d2, d1.cod == d2.cod, 'outer')

In [None]:
df_fj.show()

Se obtendría el mismo resultado si <b>how='full'</b>:

In [None]:
df_oj = d1.join(d2, d1.cod == d2.cod, 'full')

In [None]:
df_oj.show()

#### <b>Left Semi Join</b>

Devuelve los registros del DataFrame izquierdo para aquellas filas en las que el valor de los campos de búsuqeda coinciden en el DF izquierdo y el derecho.

In [None]:
df_lsj = d1.join(d2, d1.cod == d2.cod, 'leftsemi')

In [None]:
df_lsj.show()

#### <b>Cross Join</b>

Devuelve el producto cartesiano del DF izquierdo con el DF de la derecha.

In [None]:
df_cj = d1.crossJoin(d2)

In [None]:
df_cj.show()

#### <b>Join empleando varios campos de búsqueda</b>

Se pueden emplear varios campos para construir la condición de join agrupándolos entre paréntesis y empleando operadores lógicos (&, |):

In [None]:
df_a = sqlContext.createDataFrame([("i23", "Rosa", "r@a.com"), ("i24", "Julio", "j@a.com"), ("i25", "Asier", "a@a.com"), ("i26", "Iker", "i@a.com")], ["cod", "nombre", "mail"])
df_b = sqlContext.createDataFrame([("i29", "Manuel", 27), ("i23", "Rosa", 26), ("i25", "Asier", 27)], ["cod", "nombre", "edad"])

In [None]:
df_j = df_a.join(df_b, (df_a.cod == df_b.cod) & (df_a.nombre == df_b.nombre), 'inner')

In [None]:
df_j.show()

### <b>Escritura en MySQL desde Spark</b>

Si quisieramos realizar alguna escritura en MySQL (aunque este no es el propósito de este ejercicio):

El modo de escritura puede tomar los siguientes valores:<br>
* error: es el valor por defecto y devuelve una excepción en caso de que los datos que se quieran escribir ya existen.
* append: concatena el contenido del Dataframe con los datos existentes en la tabla de destino.
* overwrite: sobreescribe los datos existentes.
* ignore: en caso de que los datos que se quieran escribir ya existien no realizará esta operación sin indicar ninguna excepción.

In [None]:
modo = 'append'
host = 'localhost'
db = 'banco'
tabla = 'sucursales'
url='jdbc:mysql://{}/{}'.format(host, db)
props = {
    'user': 'root',
    'password': 'santander',
    'driver': 'com.mysql.jdbc.Driver'
}

In [None]:
from pyspark.sql import Row

In [None]:
ls = [('3A', 'Legazpi 60'), ('4A', 'Goya 21'), ('5A', 'Castellana 34')]
rdd = sc.parallelize(ls)
sucus = rdd.map(lambda x: Row(cod_s=x[0], direc=x[1]))
df_sucus = sqlContext.createDataFrame(sucus)
df_sucus.write.jdbc(url=url, table=tabla, mode=modo, properties=props)

In [None]:
df_sucus.show()

In [None]:
df_sucursales = sqlContext.read.jdbc(url=url, table=tabla, properties=props)

In [None]:
df_sucursales.show()

### <b>Crear Modelo de Datos de Cassandra </b>

Cuando se va a realizar una migración de una base de datos relacional como MySQL hacia un motor NoSQL como Cassandra hay que pasar de pensar en cómo almacenar la información en forma de tablas normalizadas (MySQL) a pregutarse <b>qué tipo de consultas son las que se van a realizar en Cassandra</b> para determinar qué estructura es la que deberá tener para que sus consultas sean más eficientes.

#### <b>Caso 1</b>

Si por ejemplo una consulta que será necesaria realizar se corresponde con información contenida en una única tabla de MySQL, como el contenido de la tabla sucursales de MySQL empleando como partiton key de Cassandra la primary key cod_s de esa tabla en MySQL, el proceso de migración sería directo. 

El primer paso sería la creación en CQLSH de un <b>keyspace</b> llamado <b>banco_c</b> y de una <b>tabla sucursales_c</b> para albergar la información para este tipo de consulta. Nos iremos a la terminal de Linux y ejecutaremos lo siguiente:

<b><i>$ cqlsh -u cassandra -p cassandra</i></b>

<b><i>cassandra@cqlsh> create keyspace banco_c with replication={'class':'SimpleStrategy', 'replication_factor':1};<br><br>
cassandra@cqlsh> use banco_c;<br><br>
cassandra@cqlsh:banco_c> create table sucursales_c(cod_s text, direc text, primary key (cod_s));</i></b>

El siguiente paso será seguir analizando qué otros tipos de consultas interesa poder realizar a Cassandra. 

#### <b>Caso 2</b>

Una consulta de interés puede ser conocer el número de operaciones atendidas por cada empleado. Para alojar esta información se creará una <b>tabla</b> llamada <b>num_ops_por_empleado</b> en Cassandra que tendrá la siguiente estructura:

<b><i>cassandra@cqlsh:banco_c> create table num_ops_por_empleado(cod_e text, nombre_e text, cargo text, num_ops_emp int, primary key(cod_e, nombre_e));</i></b>

#### <b>Caso 3</b>

Otra consulta que se quiere poder realizar a Cassandra consiste en saber información de los clientes atendidos por cada empleado en cada sucursal. La tabla a crear recibirá el nombre de <b>clientes_por_emp_y_sucur</b> será la siguiente:

<b><i>cassandra@cqlsh:banco_c> create table clientes_por_emp_y_sucur(cod_e text, cod_s text, nombre_e text, direc text, nombre_c text, primary key((cod_e, cod_s), nombre_c));</i></b>

### <b>Mover datos de MySQL a Cassandra empleando Spark</b>

#### <b>Caso 1</b>

Una vez que ya se disponen de las tablas en Cassandra, de forma que cada una de ellas albergará los datos para satisfacer uno de los tipos de consulta de interés, se puede comenzar volcando el DataFrame de Spark df_sucursales (que contiene las filas de la tabla sucursales de MySQL) a la tabla <b>sucursales_c</b> de Cassandra: 

In [None]:
df_sucursales.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="sucursales_c", keyspace="banco_c")\
    .save()

Se puede consultar en la shell de Cassandra el contenido de sucursales_c:

<b><i>cassandra@cqlsh:banco_c> select * from sucursales_c;</i></b>

#### <b>Caso 2</b>

A continuación vamos a ver cómo realizar la migración de los datos necesarios de MySQL para construir la tabla de Cassandra <b>num_ops_por_empleado</b> que permitirá por ejemplo responder a la consulta del <b>número de operaciones atendidas por cada empleado</b>.

Comenzaremos cargando en un DataFrame de Spark llamado <b>df_op_clientes</b> el contenido de la tabla de MySQL op_clientes:

In [None]:
df_op_clientes = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "op_clientes").option("user", "root").option("password", "santander").option("driver", "com.mysql.jdbc.Driver") 

In [None]:
df_op_clientes = df_op_clientes.load()

In [None]:
df_op_clientes.show()

In [None]:
from pyspark.sql import functions as F

In [None]:
df_count_ops_empleado = df_op_clientes.groupBy("code").agg(F.count(df_op_clientes.code).alias("num_ops_emp"))

In [None]:
df_count_ops_empleado.show()

Ahora vamos a traer a un DataFrame de Spark llamado <b>df_empleados</b> el contenido de la tabla de MySQL empleados:

In [None]:
df_empleados = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "empleados").option("user", "root").option("password", "santander").option("driver", "com.mysql.jdbc.Driver")

In [None]:
df_empleados = df_empleados.load()

In [None]:
df_empleados.show()

Es turno de construir el DataFrame de Spark <b>df_num_ops_emp</b> que contendrá la información de cada empleado junto con un campo que indicará su número de operaciones atendidas:

In [None]:
df_num_ops_emp = df_empleados.join(df_count_ops_empleado, df_empleados.cod_e == df_count_ops_empleado.code, 'inner').select(df_empleados.cod_e, df_empleados.nombre_e, df_empleados.cargo, df_count_ops_empleado.num_ops_emp)

In [None]:
df_num_ops_emp.show()

En este punto, podremos escribir en la tabla de Cassandra <b>num_ops_por_empleado</b> el contenido del DataFrame de Spark df_num_ops_emp:

In [None]:
df_num_ops_emp.write.format("org.apache.spark.sql.cassandra").mode("append").options(keyspace="banco_c", table="num_ops_por_empleado").save()

Iremos a la terminal de CQLSH para consultar el contenido de dicha tabla de Cassandra:

<b><i>cassandra@cqlsh:banco_c> SELECT * FROM num_ops_por_empleado;</i></b>

Se comprobará que el contenido de esta tabla de Cassandra es el mismo que el del DataFrame df_num_ops_emp.

#### <b>Caso 3</b>

Se dispone de los DataFrames de Spark df_sucursales, df_empleados y df_op_clientes. Vamos a combinarlos para obtener el DataFrame que disponga de la información que nos interesa y al que llamaremos <b>df_clis_por_emp_sucur</b>.

Vamos a recordar la definición de la tabla de Cassandra objetivo:<br><br><b><i>cassandra@cqlsh:banco_c> create table clientes_por_emp_y_sucur(cod_e text, cod_s text, nombre_e text, direc text, nombre_c text, primary key((cod_e, cod_s), nombre_c));</i></b>

In [None]:
df_clis_por_emp_sucur = df_empleados.join(df_op_clientes, df_empleados.cod_e == df_op_clientes.code, 'inner').join(df_sucursales, df_sucursales.cod_s == df_op_clientes.cods, 'inner')

In [None]:
df_clis_por_emp_sucur.show()

Vamos a reestructurar ese DataFrame para que tenga la estrcutura que espera la tabla clientes_por_emp_y_sucur de Cassandra:

In [None]:
df_clis_por_emp_sucur_format = df_clis_por_emp_sucur.select(df_empleados.cod_e, df_sucursales.cod_s, df_empleados.nombre_e, df_sucursales.direc, df_op_clientes.nombre_c)

In [None]:
df_clis_por_emp_sucur_format.show()

Una vez hecho esto, vamos a escribir ese DataFrame en la tabla <b>clientes_por_emp_y_sucur</b> de Cassandra:

In [None]:
df_clis_por_emp_sucur_format.write.format("org.apache.spark.sql.cassandra").mode("append").options(keyspace="banco_c", table="clientes_por_emp_y_sucur").save()

Iremos a CQLSH para comprobar el contenido de dicha tabla en Cassandra:

<b><i>cassandra@cqlsh:banco_c> SELECT * FROM clientes_por_emp_y_sucur;</i></b>

También podemos realizar esa consulta desde Spark y almacenar el resultado en un DataFrame llamado <b>df_from_c</b> de la siguiente forma:

In [None]:
df_from_c = sqlContext.read.format("org.apache.spark.sql.cassandra").options(keyspace="banco_c", table="clientes_por_emp_y_sucur").load()

In [None]:
df_from_c.show()

Para recordar un ejercicio anterior podemos intentar recuperar el contenido de esa tabla empleando el driver de Python para Cassandra:

In [None]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

In [None]:
cluster = Cluster(auth_provider=PlainTextAuthProvider(username="cassandra", password="cassandra"))

In [None]:
sess = cluster.connect("banco_c")

In [None]:
res = sess.execute("SELECT * FROM clientes_por_emp_y_sucur;")

In [None]:
for f in res:
    print(f)