In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, FloatType

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [2]:
spark = SparkSession.builder.master('local[*]').appName('Anime_recommender').getOrCreate()

ratings     = spark.read.csv('data/rating_complete_small.csv', sep=',', header=True, 
                    schema= StructType([ 
                            StructField('user'   , IntegerType()), 
                            StructField('item'   , IntegerType()),
                            StructField('rating' , IntegerType()) 
                        ]))

EP_ratings  = spark.read.csv('data/valoraciones_EP.csv', sep=',', header=True, 
                    schema= StructType([ 
                            StructField('user'   , IntegerType()), 
                            StructField('item'   , IntegerType()),
                            StructField('rating' , IntegerType()) 
                        ]))
series      = spark.read.csv('data/anime.csv', sep=',', header=True,
                    schema=StructType([ 
                            StructField('item'     , IntegerType()), 
                            StructField('Name'   , StringType()),
                            StructField('Score'  , FloatType()) ,
                            StructField('Genres'  , StringType()) ,
                            StructField('English name'  , StringType()) ,
                            StructField('Japanese name'  , StringType()) ,
                            StructField('Type'  , StringType()) 
                        ]))

In [3]:
all_ratings = ratings.union(EP_ratings)

all_ratings_wtype = all_ratings.join(series['item', 'Type'], on='item')

n_ratings = all_ratings_wtype.select("rating").count()

n_users = all_ratings_wtype.select("user").distinct().count()
n_items = all_ratings_wtype.select("item").distinct().count()

sparse_percentage = (1.0 - (n_ratings *1.0)/(n_users * n_items))*100

print("Sparsity in data:", round(sparse_percentage, 3))

Sparsity in data: 98.87


In [4]:
n_ratings_user = ratings.groupBy("user").count().orderBy('count', ascending=False)
n_ratings_user.show()


+-----+-----+
| user|count|
+-----+-----+
|68042|13462|
|10255| 8215|
|64807| 6656|
|38143| 6043|
| 4773| 5616|
|18355| 5311|
|71931| 5088|
|25411| 4862|
|55748| 4810|
|15609| 4593|
|23930| 4579|
|34764| 4478|
|63339| 4266|
| 7179| 4045|
|32075| 3982|
|67428| 3975|
|54171| 3741|
|71433| 3685|
|10367| 3603|
| 5648| 3571|
+-----+-----+
only showing top 20 rows



In [5]:
n_ratings_movie = ratings.groupBy("item").count().where('count > 39').orderBy('count', ascending=True)
n_ratings_movie.show() # 16463 -> 9144


+-----+-----+
| item|count|
+-----+-----+
| 1959|   40|
| 4190|   40|
| 5217|   40|
| 2610|   40|
| 5383|   40|
| 4025|   40|
| 7504|   40|
| 5953|   40|
|15159|   40|
|10869|   40|
|38036|   40|
|34503|   40|
| 3890|   40|
|13029|   40|
|30086|   40|
| 3529|   40|
|43465|   40|
|28641|   40|
| 3726|   40|
| 2803|   40|
+-----+-----+
only showing top 20 rows



In [6]:
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

als = ALS(userCol="user", itemCol="item", ratingCol="rating", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 150]) \
            .addGrid(als.regParam, [.01,.15]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \

           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=2)



Num models to be tested:  4


In [None]:
model = cv.fit(train)

best_model = model.bestModel

In [None]:
#all_ratings = ratings.union(EP_ratings)
# DONE ABOVE
#all_ratings_wtype = all_ratings.join(series['item', 'Type'], on='item')

model = als.fit(all_ratings)
model.setPredictionCol('predic')
#model.userFactors.orderBy("id").collect()

type(best_model)

In [None]:

# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
test_predictions.show()


In [10]:
als = ALS(rank=10, seed=0)

als.setMaxIter(5)

ALS_174cfffc4c1a

In [13]:
#EP_ratings.show(4)
#ratings.show(4)
#
#all_ratings = ratings.union(EP_ratings)
#
#all_ratings.show(4)

In [None]:
#all_ratings = ratings.union(EP_ratings)
# DONE ABOVE
#all_ratings_wtype = all_ratings.join(series['item', 'Type'], on='item')

model = als.fit(all_ratings)
model.setPredictionCol('predic')
#model.userFactors.orderBy("id").collect()

In [None]:

from pyspark.sql.functions import when
not_seen = all_ratings_wtype.withColumn('user', when(all_ratings_wtype['user'] != 666666, 666666).otherwise(-1)).where('Type == "TV" or Type == "Movie"').dropDuplicates(['item']).where('user != -1')

#predictions = sorted(model.transform(not_seen['user','item','Type']).where('Type == "TV"').collect(), key=lambda r: r[3], reverse=True)

#model.transform(not_seen['user','item','Type']).sort('Type', 'predic', ascending=False).show(300)

test = model.transform(not_seen['user','item', 'Type']).sort('Type', 'predic', ascending=False).collect()

print(type(test))
if test[len(test)//2]['Type'] == 'Movie':
    pass
elif test[3*len(test)//5]['Type'] == 'Movie':
    for i in range (2*len(test)//3, 0, -1):
        print(test[i]['Type'])
        if test[i]['Type'] == "TV":
            print(i)
            break
elif test[4*len(test)//5]['Type'] == 'Movie':
    for i in range (3*len(test)//4, 0, -1):
        print(test[i]['Type'])
        if test[i]['Type'] == "TV":
            print(i,2)
            break
#model.transform(not_seen['user','item','Type']).sort('Type', 'predic', ascending=False).show(300)
#print(predictions[:10])

#predictions = model.transform(not_seen['user','item','Type']).sort(['Type','predic'], ascending=False).show(10)


In [6]:
from pyspark.sql.functions import when

all_ratings = ratings.union(EP_ratings)
test = all_ratings.groupBy('user','item').count().where('user != 666666')

other = all_ratings.withColumn('user', when(all_ratings['user'] != 666666, 666666))

test.show(3)
other.show(3)

+----+-----+-----+
|user| item|count|
+----+-----+-----+
|   0| 2248|    1|
|   3|36038|    1|
|   3|14199|    1|
+----+-----+-----+
only showing top 3 rows

+------+----+------+
|  user|item|rating|
+------+----+------+
|666666| 430|     9|
|666666|1004|     5|
|666666|3010|     7|
+------+----+------+
only showing top 3 rows



In [7]:
test = all_ratings.groupBy('user','item').count().where('user != 666666')
test.show(10)

+----+-----+-----+
|user| item|count|
+----+-----+-----+
|   0| 2248|    1|
|   3|36038|    1|
|   3|14199|    1|
|   6| 1496|    1|
|   7|22199|    1|
|  14|38524|    1|
|  16| 2508|    1|
|  17|24459|    1|
|  17| 6919|    1|
|  17|10719|    1|
+----+-----+-----+
only showing top 10 rows



In [9]:
from pyspark.sql.functions import when
other = all_ratings.withColumn('user', when(all_ratings['user'] != 666666, 666666)).dropDuplicates()
other.show(10)

+------+-----+------+
|  user| item|rating|
+------+-----+------+
|666666|10396|     8|
|666666| 4353|     7|
|666666| 1335|     8|
|666666|  143|    10|
|666666| 3588|     8|
|666666|  392|     6|
|666666|32438|     5|
|666666| 2251|     8|
|666666| 6702|     8|
|666666|28735|     8|
+------+-----+------+
only showing top 10 rows



In [4]:
user_recs = model.recommendForAllUsers(50)
#test = user_recs.where(user_recs.user == 666666).select("recommendations.item", "recommendations.rating").collect()



In [6]:
test = user_recs.where(user_recs.user == 666666).show(10)

test

+------+--------------------+
|  user|     recommendations|
+------+--------------------+
|666666|[{35054, 12.45934...|
+------+--------------------+



In [106]:
#series['id', 'Type'][series['id'].isin(35035)].where('Type == "Movie"')

all_ratings.join(series['id', 'Type'], )

+----+----+------+
|user|item|rating|
+----+----+------+
|   0| 430|     9|
|   0|1004|     5|
|   0|3010|     7|
+----+----+------+
only showing top 3 rows



In [104]:
#series['id', 'Type'].filter(series.id.isin()).show(2)
series['id', 'Type'][series['id'].isin(test[0]['item'])].where('Type == "OVA"').show(10)

+-----+----+
|   id|Type|
+-----+----+
|18743| OVA|
|23731| OVA|
|26091| OVA|
|26281| OVA|
|26293| OVA|
|28561| OVA|
|29365| OVA|
|30941| OVA|
|31004| OVA|
|33823| OVA|
+-----+----+
only showing top 10 rows



In [None]:
series = spark.read.csv('data/anime.csv', header=True)

all_ratings.join(series['id', 'Type'], all_ratings['item'] == series['id']).where('Type == "TV"').show(3)

# series['id', 'Type'].show(3)


+----+----+------+----+----+
|user|item|rating|  id|Type|
+----+----+------+----+----+
|   0|3010|     7|3010|  TV|
|   0|2762|     9|2762|  TV|
|   0|1571|    10|1571|  TV|
+----+----+------+----+----+
only showing top 3 rows



In [31]:
import subprocess
dlp_out = subprocess.run(['yt-dlp', '--get-url', 'ytsearch1:"cowboy bebop trailer anime series original"'], stdout=subprocess.PIPE)
dlp_out.stdout.decode('utf-8').split('\n',1)

['https://rr4---sn-gqn-h5qs.googlevideo.com/videoplayback?expire=1640169994&ei=qq3CYd2hBpCEWIzPkOAD&ip=2a0c%3A5a80%3A4703%3Ab600%3A512f%3A8d8b%3Ac19%3A797b&id=o-AHtGT5vPyCuaNB6ckEJv8GQiH1WNPpyT-xcGikkW8Kkm&itag=248&source=youtube&requiressl=yes&mh=6e&mm=31%2C29&mn=sn-gqn-h5qs%2Csn-h5q7knel&ms=au%2Crdu&mv=m&mvi=4&pcm2cms=yes&pl=36&initcwndbps=1175000&vprv=1&mime=video%2Fwebm&gir=yes&clen=9868364&dur=60.059&lmt=1633383944636798&mt=1640148061&fvip=4&keepalive=yes&fexp=24001373%2C24007246&c=ANDROID&txp=5316224&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cvprv%2Cmime%2Cgir%2Cclen%2Cdur%2Clmt&sig=AOq0QJ8wRgIhANzNPKjtpI6lKfF7EEMrhfbfa7iga9EUn3kypoFmZQjFAiEA29Rhmug67_LKyLunBtbz3O_sAFRSvHhREDO4tAVXbZY%3D&lsparams=mh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpcm2cms%2Cpl%2Cinitcwndbps&lsig=AG3C_xAwRgIhAJ6A_o1rCUR-r9TH483yB-qSZGJfKBZWfXpWrBIW1aC5AiEAyEAH5-Uu3h1KfimzlK9vhRc9AjdGQZWw0-8PdpV21uw%3D',
 'https://rr4---sn-gqn-h5qs.googlevideo.com/videoplayback?expire=1640169994&ei=qq3CYd2hBpCEWIzPkOA

In [26]:
dlp_out = subprocess.run(['yt-dlp', '--get-id', 'ytsearch1:"cowboy bebop trailer anime series"'], stdout=subprocess.PIPE)
dlp_out.stdout.decode('utf-8')

'Oqd2C3oZkBU\n'