In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc=SparkContext()
spark = SparkSession.builder.getOrCreate()

In [9]:
trigram = spark.sparkContext.sequenceFile('s3://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data')

In [None]:
trigram.count()

In [4]:
import sys
import os,datetime, re
from pyspark.sql import Row
from pyspark.sql.types import *

In [5]:
def split_tp(tp):
    (index, line) = tp
    tri_gram, y, n1, n2, n3 = re.split(r'[\t]', line)
    words = tri_gram.split()
    while len(words)<3:
        words.append('Null')   
    match_count=int(n1)
    return (words[0],words[1],words[2]), match_count 

In [6]:
new_gram = trigram.map(lambda ls: split_tp(ls))

In [7]:
new_gram.take(10)

[(('!', '!', 'That'), 1),
 (('!', '!', 'That'), 2),
 (('!', '!', 'That'), 2),
 (('!', '!', 'That'), 1),
 (('!', '!', 'That'), 1),
 (('!', '!', 'That'), 1),
 (('!', '!', 'That'), 6),
 (('!', '!', 'That'), 1),
 (('!', '!', 'That'), 1),
 (('!', '!', 'That'), 3)]

In [14]:
def add(a,b):
    return a+b

In [15]:
reduced_rdd = new_gram.reduceByKey(add).cache()

In [16]:
reduced_rdd.take(20)

[(('".', 'Arensky', 'Null'), 44),
 (('.', 'Assumes', 'a'), 211),
 (('.', 'At', 'Almaden'), 63),
 (('.', 'At', 'Avranches'), 62),
 (('.', 'Buses', 'Terminal'), 581),
 (('.', 'But', 'obstacles'), 156),
 (('.', 'But', 'patriotism'), 249),
 (('.', 'By', 'caste'), 43),
 (('.', 'CORROSIVE', 'SUBLIMATE'), 95),
 (('.', 'Can', 'work'), 94),
 (('.', 'Carrying', 'all'), 97),
 (('.', 'Catharina', 'Alexowna'), 52),
 (('.', 'Cavalry', 'was'), 343),
 (('.', 'Chancellor', 'Adenauer'), 100),
 (('.', 'Chief', 'in'), 246),
 (('.', 'Climatic', 'Conditions'), 79),
 (('".', 'Cobbctt', 'Null'), 78),
 (('.', 'Common', 'rules'), 60),
 (('.', 'Comprising', 'also'), 406),
 (('.', 'Desiccating', '—'), 82)]

In [58]:
def Rowtransfer(element):
    words, value = element
    wd1, wd2, wd3 = words
    return Row(word_12= wd1+' '+wd2, word_3=wd3, 
               count=value)

In [59]:
rdd = reduced_rdd.map(Rowtransfer)

In [60]:
rdd.take(10)

[Row(count=44, word_12='". Arensky', word_3='Null'),
 Row(count=211, word_12='. Assumes', word_3='a'),
 Row(count=63, word_12='. At', word_3='Almaden'),
 Row(count=62, word_12='. At', word_3='Avranches'),
 Row(count=581, word_12='. Buses', word_3='Terminal'),
 Row(count=156, word_12='. But', word_3='obstacles'),
 Row(count=249, word_12='. But', word_3='patriotism'),
 Row(count=43, word_12='. By', word_3='caste'),
 Row(count=95, word_12='. CORROSIVE', word_3='SUBLIMATE'),
 Row(count=94, word_12='. Can', word_3='work')]

In [61]:
wd_df = spark.createDataFrame(rdd)

In [62]:
wd_df.show(20)

+-----+-------------+----------+
|count|      word_12|    word_3|
+-----+-------------+----------+
|   44|   ". Arensky|      Null|
|  211|    . Assumes|         a|
|   63|         . At|   Almaden|
|   62|         . At| Avranches|
|  581|      . Buses|  Terminal|
|  156|        . But| obstacles|
|  249|        . But|patriotism|
|   43|         . By|     caste|
|   95|  . CORROSIVE| SUBLIMATE|
|   94|        . Can|      work|
|   97|   . Carrying|       all|
|   52|  . Catharina|  Alexowna|
|  343|    . Cavalry|       was|
|  100| . Chancellor|  Adenauer|
|  246|      . Chief|        in|
|   79|   . Climatic|Conditions|
|   78|   ". Cobbctt|      Null|
|   60|     . Common|     rules|
|  406| . Comprising|      also|
|   82|. Desiccating|         —|
+-----+-------------+----------+
only showing top 20 rows



In [63]:
df_sql = wd_df.createOrReplaceTempView('df_sql')
df_sql = spark.sql("CACHE TABLE df_sql")

In [64]:
df_count = spark.sql('with cte as\
                         (select word_12,word_3,count,sum(count) over (partition by word_12) as total_count from df_sql)\
                     select word_12,word_3, count/total_count as probability from cte\
                     order by word_12,probability desc')
df_count.show()

+-------+------+--------------------+
|word_12|word_3|         probability|
+-------+------+--------------------+
|    ! !|     !|  0.3469251002630342|
|    ! !|     '| 0.06572854471081241|
|    ! !|     .| 0.06372828467700802|
|    ! !|     )|  0.0459259703761489|
|    ! !|     I| 0.03392441017332253|
|    ! !|   The|0.028523708082050667|
|    ! !|     (|0.022542930580975527|
|    ! !|   and| 0.01760228829747867|
|    ! !|    of|0.014631902147279146|
|    ! !|   the| 0.01145148869353016|
|    ! !|    It|0.010401352175782851|
|    ! !|   And|0.009621250762599138|
|    ! !|   But|0.008701131147049117|
|    ! !|  This|0.008301079140288237|
|    ! !|     A|0.008131057037414863|
|    ! !|     -|0.007520977727104523|
|    ! !|    We|0.006870893216118095|
|    ! !|    He|0.006850890615780051|
|    ! !|     *|0.006580855511216458|
|    ! !|    In|0.006570854211047437|
+-------+------+--------------------+
only showing top 20 rows



In [65]:
df_model = df_count.createOrReplaceTempView('model')
df_model = spark.sql("CACHE TABLE model")

In [69]:
df1 = spark.sql('''select * from model where word_12 = 'I am'
''')

In [70]:
df1.show()

+-------+---------+--------------------+
|word_12|   word_3|         probability|
+-------+---------+--------------------+
|   I am|      not| 0.09659228248056738|
|   I am|     sure| 0.04816873417215208|
|   I am|        a| 0.03582667933025214|
|   I am|   afraid|0.021979895643062568|
|   I am|     very|0.021506266043526238|
|   I am|      the|0.021107614072265572|
|   I am|       in|0.019769075179189473|
|   I am|    going|0.019073257691677246|
|   I am|     glad| 0.01893294913834549|
|   I am|    sorry|0.017583116543939164|
|   I am|      now|0.017302421008794186|
|   I am|       to|0.016348024817908674|
|   I am|        .|0.012685493162212845|
|   I am|       so|0.012609652820641122|
|   I am|     told|0.010630510091000567|
|   I am|    quite|0.009766306653793938|
|   I am|convinced|0.008903514929253589|
|   I am|       no| 0.00879316605583951|
|   I am|       of|0.008537489206280238|
|   I am| indebted|0.007908179071046009|
+-------+---------+--------------------+
only showing top

In [85]:
import random
words = ['romeo', 'et']
sentence_finished = False
while not sentence_finished:
    r = random.uniform(0.02,0.001)
    if "'" in words[-2:]:
        a = spark.sql('select * from model where word_12="%s%s%s" and probability>%s'%(words[-2],' ',words[-1],r))
    else:
        a = spark.sql("select * from model where word_12 = '%s%s%s' and probability>%s" %(words[-2],' ',words[-1],r))        
    if a.count() != 0:
        words.append(a.rdd.takeSample(False,1)[0][1])
    else:
        words.append(None)
        
    if words[-2:]==[None, None]:
        sentence_finished=True
print(' '.join([t for t in words if t]))

Even though some were for many people who had not heard the word the old system ; a few other particulars ; first appeared . A general name applied in practice this would mean ' in his way of doing much more difficult of attainment by the same name with an intention to write a note with regret on quitting it and I know it not a single - handed ; a man were made in the same year his only hope I have often told his wife — I should say . The first two parts . One can only get some good work in connection with the same time the most beautiful girl was in no degree from a of our nature are often not very great and dangerous in themselves so . ' • It is a large scale the rocks under the supervision and the other day ? Is there something that the of thefe things ( p . 5 ) of a of us . I shall write of you ? Do we then have we to know something . Even in her own hands ) I can say — ' The
