In [0]:
from pyspark.sql.functions import lower, regexp_replace, split, explode, col, udf
from pyspark.sql.types import ArrayType, StringType

In [0]:
# Step 1: Load the Data
fp = 'dbfs:/FileStore/tables/programming_assignment_1/Class_6___Text_File_for_Letter_Pairs.txt'
df = spark.read.text(fp)

In [0]:
# Verify the text was loaded one line at a time into a dataframe
display(df.head(20))

value
Data have become a torrent flowing into every area of the global economy.1 Companies churn out a
"burgeoning volume of transactional data, capturing trillions of bytes of information about their"
"customers, suppliers, and operations. millions of networked sensors are being embedded in the"
"physical world in devices such as mobile phones, smart energy meters, automobiles, and industrial"
"machines that sense, create, and communicate data in the age of the Internet of Things.2 Indeed, as"
"companies and organizations go about their business and interact with individuals, they are"
"generating a tremendous amount of digital “exhaust data,” i.e., data that"
"are created as a by-product of other activities. Social media sites, smartphones, and other"
consumer devices including PCs and laptops have allowed billions of individuals around the world to
contribute to the amount of big data available. And the growing volume of multimedia content has


In [0]:
# Step 2: Preprocess the Text
df = df.withColumn('text_lower', lower(col('value')))
df = df.withColumn('text_letters_only', regexp_replace(col('text_lower'), '[^a-z]', ' '))
df = df.withColumn('words', split(col('text_letters_only'), ' '))
words_df = df.select(explode(col('words')).alias('word'))
words_df = words_df.filter(col('word') != '')

In [0]:
# Verify the words have been properly preprocessed
display(words_df.head(20))

word
data
have
become
a
torrent
flowing
into
every
area
of


In [0]:
# Step 3: Extract Bi-grams
def get_bigrams(word):
    word = word.strip()
    bigrams = []
    if len(word) >= 2:
        for i in range(len(word) - 1):
            bigram = word[i:i+2]
            bigrams.append(bigram)
    return bigrams

get_bigrams_udf = udf(get_bigrams, ArrayType(StringType()))
bigrams_df = words_df.withColumn('bigrams', get_bigrams_udf(col('word')))
bigrams_exploded_df = bigrams_df.select(explode(col('bigrams')).alias('bigram'))

In [0]:
# Verify that each word was broken into its bigrams properly
display(bigrams_df.head(20))

word,bigrams
data,"List(da, at, ta)"
have,"List(ha, av, ve)"
become,"List(be, ec, co, om, me)"
a,List()
torrent,"List(to, or, rr, re, en, nt)"
flowing,"List(fl, lo, ow, wi, in, ng)"
into,"List(in, nt, to)"
every,"List(ev, ve, er, ry)"
area,"List(ar, re, ea)"
of,List(of)


In [0]:
# Verify that the bigrams_exploded_df was properly mapped from the bigrams_df
display(bigrams_exploded_df.head(20))

bigram
da
at
ta
ha
av
ve
be
ec
co
om


In [0]:
# Step 4: Count Bi-gram Frequencies
bigram_counts = bigrams_exploded_df.groupBy('bigram').count()

bigram,count
ct,30
ld,16
ey,7
rf,1
en,73
du,13
ye,5
ss,14
pi,3
pu,12


In [0]:
# Verify the counts are being done properly
display(bigram_counts.head(20))

In [0]:
# Step 5: Find the Top 5 Most and Least Frequent Bi-grams
# Top 5 Most Frequent Bi-grams
top5 = bigram_counts.orderBy(col('count').desc()).limit(5)
print("Top 5 Most Frequent Bi-grams:")
top5.show(truncate=False)

Top 5 Most Frequent Bi-grams:
+------+-----+
|bigram|count|
+------+-----+
|th    |146  |
|in    |128  |
|an    |127  |
|at    |126  |
|re    |102  |
+------+-----+



In [0]:
# Top 5 Least Frequent Bi-grams
bottom5 = bigram_counts.orderBy(col('count').asc()).limit(5)
print("Top 5 Least Frequent Bi-grams:")
bottom5.show(truncate=False)

Top 5 Least Frequent Bi-grams:
+------+-----+
|bigram|count|
+------+-----+
|uf    |1    |
|ju    |1    |
|lw    |1    |
|ft    |1    |
|cs    |1    |
+------+-----+





