In [6]:
import os
from dotenv import load_dotenv

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, median, count, percentile

In [2]:
import ml_pipeline

In [3]:
# Connection and data-location settings are read from a local .env file.
# override=True makes the .env values take precedence over variables that are
# already in the process environment. With the default (override=False),
# load_dotenv() never replaces an existing variable. Generic names such as USER
# are normally preset by the login shell to the OS account name (HOST is too,
# on some shells), so os.getenv('USER') would silently return the shell's value
# instead of the database user written in .env.
load_dotenv(override=True)
dialect = os.getenv('DIALECT')
host = os.getenv('HOST')
port = os.getenv('PORT')
user = os.getenv('USER')
password = os.getenv('PASSWORD')
dbname = os.getenv('DBNAME')
datapath = os.getenv('DATAPATH')

# Fail fast with a readable message instead of handing None to the JDBC read
# further down. PASSWORD/DIALECT/DATAPATH are not checked here because their
# handling by ml_pipeline is not visible in this notebook; TODO confirm.
_missing = [
    name
    for name, value in (('HOST', host), ('PORT', port), ('USER', user), ('DBNAME', dbname))
    if not value
]
if _missing:
    raise RuntimeError(f"Missing required settings (check .env): {', '.join(_missing)}")

In [4]:
# Start a local Spark session with the PostgreSQL JDBC driver jar on the
# classpath, then load the `mal_rating` table through the project helper.
# The jar path is relative to the kernel's working directory.
spark_jar_packages = '../misc/postgresql-42.6.0.jar'  # passed as spark.jars (a local jar path)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Anime_Recommender")
    .config("spark.jars", spark_jar_packages)
    # NOTE: if a session already exists in this kernel, getOrCreate() returns it,
    # and JVM-level settings such as driver memory are not re-applied.
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

# ml_pipeline.spark_sql2df is defined outside this notebook; presumably it reads
# the named table over JDBC using these connection settings. TODO confirm.
rating_df = ml_pipeline.spark_sql2df(
    spark, 'mal_rating',
    host, port, user, password, dbname, dialect,
)

In [7]:
# Quartiles of the number of rating rows per user.
# The alias 'n_completed' suggests each row is a completed title; that depends
# on the mal_rating schema, which is not shown here. TODO confirm.
(
    rating_df
    .groupBy('user_id')
    .agg(count('*').alias('n_completed'))
    .agg(*(
        percentile(col('n_completed'), fraction).alias(label)
        for label, fraction in (('25%', 0.25), ('50%', 0.50), ('75%', 0.75))
    ))
    .show()
)
# Request a garbage collection in the driver JVM. `_jvm` is a private PySpark
# handle, not a public API.
spark.sparkContext._jvm.System.gc()

+----+----+-----+
| 25%| 50%|  75%|
+----+----+-----+
|19.0|70.0|163.0|
+----+----+-----+

