In [None]:
# Create Spark Session and Spark Context

In [None]:
from pyspark.sql import SparkSession
# Build the session used throughout the notebook (getOrCreate, so an
# already-running session is picked up instead of starting a second one).
spark = (
    SparkSession.builder
    .appName('spark-intro')
    .getOrCreate()
)
# The SparkContext is the entry point for the RDD API used below.
sc = spark.sparkContext

## Create RDDs from Python variables

In [None]:
rdd = sc.parallelize(range(20))

In [None]:
rdd

In [None]:
rdd.first()

In [None]:
rdd.take(2)

In [None]:
rdd.collect()

We can apply functions to each element

In [None]:
def less_than_10(x):
    """Return True when ``x`` is strictly less than 10, otherwise False.

    Intended as a predicate for ``rdd.filter``: elements for which it
    returns True are kept.
    """
    # The comparison already produces the boolean; no if/else is needed.
    return x < 10

In [None]:
# filter is evaluated lazily: this only returns a new RDD, nothing is computed yet
rdd.filter(less_than_10)

In [None]:
rdd.filter(less_than_10).collect()

In [None]:
rdd.filter(less_than_10).count()

In [None]:
rdd.collect()

In [None]:
def square(x):
    """Return ``x`` multiplied by itself."""
    # Written as a product; x**2 is the equivalent power form.
    squared = x * x
    return squared

In [None]:
rdd.map(square)

In [None]:
rdd.map(square).collect()

In [None]:
def multiple_of_10(x):
    """Return True when ``x`` is evenly divisible by 10, otherwise False.

    Zero counts as a multiple of 10. Intended as a predicate for
    ``rdd.filter``.
    """
    # The comparison already produces the boolean; no if/else is needed.
    return x % 10 == 0

In [None]:
rdd.map(square).filter(multiple_of_10).collect()

## Read from HDFS

In [None]:
sotu_rdd = sc.textFile('datasets/shakespeare.txt')

In [None]:
sotu_rdd.id()

In [None]:
sotu_rdd.first()

In [None]:
sotu_rdd.take(10)

1- Check how many times the word `love` appears

In [None]:
def count_love(line):
    """Return how many times the word ``love`` occurs in ``line``.

    Matching is case-insensitive and on whole words only: ``Love`` and
    ``love,`` are counted, while ``lovely`` and ``beloved`` are not.

    Args:
        line: One line of text (a ``str``).

    Returns:
        The number of whole-word occurrences as an ``int``.
    """
    # Imported locally so this cell can be run on its own.
    import re

    # \b anchors the match to word boundaries so substrings are not counted.
    return len(re.findall(r"\blove\b", line, flags=re.IGNORECASE))

In [None]:
sotu_rdd.map(count_love).take(10)

In [None]:
sotu_rdd.map(count_love).sum()

In [None]:
def has_love(line):
    """Return True if ``line`` contains the word ``love``, False otherwise.

    Matching is case-insensitive and on whole words only, so ``Love``
    matches while ``lovely`` and ``beloved`` do not.

    Args:
        line: One line of text (a ``str``).
    """
    # Imported locally so this cell can be run on its own.
    import re

    # search() stops at the first hit; only presence matters here.
    return re.search(r"\blove\b", line, flags=re.IGNORECASE) is not None

In [None]:
sotu_rdd.filter(has_love).take(3)

# My first MapReduce job

The classic MapReduce paradigm can be implemented with `map` (or `flatMap`, when a single input record produces multiple key-value pairs) followed by `reduceByKey`.

In [None]:
# Toy order records; each row appears to be [month, state, number of orders].
example_dataset = [
    ['JAN', 'NY', 3.0],
    ['JAN', 'PA', 1.0],
    ['JAN', 'NJ', 2.0],
    ['JAN', 'CT', 4.0],
    ['FEB', 'PA', 1.0],
    ['FEB', 'NJ', 1.0],
    ['FEB', 'NY', 2.0],
    ['FEB', 'VT', 1.0],
    ['MAR', 'NJ', 2.0],
    ['MAR', 'NY', 1.0],
    ['MAR', 'VT', 2.0],
    ['MAR', 'PA', 3.0],
]

In [None]:
dataset_rdd = sc.parallelize(example_dataset)

## Compute the number of orders per month

1- Generate key-value pairs

In [None]:
dataset_rdd

In [None]:
def map_func(row):
    """Turn one dataset row into a two-item ``[key, value]`` list.

    The key is the row's first field and the value is its third field
    (the month and the order figure in ``example_dataset``).
    """
    key = row[0]
    value = row[2]
    return [key, value]

In [None]:
dataset_rdd.map(map_func).take(5)

2- Reduce by key to add up the number of orders per month

In [None]:
def reduce_func(value1, value2):
    """Combine two values that share the same key by adding them."""
    combined = value1 + value2
    return combined

In [None]:
dataset_rdd.map(map_func).reduceByKey(reduce_func).collect()

## Compute the average number of orders per month

In [None]:
dataset_rdd.take(4)

In [None]:
def avg_map_func(row):
    """Turn one dataset row into ``[key, [orders, 1]]``.

    The key is the row's first field. The value pairs the row's third
    field with a count of 1, so that a reducer can add up both the
    running sum and the number of rows seen for each key; the average is
    then sum / count.

    Args:
        row: An indexable record with at least three fields.

    Returns:
        A two-item list ``[key, [orders, 1]]``.
    """
    return [row[0], [row[2], 1]]

In [None]:
def avg_reduce_func(value1, value2):
    """Merge two ``[running_sum, row_count]`` pairs that share a key.

    Both slots are added element-wise, so the result has the same shape
    as the inputs and can be merged again. Dividing the first slot by
    the second afterwards gives the average for the key.

    Args:
        value1: A two-item sequence ``[running_sum, row_count]``.
        value2: A two-item sequence ``[running_sum, row_count]``.

    Returns:
        A new two-item list ``[running_sum, row_count]``.
    """
    return [
        value1[0] + value2[0],
        value1[1] + value2[1],
    ]

In [None]:
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

In [None]:
dataset_rdd.collect()

## Count the frequency of words appearing in the Shakespeare sonnets

## Explore the effect of caching in RAM

## Try again the count from before

# Spark 2.0

You can create `DataFrames` programmatically

In [None]:
from pyspark.sql import Row

In [None]:
# Three sample order records, one Row per (state, month) combination.
raw_data = [
    Row(state='NY', month='JAN', orders=3),
    Row(state='NJ', month='JAN', orders=4),
    Row(state='NY', month='FEB', orders=5),
]

In [None]:
raw_data

In [None]:
data_df = spark.createDataFrame(raw_data)

In [None]:
data_df

In [None]:
data_df.printSchema()

In [None]:
data_df.show()

In [None]:
# A second batch of order records with the same fields as the first batch.
raw_data2 = [
    Row(state='NY', month='MAR', orders=10),
    Row(state='NJ', month='MAR', orders=3),
    Row(state='NY', month='APR', orders=1),
]

In [None]:
data_df2 = spark.createDataFrame(raw_data2)

You can merge them:

In [None]:
all_data_df = data_df.union(data_df2)

In [None]:
all_data_df.show()

Or you can also display with Pandas

In [None]:
# limit first: toPandas() brings every remaining row back to the driver
all_data_df.limit(10).toPandas()

Or use `display`, which renders the DataFrame in Databricks notebooks

In [None]:
display(all_data_df)

### Access columns

In [None]:
all_data_df['month']

In [None]:
all_data_df.month

In [None]:
all_data_df['month'] + 1

### Selections

In [None]:
condition_month_jan = (all_data_df['month'] == "JAN")

In [None]:
condition_month_jan

In [None]:
all_data_df.where(condition_month_jan)

In [None]:
all_data_df[condition_month_jan]

In [None]:
all_data_df[condition_month_jan].show()

The conditions are symbolic objects

In [None]:
(all_data_df['month']  == 'MAR') & (all_data_df['orders'] > 5)

In [None]:
all_data_df[(all_data_df['month']  == 'MAR') & (all_data_df['orders'] > 5)].show()

You can create new columns

In [None]:
all_data_df.show()

In [None]:
all_data_df['orders'] + 1

In [None]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).printSchema()

In [None]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).show()

You can perform some basic grouping operations

In [None]:
all_data_df.groupBy('month')

In [None]:
all_data_df.groupBy('month').count()

In [None]:
all_data_df.groupBy('month').count().show()

You can order by a certain column or group of columns

In [None]:
all_data_df.orderBy('orders').show()

In [None]:
all_data_df.orderBy('orders', ascending=False).show()

You can register as tables and perform SQL

In [None]:
# registerTempTable has been deprecated since Spark 2.0;
# createOrReplaceTempView is its replacement and registers the same
# session-scoped view, queryable below as `orders`.
all_data_df.createOrReplaceTempView('orders')

In [None]:
spark.sql('select count(*) from orders')

In [None]:
spark.sql('select count(*) from orders').show()

The `DataFrame` object can read from multiple sources.

In [None]:
%ls /datasets/

In [None]:
# Load the Spotify songs CSV: the first line is treated as the header and
# column types are inferred from the data rather than read as strings.
spotify_df = spark.read.csv(
    'datasets/spotify_songs.csv',
    header=True,
    inferSchema=True,
)

In [None]:
spotify_df.printSchema()