# Spark and DataFrames Tutorial

The next cell installs PySpark in the Google Colab environment. Spark is written in Scala and runs in a Java virtual machine (JVM). PySpark is a Python interface to that Spark backend. Frontend interfaces to Spark exist for Java, Python, R, and .NET. Essentially, PySpark sends the Python Spark commands to the Spark JVM for evaluation, and the results are then returned to the PySpark frontend.

In [14]:
%%bash
# Do not change or modify this cell
# Need to install pyspark
# The leading `!` is IPython syntax and is not valid inside a %%bash cell, so pip is called directly
# All output is discarded, so nothing is printed even if pyspark is already installed
pip install pyspark > /dev/null 2>&1

In [19]:
# Create Spark Session and Spark Context
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-intro').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import Row
from pyspark.sql import functions as fn


Display the Spark session. Its summary includes the version of Spark which we installed above.

In [13]:
spark


In [4]:
sc

## Create RDDs from Python variables

Create a Resilient Distributed Dataset (RDD) from a Python range.

In [5]:
rdd = sc.parallelize(range(20))

Print the RDD

In [6]:
print(rdd)

PythonRDD[1] at RDD at PythonRDD.scala:53


Print the RDD type

In [7]:
type(rdd)

pyspark.rdd.PipelinedRDD

Show the first element of the RDD

In [8]:
rdd.first()

0

Create a Python list containing the first 2 elements of the RDD.  The `take` method can be a heavyweight operation because the data has to be transferred from the Spark Java VM memory space into the Python interpreter's memory space.  Taking only 2 elements is not a big deal, but the more you take, the heavier the operation becomes.

In [9]:
rdd.take(2)

[0, 1]

Create a python list containing all elements in the RDD.  Note that this is a very expensive operation as all of the data in the Spark Java VM memory space has to be collected and transferred into the python interpreter's memory space.

In [10]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

We can apply functions to each element.  Define such a function.

In [11]:
def less_than_10(x):
    if x < 10:
        return True
    else:
        return False

In [12]:
# show that it is lazy evaluation
rdd.filter(less_than_10)

PythonRDD[4] at RDD at PythonRDD.scala:53
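
The lazy behaviour shown above can be imitated in plain Python, without Spark. The sketch below is only an analogy: it assumes that Python's built-in `filter` (which returns a lazy iterator) stands in for `rdd.filter`, and that `list(...)` stands in for an action such as `collect`. The `calls` list is a hypothetical helper added here to record when the predicate actually runs.

```python
# Plain-Python analogy for lazy evaluation (no Spark involved).
calls = []  # hypothetical helper: records every value the predicate is asked about

def less_than_10(x):
    calls.append(x)
    return x < 10

lazy = filter(less_than_10, range(20))  # builds an iterator; nothing is evaluated yet
calls_before = len(calls)               # the predicate has not been called so far

result = list(lazy)                     # consuming the iterator triggers the work
calls_after = len(calls)                # the predicate has now seen all 20 values

print(calls_before, calls_after, result)
```

In the same way, `rdd.filter(less_than_10)` only describes the computation, and the work happens when an action such as `collect` or `count` is called.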

In [13]:
rdd.filter(less_than_10).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [14]:
rdd.filter(less_than_10).count()

10

Note that we didn't modify the rdd.  If we convert the rdd to a python list, all original values are unchanged.

In [15]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Define a function named `square` to apply to each element of the RDD

In [16]:
def square(x):
    return x*x # x**2

Apply the square function to each element of the rdd using the map function.

In [17]:
rdd.map(square).collect()

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361]

Define a new function `multiple_of_10` to apply to each element of the RDD

In [18]:
def multiple_of_10(x):
    if x % 10 == 0:
        return True
    else:
        return False

The cell below shows the types which result from separate operations.  The map and filter functions return new RDDs because RDDs are immutable.  

In [19]:
rdd1 = rdd.map(square)
print("type(rdd1):", type(rdd1))
rdd2 = rdd1.filter(multiple_of_10)
print("type(rdd2):", type(rdd2))
result = rdd2.collect()
print("type(result):", type(result))

type(rdd1): <class 'pyspark.rdd.PipelinedRDD'>
type(rdd2): <class 'pyspark.rdd.PipelinedRDD'>
type(result): <class 'list'>


In this cell, we chain the operations from the previous cell together: `map` applies `square` to each element of the RDD, then `filter` keeps only the squares for which `multiple_of_10` returns True.  The "chaining" works because the output of each stage is a new RDD, as shown in the previous cell.

In [20]:
rdd.map(square).filter(multiple_of_10).collect()

[0, 100]

## Read from HDFS

In [9]:
%%bash
# Download the data files from github
# If the data file does not exist in the colab environment
data_file=shakespeare.txt
if [[ ! -f ${data_file} ]]; then 
   # download the data file from github and save it in this colab environment instance
   wget https://raw.githubusercontent.com/wewilli1/ist718_data/master/${data_file} 
fi

data_file1=sample_S2_records.json
if [[ ! -f ${data_file1} ]]; then 
   # download the data file from github and save it in this colab environment instance
   wget https://raw.githubusercontent.com/wewilli1/ist718_data/master/${data_file1}  
fi

Connecting to raw.githubusercontent.com (185.199.109.133:443)
shakespeare.txt       85% |***************************     | 4656k  0:00:00 ETA
shakespeare.txt      100% |********************************| 5451k  0:00:00 ETA

Connecting to raw.githubusercontent.com (185.199.109.133:443)
sample_S2_records.js 100% |********************************|  232k  0:00:00 ETA



Read the text file into an RDD and print the unique ID for this RDD (within its SparkContext).

In [22]:
sotu_rdd = sc.textFile("shakespeare.txt")
sotu_rdd.id()

11

Print the first element of the RDD.

In [23]:
sotu_rdd.first()

'This is the 100th Etext file presented by Project Gutenberg, and'

Note that the call to `first` actually returns a Python string:

In [24]:
type(sotu_rdd.first())

str

Convert the first 10 elements of the RDD to a python list.

In [25]:
sotu_rdd.take(10)

['This is the 100th Etext file presented by Project Gutenberg, and',
 'is presented in cooperation with World Library, Inc., from their',
 'Library of the Future and Shakespeare CDROMS.  Project Gutenberg',
 'often releases Etexts that are NOT placed in the Public Domain!!',
 '',
 'Shakespeare',
 '',
 '*This Etext has certain copyright implications you should read!*',
 '',
 '<<THIS ELECTRONIC VERSION OF THE COMPLETE WORKS OF WILLIAM']

1- Check how many times the word `love` appears

In [26]:
def count_love(line):
    return line.lower().split().count("love")

In [27]:
sotu_rdd.map(count_love).take(10)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [28]:
sotu_rdd.map(count_love).sum()

1279

In [29]:
def has_love(line):
    # returns True if the line contains the substring `love` (this also matches
    # words such as `lovely` and `self-love`), and False otherwise
    return "love" in line.lower()

In [30]:
sotu_rdd.filter(has_love).take(3)

['  Of his self-love to stop posterity?  ',
 '  Calls back the lovely April of her prime,',
 '  Unthrifty loveliness why dost thou spend,']
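
Note that `count_love` and `has_love` answer slightly different questions: the first counts whole whitespace-separated tokens equal to `love`, while the second tests for the substring `love` anywhere in the line. The plain-Python sketch below applies both functions to two lines from the output above plus one hypothetical line (`"For love is love"`) that is not taken from the data file.

```python
# Compare whole-word counting with substring matching (plain Python, no Spark).
def count_love(line):
    return line.lower().split().count("love")

def has_love(line):
    return "love" in line.lower()

sample_lines = [
    "  Of his self-love to stop posterity?  ",
    "  Calls back the lovely April of her prime,",
    "For love is love",  # hypothetical line, added for illustration only
]

word_counts = [count_love(line) for line in sample_lines]   # [0, 0, 2]
substring_hits = [has_love(line) for line in sample_lines]  # [True, True, True]

print(word_counts)
print(substring_hits)
```

This is why a line can pass the `has_love` filter while contributing 0 to the `count_love` total.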

# My first map reduce job

The classic MapReduce paradigm can be accomplished by using `map`, `flatMap`, and `reduceByKey`.  The following Python list contains a month, a state, and the number of orders for that month and state.

In [31]:
example_dataset = [
['JAN', 'NY', 3.],
['JAN', 'PA', 1.],
['JAN', 'NJ', 2.],
['JAN', 'CT', 4.],
['FEB', 'PA', 1.],
['FEB', 'NJ', 1.],
['FEB', 'NY', 2.],
['FEB', 'VT', 1.],
['MAR', 'NJ', 2.],
['MAR', 'NY', 1.],
['MAR', 'VT', 2.],
['MAR', 'PA', 3.]]

The parallelize method converts the python list to an RDD.

In [32]:
dataset_rdd = sc.parallelize(example_dataset)

## Compute the number of orders per month

*1- Generate key-value pairs.  Start out by printing the dataset_rdd.*

In [33]:
dataset_rdd

ParallelCollectionRDD[18] at readRDDFromFile at PythonRDD.scala:274

Print the dataset_rdd type.

In [34]:
type(dataset_rdd)

pyspark.rdd.RDD

Define the map function to apply to each element of the RDD.

In [35]:
def map_func(row):
    return [row[0], row[2]]

Apply `map_func` to each element of the RDD.

In [36]:
print("raw data:", dataset_rdd.collect())
print("mapped data:", dataset_rdd.map(map_func).collect())

raw data: [['JAN', 'NY', 3.0], ['JAN', 'PA', 1.0], ['JAN', 'NJ', 2.0], ['JAN', 'CT', 4.0], ['FEB', 'PA', 1.0], ['FEB', 'NJ', 1.0], ['FEB', 'NY', 2.0], ['FEB', 'VT', 1.0], ['MAR', 'NJ', 2.0], ['MAR', 'NY', 1.0], ['MAR', 'VT', 2.0], ['MAR', 'PA', 3.0]]
mapped data: [['JAN', 3.0], ['JAN', 1.0], ['JAN', 2.0], ['JAN', 4.0], ['FEB', 1.0], ['FEB', 1.0], ['FEB', 2.0], ['FEB', 1.0], ['MAR', 2.0], ['MAR', 1.0], ['MAR', 2.0], ['MAR', 3.0]]


*2- Reduce to count the number of orders per month*

Define the reduce function which will be used by the reduceByKey function to accumulate the totals.

In [37]:
def reduce_func(value1, value2):
    return value1 + value2

Put it all together:

In [38]:
dataset_rdd.map(map_func).reduceByKey(reduce_func).collect()

[('JAN', 10.0), ('FEB', 5.0), ('MAR', 8.0)]
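
To see what `reduceByKey` computes, here is a minimal plain-Python sketch of the same idea. `reduce_by_key` is a hypothetical helper written for this illustration, not a Spark API: it groups the values by key and folds each group with the reduce function. Spark does the same thing across partitions and does not guarantee the order of the resulting keys.

```python
# Plain-Python sketch of the reduceByKey idea (no Spark involved).
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func (hypothetical helper)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

def reduce_func(value1, value2):
    return value1 + value2

# The (month, orders) pairs produced by map_func in the cell above.
mapped = [['JAN', 3.0], ['JAN', 1.0], ['JAN', 2.0], ['JAN', 4.0],
          ['FEB', 1.0], ['FEB', 1.0], ['FEB', 2.0], ['FEB', 1.0],
          ['MAR', 2.0], ['MAR', 1.0], ['MAR', 2.0], ['MAR', 3.0]]

totals = reduce_by_key(mapped, reduce_func)
print(totals)
```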

## Compute the average number of orders per month

In [39]:
dataset_rdd.collect()

[['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]]

The cell below defines a function which will be called by `map`.  `avg_map_func` takes a row from the RDD shown above and returns the value in the first column (the month), together with a tuple containing the value in the third column followed by a 1.  The 1 will be used in the reducer to count the number of items for each key, where the key is the month.

In [40]:
MONTH_INDEX = 0
ORDER_INDEX = 2

def avg_map_func(row):
    return (row[MONTH_INDEX], (row[ORDER_INDEX], 1))

`avg_reduce_func` takes value 1 and value 2 as inputs.  Both are expected to be tuples of the form returned by `avg_map_func` above.  The goal of the function is to add up the floats and the 1's in the tuples, so we are essentially summing the order values and the counts associated with each unique key.  Note that the key is not one of the arguments: the `reduceByKey` function below groups the pairs by key and passes only the values to the reduce function.

In [41]:
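SUM_INDEX = 0    # position of the running order total inside a value tuple
COUNT_INDEX = 1  # position of the running row count inside a value tuple

def avg_reduce_func(value1, value2):
    # (current sum of orders + new num orders), (current number of keys + new num keys)
    return (value1[SUM_INDEX] + value2[SUM_INDEX], value1[COUNT_INDEX] + value2[COUNT_INDEX])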

In [42]:
dataset_rdd.map(avg_map_func).collect()

[('JAN', (3.0, 1)),
 ('JAN', (1.0, 1)),
 ('JAN', (2.0, 1)),
 ('JAN', (4.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (2.0, 1)),
 ('FEB', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (3.0, 1))]

Below we test the map and reduceByKey functions.  The map function returns the month (used as the key for the reduceByKey function), and a tuple containing the 3rd col floating point value followed by a 1.

In [43]:
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

[('JAN', (10.0, 4)), ('FEB', (5.0, 4)), ('MAR', (8.0, 4))]

Finally, we present 2 different ways to compute the final average, using the `map` and `mapValues` functions to divide the sum of the floats by the sum of the 1's.  The `mapValues` function passes only the value to the lambda and leaves the key unchanged, so there is no need for double indexing.  The sum of the 1's is the number of rows per key, so the result is the average.

In [44]:
TOTAL_INDEX = 0
print("Using mapValues:", dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[TOTAL_INDEX]/x[COUNT_INDEX]).collect())

KEY_INDEX = 0
VALUE_INDEX = 1
TOTAL_ORDER_INDEX = 0
COUNT_INDEX = 1
print("Using map:", dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).map(lambda x: (x[KEY_INDEX], x[VALUE_INDEX][TOTAL_ORDER_INDEX]/x[VALUE_INDEX][COUNT_INDEX])).collect())

Using mapValues: [('JAN', 2.5), ('FEB', 1.25), ('MAR', 2.0)]
Using map: [('JAN', 2.5), ('FEB', 1.25), ('MAR', 2.0)]
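
Why carry a `(sum, count)` tuple instead of averaging directly in the reducer? `reduceByKey` expects an associative and commutative function, and "average two numbers" is neither when applied repeatedly. The plain-Python sketch below uses the four JAN values from the dataset. `naive_avg_reduce` is a hypothetical, deliberately wrong reducer shown only for comparison.

```python
# Averaging inside the reducer gives the wrong answer; summing (total, count) does not.
from functools import reduce

jan_orders = [3.0, 1.0, 2.0, 4.0]  # the JAN order values from the dataset

def naive_avg_reduce(a, b):
    # hypothetical, incorrect reducer: averages the running result with the next value
    return (a + b) / 2

def sum_count_reduce(value1, value2):
    # same idea as avg_reduce_func: add the totals and add the counts
    return (value1[0] + value2[0], value1[1] + value2[1])

naive = reduce(naive_avg_reduce, jan_orders)
total, count = reduce(sum_count_reduce, [(x, 1) for x in jan_orders])
correct = total / count

print("naive:", naive)      # 3.0
print("correct:", correct)  # 2.5
```

The division is therefore postponed until after the reduce step, which is what the `mapValues` and `map` calls above do.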


## Count the frequency of words appearing in the Shakespeare text

Print the first 10 lines of sotu_rdd.

In [45]:
sotu_rdd.take(10)

['This is the 100th Etext file presented by Project Gutenberg, and',
 'is presented in cooperation with World Library, Inc., from their',
 'Library of the Future and Shakespeare CDROMS.  Project Gutenberg',
 'often releases Etexts that are NOT placed in the Public Domain!!',
 '',
 'Shakespeare',
 '',
 '*This Etext has certain copyright implications you should read!*',
 '',
 '<<THIS ELECTRONIC VERSION OF THE COMPLETE WORKS OF WILLIAM']

Define some utility functions to use in flatMap and reduceByKey.

In [46]:
def count_words(corpus):
    return [(word.lower(), 1) for word in corpus.split()]

def sum_words(first, second):
    return first + second

sotu_rdd.flatMap(count_words).reduceByKey(sum_words).take(10)

[('this', 5882),
 ('is', 9145),
 ('presented', 14),
 ('project', 244),
 ('gutenberg,', 1),
 ('in', 10671),
 ('cooperation', 1),
 ('world', 568),
 ('library,', 224),
 ('library', 7)]

Let's break up the flatMap and reduceByKey operations. The flatMap operation takes a single element (in this case a line of text) and returns 0 or more output items (here, one `(word, 1)` pair per word in the line).

In [47]:
sotu_rdd.flatMap(count_words).take(25)

[('this', 1),
 ('is', 1),
 ('the', 1),
 ('100th', 1),
 ('etext', 1),
 ('file', 1),
 ('presented', 1),
 ('by', 1),
 ('project', 1),
 ('gutenberg,', 1),
 ('and', 1),
 ('is', 1),
 ('presented', 1),
 ('in', 1),
 ('cooperation', 1),
 ('with', 1),
 ('world', 1),
 ('library,', 1),
 ('inc.,', 1),
 ('from', 1),
 ('their', 1),
 ('library', 1),
 ('of', 1),
 ('the', 1),
 ('future', 1)]

For comparison purposes only, here is what happens if we use map instead of flatMap. Notice how map returns a list of lists while flatMap returns a single list.

In [48]:
sotu_rdd.map(count_words).take(5)

[[('this', 1),
  ('is', 1),
  ('the', 1),
  ('100th', 1),
  ('etext', 1),
  ('file', 1),
  ('presented', 1),
  ('by', 1),
  ('project', 1),
  ('gutenberg,', 1),
  ('and', 1)],
 [('is', 1),
  ('presented', 1),
  ('in', 1),
  ('cooperation', 1),
  ('with', 1),
  ('world', 1),
  ('library,', 1),
  ('inc.,', 1),
  ('from', 1),
  ('their', 1)],
 [('library', 1),
  ('of', 1),
  ('the', 1),
  ('future', 1),
  ('and', 1),
  ('shakespeare', 1),
  ('cdroms.', 1),
  ('project', 1),
  ('gutenberg', 1)],
 [('often', 1),
  ('releases', 1),
  ('etexts', 1),
  ('that', 1),
  ('are', 1),
  ('not', 1),
  ('placed', 1),
  ('in', 1),
  ('the', 1),
  ('public', 1),
  ('domain!!', 1)],
 []]
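
The difference between `map` and `flatMap` can also be reproduced in plain Python. In this sketch a list comprehension stands in for `map` and `itertools.chain.from_iterable` stands in for `flatMap`. The three sample lines are shortened, illustrative inputs rather than exact lines from the file.

```python
# Plain-Python analogy for map versus flatMap (no Spark involved).
from itertools import chain

def count_words(corpus):
    return [(word.lower(), 1) for word in corpus.split()]

lines = ["Library of the Future", "", "Shakespeare"]  # shortened sample lines

# Like map: one output item per input line, so the result is a list of lists.
mapped = [count_words(line) for line in lines]

# Like flatMap: the per-line lists are concatenated into a single flat list.
flat_mapped = list(chain.from_iterable(count_words(line) for line in lines))

print(mapped)
print(flat_mapped)
```

The empty line produces an empty list under `map` but simply contributes nothing under `flatMap`.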

Now when we add the reduceByKey function onto the flatMap function, the reduceByKey function groups common words by key, and adds up all the ones associated with each word / key.

In [49]:
sotu_rdd.flatMap(count_words).reduceByKey(sum_words).take(10)

[('this', 5882),
 ('is', 9145),
 ('presented', 14),
 ('project', 244),
 ('gutenberg,', 1),
 ('in', 10671),
 ('cooperation', 1),
 ('world', 568),
 ('library,', 224),
 ('library', 7)]
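
Notice that `library,` and `library` are counted as different words because `split()` keeps punctuation attached to each token. One possible refinement, sketched below in plain Python, is to extract words with a regular expression. `count_words_clean` is a hypothetical variant of `count_words`, and the pattern `[a-z0-9']+` is an assumption about what should count as a word.

```python
# Sketch of a tokenizer that drops punctuation before counting (plain Python).
import re
from collections import Counter

def count_words_clean(corpus):
    # keep runs of letters, digits and apostrophes; surrounding punctuation is dropped
    return [(word, 1) for word in re.findall(r"[a-z0-9']+", corpus.lower())]

sample_lines = [
    'is presented in cooperation with World Library, Inc., from their',
    'Library of the Future and Shakespeare CDROMS.  Project Gutenberg',
]

counts = Counter()
for line in sample_lines:
    for word, one in count_words_clean(line):
        counts[word] += one

print(counts['library'], counts['inc'], counts['cdroms'])
```

The same function could be passed to `flatMap` in place of `count_words`, at which point `library,` and `library` would share a single key.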

# Spark 2.0

You can create `DataFrames` programmatically

In [15]:
from pyspark.sql import Row

Create a python list of spark row objects.

In [16]:
raw_data = [Row(state='NY', month='JAN', orders=3),
            Row(state='NJ', month='JAN', orders=4),
            Row(state='NY', month='FEB', orders=5),
           ]

In [17]:
print("raw_data:\n", raw_data)
print("type:", type(raw_data))

raw_data:
 [Row(state='NY', month='JAN', orders=3), Row(state='NJ', month='JAN', orders=4), Row(state='NY', month='FEB', orders=5)]
type: <class 'list'>


Create a spark data frame from the raw data.

In [18]:
spark_df = spark.createDataFrame(raw_data)


Print the spark dataframe and its type

In [54]:
print("spark_df:", spark_df)
print("type:", type(spark_df))

spark_df: DataFrame[state: string, month: string, orders: bigint]
type: <class 'pyspark.sql.dataframe.DataFrame'>


Print the schema.  The dataframe schema defines the column names and types (and other things).

In [55]:
spark_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)



In [56]:
spark_df.show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
+-----+-----+------+



Create another python list containing new raw data.

In [57]:
raw_data2 = [Row(state='NY', month='MAR', orders=10),
             Row(state='NJ', month='MAR', orders=3),
             Row(state='NY', month='APR', orders=1),
           ]

Create a new spark dataframe from the new raw data

In [58]:
spark_df2 = spark.createDataFrame(raw_data2)

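Merge the 2 data frames with `union`, which appends the rows of the second data frame to the first and matches columns by position.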

In [59]:
print("spark_df:")
spark_df.show()
print("spark_df2:") 
spark_df2.show()
print("union:")
all_data_df = spark_df.union(spark_df2)
all_data_df.show()

spark_df:
+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
+-----+-----+------+

spark_df2:
+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  MAR|    10|
|   NJ|  MAR|     3|
|   NY|  APR|     1|
+-----+-----+------+

union:
+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
|   NY|  MAR|    10|
|   NJ|  MAR|     3|
|   NY|  APR|     1|
+-----+-----+------+



Or display the merged data frame using Pandas

In [60]:
# make sure you limit first
all_data_df.limit(10).toPandas()

|   | state | month | orders |
|---|---|---|---|
| 0 | NY | JAN | 3 |
| 1 | NJ | JAN | 4 |
| 2 | NY | FEB | 5 |
| 3 | NY | MAR | 10 |
| 4 | NJ | MAR | 3 |
| 5 | NY | APR | 1 |


### Access columns

In [61]:
all_data_df['month']

Column<'month'>

In [62]:
all_data_df.month

Column<'month'>

In [63]:
all_data_df['month'] + 1

Column<'(month + 1)'>

It's hard to print out the data from the expressions above because a `Column` is a symbolic expression and has no method to show its data.  Another way to see the data in a column is to use the `select` method.  Note that the select method returns a new dataframe.

In [64]:
# show() prints the table itself and returns None, so it is not wrapped in print()
month_df = all_data_df.select('month')
month_df.show()
print(type(month_df))

+-----+
|month|
+-----+
|  JAN|
|  JAN|
|  FEB|
|  MAR|
|  MAR|
|  APR|
+-----+

<class 'pyspark.sql.dataframe.DataFrame'>


### Selections

In [65]:
condition_month_jan = (all_data_df['month'] == "JAN")

In [66]:
condition_month_jan

Column<'(month = JAN)'>

In [67]:
all_data_df.where(condition_month_jan)

DataFrame[state: string, month: string, orders: bigint]

In [68]:
all_data_df[condition_month_jan]

DataFrame[state: string, month: string, orders: bigint]

In [69]:
all_data_df[condition_month_jan].show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
+-----+-----+------+



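The conditions are symbolic objects.  Create a column selection variable named `logic`.  Each comparison is wrapped in parentheses because `&` binds more tightly than `==` and `>` in Python.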

In [70]:
logic = (all_data_df['month']  == 'MAR') & (all_data_df['orders'] > 5)

Now use the variable named logic to select data in the data frame.

In [71]:
all_data_df[logic].show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  MAR|    10|
+-----+-----+------+
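
The parentheses around each comparison in `logic` matter. The following plain-Python sketch shows the operator precedence involved, using two hypothetical integers `x` and `y` as stand-ins rather than DataFrame columns. It only illustrates how Python parses the expression and is not Spark code.

```python
# Plain-Python illustration of why each comparison is parenthesised (no Spark involved).
x, y = 3, 10  # hypothetical stand-in values, not DataFrame columns

# Intended meaning: both comparisons are evaluated first, then combined.
with_parens = (x == 3) & (y > 5)

# Without parentheses, `&` is applied first, so Python reads this as the
# chained comparison  x == (3 & y) > 5,  which is a different question.
without_parens = x == 3 & y > 5

print(with_parens, without_parens)
```

With these values the two expressions disagree, so the parenthesised form is the safe habit when combining column conditions.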



You can create new columns

In [72]:
all_data_df.show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  JAN|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
|   NY|  MAR|    10|
|   NJ|  MAR|     3|
|   NY|  APR|     1|
+-----+-----+------+



In [73]:
all_data_df['orders'] + 1

Column<'(orders + 1)'>

In [74]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).printSchema()

root
 |-- state: string (nullable = true)
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)
 |-- order_plus_1: long (nullable = true)



In [75]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).show()

+-----+-----+------+------------+
|state|month|orders|order_plus_1|
+-----+-----+------+------------+
|   NY|  JAN|     3|           4|
|   NJ|  JAN|     4|           5|
|   NY|  FEB|     5|           6|
|   NY|  MAR|    10|          11|
|   NJ|  MAR|     3|           4|
|   NY|  APR|     1|           2|
+-----+-----+------+------------+



You can perform some basic grouping operations

In [76]:
all_data_df.groupBy('month')

<pyspark.sql.group.GroupedData at 0x7f826382b850>

In [77]:
all_data_df.groupBy('month').count()

DataFrame[month: string, count: bigint]

In [78]:
all_data_df.groupBy('month').count().show()

+-----+-----+
|month|count|
+-----+-----+
|  JAN|    2|
|  FEB|    1|
|  MAR|    2|
|  APR|    1|
+-----+-----+



You can order by a certain column or group of columns

In [79]:
all_data_df.orderBy('orders').show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  APR|     1|
|   NY|  JAN|     3|
|   NJ|  MAR|     3|
|   NJ|  JAN|     4|
|   NY|  FEB|     5|
|   NY|  MAR|    10|
+-----+-----+------+



In [80]:
all_data_df.orderBy('orders', ascending=False).show()

+-----+-----+------+
|state|month|orders|
+-----+-----+------+
|   NY|  MAR|    10|
|   NY|  FEB|     5|
|   NJ|  JAN|     4|
|   NY|  JAN|     3|
|   NJ|  MAR|     3|
|   NY|  APR|     1|
+-----+-----+------+



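You can register a dataframe as a temporary view and query it with SQL

In [81]:
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView replaces it
all_data_df.createOrReplaceTempView('orders')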



In [82]:
spark.sql('select count(*) from orders')

DataFrame[count(1): bigint]

In [83]:
spark.sql('select count(*) from orders').show()

+--------+
|count(1)|
+--------+
|       6|
+--------+



Example of creating a dataframe from a JSON file.

In [84]:
semantic_scholar = spark.read.json("sample_S2_records.json")

In [85]:
# complex schema
semantic_scholar.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ids: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- doiUrl: string (nullable = true)
 |-- entities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- inCitations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- journalName: string (nullable = true)
 |-- journalPages: string (nullable = true)
 |-- journalVolume: string (nullable = true)
 |-- outCitations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- paperAbstract: string (nullable = true)
 |-- pdfUrls: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pmid: string (nullable = true)
 |-- s2PdfUrl: string (nullable = true)
 |-- s2Url: string (nullable = true)
 |-- sourc

In [86]:
semantic_scholar.take(1)

[Row(authors=[Row(ids=['6323488'], name='Jose Alejandro Rauh-Hain'), Row(ids=['40439126'], name='Noah Rodriguez'), Row(ids=['5695062'], name='Whitfield B. Growdon'), Row(ids=['47348020'], name='Anne Kathryn Goodman'), Row(ids=['21470081'], name='David M. Boruta Ii'), Row(ids=['8523393'], name='Neil S Horowitz'), Row(ids=['16734596'], name='Mph Marcela  G.  del  Carmen  MD'), Row(ids=['1739284'], name='John Schorge')], doi='10.1245/s10434-011-2100-x', doiUrl='https://doi.org/10.1245/s10434-011-2100-x', entities=['Epithelial ovarian cancer', 'Excision', 'Extraction', 'Hospital admission', 'Malignant neoplasm of ovary', 'Morbidity - disease rate', 'Neoadjuvant Therapy', 'Neoplasms', 'Overall Survival', 'Patients', 'Postoperative Complications', 'Residual Tumor', 'SLC13A5 gene', 'Stage IV Ovarian Carcinoma', 'Tumor Debulking', 'intensive care unit', 'ovarian neoplasm', "stage IV childhood Hodgkin's lymphoma"], id='4cbba8127c8747a3b2cfb9c1f48c43e5c15e323e', inCitations=['505715691a8360f67f1

## The below cells implement examples from the lecture slides

### Map reduce examples follow

In [87]:
def map_func(el):
    return (1, el)

def reduce_min(el1, el2):
    return min(el1, el2)

def reduce_max(el1, el2):
    return max(el1, el2)

def reduce_sum(el1, el2):
    return el1 + el2

rdd = sc.parallelize(range(10))
print(rdd)
print("rdd:", rdd.collect())
print("min:", rdd.map(map_func).reduce(reduce_min))
print("type(rdd.map(map_func).reduce(reduce_min))", type(rdd.map(map_func).reduce(reduce_min)))
print("max:", rdd.map(map_func).reduce(reduce_max))
print("sum:", rdd.reduce(reduce_sum))

PythonRDD[132] at RDD at PythonRDD.scala:53
rdd: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
min: (1, 0)
type(rdd.map(map_func).reduce(reduce_min)) <class 'tuple'>
max: (1, 9)
sum: 45


In [88]:
data = ["foo foo1 foo2", "bar bar1 bar2", "foobar foobar1 foobar2"]
rdd = sc.parallelize(data)

def f(e):
    results = []
    for word in e.split():
        results.append((word))
    return results

print("map only:")
display(rdd.map(f).collect())
print("map and reduce:")
display(rdd.map(f).reduce(lambda v1, v2: v1 + v2))

map only:


[['foo', 'foo1', 'foo2'],
 ['bar', 'bar1', 'bar2'],
 ['foobar', 'foobar1', 'foobar2']]

map and reduce:


['foo', 'foo1', 'foo2', 'bar', 'bar1', 'bar2', 'foobar', 'foobar1', 'foobar2']

### The cell below demonstrates a left outer and right outer join using RDDs

In [89]:
locations_list = [['loc1', 'NY'], ['loc2', 'NY'], ['loc3', 'PA'], ['loc4', 'FL']]
transactions_list = [['loc1', 2.0], ['loc1', 3.0], ['loc2', 5.0], ['loc5', 5.0]]
locations = sc.parallelize(locations_list)
transactions = sc.parallelize(transactions_list)

print("locations:\n", locations.collect())
print("transactions:\n", transactions.collect())
print("left outer join:\n", locations.leftOuterJoin(transactions).collect())
print("right outer join:\n", locations.rightOuterJoin(transactions).collect())

locations:
 [['loc1', 'NY'], ['loc2', 'NY'], ['loc3', 'PA'], ['loc4', 'FL']]
transactions:
 [['loc1', 2.0], ['loc1', 3.0], ['loc2', 5.0], ['loc5', 5.0]]
left outer join:
 [('loc1', ('NY', 2.0)), ('loc1', ('NY', 3.0)), ('loc2', ('NY', 5.0)), ('loc3', ('PA', None)), ('loc4', ('FL', None))]
right outer join:
 [('loc1', ('NY', 2.0)), ('loc1', ('NY', 3.0)), ('loc5', (None, 5.0)), ('loc2', ('NY', 5.0))]
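
The join semantics above can be sketched in plain Python. `left_outer_join` is a hypothetical helper written for this illustration, not a Spark API: every left pair is kept and matched with each right value that has the same key, or with `None` when there is no match. A right outer join is then the same operation with the two sides swapped. Spark does not guarantee the order of joined rows, so only the contents are comparable.

```python
# Plain-Python sketch of left and right outer joins on (key, value) pairs.
from collections import defaultdict

def left_outer_join(left, right):
    """Pair each left value with every matching right value, or None if there is none."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    joined = []
    for key, left_value in left:
        for right_value in right_by_key.get(key, [None]):
            joined.append((key, (left_value, right_value)))
    return joined

locations_list = [['loc1', 'NY'], ['loc2', 'NY'], ['loc3', 'PA'], ['loc4', 'FL']]
transactions_list = [['loc1', 2.0], ['loc1', 3.0], ['loc2', 5.0], ['loc5', 5.0]]

left_joined = left_outer_join(locations_list, transactions_list)

# Right outer join: keep every transaction, then put the location value back first.
right_joined = [(key, (loc, txn))
                for key, (txn, loc) in left_outer_join(transactions_list, locations_list)]

print(left_joined)
print(right_joined)
```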


### The cell below demonstrates join and group by using spark dataframes

In [90]:
# create row data used to create data frames
locations_data = [Row(location_id='loc1', n_employees=3, state='NY'),
                 Row(location_id='loc2', n_employees=8, state='NY'),
                 Row(location_id='loc3', n_employees=3, state='PA'),
                 Row(location_id='loc4', n_employees=1, state='FL')]

transactions_data = [Row(location_id='loc1', n_orders=2.0, transaction_id=1),
                    Row(location_id='loc1', n_orders=3.0, transaction_id=2),
                    Row(location_id='loc3', n_orders=5.0, transaction_id=3),
                    Row(location_id='loc5', n_orders=5.0, transaction_id=4)]

# create the data frames from the row data above
locations_df = spark.createDataFrame(locations_data)
transactions_df = spark.createDataFrame(transactions_data)

Dataframe inner join example

In [91]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("inner join:")
display(locations_df.join(transactions_df, on='location_id').toPandas())

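locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

transactions_df:

|   | location_id | n_orders | transaction_id |
|---|---|---|---|
| 0 | loc1 | 2.0 | 1 |
| 1 | loc1 | 3.0 | 2 |
| 2 | loc3 | 5.0 | 3 |
| 3 | loc5 | 5.0 | 4 |

inner join:

|   | location_id | n_employees | state | n_orders | transaction_id |
|---|---|---|---|---|---|
| 0 | loc1 | 3 | NY | 2.0 | 1 |
| 1 | loc1 | 3 | NY | 3.0 | 2 |
| 2 | loc3 | 3 | PA | 5.0 | 3 |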


Dataframe left outer join example

In [92]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("left outer join:")
display(locations_df.join(transactions_df, on='location_id', how='left').toPandas())

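locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

transactions_df:

|   | location_id | n_orders | transaction_id |
|---|---|---|---|
| 0 | loc1 | 2.0 | 1 |
| 1 | loc1 | 3.0 | 2 |
| 2 | loc3 | 5.0 | 3 |
| 3 | loc5 | 5.0 | 4 |

left outer join:

|   | location_id | n_employees | state | n_orders | transaction_id |
|---|---|---|---|---|---|
| 0 | loc1 | 3 | NY | 3.0 | 2.0 |
| 1 | loc1 | 3 | NY | 2.0 | 1.0 |
| 2 | loc2 | 8 | NY |  |  |
| 3 | loc3 | 3 | PA | 5.0 | 3.0 |
| 4 | loc4 | 1 | FL |  |  |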


Dataframe right outer join example

In [93]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("right outer join:")
display(locations_df.join(transactions_df, on='location_id', how='right').toPandas())

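locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

transactions_df:

|   | location_id | n_orders | transaction_id |
|---|---|---|---|
| 0 | loc1 | 2.0 | 1 |
| 1 | loc1 | 3.0 | 2 |
| 2 | loc3 | 5.0 | 3 |
| 3 | loc5 | 5.0 | 4 |

right outer join:

|   | location_id | n_employees | state | n_orders | transaction_id |
|---|---|---|---|---|---|
| 0 | loc1 | 3.0 | NY | 2.0 | 1 |
| 1 | loc1 | 3.0 | NY | 3.0 | 2 |
| 2 | loc3 | 3.0 | PA | 5.0 | 3 |
| 3 | loc5 |  |  | 5.0 | 4 |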


Dataframe outer join example

In [94]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("outer join")
display(locations_df.join(transactions_df, on='location_id', how='outer').toPandas())

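locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

transactions_df:

|   | location_id | n_orders | transaction_id |
|---|---|---|---|
| 0 | loc1 | 2.0 | 1 |
| 1 | loc1 | 3.0 | 2 |
| 2 | loc3 | 5.0 | 3 |
| 3 | loc5 | 5.0 | 4 |

outer join

|   | location_id | n_employees | state | n_orders | transaction_id |
|---|---|---|---|---|---|
| 0 | loc1 | 3.0 | NY | 2.0 | 1.0 |
| 1 | loc1 | 3.0 | NY | 3.0 | 2.0 |
| 2 | loc2 | 8.0 | NY |  |  |
| 3 | loc3 | 3.0 | PA | 5.0 | 3.0 |
| 4 | loc4 | 1.0 | FL |  |  |
| 5 | loc5 |  |  | 5.0 | 4.0 |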


Dataframe group by example

In [95]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("group by state, aggregate by n_orders:")
display(locations_df.join(transactions_df, on='location_id'). \
        groupBy('state').agg(fn.sum('n_orders')).toPandas())

locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

transactions_df:

|   | location_id | n_orders | transaction_id |
|---|---|---|---|
| 0 | loc1 | 2.0 | 1 |
| 1 | loc1 | 3.0 | 2 |
| 2 | loc3 | 5.0 | 3 |
| 3 | loc5 | 5.0 | 4 |

group by state, aggregate by n_orders:

|   | state | sum(n_orders) |
|---|---|---|
| 0 | PA | 5.0 |
| 1 | NY | 5.0 |
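
As a cross-check on the grouped result, the same inner join and sum can be worked through in plain Python. This is only a sketch of the logic, with the two tables rewritten as a hypothetical dictionary and list of tuples holding just the columns needed. It shows why FL is missing (loc4 has no transactions) and why the loc5 transaction is dropped (it has no matching location).

```python
# Plain-Python sketch of "inner join on location_id, then sum n_orders by state".
from collections import defaultdict

# location_id -> state, taken from locations_data
state_by_location = {'loc1': 'NY', 'loc2': 'NY', 'loc3': 'PA', 'loc4': 'FL'}

# (location_id, n_orders), taken from transactions_data
transactions = [('loc1', 2.0), ('loc1', 3.0), ('loc3', 5.0), ('loc5', 5.0)]

orders_by_state = defaultdict(float)
for location_id, n_orders in transactions:
    state = state_by_location.get(location_id)
    if state is not None:  # inner join: skip transactions without a matching location
        orders_by_state[state] += n_orders

orders_by_state = dict(orders_by_state)
print(orders_by_state)
```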


### Selecting, modifying, symbolic operations

In [96]:
# create an expression
new_column = 1 + fn.col('n_employees')
print("new_column type:", type(new_column))
print("locations_df:")
display(locations_df.toPandas())

print("locations_df new col:")
display(locations_df.select(new_column.alias('new_column')).toPandas())

print("locations_df new col and cast:")
display(locations_df.select(new_column.alias('new_column').cast('float')).toPandas())

new_column type: <class 'pyspark.sql.column.Column'>
locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

locations_df new col:

|   | new_column |
|---|---|
| 0 | 4 |
| 1 | 9 |
| 2 | 4 |
| 3 | 2 |

locations_df new col and cast:

|   | new_column |
|---|---|
| 0 | 4.0 |
| 1 | 9.0 |
| 2 | 4.0 |
| 3 | 2.0 |


In [97]:
print("location_df:")
display(locations_df.toPandas())

print("location_df with select:")
display(locations_df.select('n_employees', 'location_id', 'state').toPandas())

print("location_df with select and new col using alias:")
display(locations_df.select('n_employees', 'location_id', 'state', \
       (fn.col('n_employees') + 1).alias('n_employees_plus_one')).toPandas())

location_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

location_df with select:

|   | n_employees | location_id | state |
|---|---|---|---|
| 0 | 3 | loc1 | NY |
| 1 | 8 | loc2 | NY |
| 2 | 3 | loc3 | PA |
| 3 | 1 | loc4 | FL |

location_df with select and new col using alias:

|   | n_employees | location_id | state | n_employees_plus_one |
|---|---|---|---|---|
| 0 | 3 | loc1 | NY | 4 |
| 1 | 8 | loc2 | NY | 9 |
| 2 | 3 | loc3 | PA | 4 |
| 3 | 1 | loc4 | FL | 2 |


### Filtering

In [98]:
print("locations_df:")
display(locations_df.toPandas())

print("Filter by n_employees > 2 & state == PA:")
display(locations_df.where((fn.col('n_employees') > 2) & (fn.col('state') == 'PA')).toPandas())

locations_df:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc1 | 3 | NY |
| 1 | loc2 | 8 | NY |
| 2 | loc3 | 3 | PA |
| 3 | loc4 | 1 | FL |

Filter by n_employees > 2 & state == PA:

|   | location_id | n_employees | state |
|---|---|---|---|
| 0 | loc3 | 3 | PA |


Create spark dataframe from lists

In [99]:
from pyspark.sql import Row

# create a row expression
R = Row('val1', 'val2')

list1 = [1, 2, 3, 4]
list2 = ["foo", "bar", "baz", "foobar"]
df = spark.createDataFrame([R(val1, val2) for val1, val2 in zip(list1, list2)])
df.show()
df.printSchema()

+----+------+
|val1|  val2|
+----+------+
|   1|   foo|
|   2|   bar|
|   3|   baz|
|   4|foobar|
+----+------+

root
 |-- val1: long (nullable = true)
 |-- val2: string (nullable = true)

