## ALS

The final goal was to attempt to create a basic recommendation system using past reservations. This uses [Alternating Least Squares](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.recommendation.ALS.html), a native recommender in PySpark's ML library. This documentation, along with [this walkthrough](https://github.com/shashwatwork/Building-Recommeder-System-in-PySpark/blob/master/Crafting%20Recommedation%20System%20with%20PySpark.ipynb) guided this implementation.

ALS is fairly straightforward, using three inputs, all integers, to build the model:
- userCol - person record in the transaction, customerzip was used in this model
- itemCol - facilityid, the product identifier
- ratingCol - a score to the item assigned by the user, used a "days stayed" calculation to simulate this value

In [1]:
from pyspark.sql import SparkSession

MAX_MEMORY = "8g"

spark = SparkSession.builder.appName('recreation.gov reservations') \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

schemaRating = StructType([
    StructField("item", IntegerType(), True),
    StructField("user", FloatType(), True),
    StructField("rating", FloatType(), True),
])

In [3]:
dfReservations2021 = spark.read.format('csv').schema(schemaRating).csv('REC_ALS.csv', header=True, ignoreTrailingWhiteSpace=True)

In [4]:
dfReservations2021 = dfReservations2021.withColumn('user', dfReservations2021['user'].cast(IntegerType()))

In [5]:
dfReservations2021 = dfReservations2021.withColumn('rating', dfReservations2021['rating'].cast(IntegerType()))

In [6]:
dfReservations2021.filter(dfReservations2021.rating.isNull()).show()

+------+-----+------+
|  item| user|rating|
+------+-----+------+
|250877|96555|  null|
|232490|55424|  null|
|232490|91501|  null|
|232490|33774|  null|
|247571|33176|  null|
|247571|92831|  null|
|247661|16823|  null|
|247661|16823|  null|
|247661|16823|  null|
|247661|16823|  null|
|247661|16823|  null|
|247661|16823|  null|
|247661|16823|  null|
|247661|14830|  null|
|250009|80516|  null|
|258830|15017|  null|
|272266|11771|  null|
|272266|20782|  null|
|258887|49046|  null|
|251833|21401|  null|
+------+-----+------+
only showing top 20 rows



In [7]:
dfReservations2021.show(truncate=False)

+------+-----+------+
|item  |user |rating|
+------+-----+------+
|639772|99709|2     |
|639772|99706|0     |
|639772|99706|2     |
|639772|99709|0     |
|639772|99709|1     |
|639772|84401|4     |
|639772|99709|4     |
|99821 |99709|2     |
|639772|99743|0     |
|639772|99743|4     |
|99799 |99708|12    |
|639772|84401|4     |
|639772|99710|2     |
|99821 |99705|2     |
|99799 |99709|2     |
|99821 |99709|2     |
|99862 |99712|4     |
|639822|99556|0     |
|99799 |99709|2     |
|99766 |99712|3     |
+------+-----+------+
only showing top 20 rows



In [8]:
dfReservations2021 = dfReservations2021.dropna()

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

#split train and test
trainDF, testDF = dfReservations2021.randomSplit([0.8, 0.2])
trainDF.cache()

# build model
# coldStartStrategy - helped drop nulls
als = ALS(coldStartStrategy="drop", implicitPrefs=True)
model = als.fit(trainDF)

In [10]:
# generate predictions
predictions = model.transform(testDF)

# evalute model using root mean squared evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
evaluator.evaluate(predictions)

47.27241222984656

In [11]:
# create a test user from Silver Spring, MD
test_user = testDF.filter('user == 20901').select('user', 'item', 'rating')
test_user.show()

+-----+-----+------+
| user| item|rating|
+-----+-----+------+
|20901| 1538|     6|
|20901| 1656|     6|
|20901| 1682|     2|
|20901| 1780|     8|
|20901| 1810|    15|
|20901| 2193|     5|
|20901| 5700|    10|
|20901| 5715|    54|
|20901| 5732|     2|
|20901| 5733|     4|
|20901| 5736|     6|
|20901| 5741|     4|
|20901| 5773|     3|
|20901| 5796|     6|
|20901| 5800|     5|
|20901| 6459|     2|
|20901| 6495|     2|
|20901| 6496|    10|
|20901| 7889|     2|
|20901|10330|     4|
+-----+-----+------+
only showing top 20 rows



In [12]:
# get recommendations for test user
recommendations = model.transform(test_user)
recommendations.sort('prediction', ascending=False).show()

+-----+------+------+----------+
| user|  item|rating|prediction|
+-----+------+------+----------+
|20901|  5700|    10| 0.3850173|
|20901|  5715|    54|0.37054873|
|20901|  6496|    10|0.35504553|
|20901|  5796|     6|0.34057924|
|20901|  1810|    15|0.28507954|
|20901|  1682|     2|0.28400856|
|20901|  5741|     4| 0.2742345|
|20901|  5800|     5|0.26223382|
|20901|  6495|     2| 0.2545454|
|20901| 40525|    12|0.25160825|
|20901|  5773|     3|0.24713314|
|20901| 79730|     1| 0.2075089|
|20901|100801|     2|0.19341911|
|20901|  6459|     2|0.19307832|
|20901|  5733|     4|0.19158922|
|20901|  1780|     8|0.18888271|
|20901|  5732|     2|0.18696325|
|20901|  5736|     6|0.18155275|
|20901| 10330|     4|0.17646047|
|20901| 99743|     8|0.16214325|
+-----+------+------+----------+
only showing top 20 rows



In [47]:
recs = model.recommendForUserSubset(test_user, 5)
recs.sort('recommendations', ascending=False).show()

+-----+--------------------+
| user|     recommendations|
+-----+--------------------+
|20901|[{6487, 1.0007242...|
+-----+--------------------+



In [48]:
from pyspark.sql.functions import explode

recs.select(explode('recommendations').alias('recs')).select('recs.item', 'recs.rating').sort('recs.rating', ascending=False).show()

+-----+---------+
| item|   rating|
+-----+---------+
| 6487|1.0007242|
| 5757|0.8668041|
| 5780|0.8436577|
| 6490|0.8140193|
|86126| 0.810674|
+-----+---------+



In [13]:
%pip install requests

Note: you may need to restart the kernel to use updated packages.


In [14]:
import requests
r = requests.get('https://ridb.recreation.gov/api/v1/campsites/19758', headers={'apikey': '6DD62AA61A9A4211BD4414728184BFE4'})
r.json()

[{'CampsiteID': '19758',
  'FacilityID': '232095',
  'CampsiteName': '053',
  'CampsiteType': 'WALK TO',
  'TypeOfUse': 'Overnight',
  'Loop': 'WALK',
  'CampsiteAccessible': False,
  'CampsiteReservable': True,
  'CampsiteLongitude': -79.381987,
  'CampsiteLatitude': 38.8316930000001,
  'CreatedDate': '2014-05-02',
  'LastUpdatedDate': '2020-10-15',
  'ATTRIBUTES': [{'AttributeName': 'Checkin Time',
    'AttributeValue': '2:00 PM'},
   {'AttributeName': 'Min Num of People', 'AttributeValue': '1'},
   {'AttributeName': 'Campfire Allowed', 'AttributeValue': 'Yes'},
   {'AttributeName': 'Checkout Time', 'AttributeValue': '1:00 PM'},
   {'AttributeName': 'IS EQUIPMENT MANDATORY', 'AttributeValue': 'true'},
   {'AttributeName': 'Picnic Table', 'AttributeValue': 'Y'},
   {'AttributeName': 'Site Rating', 'AttributeValue': 'Preferred'},
   {'AttributeName': 'Max Vehicle Length', 'AttributeValue': '0'},
   {'AttributeName': 'Placed on Map', 'AttributeValue': '1'},
   {'AttributeName': 'Fire Pi

In [15]:
from pyspark.ml.tuning import ParamGridBuilder

# set parameters for tuning
paramGrid = ParamGridBuilder()\
    .addGrid(als.maxIter, [5, 10, 15])\
    .addGrid(als.rank, [10, 20, 50])\
    .addGrid(als.regParam, [0.001, 0.01, 0.1])\
    .build()

In [16]:
from pyspark.ml.tuning import CrossValidator

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator)

# cross validate create best model
cvModel = crossval.fit(trainDF)

In [46]:
# assess prediction model
cvPred = cvModel.bestModel.transform(testDF)
evaluator.evaluate(cvPred)

47.236821305779365

In [17]:
cvModel.bestModel

ALSModel: uid=ALS_04cd5ee26c97, rank=50

In [31]:
cvRecommendations = cvModel.bestModel.recommendForUserSubset(test_user, 5)
cvRecommendations.sort('recommendations', ascending=False).show()

+-----+--------------------+
| user|     recommendations|
+-----+--------------------+
|20901|[{6487, 1.3010163...|
+-----+--------------------+



In [44]:
from pyspark.sql.functions import explode

cvRecommendations.select(explode('recommendations').alias('recs')).select('recs.item', 'recs.rating').sort('recs.rating', ascending=False).show()

+----+---------+
|item|   rating|
+----+---------+
|6487|1.3010163|
|5757|1.2409183|
|6488|1.2215271|
|5780| 1.173392|
|6489| 1.132679|
+----+---------+



In [32]:
r = requests.get('https://ridb.recreation.gov/api/v1/campsites/6487', headers={'apikey': '6DD62AA61A9A4211BD4414728184BFE4'})
r.json()

[{'CampsiteID': '6487',
  'FacilityID': '232507',
  'CampsiteName': 'G2',
  'CampsiteType': 'GROUP TENT ONLY AREA NONELECTRIC',
  'TypeOfUse': 'Overnight',
  'Loop': 'Oceanside Group Sites',
  'CampsiteAccessible': False,
  'CampsiteReservable': True,
  'CampsiteLongitude': -75.151904079,
  'CampsiteLatitude': 38.2041493470001,
  'CreatedDate': '2014-05-02',
  'LastUpdatedDate': '2021-08-23',
  'ATTRIBUTES': [{'AttributeName': 'Site Access', 'AttributeValue': 'Hike-In'},
   {'AttributeName': 'Condition Rating', 'AttributeValue': 'Good'},
   {'AttributeName': 'Location Rating', 'AttributeValue': 'Good'},
   {'AttributeName': 'Pets Allowed', 'AttributeValue': 'Yes'},
   {'AttributeName': 'Checkout Time', 'AttributeValue': '11:00 AM'},
   {'AttributeName': 'Proximity to Water', 'AttributeValue': 'Island'},
   {'AttributeName': 'Min Num of People', 'AttributeValue': '7'},
   {'AttributeName': 'Hike In Distance to Site', 'AttributeValue': '600'},
   {'AttributeName': 'Checkin Time', 'Attrib