In [0]:
#  import mlflow and autolog machine learning runs

import mlflow

mlflow.pyspark.ml.autolog()

In [0]:
dbutils.fs.ls(("/FileStore/tables/"))

[FileInfo(path='dbfs:/FileStore/tables/BDTT_clinicaltrial_2023_csv.zip', name='BDTT_clinicaltrial_2023_csv.zip', size=57814956, modificationTime=1708930294000),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1709126923000),
 FileInfo(path='dbfs:/FileStore/tables/account-models/', name='account-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/account-models_Home/', name='account-models_Home/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts.zip', name='accounts.zip', size=5306059, modificationTime=1706713958000),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations.zip', name='activations.zip', size=8400826, modificationTime=1706706652000),
 File

In [0]:
steamDF = spark.read.csv("/FileStore/tables/steam_200k.csv", inferSchema="true")

In [1]:
steamDF

NameError: name 'steamDF' is not defined

In [0]:
steamDF.display()



_c0,_c1,_c2,_c3
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


In [0]:
from pyspark.ml.feature import StringIndexer

# Assuming steamDF is your DataFrame and '_c1' is the column with game names

# Create an indexer object
indexer = StringIndexer(inputCol="_c1", outputCol="game_id")

# Fit the indexer to the data and create a new DataFrame with the indexed column
indexed = indexer.fit(steamDF).transform(steamDF)

# Now, you can drop the 'Behaviour details' column if it is no longer needed
finalDF = indexed.drop("Behaviour details")

# Show the schema to verify the new 'game_id' column
finalDF.printSchema()

# Show the resulting DataFrame
finalDF.show()

2024/03/04 16:26:40 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'e109fb671e204e398feda6bd141604ad', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- game_id: double (nullable = false)

+---------+--------------------+--------+-----+-------+
|      _c0|                 _c1|     _c2|  _c3|game_id|
+---------+--------------------+--------+-----+-------+
|151603712|The Elder Scrolls...|purchase|  1.0|    8.0|
|151603712|The Elder Scrolls...|    play|273.0|    8.0|
|151603712|           Fallout 4|purchase|  1.0|  100.0|
|151603712|           Fallout 4|    play| 87.0|  100.0|
|151603712|               Spore|purchase|  1.0|  332.0|
|151603712|               Spore|    play| 14.9|  332.0|
|151603712|   Fallout New Vegas|purchase|  1.0|   29.0|
|151603712|   Fallout New Vegas|    play| 12.1|   29.0|
|151603712|       Left 4 Dead 2|purchase|  1.0|    4.0|
|151603712|       Left 4 Dead 2|    play|  8.9|    4.0|
|151603712|            HuniePop|purchase|  1.0|  867.0|
|151603712|            HunieP

In [0]:
# Assuming 'steamDF' is the DataFrame you are working with
# Rename the columns accordingly
finalDF = finalDF.withColumnRenamed("_c0", "UserID") \
                 .withColumnRenamed("_c1", "Name of the Game") \
                 .withColumnRenamed("_c2", "Behaviour details") \
                 .withColumnRenamed("_c3", "Value") \
                 .withColumnRenamed("game_id", "GameID") \
# Now display the DataFrame with the new column names
finalDF.display()

UserID,Name of the Game,Behaviour details,Value,GameID
151603712,The Elder Scrolls V Skyrim,purchase,1.0,8.0
151603712,The Elder Scrolls V Skyrim,play,273.0,8.0
151603712,Fallout 4,purchase,1.0,100.0
151603712,Fallout 4,play,87.0,100.0
151603712,Spore,purchase,1.0,332.0
151603712,Spore,play,14.9,332.0
151603712,Fallout New Vegas,purchase,1.0,29.0
151603712,Fallout New Vegas,play,12.1,29.0
151603712,Left 4 Dead 2,purchase,1.0,4.0
151603712,Left 4 Dead 2,play,8.9,4.0


In [0]:
from pyspark.sql.functions import when, col
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import IntegerType, FloatType

# Load the DataFrame (assuming it's already loaded as steamDF with the appropriate columns)
# If 'Value' for 'purchase' is not binary, you need to transform it
finalDF = finalDF.withColumn('rating',
                             when(col('Behaviour details') == 'purchase', 1)
                             .otherwise(col('Value').cast(FloatType())))


# Cast the 'UserID' and 'Game_ID' columns to integer if they are not already
finalDF = finalDF.withColumn("UserID", col("UserID").cast(IntegerType())) \
                 .withColumn("GameID", col("GameID").cast(IntegerType()))

# Now you can initialize the ALS learner
als = ALS(
    userCol='UserID',
    itemCol='GameID',
    ratingCol='rating',
    nonnegative=True,  # Assuming you want nonnegative matrix factorization
    implicitPrefs=False  # Set to True if you're working with implicit feedback
)

# Fit the ALS model to the data
model = als.fit(finalDF)

# You can now use the model to make predictions or evaluate its performance
# For example, to recommend games for all users, you can use the recommendForAllUsers method
recommendations = model.recommendForAllUsers(5)  # Recommend top 5 items for all users

# Show some recommendations
finalDF.show()

2024/03/04 16:26:53 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '64e5696c5f7d4a5e89690c6ef4ae11eb', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


+---------+--------------------+-----------------+-----+------+------+
|   UserID|    Name of the Game|Behaviour details|Value|GameID|rating|
+---------+--------------------+-----------------+-----+------+------+
|151603712|The Elder Scrolls...|         purchase|  1.0|     8|   1.0|
|151603712|The Elder Scrolls...|             play|273.0|     8| 273.0|
|151603712|           Fallout 4|         purchase|  1.0|   100|   1.0|
|151603712|           Fallout 4|             play| 87.0|   100|  87.0|
|151603712|               Spore|         purchase|  1.0|   332|   1.0|
|151603712|               Spore|             play| 14.9|   332|  14.9|
|151603712|   Fallout New Vegas|         purchase|  1.0|    29|   1.0|
|151603712|   Fallout New Vegas|             play| 12.1|    29|  12.1|
|151603712|       Left 4 Dead 2|         purchase|  1.0|     4|   1.0|
|151603712|       Left 4 Dead 2|             play|  8.9|     4|   8.9|
|151603712|            HuniePop|         purchase|  1.0|   867|   1.0|
|15160

In [0]:

# If 'finalvw' was supposed to be a temporary view, ensure it's created first
finalDF.createOrReplaceTempView("finalDF")

# Now you can use Spark SQL to interact with the view
resultDF = spark.sql("SELECT * FROM finalDF")


#### Part 1: Exploratory analysis

In [0]:
%sql
SELECT count(UserID) as UserID, count(distinct UserID) as DistinctIDs
FROM finalDF

UserID,DistinctIDs
200000,12393


#### Top 10 Games by total hours of play. 

In [0]:
%sql
SELECT UserID, `Name of the Game`, SUM(Value) AS `Total Hours of play`
FROM finalDF
WHERE `Behaviour details` = 'play'
GROUP BY UserID, `Name of the Game`
ORDER BY `Total Hours of play` DESC
LIMIT (10)

UserID,Name of the Game,Total Hours of play
73017395,Sid Meier's Civilization V,11754.0
100630947,Dota 2,10442.0
153382649,Team Fortress 2,9640.0
130882834,Dota 2,7765.0
52567955,Dota 2,6964.0
121199670,Dota 2,6753.0
86256882,Dota 2,6015.0
70487610,Sid Meier's Civilization V,6013.0
101414179,Dota 2,5982.0
12660489,Dota 2,5970.0


Databricks visualization. Run in Databricks to view.


#### Top 10 purchase of all games and the percentage each game contributes

In [0]:
%sql
WITH TotalPurchases AS (
  SELECT 
    `Name of the Game`,
    SUM(Value) AS TotalValueForGame,
    SUM(SUM(Value)) OVER () AS TotalValueOfAllGames,
    count(distinct UserID) as UserID
  FROM 
    finalDF
  WHERE 
    `Behaviour details` = 'purchase'
  GROUP BY `Name of the Game`
)

SELECT 
  `Name of the Game`,
  TotalValueForGame,
  ROUND((TotalValueForGame / TotalValueOfAllGames) * 100, 2) AS PurchasePercentage,
  UserID
FROM TotalPurchases
ORDER BY TotalValueForGame DESC
LIMIT (10)

Name of the Game,TotalValueForGame,PurchasePercentage,UserID
Dota 2,4841.0,3.74,4841
Team Fortress 2,2323.0,1.79,2323
Unturned,1563.0,1.21,1563
Counter-Strike Global Offensive,1412.0,1.09,1412
Half-Life 2 Lost Coast,981.0,0.76,981
Counter-Strike Source,978.0,0.76,978
Left 4 Dead 2,951.0,0.73,951
Counter-Strike,856.0,0.66,856
Warframe,847.0,0.65,847
Half-Life 2 Deathmatch,823.0,0.64,823


Databricks visualization. Run in Databricks to view.


#### Part 2: Data Pre-processing

In [0]:
# First, filter the DataFrame to only include rows where 'Behaviour details' is 'play'
playDF = finalDF.filter(finalDF["Behaviour details"] == 'play')

# Now, create or replace a temporary view using the filtered DataFrame
# playDF.createOrReplaceTempView("steam_play_vw")

# You can now run SQL queries against this view

In [0]:
%r
`Name of the Game` ~ value + GameID + rating

`Name of the Game` ~ value + GameID + rating

In [0]:
%r
`Name of the Game` ~ .

`Name of the Game` ~ .

In [0]:
# preprocessing data in to correct format

# The first is ‘features’ and contains a vector which includes the values from the first four columns. The second is ‘label’ and contains the values from the Occupancy column.


from pyspark.ml.feature import RFormula

preprocess = RFormula(formula="`Name of the Game` ~ .")

finalDF = preprocess.fit(finalDF).transform(finalDF)

finalDF.show()

2024/03/04 16:28:17 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '5df8317f999540ee83e05ff6b1aa3f86', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4059651805996403>, line 10[0m
[1;32m      6[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m RFormula
[1;32m      8[0m preprocess [38;5;241m=[39m RFormula(formula[38;5;241m=[39m[38;5;124m"[39m[38;5;124m`Name of the Game` ~ .[39m[38;5;124m"[39m)
[0;32m---> 10[0m finalDF [38;5;241m=[39m preprocess[38;5;241m.[39mfit(finalDF)[38;5;241m.[39mtransform(finalDF)
[1;32m     12[0m finalDF[38;5;241m.[39mshow()

File [0;32m/databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:571[0m, in [0;36msafe_patch.<locals>.safe_patch_function[0;34m(*args, **kwargs)[0m
[1;32m    569[0m     patch_function[38;5;241m.[39m


##### split the data into a training dataset and a test dataset

In [0]:
(trainingdataset_DF, testdataset_DF) = finalDF.randomSplit([0.7, 0.3], seed=100)

trainingdataset_DF.display()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4059651805996403>, line 10[0m
[1;32m      6[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m RFormula
[1;32m      8[0m preprocess [38;5;241m=[39m RFormula(formula[38;5;241m=[39m[38;5;124m"[39m[38;5;124m`Name of the Game` ~ .[39m[38;5;124m"[39m)
[0;32m---> 10[0m finalDF [38;5;241m=[39m preprocess[38;5;241m.[39mfit(finalDF)[38;5;241m.[39mtransform(finalDF)
[1;32m     12[0m finalDF[38;5;241m.[39mshow()

File [0;32m/databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:571[0m, in [0;36msafe_patch.<locals>.safe_patch_function[0;34m(*args, **kwargs)[0m
[1;32m    569[0m     patch_function[38;5;241m.[39m


#### Part 5: Training the Model

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# train the model

model = dt.fit(trainingdataset_DF)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4059651805996403>, line 10[0m
[1;32m      6[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m RFormula
[1;32m      8[0m preprocess [38;5;241m=[39m RFormula(formula[38;5;241m=[39m[38;5;124m"[39m[38;5;124m`Name of the Game` ~ .[39m[38;5;124m"[39m)
[0;32m---> 10[0m finalDF [38;5;241m=[39m preprocess[38;5;241m.[39mfit(finalDF)[38;5;241m.[39mtransform(finalDF)
[1;32m     12[0m finalDF[38;5;241m.[39mshow()

File [0;32m/databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:571[0m, in [0;36msafe_patch.<locals>.safe_patch_function[0;34m(*args, **kwargs)[0m
[1;32m    569[0m     patch_function[38;5;241m.[39m


##### Part 6: Evaluating the Model

###### To evaluate the model, we need to use our trained model to make predictions on the test data, which we did not use when training the model. We generate predictions by using the transform() method on the test data.

In [0]:
# make predictons on the test dataset

predictions = model.transform(testdataset_DF)

predictions.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4059651805996403>, line 10[0m
[1;32m      6[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m RFormula
[1;32m      8[0m preprocess [38;5;241m=[39m RFormula(formula[38;5;241m=[39m[38;5;124m"[39m[38;5;124m`Name of the Game` ~ .[39m[38;5;124m"[39m)
[0;32m---> 10[0m finalDF [38;5;241m=[39m preprocess[38;5;241m.[39mfit(finalDF)[38;5;241m.[39mtransform(finalDF)
[1;32m     12[0m finalDF[38;5;241m.[39mshow()

File [0;32m/databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:571[0m, in [0;36msafe_patch.<locals>.safe_patch_function[0;34m(*args, **kwargs)[0m
[1;32m    569[0m     patch_function[38;5;241m.[39m

In [0]:
# use evalutor to measure accuracy of predictions on test data

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

mlevaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

accurancy = mlevaluator.evaluate(predictions)

print("Accurancy = %g " % (accurancy))

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4059651805996403>, line 10[0m
[1;32m      6[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m RFormula
[1;32m      8[0m preprocess [38;5;241m=[39m RFormula(formula[38;5;241m=[39m[38;5;124m"[39m[38;5;124m`Name of the Game` ~ .[39m[38;5;124m"[39m)
[0;32m---> 10[0m finalDF [38;5;241m=[39m preprocess[38;5;241m.[39mfit(finalDF)[38;5;241m.[39mtransform(finalDF)
[1;32m     12[0m finalDF[38;5;241m.[39mshow()

File [0;32m/databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:571[0m, in [0;36msafe_patch.<locals>.safe_patch_function[0;34m(*args, **kwargs)[0m
[1;32m    569[0m     patch_function[38;5;241m.[39m

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Add GameID") \
    .getOrCreate()

# Create the DataFrame with your data
data = spark.createDataFrame([
    (151603712, "The Elder Scrolls V Skyrim", "purchase", 1),
    (151603712, "The Elder Scrolls V Skyrim", "play", 273),
    (151603712, "Fallout 4", "purchase", 1),
    (151603712, "Fallout 4", "play", 87),
    (151603712, "Spore", "purchase", 1),
    (151603712, "Spore", "play", 14.9),
    (151603712, "Fallout New Vegas", "purchase", 1),
    (151603712, "Fallout New Vegas", "play", 12.1),
    (151603712, "Left 4 Dead 2", "purchase", 1),
    (151603712, "Left 4 Dead 2", "play", 8.9)
], ["user_id", "game_title", "behavior", "value"])

# Generate unique GameID for each game title
data = data.withColumn("GameID", monotonically_increasing_id())

# Show the DataFrame with the added GameID
data.show()

# Stop the SparkSession
spark.stop()


  from pandas.core import (


TypeError: field value: Can not merge type <class 'pyspark.sql.types.LongType'> and <class 'pyspark.sql.types.DoubleType'>