In [1]:
from pyspark.sql import SparkSession

# Exploring a Spark Session
1. Initialize application and get a spark session
2. This represents your job that runs on a cluster

In [2]:
# Ask the builder for a session belonging to an application named "Basics".
# getOrCreate() hands back an existing session when there is one, otherwise starts a new one.
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [3]:
# Confirm what kind of object the builder returned.
type(spark)

pyspark.sql.session.SparkSession

In [4]:
# NOTE(review): same expression as the cell directly above; it shows the same type again.
type(spark)

pyspark.sql.session.SparkSession

# How to read a text file in/using the Session
1. The session has a property called `read`, which gives you a DataFrameReader
2. which has methods to read a variety of files
3. text is a method to read text files into a data frame (set of records: rows and columns)

In [5]:
# `read` is accessed as a property (no call parentheses); keep the reader it returns
# in a variable so the cells below can reuse it.
dfr = spark.read

In [6]:
# Check the class of the reader object.
type(dfr)

pyspark.sql.readwriter.DataFrameReader

In [7]:
# Passing a directory (rather than a single file) makes the reader take in the whole directory.
# The result is not assigned to anything, so the notebook only echoes the DataFrame's schema.
# NOTE(review): absolute, machine-specific path; the notebook will not run elsewhere as written.
dfr.text(r"C:\satya\data\code\pyspark")

DataFrame[value: string]

In [8]:
# Reading the whole directory is more than we need, so point the reader at ONE
# sonnet text file and keep the resulting DataFrame in `lines`.
lines=dfr.text(r"C:\satya\data\code\pyspark\sonnet2.txt")

In [9]:
# The text reader hands back a DataFrame.
type(lines)

pyspark.sql.dataframe.DataFrame

# Print a DataFrame

In [10]:
# show() prints the DataFrame as a text table. With its default settings long cell
# values are cut short, which is why every line in the output ends in "...".
lines.show()

+--------------------+
|               value|
+--------------------+
|When forty winter...|
|And dig deep tren...|
|Thy youth's proud...|
|Will be a tatter'...|
|Then being ask'd ...|
|Where all the tre...|
|To say, within th...|
|Were an all-eatin...|
|How much more pra...|
|If thou couldst a...|
|Shall sum my coun...|
|Proving his beaut...|
|This were to be n...|
|And see thy blood...|
+--------------------+



# Explore RDD from the DataFrame
1. Examine the type of RDD
2. Directly printing an RDD shows only its description, not its data, because the data is distributed on the cluster
3. you have to bring the data back to the main node through "collect" method
4. The collected RDD becomes a plain Python list with rows and columns

In [11]:
# A DataFrame exposes its underlying RDD through the `rdd` property.
asrdd = lines.rdd

In [12]:
# Check the class of the object returned by `lines.rdd`.
type(asrdd)

pyspark.rdd.RDD

In [13]:
# collect() brings the rows back as a regular Python list, which print() can show in full.
print(asrdd.collect())

[Row(value='When forty winters shall beseige thy brow,'), Row(value="And dig deep trenches in thy beauty's field,"), Row(value="Thy youth's proud livery, so gazed on now,"), Row(value="Will be a tatter'd weed, of small worth held:"), Row(value="Then being ask'd where all thy beauty lies,"), Row(value='Where all the treasure of thy lusty days,'), Row(value='To say, within thine own deep-sunken eyes,'), Row(value='Were an all-eating shame and thriftless praise.'), Row(value="How much more praise deserved thy beauty's use,"), Row(value="If thou couldst answer 'This fair child of mine"), Row(value="Shall sum my count and make my old excuse,'"), Row(value='Proving his beauty by succession thine!'), Row(value='This were to be new made when thou art old,'), Row(value="And see thy blood warm when thou feel'st it cold.")]


In [14]:
# Keep the collected rows in a variable so they can be inspected in the following cells.
rddCollected = asrdd.collect()

In [15]:
# Prints the stored list of Row objects.
print(rddCollected)

[Row(value='When forty winters shall beseige thy brow,'), Row(value="And dig deep trenches in thy beauty's field,"), Row(value="Thy youth's proud livery, so gazed on now,"), Row(value="Will be a tatter'd weed, of small worth held:"), Row(value="Then being ask'd where all thy beauty lies,"), Row(value='Where all the treasure of thy lusty days,'), Row(value='To say, within thine own deep-sunken eyes,'), Row(value='Were an all-eating shame and thriftless praise.'), Row(value="How much more praise deserved thy beauty's use,"), Row(value="If thou couldst answer 'This fair child of mine"), Row(value="Shall sum my count and make my old excuse,'"), Row(value='Proving his beauty by succession thine!'), Row(value='This were to be new made when thou art old,'), Row(value="And see thy blood warm when thou feel'st it cold.")]


In [16]:
# Printing the RDD itself (without collect) shows only a description of the RDD, not its rows.
print(asrdd)

MapPartitionsRDD[7] at javaToPython at <unknown>:0


In [17]:
# What collect() returned is an ordinary Python list.
type(rddCollected)

list

# Break the lines of the Sonnet into words with map
1. See map syntax with a lambda function
2. See how the row object is used with indexing and also explicit column name
3. Understand the type returned by a map: PipelinedRDD
4. Notice this mapped RDD cannot be printed directly unless collected

In [18]:
# map() applies the lambda to each Row; row[0] selects the first column by position.
# The result is not assigned, so the notebook only echoes the new RDD's description.
asrdd.map(lambda row: row[0])

PythonRDD[8] at RDD at PythonRDD.scala:53

In [19]:
# Same positional mapping as the previous cell, this time kept in a variable.
mappedRDD = asrdd.map(lambda row: row[0])

In [20]:
# Check the class of the RDD produced by map().
type(mappedRDD)

pyspark.rdd.PipelinedRDD

In [21]:
# Same idea as indexing with row[0], but the column is selected by its name, `value`.
mapRDD2 = asrdd.map(lambda row: row.value)

In [22]:
# After the map, the collected elements are plain strings rather than Row objects.
print(mapRDD2.collect())

['When forty winters shall beseige thy brow,', "And dig deep trenches in thy beauty's field,", "Thy youth's proud livery, so gazed on now,", "Will be a tatter'd weed, of small worth held:", "Then being ask'd where all thy beauty lies,", 'Where all the treasure of thy lusty days,', 'To say, within thine own deep-sunken eyes,', 'Were an all-eating shame and thriftless praise.', "How much more praise deserved thy beauty's use,", "If thou couldst answer 'This fair child of mine", "Shall sum my count and make my old excuse,'", 'Proving his beauty by succession thine!', 'This were to be new made when thou art old,', "And see thy blood warm when thou feel'st it cold."]


In [23]:
# For comparison: the source RDD still holds Row objects after being mapped.
# NOTE(review): repeats an earlier cell that already printed asrdd.collect().
print(asrdd.collect())

[Row(value='When forty winters shall beseige thy brow,'), Row(value="And dig deep trenches in thy beauty's field,"), Row(value="Thy youth's proud livery, so gazed on now,"), Row(value="Will be a tatter'd weed, of small worth held:"), Row(value="Then being ask'd where all thy beauty lies,"), Row(value='Where all the treasure of thy lusty days,'), Row(value='To say, within thine own deep-sunken eyes,'), Row(value='Were an all-eating shame and thriftless praise.'), Row(value="How much more praise deserved thy beauty's use,"), Row(value="If thou couldst answer 'This fair child of mine"), Row(value="Shall sum my count and make my old excuse,'"), Row(value='Proving his beauty by succession thine!'), Row(value='This were to be new made when thou art old,'), Row(value="And see thy blood warm when thou feel'st it cold.")]


# Explore PipelinedRDD (output of map)
1. Interesting! Read on
2. An RDD may contain a collection of any type of objects (may be!)
3. The RDD read from a text file definitely has Row/Column types
4. However, when a map is used, what goes into the PipelinedRDD is decided by the map function.
5. That is, whatever the map function returns. In this case it is a string. You can see that in the following examples
6. The collect will merely yield a list of whatever data type is returned by the map, or of whatever is underneath that RDD, including the PipelinedRDD

In [24]:
# The by-name map produces the same RDD class as the positional one.
type(mapRDD2)

pyspark.rdd.PipelinedRDD

In [25]:
# first() returns a single element: take one from the original RDD and one from
# the mapped RDD so their types can be compared in the next cell.
firstObject = asrdd.first()
firstObjectFromMappedRDD = mapRDD2.first()

In [26]:
# The original RDD yields a Row; the mapped RDD yields what the lambda returned (a str).
print(type(firstObject))
print(type(firstObjectFromMappedRDD))

<class 'pyspark.sql.types.Row'>
<class 'str'>


In [27]:
# A Row's column can be read as an attribute named after the column.
print(firstObject.value)

When forty winters shall beseige thy brow,


In [28]:
# NOTE(review): rebinds `mappedRDD`, which an earlier cell assigned using row[0],
# to a mapping that selects the column by name instead.
mappedRDD = asrdd.map(lambda row: row.value)

In [29]:
# The class is still PipelinedRDD after the rebinding.
type(mappedRDD)

pyspark.rdd.PipelinedRDD

In [30]:
# Printing the mapped RDD directly shows only its description, not the strings inside it.
print(mappedRDD)

PythonRDD[12] at RDD at PythonRDD.scala:53


In [31]:
# collect() is needed to actually see the mapped strings.
print(mappedRDD.collect())

['When forty winters shall beseige thy brow,', "And dig deep trenches in thy beauty's field,", "Thy youth's proud livery, so gazed on now,", "Will be a tatter'd weed, of small worth held:", "Then being ask'd where all thy beauty lies,", 'Where all the treasure of thy lusty days,', 'To say, within thine own deep-sunken eyes,', 'Were an all-eating shame and thriftless praise.', "How much more praise deserved thy beauty's use,", "If thou couldst answer 'This fair child of mine", "Shall sum my count and make my old excuse,'", 'Proving his beauty by succession thine!', 'This were to be new made when thou art old,', "And see thy blood warm when thou feel'st it cold."]


In [32]:
# Turn every line into a list of its words. split() with no argument splits on any run
# of whitespace, so repeated, leading or trailing spaces (or tabs) never yield empty "words".
mappedRDDEachLineAList = asrdd.map(lambda row: row.value.split())

In [33]:
# Still a PipelinedRDD; only the element type (now a list of words) has changed.
type(mappedRDDEachLineAList)

pyspark.rdd.PipelinedRDD

In [34]:
# Collecting gives a list of lists: one inner list of words per line of the sonnet.
print(mappedRDDEachLineAList.collect())

[['When', 'forty', 'winters', 'shall', 'beseige', 'thy', 'brow,'], ['And', 'dig', 'deep', 'trenches', 'in', 'thy', "beauty's", 'field,'], ['Thy', "youth's", 'proud', 'livery,', 'so', 'gazed', 'on', 'now,'], ['Will', 'be', 'a', "tatter'd", 'weed,', 'of', 'small', 'worth', 'held:'], ['Then', 'being', "ask'd", 'where', 'all', 'thy', 'beauty', 'lies,'], ['Where', 'all', 'the', 'treasure', 'of', 'thy', 'lusty', 'days,'], ['To', 'say,', 'within', 'thine', 'own', 'deep-sunken', 'eyes,'], ['Were', 'an', 'all-eating', 'shame', 'and', 'thriftless', 'praise.'], ['How', 'much', 'more', 'praise', 'deserved', 'thy', "beauty's", 'use,'], ['If', 'thou', 'couldst', 'answer', "'This", 'fair', 'child', 'of', 'mine'], ['Shall', 'sum', 'my', 'count', 'and', 'make', 'my', 'old', "excuse,'"], ['Proving', 'his', 'beauty', 'by', 'succession', 'thine!'], ['This', 'were', 'to', 'be', 'new', 'made', 'when', 'thou', 'art', 'old,'], ['And', 'see', 'thy', 'blood', 'warm', 'when', 'thou', "feel'st", 'it', 'cold.']]


In [35]:
# top(10) returns a list of the 10 largest elements, largest first. The elements here
# are lists of words, which Python compares item by item.
topOfMappedRDD = mappedRDDEachLineAList.top(10)

In [36]:
# top() returns a plain Python list, so no collect() is needed.
type(topOfMappedRDD)

list

In [37]:
# The word lists come out in descending order.
print(topOfMappedRDD)

[['Will', 'be', 'a', "tatter'd", 'weed,', 'of', 'small', 'worth', 'held:'], ['Where', 'all', 'the', 'treasure', 'of', 'thy', 'lusty', 'days,'], ['When', 'forty', 'winters', 'shall', 'beseige', 'thy', 'brow,'], ['Were', 'an', 'all-eating', 'shame', 'and', 'thriftless', 'praise.'], ['To', 'say,', 'within', 'thine', 'own', 'deep-sunken', 'eyes,'], ['Thy', "youth's", 'proud', 'livery,', 'so', 'gazed', 'on', 'now,'], ['This', 'were', 'to', 'be', 'new', 'made', 'when', 'thou', 'art', 'old,'], ['Then', 'being', "ask'd", 'where', 'all', 'thy', 'beauty', 'lies,'], ['Shall', 'sum', 'my', 'count', 'and', 'make', 'my', 'old', "excuse,'"], ['Proving', 'his', 'beauty', 'by', 'succession', 'thine!']]


In [38]:
# The same call on the original RDD; its elements are Row objects rather than word lists.
topOfRDD = asrdd.top(10)

In [39]:
# Again a plain Python list.
type(topOfRDD)

list

In [40]:
# The Row objects appear in the same descending order as the word lists did.
print(topOfRDD)

[Row(value="Will be a tatter'd weed, of small worth held:"), Row(value='Where all the treasure of thy lusty days,'), Row(value='When forty winters shall beseige thy brow,'), Row(value='Were an all-eating shame and thriftless praise.'), Row(value='To say, within thine own deep-sunken eyes,'), Row(value="Thy youth's proud livery, so gazed on now,"), Row(value='This were to be new made when thou art old,'), Row(value="Then being ask'd where all thy beauty lies,"), Row(value="Shall sum my count and make my old excuse,'"), Row(value='Proving his beauty by succession thine!')]


In [41]:
# And once more on the RDD of plain strings.
print(mappedRDD.top(10))

["Will be a tatter'd weed, of small worth held:", 'Where all the treasure of thy lusty days,', 'When forty winters shall beseige thy brow,', 'Were an all-eating shame and thriftless praise.', 'To say, within thine own deep-sunken eyes,', "Thy youth's proud livery, so gazed on now,", 'This were to be new made when thou art old,', "Then being ask'd where all thy beauty lies,", "Shall sum my count and make my old excuse,'", 'Proving his beauty by succession thine!']


In [42]:
# NOTE(review): this runs top(10) a second time only to inspect the type of its result.
type(mappedRDD.top(10))

list

# Data Frame
1. is a collection of Rows
2. Each row has a set of columns
3. its underlying RDD can be obtained through the property df.rdd
4. lines var below is a DF
5. var "asrdd" is taken from lines.rdd

In [43]:
# first() can be called on the DataFrame itself, without going through .rdd.
line1 = lines.first()

In [44]:
# A single record of a DataFrame is a Row.
type(line1)

pyspark.sql.types.Row

In [45]:
# Printing a Row shows its column name together with the value.
print(line1)

Row(value='When forty winters shall beseige thy brow,')


In [46]:
# Recap: `lines` is the DataFrame. NOTE(review): repeats an earlier type check.
type(lines)

pyspark.sql.dataframe.DataFrame

In [47]:
# Recap: `asrdd` is the RDD taken from lines.rdd. NOTE(review): repeats an earlier type check.
type(asrdd)

pyspark.rdd.RDD