In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Create the SparkSession used by every cell below (getOrCreate reuses a running one).
spark = SparkSession.builder.appName("MyFirstCSVLoad").getOrCreate()

In [5]:
# Ratings: read with an explicit schema, then convert the integer `timestamp`
# column (passed through from_unixtime) into a real timestamp column.
ratings = (
    spark.read.schema("userId INT, movieId INT, rating DOUBLE, timestamp INT")
    .options(sep=",", header=True, quote='"')
    .csv("ratings.csv")
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime(f.col("timestamp"))))
)

# Movies: `genres` is loaded as one plain string column per movie.
movies = (
    spark.read.schema("movieId INT, title STRING, genres STRING")
    .options(sep=",", header=True, quote='"')
    .csv("movies.csv")
)

# One row per (movie, genre) pair: split the `genres` string on "|" and explode
# the resulting array. The split pattern is a regular expression, so the pipe is
# escaped; the raw string keeps the backslash literal instead of relying on
# Python passing the invalid "\|" escape through (which emits a warning).
movie_genre = (
    movies.withColumn("genres_array", f.split("genres", r"\|"))
    .withColumn("genre", f.explode("genres_array"))
    .select("movieId", "title", "genre")
)

# Links: identifiers that tie each movieId to its imdbId / tmdbId.
links = (
    spark.read.schema("movieId INT, imdbId STRING, tmdbId INT")
    .options(sep=",", header=True, quote='"')
    .csv("links.csv")
)

# Tags: same timestamp conversion as applied to the ratings data.
tags = (
    spark.read.schema("userId INT, movieId INT, tag STRING, timestamp INT")
    .options(sep=",", header=True, quote='"')
    .csv("tags.csv")
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime(f.col("timestamp"))))
)

# Small derived frames built from the movie data.
available_genres = movie_genre.select("genre").distinct()
movies_without_genre = movies.filter(movies["genres"] == "(no genres listed)")
movies_per_genre = movie_genre.groupBy("genre").count()

AnalysisException: 'Path does not exist: file:/home/jovyan/work/ratings.csv;'

In [None]:
# Earliest rating timestamp per user (dict form: {column name: aggregate name}).
ratings.groupBy("userId").agg({"timestamp": "min"}).show()

In [None]:
# Shorthand: agg() without a groupBy treats the whole DataFrame as a single group
ratings.agg({"*": "count"})#.show()

# ...which is the short form of this longer notation
ratings.groupBy().agg({"*": "count"})#.show()

# The value is the same as ratings.count(), but it is expressed as a lazy
# transformation (a one-row DataFrame) rather than an action that runs immediately.
# print(ratings.count())

# Aggregating per group or per column is usually more interesting.
# agg takes a dictionary as input: the key is the name of the column and the
# value is the name of the aggregate to run on it.
# Possible aggregates are avg, max, min, sum, count.

# The key "*" only works together with count: count(*) is the number of rows in
# each group. It is not a per-column count, nor a sum of counts across columns.
ratings.groupBy("movieId").agg({"*": "count"})#.show()

# These fail with "Invalid number of arguments for function ..."
# ratings.groupBy("movieId").agg({"*": "avg"}).show()
# ratings.groupBy("movieId").agg({"*": "sum"}).show()
# ratings.groupBy("movieId").agg({"*": "min"}).show()
# ratings.groupBy("movieId").agg({"*": "max"}).show()

# See when the last rating was provided for a given movie
ratings.groupBy("movieId").agg({"timestamp": "max"})#.show()

# The dict form allows only one aggregate per column. A Python dict literal keeps
# just the last value for a repeated key, so the dict below silently collapses to
# {"rating": "min"} and the "max" aggregate is never requested.
ratings.groupBy("movieId").agg({"rating": "max", "rating": "min"})#.show()

# Instead, compute each aggregate separately and join the results on the group key
ratings.groupBy("movieId").agg({"rating": "max"}).join(
    ratings.groupBy("movieId").agg({"rating": "min"}), ["movieId"]
)#.show()

# Or shorter, reusing the grouped data
ratings_per_movie = ratings.groupBy("movieId")
ratings_per_movie.agg({"rating": "min"}).join(ratings_per_movie.agg({"rating": "max"}), ["movieId"])#.show()

# Or put it in a function

def plural_aggs(df, groupby_col: str, agg_col: str, aggs: list):
    """Compute several aggregates of one column per group, joined side by side.

    Each aggregate is computed with its own ``agg({agg_col: name})`` call on the
    same grouping, and the partial results are joined on ``groupby_col``.

    Args:
        df: DataFrame to aggregate.
        groupby_col: Name of the column to group by; also used as the join key.
        agg_col: Name of the column the aggregates are applied to.
        aggs: Aggregate function names, e.g. ``["min", "max"]``. Must not be empty.

    Returns:
        The joined result: the first aggregate's frame joined with each
        following aggregate's frame, in the order given by ``aggs``.

    Raises:
        ValueError: If ``aggs`` is empty. Previously the ungrouped input was
            returned unchanged in that case, which hid the mistake.
    """
    if not aggs:
        raise ValueError("aggs must contain at least one aggregate name")

    grouped = df.groupBy(groupby_col)
    first_agg, *remaining_aggs = aggs

    # Start from the first aggregate and join the others onto it; a separate
    # name is used so the `df` parameter is never rebound.
    result = grouped.agg({agg_col: first_agg})
    for agg in remaining_aggs:
        result = result.join(grouped.agg({agg_col: agg}), [groupby_col])
    return result

# Four aggregates in one call via the helper (the .show() action is commented out)
plural_aggs(ratings, "movieId", "rating", ["min", "max", "avg", "count"])#.show()

# Or as a SQL statement...
# First register the DataFrame as a temp view so the query can refer to it by name
ratings.createOrReplaceTempView("ratings")
spark.sql("""
SELECT movieId, min(rating), max(rating), avg(rating), count(rating) FROM ratings GROUP BY movieId
""").show()

# So as you can see, Spark SQL can also run 'regular' SQL statements

In [None]:
# Preview the tags data.
tags.show()

In [None]:
# Number of distinct movieId values that appear in the tags data.
tags.select("movieId").distinct().count()

In [None]:
# Number of distinct userId values that appear in the ratings data.
ratings.select("userId").distinct().count()

In [None]:
# Number of distinct movieId values that appear in the ratings data.
ratings.select("movieId").distinct().count()

In [None]:
# Distinct-value count for every column of `tags`. Each count() is a separate
# Spark action. A comprehension is used so that no loop variable (previously a
# module-level name `col`) is left behind in the notebook namespace.
distinct_values = {
    column_name: tags.select(column_name).distinct().count()
    for column_name in tags.columns
}

print(distinct_values)

In [None]:
# Basic descriptive statistics for the tags data.
tags.describe().show()

In [None]:
def distinct_count_per_column(df):
    """Print a dict mapping every column name of ``df`` to its distinct-value count.

    One ``select(...).distinct().count()`` is issued per column. Nothing is
    returned; the dict is only printed.
    """
    counts = {}
    for name in df.columns:
        counts[name] = df.select(name).distinct().count()
    print(counts)

# Print the per-column distinct counts for each of the loaded DataFrames.
distinct_count_per_column(tags)
distinct_count_per_column(ratings)
distinct_count_per_column(movies)

In [None]:
# Per movie: the set of lower-cased tags, collected into a `tag_list` column.
tags_per_movie = (
    tags.withColumn("tag", f.lower(f.col("tag")))
    .groupBy("movieId")
    .agg(f.collect_set("tag").alias("tag_list"))
)

# Per user: the set of lower-cased tags, collected into a `tag_list` column.
# The previous withColumnRenamed call had its arguments swapped (it tried to
# rename a non-existent "tag_list" column), so it did nothing and the column
# kept the generated name "collect_set(tag)". Aliasing the aggregate directly
# gives it the intended name.
tags_per_user = (
    tags.withColumn("tag", f.lower(f.col("tag")))
    .groupBy("userId")
    .agg(f.collect_set("tag").alias("tag_list"))
)

In [None]:
# Show 20 rows; the second argument (truncate=False) keeps long tag lists readable.
tags_per_movie.show(20, False)

In [None]:
# Up to 150 distinct tag values in sorted order, untruncated (tags are not lower-cased here).
tags.select("tag").distinct().orderBy("tag").show(150, False)

In [None]:
# Descriptive statistics restricted to the `tag` column.
tags.describe(["tag"]).show()

In [None]:
# Same describe() call as in the earlier cell, repeated here next to summary() for comparison.
tags.describe().show()

In [None]:
# summary() with no arguments: the default set of statistics.
tags.summary().show()

In [None]:
# summary() restricted to the requested statistics only.
tags.summary(["min", "max", "mean"]).show()

In [None]:
# Example of performing a split; the weights are relative, i.e. roughly 60/20/20
train, validate, test = ratings.randomSplit([6.0, 2.0, 2.0])

# Dropping a column before splitting
train, validate, test = ratings.drop("timestamp").randomSplit([6.0, 2.0, 2.0])

# Optional seed. Each assignment overwrites the previous one, so
# train/validate/test refer to this last split from here on.
train, validate, test = ratings.randomSplit([6.0, 2.0, 2.0], seed=42)


In [None]:
# Preview the training split produced by the last randomSplit call above.
train.show()

In [None]:
# Based on: https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw
# NOTE(review): the names below differ in case from the load schema (userId,
# movieId, rating, title). This presumably relies on Spark's default
# case-insensitive column resolution; confirm before changing the casing, since
# later cells may depend on the resulting column names.
small_ratings = ratings.select("UserID", "MovieID", "Rating")
small_movies = movies.select("MovieID", "Title")

In [None]:
# Preview the first 5 rows of each reduced DataFrame.
small_ratings.show(5)
small_movies.show(5)

#### ALS:
Spark's MLlib machine-learning library provides a collaborative filtering implementation based on __Alternating Least Squares__ (ALS).

```python
class pyspark.ml.recommendation.ALS(
    rank=10,
    maxIter=10,
    regParam=0.1,
    numUserBlocks=10,
    numItemBlocks=10,
    implicitPrefs=False,
    alpha=1.0,
    userCol="user",
    itemCol="item",
    seed=None,
    ratingCol="rating",
    nonnegative=False,
    checkpointInterval=10,
    intermediateStorageLevel="MEMORY_AND_DISK",
    finalStorageLevel="MEMORY_AND_DISK",
    coldStartStrategy="nan",
)
```

In [None]:
from pyspark.ml.recommendation import ALS

The implementation in `pyspark.ml` has, among others, the following parameters:

* __rank__ is the number of latent factors in the model.
* __maxIter__ is the maximum number of iterations to run.
* __regParam__ specifies the regularization parameter in ALS.
* __implicitPrefs__ specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
* __alpha__ is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [None]:
# Check the column names and types, then split with relative weights 6/2/2 and a fixed seed.
small_ratings.printSchema()
training_df, validation_df, test_df = small_ratings.randomSplit(
    [6.0, 2.0, 2.0], seed=42
)
# Commented-out RDD variant (presumably left over from the RDD-based tutorial this follows):
# validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
# test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [None]:
# Very uniform looking data
training_df.summary().show()
validation_df.summary().show()
test_df.summary().show()

In [None]:
# Select only first 2 columns
validation_for_predict_df = validation_df.select(validation_df.columns[:2])
validation_for_predict_rdd = validation_df.rdd.map(lambda x: (x[0], x[1]))

In [6]:
# Inspect both variants. show() exists only on DataFrames; an RDD has no show()
# method, so take(20) fetches a comparable number of elements for display.
validation_for_predict_df.show()
validation_for_predict_rdd.take(20)

NameError: name 'validation_for_predict_df' is not defined