In [None]:
import os
import pyspark
import re
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.sql.functions import regexp_replace, trim, col, lower
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# Start (or reuse) a local Spark session for this notebook.
# NOTE: despite its name, `sc` is a SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .master('local[8]')
    .appName("bdcc2")
    .getOrCreate()
)

# Record the engine and client library versions alongside the results.
print("Spark Version: " + sc.version)
print("PySpark Version: " + pyspark.__version__)

Spark Version: 3.1.1
PySpark Version: 3.1.1


In [None]:
# Stage the posts CSV from the local filesystem into HDFS (-f: overwrite an existing copy).
# No destination is given; presumably the file lands in the user's HDFS home directory,
# which is where the load cell reads /user/bdccuser/posts.csv from (TODO confirm).
!hdfs dfs -copyFromLocal -f /home/bdccuser/notebooks/mapreduce/data/posts.csv

2021-06-11 21:11:44,007 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


In [None]:
# Explicit schema for posts.csv: (column name, Spark type) in file order.
# Every column is declared nullable.
column_specs = [
    ("document_id", IntegerType()),
    ("column_2", IntegerType()),
    ("column_3", StringType()),
    ("title_of_blog_posts", StringType()),
]
struct_fields = [StructField(name, dtype, True) for name, dtype in column_specs]
sch = StructType(struct_fields)

path = "/user/bdccuser/posts.csv"

# Read the CSV with the explicit schema, then keep only the two columns
# the rest of the notebook works with.
df = (
    sc.read
    .option('delimiter', ',')
    .schema(sch)
    .csv(path)
    .select("document_id", "title_of_blog_posts")
)

In [None]:
# Strip punctuation from the titles. The pattern is a raw string so that "\p"
# reaches the regex engine verbatim instead of being an invalid Python escape
# sequence (which triggers a DeprecationWarning / SyntaxWarning).
df1 = df.select(
    regexp_replace("title_of_blog_posts", r"\p{Punct}", "").alias("title_of_blog_posts"),
    "document_id",
)

# Split each cleaned title into a list of terms ("terms" column).
token = Tokenizer(inputCol="title_of_blog_posts", outputCol="terms")
df2 = token.transform(df1)

# Filter stop words out of "terms"; the surviving terms land in "final_terms".
stopwords = StopWordsRemover(inputCol="terms", outputCol="final_terms")
df3 = stopwords.transform(df2)

In [None]:
def inverted_index(index):
    """Count how many times each term occurs in each document.

    Parameters
    ----------
    index : RDD
        Records whose first element is an iterable of terms and whose second
        element is the id of the document those terms came from.

    Returns
    -------
    RDD
        One record per distinct (term, document id) pair, shaped
        ``(count, (term, document_id))``.
    """
    # (terms, doc_id) -> (doc_id, terms) -> one (doc_id, term) pair per occurrence.
    doc_term = index.map(lambda row: (row[1], row[0])).flatMapValues(lambda terms: terms)

    # Key every occurrence by (term, doc_id) with a unit count.
    ones = doc_term.map(lambda pair: ((pair[1], pair[0]), 1))

    # reduceByKey sums partial counts on each partition before the shuffle,
    # unlike groupByKey + sum which ships every single 1 across the network.
    counts = ones.reduceByKey(lambda left, right: left + right)

    # Put the count first: (count, (term, doc_id)).
    return counts.map(lambda kv: (kv[1], kv[0]))

# Hand the (final_terms, document_id) rows to the indexer as an RDD.
data = df3.select("final_terms", "document_id").rdd

# Pull the whole index back to the driver and print one
# "count (term, document_id)" line per entry.
inverted_final = inverted_index(data).collect()
for count, posting in inverted_final:
    print(count, str(posting))

1 ('tuaw', 2)
1 ('history', 2)
1 ('23', 3)
1 ('working', 4)
1 ('talkcast', 6)
1 ('live', 6)
1 ('150', 7)
1 ('prices', 7)
1 ('beta', 8)
1 ('17', 9)
1 ('details', 10)
1 ('greensboro', 11)
1 ('apple', 11)
1 ('reinstalling', 12)
1 ('preview', 13)
1 ('goes', 14)
1 ('app', 15)
1 ('hackers', 16)
1 ('achieve', 16)
1 ('linux', 16)
1 ('boot', 16)
1 ('doorbusters', 17)
1 ('quicktime', 21)
1 ('apple', 23)
1 ('posts', 23)
1 ('releases', 24)
1 ('site', 25)
1 ('iphone', 26)
1 ('home', 26)
1 ('iphone', 28)
1 ('new', 30)
1 ('laptop', 33)
1 ('blur', 34)
2 ('2', 34)
1 ('blur', 35)
1 ('ask', 37)
1 ('discount', 39)
1 ('os', 40)
1 ('blackberry', 41)
1 ('storm', 41)
1 ('tv', 42)
1 ('093', 43)
1 ('input', 43)
1 ('npd', 44)
1 ('q3', 44)
1 ('introduces', 47)
1 ('watchdogs', 48)
1 ('iphone', 48)
1 ('use', 49)
1 ('gives', 52)
1 ('classic', 52)
1 ('messaging', 54)
1 ('schramms', 55)
1 ('new', 56)
1 ('december', 56)
1 ('friday', 59)
1 ('apple', 67)
1 ('rumor', 68)
1 ('two', 75)
1 ('steve', 78)
1 ('campaign', 79)
1 