In [2]:
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine, MetaData, Table
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
# from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
# from airflow.operators.dummy_operator import DummyOperator

le = LabelEncoder()
isolation_forest = IsolationForest(random_state=42)

In [3]:
# Definir los argumentos del DAG
default_args = {
    'owner': 'Oscar C',
    'depends_on_past': False,
    'email_on_failure': False,
    'email': ['oecorrechag@gmail.com'],
    'retries': 1,
    'start_date': datetime(2024, 5, 20),
    'retry_delay': timedelta(minutes=1),
}

In [16]:
def drop_table(table_name):
    # Conexión a MySQL (en docker)
    engine = create_engine('mysql+pymysql://root:airflow@mysql:3306/db')
    # engine = create_engine('mysql+pymysql://root:airflow@127.0.0.1:3306/db')
    metadata = MetaData()
    mi_tabla = Table(table_name, metadata)
    mi_tabla.drop(engine)
    ## otra forma de eliminar
    # metadata.drop_all(engine, tables=[mi_tabla])

# drop_table('raw_data')
# drop_table('test_data')
# drop_table('clean_data')
# drop_table('penguin_data')

In [19]:
def input_data():
    # Conexión a la base de datos MySQL
    engine = create_engine('mysql+pymysql://root:airflow@mysql:3306/db')
    # engine = create_engine('mysql+pymysql://root:airflow@127.0.0.1:3306/db')
    # Consulta para cargar los datos desde la tabla en la base de datos
    query = "SELECT * FROM raw_data"
    # Leer los datos desde MySQL
    df = pd.read_sql(query, con=engine)
    # print(df.shape)
    return df.head()

input_data()

Unnamed: 0,brokered_by,status,price,bed,bath,acre_lot,street,city,state,zip_code,house_size,prev_sold_date
0,101640.0,for_sale,289900.0,4.0,2.0,0.38,1758218.0,East Windsor,Connecticut,6016.0,1617.0,1999-09-30
1,107951.0,for_sale,299900.0,3.0,2.0,0.87,1336295.0,Vernon,Connecticut,6066.0,1850.0,2015-11-09
2,80935.0,for_sale,299000.0,3.0,2.0,0.35,920059.0,North Canaan,Connecticut,6018.0,1620.0,2011-08-23
3,33714.0,for_sale,221000.0,4.0,2.0,0.32,731702.0,Windsor Locks,Connecticut,6096.0,1735.0,2014-03-03
4,29997.0,for_sale,175000.0,3.0,2.0,0.19,1382878.0,Winchester,Connecticut,6098.0,2005.0,2007-07-19


In [5]:
def raw_data():

    # Conexión a MySQL (en docker)
    engine = create_engine('mysql+pymysql://root:airflow@mysql:3306/db')
    # engine = create_engine('mysql+pymysql://root:airflow@127.0.0.1:3306/db')


    # load data
    # df = pd.read_csv('data/realtor-data.csv', sep = ',', decimal = '.', header = 0, encoding = 'utf-8')
    df = pd.read_parquet('data/df_g1_b0.parquet.gzip')
    df.columns = ['brokered_by','status','price','bed','bath','acre_lot','street','city','state',
                  'zip_code','house_size','prev_sold_date']
    print(df.shape)

    # Guardar los datos en MySQL
    df.to_sql('raw_data', con=engine, if_exists='append', index=False)

    
    print("Datos raw_data guardados en MySQL") 

    return df.head()

raw_data()

(73784, 12)
Datos raw_data guardados en MySQL


Unnamed: 0,brokered_by,status,price,bed,bath,acre_lot,street,city,state,zip_code,house_size,prev_sold_date
3907,101640.0,for_sale,289900.0,4.0,2.0,0.38,1758218.0,East Windsor,Connecticut,6016.0,1617.0,1999-09-30
3914,107951.0,for_sale,299900.0,3.0,2.0,0.87,1336295.0,Vernon,Connecticut,6066.0,1850.0,2015-11-09
4285,80935.0,for_sale,299000.0,3.0,2.0,0.35,920059.0,North Canaan,Connecticut,6018.0,1620.0,2011-08-23
4294,33714.0,for_sale,221000.0,4.0,2.0,0.32,731702.0,Windsor Locks,Connecticut,6096.0,1735.0,2014-03-03
4311,29997.0,for_sale,175000.0,3.0,2.0,0.19,1382878.0,Winchester,Connecticut,6098.0,2005.0,2007-07-19


In [6]:
def raw_data2():

    # Conexión a MySQL (en docker)
    engine = create_engine('mysql+pymysql://root:airflow@mysql:3306/db')
    # engine = create_engine('mysql+pymysql://root:airflow@127.0.0.1:3306/db')


    # load data
    # df = pd.read_csv('data/realtor-data.csv', sep = ',', decimal = '.', header = 0, encoding = 'utf-8')
    df = pd.read_parquet('data/df_g1_b1.parquet.gzip')
    df.columns = ['brokered_by','status','price','bed','bath','acre_lot','street','city','state',
                  'zip_code','house_size','prev_sold_date']
    print(df.shape)

    # Guardar los datos en MySQL
    df.to_sql('test_data', con=engine, if_exists='append', index=False)
    

    print("Datos test_data guardados en MySQL") 

    return df.head()

raw_data2()

(94551, 12)
Datos test_data guardados en MySQL


Unnamed: 0,brokered_by,status,price,bed,bath,acre_lot,street,city,state,zip_code,house_size,prev_sold_date
502,92147.0,for_sale,110000.0,7.0,3.0,0.09,1842706.0,Dorado,Puerto Rico,949.0,1192.0,2019-06-28
3903,91020.0,for_sale,215000.0,2.0,1.0,0.91,1062364.0,East Windsor,Connecticut,6016.0,960.0,2012-06-06
3905,10585.0,for_sale,144900.0,2.0,1.0,0.36,765673.0,Vernon,Connecticut,6066.0,860.0,2016-09-02
3913,22611.0,for_sale,239900.0,3.0,1.0,1.43,1244868.0,East Windsor,Connecticut,6016.0,1351.0,2009-06-05
3925,75650.0,for_sale,249900.0,3.0,1.0,0.23,114997.0,Enfield,Connecticut,6082.0,1220.0,2003-11-14


In [17]:
def clean_data():
    # Conexión a la base de datos MySQL
    engine = create_engine('mysql+pymysql://root:airflow@mysql:3306/db')
    # engine = create_engine('mysql+pymysql://root:airflow@127.0.0.1:3306/db')
    # Consulta para cargar los datos desde la tabla en la base de datos
    query = "SELECT * FROM raw_data"
    # Leer los datos desde MySQL
    df = pd.read_sql(query, con=engine)


    # Selecciono como prueba solo las variables numericas
    df = df.loc[:,['price','bed','bath','acre_lot','state','house_size','prev_sold_date']]
    # Eliminar los registros con faltantes
    df = df.dropna()
    # limpieza

    df["año"] = pd.to_datetime(df['prev_sold_date']).dt.year
    df["decada"] = (df["año"] // 10) * 10


    df = df[df['bed'] < 7]
    df = df[df['bath'] < 5]
    df = df[df['price'] < 300000]
    df = df[df['acre_lot'] <= 0.0894211]
    df = df[df['house_size'] < 3500]
    df = df[df['decada'] >= 1980]

    encoded_labels = le.fit_transform(df['state'])
    df['states'] = encoded_labels

    isolation_forest.fit(df.loc[:,['price', 'bed', 'bath', 'acre_lot', 'states', 'house_size']])
    anomalies = isolation_forest.predict(df.loc[:,['price', 'bed', 'bath', 'acre_lot', 'states', 'house_size']])
    df = df[anomalies == 1]
    
    df = df.loc[:,['price','bed','bath','acre_lot','states','house_size']]

    # Guardar los datos en MySQL
    df.to_sql('clean_data', con=engine, if_exists='append', index=False)

    print("Datos limpios guardados en MySQL") 

    return df.head()

clean_data() 

Datos limpios guardados en MySQL


Unnamed: 0,price,bed,bath,acre_lot,states,house_size
225,255000.0,3.0,2.0,0.08,6,1836.0
269,109000.0,4.0,3.0,0.05,31,1890.0
329,208000.0,3.0,2.0,0.06,6,1580.0
443,170000.0,4.0,2.0,0.05,38,2331.0
445,279900.0,4.0,2.0,0.02,38,1711.0


In [14]:
def load_and_slip():
    # Conexión a la base de datos MySQL
    engine = create_engine('mysql+pymysql://root:airflow@mysql:3306/db')
    # engine = create_engine('mysql+pymysql://root:airflow@127.0.0.1:3306/db')
    # Consulta para cargar los datos desde la tabla en la base de datos
    query = "SELECT * FROM clean_data"
    # Leer los datos desde MySQL
    df = pd.read_sql(query, con=engine)
    # Convertir las columnas 'Sex' y 'Species' a tipo categórico
    # df[['Wilderness_Area', 'Soil_Type','Cover_Type']] = df[['Wilderness_Area', 'Soil_Type','Cover_Type']].astype('category')
    # Dividir los datos en características (X) y etiquetas (y)
    X = df.drop(columns='price')
    y = df['price']
    # Dividir los datos en conjuntos de entrenamiento y prueba
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) 
    
    print("Datos limpios cargados desde MySQL")  

    return X_train, X_test, y_train, y_test

load_and_slip()

Datos limpios cargados desde MySQL


(      bed  bath  acre_lot  states  house_size prev_sold_date
 1402  3.0   2.0      0.06      37      1508.0     2021-05-03
 3764  3.0   3.0      0.05      16      1536.0     2016-04-06
 3878  3.0   2.0      0.01      18      1776.0     2015-05-01
 3514  4.0   3.0      0.04      13      1790.0     1996-07-15
 1623  2.0   2.0      0.04      20      1908.0     2014-09-15
 ...   ...   ...       ...     ...         ...            ...
 3444  3.0   3.0      0.05      13      1653.0     2007-07-06
 466   3.0   2.0      0.08      29      1758.0     1990-02-20
 3092  3.0   3.0      0.06      13      1732.0     2019-12-17
 3772  3.0   3.0      0.05      24      1865.0     2009-11-02
 860   3.0   2.0      0.07      37      1810.0     2021-06-11
 
 [3399 rows x 6 columns],
       bed  bath  acre_lot  states  house_size prev_sold_date
 2269  3.0   3.0      0.04       9      1564.0     2007-07-05
 1192  3.0   2.0      0.06      20      1790.0     2017-03-31
 2623  3.0   2.0      0.07      17      17

In [15]:
print('ok_')

ok_
