In [None]:
# Mount Google Drive at /content/drive; the dataset CSV read further down
# lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install fastapi nest-asyncio pyngrok uvicorn

In [None]:
!pip install pyspark

In [None]:
from typing import Union

import nest_asyncio
import pandas as ps
import uvicorn
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pyngrok import ngrok
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
app = FastAPI()

# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): presumably fine for a classroom demo behind a temporary tunnel;
# confirm before exposing this more widely.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# Dataset location on the mounted Google Drive.
data_path = '/content/drive/MyDrive/VNU_IS/HK VI/Advanced DA/Final/House_sale_final.csv'
# Read the CSV with a header row and let Spark infer the column types.
mydata = spark.read.csv(data_path, header = True, inferSchema=True)
# Model id -> path under which /train/ saves the fitted pipeline and from
# which /predictions/ loads it again.
models = {1: "model_lr_weight",       # Linear Regression
          2: "model_rd_weight",       # Random Forest
          3: "model_gbt_weight",      # Gradient-Boosted Trees (GBTRegressor)
          4: "model_dt_weight",       # Decision Tree
          5: "model_gn_weight"}      # Generalized Linear Regression

class Item(BaseModel):
    """Pydantic model describing one house record.

    The field names are the same as the columns of the ``schema`` that this
    notebook defines for /add_house_data/. No endpoint in the visible code
    takes or returns an ``Item``.
    """
    # Sale price; the only field that is allowed to be None.
    price: Union[float, None]
    bedrooms: int
    bathrooms: int
    sqft_living: int
    sqft_lot: int
    floors: int
    waterfront: int
    view: int
    condition: int
    sqft_above: int
    sqft_basement: int
    yr_built: int
    yr_renovated: int
    # City name as free text (the training endpoint encodes it numerically).
    city: str

@app.get("/")
async def root():
    """Landing endpoint: reply with a fixed greeting."""
    greeting = "Hello class to our FastAPI Final project"
    return {"message": greeting}

# Example request: GET /getinfo
@app.get("/getinfo")
async def getinfo():
    """Summarise the loaded dataset: its column names and its row count."""
    return {
        'columns': mydata.columns,
        'num_rows': mydata.count(),
    }

# Example request: GET /data/?row_id=5&col_id=4
@app.get("/data/")
async def get_data_by_row_and_col_id(row_id: int, col_id: int):
    """Return one cell of the dataset, addressed by zero-based positions.

    Args:
        row_id: Zero-based row position, in the order Spark returns the rows.
        col_id: Zero-based index into ``mydata.columns``.

    Returns:
        A dict with 'row_id', 'col_id', 'column_name' and 'column_value'.

    Raises:
        HTTPException: 404 when either index is negative or past the end.
    """
    columns = mydata.columns
    # Negative values are rejected explicitly; plain Python indexing would
    # silently count from the end instead.
    if not 0 <= col_id < len(columns):
        raise HTTPException(
            status_code=404,
            detail=f"col_id must be between 0 and {len(columns) - 1}",
        )
    if row_id < 0:
        raise HTTPException(status_code=404, detail="row_id must be >= 0")

    column_name = columns[col_id]
    # take() fetches only the first row_id + 1 rows rather than collecting
    # the whole column onto the driver just to read one value.
    leading_rows = mydata.select(column_name).take(row_id + 1)
    if len(leading_rows) <= row_id:
        raise HTTPException(status_code=404, detail=f"row_id {row_id} is out of range")

    return {'row_id': row_id,
            'col_id': col_id,
            'column_name': column_name,
            'column_value': leading_rows[row_id][0]}

# Example request: GET /data1/?row_id=5
@app.get("/data1/")
async def get_data_by_row_id(row_id: int):
    """Return one full row of the dataset as a column -> value dict.

    Args:
        row_id: Zero-based row position, in the order Spark returns the rows.

    Raises:
        HTTPException: 404 when ``row_id`` is negative or past the last row.
    """
    if row_id < 0:
        raise HTTPException(status_code=404, detail="row_id must be >= 0")

    leading_rows = mydata.take(row_id + 1)
    # take() returns fewer rows than requested when row_id is past the end;
    # without this check the last row of the dataset would be returned as if
    # it were the requested one.
    if len(leading_rows) <= row_id:
        raise HTTPException(status_code=404, detail=f"row_id {row_id} is out of range")

    return leading_rows[row_id].asDict()

def _make_regressor(item_id: int):
    """Return the unfitted Spark ML regressor registered under ``item_id``.

    ``item_id`` must be one of the keys of ``models`` (1-5).
    """
    # Imported here, as the endpoint originally did, so the notebook's import
    # cell does not have to change.
    from pyspark.ml.regression import (
        DecisionTreeRegressor,
        GBTRegressor,
        GeneralizedLinearRegression,
        LinearRegression,
        RandomForestRegressor,
    )

    # Lambdas so that only the requested estimator is actually constructed.
    # NOTE(review): the tree-based models use maxBins=32 (explicitly or by
    # default); if the indexed "city" column is treated as categorical with
    # more than 32 values, fitting will presumably fail - confirm against the data.
    factories = {
        1: lambda: LinearRegression(featuresCol="features", labelCol="price"),
        2: lambda: RandomForestRegressor(featuresCol="features", labelCol="price", maxBins=32),
        3: lambda: GBTRegressor(featuresCol="features", labelCol="price"),
        4: lambda: DecisionTreeRegressor(featuresCol="features", labelCol="price"),
        5: lambda: GeneralizedLinearRegression(featuresCol="features", labelCol="price", maxIter=10),
    }
    return factories[item_id]()


# Example request: GET /train/?item_id=5
@app.get("/train/")
async def train(item_id: int):
    """Fit, save and evaluate the regression model selected by ``item_id``.

    The "city" column is label-encoded into "city_encoder", the feature
    columns are assembled into a single vector, the data is split 70/30 with
    a fixed seed, and the fitted pipeline is written to ``models[item_id]``
    (overwriting any previous save).

    Args:
        item_id: Key of ``models`` (1-5) choosing the algorithm.

    Returns:
        A dict with the test-set R2, MAE and RMSE plus the city -> code
        mapping needed to call /predictions/.

    Raises:
        HTTPException: 404 when ``item_id`` is not a key of ``models``.
    """
    from pyspark.ml.feature import StringIndexer

    # Validate before doing any Spark work. Previously ids <= 0 fell through
    # every branch and crashed on an unbound ``model``.
    if item_id not in models:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown model id {item_id}; valid ids are {sorted(models)}",
        )

    # Label-encode the "city" column.
    indexer = StringIndexer(inputCol="city", outputCol="city_encoder")
    encode_data = indexer.fit(mydata).transform(mydata)
    # Lookup table city <-> numeric code, returned to the caller.
    city_encode = encode_data.select("city", "city_encoder").distinct().toPandas()

    # Combine all feature columns into a single vector column.
    assembler = VectorAssembler(
        inputCols=["bedrooms", "bathrooms", "sqft_living", "sqft_lot", "floors",
                   "waterfront", "view", "condition", "sqft_above", "sqft_basement",
                   "yr_built", "yr_renovated", "city_encoder"],
        outputCol="features",
    )
    encode_data = assembler.transform(encode_data)
    trainingData, testData = encode_data.randomSplit([0.7, 0.3], seed=2023)

    # Fit the chosen regressor and persist the fitted pipeline.
    model = Pipeline(stages=[_make_regressor(item_id)]).fit(trainingData)
    model.write().overwrite().save(models[item_id])

    # Evaluate on the held-out split.
    predictions = model.transform(testData)

    def _score(metric_name: str) -> float:
        evaluator = RegressionEvaluator(
            predictionCol="prediction", labelCol="price", metricName=metric_name
        )
        return evaluator.evaluate(predictions)

    return {"R-squared (R2):": _score("r2"),
            'MAE:': _score("mae"),
            'RMSE:': _score("rmse"),
            # Converted explicitly to plain dicts (column -> {row index: value})
            # instead of handing a pandas DataFrame to the JSON encoder.
            "city_encode:": city_encode.to_dict()}

# Example request:
# GET /predictions/?id=5&bedrooms=3&bathrooms=2&sqft_living=1000&sqft_lot=1500&floors=2&waterfront=2&view=3&condition=4&sqft_above=1250&sqft_basement=250&yr_built=2000&yr_renovated=2015&city_encoder=25
@app.get("/predictions/")
async def get_predictions(id:int, bedrooms: int, bathrooms: int, sqft_living: int, sqft_lot: int, 
                          floors: int, waterfront: int, view: int, condition: int, sqft_above: int,
                          sqft_basement: int, yr_built: int, yr_renovated: int, city_encoder: int):
    """Predict the price of one house with a previously saved model.

    Args:
        id: Key of ``models`` (1-5); the pipeline is loaded from
            ``models[id]``, which /train/ writes for the same id.
        bedrooms ... yr_renovated: Numeric house attributes.
        city_encoder: Numeric code of the city, as listed in the
            "city_encode:" part of the /train/ response.

    Returns:
        ``{'prediction: ': <value of the model's "prediction" column>}``.

    Raises:
        HTTPException: 404 when ``id`` is not a key of ``models``.
    """
    if id not in models:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown model id {id}; valid ids are {sorted(models)}",
        )

    # One mapping keeps names and values together; dict order is the feature
    # order, the same order /train/ assembles its feature vector in.
    features = {
        "bedrooms": bedrooms, "bathrooms": bathrooms, "sqft_living": sqft_living,
        "sqft_lot": sqft_lot, "floors": floors, "waterfront": waterfront,
        "view": view, "condition": condition, "sqft_above": sqft_above,
        "sqft_basement": sqft_basement, "yr_built": yr_built,
        "yr_renovated": yr_renovated, "city_encoder": city_encoder,
    }
    feature_names = list(features)

    # Build the single-row Spark DataFrame directly; no pandas round-trip.
    single_row = spark.createDataFrame([tuple(features.values())], schema=feature_names)

    assembler = VectorAssembler(inputCols=feature_names, outputCol="features")
    output = assembler.transform(single_row)

    pipelineModel = PipelineModel.load(models[id])
    scored = pipelineModel.transform(output)

    # Read the prediction by column name rather than by position.
    return {'prediction: ': scored.first()["prediction"]}

# Schema of one house record as accepted by /add_house_data/.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Every column is a nullable integer except the trailing nullable "city" string.
schema = StructType(
    [
        StructField(column_name, IntegerType(), True)
        for column_name in (
            "price",
            "bedrooms",
            "bathrooms",
            "sqft_living",
            "sqft_lot",
            "floors",
            "waterfront",
            "view",
            "condition",
            "sqft_above",
            "sqft_basement",
            "yr_built",
            "yr_renovated",
        )
    ]
    + [StructField("city", StringType(), True)]
)

# Example request:
# GET /add_house_data/?price=2000000&bedrooms=2&bathrooms=1&sqft_living=750&sqft_lot=1000&floors=1&waterfront=3&view=3&condition=3&sqft_above=750&sqft_basement=250&yr_built=1999&yr_renovated=2008&city=HaNoi
@app.get("/add_house_data/")       
async def add_house_data(price: int, bedrooms: int, bathrooms: int, sqft_living: int, sqft_lot: int, 
                         floors: int, waterfront: int, view: int, condition: int, 
                         sqft_above: int, sqft_basement: int, yr_built: int, yr_renovated: int, city: str):
    """Return the dataset with one extra house record appended.

    The new record is built from the query parameters using the module-level
    ``schema`` and unioned (by column position) with the loaded dataset. The
    result is only returned: neither the CSV file nor the module-level
    ``mydata`` is modified, so the added row does not persist across requests.

    Returns:
        Every row of the combined data as a list of column -> value dicts.
    """
    # The new record, in the column order of ``schema``.
    new_row = (price, bedrooms, bathrooms, sqft_living, sqft_lot, floors, waterfront, view, condition,
               sqft_above, sqft_basement, yr_built, yr_renovated, city)
    new_df = spark.createDataFrame([new_row], schema=schema)
    # Use the DataFrame loaded at start-up instead of re-reading the CSV and
    # re-inferring its schema on every request.
    combined = mydata.union(new_df)
    return combined.toPandas().to_dict(orient='records')

#Run FastAPI in Colab
# Open an ngrok tunnel to local port 8000 and show the public URL it was given.
ngrok_tunnel = ngrok.connect(8000)
print('Public URL:', ngrok_tunnel.public_url)
# Allow nested event loops so uvicorn can run inside the notebook kernel,
# which already has a running asyncio loop.
nest_asyncio.apply()
# Serve the app on port 8000. This call blocks, so anything after it in the
# notebook only runs once the server has been stopped.
uvicorn.run(app, port=8000)

#PROJECT DONE

In [None]:
#Test
# Manual smoke test: score one synthetic row (every feature set to 1) with a
# saved pipeline.
x = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

from pyspark.ml.regression import LinearRegression

feature_names = ["bedrooms", "bathrooms", "sqft_living", "sqft_lot", "floors", "waterfront",
                 "view", "condition", "sqft_above", "sqft_basement", "yr_built", "yr_renovated"]

# Built directly with Spark: pandas is imported as `ps` in this notebook, so
# the `pd` alias is not defined on a fresh kernel.
d_f = spark.createDataFrame([tuple(x)], schema=feature_names)
assembler = VectorAssembler(inputCols=feature_names, outputCol="features")

output = assembler.transform(d_f)

# NOTE(review): none of the /train/ branches saves to "model_weight", and this
# row has 12 features (no city_encoder) while /train/ assembles 13. Presumably
# this targets an older saved model - confirm it still exists.
pipelineModel = PipelineModel.load("model_weight")

y = pipelineModel.transform(output)

In [None]:
# Last field of the first row of `y` (presumably the model's prediction column - confirm).
y.collect()[0][-1]

4421435.433015432