# Tutorial 3: Spark Programming

Credits and References:

- Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia (O’Reilly). Copyright 2015 Databricks, 978-1-449-35862-4.
- Spark: The Definitive Guide by Bill Chambers and Matei Zaharia (O’Reilly). Copyright 2018 Databricks, Inc., 978-1-491-91221-8.

## 1.- Getting started: The SparkSession and the Spark UI

In [1]:
import findspark
# Change the path to the Spark folder accordingly
findspark.init(spark_home="/home/ubuntu/software/spark-2.2.1-bin-hadoop2.7/")

In [2]:
data_folder = "/home/ubuntu/movielens_v2/movielens/"

#### Now we can import pyspark

In [3]:
import pyspark
import numpy as np # We'll be using numpy for some numeric operations
import os

In [4]:
# pyspark provides a SparkContext
sc = pyspark.SparkContext(master="local[*]", appName="tour")
# Now you can go to http://localhost:4040/ and see the Spark UI!
# Try re-running this line

In [5]:
# To try the SparkContext with other masters first stop the one that is already running
sc.stop()

- **local**: Run Spark locally with one worker thread (i.e. no parallelism at all).
- **local[K]**: Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
- **local[*]**: Run Spark locally with as many worker threads as logical cores on your machine.
- **spark://HOST:PORT**: Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
- **mesos://HOST:PORT**: Connect to the given Mesos cluster. The port must be whichever one your Mesos master is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
- **yarn**: Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
- **yarn-client**: Equivalent to yarn with --deploy-mode client, which is preferred to `yarn-client`
- **yarn-cluster**: Equivalent to yarn with --deploy-mode cluster, which is preferred to `yarn-cluster`

## Creating RDDs

We saw that we can create RDDs by loading files from disk. We can also create RDDs from Python collections or by transforming other RDDs.

In [132]:
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



In [133]:
# Creating an RDD from in-memory objects:
l_numbers = np.arange(0,100000)
numbers = sc.parallelize(l_numbers) # creation of RDD

In [142]:
ratings = sc.textFile(os.path.join(data_folder, "ratings.csv")).filter(lambda x: "movie_id" not in x) # load data from a file
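To make the `numSlices` argument of `parallelize` more concrete, here is a pure-Python sketch (no Spark required) that splits a collection into contiguous slices. The helper name `slice_collection` is hypothetical, and the index arithmetic is an assumption chosen because it reproduces the two `glom()` examples in the docstring above; PySpark's actual partitioning may batch elements differently.

```python
def slice_collection(items, num_slices):
    """Split `items` into `num_slices` contiguous chunks (hypothetical helper)."""
    items = list(items)
    n = len(items)
    # Slice i covers the indices [i*n // num_slices, (i+1)*n // num_slices)
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_slices)]

print(slice_collection([0, 2, 3, 4, 6], 5))  # [[0], [2], [3], [4], [6]]
print(slice_collection(range(0, 6, 2), 5))   # [[], [0], [], [2], [4]]
```

When there are more slices than elements, some slices are simply empty, which is why the second example contains empty lists.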

## RDD Transformations and Actions

There are two types of RDD operations in Spark: **transformations** and **actions**.
- Transformations: Create new RDDs from other RDDs.
- Actions: Extract information from RDDs and return it to the driver program.

### Transformations

In [143]:
help(ratings.map)

Help on method map in module pyspark.rdd:

map(f, preservesPartitioning=False) method of pyspark.rdd.PipelinedRDD instance
    Return a new RDD by applying a function to each element of this RDD.
    
    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> sorted(rdd.map(lambda x: (x, 1)).collect())
    [('a', 1), ('b', 1), ('c', 1)]



In [144]:
ratings_splitted = ratings.map(lambda x: x.split(","))

In [145]:
ratings_splitted

PythonRDD[66] at RDD at PythonRDD.scala:48

### Actions

In [146]:
help(ratings.take)

Help on method take in module pyspark.rdd:

take(num) method of pyspark.rdd.PipelinedRDD instance
    Take the first num elements of the RDD.
    
    It works by first scanning one partition, and use the results from
    that partition to estimate the number of additional partitions needed
    to satisfy the limit.
    
    Translated from the Scala implementation in RDD#take().
    
    .. note:: this method should only be used if the resulting array is expected
        to be small, as all the data is loaded into the driver's memory.
    
    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]
    >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
    [91, 92, 93]



In [147]:
help(ratings.collect)

Help on method collect in module pyspark.rdd:

collect() method of pyspark.rdd.PipelinedRDD instance
    Return a list that contains all of the elements in this RDD.
    
    .. note:: This method should only be used if the resulting array is expected
        to be small, as all the data is loaded into the driver's memory.



In [148]:
help(ratings.count)

Help on method count in module pyspark.rdd:

count() method of pyspark.rdd.PipelinedRDD instance
    Return the number of elements in this RDD.
    
    >>> sc.parallelize([2, 3, 4]).count()
    3



In [149]:
ratings_splitted_top = ratings_splitted.take(5)

In [150]:
ratings_splitted_res = ratings_splitted.collect()

In [151]:
ratings_count = ratings_splitted.count()

In [152]:
ratings_count

1000209

## Lambda expressions

[Lambda expressions](https://docs.python.org/3.5/howto/functional.html#small-functions-and-the-lambda-expression) are an easy way to write short functions in Python.

In [153]:
f = lambda line: 'Spark' in line

In [154]:
f("we are learning park")

False

In [155]:
def f(line):
    return 'Spark' in line
f("we are learning Spark")

True

#### Let's try to get the zombie movies

In [156]:
dbpedia_movies = sc.textFile(os.path.join(data_folder, "dbpedia.csv")) # load data

In [157]:
# keep only lines that mention "zombie" and extract the second CSV field (the title)
zombie_movies= dbpedia_movies.filter(lambda line: 'zombie' in line).map(lambda x: x.split(",")[1])

In [158]:
zombie_movies

PythonRDD[71] at RDD at PythonRDD.scala:48

In [159]:
zombie_movies.collect()

['Last Dance (1996)',
 'Braindead (1992)',
 'House (1986)',
 'Night of the Comet (1984)',
 'Phantasm (1979)',
 'Night of the Creeps (1986)']

## Lazy evaluation

RDDs are **evaluated lazily**. This means that Spark will not materialize an RDD until it has to perform an action. In the example below, `primesRDD` is not evaluated until action `collect()` is performed on it.

In [160]:
def is_prime(num):
    """Return True if num is prime, False otherwise (note: this simple check also reports 1 as prime)."""
    if num < 1 or num % 1 != 0:
        raise Exception("invalid argument")
    for d in range(2, int(np.sqrt(num) + 1)):
        if num % d == 0:
            return False
    return True

In [161]:
numbersRDD = sc.parallelize(range(1, 1000000)) # creation of RDD
primesRDD = numbersRDD.filter(is_prime) # transformation

In [162]:
# primesRDD has not been materialized until this point
primes = primesRDD.collect() # action

In [163]:
print(primes[0:15])
print(primesRDD.take(15))

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43]
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43]
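The lazy behaviour described above can be mimicked without Spark using a Python generator. This is only an analogy, not how Spark is implemented, and the names `traced_is_even` and `calls` are made up for the illustration.

```python
calls = []  # records every element the predicate is actually applied to

def traced_is_even(n):
    calls.append(n)
    return n % 2 == 0

numbers = range(1, 11)

# "Transformation": building the generator does not run the predicate.
evens_lazy = (n for n in numbers if traced_is_even(n))
calls_before_action = len(calls)

# "Action": consuming the generator forces the computation.
evens = list(evens_lazy)

print(calls_before_action)  # 0
print(len(calls))           # 10
print(evens)                # [2, 4, 6, 8, 10]
```

As with `primesRDD`, nothing is computed when the filtered collection is defined; the work happens only when a result is requested.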


## Persistence

RDDs are **ephemeral** by default, i.e. there is no guarantee they will remain in memory after they are materialized. If we want them to `persist` in memory, possibly to query them repeatedly or use them in multiple operations, we can ask Spark to do this by calling `persist()` on them.

In [164]:
# persist() asks Spark to keep this RDD once it has been materialized.
# cache() is the same as persist() with the default memory-only storage level; persist() also accepts other storage levels.
primesRDD_persisted = numbersRDD.filter(is_prime).persist() # transformation

In [165]:
print("Found", primesRDD_persisted.count(), "prime numbers") # first action -- causes primesRDD_persisted to be materialized
print("Here are some of them:")

Found 78499 prime numbers
Here are some of them:


In [166]:
print(primesRDD_persisted.collect()[0:20]) # second action - RDD is already in memory

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67]


How long does it take to collect `primesRDD`? Let's time the operation.

In [167]:
%%timeit
primes = primesRDD.collect()

1.87 s ± 69.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


It took about 1.9 s. That's because Spark had to evaluate `primesRDD` before performing `collect` on it.

How long would it take if `primesRDD_persisted` was already in memory?

In [169]:
%%timeit
primes = primesRDD_persisted.collect()

20 ms ± 2.18 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


It took about 20ms to collect `primesRDD_persisted`!
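The difference comes from skipping the recomputation. A rough pure-Python analogy, using a hypothetical `LazyDataset` class that is not part of PySpark, counts how many times the per-element function runs with and without caching:

```python
class LazyDataset:
    """Toy stand-in for an RDD: recomputes on every action unless persisted."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.evaluations = 0   # how many times func has been applied
        self._persist = False
        self._cache = None

    def persist(self):
        self._persist = True
        return self

    def collect(self):
        if self._cache is not None:
            return list(self._cache)       # served from the cache
        result = []
        for x in self.source:
            self.evaluations += 1
            result.append(self.func(x))
        if self._persist:
            self._cache = result           # keep the result for later actions
        return list(result)

ephemeral = LazyDataset(range(5), lambda x: x * x)
persisted = LazyDataset(range(5), lambda x: x * x).persist()
for _ in range(3):
    ephemeral.collect()
    persisted.collect()

print(ephemeral.evaluations)  # 15: recomputed on each of the 3 actions
print(persisted.evaluations)  # 5: computed once, then reused
```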

***

## map and flatMap

In [174]:
words = sc.textFile(os.path.join(data_folder, "dbpedia.csv")).filter(lambda x: "movie_id" not in x)

words_map = words.map(lambda phrase: phrase.split(" "))

l_words = words_map.collect() # This returns a list of lists

In [175]:
l_words[1][0:10]

['3,Grumpier',
 'Old',
 'Men',
 '(1995),http://dbpedia.org/resource/Grumpier_Old_Men,http://dbpedia.org/data/Grumpier_Old_Men.json,"{""abstract"":',
 '""Grumpier',
 'Old',
 'Men',
 'is',
 'a',
 '1995']

In [176]:
words_flatmap = words.flatMap(lambda phrase: phrase.split(" "))
words_flatmap.collect()[0:10] # This returns a single flat list with the elements of all the lists combined

['2,Jumanji',
 '(1995),http://dbpedia.org/resource/Jumanji,http://dbpedia.org/data/Jumanji.json,"{""abstract"":',
 '""Jumanji',
 'is',
 'a',
 '1995',
 'American',
 'family',
 'adventure',
 'film']
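The difference between the two results above is easier to see on a tiny input. The following plain-Python sketch (made-up sentences, no Spark) mirrors what `map` and `flatMap` do with a function that returns a list:

```python
from itertools import chain

lines = ["to be or", "not to be"]

# map: one output element per input element (here, one list per line)
mapped = [line.split(" ") for line in lines]

# flatMap: apply the function, then flatten the results into one sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)  # ['to', 'be', 'or', 'not', 'to', 'be']
```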

In [178]:
# We can use the flatmap to make a word count
words_flatmap.map(
    lambda x: (x,1)
).reduceByKey(
    lambda x,y: x+y
).collect()

[('(1995),http://dbpedia.org/resource/Jumanji,http://dbpedia.org/data/Jumanji.json,"{""abstract"":',
  1),
 ('is', 6302),
 ('1995', 316),
 ('American', 2176),
 ('family', 156),
 ('It', 2501),
 ('an', 1377),
 ('adaptation', 190),
 ('of', 9920),
 ('1981', 64),
 ('book', 198),
 ('name', 333),
 ('Chris', 122),
 ('Allsburg.', 1),
 ('The', 7332),
 ('was', 5728),
 ('Greg', 19),
 ('Jonathan', 66),
 ('Jim', 131),
 ('Dunst,', 5),
 ('Pierce,', 5),
 ('Alan', 122),
 ('Hyde,', 3),
 ('Bebe', 6),
 ('Industrial', 15),
 ('Magic', 20),
 ('Amalgamated', 2),
 ('Dynamics', 2),
 ('animatronics', 3),
 ('dedicated', 32),
 ('visual', 45),
 ('L.', 113),
 ('before', 247),
 ("film's", 509),
 ('release.', 53),
 ('story', 687),
 ('centers', 63),
 ('young', 201),
 ('Parrish,', 1),
 ('in', 7662),
 ('board', 12),
 ('his', 1652),
 ('best', 175),
 ('Whittle', 1),
 ('years', 197),
 ('Judy', 16),
 ('unwittingly', 7),
 ('now-adult', 1),
 ('Alan.', 1),
 ('tracking', 9),
 ('down', 59),
 ('resolve', 1),
 ('finish', 6),
 ('reve

***

## Set operations

In [181]:
oneRDD = sc.parallelize([1, 1, 1, 2, 3, 3, 4, 4])
oneRDD.persist()

ParallelCollectionRDD[97] at parallelize at PythonRDD.scala:489

In [182]:
otherRDD = sc.parallelize([1, 4, 4, 7])
otherRDD.persist()

ParallelCollectionRDD[98] at parallelize at PythonRDD.scala:489

In [183]:
unionRDD = oneRDD.union(otherRDD)
unionRDD.persist()

UnionRDD[99] at union at NativeMethodAccessorImpl.java:0

In [184]:
oneRDD.subtract(otherRDD).collect()

[2, 3, 3]

In [185]:
oneRDD.distinct().collect()

[1, 2, 3, 4]

In [186]:
oneRDD.intersection(otherRDD).collect() # removes duplicates

[1, 4]

In [187]:
oneRDD.cartesian(otherRDD).collect()[:5]

[(1, 1), (1, 4), (1, 4), (1, 7), (1, 1)]
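For reference, the semantics of these operations can be reproduced on plain Python lists. This is a sketch of the behaviour seen in the outputs above, not of Spark's implementation; results are sorted here where needed because Spark does not promise a particular element order.

```python
from itertools import product

one = [1, 1, 1, 2, 3, 3, 4, 4]
other = [1, 4, 4, 7]
other_values = set(other)

union = one + other                                    # keeps duplicates
subtract = [x for x in one if x not in other_values]   # drops every value found in `other`
distinct = sorted(set(one))                            # one copy of each value
intersection = sorted(set(one) & other_values)         # duplicates removed
cartesian = list(product(one, other))                  # every (a, b) pair

print(len(union))      # 12
print(subtract)        # [2, 3, 3]
print(distinct)        # [1, 2, 3, 4]
print(intersection)    # [1, 4]
print(len(cartesian))  # 32
```

Note that `union` and `subtract` keep duplicates from the left-hand collection, while `distinct` and `intersection` do not.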

***

## reduce

In [188]:
np.sum([1,43,62,23,52])

181

In [189]:
data = sc.parallelize([1,43,62,23,52])
data.reduce(lambda x, y: x + y)

181

In [190]:
data.reduce(lambda x, y: x * y)

3188536

In [191]:
1 * 43 * 62 * 23 * 52

3188536

In [64]:
data.reduce(lambda x, y: x**2 + y**2) # this does NOT compute the sum of squares of RDD elements

137823683725010149883130929

In [137]:
((((1 ** 2 + 43 ** 2) ** 2 + 62 ** 2) **2 + 23 ** 2) **2 + 52 **2)

137823683725010149883130929

In [100]:
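# sqrt keeps the running value equal to the root of the partial sum of squares, so squaring the result gives the sum of squares (about 8927 for this `data`; the 285 shown below does not match this `data`)
data.reduce(lambda x, y: np.sqrt(x**2 + y**2)) ** 2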

285.00000000000006

In [101]:
np.sum(np.array([1,43,62,23,52]) ** 2)

8927
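Why does `x**2 + y**2` fail? `reduce` expects a commutative and associative function, because partial results from different partitions are combined in an unspecified grouping. The sketch below uses `functools.reduce` on a plain list to show that this function gives different answers for different groupings, and that squaring first and then adding is the safe way to get the sum of squares. The two-way split is a hypothetical partitioning chosen for illustration.

```python
from functools import reduce

values = [1, 43, 62, 23, 52]

def op(x, y):
    return x**2 + y**2   # not associative

# Single left fold: (((1 op 43) op 62) op 23) op 52
left_fold = reduce(op, values)

# Same data split into two "partitions", reduced separately, then combined
regrouped = op(reduce(op, values[:3]), reduce(op, values[3:]))

# Correct sum of squares: square first (map), then add (associative reduce)
sum_of_squares = reduce(lambda x, y: x + y, (v**2 for v in values))

print(left_fold == regrouped)  # False: the result depends on the grouping
print(sum_of_squares)          # 8927
```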

***

## aggregate

In [65]:
help(data.aggregate)

Help on method aggregate in module pyspark.rdd:

aggregate(zeroValue, seqOp, combOp) method of pyspark.rdd.RDD instance
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given combine functions and a neutral "zero
    value."
    
    The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.
    
    The first function (seqOp) can return a different result type, U, than
    the type of this RDD. Thus, we need one operation for merging a T into
    an U and one operation for merging two U
    
    >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
    >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
    >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    (10, 4)
    >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
    (0, 0)



In [123]:
def seq(x,y):
    print(x,y,"seq")
    return x[0] + y, x[1] + 1

In [124]:
def comb(x,y):
    print(x,y,"comb")
    return x[0] + y[0], x[1] + y[1]

In [125]:
np.sum([1,43,62,23,52])

181

In [133]:
data = sc.parallelize([1,43,62,23,52], 1) # Try different levels of parallelism
aggr = data.aggregate(zeroValue = (0,0),
                      seqOp = seq,
                      combOp = comb)

(0, 0) (181, 5) comb


In [110]:
aggr

(181, 5)

In [75]:
aggr[0] / aggr[1] # average value of RDD elements

36.2
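The two-step logic of `aggregate` can be sketched in plain Python. `simulate_aggregate` is a hypothetical helper, and the way it splits the list into partitions is an assumption; only the fold-then-merge structure follows the docstring above.

```python
from functools import reduce

def simulate_aggregate(items, num_partitions, zero, seq_op, comb_op):
    """Toy model of RDD.aggregate (hypothetical helper, not PySpark code)."""
    n = len(items)
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    partitions = [items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]
    # Step 1: fold each partition independently, starting from the zero value.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # Step 2: merge the per-partition results, again starting from the zero value.
    return reduce(comb_op, partials, zero)

def seq(acc, x):
    return acc[0] + x, acc[1] + 1        # merge one element into (sum, count)

def comb(a, b):
    return a[0] + b[0], a[1] + b[1]      # merge two (sum, count) pairs

values = [1, 43, 62, 23, 52]
for k in (1, 2, 5):
    print(k, simulate_aggregate(values, k, (0, 0), seq, comb))  # (181, 5) each time
```

With a single partition, step 2 merges `(0, 0)` with `(181, 5)`, which is consistent with the one `comb` line printed in the driver output above.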

***

## reduceByKey

In [189]:
help(pairRDD.reduceByKey)

Help on method reduceByKey in module pyspark.rdd:

reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash at 0x000001E7C579D378>) method of pyspark.rdd.RDD instance
    Merge the values for each key using an associative reduce function.
    
    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.
    
    Output will be partitioned with C{numPartitions} partitions, or
    the default parallelism level if C{numPartitions} is not specified.
    Default partitioner is hash-partition.
    
    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]



In [76]:
pairRDD = sc.parallelize([('$APPL', 100.64), 
                          ('$APPL', 100.52), 
                          ('$GOOG', 706.2), 
                          ('$AMZN', 552.32), 
                          ('$AMZN', 552.32) ])

pairRDD.reduceByKey(lambda x,y: x + y).collect() # sum of values per key

[('$APPL', 201.16), ('$AMZN', 1104.64), ('$GOOG', 706.2)]

From https://github.com/vaquarkhan/vk-wiki-notes/wiki/reduceByKey--vs-groupBykey-vs-aggregateByKey-vs-combineByKey

`reduceByKey` aggregates the values by key within each partition before shuffling:
![alt text](https://camo.githubusercontent.com/516114b94193cddf7e59bdd5368d6756d30dc8b4/687474703a2f2f7777772e727578697a68616e672e636f6d2f75706c6f6164732f342f342f302f322f34343032333436352f313836363838325f6f7269672e706e67)

`groupByKey` shuffles all the key-value pairs, as the diagram shows:
![alt text](https://camo.githubusercontent.com/ed75baabdaee2198d3fc1390e04a5d20bcd2e484/687474703a2f2f7777772e727578697a68616e672e636f6d2f75706c6f6164732f342f342f302f322f34343032333436352f333030393135315f6f7269672e706e67)
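The effect of the map-side combine can be illustrated with a small pure-Python simulation. The split of the stock-price pairs into two partitions is hypothetical, and "shuffled" here just means the records that would have to be sent across the network.

```python
from collections import defaultdict

# Two hypothetical partitions of the stock-price pairs used above.
partitions = [
    [("$APPL", 100.64), ("$APPL", 100.52), ("$GOOG", 706.2)],
    [("$AMZN", 552.32), ("$AMZN", 552.32)],
]

def sum_by_key(pairs):
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

def combine_locally(partition):
    """Map-side combine: one (key, partial_sum) record per key in the partition."""
    return list(sum_by_key(partition).items())

# groupByKey-style: every record is shuffled unchanged.
shuffled_group = [pair for part in partitions for pair in part]
# reduceByKey-style: records are pre-aggregated before the shuffle.
shuffled_reduce = [pair for part in partitions for pair in combine_locally(part)]

print(len(shuffled_group), len(shuffled_reduce))                  # 5 3
print(sum_by_key(shuffled_group) == sum_by_key(shuffled_reduce))  # True
```

Both strategies give the same totals per key, but the pre-aggregated version moves fewer records; the gap grows with the number of values per key.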

## (inner) join

In [192]:
movies = sc.textFile(os.path.join(data_folder, "movies.csv")).filter(lambda x: "movie_id" not in x).map(lambda x: x.split(","))
ratings = sc.textFile(os.path.join(data_folder, "ratings.csv")).filter(lambda x: "movie_id" not in x).map(lambda x: x.split(",")[1:3])

In [193]:
ratings.take(10)

[['1193', '5'],
 ['661', '3'],
 ['914', '3'],
 ['3408', '4'],
 ['2355', '5'],
 ['1197', '3'],
 ['1287', '5'],
 ['2804', '5'],
 ['594', '4'],
 ['919', '4']]

In [194]:
movies.join(ratings).take(5)

[('4', ('Waiting to Exhale (1995)', '3')),
 ('4', ('Waiting to Exhale (1995)', '3')),
 ('4', ('Waiting to Exhale (1995)', '1')),
 ('4', ('Waiting to Exhale (1995)', '4')),
 ('4', ('Waiting to Exhale (1995)', '1'))]
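The shape of the join output, `(key, (left_value, right_value))` with one row per matching combination, can be reproduced with a small dictionary-based join. `inner_join` is a hypothetical helper and the sample rows are made up; Spark's distributed join works differently internally.

```python
from collections import defaultdict

def inner_join(left, right):
    """Toy inner join of two lists of (key, value) pairs (hypothetical helper)."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit one output row per matching (left value, right value) combination.
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

movies_small = [("4", "Waiting to Exhale (1995)"), ("5", "Some Other Movie")]
ratings_small = [("4", "3"), ("4", "1"), ("9", "5")]  # made-up ratings

print(inner_join(movies_small, ratings_small))
# [('4', ('Waiting to Exhale (1995)', '3')), ('4', ('Waiting to Exhale (1995)', '1'))]
```

Keys that appear on only one side (`"5"` and `"9"` here) are dropped, which is what makes the join an inner join.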

***

## Accumulators

This example demonstrates how to use accumulators.
The `map` operation creates an RDD that contains the length of each line in the text file, and while the RDD is materialized, an accumulator named `long_lines` counts the lines that satisfy a condition (in the code below, lines that contain the string "abstract").

In [143]:
text = sc.textFile(os.path.join(data_folder, "dbpedia.csv"))
long_lines = sc.accumulator(0) # create accumulator

def line_data(line):
    global long_lines # to reference an accumulator, declare it as global variable
    length = len(line)
    if "abstract" in line:
        long_lines += 1 # update the accumulator
    return length

llengthRDD = text.map(line_data)
llengthRDD.count()

3267

In [144]:
long_lines.value # this is how we obtain the value of the accumulator in the driver program

3266

In [145]:
help(long_lines)

Help on Accumulator in module pyspark.accumulators object:

class Accumulator(builtins.object)
 |  A shared variable that can be accumulated, i.e., has a commutative and associative "add"
 |  operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
 |  operator, but only the driver program is allowed to access its value, using C{value}.
 |  Updates from the workers get propagated automatically to the driver program.
 |  
 |  While C{SparkContext} supports accumulators for primitive data types like C{int} and
 |  C{float}, users can also define accumulators for custom types by providing a custom
 |  L{AccumulatorParam} object. Refer to the doctest of this module for an example.
 |  
 |  Methods defined here:
 |  
 |  __iadd__(self, term)
 |      The += operator; adds a term to this accumulator's value
 |  
 |  __init__(self, aid, value, accum_param)
 |      Create a new Accumulator with a given initial value and AccumulatorParam object
 |  
 |  __reduce

### Warning

In the example above, we update the value of an accumulator within a transformation (`map`). This is **not recommended**, except for debugging purposes! The reason is that, if there are failures during the materialization of `llengthRDD`, some of its partitions will be re-computed, possibly causing the accumulator to double-count some of the lines.

It is advisable to use accumulators within actions - and particularly with the `foreach` action, as demonstrated below.

In [148]:
text = sc.textFile(os.path.join(data_folder, "dbpedia.csv"))
long_lines_2 = sc.accumulator(0)

def line_len(line):
    global long_lines_2
    length = len(line)
    if length > 30:
        long_lines_2 += 1

text.foreach(line_len)

long_lines_2.value

3267
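The double-counting risk mentioned in the warning can be imitated in plain Python. In this sketch a dictionary stands in for the accumulator, and evaluating the same "partition" twice simulates Spark recomputing it; all names and data are made up for the illustration.

```python
counter = {"long_lines": 0}  # stand-in for an accumulator

def line_len(line):
    if len(line) > 3:
        counter["long_lines"] += 1  # side effect inside a "transformation"
    return len(line)

partition = ["spark", "rdd", "accumulator"]

first_attempt = [line_len(line) for line in partition]
# Simulate the partition being recomputed, e.g. after a task failure.
second_attempt = [line_len(line) for line in partition]

print(first_attempt == second_attempt)  # True: the data itself is unaffected
print(counter["long_lines"])            # 4, although only 2 lines are long
```

The transformation's output is the same on both attempts, but the side effect is applied twice, which is why updates made inside transformations cannot be trusted as exact counts.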

## Broadcast variable

We use *broadcast variables* when many operations depend on the same large static object - e.g., a large lookup table that does not change but provides information for other operations. In such cases, we can make a broadcast variable out of the object and thus make sure that the object will be shipped to the cluster only once - and not for each of the operations we'll be using it for.

The example below demonstrates the usage of broadcast variables. In this case, we make a broadcast variable out of a dictionary that maps age-group codes to their labels. The table is shipped to cluster nodes only once across multiple operations.

In [196]:
def load_ages_catalog():
    return {1: "Under 18", 18: "18-24", 25: "25-34", 35: "35-44", 45: "45-49", 50: "50-55", 56: "56+"}
ages_catalog = sc.broadcast(load_ages_catalog())

In [197]:
def find_age(age_id):
    res = None
    if age_id in ages_catalog.value:
        res = ages_catalog.value[age_id]
    return res

ages = sc.parallelize([1,18,50])
pairRDD = ages.map(lambda age_id: (age_id, find_age(age_id)))
print(pairRDD.collect())

other_ages = sc.parallelize([35, 50, 1])
pairRDD = other_ages.map(lambda age_id: (age_id, find_age(age_id)))
print(pairRDD.collect())

[(1, 'Under 18'), (18, '18-24'), (50, '50-55')]
[(35, '35-44'), (50, '50-55'), (1, 'Under 18')]
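A back-of-the-envelope sketch of why this matters: without a broadcast variable, an object referenced by a task's function is serialized along with every task, whereas a broadcast value is sent roughly once per executor. The task and executor counts below are hypothetical, and `pickle` is used only to get an approximate serialized size.

```python
import pickle

ages_table = {1: "Under 18", 18: "18-24", 25: "25-34", 35: "35-44",
              45: "45-49", 50: "50-55", 56: "56+"}
table_bytes = len(pickle.dumps(ages_table))

num_tasks = 200     # hypothetical: tasks across all operations that use the table
num_executors = 4   # hypothetical cluster size

# Captured in each task's closure: serialized and sent once per task.
bytes_per_task_shipping = num_tasks * table_bytes
# Broadcast: sent roughly once per executor and reused by all of its tasks.
bytes_broadcast = num_executors * table_bytes

print(bytes_per_task_shipping // bytes_broadcast)  # 50
```

For a seven-entry dictionary the saving is negligible, but the same ratio applies to lookup tables of many megabytes.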


## High-level structured APIs

In [32]:
sc.textFile(os.path.join(data_folder, "users.csv")).take(5)

['user_id,gender,age,occupation,zip_code',
 '1,F,1,10,48067',
 '2,M,56,16,70072',
 '3,M,25,15,55117',
 '4,M,45,7,02460']

In [98]:
ratings_fr = sc.textFile(os.path.join(data_folder, "ratings.csv")).map(lambda x: (x.split(",")[0], x.split(",")[1],1))
f_users = sc.textFile(os.path.join(data_folder, "users.csv")).map(lambda x: (x.split(",")[0],x.split(",")[1])).filter(lambda x: x[1] == "F")
ratings_fr_res = ratings_fr.join(f_users).map(lambda x: (x[1][0], 1)).reduceByKey(lambda x,y: x + y).collect()

In [99]:
ratings_rf = sc.textFile(os.path.join(data_folder, "ratings.csv")).map(lambda x: (x.split(",")[0], x.split(",")[1],1))
users = sc.textFile(os.path.join(data_folder, "users.csv")).map(lambda x: (x.split(",")[0],x.split(",")[1]))
ratings_rf_res = ratings_rf.join(users).filter(lambda x: x[1][1] == "F").map(lambda x: (x[1][0], 1)).reduceByKey(lambda x,y: x + y).collect()

## What happened? Let's find out in the Spark UI

- Working with RDDs gives developers more freedom, because every step of the computation is spelled out by hand.
- However, this is generally not recommended for structured data. The newer high-level structured APIs (DataFrames and Spark SQL) optimize many steps of the data transformations automatically.
- In general, it is pointless trying to beat a query optimizer.
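The two RDD pipelines above differ only in where the gender filter is applied: before the join or after it. The following plain-Python sketch, with made-up rows and a naive `naive_join` helper, shows that both orders give the same answer but that filtering first feeds fewer rows through the join. Choosing the cheaper order automatically is one of the things a query optimizer does; the physical plans printed by `explain()` further down show the `Filter` on `gender` placed below the join for both the SQL and the DataFrame version.

```python
users_small = [(1, "F"), (2, "M"), (3, "M"), (4, "F")]                  # (user_id, gender)
ratings_small = [(1, 10), (2, 10), (2, 11), (3, 10), (4, 11), (4, 12)]  # (user_id, movie_id)

def naive_join(left, right):
    """Inner join on the first field; returns (key, left_value, right_value)."""
    return [(k, lv, rv) for k, lv in left for rk, rv in right if k == rk]

# Join first, filter afterwards: every rating takes part in the join.
joined_all = naive_join(ratings_small, users_small)
late = sorted(movie for _, movie, gender in joined_all if gender == "F")

# Filter first, then join against a smaller table.
female_users = [(u, g) for u, g in users_small if g == "F"]
joined_f = naive_join(ratings_small, female_users)
early = sorted(movie for _, movie, _gender in joined_f)

print(late == early)                   # True: same answer
print(len(joined_all), len(joined_f))  # 6 3
```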

In [66]:
from pyspark import sql

In [67]:
sql_sc = sql.SparkSession(sparkContext=sc)

In [68]:
ratings_file = os.path.join(data_folder, "ratings.csv")
ratings = sql_sc.read.option("inferSchema", "true").option("header", "true").csv(ratings_file)

In [69]:
ratings.createOrReplaceTempView("ratings")

In [75]:
users_file = os.path.join(data_folder, "users.csv")
users = sql_sc.read.option("inferSchema", "true").option("header", "true").csv(users_file)

In [76]:
users.createOrReplaceTempView("users")

In [92]:
sqlWay = sql_sc.sql("""
SELECT r.movie_id, count(r.rating)
FROM ratings r
INNER JOIN users u on r.user_id = u.user_id
WHERE u.gender = 'F'
GROUP BY r.movie_id
""")

In [93]:
dataFrameWay = ratings.join(users, ratings.user_id == users.user_id).filter(users.gender == 'F') \
  .groupBy(ratings.movie_id).agg({"rating": "count"})

In [94]:
%timeit sqlWay.collect()

394 ms ± 47.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [95]:
%timeit dataFrameWay.collect()

355 ms ± 38.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [96]:
sqlWay.explain()

== Physical Plan ==
*HashAggregate(keys=[movie_id#403], functions=[count(rating#404)])
+- Exchange hashpartitioning(movie_id#403, 200)
   +- *HashAggregate(keys=[movie_id#403], functions=[partial_count(rating#404)])
      +- *Project [movie_id#403, rating#404]
         +- *BroadcastHashJoin [user_id#402], [user_id#450], Inner, BuildRight
            :- *Project [user_id#402, movie_id#403, rating#404]
            :  +- *Filter isnotnull(user_id#402)
            :     +- *FileScan csv [user_id#402,movie_id#403,rating#404] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/ubuntu/movielens_v2/movielens/ratings.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,movie_id:int,rating:int>
            +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
               +- *Project [user_id#450]
                  +- *Filter ((isnotnull(gender#451) && (gender#451 = F)) && isnotnull(user_id#450))
 

In [97]:
dataFrameWay.explain()

== Physical Plan ==
*HashAggregate(keys=[movie_id#403], functions=[count(rating#404)])
+- Exchange hashpartitioning(movie_id#403, 200)
   +- *HashAggregate(keys=[movie_id#403], functions=[partial_count(rating#404)])
      +- *Project [movie_id#403, rating#404]
         +- *BroadcastHashJoin [user_id#402], [user_id#450], Inner, BuildRight
            :- *Project [user_id#402, movie_id#403, rating#404]
            :  +- *Filter isnotnull(user_id#402)
            :     +- *FileScan csv [user_id#402,movie_id#403,rating#404] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/ubuntu/movielens_v2/movielens/ratings.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,movie_id:int,rating:int>
            +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
               +- *Project [user_id#450]
                  +- *Filter ((isnotnull(gender#451) && (gender#451 = F)) && isnotnull(user_id#450))
 