<a href="https://colab.research.google.com/github/fdonadio1979/fraud-detector/blob/master/Migracion_equipo_CLONACION_v1_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Migracion de quipos por CLONACION
Crea nuevos registros clonando la infrastructura del equipo de origen (nuvos tanques, bombas, usuarios, etc), los relaciona con el equipo de destino y entre ellos. Por ultimo **duplica** las transacciones e historiales de tanque pero reflejando la infrastructura (IDs/Foreign keys) del equipo de destino.

En cada etapa podra revisar los datos originales y los datos finales antes de confirmar la migracion

## **Parametros basicos**
Esta seccion se debe ejecutar antes que cualquier otra
------

In [None]:
import sshtunnel
import pandas as pd
import numpy as np
from getpass import getpass
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
import sys, logging
from termcolor import colored
from datetime import datetime, date, timedelta, time
from tqdm import tqdm_notebook as tqdm
import pymysql
from pymysql.constants import CLIENT
import math

pd.set_option('display.max_columns',10)  # descomentar para imprimir todas las columnas de un dataframe

def prepare_to_sql(obj):
  if (obj is None) or obj == np.nan:
    return 'NULL'
  elif isinstance(obj, str):
    if obj == "None":
      return 'NULL'
    return f"'{obj}'"
  elif isinstance(obj, date):
    return "'{}'".format(obj.strftime('%Y-%m-%d'))
  elif isinstance(obj, datetime):
    return "'{}'".format(obj.strftime('%Y-%m-%d %T'))
  elif isinstance(obj, time):
    return obj.strftime('%T')
  elif isinstance(obj, timedelta):
    return f"'{(datetime.min+obj).strftime('%T')}'"
  else:
    return obj

In [None]:
# Instalo algunos paquetes necesarios
!pip install sshtunnel pymysql termcolor

Collecting sshtunnel
  Downloading sshtunnel-0.4.0-py2.py3-none-any.whl (24 kB)
Collecting pymysql
  Downloading PyMySQL-1.0.2-py3-none-any.whl (43 kB)
[?25l[K     |███████▌                        | 10 kB 21.8 MB/s eta 0:00:01[K     |███████████████                 | 20 kB 27.6 MB/s eta 0:00:01[K     |██████████████████████▍         | 30 kB 31.9 MB/s eta 0:00:01[K     |██████████████████████████████  | 40 kB 17.9 MB/s eta 0:00:01[K     |████████████████████████████████| 43 kB 1.6 MB/s 
Collecting paramiko>=2.7.2
  Downloading paramiko-2.7.2-py2.py3-none-any.whl (206 kB)
[K     |████████████████████████████████| 206 kB 9.8 MB/s 
[?25hCollecting pynacl>=1.0.1
  Downloading PyNaCl-1.4.0-cp35-abi3-manylinux1_x86_64.whl (961 kB)
[K     |████████████████████████████████| 961 kB 42.3 MB/s 
[?25hCollecting cryptography>=2.5
  Downloading cryptography-3.4.7-cp36-abi3-manylinux2014_x86_64.whl (3.2 MB)
[K     |████████████████████████████████| 3.2 MB 42.9 MB/s 
[?25hCollecting bc

In [None]:
# logging
logger = logging.getLogger(__name__)
while logger.hasHandlers(): # esto evita que se dupliquen los handlers si se corre varias veces esta celda
    logger.removeHandler(logger.handlers[0])
stdout_hdr = logging.StreamHandler(sys.stdout)
stdout_hdr.setFormatter(
     logging.Formatter('%(message)s')
)

logger.addHandler(stdout_hdr)
logger.setLevel(logging.DEBUG)

In [None]:
# solicita las credenciales de usuario (linux)
SSH_HOST = 'cintelink.com.ar'
SSH_REMOTE_PORT = 3306
SSH_USER = input('SSH USER: ')
SSH_PASS = getpass("SSH PASSWD : ")
try:
  logger.info(f"Iniciando un tunnel ssh a {SSH_HOST}, al puerto {SSH_REMOTE_PORT}")
  tunnel = sshtunnel.open_tunnel(
      (SSH_HOST, 22),
      ssh_username=SSH_USER,
      ssh_password=SSH_PASS,
      remote_bind_address=('127.0.0.1', SSH_REMOTE_PORT),
  )
  tunnel.start()
except sshtunnel.BaseSSHTunnelForwarderError:
  logger.info(colored("Ocurrio un error al conectarse al servidor. es posible que las credenciales sean invalidas", "red"))
  
else:
  logger.info(colored("Tunnel abierto con exito", "green"))

  # solicita las credenciales de usuario (mysql)
  MYSQL_USER = input("MYSQL USER: ")
  MYSQL_PASS = getpass("MYSQL PASS: ")
  MYSQL_DB_NAME = 'cintelink_db'
  try:
    logger.info("Iniciando la conexion con base de datos")
    db_eng = create_engine(f'mysql+pymysql://{MYSQL_USER}:{MYSQL_PASS}@127.0.0.1:{tunnel.local_bind_port}/{MYSQL_DB_NAME}')
    db_eng.execute("select 'conexion ok'") # test de la conexion
  except OperationalError:
    logger.info(colored("Ocurrio un error al conectarse a la base de datos. es posible que las credenciales sean invalidas", "red"))
  else:
    logger.info(colored("conexion exitosa", "green"))



SSH USER: vlazbal
SSH PASSWD : ··········
Iniciando un tunnel ssh a cintelink.com.ar, al puerto 3306
[32mTunnel abierto con exito[0m
MYSQL USER: vlazbal
MYSQL PASS: ··········
Iniciando la conexion con base de datos
[32mconexion exitosa[0m


In [None]:
SOURCE_DEV = int(input('ID_EQUIPO DE ORIGEN: '))
DEST_DEV = int(input('ID_EQUIPO DE DESTINO: '))

# Read equipos
equipos = pd.read_sql(
    sql = f"select * from fs_equipo where id_equipo in ({SOURCE_DEV},{DEST_DEV})",
    con=db_eng
)
if DEST_DEV not in equipos.id_equipo.unique().astype(int):
  logger.error(colored("El equipo de destino no existe. porfavor solucionar esto antes de continuar", color='red'))
if SOURCE_DEV not in equipos.id_equipo.unique().astype(int):
  logger.error(colored("El equipo de origen no existe.", color='red'))

equipos


ID_EQUIPO DE ORIGEN: 588
ID_EQUIPO DE DESTINO: 705


Unnamed: 0,id_equipo,direccion_ip,id_empresa,descripcion,pass,...,ble_pass,mac_address,lock_first_sync,send_ra,timezone
0,588,1,1029,Tk 10553,2222,...,,,0,0,America/Argentina/Buenos_Aires
1,705,1,1029,Tk 10542,2222,...,,,0,0,America/Argentina/Buenos_Aires


## **Migraciones**
----

### Estructuras generales

In [None]:
# <===================================================================>
# <------------ Actualizacion de Asignacion de Productos ------------->
# <===================================================================>

logger.info("Actualizacion de Asignacion de Productos ...")

# Copio los productos aunque esten dados de baja para evitar inconsistencias
src_rows_prod = pd.read_sql(
    sql = (
        f"SELECT * "
        f"FROM `fs_asignacion_producto` "
        f"WHERE id_equipo={SOURCE_DEV} " 
        ),
        con=db_eng
)
logger.info(colored("Productos originales:", 'green'))
logger.info(src_rows_prod.to_string())

out_rows_prod = src_rows_prod.copy() # copio el df para modificarlo
# cambio los valores al destino
out_rows_prod['id_equipo'] = DEST_DEV
logger.info(colored("Nuevos productos:", 'green',attrs=['bold']))
logger.info(out_rows_prod.to_string())

# consulta si continuar
cont = input("Desea continuar con la insercion de nuevos productos? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en productos")

else:
  logger.info("Insertando nuevos productos")
  # itero sobre las filas para insertar de a una
  for r in out_rows_prod.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO`fs_asignacion_producto` "
        f"({','.join(out_rows_prod.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    affected_rows = db_eng.execute(sql_query)
  
  logger.info("Se insertaron correctamante los nuevos productos")  
  logger.info("Actualizacion de Asignacion de Productos finalizada.\n")



Actualizacion de Asignacion de Productos ...
Actualizacion de Asignacion de Productos ...
[32mProductos originales:[0m
[32mProductos originales:[0m
   id_equipo producto nombre_producto codigo  precio_litro  baja  coef_var_vol density
0        970        0      Diesel 500   None           0.0     0         0.001    None
   id_equipo producto nombre_producto codigo  precio_litro  baja  coef_var_vol density
0        970        0      Diesel 500   None           0.0     0         0.001    None
[1m[32mNuevos productos:[0m
[1m[32mNuevos productos:[0m
   id_equipo producto nombre_producto codigo  precio_litro  baja  coef_var_vol density
0        882        0      Diesel 500   None           0.0     0         0.001    None
   id_equipo producto nombre_producto codigo  precio_litro  baja  coef_var_vol density
0        882        0      Diesel 500   None           0.0     0         0.001    None
Desea continuar con la insercion de nuevos productos? (y/n): y
Insertando nuevos productos

In [None]:
# <===================================================================>
# <------------------- Actualizacion de Alarmas ---------------------->
# <===================================================================>
logger.info("Actualizacion de Alarmas ...")

# busco las filas originales para verlas
# copio todo aunque este dado de baja para evitar inconsistencias
src_rows_alarms = pd.read_sql(
    sql=f"select * from `fs_alarma` where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_alarms.drop(columns=['id'], inplace=True)
logger.info(colored("Alarmas originales:", 'green'))
logger.info(src_rows_alarms.to_string())

# preview de lo que va a quedar
out_rows_alarms = src_rows_alarms.copy()
out_rows_alarms['id_equipo'] = DEST_DEV
logger.info(colored("Nuevas alarmas:", 'green',attrs=['bold']))
logger.info(out_rows_alarms.to_string())

# consulta si continuar
cont = input("Desea continuar con la actualizacion de estas alarmas? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en alarmas")

else:
  logger.info("Actualizando alarmas")
  for r in out_rows_alarms.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO`fs_alarma` "
        f"({','.join(out_rows_alarms.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    affected_rows = db_eng.execute(sql_query)
  
logger.info("Actualizacion de Alarmas finalizada.\n")

Actualizacion de Alarmas ...
Actualizacion de Alarmas ...
[32mAlarmas originales:[0m
[32mAlarmas originales:[0m
Empty DataFrame
Columns: [id_equipo, mail, nombre, nivel_bajo, baja]
Index: []
Empty DataFrame
Columns: [id_equipo, mail, nombre, nivel_bajo, baja]
Index: []
[1m[32mNuevas alarmas:[0m
[1m[32mNuevas alarmas:[0m
Empty DataFrame
Columns: [id_equipo, mail, nombre, nivel_bajo, baja]
Index: []
Empty DataFrame
Columns: [id_equipo, mail, nombre, nivel_bajo, baja]
Index: []
Desea continuar con la actualizacion de estas alarmas? (y/n): y
Actualizando alarmas
Actualizando alarmas
Actualizacion de Alarmas finalizada.

Actualizacion de Alarmas finalizada.



In [None]:
# <===================================================================>
# <------------------ Migracion de Vehiculos --------------------->
# <===================================================================>
logger.info("Migracion de Vehiculos ...")

src_rows_veh = pd.read_sql(
    sql=f"select * from `fs_vehiculos` where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_veh.drop(columns=['id_vehiculo'], inplace=True)
logger.info(colored("Vehiculos originales:", 'green'))
logger.info(src_rows_veh.to_string())

out_rows_veh = src_rows_veh.copy()

# Se elimina esta seccion por pedido de Valen. Este requerimiento es para consolas muy viejas y actualmente confunde
#
# # pregunto si el tag esta en formato hexadecimal
# is_hex = input('El tag de vehiculo esta en formato hexadecimal? (y/n)').lower().strip()
# if is_hex == 'y':
#   """
#   Convierte los ultimos 6 caracteres de un tag hexadecimal a
#   decimal. El nuevo tag debe tener longitud 10.

#   Nota: Para llegar a la longitud 10, se rellena con 0 a la izquierda.
#   """
#   out_rows_veh.tag_id = out_rows_veh.tag_id.apply(lambda t: str(int(t[-6:], 16)).zfill(10))

out_rows_veh.id_equipo = DEST_DEV
out_rows_veh.sync = np.where(out_rows_veh.baja==0, 1, 0)  # Si no esta baja seteo sync
out_rows_veh.patente = np.where(
    out_rows_veh.patente.isna() | out_rows_veh.patente.isin(['','NaN','nan']), 
    None,  
    out_rows_veh.patente.astype(str)
)
out_rows_veh.id_tipo_patente = np.where(
    out_rows_veh.id_tipo_patente.isna() | out_rows_veh.id_tipo_patente.isin(['','NaN','nan']), 
    None,  
    out_rows_veh.id_tipo_patente.astype('Int64')
)


logger.info(colored("Nuevos Vehiculos:", 'green'))
logger.info(out_rows_veh.to_string())
# consulta si continuar
cont = input("Desea continuar con la migracion de estos vehiculos? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en vehiculos")

else:
  # consulta si eliminar registros viejos
  cont = input("Desea eliminar DE FORMA PERMANENETE los vehiculos existentes en el equipo de destino antes de migrar? (y/N): ").lower()
  if cont != 'y':
      logger.info("No se eliminaran los vehiculos antes de insertar")
  else:
    logger.info(f"Eliminando vehiculos del equipo {DEST_DEV}")
    sql_query = (
            f"DELETE FROM fs_vehiculos WHERE id_equipo={DEST_DEV}"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)
    logger.info(f"Se eliminaron exitosamente los vehiculos del equipo {DEST_DEV}")


  logger.info("Insertando vehiculos")
  for r in out_rows_veh.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO fs_vehiculos "
        f"({','.join(out_rows_veh.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
        f"ON DUPLICATE KEY UPDATE descripcion=VALUES(descripcion), tag_id=VALUES(tag_id), departamento=VALUES(departamento), limite = VALUES(limite), verificar=VALUES(verificar), llave_tipo=VALUES(llave_tipo), condicion_desautorizacion=VALUES(condicion_desautorizacion), patente=VALUES(patente), id_tipo_patente=VALUES(id_tipo_patente), verificar=VALUES(verificar), main_id=VALUES(main_id)"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)
  logger.info("Se insertaron exitosamente los vehiculos migrados")

logger.info("Migracion de Vehiculos finalizada.")

Migracion de Vehiculos ...
[32mVehiculos originales:[0m
     id_equipo      tag_id vehiculo                                    descripcion departamento limite verificar codigo odometro_inicio odometro_fin cargas_max_dia llave_tipo autorizacion condicion_desautorizacion cantidad_total cargas_hoy cargas_hasta_hoy ultima_fecha  sync  baja main_id  patente  id_tipo_patente
0          588  0009848058   000001                                         000001         0000      9         0   1234          000000       000000             01          0            0                         1      00183.779         01              020   2021-02-11     0     0  000001     None              NaN
1          588  000C000005   000002                                         000002         0001      9         0   1234          000000       000000             01          8            0                         1      00000.000         00              000   2001-01-01     0     0  000002     None            

In [None]:
# <===================================================================>
# <------------------ Migracion de Usuarios ---------------------->
# <===================================================================>

logger.info("Migracion de Usuarios ...")

# busco los usuarios aunque esten dados de baja
src_rows_us = pd.read_sql(
    sql=f"select * from `fs_usuarios_fuelsentry` where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_us.drop(columns=['id_usuario_fuelsentry'], inplace=True)
logger.info(colored("Usuarios originales:", 'green'))
logger.info(src_rows_us.to_string())

out_rows_us = src_rows_us.copy()

# Eliminado por peticion de Valen. Este requerimiento es de equipos muy viejos y ya no se usa
#
# # pregunto si el tag esta en formato hexadecimal
# is_hex = input('El tag ed vehiculo esta en formato hexadecimal? (y/n)').lower().strip()
# if is_hex == 'y':
#   """
#   Convierte los ultimos 6 caracteres de un tag hexadecimal a
#   decimal. El nuevo tag debe tener longitud 10.

#   Nota: Para llegar a la longitud 10, se rellena con 0 a la izquierda.
#   """
#   out_rows_us.tag_id = out_rows_us.tag_id.apply(lambda t: str(int(t[-6:], 16)).zfill(10))

out_rows_us.id_equipo = DEST_DEV
out_rows_us.sync = np.where(out_rows_us.baja==0, 1, 0)  # Si no esta baja seteo sync
out_rows_us.dni = np.where(
    out_rows_us.dni.isna() | out_rows_us.dni.isin(['','NaN','nan']), 
    None,  
    out_rows_us.dni.astype('Int64')
)
out_rows_us.id_tipo_dni = np.where(
    out_rows_us.id_tipo_dni.isna() | out_rows_us.id_tipo_dni.isin(['','NaN','nan']), 
    None,  
    out_rows_us.id_tipo_dni.astype('Int64')
)


logger.info(colored("Nuevos Usuarios:", 'green'))
logger.info(out_rows_us.to_string())
# consulta si continuar
cont = input("Desea continuar con la migracion de estos usuarios? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en usuarios")
else:
  # consulta si eliminar registros viejos
  cont = input("Desea eliminar DE FORMA PERMANENETE los usuarios existentes en el equipo de destino antes de migrar? (y/n): ").lower()
  if cont != 'y':
      logger.info("No se eliminaran los usuarios antes de insertar")
  else:
    logger.info(f"Eliminando usuarios del equipo {DEST_DEV}")
    sql_query = (
            f"DELETE FROM fs_usuarios_fuelsentry WHERE id_equipo={DEST_DEV}"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)
    logger.info(f"Se eliminaron exitosamente los usuarios del equipo {DEST_DEV}")


  logger.info("Insertando usuarios")
  for r in out_rows_us.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO fs_usuarios_fuelsentry "
        f"({','.join(out_rows_us.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)


logger.info("Migracion de Usuarios finalizada.\n")

Migracion de Usuarios ...
[32mUsuarios originales:[0m
    Nombre Apellido Cargo      tag_id  id_equipo usuario_fuelsentry departamento codigo autorizacion totalizador cargas_totales  sync  baja restart_date id_tipo_dni   dni
0      N/A      N/A   N/A  9876543210          2               0000         0000   0000            0   00000.000            000     0     1         None        None  None
1     Demo     Demo   N/A  0011656258          2               0001         0000   0000            1   00146.600            003     0     0         None        None  None
2      N/A      N/A   N/A  0004598792          2               0002         0000   0000            1   00000.000            000     0     0         None        None  None
3      N/A      N/A   N/A  5100B1DC42          2               0003         0001   0000            1   00273.400            008     0     0         None        None  None
4      N/A      N/A   N/A  0009848060          2               0004         0000   0000  

In [None]:
# <=====================================================================>
# <--------------------- Migracion de Tanques ---------------------->
# <=====================================================================>

logger.info("Migracion de Tanques ...")

# se cargan todos los tanques aunque esten dados de baja
src_rows_tanks = pd.read_sql(
    sql=f"select * from fs_tanques where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_tanks.drop(columns=['id_tanque'], inplace=True)
logger.info(colored("Tanques originales:", 'green'))
logger.info(src_rows_tanks.to_string())

out_rows_tanks = src_rows_tanks.copy()
out_rows_tanks.id_equipo = DEST_DEV
out_rows_tanks.sync = np.where(out_rows_tanks.baja==0, 1, 0)  # Si no esta baja seteo sync

logger.info(colored("Nuevos Tanques:", 'green'))
logger.info(out_rows_tanks.to_string())
# consulta si continuar
cont = input("Desea continuar con la migracion de estos tanques? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en tanques")
else:
  logger.info("Insertando tanques")
  for r in out_rows_tanks.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO fs_tanques "
        f"({','.join(out_rows_tanks.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)

logger.info("Migracion de Tanques finalizada.\n")

In [None]:
# <===================================================================>
# <--------------------- Migracion de Bombas ------------------------->
# <===================================================================>

logger.info("Migracion de Bombas ...")

# Se cargan todas las bombas aunque esten dadas de baja para no corromper relaciones
src_rows_pumps = pd.read_sql(
    sql=f"select * from fs_bombas where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_pumps.drop(columns=['id_bomba'], inplace=True)
logger.info(colored("Bombas originales:", 'green'))
logger.info(src_rows_pumps.to_string())

out_rows_pumps = src_rows_pumps.copy()
out_rows_pumps.id_equipo = DEST_DEV
out_rows_pumps.sync = np.where(out_rows_pumps.baja==0, 1, 0)  # Si no esta baja seteo sync

# convierto los pulsos por litro
out_rows_pumps.pulsos_litro =  out_rows_pumps.pulsos_litro.apply(
    lambda pl: str('{:.4f}'.format(float(pl) / 10)).zfill(9) if pl.isdigit() and len(pl) == 4 else pl
)

# Leo los tanques nuevos y viejos para relacionar los nuevos id_tanque
tanks = pd.read_sql(
    sql=(
        f"select o.id_tanque as old_id_tanque, n.id_tanque as new_id_tanque from "
        f"(select id_tanque, tanque from fs_tanques where id_equipo={SOURCE_DEV}) as o "
        f"join "
        f"(select id_tanque, tanque from fs_tanques where id_equipo={DEST_DEV}) as n "
        f"on o.tanque=n.tanque"
    ),
    con=db_eng
)
out_rows_pumps = out_rows_pumps.rename(columns={'id_tanque':'old_id_tanque'})
out_rows_pumps = out_rows_pumps.join(tanks.set_index('old_id_tanque'), on='old_id_tanque')
out_rows_pumps['id_tanque'] = out_rows_pumps.new_id_tanque
out_rows_pumps = out_rows_pumps.drop(columns=['old_id_tanque', 'new_id_tanque'])[src_rows_pumps.columns]

logger.info(colored("Nuevos Bombas:", 'green'))
logger.info(out_rows_pumps.to_string())
# consulta si continuar
cont = input("Desea continuar con la migracion de estas bombas? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en bombas")
else:
  logger.info("Insertando bombas")
  for r in out_rows_pumps.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO fs_bombas "
        f"({','.join(out_rows_pumps.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)

logger.info("Migracion de Bombas finalizada.\n")

In [None]:
# <===================================================================>
# <-------------------- Migracion de Nodos ----------------------->
# <===================================================================>
logger.info("Migracion de Nodos ...")

# Copio todos los nodos aunque esten de baja para evitar inconsistencias
src_rows_nodes = pd.read_sql(
    sql=f"select * from fs_nodos where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_nodes.drop(columns=['id_nodo'],inplace=True)
logger.info(colored("Nodos originales:", 'green'))
logger.info(src_rows_nodes.to_string())

out_rows_nodos = src_rows_nodes.copy()
out_rows_nodos.id_equipo = DEST_DEV
out_rows_nodos.sync = np.where(out_rows_nodos.baja==0, 1, 0)  # Si no esta baja seteo sync

logger.info(colored("Nuevos Nodos:", 'green'))
logger.info(out_rows_nodos.to_string())
# consulta si continuar
cont = input("Desea continuar con la migracion de estos Nodos? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en Nodos")
else:
  logger.info("Insertando Nodos")
  for r in out_rows_nodos.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO fs_nodos "
        f"({','.join(out_rows_nodos.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)


logger.info("Migracion de Nodos finalizada.\n")

In [None]:
# <===================================================================>
# <------------------ Migracion de Sensores Sonda -------------------->
# <===================================================================>
logger.info("Migracion de Sensores Sonda ...")

# Cargo todo aunque este de baja para evitar inconsistencias
src_rows_ss = pd.read_sql(
    sql=f"select * from fs_sensores_sonda where id_equipo={SOURCE_DEV}",
    con=db_eng
)
src_rows_ss.drop(columns=['id_sonda'], inplace=True)
logger.info(colored("Sensores sonda originales:", 'green'))
logger.info(src_rows_ss.to_string())

out_rows_ss = src_rows_ss.copy()
out_rows_ss.id_equipo = DEST_DEV
out_rows_ss.sync = np.where(out_rows_ss.baja==0, 1, 0) # Si no esta baja, seteo sync

logger.info(colored("Nuevos Sensores sonda:", 'green'))
logger.info(out_rows_ss.to_string())
# consulta si continuar
cont = input("Desea continuar con la migracion de estos Sensores sonda? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en Sensores sonda")
else:
  logger.info("Insertando Sensores sonda")
  for r in out_rows_ss.applymap(prepare_to_sql).astype(str).to_numpy():
    sql_query = (
        f"INSERT INTO fs_sensores_sonda "
        f"({','.join(out_rows_ss.columns)}) "
        f"VALUES "
        f"({','.join(r)})"
    )
    logger.debug(sql_query)
    db_eng.execute(sql_query)

logger.info("Migracion de Sensores Sonda finalizada.\n")

### Transacciones e historial de tanques

> Indented block



In [None]:
# <===================================================================>
# <---------------- Migracion de Historial de Tanques ---------------->
# <===================================================================>

logger.info("Migracion de Historial Tanques ...")
TIMESERIES_FROM = input("Migrar desde [yyyy-mm-dd]. (Por defecto seran todos): ").strip()
TIMESERIES_FROM = datetime.strptime(TIMESERIES_FROM, "%Y-%m-%d").strftime("%Y-%m-%d") if TIMESERIES_FROM else '2000-01-01'
TIMESERIES_UPTO = input("Migrar hasta [yyyy-mm-dd]. (Por defecto seran todos): ").strip()
TIMESERIES_UPTO = datetime.strptime(TIMESERIES_UPTO, "%Y-%m-%d").strftime("%Y-%m-%d") if TIMESERIES_UPTO else '2100-01-01'

# busco las tablas 'sis_historial_%'
hist_tables = pd.read_sql(
    sql=f"select table_name from information_schema.tables where table_schema='{MYSQL_DB_NAME}' and table_name like 'sis_historial_%'",
    con=db_eng
)
hist_tables = hist_tables.sort_values('table_name', ascending=True)
#logger.debug(hist_tables.to_string())

# leo un tanklog de cada tabla para una demostracion
src_rows_hist = pd.DataFrame()
logger.info("Loading preview..")
for t in tqdm(hist_tables.table_name.to_numpy()):
  a_row = pd.read_sql(
      sql=f"select * from `{t}` where id_equipo={SOURCE_DEV} and fecha between '{TIMESERIES_FROM}' and '{TIMESERIES_UPTO}' limit 5",
      con=db_eng
  )
  src_rows_hist = src_rows_hist.append(a_row)
logger.info(colored("Tanklogs originales", 'green') + colored(" - Esto es solo una demo con una fila de cada tabla de historial".upper(),'magenta'))
logger.info(src_rows_hist.to_string())


# Leo los tanques nuevos y viejos para relacionar los nuevos id_tanque
tanks = pd.read_sql(
    sql=(
        f"select tanque, id_tanque as new_id_tanque from fs_tanques "
        f"where id_equipo={DEST_DEV} "
    ),
    con=db_eng
)
tanks.tanque = tanks.tanque.astype(int)
# demo de como queda la salida
out_rows_hist = src_rows_hist.copy()
out_rows_hist.id_equipo = DEST_DEV
# Asigno los nuevos ids de tanque en los tanklogs
out_rows_hist['join_tanque'] = out_rows_hist.tank_site_id.astype(int) # convirto a entero para que '01' = '1'
out_rows_hist = out_rows_hist.join(tanks.set_index('tanque'), on='join_tanque', how='left')
out_rows_hist.id_tanque = out_rows_hist.new_id_tanque
out_rows_hist = out_rows_hist[src_rows_hist.columns] # Reordeno las columnas

logger.info(colored("Nuevos tanklogs", 'green') + colored(" - Esto es solo una demo con una fila de cada tabla de historial".upper(),'magenta'))
logger.info(out_rows_hist.to_string())

# consulta si continuar
cont = input("Desea continuar con la migracion de estos Tanklogs? (y/n): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en Tanklogs")
else:
  logger.info("Preparando Tanklogs")

  queries = []
  for t in tqdm(hist_tables.table_name.to_numpy()):
#    logger.info(f"Buscando historial de {t}")
    # Levanta todo aunque este dado de baja para que no queden inconsistencias
    src_rows_hist = pd.read_sql(
      sql=f"select * from `{t}` where id_equipo={SOURCE_DEV} and fecha between '{TIMESERIES_FROM}' and '{TIMESERIES_UPTO}'",
      con=db_eng
    )
#    logger.info("Reasignando Foreign keys")
    # Copio el df para la salida
    out_rows_hist = src_rows_hist.copy()
    out_rows_hist.id_equipo = DEST_DEV
    # Asigno los nuevos ids de tanque en los tanklogs
    out_rows_hist['join_tanque'] = out_rows_hist.tank_site_id.astype(int)
    out_rows_hist = out_rows_hist.join(tanks.set_index('tanque'), on='join_tanque', how='left')
    out_rows_hist.id_tanque = out_rows_hist.new_id_tanque
    out_rows_hist = out_rows_hist[src_rows_hist.columns] # Reordeno las columnas
    # Verifico que no haya transacciones con ids nulos
    if out_rows_hist.id_tanque.isna().sum():
      cont = logger.error(colored("No es posible completar la operacion porque existen filas con id_tanque nulo.\n Porfavor cree los tanques faltantes antes de continuar: ", 'red'))
      logger.debug(colored(out_rows_hist[out_rows_hist.id_tanque.isna()].to_string(),'red'))
      queries = None
      break

#    logger.info(f"Generando queries {t}")
    for r in out_rows_hist.applymap(prepare_to_sql).astype(str).to_numpy():
      sql_query = (
          f"INSERT INTO {t} ({','.join(out_rows_hist.columns)}) "
          f"VALUES "
          f"({','.join(r)})"
      )
      #logger.debug(sql_query)
      queries.append(sql_query)
    #logger.debug(f"New rows: {len(out_rows_hist)}")
  if queries:
    cont = "y" == input(
        "Desea continuar con la insercion de Tanklogs? (y/N): "
    )
    if cont:
      logger.info("Insertando Tanklogs")
      conn_params = {
          "host": '127.0.0.1',
          "password": MYSQL_PASS,
          "port": tunnel.local_bind_port,
          "user": MYSQL_USER,
          "database": MYSQL_DB_NAME,
          "client_flag": CLIENT.MULTI_STATEMENTS
      }
      chunk = 500 # inserto de a 500 transacciones
      # lo hago con pymysql para poder ejecutar muchas queries a la vez. sqlalchemy no lo soporta
      with pymysql.connect(**conn_params) as con:
          for i in tqdm(range(math.ceil(len(queries)/chunk))):
            con.cursor().execute(";".join(queries[i*chunk:(i+1)*chunk]))
          con.commit()
logger.info("Migracion de Historial Tanques finalizada.\n")

Migracion de Historial Tanques ...


KeyboardInterrupt: ignored

In [None]:
# <===================================================================>
# <-------------------- Migracion de Transacciones ------------------->
# <===================================================================>

logger.info("Migracion de Transacciones ...")

TIMESERIES_FROM = input("Migrar desde [yyyy-mm-dd]. (Por defecto seran todos): ").strip()
TIMESERIES_FROM = datetime.strptime(TIMESERIES_FROM, "%Y-%m-%d").strftime("%Y-%m-%d") if TIMESERIES_FROM else '2000-01-01'
TIMESERIES_UPTO = input("Migrar hasta [yyyy-mm-dd]. (Por defecto seran todos): ").strip()
TIMESERIES_UPTO = datetime.strptime(TIMESERIES_UPTO, "%Y-%m-%d").strftime("%Y-%m-%d") if TIMESERIES_UPTO else '2100-01-01'


# busco las tablas 'sis_historial_%'
transa_tables = pd.read_sql(
    sql=f"select table_name from information_schema.tables where table_schema='{MYSQL_DB_NAME}' and table_name like 'sis_transa_%'",
    con=db_eng
)
transa_tables = transa_tables.sort_values('table_name', ascending=True)
#logger.debug(transa_tables.to_string())
show_preview = "n" != input(
    "Desea un preview (Y/n): "
).lower()
if show_preview:
  # leo una transaccion de cada tabla para un preview
  src_rows_transa = pd.DataFrame()
  for t in tqdm(transa_tables.table_name.to_numpy()):
    a_row = pd.read_sql(
        sql=f"select * from `{t}` where id_equipo={SOURCE_DEV} and fecha between '{TIMESERIES_FROM}' and '{TIMESERIES_UPTO}' limit 3",
        con=db_eng
    )
    src_rows_transa = src_rows_transa.append(a_row)

  logger.info(colored("Transacciones originales", 'green') + colored(" - Esto es solo una demo con una fila de cada tabla de transacciones".upper(),'magenta'))
  logger.info(src_rows_transa.to_string())


# Leo los tanques nuevos y viejos para relacionar los nuevos id_tanque
tanks = pd.read_sql(
    sql=(
        f"select id_tanque as id_tanque, tanque from fs_tanques where id_equipo={DEST_DEV} "
    ),
    con=db_eng
)
tanks.tanque = tanks.tanque.astype(int)
# Leo las bombas nuevas y viejas para cambiar los FK
pumps = pd.read_sql(
    sql=(
        f"select id_bomba as id_bomba, bomba from fs_bombas where id_equipo={DEST_DEV}"
    ),
    con=db_eng
)
# Leo los vehiculos nuevos y viejos para actualizar los FK
vehs = pd.read_sql(
    sql=(
        f"select id_vehiculo as id_vehiculo, vehiculo from fs_vehiculos where id_equipo={DEST_DEV} "
    ),
    con=db_eng
)

# Leo los usuarios nuevos y viejos para actualizar los FK
users = pd.read_sql(
    sql=(
        f"select id_usuario_fuelsentry as id_usuario, usuario_fuelsentry as usuario from fs_usuarios_fuelsentry where id_equipo={DEST_DEV}"
    ),
    con=db_eng
)

if show_preview:
  # demo de como queda la salida
  out_rows_transa = src_rows_transa.copy()
  out_rows_transa.id_equipo = DEST_DEV
  # Cambio los foreign keys
  out_rows_transa.rename(
      columns={
        'id_tanque': 'old_id_tanque',
        'id_usuario': 'old_id_usuario',
        'id_vehiculo': 'old_id_vehiculo',
        'id_bomba': 'old_id_bomba'
      },
      inplace=True
  )

  out_rows_transa['join_tanque'] = out_rows_transa.tank_site_id.astype(int) # nuevo campo para que 01 = 1
  out_rows_transa = out_rows_transa.join(users.set_index('usuario'), on='user_site_id', how='left')
  out_rows_transa = out_rows_transa.join(tanks.set_index('tanque'), on='join_tanque', how='left')
  out_rows_transa = out_rows_transa.join(vehs.set_index('vehiculo'), on='veh_site_id', how='left')
  out_rows_transa = out_rows_transa.join(pumps.set_index('bomba'), on='pump_site_id', how='left')

  #reacomodo y recorto columnas
  out_rows_transa = out_rows_transa[src_rows_transa.columns]
  logger.info(colored("Nuevas Transacciones", 'green') + colored(" - Esto es solo una demo con una fila de cada tabla de transacciones".upper(),'magenta'))
  logger.info(out_rows_transa.to_string())

# consulta si continuar
cont = input("Desea continuar con la migracion de estas Transacciones? (y/N): ").lower()
if cont != 'y':
    logger.info("No se realizaron modificaciones en Transacciones")
else:
  logger.info("Preparando Transacciones")
  queries = []
  for t in tqdm(transa_tables.table_name.to_numpy()):
    src_rows_transa = pd.read_sql(
      # levanto todas las transacciones aunque esten dadas de baja
      sql=f"select * from `{t}` where id_equipo={SOURCE_DEV} and fecha between '{TIMESERIES_FROM}' and '{TIMESERIES_UPTO}'",
      con=db_eng
    )
    # genero el df de salida
    out_rows_transa = src_rows_transa.copy()
    out_rows_transa.id_equipo = DEST_DEV
    # Cambio los foreign keys
    out_rows_transa.rename(
        columns={
          'id_tanque': 'old_id_tanque',
          'id_usuario': 'old_id_usuario',
          'id_vehiculo': 'old_id_vehiculo',
          'id_bomba': 'old_id_bomba'
        },
        inplace=True
    )
    out_rows_transa['join_tanque'] = out_rows_transa.tank_site_id.astype(int) # nuevo campo para que 01 = 1
    out_rows_transa = out_rows_transa.join(users.set_index('usuario'), on='user_site_id', how='left')
    out_rows_transa = out_rows_transa.join(tanks.set_index('tanque'), on='join_tanque', how='left')
    out_rows_transa = out_rows_transa.join(vehs.set_index('vehiculo'), on='veh_site_id', how='left')
    out_rows_transa = out_rows_transa.join(pumps.set_index('bomba'), on='pump_site_id', how='left')
    #reacomodo y recorto columnas
    out_rows_transa = out_rows_transa[src_rows_transa.columns]

    # Verifico que no haya id_nulos
    if out_rows_transa.id_tanque.isna().sum() or \
      out_rows_transa.id_vehiculo.isna().sum() or \
      out_rows_transa.id_bomba.isna().sum() or \
      out_rows_transa.id_usuario.isna().sum() :
        cont = logger.error(colored("No es posible completar la operacion porque existen filas con IDs nulos (NaN).\n Porfavor cree los registros faltantes antes de continuar: ", 'red'))
        logger.debug(
            colored(
                out_rows_transa[
                  out_rows_transa.id_tanque.isna() | 
                  out_rows_transa.id_vehiculo.isna() | 
                  out_rows_transa.id_bomba.isna() | 
                  out_rows_transa.id_usuario.isna() 
                ][[
                   'id_transaccion','id_equipo','fecha','hora','cantidad',
                   'id_vehiculo','veh_site_id','veh_tag_id',
                   'id_usuario','user_site_id','user_tag_id',
                   'id_bomba', 'pump_site_id',
                   'id_tanque', 'tank_site_id'
                ]].to_string(),
                'red'
            )
        )
        cont = "y" == input(
            "Desea forzar la insercion de transacciones? \nUtilizar esto solo como ultimo recurso ya que quedaran datos sin relacionar (y/N): "
        )
        if cont:
          cont = "FiXiT" == input("Para FORZAR la insercion tipee 'FiXiT'")
          if cont:
            logger.info("Completando campos invalidos con valores por defecto")
            # TODO chequear si hay un registro dummy DEL EQUIPO en cada caso. sino crearlo. luego asignarlo a ese id.
            # esto es para evitr problemas de multitenant
            """
            Fix tanques
            """
            
            out_rows_transa.id_tanque = np.where(out_rows_transa.id_tanque.isna(), 0 ,out_rows_transa.id_tanque)
            out_rows_transa.id_bomba = np.where(out_rows_transa.id_bomba.isna(), 0 ,out_rows_transa.id_bomba)
            out_rows_transa.id_usuario = np.where(out_rows_transa.id_usuario.isna(), 0 ,out_rows_transa.id_usuario)
            out_rows_transa.id_vehiculo = np.where(out_rows_transa.id_vehiculo.isna(), 0 ,out_rows_transa.id_vehiculo)
          else:
            logger.info("No se realizaron modificaciones en Transacciones")
            queries = None
            break
        else:
          logger.info("No se realizaron modificaciones en Transacciones")
          queries = None
          break
    for r in out_rows_transa.applymap(prepare_to_sql).astype(str).to_numpy():
      sql_query = (
          f"INSERT INTO {t} ({','.join(out_rows_transa.columns)}) "
          f"VALUES "
          f"({','.join(r)})"
      )
      #logger.debug(sql_query)
      queries.append(sql_query)
    #logger.debug(f"New rows: {len(out_rows_transa)}")
  if queries:
    cont = "y" == input(
            "Desea continuar con la insercion de transacciones? (y/N): "
        )
    if cont:
      logger.info("Insertando Transacciones")
      conn_params = {
          "host": '127.0.0.1',
          "password": MYSQL_PASS,
          "port": tunnel.local_bind_port,
          "user": MYSQL_USER,
          "database": MYSQL_DB_NAME,
          "client_flag": CLIENT.MULTI_STATEMENTS
      }
      chunk = 500 # inserto de a 500 transacciones
      # lo hago con pymysql para poder ejecutar muchas queries a la vez. sqlalchemy no lo soporta
      with pymysql.connect(**conn_params) as con:
          for i in tqdm(range(math.ceil(len(queries)/chunk))):
            con.cursor().execute(";".join(queries[i*chunk:(i+1)*chunk]))
          con.commit()

logger.info("Migracion de Transacciones finalizada.\n")

## Otras migraciones


In [None]:
# <==============================================================================>
# <------------------ Migracion de Configuraciones de equipo -------------------->
# <==============================================================================>