# Práctica ETL - Mapeado de datos LAV

- **Master in Data Science - UC, UIMP, CSIC.**
- **Subject:** Data Life Cycle.
- **Author:** Ignacio Iker Prado Rujas.

## Generación y preparación de las fuentes de datos

Cargamos las librerías relevantes:

In [1]:
import sqlite3
import json
from faker import Faker

Creamos un Faker para cada una de las fuentes, de acuerdo a su país de procedencia:

In [2]:
fake_UK = Faker('en_UK')
fake_ES = Faker('es_ES')
#fake_US = Faker('en_US')

### Librería británica

Generamos datos para la librería británica:

In [3]:
!rm bookstore_UK.db

In [4]:
conn = sqlite3.connect('bookstore_UK.db')
c = conn.cursor()

# Creamos la tabla Book
c.execute('''CREATE TABLE Book
             (idBook integer primary key,
              title text,
              authors text, 
              publisher text, 
              year integer,
              price text,
              ISBN text)''')

# Generamos valores con Faker
fake_vals = []
for i in range(100):
    idBook = i
    title = fake_UK.catch_phrase()
    authors = fake_UK.name()
    publisher = fake_UK.company()
    year = fake_UK.year()
    price = '£' + str(fake_UK.random_int(min=1, max=50))
    ISBN = fake_UK.isbn13()
    fake_vals.append([idBook, title, authors, publisher, year, price, ISBN])

# Los introducimos en la base de datos
c.executemany('''INSERT INTO Book (idBook, title, authors, publisher, year, price, ISBN)
                 VALUES (?, ?, ?, ?, ?, ?, ?)''', fake_vals)

# Miramos los primeros valores insertados
c.execute('SELECT * FROM Book where idBook < 3;')
for row in c.fetchall():
    print(row)

conn.commit()
conn.close()

(0, 'Managed national projection', 'Hannah Lane', 'Ellis, Holmes and Harrison', 1971, '£31', '978-0-15-448990-6')
(1, 'Progressive scalable budgetary management', 'Ms. Carol Woods', 'Harris, Swift and Kent', 1990, '£19', '978-0-332-04962-5')
(2, 'Balanced leadingedge utilization', 'Mr. Neil Campbell', 'Martin, Carter and Cooke', 1972, '£11', '978-1-908737-80-9')


### Librería española

Ahora para la librería española:

In [5]:
!rm bookstore_ES.db

In [6]:
conn = sqlite3.connect('bookstore_ES.db')
c = conn.cursor()

# Creamos la tabla Persona
c.execute('''CREATE TABLE Persona
             (idPersona integer primary key,
              Nombre text,
              Apellido1 text, 
              Apellido2 text, 
              FechaNacimiento date)''')

# Creamos la tabla Publicacion
c.execute('''CREATE TABLE Publicacion
             (idPublicacion integer primary key,
              Titulo text,
              Editorial text, 
              Lengua text, 
              FechaPublicacion date,
              Precio float)''')

# Creamos la tabla Persona_has_Pubicacion
c.execute('''CREATE TABLE Persona_has_Publicacion
             (Persona_idPersona integer,
              Publicacion_idPublicacion integer,
              Rol_idRol integer, 
              primary key(Persona_idPersona, 
                          Publicacion_idPublicacion))''')

# Creamos la tabla Rol
c.execute('''CREATE TABLE Rol
             (idRol integer primary key,
              rol text)''')

# Generamos valores con Faker

# Para Persona
n = 200
fake_vals = []
for i in range(n):
    idPersona = i
    Nombre = fake_ES.name().split()[0]
    Apellido1 = fake_ES.name().split()[1]
    aux = fake_ES.name().split()[1]
    Apellido2 = fake_ES.name().split()[1] if aux in ['el', 'del', 'de'] else aux
    FechaNacimiento = str(fake_ES.date_between(end_date='today', start_date='-150y'))
    fake_vals.append([idPersona, Nombre, Apellido1, Apellido2, FechaNacimiento])
    
c.executemany('''INSERT INTO Persona (idPersona, Nombre, Apellido1, Apellido2, FechaNacimiento)
                 VALUES (?, ?, ?, ?, ?)''', fake_vals)

# Para Publicacion

fake_vals = []
m = 300
for i in range(m):
    idPublicacion = i
    Titulo = fake_ES.catch_phrase()
    Editorial = fake_ES.company()
    Lengua = fake_ES.language_code()
    FechaPublicacion = str(fake_ES.date_between(end_date='today', start_date='-120y'))
    Precio = str(fake_UK.random_int(min=1, max=50)) + '€'
    fake_vals.append([idPublicacion, Titulo, Editorial, Lengua, FechaPublicacion, Precio])
    
c.executemany('''INSERT INTO Publicacion (idPublicacion, Titulo, Editorial, Lengua, FechaPublicacion, Precio)
                 VALUES (?, ?, ?, ?, ?, ?)''', fake_vals)

# Para Rol
r = 30
fake_vals = []
for i in range(r):
    idRol = i
    Rol = fake_ES.word()
    fake_vals.append([idRol, Rol])
    
c.executemany('''INSERT INTO Rol (idRol, Rol)
                 VALUES (?, ?)''', fake_vals)

# Para Persona_has_Publicacion
fake_vals = []
set_aux = set()
for i in range(500):
    Persona_idPersona = fake_ES.random_int(min=0, max=n-1)
    Publicacion_idPublicacion = fake_ES.random_int(min=0, max=m-1)
    while (Persona_idPersona, Publicacion_idPublicacion) in set_aux:
        Persona_idPersona = fake_ES.random_int(min=0, max=n-1)
        Publicacion_idPublicacion = fake_ES.random_int(min=0, max=m-1)
    else:
        set_aux.add((Persona_idPersona, Publicacion_idPublicacion))
    Rol_idRol = fake_ES.random_int(min=0, max=r-1)
    fake_vals.append([Persona_idPersona, Publicacion_idPublicacion, Rol_idRol])
    
c.executemany('''INSERT INTO Persona_has_Publicacion (Persona_idPersona, Publicacion_idPublicacion, Rol_idRol)
                 VALUES (?, ?, ?)''', fake_vals)

# Miramos los primeros valores insertados
c.execute('SELECT * FROM Persona where idPersona < 3;')
print('Personas:')
for row in c.fetchall():
    print(row)
c.execute('SELECT * FROM Publicacion where idPublicacion < 3;')
print('Publicaciones:')
for row in c.fetchall():
    print(row)
c.execute('SELECT * FROM Rol where idRol < 3;')
print('Roles:')
for row in c.fetchall():
    print(row)
c.execute('SELECT * FROM Persona_has_Publicacion where Publicacion_idPublicacion in (3, 8);')
print('Publicaciones de personas:')
for row in c.fetchall():
    print(row)

conn.commit()
conn.close()

Personas:
(0, 'Sara', 'Ávila', 'Aguilar', '2013-09-19')
(1, 'Josefa', 'Córdoba-Cazorla', 'España-Llabrés', '1891-04-13')
(2, 'Gema', 'Cabañas', 'Esparza', '1894-11-29')
Publicaciones:
(0, 'Robust upward-trending database', 'Sancho Inc', 'ml', '1992-04-06', '50€')
(1, 'Reverse-engineered client-server data-warehouse', 'Méndez, Gonzalo and Rosell', 'cv', '1915-10-20', '28€')
(2, 'Advanced actuating open architecture', 'Esparza, Luján and Luján', 'fr', '1941-11-02', '28€')
Roles:
(0, 'molestias')
(1, 'cum')
(2, 'tempora')
Publicaciones de personas:
(88, 8, 23)


### Librería americana

Probamos a cargar los datos de la librería americana:  
**Nota:** Para poder parsear el fichero de manera adecuada, fue necesario introducir comas entre cada elemento del `json`.

In [7]:
with open('USA_books.json', encoding='utf-8') as f:
    data = json.load(f)
eval(str(data[0]))

{'index': 0,
 'title': 'National kind probably later across against require can.',
 'last_name': 'Hall',
 'first_name': 'Debbie',
 'publisher': 'Stewart Group',
 'ISBN': '978-0-88280-857-4',
 'summary': 'Exist change affect still consumer professional win audience. Responsibility generation picture.\nStop team recently administration onto. Oil do their choice story work.\nMost morning moment. Tell price grow decision technology one. Become people discussion machine than.\nAllow fill direction safe physical main life. Hand town talk enter.\nOut fast whether during simply. Option next performance sea PM.\nParticularly according save blue. Road expect country before season sea.'}

## Creación de la base de datos global

In [8]:
!rm bookstore.db

In [9]:
conn = sqlite3.connect('bookstore.db')
c = conn.cursor()

# Creamos la tabla Autor
c.execute('''CREATE TABLE Autor
             (idAutor integer primary key,
              nombre text)''')

# Creamos la tabla Libro
c.execute('''CREATE TABLE Libro
             (idLibro integer primary key,
              titulo text,
              editorial text, 
              fecha date,
              isbn)''')

# Creamos la tabla Autor_has_Libro
c.execute('''CREATE TABLE Autor_has_Libro
             (Autor_idAutor integer,
              Libro_idLibro integer,
              primary key(Autor_idAutor, 
                          Libro_idLibro))''')

conn.commit()
conn.close()

## Proceso ETL

### Librería británica

Para la base de datos británica:

In [10]:
def process_UK(source, target):
    """Process British library data"""
    # Nos conectamos a las bases de datos
    conn_UK = sqlite3.connect(source)
    c_UK = conn_UK.cursor()
    conn = sqlite3.connect(target)
    c = conn.cursor()
    # Extraemos el último idAutor e idLibro
    idAutor = c.execute('select max(idAutor) from Autor').fetchall()[0][0]
    idAutor = 0 if idAutor is None else idAutor + 1
    idLibro = c.execute('select max(idLibro) from Libro').fetchall()[0][0]
    idLibro = 0 if idLibro is None else idLibro + 1
    # Y transferimos cada registro
    c_UK.execute('SELECT * from Book;')
    for book in c_UK.fetchall():
        # Extraemos la información útil
        _, title, authors, publisher, year, _, ISBN = book
        # Y la guardamos
        c.execute('INSERT INTO Autor (idAutor, nombre) VALUES (?, ?)', 
                   (idAutor, authors))
        c.execute('INSERT INTO Libro (idLibro, titulo, editorial, fecha, isbn) VALUES (?, ?, ?, ?, ?)',
                   (idLibro, title, publisher, str(year)+'-01-01', ISBN))
        c.execute('INSERT INTO Autor_has_Libro (Autor_idAutor, Libro_idLibro) VALUES (?, ?)',
                   (idAutor, idLibro))
        idAutor += 1
        idLibro += 1
    # Hacemos commit y cerramos las conexiones
    conn.commit()
    conn_UK.close()
    conn.close()
    return

process_UK(source='bookstore_UK.db', target='bookstore.db')

### Librería española

Para la base de datos española:

In [11]:
def process_ES(source, target):
    """Process British library data"""
    # Nos conectamos a las bases de datos
    conn_ES = sqlite3.connect(source)
    c_ES = conn_ES.cursor()
    conn = sqlite3.connect(target)
    c = conn.cursor()
    # Extraemos el último idAutor e idLibro
    idAutor = c.execute('select max(idAutor) from Autor').fetchall()[0][0]
    idAutor = 0 if idAutor is None else idAutor + 1
    idLibro = c.execute('select max(idLibro) from Libro').fetchall()[0][0]
    idLibro = 0 if idLibro is None else idLibro + 1
    # Definimos diccionarios auxiliares
    autor_d, libro_d = {}, {}
    # Transferimos Personas
    c_ES.execute('SELECT * from Persona;')
    for persona in c_ES.fetchall():
        # Extraemos personas
        idPersona, Nombre, Apellido1, Apellido2, _ = persona
        # Y la guardamos
        c.execute('INSERT INTO Autor (idAutor, nombre) VALUES (?, ?)', 
                  (idAutor, Nombre + ' ' + Apellido1 + ' ' + Apellido2))
        # Guardamos en un diccionario: idAutorSource -> idAutorTarget
        autor_d[idPersona] = idAutor
        idAutor += 1
    # Transferimos publicaciones
    c_ES.execute('SELECT * from Publicacion;')
    for publicacion in c_ES.fetchall():
        # Extraemos ahora Publicaciones
        idPublicacion, Titulo, Editorial, _, FechaPublicacion, _ = publicacion
        # Y la guardamos
        c.execute('INSERT INTO Libro (idLibro, titulo, editorial, fecha, isbn) VALUES (?, ?, ?, ?, ?)',
                   (idLibro, Titulo, Editorial, FechaPublicacion, ''))
        # Guardamos en un diccionario: idLibroSource -> idLibroTarget
        libro_d[idPublicacion] = idLibro
        idLibro += 1
    # Ahora transferimos la relacion entre autores y obras
    c_ES.execute('SELECT * from Persona_has_Publicacion;')
    for rel in c_ES.fetchall():
        # Extraemos la relación
        idPersona, idPublicacion, _ = rel
        c.execute('INSERT INTO Autor_has_Libro (Autor_idAutor, Libro_idLibro) VALUES (?, ?)',
                   (autor_d[idPersona], libro_d[idPublicacion]))
    # Hacemos commit y cerramos las conexiones
    conn.commit()
    conn_ES.close()
    conn.close()
    return

process_ES(source='bookstore_ES.db', target='bookstore.db')

### Librería americana

Y por último, para la americana:

In [12]:
def process_US(source, target):
    """Process American library data"""
    # Cargamos los datos y abrimos la conexión a la bd
    with open(source, encoding='utf-8') as f:
        data = json.load(f)
    conn = sqlite3.connect(target)
    c = conn.cursor()
    # Extraemos el último idAutor e idLibro
    idAutor = c.execute('select max(idAutor) from Autor').fetchall()[0][0]
    idAutor = 0 if idAutor is None else idAutor + 1
    idLibro = c.execute('select max(idLibro) from Libro').fetchall()[0][0]
    idLibro = 0 if idLibro is None else idLibro + 1
    # Y transferimos cada registro
    for idx in range(len(data)):
        # Extraemos la información útil
        _, title, last_name, first_name, publisher, ISBN, _ = data[idx].values()
        # Y la guardamos
        c.execute('INSERT INTO Autor (idAutor, nombre) VALUES (?, ?)', 
                   (idAutor, first_name + ' ' + last_name))
        c.execute('INSERT INTO Libro (idLibro, titulo, editorial, fecha, isbn) VALUES (?, ?, ?, ?, ?)',
                   (idLibro, title, publisher, '', ISBN))
        c.execute('INSERT INTO Autor_has_Libro (Autor_idAutor, Libro_idLibro) VALUES (?, ?)',
                   (idAutor, idLibro))
        idAutor += 1
        idLibro += 1
    # Hacemos commit y cerramos las conexiones
    conn.commit()
    conn.close()
    return

process_US(source='USA_books.json', target='bookstore.db')

## Comprobación de los datos transferidos

Hacemos algunas comprobaciones de consistencia entre las fuentes y la nueva base de datos:

In [13]:
# Nos conectamos
conn_UK = sqlite3.connect('bookstore_UK.db')
c_UK = conn_UK.cursor()
conn_ES = sqlite3.connect('bookstore_ES.db')
c_ES = conn_ES.cursor()
conn = sqlite3.connect('bookstore.db')
c = conn.cursor()

In [14]:
# Para la británica
print('UK library data:')
c_UK.execute('SELECT * FROM Book WHERE idBook > 50 AND idBook < 55;')
for row in c_UK.fetchall():
    print(row)
title = row[1]
print('\n\nData extracted from target database:')
c.execute("SELECT * FROM Libro WHERE titulo = '{}'".format(title))
for row in c.fetchall():
    print(row)
idLibro = row[0]
c.execute("SELECT * FROM Autor_has_Libro WHERE Libro_idLibro = '{}'".format(idLibro))
for row in c.fetchall():
    print(row)
idAutor = row[1]
c.execute("SELECT * FROM Autor WHERE idAutor = '{}'".format(idAutor))
for row in c.fetchall():
    print(row)

UK library data:
(51, 'Optional fresh-thinking challenge', 'Damian Bray-Howells', 'Bennett LLC', 2011, '£13', '978-1-938788-33-8')
(52, 'Business-focused directional system engine', 'Dr. Francis Ali', 'Martin PLC', 1993, '£8', '978-0-560-99629-6')
(53, 'Focused static parallelism', 'Dr. Simon Harvey', 'Lawson, McCarthy and Dawson', 1990, '£21', '978-0-227-49775-3')
(54, 'Up-sized client-server service-desk', 'Dr. Maurice Moore', 'Gray, Turner and Davidson', 1978, '£14', '978-1-995886-49-7')


Data extracted from target database:
(54, 'Up-sized client-server service-desk', 'Gray, Turner and Davidson', '1978-01-01', '978-1-995886-49-7')
(54, 54)
(54, 'Dr. Maurice Moore')


In [15]:
# Para la española
print('ES library data:')
c_ES.execute('SELECT * FROM Publicacion WHERE idPublicacion = 210;')
for row in c_ES.fetchall():
    print(row)
title = row[1]
idPub = row[0]
c_ES.execute("SELECT * FROM Persona_has_Publicacion WHERE Publicacion_idPublicacion = '{}';".format(idPub))
for row in c_ES.fetchall():
    print(row)
idPer = row[0]
c_ES.execute("SELECT * FROM Persona WHERE idPersona = '{}';".format(idPer))
for row in c_ES.fetchall():
    print(row)
print('\n\nData extracted from target database:')
c.execute("SELECT * FROM Libro WHERE titulo = '{}'".format(title))
for row in c.fetchall():
    print(row)
idLibro = row[0]
c.execute("SELECT * FROM Autor_has_Libro WHERE Libro_idLibro = '{}'".format(idLibro))
for row in c.fetchall():
    print(row)
idAutor = row[0]
c.execute("SELECT * FROM Autor WHERE idAutor = '{}'".format(idAutor))
for row in c.fetchall():
    print(row)

ES library data:
(210, 'Customizable context-sensitive extranet', 'Fonseca Ltd', 'dv', '1979-07-29', '5€')
(35, 210, 5)
(37, 210, 5)
(37, 'Josefa', 'Llobet', 'Isabel', '1965-08-05')


Data extracted from target database:
(310, 'Customizable context-sensitive extranet', 'Fonseca Ltd', '1979-07-29', '')
(135, 310)
(137, 310)
(137, 'Josefa Llobet Isabel')


In [16]:
# Para la americana
print('US library data:')
print(data[33])
title = data[33]['title']
print('\n\nData extracted from target database:')
c.execute("SELECT * FROM Libro WHERE titulo = '{}'".format(title))
for row in c.fetchall():
    print(row)
idLibro = row[0]
c.execute("SELECT * FROM Autor_has_Libro WHERE Libro_idLibro = '{}'".format(idLibro))
for row in c.fetchall():
    print(row)
idAutor = row[0]
c.execute("SELECT * FROM Autor WHERE idAutor = '{}'".format(idAutor))
for row in c.fetchall():
    print(row)

US library data:
{'index': 33, 'title': 'National community think fall.', 'last_name': 'Thomas', 'first_name': 'Ernest', 'publisher': 'Hernandez and Sons', 'ISBN': '978-0-604-69103-3', 'summary': 'New against defense mouth security medical station receive.\nStudent wall each process out.\nThing economy major care town. Two test every protect.\nSuddenly language protect structure work size these.\nMan them method process require hot. College every better task week. Alone almost exist important feeling.\nFirm memory center possible adult. Itself present various image sound. Us add hold fish music.'}


Data extracted from target database:
(433, 'National community think fall.', 'Hernandez and Sons', '', '978-0-604-69103-3')
(333, 433)
(333, 'Ernest Thomas')


In [17]:
# Nos desconectamos
conn_UK.close()
conn_ES.close()
conn.close()

## Discusión

### Decisiones de mapeo

1. **Librería britanica:**
  - Perdemos el campo `price`.
  - Ya que no sabemos cómo es la estructura del campo `authors` (¿uno o más autores?) lo hemos mapeado directamente a `nombre` en la bd objetivo.
  - En cuanto a la fecha, solo partimos de un año, así que en la bd objetivo la fecha la hemos fijado al primer día de enero de ese año.

2. **Librería española:**
  - Este mapeo es un poco más sutil que los otros dos, porque la relación $N$ a $M$ entre autores y publicaciones se guarda en una tabla auxiliar (`Persona_has_Publicacion`). Para transferir dicha relación, en primer lugar guardamos en un diccionario la relación `idPersona -> idAutor` al parsear las personas. Luego guardamos la relación `idPublicacion -> idLibro` al parsear los libros. Por último, mapeamos la tabla `Persona_has_Publicacion` a `Autor_has_Libro` usando ambos diccionarios.
  - Perdemos los datos sobre roles.
  - Concatenamos el nombre con los dos apellidos.
  - Perdemos la `FechaNacimiento` de la persona.
  - Perdemos el `Precio` y la `Lengua` de la publicación.
  - No tenemos campo `isbn`, así que lo dejamos en blanco.

3. **Librería americana:**
  - Concatenamos el nombre con el apellido.
  - No tenemos campo `fecha`, así que lo dejamos en blanco.
  - Perdemos el `summary`.

Como comentario, si se quiere se podría guardar el mapeo entre los `id`s (primary keys) origen y los destino de la misma manera que lo hemos hecho en el caso de la bd española, al mapear la relación `Persona_has_Publicacion` a `Autor_has_Libro`.

###  Idoneidad del esquema global propuesto

Aunque el esquema global propuesto es más consistente que por ejemplo el británico o el americano, puede ser demasiado sencillo para recoger todos los datos de las fuentes. Perdemos mucha información en el mapeo, como precios, descripciones de los libros, fecha de nacimiento de los autores, roles, lengua en la que está escrita la publicación... Estos campos podrían estar presentes en la bd objetivo aunque en muchos casos quedaran vacíos, como pasa al mapear la bd española, que no tiene ISBN. De todos modos, esto sería más bien un modelo **Global-As-View**, que no es el fin de esta práctica.

Por otra parte, no está muy claro si se trata de una base de datos de libros, publicaciones (artículos) o ambas. Podría hacerse esa distinción facilmente en la bd objetivo, por ejemplo usando dos tablas diferentes. Claro está, para esto es necesario que esa distinción esté presente en las fuentes.