# Procesamiento y transferencia de datos


## 1.1 Extracción de los datos
Cargamos los datos desde el archivo. Insepeccionamos en busca de anomalías

In [1]:
import pandas as pd
import numpy as np

In [2]:
data_pt = pd.read_csv("./data_prueba_técnica.csv")
data_pt.head()

Unnamed: 0,id,name,company_id,amount,status,created_at,paid_at
0,48ba4bdbfb56ceebb32f2bd0263e759be942af3d,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,voided,2019-03-19,
1,05fc6f5ac66b6ee7e4253aa5d0c2299eb47aaaf4,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,pending_payment,2019-05-06,
2,2cdce231c1fc6a2061bfa2f1d978351fe217245d,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,voided,2019-02-22,
3,81633ba310a50b673efd469c37139576982901aa,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,102.61,paid,2019-02-27,2019-02-27
4,6ccfc4c24e788e4bca448df343698782db6b0c0b,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,184.49,paid,2019-02-05,2019-02-05


In [3]:
data_pt.info()

<class 'pandas.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   id          9997 non-null   str    
 1   name        9997 non-null   str    
 2   company_id  9996 non-null   str    
 3   amount      10000 non-null  float64
 4   status      10000 non-null  str    
 5   created_at  10000 non-null  str    
 6   paid_at     6009 non-null   str    
dtypes: float64(1), str(6)
memory usage: 547.0 KB


Vemos que existen 10000 entradas, pero existen entradas con el campo id y company_id con valor nulo. Necesitamos saber cuántos registros son y decidir cómo proceder.

In [4]:
missing_data = data_pt.isnull().sum()
missing_data

id               3
name             3
company_id       4
amount           0
status           0
created_at       0
paid_at       3991
dtype: int64

In [5]:
data_pt[data_pt['company_id'].isnull()]

Unnamed: 0,id,name,company_id,amount,status,created_at,paid_at
262,6b77d36f2ebd5e53a76195c6c1678a5027022f9d,MiPasajefy,,3.0,pending_payment,2019-02-01,
2378,28445567bf15d6751367e3828f39c255546cc1e1,MiPasajefy,,3.0,pending_payment,2019-03-23,
2445,654695699dc08392248aedef372f64f0284ecb68,MiPasajefy,,30.8,paid,2019-03-27,2019-03-27
5981,6f6e718b9993ac97ff6e19c61498fb8be82320df,MiPasajefy,,69.55,voided,2019-05-10,


In [6]:
data_pt[data_pt['id'].isnull()]

Unnamed: 0,id,name,company_id,amount,status,created_at,paid_at
272,,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,66.16,pending_payment,2019-03-14,
9915,,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,55.71,paid,2019-02-13,2019-02-13
9917,,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,89.36,paid,2019-04-17,2019-04-17


In [7]:
cells_num = np.prod(data_pt.shape)
missing_cells_num = missing_data.sum()
missing_data_percentage = (missing_cells_num / cells_num) * 100
print(f'Porcentaje de valores nulos en columna alguna columna: {round(missing_data_percentage, 2)}%')

Porcentaje de valores nulos en columna alguna columna: 5.72%


En estos casos es necesario preguntarse por los objetivos de los datos, o posibles anomalías en el proceso de recolección. Para tomar la decisión se debe consultar con el equipo de trabajo.

Como el porcentaje de datos nulos es muy bajo para esta prueba simplemente excluiremos estos datos.

In [8]:
data_pt = data_pt.dropna(subset=['id', 'company_id'])
data_pt.info()

<class 'pandas.DataFrame'>
Index: 9993 entries, 0 to 9999
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   id          9993 non-null   str    
 1   name        9990 non-null   str    
 2   company_id  9993 non-null   str    
 3   amount      9993 non-null   float64
 4   status      9993 non-null   str    
 5   created_at  9993 non-null   str    
 6   paid_at     6006 non-null   str    
dtypes: float64(1), str(6)
memory usage: 624.6 KB


In [9]:
data_pt.describe()

  sqr = _ensure_numeric((avg - values) ** 2)


Unnamed: 0,amount
count,9993.0
mean,inf
std,
min,2.99
25%,31.22
50%,60.62
75%,109.67
max,inf


Detectamos otra anomalía. La media y el máximo son infinitos. Este valor no está considerado dentro de nuestro esquema. Es necesario detectar cuántos registros están involucrados y decidir.

In [10]:
inf_values_num = np.isinf(data_pt['amount']).sum()
print(f'Existe(n) {int(inf_values_num)} valores infinitos')
data_pt[np.isinf(data_pt['amount'])]

Existe(n) 1 valores infinitos


Unnamed: 0,id,name,company_id,amount,status,created_at,paid_at
1752,94f33d3d5a142c7dfcbf247806ad68cf1dc93515,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,inf,voided,2019-02-11,


Reemplazamos los valores infinitos con NaN para observar cómo cambia el describe

In [11]:
data_pt = data_pt.replace([np.inf, -np.inf], np.nan, inplace=True)
data_pt.describe()

Unnamed: 0,amount
count,9992.0
mean,3.002402e+30
std,3.001201e+32
min,2.99
25%,31.2175
50%,60.615
75%,109.6325
max,2.9999999999999997e+34


Observamos que el valor máximo de la columna 'amount' tiene 34 cifras. El esquema requerido tiene un valor máximo de 16 cifras. Es necesario identificar a los valores que no cumplen con esta característica.

In [12]:
too_big_amount = data_pt[data_pt['amount'] > 9_999_999_999_999_999].shape[0]
print(f'Existen {too_big_amount} registros con amount demasiado grande')

Existen 3 registros con amount demasiado grande


Como son pocos valores, en esta prueba, simplemente los excluímos.

In [13]:
data_pt = data_pt[data_pt['amount'] <= 9_999_999_999_999_999]
data_pt.info()

<class 'pandas.DataFrame'>
Index: 9989 entries, 0 to 9999
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   id          9989 non-null   str    
 1   name        9986 non-null   str    
 2   company_id  9989 non-null   str    
 3   amount      9989 non-null   float64
 4   status      9989 non-null   str    
 5   created_at  9989 non-null   str    
 6   paid_at     6006 non-null   str    
dtypes: float64(1), str(6)
memory usage: 624.3 KB


Guardamos la información extraída de vuelta en formato CSV.

In [14]:
data_pt.to_csv('data_prueba_tecnica_extracted.csv',index=False)

# 1.2 Transformación de los datos
Pasamos al proceso de ajustar el formato de los datos al requerimiento del esquema

In [2]:
import pandas as pd
# Cargar archivo
df = pd.read_csv('data_prueba_tecnica_extracted.csv')

# Renombrar columnas
df = df.rename(columns={
    "name": "company_name"
})

# Conversión de tipos
df["id"] = df["id"].astype(str)
df["company_id"] = df["company_id"].astype(str)

df["amount"] = df["amount"].astype(float).round(2)

df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
df["paid_at"] = pd.to_datetime(df["paid_at"], errors="coerce")

# Limitar longitudes
df["id"] = df["id"].str.slice(0, 24)
df["company_id"] = df["company_id"].str.slice(0, 24)
df["company_name"] = df["company_name"].str.slice(0, 130)
df["status"] = df["status"].str.slice(0, 30)
# Reordenar columnas según el esquema
df = df[
    [
        "id",
        "company_name",
        "company_id",
        "amount",
        "status",
        "created_at",
        "paid_at",
    ]
]
df.head()


Unnamed: 0,id,company_name,company_id,amount,status,created_at,paid_at
0,48ba4bdbfb56ceebb32f2bd0,MiPasajefy,cbf1c8b09cd5b549416d49d2,3.0,voided,2019-03-19,NaT
1,05fc6f5ac66b6ee7e4253aa5,MiPasajefy,cbf1c8b09cd5b549416d49d2,3.0,pending_payment,2019-05-06,NaT
2,2cdce231c1fc6a2061bfa2f1,MiPasajefy,cbf1c8b09cd5b549416d49d2,3.0,voided,2019-02-22,NaT
3,81633ba310a50b673efd469c,MiPasajefy,cbf1c8b09cd5b549416d49d2,102.61,paid,2019-02-27,2019-02-27
4,6ccfc4c24e788e4bca448df3,MiPasajefy,cbf1c8b09cd5b549416d49d2,184.49,paid,2019-02-05,2019-02-05


Ejecutamos validaciones para comprobar que se cumplen los requerimientos

In [3]:
# IDs no nulos
assert df["id"].notna().all()
assert df["company_id"].notna().all()

# Amount válido
assert (df["amount"] >= 0).all()
assert (df["amount"] <= 9_999_999_999_999_999).all()

# 1.3 Carga de los datos
En este proceso registramos los datos transformados de forma persistente en la base de datos

In [4]:
from sqlalchemy import create_engine
# Conexión MySQL
engine = create_engine(
    "mysql+mysqlconnector://appuser:apppass@localhost:3306/transacciones_db"
)

# Insertar datos
df.to_sql(
    name="cargo",
    con=engine,
    if_exists="append",
    index=False,
)

9989

El número resultado nos indica el número de columnas afectadas. Podemos comprobarlo directamente con una consulta SQL.

In [5]:
# Ejecutar consulta
from sqlalchemy import text
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM cargo;"))
    total = result.scalar()

print(f"Total de registros en Cargo: {total}")

Total de registros en Cargo: 19978


Guardamos la información también en formato CSV para seguir procesándo en la etapa de normalización.

In [6]:
df.to_csv('data_prueba_tecnica_transformed.csv', index=False)