In [1]:
!pip install mlflow==1.8.0

Collecting mlflow==1.8.0
  Downloading mlflow-1.8.0-py3-none-any.whl (10.4 MB)
[K     |████████████████████████████████| 10.4 MB 2.3 MB/s eta 0:00:01
[?25hCollecting alembic
  Downloading alembic-1.4.3-py2.py3-none-any.whl (159 kB)
[K     |████████████████████████████████| 159 kB 2.9 MB/s eta 0:00:01
[?25hCollecting querystring-parser
  Downloading querystring_parser-1.2.4-py2.py3-none-any.whl (7.9 kB)
Collecting click>=7.0
  Downloading click-7.1.2-py2.py3-none-any.whl (82 kB)
[K     |████████████████████████████████| 82 kB 463 kB/s eta 0:00:01
[?25hCollecting protobuf>=3.6.0
  Downloading protobuf-3.14.0-cp38-cp38-manylinux1_x86_64.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 3.3 MB/s eta 0:00:01
Collecting docker>=4.0.0
  Downloading docker-4.4.1-py2.py3-none-any.whl (146 kB)
[K     |████████████████████████████████| 146 kB 3.2 MB/s eta 0:00:01
[?25hCollecting sqlparse
  Downloading sqlparse-0.4.1-py3-none-any.whl (42 kB)
[K     |██████████████████████████

In [22]:
import os

# Export, in one step, the settings read from the process environment:
#   - the tracking API and the object-store endpoint (api and object access)
#   - the minio credentials, under the AWS variable names
os.environ.update({
  "MLFLOW_TRACKING_URI": "http://mlflow.data:5000",
  "MLFLOW_S3_ENDPOINT_URL": "http://minio-service.data:9000",
  "AWS_ACCESS_KEY_ID": "minio",
  "AWS_SECRET_ACCESS_KEY": "minio123",
})

In [23]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark
import pandas as pd


In [24]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session used by the rest of the notebook:
#   - pulls in the Delta Lake package and registers its SQL extension and catalog
#   - points the S3A filesystem at the in-cluster MinIO service (path-style, no TLS)
# The object-store credentials are read from the AWS_* environment variables that the
# configuration cell above exports, so they are defined in exactly one place.
spark = (
  SparkSession.builder.appName("DeltaLake-airbnb")
  .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
  .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
  .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
  .config("spark.hadoop.fs.s3a.endpoint", "http://minio-service.data.svc.cluster.local:9000")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .getOrCreate()
)

In [27]:
# Load the raw listings CSV: first row is the header, column types are inferred,
# quoted fields may span several lines, and '"' is the escape character.
rawDF = (
  spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("multiLine", "true")
  .option("escape", '"')
  .csv("s3a://airbnb/sf-airbnb.csv")
)

In [28]:
# In this notebook's recorded output, display() rendered only the DataFrame's
# schema summary (DataFrame[col: type, ...]), not any rows.
display(rawDF)

# Last expression of the cell: the list of column names in the raw CSV.
rawDF.columns

DataFrame[id: int, listing_url: string, scrape_id: bigint, last_scraped: string, name: string, summary: string, space: string, description: string, experiences_offered: string, neighborhood_overview: string, notes: string, transit: string, access: string, interaction: string, house_rules: string, thumbnail_url: string, medium_url: string, picture_url: string, xl_picture_url: string, host_id: int, host_url: string, host_name: string, host_since: string, host_location: string, host_about: string, host_response_time: string, host_response_rate: string, host_acceptance_rate: string, host_is_superhost: string, host_thumbnail_url: string, host_picture_url: string, host_neighbourhood: string, host_listings_count: int, host_total_listings_count: int, host_verifications: string, host_has_profile_pic: string, host_identity_verified: string, street: string, neighbourhood: string, neighbourhood_cleansed: string, neighbourhood_group_cleansed: string, city: string, state: string, zipcode: string, ma

['id',
 'listing_url',
 'scrape_id',
 'last_scraped',
 'name',
 'summary',
 'space',
 'description',
 'experiences_offered',
 'neighborhood_overview',
 'notes',
 'transit',
 'access',
 'interaction',
 'house_rules',
 'thumbnail_url',
 'medium_url',
 'picture_url',
 'xl_picture_url',
 'host_id',
 'host_url',
 'host_name',
 'host_since',
 'host_location',
 'host_about',
 'host_response_time',
 'host_response_rate',
 'host_acceptance_rate',
 'host_is_superhost',
 'host_thumbnail_url',
 'host_picture_url',
 'host_neighbourhood',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'host_has_profile_pic',
 'host_identity_verified',
 'street',
 'neighbourhood',
 'neighbourhood_cleansed',
 'neighbourhood_group_cleansed',
 'city',
 'state',
 'zipcode',
 'market',
 'smart_location',
 'country_code',
 'country',
 'latitude',
 'longitude',
 'is_location_exact',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'bed_type',
 'amenities',


In [29]:
# Columns carried forward from the raw listings; "price" is kept last.
columnsToKeep = [
  # host / booking attributes
  "host_is_superhost", "cancellation_policy", "instant_bookable",
  "host_total_listings_count",
  # location
  "neighbourhood_cleansed", "latitude", "longitude",
  # property description
  "property_type", "room_type", "accommodates",
  "bathrooms", "bedrooms", "beds", "bed_type",
  "minimum_nights",
  # reviews
  "number_of_reviews",
  "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness",
  "review_scores_checkin", "review_scores_communication", "review_scores_location",
  "review_scores_value",
  # price column
  "price",
]

baseDF = rawDF.select(*columnsToKeep)
# Mark the projection for caching, then run count() as an action over it.
baseDF.cache()
baseDF.count()
display(baseDF)


DataFrame[host_is_superhost: string, cancellation_policy: string, instant_bookable: string, host_total_listings_count: int, neighbourhood_cleansed: string, latitude: double, longitude: double, property_type: string, room_type: string, accommodates: int, bathrooms: double, bedrooms: int, beds: int, bed_type: string, minimum_nights: int, number_of_reviews: int, review_scores_rating: int, review_scores_accuracy: int, review_scores_cleanliness: int, review_scores_checkin: int, review_scores_communication: int, review_scores_location: int, review_scores_value: int, price: string]

In [30]:
from pyspark.sql.functions import col, translate

# Strip the "$" and "," characters from the price string, then cast what is left to double.
priceWithoutSymbols = translate(col("price"), "$,", "")
fixedPriceDF = baseDF.withColumn("price", priceWithoutSymbols.cast("double"))

display(fixedPriceDF)


DataFrame[host_is_superhost: string, cancellation_policy: string, instant_bookable: string, host_total_listings_count: int, neighbourhood_cleansed: string, latitude: double, longitude: double, property_type: string, room_type: string, accommodates: int, bathrooms: double, bedrooms: int, beds: int, bed_type: string, minimum_nights: int, number_of_reviews: int, review_scores_rating: int, review_scores_accuracy: int, review_scores_cleanliness: int, review_scores_checkin: int, review_scores_communication: int, review_scores_location: int, review_scores_value: int, price: double]

In [31]:
# Show describe() for every column after the price fix (the result has a leading "summary" column).
display(fixedPriceDF.describe())

DataFrame[summary: string, host_is_superhost: string, cancellation_policy: string, instant_bookable: string, host_total_listings_count: string, neighbourhood_cleansed: string, latitude: string, longitude: string, property_type: string, room_type: string, accommodates: string, bathrooms: string, bedrooms: string, beds: string, bed_type: string, minimum_nights: string, number_of_reviews: string, review_scores_rating: string, review_scores_accuracy: string, review_scores_cleanliness: string, review_scores_checkin: string, review_scores_communication: string, review_scores_location: string, review_scores_value: string, price: string]

In [32]:
# Discard rows in which host_is_superhost is null.
noNullsDF = fixedPriceDF.dropna(subset=["host_is_superhost"])

In [33]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Names of the integer-typed columns, taken from baseDF's schema.
integerColumns = [
  field.name
  for field in baseDF.schema.fields
  if isinstance(field.dataType, IntegerType)
]

# Re-project every column in its original position, casting the integer ones to double.
doublesDF = noNullsDF.select([
  col(name).cast("double").alias(name) if name in integerColumns else col(name)
  for name in noNullsDF.columns
])

columns = "\n - ".join(integerColumns)
print("Columns converted from Integer to Double:\n - " + columns)

Columns converted from Integer to Double:
 - host_total_listings_count
 - accommodates
 - bedrooms
 - beds
 - minimum_nights
 - number_of_reviews
 - review_scores_rating
 - review_scores_accuracy
 - review_scores_cleanliness
 - review_scores_checkin
 - review_scores_communication
 - review_scores_location
 - review_scores_value


In [34]:
from pyspark.sql.functions import when

# Columns whose nulls are filled in below.
imputeCols = [
  "bedrooms", "bathrooms", "beds",
  "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness",
  "review_scores_checkin", "review_scores_communication", "review_scores_location",
  "review_scores_value",
]

# For each of those columns add a "<name>_na" flag: 1.0 where the value is null, else 0.0.
for colName in imputeCols:
  isMissing = col(colName).isNull()
  doublesDF = doublesDF.withColumn(colName + "_na", when(isMissing, 1.0).otherwise(0.0))

display(doublesDF.describe())

from pyspark.ml.feature import Imputer

# Median imputation written back into the same columns (outputCols == inputCols).
imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
imputerModel = imputer.fit(doublesDF)
imputedDF = imputerModel.transform(doublesDF)

DataFrame[summary: string, host_is_superhost: string, cancellation_policy: string, instant_bookable: string, host_total_listings_count: string, neighbourhood_cleansed: string, latitude: string, longitude: string, property_type: string, room_type: string, accommodates: string, bathrooms: string, bedrooms: string, beds: string, bed_type: string, minimum_nights: string, number_of_reviews: string, review_scores_rating: string, review_scores_accuracy: string, review_scores_cleanliness: string, review_scores_checkin: string, review_scores_communication: string, review_scores_location: string, review_scores_value: string, price: string, bedrooms_na: string, bathrooms_na: string, beds_na: string, review_scores_rating_na: string, review_scores_accuracy_na: string, review_scores_cleanliness_na: string, review_scores_checkin_na: string, review_scores_communication_na: string, review_scores_location_na: string, review_scores_value_na: string]

In [35]:
# Summary statistics restricted to the price column.
display(imputedDF.describe("price"))

DataFrame[summary: string, price: string]

In [36]:
# How many listings have a price of exactly zero?
imputedDF.where(col("price") == 0).count()

1

In [37]:
# Keep only listings with a strictly positive price.
hasPositivePrice = col("price") > 0
posPricesDF = imputedDF.where(hasPositivePrice)

In [38]:
# Summary statistics of minimum_nights ...
minimumNightsDF = posPricesDF.select("minimum_nights")
display(minimumNightsDF.describe())

# ... followed by its frequency table: most common values first, ties ordered by minimum_nights.
minimumNightsCountsDF = (
  posPricesDF
  .groupBy("minimum_nights")
  .count()
  .sort(col("count").desc(), col("minimum_nights"))
)
display(minimumNightsCountsDF)


DataFrame[summary: string, minimum_nights: string]

DataFrame[minimum_nights: double, count: bigint]

In [39]:
# Keep listings whose minimum stay is at most 365 nights.
maxMinimumNights = 365
cleanDF = posPricesDF.where(col("minimum_nights") <= maxMinimumNights)

display(cleanDF)

DataFrame[host_is_superhost: string, cancellation_policy: string, instant_bookable: string, host_total_listings_count: double, neighbourhood_cleansed: string, latitude: double, longitude: double, property_type: string, room_type: string, accommodates: double, bathrooms: double, bedrooms: double, beds: double, bed_type: string, minimum_nights: double, number_of_reviews: double, review_scores_rating: double, review_scores_accuracy: double, review_scores_cleanliness: double, review_scores_checkin: double, review_scores_communication: double, review_scores_location: double, review_scores_value: double, price: double, bedrooms_na: double, bathrooms_na: double, beds_na: double, review_scores_rating_na: double, review_scores_accuracy_na: double, review_scores_cleanliness_na: double, review_scores_checkin_na: double, review_scores_communication_na: double, review_scores_location_na: double, review_scores_value_na: double]

In [40]:
# Persist the cleaned listings in Delta format under the airbnb bucket.
# NOTE(review): no save mode is given, so re-running this cell once the path exists
# presumably fails — confirm, and consider an explicit mode if re-runs are expected.
cleanDF.write.format("delta").save("s3a://airbnb/airbnb-clean")

In [44]:
# Create the airbnb database; IF NOT EXISTS keeps the cell re-runnable and matches
# the CREATE TABLE IF NOT EXISTS statement used for the table below.
spark.sql("CREATE DATABASE IF NOT EXISTS airbnb")

DataFrame[]

In [45]:
# Make airbnb the current database for the SQL statements that follow.
spark.sql("use airbnb")

DataFrame[]

In [49]:
# Register a table named airbnb over the Delta files saved at s3a://airbnb/airbnb-clean
# (no-op if the table is already registered).
spark.sql("CREATE TABLE IF NOT EXISTS airbnb using delta location 's3a://airbnb/airbnb-clean'")

DataFrame[]

In [50]:
# List the databases known to the session (at most 10 rows printed).
spark.sql("show databases").show(10)

+---------+
|namespace|
+---------+
|   airbnb|
|  default|
+---------+



In [53]:
# List the tables of the current database (at most 10 rows printed).
spark.sql("show tables").show(10)

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|  airbnb|   airbnb|      false|
+--------+---------+-----------+



In [54]:
# Sanity check: print the first 3 rows of the airbnb table.
spark.sql("select * from airbnb").show(3)

+-----------------+--------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost| cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|

In [55]:
def mlflow_rf(file_path, num_trees, max_depth):
  """Train a random-forest price model on the cleaned Airbnb table and record the run in MLflow.

  Inside a single MLflow run named "random-forest" this function:
    1. loads the Delta table at s3a://airbnb/airbnb-clean/ and splits it 80/20 (seed 42);
    2. fits a StringIndexer -> VectorAssembler -> RandomForestRegressor pipeline on the
       training split, with "price" as the label;
    3. logs the params num_trees / max_depth, the fitted pipeline under "model",
       and the test-set metrics rmse / r2;
    4. logs the per-feature importances as the artifact feature-importance.csv.

  The active Spark session must have the Delta Lake package configured, because the
  input is read through the Delta reader.

  Args:
    file_path: Kept for backward compatibility with existing callers; it is not read,
      the input location is fixed inside the function.
    num_trees: Number of trees (passed to RandomForestRegressor as numTrees).
    max_depth: Maximum tree depth (passed to RandomForestRegressor as maxDepth).

  Returns:
    None. All results are recorded in the MLflow run.
  """
  import tempfile

  with mlflow.start_run(run_name="random-forest"):
    # Create train/test split
    spark = SparkSession.builder.appName("App").getOrCreate()
    # The table was written in Delta format, so read it through the Delta reader: that
    # honours the transaction log instead of scanning every parquet file in the directory.
    airbnbDF = spark.read.format("delta").load("s3a://airbnb/airbnb-clean/")
    (trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

    # Prepare the StringIndexer and VectorAssembler
    categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
    indexOutputCols = [x + "Index" for x in categoricalCols]

    stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

    # Every double column except the label is a numeric feature.
    numericCols = [field for (field, dataType) in trainDF.dtypes
                   if dataType == "double" and field != "price"]
    assemblerInputs = indexOutputCols + numericCols
    vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

    # Log params: Num Trees and Max Depth
    mlflow.log_param("num_trees", num_trees)
    mlflow.log_param("max_depth", max_depth)

    # NOTE(review): maxBins=40 presumably accommodates the largest category count among
    # the indexed string columns — confirm against the data.
    rf = RandomForestRegressor(labelCol="price",
                               maxBins=40,
                               maxDepth=max_depth,
                               numTrees=num_trees,
                               seed=42)

    pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

    # Log model
    pipelineModel = pipeline.fit(trainDF)
    mlflow.spark.log_model(pipelineModel, "model")

    # Log metrics: RMSE and R2
    predDF = pipelineModel.transform(testDF)
    regressionEvaluator = RegressionEvaluator(predictionCol="prediction",
                                              labelCol="price")
    rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
    r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
    mlflow.log_metrics({"rmse": rmse, "r2": r2})

    # Log artifact: Feature Importance Scores
    rfModel = pipelineModel.stages[-1]  # the fitted random forest is the last pipeline stage
    pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(),
                                      rfModel.featureImportances)),
                             columns=["feature", "importance"])
                .sort_values(by="importance", ascending=False))
    # Write the CSV to a private temporary directory and log that very file. Previously
    # the CSV went to /tmp while an unrelated empty "data" directory was logged, so the
    # feature importances never reached the run.
    with tempfile.TemporaryDirectory() as tmpDir:
      csvPath = os.path.join(tmpDir, "feature-importance.csv")
      pandasDF.to_csv(csvPath, index=False)
      mlflow.log_artifact(csvPath)

In [None]:
# First tracked run: 2 trees, depth 3.
if __name__ == "__main__":
  mlflow_rf(file_path="./data", num_trees=2, max_depth=3)


The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet



In [None]:
# Second tracked run: 3 trees, depth 4.
if __name__ == "__main__":
  mlflow_rf(file_path="./data", num_trees=3, max_depth=4)