This notebook will show, how to:
- Connect to a spark cluster (master node)
- Read in a file located in HDFS
- Convert that file into a dataframe
- Perform an aggregation on that dataframe

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [2]:
# Spark session & context
spark = SparkSession.builder.master('spark://spark-master:7077').getOrCreate()
sc = spark.sparkContext

21/11/11 12:42:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Function to parse each row of the movie file
def parse_movies(movie):
    fields = movie.split('\t')
    return Row(movie_id=int(fields[0]), rating=float(fields[2]))

In [4]:
# Link to the movie data location
movie_file = sc.textFile("hdfs://namenode:8020/user/root/playground/ml-100k/u.data")

# Convert it to a RDD of Row objects with (movieID, rating)
movie_rdd = movie_file.map(parse_movies)

# Convert that to a DataFrame
movie_df = spark.createDataFrame(movie_rdd)

movie_df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+--------+------+
|movie_id|rating|
+--------+------+
|     196|   3.0|
|     186|   3.0|
|      22|   1.0|
|     244|   2.0|
|     166|   1.0|
+--------+------+
only showing top 5 rows



                                                                                

In [5]:
# Compute average rating for each movie_id
avg_rating = movie_df.groupBy("movie_id").avg("rating")
avg_rating.show(5)

                                                                                

+--------+------------------+
|movie_id|       avg(rating)|
+--------+------------------+
|      26|  2.94392523364486|
|     474|  4.08256880733945|
|      29|3.6470588235294117|
|     191|3.6296296296296298|
|      65|            3.9375|
+--------+------------------+
only showing top 5 rows



In [6]:
# Stop the session
spark.stop()