In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, round
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

import random
import mlflow
import mlflow.pyfunc
from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature

database_name: abtest_workshop_sixuan_he


In [0]:
# NOTE: `spark` is not created in this cell; it is assumed to be provided by the
# notebook runtime. To run elsewhere, uncomment the line below.
#spark = SparkSession.builder.appName("AirbnbSimulator").getOrCreate()

# The imports cell does `from pyspark.sql.functions import ... round`, which
# re-binds the name `round` to the Spark column function. That function does not
# accept plain Python floats, so the builtin is referenced explicitly here.
import builtins

# Define the number of rows for the simulated dataset
num_rows = 1000

# Dedicated, seeded generator: the simulated scores are identical on every
# Restart & Run All, and the global `random` state is left untouched.
score_rng = random.Random(42)

# Generate the simulated data
data = []
for i in range(num_rows):
    listing_id = i + 1
    region = f"Region_{i % 10}"  # 10 regions, assigned round-robin
    # Review scores are drawn from a normal distribution (mean 80, std dev 5)
    # and rounded to 2 decimals; the values are not clamped to any range.
    review_score = builtins.round(score_rng.gauss(80, 5), 2)
    create_date = f"2023-01-{(i % 30) + 1}"  # Day 1..30 of January 2023 (day is not zero-padded)
    number_of_rooms = (i % 5) + 1  # Cycles through 1..5

    data.append((listing_id, region, review_score, create_date, number_of_rooms))

# Create a DataFrame from the simulated data (column types are inferred by Spark)
listing_df = spark.createDataFrame(data, ["listing_id", "region", "review_score", "create_date", "number_of_rooms"])

# Show the first few rows of the DataFrame
listing_df.show()

# Stop the Spark session
#spark.stop()


+----------+--------+------------+-----------+---------------+
|listing_id|  region|review_score|create_date|number_of_rooms|
+----------+--------+------------+-----------+---------------+
|         1|Region_0|       86.41|  2023-01-1|              1|
|         2|Region_1|       75.91|  2023-01-2|              2|
|         3|Region_2|       89.08|  2023-01-3|              3|
|         4|Region_3|       76.62|  2023-01-4|              4|
|         5|Region_4|       76.85|  2023-01-5|              5|
|         6|Region_5|       79.71|  2023-01-6|              1|
|         7|Region_6|       86.15|  2023-01-7|              2|
|         8|Region_7|       87.88|  2023-01-8|              3|
|         9|Region_8|       76.85|  2023-01-9|              4|
|        10|Region_9|       79.66| 2023-01-10|              5|
|        11|Region_0|       82.55| 2023-01-11|              1|
|        12|Region_1|       83.24| 2023-01-12|              2|
|        13|Region_2|       76.97| 2023-01-13|         

In [0]:
# Print the column names, types and nullability of the simulated listings DataFrame
listing_df.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- region: string (nullable = true)
 |-- review_score: double (nullable = true)
 |-- create_date: string (nullable = true)
 |-- number_of_rooms: long (nullable = true)



In [0]:
# Define the function to rank listings by region
def rank_listings_by_region(area, spark):
    """Return the listing ids of one region, best review score first.

    Args:
        area: Region name, matched exactly against the ``region`` column
            (e.g. ``"Region_3"``).
        spark: SparkSession in which the ``listing_df`` temp view is registered.

    Returns:
        A Spark DataFrame with the single column ``listing_id``, ordered by
        ``review_score`` descending. Ties are broken by ascending
        ``listing_id`` so the ranking is deterministic.
    """
    # Kept for backward compatibility: expose the source view under the
    # `listings_df` alias, exactly as this function always did.
    spark.sql("CREATE OR REPLACE TEMPORARY VIEW listings_df AS SELECT * FROM listing_df")
    listings = spark.table("listings_df")
    # `area` is compared as a literal value through the DataFrame API instead of
    # being interpolated into a SQL string, so quotes in the input can neither
    # break the query nor change its meaning (SQL injection).
    return (
        listings
        .filter(listings["region"] == area)
        .orderBy(listings["review_score"].desc(), listings["listing_id"].asc())
        .select("listing_id")
    )


# Register the simulated listings as the "listing_df" temp view so they can be queried by name
listing_df.createOrReplaceTempView("listing_df")
# Rank one example region and preview the top of the result
ranked_listings = rank_listings_by_region(area="Region_3", spark=spark)
ranked_listings.show()

+----------+
|listing_id|
+----------+
|       814|
|       404|
|        74|
|       224|
|        64|
|       294|
|       394|
|       924|
|       364|
|       344|
|       914|
|       554|
|       164|
|       884|
|       984|
|       674|
|       624|
|       704|
|       634|
|       734|
+----------+
only showing top 20 rows



In [0]:
# Define the RankingModel class, which inherits from mlflow.pyfunc.PythonModel
class RankingModel(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc wrapper around ``rank_listings_by_region``.

    The model has no fitted state: every prediction runs the ranking query
    against the Spark session, so the data ``rank_listings_by_region`` reads
    must be available in that session when ``predict`` is called.
    """

    def predict(self, context, model_input):
        """Rank the listings of the region given in the first input row.

        Args:
            context: MLflow model context; not used by this model.
            model_input: Mapping or pandas DataFrame with a ``"region"``
                entry/column. Only its first element is used; any other
                fields (such as ``user_id``) are ignored.

        Returns:
            pandas DataFrame with the ``listing_id`` column, in the order
            produced by ``rank_listings_by_region``.
        """
        # Reuse the active Spark session, or create one if none exists
        spark = SparkSession.builder.appName('RankingModel').getOrCreate()
        regions = model_input["region"]
        # A pandas Series must be read by position: `series[0]` is a label lookup
        # and raises KeyError when the index does not contain 0 (e.g. after the
        # caller filtered or re-indexed the frame). Lists are indexed directly.
        region = regions.iloc[0] if hasattr(regions, "iloc") else regions[0]
        # Run the ranking query and collect the result to the driver as pandas
        return rank_listings_by_region(region, spark).toPandas()

# Example usage
if __name__ == "__main__":
    # Instantiate the model directly; nothing is logged to MLflow at this point
    model = RankingModel()

    # Dict-style input: predict() reads the "region" entry; "user_id" is included but not read
    model_input = dict(region=["Region_3"], user_id=[100])

    # Run the prediction; no MLflow context is needed, so None is passed
    result = model.predict(context=None, model_input=model_input)

    # Print the resulting ranked listings
    print(result)

    listing_id
0          814
1          404
2           74
3          224
4           64
..         ...
95         804
96         484
97         384
98         794
99         934

[100 rows x 1 columns]


In [0]:
# Single-row sample input for the model: one region plus a user id
import pandas as pd
test_data = pd.DataFrame([{"region": "Region_2", "user_id": 100}])

In [0]:
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature

# MLflow tracking/registry client
client = MlflowClient()

# Fresh RankingModel instance; the bare last expression displays its ranking for the sample input
model = RankingModel()
model.predict(context=None, model_input=test_data)

Unnamed: 0,listing_id
0,383
1,463
2,473
3,3
4,73
...,...
95,943
96,643
97,893
98,933


In [0]:
# Names under which the model is logged inside the run and registered in the Model Registry
artifact_path = "sql_model"
registered_name = "sql_model"

# Log, register and promote the model within a single MLflow run
with mlflow.start_run() as run:
    # Produce an example output so the signature is inferred from a real input/output pair
    prediction = model.predict(context=None, model_input=test_data)
    signature = infer_signature(test_data, prediction)

    # Store the pyfunc model as a run artifact, together with its input example and signature
    mlflow.pyfunc.log_model(
        artifact_path=artifact_path,
        python_model=model,
        input_example=test_data,
        signature=signature,
    )

    # Register the logged artifact as a new model version, then move that version to
    # "Production", archiving the versions that were in that stage before
    logged_model_uri = f"runs:/{run.info.run_id}/{artifact_path}"
    mv = mlflow.register_model(logged_model_uri, registered_name)
    client.transition_model_version_stage(registered_name, mv.version, "Production", archive_existing_versions=True)

  inputs = _infer_schema(model_input)
  outputs = _infer_schema(model_output) if model_output is not None else None
Registered model 'sql_model' already exists. Creating a new version of this model...
2023/09/06 02:16:51 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: sql_model, version 2
Created version '2' of model 'sql_model'.


In [0]:
# Load the model version that was just registered. The name and version come
# from `mv` (the ModelVersion returned by mlflow.register_model) instead of a
# hard-coded "2": every run of the registration cell creates a new version, so
# a fixed number would silently load an older model on re-runs.
model_uri = f"models:/{mv.name}/{mv.version}"
loaded_model = mlflow.pyfunc.load_model(model_uri)

# Create a new test DataFrame
new_test_data = pd.DataFrame({
    'region': ['Region_1'],
    'user_id': [300]
})

# Make predictions using the loaded model
predictions = loaded_model.predict(new_test_data)

# Display the predictions
print(predictions)

    listing_id
0          882
1          822
2          352
3          872
4          292
..         ...
95         842
96         832
97         622
98          52
99         662

[100 rows x 1 columns]
