### In this project, PySpark DataFrames are created from the plain-text (txt) King James Version (KJV) Bible after the text has been preprocessed with RDDs. Additional columns are added to support SQL queries.

#### Load KJV txt file from external path

In [276]:
import re

In [277]:
# Read the KJV text file into an RDD with one element per line of the file.
# NOTE(review): `sc` is not defined anywhere in this notebook — presumably the SparkContext
# supplied by the PySpark session; confirm before running on a fresh kernel.
# NOTE(review): the path below is an absolute, machine-specific location; it has to be
# adjusted before the notebook can run on another machine.
k = sc.textFile("/Users/binggangliu/Downloads/Bible/kjv/KJV_c.txt", minPartitions=4)
# Total number of lines read, including the blank lines and book-title lines.
k.count()

32423

In [278]:
k.take(5)

['',
 'Genesis',
 '',
 'Gen.1:1 In the beginning God created the heaven and the earth.',
 'Gen.1:2 And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters.']

#### Prepare the data for the DataFrame: remove the empty lines and the lines containing book names, keeping only the verse lines

In [280]:
def find_verses(line):
    """Return ``line`` when it contains a chapter:verse reference, otherwise ``None``.

    A reference is a '.', followed by one to three digits, followed by ':'
    (for example the ``.1:`` in ``Gen.1:1``). The result is used as a filter
    predicate, so a returned (non-empty) line is truthy and ``None`` is falsy.
    """
    # Guard clause: lines without a reference (blank lines, book titles) are rejected.
    if re.search(r'\.(\d|\d\d|\d\d\d):', line) is None:
        return None
    return line

In [281]:
# Keep only the lines that find_verses() accepts, i.e. the verse lines.
kv = k.filter(find_verses)

In [282]:
kv.take(5)

['Gen.1:1 In the beginning God created the heaven and the earth.',
 'Gen.1:2 And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters.',
 'Gen.1:3 And God said, Let there be light: and there was light.',
 'Gen.1:4 And God saw the light, that it was good: and God divided the light from the darkness.',
 'Gen.1:5 And God called the light Day, and the darkness he called Night. And the evening and the morning were the first day.']

##### Total number of verses:

In [283]:
kv.count()

31102

#### Splitting the verses

In [284]:
def split(lines):
    """Split one verse line on whitespace and return the list of tokens.

    The first token is the verse reference (e.g. ``Gen.1:1``); the remaining
    tokens are the words of the verse.
    """
    return lines.split()

kv_split = kv.map(split)

In [285]:
kv_split.take(2)

[['Gen.1:1',
  'In',
  'the',
  'beginning',
  'God',
  'created',
  'the',
  'heaven',
  'and',
  'the',
  'earth.'],
 ['Gen.1:2',
  'And',
  'the',
  'earth',
  'was',
  'without',
  'form,',
  'and',
  'void;',
  'and',
  'darkness',
  'was',
  'upon',
  'the',
  'face',
  'of',
  'the',
  'deep.',
  'And',
  'the',
  'Spirit',
  'of',
  'God',
  'moved',
  'upon',
  'the',
  'face',
  'of',
  'the',
  'waters.']]

#### Keep the verse title, and join the remaining words together as a single verse

Generate an index for each verse, as the verse ID (VID)

In [286]:
from pyspark.sql import Row

In [287]:
kv1 = kv_split.zipWithIndex()

In [659]:
kv1.take(2)

[(['Gen.1:1',
   'In',
   'the',
   'beginning',
   'God',
   'created',
   'the',
   'heaven',
   'and',
   'the',
   'earth.'],
  0),
 (['Gen.1:2',
   'And',
   'the',
   'earth',
   'was',
   'without',
   'form,',
   'and',
   'void;',
   'and',
   'darkness',
   'was',
   'upon',
   'the',
   'face',
   'of',
   'the',
   'deep.',
   'And',
   'the',
   'Spirit',
   'of',
   'God',
   'moved',
   'upon',
   'the',
   'face',
   'of',
   'the',
   'waters.'],
  1)]

Separate the title from the verse as verse name (VName)

In [289]:
def _verse_row(indexed_tokens):
    """Build a Row from one (tokens, zero-based index) pair produced by zipWithIndex()."""
    tokens, position = indexed_tokens
    reference, words = tokens[0], tokens[1:]
    # VID is 1-based, hence position + 1.
    return Row(VName=reference, verse=' '.join(words), VID=position + 1)


ktv = kv1.map(_verse_row)

In [532]:
ktv.take(3)

[Row(VID=1, VName='Gen.1:1', verse='In the beginning God created the heaven and the earth.'),
 Row(VID=2, VName='Gen.1:2', verse='And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters.'),
 Row(VID=3, VName='Gen.1:3', verse='And God said, Let there be light: and there was light.')]

#### Convert to DataFrame from RDD

In [291]:
df = ktv.toDF()

In [292]:
df.take(3)

[Row(VID=1, VName='Gen.1:1', verse='In the beginning God created the heaven and the earth.'),
 Row(VID=2, VName='Gen.1:2', verse='And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters.'),
 Row(VID=3, VName='Gen.1:3', verse='And God said, Let there be light: and there was light.')]

In [660]:
from pyspark.sql.functions import col, udf

#from pyspark.sql.functions import monotonically_increasing_id
#df0 = df.withColumn('VID', monotonically_increasing_id())
#Initially the monotonically_increasing_id() method was used to generate the unique ID's for the verses,
# but the results showed that the consecutiveness was not guaranteed by this method, hence the zipWithIndex() method for RDD is used above


#### Add columns for book name (BName) and chapter name (CName), generating the names by extracting the required characters from the 'VName' column

In [706]:

def _book_name(VName):
    """Return the book abbreviation: everything before the first '.'.

    'Gen.1:1' -> 'Gen', '1John.2:3' -> '1John'. Splitting on the delimiter
    replaces the earlier fixed-width slicing, which truncated any abbreviation
    longer than five characters and raised IndexError on very short strings.
    A null VName yields null instead of failing the task.
    """
    if VName is None:
        return None
    return VName.partition('.')[0]


def _chapter_name(VName):
    """Return the 'Book.chapter' prefix: everything before the first ':'.

    'Gen.1:1' -> 'Gen.1', 'Exod.10:28' -> 'Exod.10'. Works for any prefix
    length, whereas the earlier slicing only handled a ':' at index 4 to 8.
    A null VName yields null instead of failing the task.
    """
    if VName is None:
        return None
    return VName.partition(':')[0]


# udf() is called without an explicit return type, exactly as before.
bookname_udf = udf(_book_name)
chaptername_udf = udf(_chapter_name)


df1 = df.withColumn('BName', bookname_udf(df.VName)).withColumn('CName', chaptername_udf(df.VName))

#### Create a function below to generate IDs for the book name (BName) and chapter name (CName)

Because the rows of a PySpark DataFrame are not iterable, the function below cannot take the 'CName' and 'BName' columns from the DataFrame as input arguments directly. In order to generate IDs for these two columns, the 'CName' and 'BName' columns are first collected into Python lists, the lists of names are converted to lists of IDs, and RDDs are then made from the ID lists with an index zipped on.
The function convert_id() is created to convert a list of names into a list of IDs.

In [707]:
list_b = df1.select('BName').rdd.map(lambda x: x).collect()

In [708]:
#list_b

In [709]:
list_c = df1.select('CName').rdd.flatMap(lambda x: x).collect()

In [710]:
#list_c

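Above, both flatMap() and map() work for this purpose, because each row holds a single field. Note that they do not return the same thing: map() yields a list of one-field Row objects, while flatMap() unpacks each Row and yields a list of plain strings. convert_id() only compares consecutive elements for equality, so it produces the same IDs from either list.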

In [711]:
def convert_id(list):
    """Convert a sequence of names into 1-based IDs of consecutive runs.

    Each element receives the ID of the run of equal, adjacent elements it
    belongs to: ['Gen', 'Gen', 'Exod'] -> [1, 1, 2]. A name that reappears
    after a different name starts a new run and therefore gets a new ID.

    Parameters
    ----------
    list : iterable
        Names in their original order. The parameter name shadows the builtin
        ``list``; it is kept unchanged so existing callers keep working.

    Returns
    -------
    list of int
        One ID per input element; empty if the input is empty.
    """
    # A private sentinel (rather than None) marks "nothing seen yet", so a
    # leading None element still opens run 1 instead of being given ID 0.
    unset = object()
    previous = unset
    row_id = 0
    id_list = []
    for name in list:
        if previous is unset or previous != name:
            row_id += 1
            previous = name
        id_list.append(row_id)
    return id_list


Convert lists of names to lists of ID's:

In [712]:
list_bid = convert_id(list_b)

In [713]:
type(list_bid)

list

In [714]:
list_cid = convert_id(list_c)

In [715]:
#type(list_cid)

Generate an index for each ID:

RDD and DF with index for BID (ID for BName):

In [716]:
rdd_bid = sc.parallelize(list_bid).zipWithIndex()

In [717]:
rdd_bid.take(3)

[(1, 0), (1, 1), (1, 2)]

In [718]:
rdd_bid_col = rdd_bid.map(lambda r: Row(BID = r[0], BID_index = r[1]+1))

In [719]:
rdd_bid_col.take(3)

[Row(BID=1, BID_index=1), Row(BID=1, BID_index=2), Row(BID=1, BID_index=3)]

In [720]:
df_bid = rdd_bid_col.toDF()

In [721]:
df_bid.take(3)

[Row(BID=1, BID_index=1), Row(BID=1, BID_index=2), Row(BID=1, BID_index=3)]

RDD and DF with index for CID (ID for CName):

In [722]:
rdd_cid = sc.parallelize(list_cid).zipWithIndex()

In [723]:
rdd_cid.take(3)

[(1, 0), (1, 1), (1, 2)]

In [724]:
rdd_cid_col = rdd_cid.map(lambda r: Row(CID = r[0], CID_index = r[1]+1))

In [725]:
rdd_cid_col.take(3)

[Row(CID=1, CID_index=1), Row(CID=1, CID_index=2), Row(CID=1, CID_index=3)]

In [726]:
df_cid = rdd_cid_col.toDF()

In [727]:
df_cid.take(3)

[Row(CID=1, CID_index=1), Row(CID=1, CID_index=2), Row(CID=1, CID_index=3)]

#### Join the DataFrames to add the BID and CID columns 

In [728]:
# Attach BID to every verse: BID_index is the 1-based position of each ID, matching VID.
# The helper index column is dropped once the left join has been made.
df2 = (
    df1
    .join(df_bid, on=df1.VID == df_bid.BID_index, how='left')
    .drop('BID_index')
)

In [729]:
df2.take(2)

[Row(VID=26, VName='Gen.1:26', verse='And God said, Let us make man in our image, after our likeness: and let them have dominion over the fish of the sea, and over the fowl of the air, and over the cattle, and over all the earth, and over every creeping thing that creepeth upon the earth.', BName='Gen', CName='Gen.1', BID=1),
 Row(VID=29, VName='Gen.1:29', verse='And God said, Behold, I have given you every herb bearing seed, which is upon the face of all the earth, and every tree, in the which is the fruit of a tree yielding seed; to you it shall be for meat.', BName='Gen', CName='Gen.1', BID=1)]

In [730]:
# Attach CID to every verse: CID_index is the 1-based position of each ID, matching VID.
# The helper index column is dropped once the left join has been made.
df3 = (
    df2
    .join(df_cid, on=df2.VID == df_cid.CID_index, how='left')
    .drop('CID_index')
)

In [731]:
df3.take(2)

[Row(VID=26, VName='Gen.1:26', verse='And God said, Let us make man in our image, after our likeness: and let them have dominion over the fish of the sea, and over the fowl of the air, and over the cattle, and over all the earth, and over every creeping thing that creepeth upon the earth.', BName='Gen', CName='Gen.1', BID=1, CID=1),
 Row(VID=29, VName='Gen.1:29', verse='And God said, Behold, I have given you every herb bearing seed, which is upon the face of all the earth, and every tree, in the which is the fruit of a tree yielding seed; to you it shall be for meat.', BName='Gen', CName='Gen.1', BID=1, CID=1)]

In [732]:
df3.show()

+----+----------+--------------------+-----+-------+---+---+
| VID|     VName|               verse|BName|  CName|BID|CID|
+----+----------+--------------------+-----+-------+---+---+
|  26|  Gen.1:26|And God said, Let...|  Gen|  Gen.1|  1|  1|
|  29|  Gen.1:29|And God said, Beh...|  Gen|  Gen.1|  1|  1|
| 474| Gen.19:16|And while he ling...|  Gen| Gen.19|  1| 19|
| 964|  Gen.33:3|And he passed ove...|  Gen| Gen.33|  1| 33|
|1677| Exod.6:21|And the sons of I...| Exod| Exod.6|  2| 56|
|1697| Exod.7:11|Then Pharaoh also...| Exod| Exod.7|  2| 57|
|1806|Exod.10:28|And Pharaoh said ...| Exod|Exod.10|  2| 60|
|1950| Exod.16:2|And the whole con...| Exod|Exod.16|  2| 66|
|2040|Exod.19:13|There shall not a...| Exod|Exod.19|  2| 69|
|2214|Exod.25:18|And thou shalt ma...| Exod|Exod.25|  2| 75|
|2250|Exod.26:14|And thou shalt ma...| Exod|Exod.26|  2| 76|
|2453|Exod.32:14|And the Lord repe...| Exod|Exod.32|  2| 82|
|2509|Exod.34:12|Take heed to thys...| Exod|Exod.34|  2| 84|
|2529|Exod.34:32|And aft

#### Start to perform Spark sql operations 

In [733]:
df3.createOrReplaceTempView('df3_table')

In [734]:
spark.sql("select BID, BName, CID, CName, VID, VName from df3_table where VName == '1John.2:3'").show()

+---+-----+----+-------+-----+---------+
|BID|BName| CID|  CName|  VID|    VName|
+---+-----+----+-------+-----+---------+
| 62|1John|1161|1John.2|30554|1John.2:3|
+---+-----+----+-------+-----+---------+



In [735]:
spark.sql("select distinct BID, CID from df3_table where CName == 'Luke.2'").show()

+---+---+
|BID|CID|
+---+---+
| 42|975|
+---+---+



In [745]:
spark.sql("select distinct BID, BName from df3_table order by BID desc").take(5)

[Row(BID=66, BName='Rev'),
 Row(BID=65, BName='Jude'),
 Row(BID=64, BName='3John'),
 Row(BID=63, BName='2John'),
 Row(BID=62, BName='1John')]

Find total number of verses in the book of John:

In [752]:
spark.sql("select count(BName) from df3_table where BName == 'John' ").show()

+------------+
|count(BName)|
+------------+
|         879|
+------------+



In [754]:
spark.sql("select count(VName) from df3_table where BName == 'John' ").show()

+------------+
|count(VName)|
+------------+
|         879|
+------------+



Find total number of chapters in the book of John:

In [753]:
spark.sql("select count(distinct CName) from df3_table where BName == 'John' ").show()

+---------------------+
|count(DISTINCT CName)|
+---------------------+
|                   21|
+---------------------+



Find total number of chapters in the Bible:

In [755]:
spark.sql("select count(distinct CName) from df3_table ").show()

+---------------------+
|count(DISTINCT CName)|
+---------------------+
|                 1189|
+---------------------+



Find total number of verses in the Bible:

In [756]:
spark.sql("select count(VName) from df3_table").show()

+------------+
|count(VName)|
+------------+
|       31102|
+------------+



#### Query with Spark DataFrame functions:

In [738]:
df3.filter(df3.verse.contains('Lords')).collect()

[Row(VID=31034, VName='Rev.19:16', verse='And he hath on his vesture and on his thigh a name written, King Of Kings, And Lord Of Lords .', BName='Rev', CName='Rev.19', BID=66, CID=1186)]

In [739]:
df3.filter(df3.verse.endswith('Lords .')).collect()

[Row(VID=31034, VName='Rev.19:16', verse='And he hath on his vesture and on his thigh a name written, King Of Kings, And Lord Of Lords .', BName='Rev', CName='Rev.19', BID=66, CID=1186)]

In [740]:
df3.filter(df3.verse.like('%ords .')).collect()

[Row(VID=31034, VName='Rev.19:16', verse='And he hath on his vesture and on his thigh a name written, King Of Kings, And Lord Of Lords .', BName='Rev', CName='Rev.19', BID=66, CID=1186)]

In [741]:
#df3[df3.CName.isin("1John.1", "Ezek.59")].collect()

In [742]:
#df3.filter(df3.CName.like('%zek.39%')).collect()

In [743]:
#df3.groupBy('BName').count().show()

In [757]:
#df3.orderBy(desc('BID'), 'VID').show()