In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pyspark
from sn import stock_name
from pyspark import SparkConf
from sklearn.preprocessing import MultiLabelBinarizer
import pandas as pd
import os
import sys
import random
from pyspark.ml.evaluation import BinaryClassificationEvaluator,RankingEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator



In [20]:
def ROEM(predictions, userCol = "userId", itemCol = "stockId", ratingCol = "hold"):
              #Creates table that can be queried
              predictions.createOrReplaceTempView("predictions")

              #Sum of total number of plays of all songs
              denominator = predictions.groupBy().sum(ratingCol).collect()[0][0]

              #Calculating rankings of songs predictions by user
              spark.sql("SELECT " + userCol + " , " + ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + userCol + " ORDER BY prediction DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")

              #Multiplies the rank of each song by the number of plays and adds the products together
              numerator = spark.sql('SELECT SUM(' + ratingCol + ' * rank) FROM rankings').collect()[0][0]

              performance = numerator/denominator

              return performance

In [3]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
sparkConf = SparkConf().setAppName('ALS')\
    .setMaster('local[*]')\
    .set('spark.executor.memory','4g')\
    .set('spark.driver.memory','4g')

spark = SparkSession.builder \
    .config(conf=sparkConf) \
    .getOrCreate()


In [5]:
stock_name = random.sample(stock_name,100)


In [6]:
num_lists = 100
max_list_length = 10

# Generate 100 lists
result_lists = []
for _ in range(num_lists):
    # Determine the length of the current list randomly
    list_length = random.randint(1, max_list_length)
    
    # Randomly pick stock names for the current list
    current_list = random.sample(stock_name, list_length)
    
    # Add the current list to the result_lists
    result_lists.append(current_list)


In [7]:
data = {'userId':[x+1 for x in range(100)],
        'stockName':result_lists}
stocks = pd.DataFrame({'stocks':stock_name})
# Create a PySpark DataFrame from the sample data
df = pd.DataFrame({'userId':[x+1 for x in range(100)]})
real = pd.DataFrame(data)
print(real)
real = real.explode('stockName')
real['hold'] = 1
real

    userId                                          stockName
0        1                                              [TTD]
1        2                    [BSQR, RCACW, TITN, GOEV, CPIX]
2        3                                            [HMACW]
3        4                           [GOEV, BFRG, ADIL, BLEU]
4        5                          [DUNEW, YMAB, PDCO, ZIVO]
..     ...                                                ...
95      96                          [HMACW, BSQR, NCPL, SLVR]
96      97                               [MNTK, PNACW, SVIIR]
97      98  [PLTN, ZIVO, JUGG, HMACW, PETV, EWCZ, SMAP, GO...
98      99                           [BFRG, AVT, GCMGW, PNAC]
99     100  [SQFT, FLNC, CVAC, APTMW, BFRI, ROSEW, SVIIR, ...

[100 rows x 2 columns]


Unnamed: 0,userId,stockName,hold
0,1,TTD,1
1,2,BSQR,1
1,2,RCACW,1
1,2,TITN,1
1,2,GOEV,1
...,...,...,...
99,100,ROSEW,1
99,100,SVIIR,1
99,100,MERC,1
99,100,TRVN,1


In [8]:
DreamedDF=df.merge(stocks,how='cross')
test = DreamedDF.merge(real,how='left',left_on=['userId','stocks'],right_on=['userId','stockName'])
test = test.drop('stockName',axis='columns')
test = test.fillna(0)
test

Unnamed: 0,userId,stocks,hold
0,1,MNTK,0.0
1,1,RNA,0.0
2,1,TWLVU,0.0
3,1,RIBT,0.0
4,1,CODX,0.0
...,...,...,...
9995,100,GNTA,0.0
9996,100,BFRI,1.0
9997,100,BANX,0.0
9998,100,IMTX,0.0


In [9]:
# Create a PySpark DataFrame from the sample data
sparkdf = spark.createDataFrame(test)
indexer = StringIndexer(inputCol='stocks', outputCol='stockId')
indexed_df = indexer.fit(sparkdf).transform(sparkdf)
indexed_df =indexed_df.withColumn('hold',indexed_df['hold'].cast('int'))
indexed_df =indexed_df.withColumn('stockId',indexed_df['stockId'].cast('int'))

(training, testing) = indexed_df.randomSplit([0.8, 0.2])

In [10]:
als = ALS(
    rank=10,
    maxIter=10,
    regParam=0.01,
    userCol="userId",
    itemCol='stockId',
    coldStartStrategy="drop",
    ratingCol ='hold',
    implicitPrefs=True,
    nonnegative=True
)


In [21]:
from pyspark.ml.evaluation import Evaluator

class ROEMEvaluator(Evaluator):
    def __init__(self, userCol="userId", itemCol="stockId", ratingCol="hold"):
        self.userCol = userCol
        self.itemCol = itemCol
        self.ratingCol = ratingCol

    def _evaluate(self, dataset):
        predictions = dataset.withColumnRenamed("prediction", "predictions")

        predictions.createOrReplaceTempView("predictions")
        denominator = predictions.groupBy().sum(self.ratingCol).collect()[0][0]
        spark.sql("SELECT " + self.userCol + " , " + self.ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + self.userCol + " ORDER BY hold DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")
        numerator = spark.sql('SELECT SUM(' + self.ratingCol + ' * rank) FROM rankings').collect()[0][0]
        performance = numerator / denominator

        return performance


In [22]:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 20,30 ]) \
            .addGrid(als.maxIter, [5,10  ]) \
            .addGrid(als.regParam, [.01, .1, 1]) \
            .build()
evaluator = ROEMEvaluator()

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

In [23]:
model = cv.fit(training)
best_model = model.bestModel

In [24]:
predictions = best_model.transform(testing)
predictions.show()

+------+------+----+-------+------------+
|userId|stocks|hold|stockId|  prediction|
+------+------+----+-------+------------+
|     1|   AVT|   0|      4|         0.0|
|     1|  BANX|   0|      5|         0.0|
|     1|  DCTH|   0|     20|  0.10284982|
|     1|  EWCZ|   0|     25|         0.0|
|     1|  FEXD|   0|     27|         0.0|
|     1|  FLNC|   0|     28|  0.06964305|
|     1| FNVTU|   0|     30|  0.04496716|
|     1|  GNTA|   0|     33|  0.04140578|
|     1|  MERC|   0|     59|   0.0189712|
|     1|  MTEM|   0|     63|  0.14189382|
|     1|  SBUX|   0|     80|0.0034173457|
|     1|  SKGR|   0|     82|0.0076106824|
|     1|  SMID|   0|     85| 0.022942737|
|     1|  TRVN|   0|     93|         0.0|
|     1| TWLVU|   0|     95|         0.0|
|     6|  AUGX|   0|      3|         0.0|
|     6|  BCAL|   0|      7|  0.09518931|
|     6|  BFRG|   0|      8|  0.12921979|
|     6|  DCTH|   0|     20| 0.040333107|
|     6|  EWCZ|   0|     25|  0.14346346|
+------+------+----+-------+------

In [25]:
ROEM(predictions,userCol = "userId", itemCol = "stockId", ratingCol = "hold")

0.475160469543364

In [30]:
recommendations = best_model.recommendForAllUsers(10)
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{79, 0.56232935}...|
|    40|[{43, 0.3372355},...|
|   100|[{90, 0.58905655}...|
|    10|[{39, 0.62642473}...|
|    50|[{52, 0.598133}, ...|
|    80|[{5, 0.7704651}, ...|
|    70|[{39, 0.8731736},...|
|    60|[{70, 0.7191778},...|
|    90|[{52, 1.1672944},...|
|    30|[{11, 0.44006598}...|
|    31|[{89, 0.33500084}...|
|    81|[{79, 1.0095809},...|
|    91|[{90, 0.18109679}...|
|     1|[{89, 0.16228397}...|
|    41|[{79, 0.86852247}...|
|    61|[{89, 0.22136004}...|
|    51|[{71, 0.68776983}...|
|    21|[{45, 0.41265896}...|
|    11|[{79, 0.31077862}...|
|    71|[{38, 0.12593175}...|
+------+--------------------+
only showing top 20 rows



In [31]:
recommendations.filter(recommendations.userId==1).show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                  |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |[{89, 0.16228397}, {98, 0.1601815}, {42, 0.15702002}, {94, 0.1526056}, {81, 0.14688861}, {63, 0.14189382}, {40, 0.13945818}, {66, 0.13099423}, {20, 0.10284982}, {88, 0.1019261}]|
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
from pyspark.sql.functions import explode

# Assuming `recommendations` is your DataFrame containing the ALS recommendations
flattened_df = recommendations.select("userId", explode("recommendations").alias("rec"))
flattened_df = flattened_df.select("userId", "rec.stockId", "rec.rating")
tojoin = indexed_df.select('stocks','stockID')
tojoin = tojoin.dropDuplicates(["stockID"])
flattened_df = flattened_df.join(tojoin,flattened_df.stockId==tojoin.stockID,how='inner')


In [None]:
flattened_df.count()

In [None]:
recommendations_pandas = flattened_df.toPandas()

In [None]:
recommendations_pandas.info()

In [None]:
#recommendations_pandas.to_csv('Recommended')