In [1]:
# Create Spark Session and Spark Context
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-intro').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import Row
from pyspark.sql import functions as fn

## Create RDDs from Python variables

Create a "Resilient Distributed Dataset (RDD)

In [2]:
rdd = sc.parallelize(range(20))

Print the RDD

In [3]:
print(rdd)

PythonRDD[1] at RDD at PythonRDD.scala:53


Print the RDD type

In [4]:
type(rdd)

pyspark.rdd.PipelinedRDD

Show the first element of the RDD

In [5]:
rdd.first()

0

Create a python list containing the first 2 elements of the RDD

In [6]:
rdd.take(2)

[0, 1]

Create a python list containing all elements in the RDD

In [7]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

We can apply functions to each element.  Define such a function.

In [8]:
def less_than_10(x):
    if x < 10:
        return True
    else:
        return False

In [9]:
# show that it is lazy evaluation
rdd.filter(less_than_10)

PythonRDD[4] at RDD at PythonRDD.scala:53

In [10]:
rdd.filter(less_than_10).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [11]:
rdd.filter(less_than_10).count()

10

Note that we didn't modify the rdd.  If we convert the rdd to a python list, all original values are unchanged.

In [12]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Define a function named `square` to apply to each element of the RDD

In [13]:
def square(x):
    return x*x # x**2

Apply the square function to each element of the rdd using the map function.

In [14]:
rdd.map(square).collect()

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361]

Define a new function `multiple_of_10` to apply to each element of the RDD

In [15]:
def multiple_of_10(x):
    if x % 10 == 0:
        return True
    else:
        return False

Apply `multiple_of_10` to each element of the RDD using the map operation.

In [16]:
rdd.map(square).filter(multiple_of_10).collect()

[0, 100]

## Read from HDFS

In [17]:
# read only cell

import os
import pandas as pd

# get the databricks runtime version
db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, grading, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
#
# Notes:
#   Graders, set the GRADING_RUNTME_ENV environment variable to contain the full path 
#   to the data file for this assignment.  For example, my grading_env var is set as 
#   follows on Windows:
#   set GRADING_RUNTIME_ENV=c:/Users/Will/Desktop/SU/IST-718/datasets
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):    
    # The grading_env variable contains the full path to the 
    # directory containing the data file.  
    grading_env = os.getenv("GRADING_RUNTIME_ENV")
    
    # if the databricks env var exists
    if db_env != None:
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else if the grading environment variable exists
    elif grading_env != None:
        # build the full path file name assuming a grading env
        full_path_name = "%s/%s" % (grading_env, data_file_name)
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

In [18]:
# read the rdd from file
full_path_file_name = get_training_filename('shakespeare.txt')
sotu_rdd = sc.textFile(full_path_file_name)

Print the unique ID for this RDD (within its SparkContext).

In [19]:
sotu_rdd.id()

10

Print the first element of the RDD.

In [20]:
sotu_rdd.first()

'The Project Gutenberg EBook of The Complete Works of William Shakespeare, by '

Note that the call to `first` actually returns a Python string:

In [21]:
type(sotu_rdd.first())

str

Convert the first 10 elements of the RDD to a python list.

In [22]:
sotu_rdd.take(10)

['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by ',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '** This is a COPYRIGHTED Project Gutenberg eBook, Details Below **',
 '**     Please follow the copyright guidelines in this file.     **']

1- Check how many times the word `love` appears

In [23]:
def count_love(line):
    return line.lower().split().count("love")
    # return "love" in line

In [24]:
sotu_rdd.map(count_love).take(10)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [25]:
sotu_rdd.map(count_love).sum()

1279

In [26]:
def has_love(line):
    # should return True if line has word `love`, and False otherwise
    return "love" in line.lower()

In [27]:
sotu_rdd.filter(has_love).take(3)

['  Of his self-love to stop posterity?',
 '  Calls back the lovely April of her prime,',
 '  Unthrifty loveliness why dost thou spend,']

# My first map reduce job

The classic mapreduce paradigm can be accomplished by using `map` or `flatMap` (if multiple key-value pairs are generated) and `reduceByKey`.  The following RDD contains month, state, and number of orders per month.

In [28]:
example_dataset = [
['JAN', 'NY', 3.],
['JAN', 'PA', 1.],
['JAN', 'NJ', 2.],
['JAN', 'CT', 4.],
['FEB', 'PA', 1.],
['FEB', 'NJ', 1.],
['FEB', 'NY', 2.],
['FEB', 'VT', 1.],
['MAR', 'NJ', 2.],
['MAR', 'NY', 1.],
['MAR', 'VT', 2.],
['MAR', 'PA', 3.]]

The parallelize method converts the python list to an RDD.

In [29]:
dataset_rdd = sc.parallelize(example_dataset)

## Compute the number of orders per month

*1- Generate key-value pairs.  Start out by printing the dataset_rdd.*

In [30]:
dataset_rdd

ParallelCollectionRDD[17] at parallelize at PythonRDD.scala:195

Print the dataset_rdd type.

In [31]:
type(dataset_rdd)

pyspark.rdd.RDD

Define the map function to apply to each element of the RDD.

In [32]:
def map_func(row):
    return [row[0], row[2]]

Apply `map_func` to each element of the RDD.

In [33]:
print("raw data:", dataset_rdd.collect())
print("mapped data:", dataset_rdd.map(map_func).collect())

raw data: [['JAN', 'NY', 3.0], ['JAN', 'PA', 1.0], ['JAN', 'NJ', 2.0], ['JAN', 'CT', 4.0], ['FEB', 'PA', 1.0], ['FEB', 'NJ', 1.0], ['FEB', 'NY', 2.0], ['FEB', 'VT', 1.0], ['MAR', 'NJ', 2.0], ['MAR', 'NY', 1.0], ['MAR', 'VT', 2.0], ['MAR', 'PA', 3.0]]
mapped data: [['JAN', 3.0], ['JAN', 1.0], ['JAN', 2.0], ['JAN', 4.0], ['FEB', 1.0], ['FEB', 1.0], ['FEB', 2.0], ['FEB', 1.0], ['MAR', 2.0], ['MAR', 1.0], ['MAR', 2.0], ['MAR', 3.0]]


*2- Reduce to count the number of orders per month*

Define the reduce function which will be used by the reduceByKey function to accumulate the totals.

In [34]:
def reduce_func(value1, value2):
    return value1 + value2

Put it all together:

In [35]:
dataset_rdd.map(map_func).reduceByKey(reduce_func).collect()

[('JAN', 10.0), ('FEB', 5.0), ('MAR', 8.0)]

## Compute the average number of orders per month

In [36]:
dataset_rdd.collect()

[['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]]

The cells below define a function which will be called in the map function.  The avg_map_func takes a row from the rdd defined above, and returns the value in the first col, and a tuple containing the the value in the 3rd col followd by a 1.  The 1 will be used to count the number of items for the key where the key is the month.

In [37]:
def avg_map_func(row):
    return (row[0], (row[2], 1))

The avg_reduce_func takes value 1 and vlaue 2 as inputs.  Value 1 and value 2 are expected to be the tuples defined in the output from avg_map_func above.  The goal of the function is to add up the floats and the 1's in the tuples.  We are essentially summing up the floats and the 1's associated with each unique key.  Note that the key is not one of the args, the reduceByKey function below will strip the keys out of the data returned by the map function.

In [38]:
def avg_reduce_func(value1, value2):
    # (current sum of orders + new num orders), (current number of keys + new num keys)
    return ((value1[0] + value2[0], value1[1] + value2[1]))

Below we test the map and reduceByKey functions.  The map function returns the month (used as the key for the reduceByKey function), and a tuple containing the 3rd col floating point value followed by a 1.

In [39]:
dataset_rdd.map(avg_map_func).collect()

[('JAN', (3.0, 1)),
 ('JAN', (1.0, 1)),
 ('JAN', (2.0, 1)),
 ('JAN', (4.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (2.0, 1)),
 ('FEB', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (3.0, 1))]

In [40]:
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

[('JAN', (10.0, 4)), ('FEB', (5.0, 4)), ('MAR', (8.0, 4))]

Finally, we present 2 different ways to compute the final average using `map` and `mapValues` functions to divide the sum of the floats by the sum of the 1's.  The mapValues funtion excludes the keys so there is no need for double indexing.  The sum of the 1's is the number of rows per key so the result is the average.

In [41]:
print("Using mapValues:", dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[0]/x[1]).collect())
print("Using map:", dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).map(lambda x: (x[0], x[1][0]/x[1][1])).collect())

Using mapValues: [('JAN', 2.5), ('FEB', 1.25), ('MAR', 2.0)]
Using map: [('JAN', 2.5), ('FEB', 1.25), ('MAR', 2.0)]


## Count the frequency of words appearing in the Shakespeare sonets

Print the first 10 lines of sotu_rdd.

In [42]:
sotu_rdd.take(10)

['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by ',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '** This is a COPYRIGHTED Project Gutenberg eBook, Details Below **',
 '**     Please follow the copyright guidelines in this file.     **']

Define some utility functions to use in flatMap and reduceByKey.

In [43]:
def count_words(corpus):
    return [(word.lower(), 1) for word in corpus.split()]

def sum_words(first, second):
    return first + second

sotu_rdd.flatMap(count_words).reduceByKey(sum_words).take(10)

[('project', 320),
 ('gutenberg', 250),
 ('ebook', 13),
 ('of', 18126),
 ('shakespeare', 270),
 ('this', 5930),
 ('is', 9168),
 ('use', 509),
 ('anyone', 5),
 ('anywhere', 4)]

Lets break up the flatMap and reduceByKey operations. The flatMap operation takes a single element (in this case a list of words), and returns 0 or more output items.

In [44]:
sotu_rdd.flatMap(count_words).take(25)

[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('the', 1),
 ('complete', 1),
 ('works', 1),
 ('of', 1),
 ('william', 1),
 ('shakespeare,', 1),
 ('by', 1),
 ('william', 1),
 ('shakespeare', 1),
 ('this', 1),
 ('ebook', 1),
 ('is', 1),
 ('for', 1),
 ('the', 1),
 ('use', 1),
 ('of', 1),
 ('anyone', 1),
 ('anywhere', 1),
 ('at', 1),
 ('no', 1)]

Now when we add the reduceByKey function onto the flatMap function, the reduceByKey function groups common words by key, and adds up all the ones associated with each word / key.

In [45]:
sotu_rdd.flatMap(count_words).reduceByKey(sum_words).take(10)

[('project', 320),
 ('gutenberg', 250),
 ('ebook', 13),
 ('of', 18126),
 ('shakespeare', 270),
 ('this', 5930),
 ('is', 9168),
 ('use', 509),
 ('anyone', 5),
 ('anywhere', 4)]

# Spark 2.0

You can create `DataFrames` programatically

In [46]:
from pyspark.sql import Row

Create a python list of spark row objects.

In [47]:
raw_data = [Row(state='NY', month='JAN', orders=3),
            Row(state='NJ', month='JAN', orders=4),
            Row(state='NY', month='FEB', orders=5),
           ]

In [48]:
print("raw_data:\n", raw_data)
print("type:", type(raw_data))

raw_data:
 [Row(month='JAN', orders=3, state='NY'), Row(month='JAN', orders=4, state='NJ'), Row(month='FEB', orders=5, state='NY')]
type: <class 'list'>


Create a spark data frame from the raw data.

In [49]:
spark_df = spark.createDataFrame(raw_data)

Print the spark dataframe and it's type

In [50]:
print("spark_df:", spark_df)
print("type:", type(spark_df))

spark_df: DataFrame[month: string, orders: bigint, state: string]
type: <class 'pyspark.sql.dataframe.DataFrame'>


Print the schema.  The dataframe schema defines the column names and types (and other things).

In [51]:
spark_df.printSchema()

root
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)
 |-- state: string (nullable = true)



In [52]:
spark_df.show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
|  FEB|     5|   NY|
+-----+------+-----+



Create another python list containing new raw data.

In [53]:
raw_data2 = [Row(state='NY', month='MAR', orders=10),
             Row(state='NJ', month='MAR', orders=3),
             Row(state='NY', month='APR', orders=1),
           ]

Create a new spark dataframe from the new raw data

In [54]:
spark_df2 = spark.createDataFrame(raw_data2)

Merge the 2 data frames

In [55]:
print("spark_df:")
spark_df.show()
print("spark_df2:") 
spark_df2.show()
print("union:")
all_data_df = spark_df.union(spark_df2)
all_data_df.show()

spark_df:
+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
|  FEB|     5|   NY|
+-----+------+-----+

spark_df2:
+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  MAR|    10|   NY|
|  MAR|     3|   NJ|
|  APR|     1|   NY|
+-----+------+-----+

union:
+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
|  FEB|     5|   NY|
|  MAR|    10|   NY|
|  MAR|     3|   NJ|
|  APR|     1|   NY|
+-----+------+-----+



Or display the merged data frame using Pandas

In [56]:
# make sure you limit first
all_data_df.limit(10).toPandas()

Unnamed: 0,month,orders,state
0,JAN,3,NY
1,JAN,4,NJ
2,FEB,5,NY
3,MAR,10,NY
4,MAR,3,NJ
5,APR,1,NY


`display` produces a nice formatted tabular output in databricks.  `display` does work outside of databricks but the output is minimal.

In [57]:
display(all_data_df)

DataFrame[month: string, orders: bigint, state: string]

### Access columns

In [58]:
all_data_df['month']

Column<b'month'>

In [59]:
all_data_df.month

Column<b'month'>

In [60]:
all_data_df['month'] + 1

Column<b'(month + 1)'>

It's hard to print out the data in the above functions because columns do not have methods to show the data.  Another way to see data in the columns is using the `select` method.

In [61]:
all_data_df.select('month').show()

+-----+
|month|
+-----+
|  JAN|
|  JAN|
|  FEB|
|  MAR|
|  MAR|
|  APR|
+-----+



### Selections

In [62]:
condition_month_jan = (all_data_df['month'] == "JAN")

In [63]:
condition_month_jan

Column<b'(month = JAN)'>

In [64]:
all_data_df.where(condition_month_jan)

DataFrame[month: string, orders: bigint, state: string]

In [65]:
all_data_df[condition_month_jan]

DataFrame[month: string, orders: bigint, state: string]

In [66]:
all_data_df[condition_month_jan].show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
+-----+------+-----+



The conditions are symbolic objects.  Create a column selection variable named logic

In [67]:
logic = (all_data_df['month']  == 'MAR') & (all_data_df['orders'] > 5)

Now use the variable named logic to select data in the data frame.

In [68]:
all_data_df[logic].show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  MAR|    10|   NY|
+-----+------+-----+



You can create new columns

In [69]:
all_data_df.show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
|  FEB|     5|   NY|
|  MAR|    10|   NY|
|  MAR|     3|   NJ|
|  APR|     1|   NY|
+-----+------+-----+



In [70]:
all_data_df['orders'] + 1

Column<b'(orders + 1)'>

In [71]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).printSchema()

root
 |-- month: string (nullable = true)
 |-- orders: long (nullable = true)
 |-- state: string (nullable = true)
 |-- order_plus_1: long (nullable = true)



In [72]:
all_data_df.withColumn('order_plus_1', all_data_df['orders'] + 1).show()

+-----+------+-----+------------+
|month|orders|state|order_plus_1|
+-----+------+-----+------------+
|  JAN|     3|   NY|           4|
|  JAN|     4|   NJ|           5|
|  FEB|     5|   NY|           6|
|  MAR|    10|   NY|          11|
|  MAR|     3|   NJ|           4|
|  APR|     1|   NY|           2|
+-----+------+-----+------------+



You can perform some basic grouping operations

In [73]:
all_data_df.groupBy('month')

<pyspark.sql.group.GroupedData at 0x226c9b3c308>

In [74]:
all_data_df.groupBy('month').count()

DataFrame[month: string, count: bigint]

In [105]:
all_data_df.groupBy('month').count().show()

+-----+-----+
|month|count|
+-----+-----+
|  APR|    1|
|  FEB|    1|
|  JAN|    2|
|  MAR|    2|
+-----+-----+



You can order by a certain column or group of columns

In [76]:
all_data_df.orderBy('orders').show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  APR|     1|   NY|
|  MAR|     3|   NJ|
|  JAN|     3|   NY|
|  JAN|     4|   NJ|
|  FEB|     5|   NY|
|  MAR|    10|   NY|
+-----+------+-----+



In [77]:
all_data_df.orderBy('orders', ascending=False).show()

+-----+------+-----+
|month|orders|state|
+-----+------+-----+
|  MAR|    10|   NY|
|  FEB|     5|   NY|
|  JAN|     4|   NJ|
|  MAR|     3|   NJ|
|  JAN|     3|   NY|
|  APR|     1|   NY|
+-----+------+-----+



You can register as tables and perform SQL

In [78]:
all_data_df.registerTempTable('orders')

In [79]:
spark.sql('select count(*) from orders')

DataFrame[count(1): bigint]

In [80]:
spark.sql('select count(*) from orders').show()

+--------+
|count(1)|
+--------+
|       6|
+--------+



The `DataFrame` object can read from multiple sources.

In [81]:
full_path_spotify_name = get_training_filename('spotify_songs.csv')
spotify_df = spark.read.csv(full_path_spotify_name, header=True, inferSchema=True, mode="DROPMALFORMED")

In [82]:
spotify_df.printSchema()

root
 |-- song_title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- valence: double (nullable = true)



In [83]:
spotify_df.count()

2009

Note that data bricks changed the name of the json we uploaded!  The original name was sample-S2-records.json and the file name on databricks is sample_S2_records-3dd18.json.  If you look carefully, you will notice that in addition to adding '-3dd18' to the file name, databricks also changed some dash characters to underscores (sample-S2-records -> sample_S2_records).  The cell below shows hot to get help on databricks filesystem utils.  The cell after that uses the databricks filesystem utils to do a directory listing if we are running on databcicks.  See [databricks_utils_help](https://docs.databricks.com/dev-tools/databricks-utils.html) for more information.

In [84]:
# get help for the databricks filesystem ls command
if db_env != None:
    dbutils.fs.help("ls")

In [85]:
# this cell finds the name of the S2 records json file 

# if we are running in the databricks environment
if db_env != None:
  # get a list of dbfs file objects
  files = dbutils.fs.ls("/FileStore/tables")
  
  # for each file object in the list of file objects
  for file in files:
    # if the string 'json' is in the file name
    if "json" in file.name and "S2" in file.name:
      # print the file name
      print(file.name)
      
      # save the file name
      renamed_s2_file_name = file.name
      
      break

In [86]:
# if we are running on databricks
if db_env != None:
  # read the semantic scholar json file using the specific json file name found above 
  semantic_scholar_full_path_name = get_training_filename(renamed_s2_file_name)
  semantic_scholar = spark.read.json(semantic_scholar_full_path_name)
else:
  semantic_scholar_full_path_name = get_training_filename("sample-S2-records.json")
  semantic_scholar = spark.read.json(semantic_scholar_full_path_name)

In [87]:
# complex schema
semantic_scholar.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ids: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- doiUrl: string (nullable = true)
 |-- entities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- inCitations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- journalName: string (nullable = true)
 |-- journalPages: string (nullable = true)
 |-- journalVolume: string (nullable = true)
 |-- outCitations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- paperAbstract: string (nullable = true)
 |-- pdfUrls: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pmid: string (nullable = true)
 |-- s2PdfUrl: string (nullable = true)
 |-- s2Url: string (nullable = true)
 |-- sourc

In [88]:
semantic_scholar.take(1)

[Row(authors=[Row(ids=['6323488'], name='Jose Alejandro Rauh-Hain'), Row(ids=['40439126'], name='Noah Rodriguez'), Row(ids=['5695062'], name='Whitfield B. Growdon'), Row(ids=['47348020'], name='Anne Kathryn Goodman'), Row(ids=['21470081'], name='David M. Boruta Ii'), Row(ids=['8523393'], name='Neil S Horowitz'), Row(ids=['16734596'], name='Mph Marcela  G.  del  Carmen  MD'), Row(ids=['1739284'], name='John Schorge')], doi='10.1245/s10434-011-2100-x', doiUrl='https://doi.org/10.1245/s10434-011-2100-x', entities=['Epithelial ovarian cancer', 'Excision', 'Extraction', 'Hospital admission', 'Malignant neoplasm of ovary', 'Morbidity - disease rate', 'Neoadjuvant Therapy', 'Neoplasms', 'Overall Survival', 'Patients', 'Postoperative Complications', 'Residual Tumor', 'SLC13A5 gene', 'Stage IV Ovarian Carcinoma', 'Tumor Debulking', 'intensive care unit', 'ovarian neoplasm', "stage IV childhood Hodgkin's lymphoma"], id='4cbba8127c8747a3b2cfb9c1f48c43e5c15e323e', inCitations=['505715691a8360f67f1

## The below cells implement examples from the lecture slides

### Map reduce examples follow

In [89]:
def map_func(el):
    return (1, el)

def reduce_min(el1, el2):
    return min(el1, el2)

def reduce_max(el1, el2):
    return max(el1, el2)

def reduce_sum(el1, el2):
    return el1 + el2

rdd = sc.parallelize(range(10))

print("rdd:", rdd.collect())
print("min:", rdd.map(map_func).reduce(reduce_min))
print("type(rdd.map(map_func).reduce(reduce_min))", type(rdd.map(map_func).reduce(reduce_min)))
print("max:", rdd.map(map_func).reduce(reduce_max))
print("sum:", rdd.reduce(reduce_sum))

rdd: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
min: (1, 0)
type(rdd.map(map_func).reduce(reduce_min)) <class 'tuple'>
max: (1, 9)
sum: 45


In [90]:
data = ["foo foo1 foo2", "bar bar1 bar2", "foobar foobar1 foobar2"]
rdd = sc.parallelize(data)

def f(e):
    results = []
    for word in e.split():
        results.append((word))
    return results

print("map only:")
display(rdd.map(f).collect())
print("map and reduce:")
display(rdd.map(f).reduce(lambda v1, v2: v1 + v2))

map only:


[['foo', 'foo1', 'foo2'],
 ['bar', 'bar1', 'bar2'],
 ['foobar', 'foobar1', 'foobar2']]

map and reduce:


['foo', 'foo1', 'foo2', 'bar', 'bar1', 'bar2', 'foobar', 'foobar1', 'foobar2']

### The cell below demonstrates a left outer and right outer join using RDD's

In [91]:
locations_list = [['loc1', 'NY'], ['loc2', 'NY'], ['loc3', 'PA'], ['loc4', 'FL']]
transactions_list = [['loc1', 2.0], ['loc1', 3.0], ['loc2', 5.0], ['loc5', 5.0]]
locations = sc.parallelize(locations_list)
transactions = sc.parallelize(transactions_list)

print("locations:\n", locations.collect())
print("transactions:\n", transactions.collect())
print("left outer join:\n", locations.leftOuterJoin(transactions).collect())
print("right outer join:\n", locations.rightOuterJoin(transactions).collect())

locations:
 [['loc1', 'NY'], ['loc2', 'NY'], ['loc3', 'PA'], ['loc4', 'FL']]
transactions:
 [['loc1', 2.0], ['loc1', 3.0], ['loc2', 5.0], ['loc5', 5.0]]
left outer join:
 [('loc3', ('PA', None)), ('loc1', ('NY', 2.0)), ('loc1', ('NY', 3.0)), ('loc2', ('NY', 5.0)), ('loc4', ('FL', None))]
right outer join:
 [('loc1', ('NY', 2.0)), ('loc1', ('NY', 3.0)), ('loc5', (None, 5.0)), ('loc2', ('NY', 5.0))]


### The cell below demonstrates join and group by using spark dataframes

In [92]:
# create row data used to create data frames
locations_data = [Row(location_id='loc1', n_employees=3, state='NY'),
                 Row(location_id='loc2', n_employees=8, state='NY'),
                 Row(location_id='loc3', n_employees=3, state='PA'),
                 Row(location_id='loc4', n_employees=1, state='FL')]

transactions_data = [Row(location_id='loc1', n_orders=2.0, transaction_id=1),
                    Row(location_id='loc1', n_orders=3.0, transaction_id=2),
                    Row(location_id='loc3', n_orders=5.0, transaction_id=3),
                    Row(location_id='loc5', n_orders=5.0, transaction_id=4)]

# create the data frames from the row data above
locations_df = spark.createDataFrame(locations_data)
transactions_df = spark.createDataFrame(transactions_data)

Dataframe inner join example

In [93]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("inner join:")
display(locations_df.join(transactions_df, on='location_id').toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


transactions_df:


Unnamed: 0,location_id,n_orders,transaction_id
0,loc1,2.0,1
1,loc1,3.0,2
2,loc3,5.0,3
3,loc5,5.0,4


inner join:


Unnamed: 0,location_id,n_employees,state,n_orders,transaction_id
0,loc1,3,NY,2.0,1
1,loc1,3,NY,3.0,2
2,loc3,3,PA,5.0,3


Dataframe left outer join example

In [94]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("left outer join:")
display(locations_df.join(transactions_df, on='location_id', how='left').toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


transactions_df:


Unnamed: 0,location_id,n_orders,transaction_id
0,loc1,2.0,1
1,loc1,3.0,2
2,loc3,5.0,3
3,loc5,5.0,4


left outer join:


Unnamed: 0,location_id,n_employees,state,n_orders,transaction_id
0,loc1,3,NY,2.0,1.0
1,loc1,3,NY,3.0,2.0
2,loc3,3,PA,5.0,3.0
3,loc2,8,NY,,
4,loc4,1,FL,,


Dataframe right outer join example

In [95]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("right outer join:")
display(locations_df.join(transactions_df, on='location_id', how='right').toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


transactions_df:


Unnamed: 0,location_id,n_orders,transaction_id
0,loc1,2.0,1
1,loc1,3.0,2
2,loc3,5.0,3
3,loc5,5.0,4


right outer join:


Unnamed: 0,location_id,n_employees,state,n_orders,transaction_id
0,loc1,3.0,NY,2.0,1
1,loc1,3.0,NY,3.0,2
2,loc3,3.0,PA,5.0,3
3,loc5,,,5.0,4


Dataframe outer join example

In [96]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("outer join")
display(locations_df.join(transactions_df, on='location_id', how='outer').toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


transactions_df:


Unnamed: 0,location_id,n_orders,transaction_id
0,loc1,2.0,1
1,loc1,3.0,2
2,loc3,5.0,3
3,loc5,5.0,4


outer join


Unnamed: 0,location_id,n_employees,state,n_orders,transaction_id
0,loc1,3.0,NY,2.0,1.0
1,loc1,3.0,NY,3.0,2.0
2,loc3,3.0,PA,5.0,3.0
3,loc2,8.0,NY,,
4,loc4,1.0,FL,,
5,loc5,,,5.0,4.0


Dataframe group by example

In [97]:
print("locations_df:")
display(locations_df.toPandas())
print("transactions_df:")
display(transactions_df.toPandas())
print("group by state, aggregate by n_orders:")
display(locations_df.join(transactions_df, on='location_id'). \
        groupBy('state').agg(fn.sum('n_orders')).toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


transactions_df:


Unnamed: 0,location_id,n_orders,transaction_id
0,loc1,2.0,1
1,loc1,3.0,2
2,loc3,5.0,3
3,loc5,5.0,4


group by state, aggregate by n_orders:


Unnamed: 0,state,sum(n_orders)
0,PA,5.0
1,NY,5.0


### Selecting, modifying, symbolic operations

In [98]:
print("locations_df:")
display(locations_df.toPandas())
print("Select and create new col:")
display(locations_df.select(1 + fn.col('n_employees')).toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


Select and create new col:


Unnamed: 0,(n_employees + 1)
0,4
1,9
2,4
3,2


In [99]:
# create an expression
new_column = 1 + fn.col('n_employees')
print("locations_df:")
display(locations_df.toPandas())
print("locations_df new col:")
display(locations_df.select(new_column.alias('new_column')).toPandas())
print("locations_df new col and cast:")
display(locations_df.select(new_column.alias('new_column').cast('float')).toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


locations_df new col:


Unnamed: 0,new_column
0,4
1,9
2,4
3,2


locations_df new col and cast:


Unnamed: 0,new_column
0,4.0
1,9.0
2,4.0
3,2.0


In [100]:
print("location_df:")
display(locations_df.toPandas())
print("location_df with select:")
display(locations_df.select('n_employees', 'location_id', 'state').toPandas())
print("location_df with select and new col using alias:")
display(locations_df.select('n_employees', 'location_id', 'state', \
       (fn.col('n_employees') + 1).alias('n_employees_plus_one')).toPandas())

location_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


location_df with select:


Unnamed: 0,n_employees,location_id,state
0,3,loc1,NY
1,8,loc2,NY
2,3,loc3,PA
3,1,loc4,FL


location_df with select and new col using alias:


Unnamed: 0,n_employees,location_id,state,n_employees_plus_one
0,3,loc1,NY,4
1,8,loc2,NY,9
2,3,loc3,PA,4
3,1,loc4,FL,2


### Filtering

In [101]:
print("locations_df:")
display(locations_df.toPandas())
print("Filtering example:")
display(locations_df.where((fn.col('n_employees') > 2) & (fn.col('state') == 'PA')).toPandas())

locations_df:


Unnamed: 0,location_id,n_employees,state
0,loc1,3,NY
1,loc2,8,NY
2,loc3,3,PA
3,loc4,1,FL


Filtering example:


Unnamed: 0,location_id,n_employees,state
0,loc3,3,PA


Create spark dataframe from lists

In [114]:
from pyspark.sql import Row
R = Row('val1', 'val2')

list1 = [1, 2, 3, 4]
list2 = ["foo", "bar", "baz", "foobar"]
df = spark.createDataFrame([R(val1, val2) for val1, val2 in zip(list1, list2)]).show()

+----+------+
|val1|  val2|
+----+------+
|   1|   foo|
|   2|   bar|
|   3|   baz|
|   4|foobar|
+----+------+

