# **Libraries**

In [1]:
!pip install seaborn matplotlib --quiet # This lines are only used in the jupyter, that's why we do not include it in the requirements.txt

import seaborn as sns
import matplotlib.pyplot as plt

[0m

In [15]:
import os

import pandas as pd
import numpy as np
import sqlalchemy
import pymysql
import mlflow

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import make_column_transformer # for dummies
from sklearn.pipeline import Pipeline # creating a pipeline
from sklearn.model_selection import GridSearchCV

from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import RandomForestRegressor

# **Reading Data**

Connection details

In [13]:
# MySQL connection settings.
# Each value can be overridden through an environment variable of the same name
# (DB_PORT for the port) so credentials do not have to be edited in the notebook;
# the fallbacks are the values that were previously hard-coded here.
# Hosts tried earlier by the author for DB_HOST: "10.43.101.158", "localhost".
DB_HOST = os.environ.get("DB_HOST", "mysql")
DB_USER = os.environ.get("DB_USER", "root")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "airflow")
DB_NAME = os.environ.get("DB_NAME", "project_4")
PORT = int(os.environ.get("DB_PORT", "3306"))  # pymysql expects an int port

Read table from MySQL

In [14]:
# Read the whole project_4.raw_data table from MySQL into the DataFrame `df`.
connection = pymysql.connect(host=DB_HOST,
                             user=DB_USER,
                             password=DB_PASSWORD,
                             db=DB_NAME,
                             port=PORT,
                             cursorclass=pymysql.cursors.DictCursor)  # DictCursor returns every row as a dict
try:
    with connection.cursor() as cursor:
        # Query the database
        cursor.execute("SELECT * FROM project_4.raw_data;")
        result = cursor.fetchall()
    # One dict per row -> one DataFrame row per dict
    df = pd.DataFrame(result)
except Exception as e:
    # Re-raise with context and keep the original traceback chained.
    # (The previous code raised HTTPException, which is not imported in this
    # notebook, so any failure surfaced as a NameError instead of the real error.)
    raise RuntimeError(f"Could not read project_4.raw_data from MySQL: {e}") from e
finally:
    # Always release the connection, whether the query succeeded or not
    connection.close()

# Show df
print(f"The dataframe has {len(df)} rows")
df.head()

The dataframe has 73784 rows


Unnamed: 0,brokered_by,status,price,bed,bath,acre_lot,street,city,state,zip_code,house_size,prev_sold_date
0,101640.0,for_sale,289900.0,4.0,2.0,0.38,1758218.0,East Windsor,Connecticut,6016.0,1617.0,1999-09-30
1,107951.0,for_sale,299900.0,3.0,2.0,0.87,1336295.0,Vernon,Connecticut,6066.0,1850.0,2015-11-09
2,80935.0,for_sale,299000.0,3.0,2.0,0.35,920059.0,North Canaan,Connecticut,6018.0,1620.0,2011-08-23
3,33714.0,for_sale,221000.0,4.0,2.0,0.32,731702.0,Windsor Locks,Connecticut,6096.0,1735.0,2014-03-03
4,29997.0,for_sale,175000.0,3.0,2.0,0.19,1382878.0,Winchester,Connecticut,6098.0,2005.0,2007-07-19


# **Data Understanding**

In [8]:
# Inspect the dtype pandas assigned to each column of the raw table.
df.dtypes

brokered_by       float64
status             object
price             float64
bed               float64
bath              float64
acre_lot          float64
street            float64
city               object
state              object
zip_code          float64
house_size        float64
prev_sold_date     object
dtype: object

In [9]:
# Distinct values of the target column.
df["price"].unique()

array([289900., 299900., 299000., ..., 212400., 189200., 241750.])

In [10]:
# Distinct values of "status".
df["status"].unique()

array(['for_sale'], dtype=object)

In [11]:
# Distinct values of "city".
df["city"].unique()

array(['East Windsor', 'Vernon', 'North Canaan', ..., 'Soldotna',
       'Fairbanks', 'Ketchikan'], dtype=object)

In [12]:
# Distinct values of "state".
df["state"].unique()

array(['Connecticut', 'Vermont', 'New Hampshire', 'New York',
       'Rhode Island', 'Maine', 'New Jersey', 'Pennsylvania', 'Delaware',
       'West Virginia', 'Ohio', 'Maryland', 'Virginia',
       'District of Columbia', 'North Carolina', 'Kentucky', 'Tennessee',
       'South Carolina', 'Georgia', 'Alabama', 'Florida', 'Mississippi',
       'Arkansas', 'Missouri', 'Louisiana', 'Indiana', 'Illinois',
       'Michigan', 'Wisconsin', 'Iowa', 'Minnesota', 'Nebraska',
       'South Dakota', 'North Dakota', 'Montana', 'Idaho', 'Kansas',
       'Oklahoma', 'Colorado', 'Wyoming', 'Texas', 'New Mexico', 'Utah',
       'Washington', 'Oregon', 'Arizona', 'California', 'Nevada',
       'Hawaii', 'Alaska'], dtype=object)

Checking whether there are any null values

In [29]:
# Treat empty strings and "?" placeholders as missing values (NaN).
df.replace(["", "?"], np.nan, inplace=True)

In [30]:
# Fraction of missing values per column: null count divided by the row count.
null_counts = df.isna().sum()
null_counts.to_frame() / len(df)

Unnamed: 0,0
brokered_by,0.0
status,0.0
price,0.0
bed,0.0
bath,0.0
acre_lot,0.0
street,0.0
city,0.0
state,0.0
zip_code,0.0


Reviewing duplicated values

In [16]:
# Number of rows that are exact duplicates of an earlier row.
df.duplicated().sum()

0

# **Data Processing**

## _Removing unneeded columns_

Keeping only the target variable (`price`) and the feature columns used for modeling

In [15]:
# Keep only the target ("price") and the features used for modeling.
unique_columns_to_use = ["price", "bed", "bath", "acre_lot", "street", "city", "state", "house_size"]
# .copy() gives an independent frame, so later in-place edits act on this
# DataFrame itself and not on a slice of the raw one.
df = df[unique_columns_to_use].copy()

In [16]:
# Confirm the dtypes of the columns that were kept.
df.dtypes

price         float64
bed           float64
bath          float64
acre_lot      float64
street        float64
city           object
state          object
house_size    float64
dtype: object

## _Delete Nulls_

In [17]:
# Putting "" as null 
df.replace("", np.nan, inplace=True)
df.replace("?", np.nan, inplace=True)
df = df.dropna()

## _Split data into train and test_

In [18]:
# Separate the target ("price") from the feature columns.
y = df.loc[:, "price"]
X = df.drop(columns=["price"])

# Two-stage split with a fixed seed:
#   1) 80% train / 20% held out
#   2) the held-out part is halved -> 10% validation and 10% test overall
X_train, X_test_val, y_train, y_test_val = train_test_split(
    X, y, test_size=0.20, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_test_val, y_test_val, test_size=0.50, random_state=42
)

## __Save Data to Clean Data__

In [21]:
# MySQL connection settings for writing the cleaned splits.
# Each value can be overridden through an environment variable of the same name
# (DB_PORT for the port) so credentials do not have to be edited in the notebook;
# the fallbacks are the values that were previously hard-coded here.
# Hosts tried earlier by the author for DB_HOST: "10.43.101.158", "localhost".
DB_HOST = os.environ.get("DB_HOST", "mysql")
DB_USER = os.environ.get("DB_USER", "root")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "airflow")
DB_NAME = os.environ.get("DB_NAME", "project_4")
PORT = int(os.environ.get("DB_PORT", "3306"))  # pymysql expects an int port

In [22]:
# Creating final DataFrame to Upload
df_train_final = pd.concat([X_train, pd.DataFrame(y_train)], axis=1)
df_val_final = pd.concat([X_val, pd.DataFrame(y_val)], axis=1)
df_test_final = pd.concat([X_test, pd.DataFrame(y_test)], axis=1)

# Connect to MySQL
engine = sqlalchemy.create_engine(f'mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:3306/{DB_NAME}')

# Save data, if exits append into the current table (TRAIN)
df_train_final.to_sql('clean_data_train', con=engine, if_exists='replace', index=False)
df_val_final.to_sql('clean_data_val', con=engine, if_exists='replace', index=False)
df_test_final.to_sql('clean_data_test', con=engine, if_exists='replace', index=False)
print("Saved into MySQL!")

Saved into MySQL!


# **Read Data from Clean Data**

Reading data from **train**

In [2]:
# MySQL connection settings for reading the cleaned splits.
# Each value can be overridden through an environment variable of the same name
# (DB_PORT for the port) so credentials do not have to be edited in the notebook;
# the fallbacks are the values that were previously hard-coded here.
# Hosts tried earlier by the author for DB_HOST: "10.43.101.158", "localhost".
DB_HOST = os.environ.get("DB_HOST", "mysql")
DB_USER = os.environ.get("DB_USER", "root")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "airflow")
DB_NAME = os.environ.get("DB_NAME", "project_4")
PORT = int(os.environ.get("DB_PORT", "3306"))  # pymysql expects an int port

In [3]:
# Read the whole project_4.clean_data_train table into `df_train_clean`.
connection = pymysql.connect(host=DB_HOST,
                             user=DB_USER,
                             password=DB_PASSWORD,
                             db=DB_NAME,
                             port=PORT,
                             cursorclass=pymysql.cursors.DictCursor)  # DictCursor returns every row as a dict
try:
    with connection.cursor() as cursor:
        # Query the database
        cursor.execute("SELECT * FROM project_4.clean_data_train;")
        result = cursor.fetchall()
    # One dict per row -> one DataFrame row per dict
    df_train_clean = pd.DataFrame(result)
except Exception as e:
    # Re-raise with context and keep the original traceback chained.
    # (The previous code raised HTTPException, which is not imported in this
    # notebook, so any failure surfaced as a NameError instead of the real error.)
    raise RuntimeError(f"Could not read project_4.clean_data_train from MySQL: {e}") from e
finally:
    # Always release the connection, whether the query succeeded or not
    connection.close()

# Show df
print(f"The dataframe has {len(df_train_clean)} rows")
df_train_clean.head()

The dataframe has 59027 rows


Unnamed: 0,bed,bath,acre_lot,street,city,state,house_size,price
0,5.0,5.0,12.83,839259.0,Elizabeth,Pennsylvania,1564.0,219900.0
1,3.0,2.0,0.13,836897.0,Millersburg,Pennsylvania,2502.0,239900.0
2,3.0,2.0,0.33,186361.0,Erwin,Tennessee,1625.0,218000.0
3,3.0,3.0,0.12,142885.0,Macon,Georgia,1566.0,219990.0
4,3.0,2.0,0.55,1509210.0,Saint Augustine,Florida,1620.0,240000.0


Reading data from **validation**

In [4]:
# Read the whole project_4.clean_data_val table into `df_validation_clean`.
connection = pymysql.connect(host=DB_HOST,
                             user=DB_USER,
                             password=DB_PASSWORD,
                             db=DB_NAME,
                             port=PORT,
                             cursorclass=pymysql.cursors.DictCursor)  # DictCursor returns every row as a dict
try:
    with connection.cursor() as cursor:
        # Query the database
        cursor.execute("SELECT * FROM project_4.clean_data_val;")
        result = cursor.fetchall()
    # One dict per row -> one DataFrame row per dict
    df_validation_clean = pd.DataFrame(result)
except Exception as e:
    # Re-raise with context and keep the original traceback chained.
    # (The previous code raised HTTPException, which is not imported in this
    # notebook, so any failure surfaced as a NameError instead of the real error.)
    raise RuntimeError(f"Could not read project_4.clean_data_val from MySQL: {e}") from e
finally:
    # Always release the connection, whether the query succeeded or not
    connection.close()

# Show df
print(f"The dataframe has {len(df_validation_clean)} rows")
df_validation_clean.head()

The dataframe has 7378 rows


Unnamed: 0,bed,bath,acre_lot,street,city,state,house_size,price
0,3.0,2.0,0.75,730015.0,Guthrie,Kentucky,1600.0,199900.0
1,3.0,2.0,0.46,167629.0,Granite City,Illinois,2092.0,165000.0
2,3.0,2.0,0.35,1398953.0,Alexandria,Louisiana,1906.0,240000.0
3,5.0,3.0,0.14,181978.0,Cleveland,Ohio,2557.0,80000.0
4,3.0,2.0,0.56,361606.0,Carrollton,Georgia,1886.0,299900.0


Reading data from **test**

In [5]:
# Read the whole project_4.clean_data_test table into `df_test_clean`.
connection = pymysql.connect(host=DB_HOST,
                             user=DB_USER,
                             password=DB_PASSWORD,
                             db=DB_NAME,
                             port=PORT,
                             cursorclass=pymysql.cursors.DictCursor)  # DictCursor returns every row as a dict
try:
    with connection.cursor() as cursor:
        # Query the database
        cursor.execute("SELECT * FROM project_4.clean_data_test;")
        result = cursor.fetchall()
    # One dict per row -> one DataFrame row per dict
    df_test_clean = pd.DataFrame(result)
except Exception as e:
    # Re-raise with context and keep the original traceback chained.
    # (The previous code raised HTTPException, which is not imported in this
    # notebook, so any failure surfaced as a NameError instead of the real error.)
    raise RuntimeError(f"Could not read project_4.clean_data_test from MySQL: {e}") from e
finally:
    # Always release the connection, whether the query succeeded or not
    connection.close()

# Show df
print(f"The dataframe has {len(df_test_clean)} rows")
df_test_clean.head()

The dataframe has 7379 rows


Unnamed: 0,bed,bath,acre_lot,street,city,state,house_size,price
0,3.0,1.0,0.46,1218523.0,Graysville,Alabama,1519.0,165000.0
1,4.0,1.0,0.2,649858.0,New Hampton,Iowa,1718.0,49900.0
2,3.0,2.0,0.19,1777223.0,Indianapolis,Indiana,1826.0,265000.0
3,3.0,2.0,0.3,623098.0,New Bern,North Carolina,1520.0,250000.0
4,3.0,2.0,0.24,848478.0,Ocala,Florida,2060.0,267000.0


# **Modeling with MLFlow**

In [11]:
# For each split reloaded from MySQL, separate the features (X_*) from the
# "price" target (y_*).
X_train, y_train = df_train_clean.drop(columns=['price']), df_train_clean['price']
X_val, y_val = df_validation_clean.drop(columns=['price']), df_validation_clean['price']
X_test, y_test = df_test_clean.drop(columns=['price']), df_test_clean['price']

## _Dummy_

Dummy variables for categorical features

In [12]:
# One-hot encode the categorical columns. handle_unknown='ignore' means a
# category not seen during fit does not raise at transform time.
categorical_columns = ["city", "state"]
one_hot_encoder = OneHotEncoder(handle_unknown='ignore')

column_trans = make_column_transformer(
    (one_hot_encoder, categorical_columns),
    remainder='passthrough',  # all remaining (numeric) columns pass through unchanged
)

column_trans

## _Standardization_

In [23]:
# Full modeling pipeline: encode categoricals -> scale -> random forest regressor.
pipe = Pipeline(steps=[
    ("column_trans", column_trans),
    # with_mean=False scales by the standard deviation only (no centering);
    # presumably chosen because the one-hot encoded output is sparse - confirm.
    ("scaler", StandardScaler(with_mean=False)),
    # Fixed seed (same value as the train/test split) so repeated runs of the
    # notebook build the same forest and produce comparable metrics.
    ("RandomForestRegressor", RandomForestRegressor(random_state=42))
])

pipe

Hyperparameters

In [24]:
# Hyperparameter grid; keys are "<pipeline step name>__<parameter>".
param_grid = {
    "RandomForestRegressor__max_depth": [1, 2, 3, 10],
    "RandomForestRegressor__n_estimators": [10, 11],
}

# Exhaustive search over the grid with 10-fold cross-validation on 2 workers.
search = GridSearchCV(estimator=pipe, param_grid=param_grid, cv=10, n_jobs=2)
search

In [25]:
# Configure the S3-compatible artifact store before touching MLflow.
# Author's note: the endpoint must be the object store's API address, not its
# web console (in the author's setup the console was "http://0.0.0.0:8083" and
# the API port "9000"). The container name ("s3") is used as the host here; the
# network address can be used instead.
# NOTE(review): this cell uses port 8084 while the note above and the
# model-loading cell refer to port 9000 - confirm which endpoint is correct.
# NOTE(review): access key and secret are hard-coded; prefer injecting them
# through the environment.

os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://s3:8084" 
os.environ['AWS_ACCESS_KEY_ID'] = 'admin'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'supersecret'

# Connect to the MLflow tracking server and select the experiment
mlflow.set_tracking_uri("http://mlflow:8087") # "http://0.0.0.0:8087")
mlflow.set_experiment("mlflow_project_4")

# Autologging for scikit-learn, with model signatures and input examples enabled;
# the logged model is registered under the name "modelo1".
mlflow.sklearn.autolog(log_model_signatures=True, log_input_examples=True, registered_model_name="modelo1")

# Fit the grid search inside a named run
with mlflow.start_run(run_name="autolog_pipe_model_reg") as run:
    search.fit(X_train, y_train)
print("fin")

Successfully registered model 'modelo1'.
2024/05/17 02:39:52 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: modelo1, version 1
Created version '1' of model 'modelo1'.
2024/05/17 02:39:53 INFO mlflow.sklearn.utils: Logging the 5 best runs, 3 runs will be omitted.


fin


In [18]:
# Show which tracking server and artifact location MLflow is currently using.
# NOTE(review): get_artifact_uri() is called outside any `with mlflow.start_run()`
# block - confirm whether this implicitly starts a new run that is left open.
print('tracking uri:', mlflow.get_tracking_uri())
print('artifact uri:', mlflow.get_artifact_uri())

tracking uri: http://mlflow:8087
artifact uri: s3://project2bucket/1/c86c4bc824d54852a28907acd1b89bfc/artifacts


In [31]:
# Load the "production" version of the registered model and sanity-check it on
# the first two test rows.

# Author's note: the S3 endpoint must be the object store's API address, not its
# web console (in the author's setup the console was "http://0.0.0.0:8083" and
# the API port "9000").
# NOTE(review): the training cell points at "http://s3:8084" instead - confirm
# which endpoint is correct.
# NOTE(review): access key and secret are hard-coded; prefer injecting them
# through the environment.
os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://10.56.1.22:9000"
os.environ['AWS_ACCESS_KEY_ID'] = 'admin'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'supersecret'

# Connect to the MLflow tracking server
mlflow.set_tracking_uri("http://mlflow:8087")

model_name = "modelo1"
model_production_uri = f"models:/{model_name}/production"

print(model_production_uri)

# Load model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(model_uri=model_production_uri)

example_test = X_test.iloc[0:2]

# Predict once and reuse the result (previously the same rows were scored twice:
# once for printing and once to fill `a`).
a = loaded_model.predict(example_test)

print('real: ', y_test.iloc[0:2])
print('prediction: ', a)



models:/modelo1/production
real:  2042    6
157     1
Name: 12, dtype: int64
prediction:  [6 1]


In [20]:
# The two feature rows that were sent to the model.
X_test.iloc[0:2]

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11
2042,3448,311,25,127,1,1518,146,214,204,1869,Neota,C8772
157,2693,68,14,256,76,2391,234,210,105,883,Commanche,C2703


In [35]:
# Number of predictions returned (one per input row is expected).
len(a)

2

In [39]:
# Turn every prediction into a plain Python scalar via .item().
final_response = [a[position].item() for position in range(len(a))]
final_response

[6, 1]