# PROJECT

In [None]:
# all lib but also available by function
from sqlalchemy import create_engine, text, update, Table, Column, Integer, MetaData, Float
from sqlalchemy.orm import Session
from dotenv import load_dotenv
from datetime import datetime
from io import StringIO
import pandas as pd
import requests
import mlflow
import boto3
import json
import os


Required environment variables (loaded from the environment or a `.env` file):

    MLFLOW_TRACKING_URI
    AWS_ACCESS_KEY_ID
    AWS_SECRET_ACCESS_KEY
    POSTGRES_DATABASE
    IMMO_API_TOKEN
    S3_BUCKET


## Create and fill the database from the CSV file

In [None]:
from sqlalchemy import create_engine, Table, Column, Integer, MetaData, Float
from dotenv import load_dotenv
import pandas as pd
import os

# Build the engine from the Postgres connection URL (environment or .env file).
load_dotenv()
postgres_database = os.environ["POSTGRES_DATABASE"]
engine = create_engine(postgres_database, echo=True)

# Table definition: `id` is the primary key; every other column is listed
# below as (column name, SQLAlchemy type), in table order.
metadata = MetaData()
table_name = 'housing_prices'

non_key_columns = [
    ('square_feet', Float),
    ('num_bedrooms', Integer),
    ('num_bathrooms', Integer),
    ('num_floors', Integer),
    ('year_built', Integer),
    ('has_garden', Integer),
    ('has_pool', Integer),
    ('garage_size', Integer),
    ('location_score', Float),
    ('distance_to_center', Float),
    ('price', Float),
    ('price_predict', Float),
]

my_table = Table(
    table_name,
    metadata,
    Column('id', Integer, primary_key=True),
    *(Column(col_name, col_type) for col_name, col_type in non_key_columns),
)

# Emit CREATE TABLE for the table(s) registered on `metadata`
# (by default SQLAlchemy skips tables that already exist).
metadata.create_all(engine)

# Download the source dataset and lower-case its headers so they match the
# lower-case column names declared above.
source_csv_url = 'https://fp-private-bucket.s3.eu-west-3.amazonaws.com/housing_prices/real_estate_dataset.csv'
data = pd.read_csv(source_csv_url)
data.columns = [header.lower() for header in data.columns]

# Append every CSV row to the table (the DataFrame index is not written).
data.to_sql(table_name, con=engine, index=False, if_exists="append")

## Load new data from the API into Postgres

In [None]:
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import pandas as pd
import requests
import json
import os

# Tinybird pipe endpoint serving the new housing rows.
API_URL = "https://api.eu-central-1.aws.tinybird.co/v0/pipes/housing_prices_endpoint.json"
# Fail instead of hanging forever if the API does not answer.
API_TIMEOUT_SECONDS = 30

# Start engine
load_dotenv()
postgres_database = os.environ["POSTGRES_DATABASE"]
engine = create_engine(postgres_database, echo=True)
table_name = 'housing_prices'

# Re-synchronise the `id` sequence with MAX(id) already stored in the table.
# Without this, auto-generated ids restart at 1 (presumably because the CSV
# load inserted explicit ids without advancing the sequence) and collide.
# `table_name` is a trusted constant, so the f-string is not an injection risk.
with engine.connect() as conn:
    fix_sequence_sql = f"""
    SELECT setval(
        pg_get_serial_sequence('{table_name}', 'id'),
        (SELECT MAX(id) FROM {table_name})
    );
    """
    conn.execute(text(fix_sequence_sql))
    conn.commit()

# Pull from the API. Passing the token via `params` lets requests URL-encode it;
# raise_for_status() surfaces HTTP errors instead of failing later on bad JSON.
user_token = os.environ["IMMO_API_TOKEN"]
response = requests.get(
    API_URL,
    params={"token": user_token},
    timeout=API_TIMEOUT_SECONDS,
)
response.raise_for_status()
data = response.json()

# The payload rows live under the "data" key.
rows = data['data']

if not rows:
    # An empty payload yields a frame with no string columns; nothing to insert.
    print("API returned no rows; nothing to insert.")
else:
    df = pd.DataFrame(rows)
    # Lower-case headers to match the lower-case table column names.
    df.columns = df.columns.str.lower()

    # Push data to Neon (Postgres).
    df.to_sql(table_name, con=engine, if_exists="append", index=False)

## Predict

In [None]:
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import pandas as pd
import mlflow
import os


# Start engine
load_dotenv()
postgres_database = os.environ["POSTGRES_DATABASE"]
engine = create_engine(postgres_database, echo=True)
table_name = 'housing_prices'

# Default MLflow model (the retrained model that expects lower-case column
# names). Set MLFLOW_MODEL_URI to use another model without editing the cell.
DEFAULT_MODEL_URI = 'runs:/50d46c317826461bb4cf503b7414634d/Housing_prices_estimator'

# Select rows that still need a prediction. Both conditions are required:
# - price IS NULL: rows with a known price come from the original dataset;
# - price_predict IS NULL: rows already predicted need no re-processing.
with engine.connect() as conn:
    stmt = text(f"""
        SELECT * FROM {table_name}
        WHERE price IS NULL AND price_predict IS NULL
    """)
    result = conn.execute(stmt)
    df_to_predict = pd.DataFrame(result.fetchall(), columns=result.keys())


# Load model from MLflow.
mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
logged_model = os.environ.get("MLFLOW_MODEL_URI", DEFAULT_MODEL_URI)
loaded_model = mlflow.pyfunc.load_model(logged_model)

# Feature columns the model was trained on.
features = ['square_feet', 'num_bedrooms', 'num_bathrooms', 'num_floors',
            'year_built', 'has_garden', 'has_pool', 'garage_size',
            'location_score', 'distance_to_center']

X = df_to_predict[features]

# Prediction. Many estimators raise on a 0-row input, so skip when there is
# nothing new to predict; price_predict then stays as selected (NULL).
if df_to_predict.empty:
    print("No rows to predict.")
else:
    df_to_predict['price_predict'] = loaded_model.predict(X)

df_to_predict.head()

## Upload predictions to S3

In [None]:
# Requires df_to_predict: run the Predict cell first.

from dotenv import load_dotenv
from datetime import datetime
from io import StringIO
import boto3
import os

load_dotenv()

# Connection to S3 with the credentials from the environment.
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]

session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Build the resource from `session` so the credentials above are actually
# used; `boto3.resource('s3')` would go through the default session instead.
s3 = session.resource('s3')
bucket = s3.Bucket(os.environ["S3_BUCKET"])

# Generate a timestamped filename (local time) under the predictions folder.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"predict_{timestamp}.csv"
s3_key = f"housing_prices/predictions/{filename}"

# Serialise to an in-memory buffer; no temporary file on disk.
csv_buffer = StringIO()
df_to_predict.to_csv(csv_buffer, index=False)

# Push to S3.
bucket.put_object(
    Key=s3_key,
    Body=csv_buffer.getvalue(),
)

## Write predictions back to Neon (Postgres)

In [None]:
# Requires df_to_predict: run the Predict cell first.

from sqlalchemy import create_engine, update, bindparam, Table, MetaData
from sqlalchemy.orm import Session
from dotenv import load_dotenv
import os

# Start engine (create_engine is imported here so the cell is self-contained).
load_dotenv()
postgres_database = os.environ["POSTGRES_DATABASE"]
engine = create_engine(postgres_database, echo=True)
table_name = 'housing_prices'

# Reflect the existing table definition from the database.
metadata = MetaData()
housing_prices = Table(table_name, metadata, autoload_with=engine)

# One UPDATE statement executed once per parameter set (executemany). The
# bind parameter names must differ from the column names used in values().
stmt = (
    update(housing_prices)
    .where(housing_prices.c.id == bindparam("row_id"))
    .values(price_predict=bindparam("new_price"))
)

# Convert numpy scalars to native Python types, which every DB driver can
# adapt (numpy.int64 in particular is not accepted by psycopg2).
updates = [
    {"row_id": int(row_id), "new_price": float(predicted_price)}
    for row_id, predicted_price in zip(
        df_to_predict["id"], df_to_predict["price_predict"]
    )
]

# Push predictions to Neon (Postgres) in one transaction; engine.begin()
# commits on success and rolls back on error.
if updates:
    with engine.begin() as conn:
        conn.execute(stmt, updates)
else:
    print("No predictions to write.")