In [1]:
# importing pyspark and starting a session to use pyspark
import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext.getOrCreate()

In [2]:
# Making a directory to save my stack overflow query to use as input
!hdfs dfs -mkdir /input1

mkdir: `/input1': File exists


In [3]:
# Saving Stack overflow query file to new directory
!hdfs dfs -copyFromLocal Query.csv /input1

copyFromLocal: `/input1/Query.csv': File exists


In [4]:
# Checking to make sure the file was saved correctly to the directory
!hdfs dfs -ls /input1

Found 2 items
-rw-r--r--   2 root hadoop     147147 2021-03-04 04:55 /input1/Query.csv
-rw-r--r--   2 root hadoop     348070 2021-02-27 22:04 /input1/QueryResults.csv


In [5]:
# Taking the query and making it workable by creating an RDD
myrdd = sc.textFile("/input1/Query.csv") 

In [6]:
#Checking to make sure my file was created as an RDD
from pyspark.rdd import RDD
isinstance(myrdd, RDD)

True

In [7]:
#Checking first five lines of RDD
myrdd.take(5)

['<google-cloud-messaging>"',
 '"50682076","<java><spring><oauth-2.0><spring-security-oauth2>"',
 '"50682080","<dialogflow-es>"',
 '"50682087","<spring-mvc><jstl>"',
 '"50682089","<reactjs><amazon-web-services><amazon-s3><amazon-sns>"']

In [8]:
# importing function to get rid of header in file
from itertools import islice

# Getting rid of the header row by slicing the first line of the RDD
myrdd = myrdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it)

In [9]:
# Checking to make sure the header line was correctly taken off
myrdd.take(5)

['"50682076","<java><spring><oauth-2.0><spring-security-oauth2>"',
 '"50682080","<dialogflow-es>"',
 '"50682087","<spring-mvc><jstl>"',
 '"50682089","<reactjs><amazon-web-services><amazon-s3><amazon-sns>"',
 '"50682093","<batch-file>"']

In [10]:
# cleaning up the data by creating an index of characters to get rid of 
char_to_replace = {'<': ' ',
                   '>': ' ',
                   '"': '',
                  ',':''}

# Making my rdd data all lowercase, and using the translate function to translate all characters created in the previous index to blanks
wordlist = myrdd.map(lambda x: x.lower().translate(str.maketrans(char_to_replace)).split())
wordlist.take(5)

[['50682076', 'java', 'spring', 'oauth-2.0', 'spring-security-oauth2'],
 ['50682080', 'dialogflow-es'],
 ['50682087', 'spring-mvc', 'jstl'],
 ['50682089', 'reactjs', 'amazon-web-services', 'amazon-s3', 'amazon-sns'],
 ['50682093', 'batch-file']]

In [11]:
# Testing the lambda function to see if I can pull the first value of each line in the RDD
word_tuple = wordlist.map(lambda x: x[0])
word_tuple.take(5)

['50682076', '50682080', '50682087', '50682089', '50682093']

In [12]:
# Testing the lamda function with a for loop to take the first value and making a pair with all other values in each line
word_tuple1 = wordlist.map(lambda list: [(word, list[0]) for word in list])

In [13]:
#checking results
word_tuple1.take(5)

[[('50682076', '50682076'),
  ('java', '50682076'),
  ('spring', '50682076'),
  ('oauth-2.0', '50682076'),
  ('spring-security-oauth2', '50682076')],
 [('50682080', '50682080'), ('dialogflow-es', '50682080')],
 [('50682087', '50682087'), ('spring-mvc', '50682087'), ('jstl', '50682087')],
 [('50682089', '50682089'),
  ('reactjs', '50682089'),
  ('amazon-web-services', '50682089'),
  ('amazon-s3', '50682089'),
  ('amazon-sns', '50682089')],
 [('50682093', '50682093'), ('batch-file', '50682093')]]

In [14]:
# Using a lambda function with a for loop to create a tuple for each element of each line along with the first element starting with the second value so I don't make the first value a tuple with itself
# Basically takeing the length of list minus one to not loop through the first value but to get the index of each element with the first element
word_tuple2 = wordlist.map(lambda list: [((i +1 % len(list)), list[0]) for i in (range(len(list)-1))])

In [15]:
#checking results
word_tuple2.take(5)

[[(1, '50682076'), (2, '50682076'), (3, '50682076'), (4, '50682076')],
 [(1, '50682080')],
 [(1, '50682087'), (2, '50682087')],
 [(1, '50682089'), (2, '50682089'), (3, '50682089'), (4, '50682089')],
 [(1, '50682093')]]

In [17]:
#adding list to the front of the above lambda function to reference the value of the elements rather than the order of where they fall in the list
word_tuple3 = wordlist.map(lambda list: [(list[(i +1 % len(list))], list[0]) for i in (range(len(list)-1))])

In [18]:
#checking results
word_tuple3.take(5)

[[('java', '50682076'),
  ('spring', '50682076'),
  ('oauth-2.0', '50682076'),
  ('spring-security-oauth2', '50682076')],
 [('dialogflow-es', '50682080')],
 [('spring-mvc', '50682087'), ('jstl', '50682087')],
 [('reactjs', '50682089'),
  ('amazon-web-services', '50682089'),
  ('amazon-s3', '50682089'),
  ('amazon-sns', '50682089')],
 [('batch-file', '50682093')]]

In [19]:
# Now running the same lamda function inside of a flatmap function to get rid of the list of lists and make a list of all tuples
word_tuple4 = word_tuple3.flatMap(lambda tuplelist: [(tuple) for tuple in tuplelist])

In [20]:
word_tuple4.take(15)

[('java', '50682076'),
 ('spring', '50682076'),
 ('oauth-2.0', '50682076'),
 ('spring-security-oauth2', '50682076'),
 ('dialogflow-es', '50682080'),
 ('spring-mvc', '50682087'),
 ('jstl', '50682087'),
 ('reactjs', '50682089'),
 ('amazon-web-services', '50682089'),
 ('amazon-s3', '50682089'),
 ('amazon-sns', '50682089'),
 ('batch-file', '50682093'),
 ('sql', '50683180'),
 ('sql-server', '50683180'),
 ('date', '50683180')]

In [21]:
# Reducer function to take each tag and make a list of all post id's that include the tag key.
word_tuple5 = word_tuple4.groupByKey()

In [22]:
# Checking results to see if the inverted index has been created
word_tuple5.take(15)

[('java', <pyspark.resultiterable.ResultIterable at 0x7f0b34348a90>),
 ('oauth-2.0', <pyspark.resultiterable.ResultIterable at 0x7f0b34348990>),
 ('spring-security-oauth2',
  <pyspark.resultiterable.ResultIterable at 0x7f0b34059910>),
 ('jstl', <pyspark.resultiterable.ResultIterable at 0x7f0b34059750>),
 ('reactjs', <pyspark.resultiterable.ResultIterable at 0x7f0b34059a90>),
 ('amazon-web-services',
  <pyspark.resultiterable.ResultIterable at 0x7f0b34056dd0>),
 ('amazon-sns', <pyspark.resultiterable.ResultIterable at 0x7f0b34058750>),
 ('sql-server', <pyspark.resultiterable.ResultIterable at 0x7f0b340597d0>),
 ('es6-promise', <pyspark.resultiterable.ResultIterable at 0x7f0b3405a910>),
 ('scss-mixins', <pyspark.resultiterable.ResultIterable at 0x7f0b3405a9d0>),
 ('resttemplate', <pyspark.resultiterable.ResultIterable at 0x7f0b3405aa90>),
 ('core-data', <pyspark.resultiterable.ResultIterable at 0x7f0b3405ab50>),
 ('visual-studio', <pyspark.resultiterable.ResultIterable at 0x7f0b3405ad50>

In [23]:
# Checking the count of all the different tags
word_tuple5.count()

2928

In [24]:
# Creating an output file to save the inverted index to
!hdfs dfs -mkdir /output1

mkdir: `/output1': File exists


In [26]:
# Saving my RDD back to disk in my output directory
word_tuple5.saveAsTextFile('/output1/spark_results2')

In [30]:
# Checking that the file saved correctly to my directory 
!hdfs dfs -ls /output1/spark_results1

Found 3 items
-rw-r--r--   2 root hadoop          0 2021-03-05 19:14 /output1/spark_results1/_SUCCESS
-rw-r--r--   2 root hadoop     119480 2021-03-05 19:14 /output1/spark_results1/part-00000
-rw-r--r--   2 root hadoop     118544 2021-03-05 19:14 /output1/spark_results1/part-00001


In [31]:
# checking one of the nodes to see what was saved
!hdfs dfs -cat /output1/spark_results/part-00000 | head

('java', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('oauth-2.0', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('spring-security-oauth2', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('jstl', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('reactjs', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('amazon-web-services', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('amazon-sns', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('sql-server', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('es6-promise', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
('scss-mixins', <pyspark.resultiterable.ResultIterable object at 0x7f3dc38d83d0>)
cat: Unable to write to output stream.


In [89]:
# putting data into datframe
import pandas as pd
df = pd.DataFrame(data, columns =['Tag', 'Id'])

In [90]:
#checking dataframe
df.head()

Unnamed: 0,Tag,Id
0,java,"(50682076, 50683730, 50683786, 50684395, 50684..."
1,oauth-2.0,"(50682076, 50686068, 50635455)"
2,spring-security-oauth2,(50682076)
3,jstl,"(50682087, 50734468)"
4,reactjs,"(50682089, 50683201, 50684367, 50684628, 50684..."


In [109]:
df['Id'] = df['Id'].astype(str)

In [43]:
df.to_csv('index.csv') 

In [116]:
myrdd1 = sc.textFile('index.csv')