# Week 16
# Spark with Python Examples 
Today we will look at examples of how to use PySpark (the Spark API for Python) to handle data. In particular, we will learn how to create RDD objects, see how lazy evaluation works, and use data frames to query data.

## Resilient Distributed Dataset (RDD)

Creating a resilient distributed dataset (RDD) is a common entry point for Spark users. In this section, we will create RDDs using the `SparkContext` class and learn four important RDD methods: `map()`, `reduce()`, `filter()`, and `collect()`.

In [None]:
from pyspark import SparkContext
import numpy as np
sc = SparkContext(master="local[4]")  # local[4] runs Spark locally with 4 worker threads (ideally one per core)

In [None]:
# Create an RDD from 20 random integers between 0 and 9.
lst = np.random.randint(0, 10, 20)
print(lst)
A = sc.parallelize(lst)

In [None]:
# A is an RDD object
type(A)

In [None]:
# Convert the data back to a regular list.
A.collect()

In [None]:
# glom() groups the elements by partition: the data in A is split into 4 pieces.
A.glom().collect()

In [None]:
# Let's see what happens if we assign 2 cores instead.

sc.stop()  # stop the current SparkContext

sc = SparkContext(master="local[2]")  # Assign 2 cores

A = sc.parallelize(lst)
A.glom().collect()
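
To make the idea of partitioning concrete, here is a plain-Python sketch that cuts a list into contiguous, near-equal slices. It is only an illustration under that assumption: `split_into_slices` is a made-up helper, not a Spark function, and Spark's own batching may lay the elements out differently.

```python
def split_into_slices(data, num_slices):
    """Cut data into num_slices contiguous pieces of near-equal length."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

values = list(range(20))                # stand-in for the 20 integers in lst
four = split_into_slices(values, 4)     # analogous to local[4]
two = split_into_slices(values, 2)      # analogous to local[2]

print([len(p) for p in four])
print([len(p) for p in two])
```

Fewer partitions means larger pieces: the same 20 values end up in two slices of 10 instead of four slices of 5.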

The `map()` method applies a function to the values contained in one RDD and creates a new RDD with the results.

In [None]:
# We can create a new RDD by squaring the values contained in A. 
B = A.map(lambda x: x * x)  # the mapping is defined by a lambda expression
B.collect()

The `reduce()` method aggregates the values in an RDD with a two-argument function and returns a single value to the driver. Unlike `map()`, it does not create a new RDD.

In [None]:
# Calculate the sum of the data.
A.reduce(lambda x, y: x + y)

In [None]:
np.sum(lst)

In [None]:
# Find the maximum value.
A.reduce(lambda x, y: x if x > y else y)

In [None]:
np.max(lst)
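
Because `reduce()` combines values inside each partition first and then merges the partial results, the function passed to it should be commutative and associative. The following plain-Python sketch (not Spark itself) imitates that two-stage reduction. The partition layout and the helper `partitioned_reduce` are made up for illustration.

```python
from functools import reduce

# Hypothetical partition layout, similar in shape to A.glom().collect().
partitions = [[3, 1, 4], [1, 5, 9], [2, 6]]

def partitioned_reduce(parts, func):
    """Reduce each non-empty partition locally, then merge the partial results."""
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)

total = partitioned_reduce(partitions, lambda x, y: x + y)
largest = partitioned_reduce(partitions, lambda x, y: x if x > y else y)

# Subtraction is neither commutative nor associative, so the partitioned
# result differs from a plain left-to-right reduction over the same values.
flat = [v for part in partitions for v in part]
sub_partitioned = partitioned_reduce(partitions, lambda x, y: x - y)
sub_sequential = reduce(lambda x, y: x - y, flat)

print(total, largest, sub_partitioned, sub_sequential)
```

Addition and maximum give the same answer however the data is split, which is why they are safe choices for `reduce()`. Subtraction does not.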

The `filter()` method creates a new RDD containing only the elements that satisfy a given condition.

In [None]:
# Find all integers in A that are divisible by 3.
A.filter(lambda x: x % 3 == 0).collect()

## Lazy Evaluation

Lazy evaluation is a computation strategy in which Spark records a detailed step-by-step plan of the computations but delays execution until a result is actually needed. This lets PySpark optimize how the computations are executed.

In [None]:
%%time
rdd1 = sc.parallelize(range(100000))

In [None]:
# Define a function that takes time to compute.
def hardtask(x):
    [np.cos(j) for j in np.arange(100)]  # busy work; the result is discarded
    return np.cos(x)

Executing `hardtask()` takes a wall time of 6.97 ms on my computer.

In [None]:
%%time
hardtask(2)

Now apply `hardtask()` to all values in `rdd1`.

In [None]:
%%time
result = rdd1.map(lambda x: hardtask(x))

Because of lazy evaluation, nothing has been computed yet: Spark has only recorded a plan of execution. The work actually runs when we call an action such as `sum()`.

In [None]:
%%time
result.sum()

Lazy evaluation postpones execution until it is necessary. This gives Spark room to optimize the order of computations and improve execution efficiency.
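
The same deferral can be imitated in plain Python, where the built-in `map()` also returns a lazy iterator. This is only an analogy, not Spark code. The names `slow_square` and `calls` are made up for illustration.

```python
calls = []  # records every input the function is actually run on

def slow_square(x):
    """Stand-in for an expensive function; logs each call."""
    calls.append(x)
    return x * x

# Like rdd.map(...): this only describes the work, nothing runs yet.
planned = map(slow_square, range(5))
calls_before = len(calls)

# Like an action such as sum(): consuming the iterator triggers the work.
total = sum(planned)
calls_after = len(calls)

print(calls_before, total, calls_after)
```

No call is logged until `sum()` consumes the iterator. This mirrors why the `map()` cell above returns almost instantly while `result.sum()` takes much longer.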

## Spark Data Frames
A **Spark DataFrame** is a distributed collection of rows organized into named columns. It is conceptually equivalent to the data frames provided by Pandas, but it is built on distributed structures such as RDDs so that it can handle large amounts of data efficiently.

Keep in mind that a Spark DataFrame is immutable: we cannot change a data frame once it is created. Instead, each transformation applied to an existing data frame returns a new one.

In [None]:
import os
import urllib.request
import zipfile

# Make sure the target folder exists before downloading into it
os.makedirs('Data', exist_ok=True)

# Download MovieLens 25M dataset
url = "http://files.grouplens.org/datasets/movielens/ml-25m.zip"
urllib.request.urlretrieve(url, 'Data/ml-25m.zip')
with zipfile.ZipFile('Data/ml-25m.zip', "r") as file:
    file.printdir()
    file.extractall('Data')

In [None]:
from pyspark.sql import SparkSession
# Note: importing max from pyspark.sql.functions shadows Python's built-in max
from pyspark.sql.functions import count, desc, col, max, struct, mean

`SparkSession`, introduced in Spark 2.0, is the main entry point for creating data frames.

In [None]:
# Create a Spark session (or reuse the existing one)
spark = SparkSession.builder.appName('spark_app').getOrCreate()

In [None]:
# Import the ratings.csv file:
path = "Data/ml-25m/ratings.csv"
# Parentheses are used instead of backslashes so that each line can carry a comment
ratings = (
    spark.read.format('csv')
    .option('inferSchema', True)  # Let Spark infer the data type of each column
    .option('header', True)       # Use the first row as column names
    .load(path)                   # Specify the source of data
)
ratings.show()

In [None]:
# "Delete" the timestamp column
ratings = ratings.drop('timestamp')
ratings.show()

In [None]:
# Show data types
ratings.printSchema()

In [None]:
# Shape of the data frame
print(ratings.count(), len(ratings.columns))

In [None]:
# Query 1: Select all ratings of movie 1.
q1 = ratings.select('*').filter(ratings.movieId == 1)
q1.show()

In [None]:
# Query 2: top 10 most-rated movies
ratings_count = ratings.groupby('movieId').agg(count('*').alias('count'))
ratings_count.show()

In [None]:
q2 = ratings_count.orderBy(desc('count')).limit(10)
q2.show()
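
To see what Query 2 computes without Spark, here is a plain-Python sketch of the same group, count, sort, and limit steps on a handful of made-up rows. The toy data and variable names are assumptions for illustration only.

```python
from collections import Counter

# Toy (userId, movieId, rating) rows, made up for illustration.
rows = [
    (1, 10, 4.0), (2, 10, 3.5), (3, 10, 5.0),
    (1, 20, 2.0), (2, 20, 4.5),
    (3, 30, 3.0),
]

# groupby('movieId') + count('*'): number of ratings per movie
counts = Counter(movie_id for _, movie_id, _ in rows)

# orderBy(desc('count')) + limit(2): the two most-rated movies
top2 = counts.most_common(2)
print(top2)
```

Spark carries out the same logic, but distributes the counting across partitions instead of looping over one list.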

In [None]:
# Exercise:
# Query 3: find top 10 users with most ratings



Now, let's merge the rating data with the movies data.

In [None]:
path = 'Data/ml-25m/movies.csv'

# Load movies.csv as a Spark data frame named movies




movies.show()

In [None]:
# merge movies with ratings
data = ratings.join(movies, how='inner', on=['movieId'])
data.show()
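
The difference between `how='inner'` and `how='left'` can be sketched in plain Python on a few made-up rows. This is an illustration of the join semantics only, not of how Spark executes a join. All names and values are hypothetical.

```python
# Toy rows (made up): ratings are (userId, movieId, rating),
# movies are (movieId, title).
ratings_rows = [(1, 10, 4.0), (2, 10, 3.5), (1, 20, 5.0), (3, 30, 2.0)]
movies_rows = [(10, "Movie A"), (20, "Movie B"), (40, "Movie D")]

titles = dict(movies_rows)  # lookup table: movieId -> title

# Inner join: keep a rating only if its movieId also appears in movies.
inner = [(m, u, r, titles[m]) for (u, m, r) in ratings_rows if m in titles]

# Left join: keep every rating; a missing title becomes None (null in Spark).
left = [(m, u, r, titles.get(m)) for (u, m, r) in ratings_rows]

print(len(inner), len(left))
```

The rating for movie 30 has no matching title, so it is dropped by the inner join and kept with a missing title by the left join. Movie 40 has no ratings and appears in neither result.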

In [None]:
# Exercise:
# Merge q2 with movies to find out the title of the 10 most-rated movies



In [None]:
# Query 4: Find the average rating of each movie
avg_ratings = data.select('rating', 'title')\
        .groupby('title')\
        .agg(mean('rating').alias('AvgRating'))
avg_ratings.show()

In [None]:
avg_ratings = avg_ratings.orderBy(desc('AvgRating'))
avg_ratings.limit(10).show()

In [None]:
# Query 5: Select movies with at least 50 ratings.
movie_rating_count = data.groupby('title')\
                        .agg(count('*').alias('NumRatings'))\
                        .orderBy(desc('NumRatings'))
movie_rating_count.show()

In [None]:
movie_rating_count = movie_rating_count.filter(movie_rating_count.NumRatings >= 50)
movie_rating_count.show()

In [None]:
# Merge the above data frame with avg_ratings
movie_ratings = movie_rating_count.join(avg_ratings, how='left', on=['title'])
movie_ratings.show(truncate=False)  # pass truncate by keyword; the first positional argument is the number of rows

In [None]:
# Exercise:
# Find the 10 highest-rated movies

