In [None]:
# Create Spark Session and Spark Context

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-intro').getOrCreate()
sc = spark.sparkContext

## Create RDDs from Python variables

In [3]:
rdd = sc.parallelize(range(20))

In [4]:
rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

In [6]:
rdd.first()

0

In [10]:
rdd.take(2)

[0, 1]

In [8]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

We can apply functions to each element

In [26]:
def less_than_10(x):
    if x < 10:
        return True
    else:
        return False

In [27]:
# show that it is lazy evaluation
rdd.filter(less_than_10)

PythonRDD[106] at RDD at PythonRDD.scala:53

In [28]:
rdd.filter(less_than_10).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [29]:
rdd.filter(less_than_10).count()

10

In [30]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [31]:
def square(x):
    return x*x # x**2

In [34]:
rdd.map(square)

PythonRDD[111] at RDD at PythonRDD.scala:53

In [35]:
rdd.map(square).collect()

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361]

In [36]:
def multiple_of_10(x):
    if x % 10 == 0:
        return True
    else:
        return False

In [37]:
rdd.map(square).filter(multiple_of_10).collect()

[0, 100]

## Read from HDFS

In [38]:
sotu_rdd = sc.textFile('datasets/shakespeare.txt')

In [39]:
sotu_rdd.id()

115

In [40]:
sotu_rdd.first()

'The Project Gutenberg EBook of The Complete Works of William Shakespeare, by '

In [41]:
sotu_rdd.take(10)

['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by ',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '** This is a COPYRIGHTED Project Gutenberg eBook, Details Below **',
 '**     Please follow the copyright guidelines in this file.     **']

1- Check how many times the word `love` appears

In [None]:
def count_love(line):
    return ??

In [None]:
sotu_rdd.map(count_love).take(10)

In [None]:
sotu_rdd.map(count_love).sum()

In [None]:
def has_love(line):
    # should return True if line has word `love`, and False otherwise
    return ??

In [None]:
sotu_rdd.filter(has_love).take(3)

# My first map reduce job

The classic mapreduce paradigm can be accomplished by using `map` or `flatMap` (if multiple key-value pairs are generated) and `reduceByKey`.

In [46]:
import pyspark

In [47]:
example_dataset = [
['JAN', 'NY', 3.],
['JAN', 'PA', 1.],
['JAN', 'NJ', 2.],
['JAN', 'CT', 4.],
['FEB', 'PA', 1.],
['FEB', 'NJ', 1.],
['FEB', 'NY', 2.],
['FEB', 'VT', 1.],
['MAR', 'NJ', 2.],
['MAR', 'NY', 1.],
['MAR', 'VT', 2.],
['MAR', 'PA', 3.]]

In [48]:
dataset_rdd = sc.parallelize(example_dataset)

## Compute the number of orders per month

1- Generate key-value pairs

In [49]:
dataset_rdd

ParallelCollectionRDD[118] at readRDDFromFile at PythonRDD.scala:262

In [52]:
def map_func(row):
    return [row[0], row[2]]

In [53]:
dataset_rdd.map(map_func).take(5)

[['JAN', 3.0], ['JAN', 1.0], ['JAN', 2.0], ['JAN', 4.0], ['FEB', 1.0]]

2- Reduce to count the number of orders per month

In [54]:
def reduce_func(value1, value2):
    return value1 + value2

In [55]:
dataset_rdd.map(map_func).reduceByKey(reduce_func).collect()

[('JAN', 10.0), ('FEB', 5.0), ('MAR', 8.0)]

## Compute the average number of orders per month

In [16]:
dataset_rdd.take(4)

[['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0]]

In [68]:
def avg_map_func(row):
    month,state,orders = row
    return month,[orders,1]
dataset_rdd.map(avg_map_func).take(5)

[('JAN', [3.0, 1]),
 ('JAN', [1.0, 1]),
 ('JAN', [2.0, 1]),
 ('JAN', [4.0, 1]),
 ('FEB', [1.0, 1])]

In [73]:
def avg_reduce_func(value1, value2):
    a1,n1 = value1
    a2,n2 = value2
    n_new = n1+n2
    a_new = (a1*n1+a2*n2)/(n1+n2)
    return [a_new,n_new]

In [74]:
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

[('JAN', [2.5, 4]), ('FEB', [1.25, 4]), ('MAR', [2.0, 4])]

In [59]:
dataset_rdd.collect()

[['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]]

## Count the frequency of words appearing in the Shakespeare sonets

In [89]:
sotu_rdd = sc.textFile('datasets/shakespeare.txt')
sotu_rdd.take(10)

['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by ',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '** This is a COPYRIGHTED Project Gutenberg eBook, Details Below **',
 '**     Please follow the copyright guidelines in this file.     **']

In [94]:
def word_count(l):
    list_1 = []
    for word in l.lower().split():
        list_1.append((word,1))
    return list_1
    

In [101]:
sotu_rdd.map(word_count)

PythonRDD[188] at RDD at PythonRDD.scala:53

In [99]:
sotu_rdd.flatMap(word_count).take(20)

[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('the', 1),
 ('complete', 1),
 ('works', 1),
 ('of', 1),
 ('william', 1),
 ('shakespeare,', 1),
 ('by', 1),
 ('william', 1),
 ('shakespeare', 1),
 ('this', 1),
 ('ebook', 1),
 ('is', 1),
 ('for', 1),
 ('the', 1),
 ('use', 1)]

In [102]:
def reduce_func(value1, value2):
    return (value1+value2)

In [104]:
sotu_rdd.flatMap(word_count).reduceByKey(reduce_func).take(5)

[('project', 320),
 ('gutenberg', 250),
 ('ebook', 13),
 ('of', 18126),
 ('shakespeare', 270)]

In [109]:
sotu_rdd.flatMap(word_count).reduceByKey(reduce_func).sortBy(lambda x:-x[1]).take(5)

[('the', 27730), ('and', 26099), ('i', 19540), ('to', 18763), ('of', 18126)]

## Explore the effect of caching in RAM

## Try again the count from before

# Spark 2.0

You can create `DataFrames` programatically

In [110]:
from pyspark.sql import Row

In [111]:
raw_data = [Row(state='NY', month='JAN', orders=3),
            Row(state='NJ', month='JAN', orders=4),
            Row(state='NY', month='FEB', orders=5),
           ]

In [112]:
raw_data

[Row(state='NY', month='JAN', orders=3),
 Row(state='NJ', month='JAN', orders=4),
 Row(state='NY', month='FEB', orders=5)]

In [113]:
data_df = spark.createDataFrame(raw_data)

In [114]:
data_df

DataFrame[state: string, month: string, orders: bigint]

In [115]:
data_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)



In [116]:
data_df.show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
+-----+-----+------+



In [117]:
raw_data2 = [Row(state='NY', month='MAR', orders=10),
             Row(state='NJ', month='MAR', orders=3),
             Row(state='NY', month='APR', orders=1),
           ]

In [118]:
data_df2 = spark.createDataFrame(raw_data2)

You can merge them:

In [119]:
all_data_df = data_df.union(data_df2)

In [120]:
all_data_df.show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
|   NY|  MAR|    10|
|   NJ|  MAR|     3|
|   NY|  APR|     1|
+-----+-----+------+



Or you can also display with Pandas

In [121]:
# make sure you limit first
all_data_df.limit(10).toPandas()

Unnamed: 0,state,month,orders
0,NY,JAN,3
1,NJ,JAN,4
2,NY,FEB,5
3,NY,MAR,10
4,NJ,MAR,3
5,NY,APR,1


or `display` in DataBricks format

In [122]:
display(all_data_df)

DataFrame[state: string, month: string, orders: bigint]

### Access columns

In [123]:
all_data_df['month']

Column<b'month'>

In [124]:
all_data_df.month

Column<b'month'>

In [125]:
all_data_df['month'] + 1

Column<b'(month + 1)'>

### Selections

In [126]:
condition_month_jan = (all_data_df['month'] == "JAN")

In [127]:
condition_month_jan

Column<b'(month = JAN)'>

In [128]:
all_data_df.where(condition_month_jan)

DataFrame[state: string, month: string, orders: bigint]

In [129]:
all_data_df[condition_month_jan]

DataFrame[state: string, month: string, orders: bigint]

In [130]:
all_data_df[condition_month_jan].show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
+-----+-----+------+



The conditions are symbolic objects

In [131]:
(all_data_df['month']  == 'MAR') & (all_data_df['orders'] > 5)

Column<b'((month = MAR) AND (orders > 5))'>

In [132]:
all_data_df[(all_data_df['month']  == 'MAR') & (all_data_df['orders'] > 5)].show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  MAR|    10|
+-----+-----+------+



You can create new columns

In [133]:
all_data_df.show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
|   NY|  MAR|    10|
|   NJ|  MAR|     3|
|   NY|  APR|     1|
+-----+-----+------+



In [134]:
all_data_df['orders'] + 1

Column<b'(orders + 1)'>

In [135]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).printSchema()

root
 |-- state: string (nullable = true)
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)
 |-- order_plus_1: long (nullable = true)



In [136]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).show()

+-----+-----+------+------------+
|state|month|orders|order_plus_1|
+-----+-----+------+------------+
|   NY|  JAN|     3|           4|
|   NJ|  JAN|     4|           5|
|   NY|  FEB|     5|           6|
|   NY|  MAR|    10|          11|
|   NJ|  MAR|     3|           4|
|   NY|  APR|     1|           2|
+-----+-----+------+------------+



You can perform some basic grouping operations

In [137]:
all_data_df.groupBy('month')

<pyspark.sql.group.GroupedData at 0x7fd5cb585ca0>

In [138]:
all_data_df.groupBy('month').count()

DataFrame[month: string, count: bigint]

In [139]:
all_data_df.groupBy('month').count().show()

+-----+-----+
|month|count|
+-----+-----+
|  APR|    1|
|  FEB|    1|
|  JAN|    2|
|  MAR|    2|
+-----+-----+



You can order by a certain column or group of columns

In [140]:
all_data_df.orderBy('orders').show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  APR|     1|
|   NY|  JAN|     3|
|   NJ|  MAR|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
|   NY|  MAR|    10|
+-----+-----+------+



In [141]:
all_data_df.orderBy('orders', ascending=False).show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  MAR|    10|
|   NY|  FEB|     5|
|   NJ|  JAN|     4|
|   NJ|  MAR|     3|
|   NY|  JAN|     3|
|   NY|  APR|     1|
+-----+-----+------+



You can register as tables and perform SQL

In [142]:
all_data_df.registerTempTable('orders')

In [143]:
spark.sql('select count(*) from orders')

DataFrame[count(1): bigint]

In [144]:
spark.sql('select count(*) from orders').show()

+--------+
|count(1)|
+--------+
|       6|
+--------+



The `DataFrame` object can read from multiple sources.

In [145]:
%ls /datasets/

ls: cannot access '/datasets/': No such file or directory


In [146]:
spotify_df = spark.read.csv('datasets/spotify_songs.csv', 
                             header=True, 
                             inferSchema=True)

In [147]:
spotify_df.printSchema()

root
 |-- song_title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- valence: double (nullable = true)

