## Initialize the Spark Session

In [None]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Connection settings for the MinIO (S3-compatible) object store and the Spark master.
# Each one can be overridden through an environment variable so credentials do not
# have to live in the notebook. The fallbacks are the values this notebook used to
# hardcode (presumably the local docker-compose defaults — confirm before sharing).
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "root")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "root12345")
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")

conf = (
    SparkConf()
    .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .set("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # Path-style URLs (endpoint/bucket/key) instead of virtual-host style
    # (bucket.endpoint/key). This is the usual setting for MinIO. SparkConf stores
    # values as strings, so pass the canonical "true" rather than the bool True.
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Multipart upload part size in bytes: 104857600 = 100 MiB.
    .set("spark.hadoop.fs.s3a.multipart.size", "104857600")
    # Authenticate with the static access/secret key pair configured above.
    .set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
)

spark = (
    SparkSession.builder
    .appName("spark-minio")
    .master(SPARK_MASTER_URL)
    .config(conf=conf)
    .getOrCreate()
)

If this succeeds, the Spark UI is accessible at <http://localhost:4040>.

## Read the movies data from MinIO and display it

In [None]:
# Load the MovieLens movies table from MinIO. The CSV has a header row, and
# inferSchema makes Spark derive column types, which costs an extra pass over the data.
movies = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("s3a://datasource/movielens/movies.csv")
)
# Register the frame so the Spark SQL query below can refer to it as `movies`.
movies.createOrReplaceTempView("movies")

In [None]:
movies.show(n=20, truncate=True)  # preview the first rows; long cell values are truncated

## Read the ratings data from MinIO and display it

In [None]:
# Load the MovieLens ratings table from MinIO. Parquet files embed their own schema,
# so the CSV-only `header` / `inferSchema` options (needed for movies.csv) do not
# apply here and have been dropped.
ratings = spark.read.parquet("s3a://datasource/movielens/ratings.parquet")
# Register the frame so the Spark SQL query below can refer to it as `ratings`.
ratings.createOrReplaceTempView("ratings")

In [None]:
ratings.show(n=20, truncate=True)  # preview the first rows; long cell values are truncated

## Use Spark SQL to find the top 100 movies (among movies with more than 100 ratings)

In [None]:
# Ranking parameters. Both are plain ints, so interpolating them into the SQL is safe.
MIN_RATINGS = 100  # a movie needs strictly more than this many ratings to be ranked
TOP_N = 100        # number of movies to keep

# Rank movies by average rating, ignoring movies with too few ratings to be meaningful.
# - Group by movieId as well as title: titles are not guaranteed to be unique, and
#   grouping by title alone would pool the ratings of distinct movies sharing a name.
# - Inner join + COUNT(r.rating): an unrated movie can never pass the threshold, and a
#   LEFT JOIN with COUNT(*) would count its single NULL row as a rating.
# - Secondary sort keys make the LIMIT cut-off deterministic when averages tie.
# Output columns are unchanged: title, avg_rating.
top_100_movies = spark.sql(f"""
    WITH movie_stats AS (
        SELECT
            m.movieId,
            m.title,
            AVG(r.rating) AS avg_rating,
            COUNT(r.rating) AS num_ratings
        FROM movies m
        JOIN ratings r ON m.movieId = r.movieId
        GROUP BY m.movieId, m.title
    )
    SELECT title, avg_rating
    FROM movie_stats
    WHERE num_ratings > {int(MIN_RATINGS)}
    ORDER BY avg_rating DESC, num_ratings DESC, title, movieId
    LIMIT {int(TOP_N)}
""")

In [None]:
top_100_movies.show(n=20, truncate=True)  # preview the highest-ranked movies

## Write the result as Parquet

In [None]:
# Persist the ranking to MinIO, replacing any output left by a previous run.
top_100_movies.write.parquet("s3a://datasource/movielens/results/top_100_movies", mode='overwrite')

## Read the result back from Parquet

In [None]:
# Round-trip check: load the Parquet output just written and preview it.
spark.read.format("parquet").load("s3a://datasource/movielens/results/top_100_movies").show()

## Stop the Spark session

In [None]:
# Stop the session (and its underlying SparkContext). Run this last: `spark` is not
# usable afterwards until the first cell is re-run to build a new session.
spark.stop()