In [None]:
# Descargamos los datos
# =====================

In [1]:
filenames = [
    "drivers.csv",
    "timesheet.csv",
    "truck_event_text_partition.csv",
]

url = "https://raw.githubusercontent.com/jdvelasq/datalabs/master/datasets/drivers/"

for filename in filenames:
    !wget --quiet {url + filename} -P /tmp/

In [6]:
# Preparamos una base de datos MySQL
# ==================================

In [5]:
import mariadb

conn = mariadb.connect(
    user="root",
    password="",
)

cur = conn.cursor()

#
# Creación de la BD
#
cur.execute("DROP DATABASE IF EXISTS demo_db;")
cur.execute("CREATE DATABASE demo_db;")
cur.execute("USE demo_db;")

cur.execute(
    """
    DROP TABLE IF EXISTS drivers;
    """
)

cur.execute(
    """
    CREATE TABLE drivers (
        driverId       INT,
        name           VARCHAR(20),
        ssn            VARCHAR(20),
        location       VARCHAR(40),
        certified      VARCHAR(20),
        wage_plan      VARCHAR(20)
    );
    """
)

cur.execute(
    """
    DROP TABLE IF EXISTS timesheet;
    """
)

cur.execute(
    """
    CREATE TABLE timesheet (
        driverId       INT,
        week           INT,
        hours_logged   INT,
        miles_logged   INT
    );
    """
)

conn.commit()

import pandas as pd
drivers = pd.read_csv('/tmp/drivers.csv')

for i, row in drivers.iterrows():
    sql = "INSERT INTO drivers VALUES (%s,%s,%s,%s,%s,%s)"
    cur.execute(sql, tuple(row))
    conn.commit()

cur.execute("SELECT * FROM drivers LIMIT 5;")
result = cur.fetchall()


#
# Creación y permisos para el usuario remoto
#
cur.execute("CREATE USER 'sqoop'@'%' IDENTIFIED BY 'secret'; ")
cur.execute("GRANT ALL ON demo_db.* TO 'sqoop'@'%';")


conn.close()
result

[(10, 'George Vetticaden', '621011971', '244-4532 Nulla Rd.', 'N', 'miles'),
 (11, 'Jamie Engesser', '262112338', '366-4125 Ac Street', 'N', 'miles'),
 (12, 'Paul Coddin', '198041975', 'Ap #622-957 Risus. Street', 'Y', 'hours'),
 (13, 'Joe Niemiec', '139907145', '2071 Hendrerit. Ave', 'Y', 'hours'),
 (14, 'Adis Cesir', '820812209', 'Ap #810-1228 In St.', 'Y', 'hours')]

In [9]:
# Preparamos un script con un trabajo Sqoop:
# Conexión a MySQL, listado de tablas
# ==========================================

In [10]:
%%writefile list-tables.sh
sqoop list-tables \
    --connect jdbc:mysql://localhost:3306/demo_db \
    --username sqoop \
    --password secret

Writing list-tables.sh


In [12]:
!bash list-tables.sh

timesheet
drivers


In [13]:
# Verificamos que Sqoop accede a los datos usando querys
# ======================================================

In [14]:
%%writefile query.sh
sqoop eval \
    --connect jdbc:mysql://localhost:3306/demo_db \
    --username sqoop \
    --password secret \
    --query "SELECT * FROM drivers LIMIT 3"

Writing query.sh


In [15]:
!bash query.sh

----------------------------------------------------------------------------------------------------------------------------------
| driverId    | name                 | ssn                  | location             | certified            | wage_plan            | 
----------------------------------------------------------------------------------------------------------------------------------
| 10          | George Vetticaden    | 621011971            | 244-4532 Nulla Rd.   | N                    | miles                | 
| 11          | Jamie Engesser       | 262112338            | 366-4125 Ac Street   | N                    | miles                | 
| 12          | Paul Coddin          | 198041975            | Ap #622-957 Risus. Street | Y                    | hours                | 
----------------------------------------------------------------------------------------------------------------------------------


In [16]:
# Importamos (Sqoop) la tabla completa
# Comprobamos que se ha creado el fichero HDFS
# y que tiene los contenidos esperados
# ============================================

In [17]:
%%writefile full_import.sh

sqoop import \
    --connect jdbc:mysql://localhost:3306/demo_db \
    --username sqoop \
    --password secret \
    --table drivers \
    --target-dir /tmp/drivers \
    --m 1

Writing full_import.sh


In [18]:
!bash full_import.sh

Note: /tmp/sqoop-root/compile/6b0f300e12e110262c7c3947e95bdd7d/drivers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.


In [19]:
!hdfs dfs -ls /tmp/drivers

Found 2 items
-rw-r--r--   1 root supergroup          0 2022-11-17 14:14 /tmp/drivers/_SUCCESS
-rw-r--r--   1 root supergroup       1963 2022-11-17 14:14 /tmp/drivers/part-m-00000


In [20]:
!hdfs dfs -cat /tmp/drivers/part-m-00000 | head -n 10

10,George Vetticaden,621011971,244-4532 Nulla Rd.,N,miles
11,Jamie Engesser,262112338,366-4125 Ac Street,N,miles
12,Paul Coddin,198041975,Ap #622-957 Risus. Street,Y,hours
13,Joe Niemiec,139907145,2071 Hendrerit. Ave,Y,hours
14,Adis Cesir,820812209,Ap #810-1228 In St.,Y,hours
15,Rohit Bakshi,239005227,648-5681 Dui- Rd.,Y,hours
16,Tom McCuch,363303105,P.O. Box 313- 962 Parturient Rd.,Y,hours
17,Eric Mizell,123808238,P.O. Box 579- 2191 Gravida. Street,Y,hours
18,Grant Liu,171010151,Ap #928-3159 Vestibulum Av.,Y,hours
19,Ajay Singh,160005158,592-9430 Nonummy Avenue,Y,hours


In [21]:
# Usamos Sqoop para importar parte de una tabla a HDFS
# ====================================================

In [22]:
%%writefile partial-import.sh

hdfs dfs -rm -r /tmp/drivers

sqoop import \
    --connect jdbc:mysql://localhost:3306/demo_db \
    --username sqoop \
    --password secret \
    --table drivers \
    --target-dir /tmp/drivers/ \
    -m 1 \
    --where "driverId=10"

Writing partial-import.sh


In [23]:
!bash partial-import.sh

Deleted /tmp/drivers
Note: /tmp/sqoop-root/compile/58aaa05d4a7c69aaa9314937740cbdfc/drivers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.


In [24]:
!hdfs dfs -ls /tmp/drivers/

Found 2 items
-rw-r--r--   1 root supergroup          0 2022-11-17 14:17 /tmp/drivers/_SUCCESS
-rw-r--r--   1 root supergroup         58 2022-11-17 14:17 /tmp/drivers/part-m-00000


In [25]:
!hdfs dfs -cat /tmp/drivers/part-m-00000

10,George Vetticaden,621011971,244-4532 Nulla Rd.,N,miles


In [46]:
# Llevamos el fichero timesheet.csv al HDFS
# Antes hacemos una copia, eliminando la
# primera fila del fichero (etiquetas)
# ========================================
!tail +2 /tmp/timesheet.csv > /tmp/timesheet1.csv
!head -n 5 /tmp/timesheet1.csv








In [36]:
!hdfs dfs -rm /tmp/timesheet.csv
!hdfs dfs -copyFromLocal /tmp/timesheet1.csv /tmp/timesheet.csv
!hdfs dfs -ls /tmp/

rm: `/tmp/timesheet.csv': No such file or directory
Found 3 items
drwxr-xr-x   - root supergroup          0 2022-11-17 14:17 /tmp/drivers
drwxrwx---   - root supergroup          0 2022-11-17 13:58 /tmp/hadoop-yarn
-rw-r--r--   1 root supergroup      26164 2022-11-17 14:30 /tmp/timesheet.csv


In [28]:
# Exportamos datos desde HDFS a MySQL
# Y verificamos que están correctamente
# =====================================

In [29]:
%%writefile export.sh

sqoop export \
    --connect jdbc:mysql://localhost:3306/demo_db \
    --username sqoop \
    --password secret \
    --table timesheet \
    --export-dir /tmp/timesheet.csv

Writing export.sh


In [37]:
!bash export.sh

Note: /tmp/sqoop-root/compile/d0350e188606b0e90304bf2ad0bde78d/timesheet.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.


In [45]:
conn = mariadb.connect(
    user="root",
    password="",
    database="demo_db"
)
cur = conn.cursor()

cur.execute(
    """
    SELECT * FROM timesheet ORDER BY 1 LIMIT 5;
    """
)
result = cur.fetchall()
conn.close()
result

[(10, 1, 70, 3300),
 (10, 2, 70, 3300),
 (10, 5, 70, 3200),
 (10, 3, 60, 2800),
 (10, 4, 70, 3100)]

In [None]:
# Ahora tocaría hacer limpieza
# !rm *.java *.sh *.log
# ============================