# Spark Session

In [2]:
from pyspark.sql import SparkSession

# Spark session & context.
# A single local session is created (or reused, via getOrCreate). The Postgres
# JDBC driver jar is placed on the driver classpath so the JDBC reads can load it.
POSTGRES_JDBC_JAR = "/home/jovyan/work/jars/postgresql-9.4.1207.jar"

spark = (
    SparkSession.builder
    .appName("read-postgres")
    .master("local")
    .config("spark.driver.extraClassPath", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

# Underlying SparkContext, exposed as `sc` for convenience.
sc = spark.sparkContext

# Read Postgres

In [3]:
import os

# Load public.movies over JDBC.
# Connection settings are read from the environment so credentials need not live
# in the notebook; when a variable is unset, the literal fallback is used.
# NOTE(review): drop the fallback password once the environment provides it.
df_movies = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://host.docker.internal:15432/rainbow_database"))
    # Name the driver explicitly rather than relying on DriverManager discovery.
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "public.movies")
    .option("user", os.environ.get("POSTGRES_USER", "unicorn_user"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "magical_password"))
    .load()
)

In [4]:
import os

# Load public.ratings over JDBC.
# Connection settings are read from the environment so credentials need not live
# in the notebook; when a variable is unset, the literal fallback is used.
# NOTE(review): drop the fallback password once the environment provides it.
df_ratings = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://host.docker.internal:15432/rainbow_database"))
    # Name the driver explicitly rather than relying on DriverManager discovery.
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "public.ratings")
    .option("user", os.environ.get("POSTGRES_USER", "unicorn_user"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "magical_password"))
    .load()
)

# Top 10 movies by number of ratings

In [9]:
# Alias both frames ("r" = ratings, "m" = movies) so the select can address
# their columns by qualifier, then keep every rating column plus the movie title.
df_ratings, df_movies = df_ratings.alias("r"), df_movies.alias("m")

df_join = (
    df_ratings
    .join(df_movies, df_ratings["movieId"] == df_movies["movieId"])
    .select("r.*", "m.title")
)

In [10]:
from pyspark.sql import functions as F

# Top 10 movies by number of ratings, with their mean rating.
# - Group on movieId as well as title: title is not guaranteed unique across
#   movieIds, and grouping on title alone would merge such movies' ratings.
# - Count the `rating` column (not `timestamp`) so qty_ratings and avg_rating
#   are computed over the same non-null rows.
# - The final select keeps the original output columns.
df_result = (
    df_join
    .groupBy("movieId", "title")
    .agg(
        F.count("rating").alias("qty_ratings"),
        F.mean("rating").alias("avg_rating"),
    )
    # Secondary key on title makes the top-10 cut deterministic when counts tie.
    .sort(F.desc("qty_ratings"), F.asc("title"))
    .limit(10)
    .select("title", "qty_ratings", "avg_rating")
)

In [11]:
# Cache the (at most 10-row) result. Otherwise this write and every later
# display would each re-run both JDBC reads and the join from scratch.
df_result.cache()

# Write a single CSV part-file with a header row, replacing any previous output.
(
    df_result
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("/home/jovyan/work/data/output_postgres")
)

In [12]:
# Display the top-10 table; truncate=False keeps full movie titles
# (the default cuts string cells at 20 characters).
df_result.show(truncate=False)

KeyboardInterrupt: 

In [None]:
# NOTE(review): duplicate of the previous display cell, apparently re-added after
# that run was interrupted (KeyboardInterrupt); consider deleting it.
df_result.show()