In [1]:
# Requires the findspark package (installed beforehand).
# Point findspark at the HDP Spark 2 installation below so that pyspark can be imported
# in the following cell.
import findspark
findspark.init('/usr/hdp/3.1.0.0-78/spark2')

In [2]:
import pyspark
# Installed pyspark prior to this. 

In [3]:
# Run Spark on the local machine only (local[*] = all local cores) instead of distributing
# the work across both VMs with 'sc = pyspark.SparkContext(appName="InvertedIndex")'.
# getOrCreate() reuses a context that is already running, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once".
# The application name matches the one given to the SparkSession created further down.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setMaster("local[*]").setAppName("InvertedIndex")
)

In [4]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
# Use the Spark SQL module to read the data into a DataFrame.

In [5]:
# Create (or reuse) the SparkSession named 'InvertedIndex'; this session is what runs
# the DataFrame processing in the cells below.
spark = (
    SparkSession.builder
    .appName('InvertedIndex')
    .getOrCreate()
)

In [6]:
# Load the project CSV file from HDFS into a DataFrame.
# header=True takes the column names from the first line, inferSchema=True lets Spark infer
# the column types, and mode="DROPMALFORMED" drops rows that cannot be parsed.
df = spark.read.csv(
    'hdfs:///user/vagrant/DataForProject.csv',
    mode="DROPMALFORMED",
    inferSchema=True,
    header=True,
)

In [7]:
# List the column names of the loaded DataFrame (the four columns appear in the output below).
df.columns

['id', 'CreationDate', 'Tags', 'PostTypeId']

In [8]:
# Show the data type of each column, as inferred while reading the CSV (listed in the output below).
df.dtypes

[('id', 'int'),
 ('CreationDate', 'timestamp'),
 ('Tags', 'string'),
 ('PostTypeId', 'int')]

In [9]:
# Preview the first 10 rows; long Tags values are truncated in the output and
# rows without tags show null.
df.show(10)

+--------+-------------------+--------------------+----------+
|      id|       CreationDate|                Tags|PostTypeId|
+--------+-------------------+--------------------+----------+
|45673148|2017-08-14 11:07:44|<scala><playframe...|         1|
|45673149|2017-08-14 11:07:45|<ios><iphone><cor...|         1|
|45673150|2017-08-14 11:07:45|<apache-kafka><ap...|         1|
|45673151|2017-08-14 11:07:47|                null|         2|
|45673152|2017-08-14 11:08:06|                null|         2|
|45673153|2017-08-14 11:08:09|<node.js><google-...|         1|
|45673154|2017-08-14 11:08:13|                null|         2|
|45673155|2017-08-14 11:08:28|                null|         2|
|45673157|2017-08-14 11:08:29|        <regex><sed>|         1|
|45673158|2017-08-14 11:08:30|                null|         2|
+--------+-------------------+--------------------+----------+
only showing top 10 rows



In [10]:
# Step 1: use regexp_replace to insert a ',' between adjacent tags in the 'Tags' column
# ('<a><b>' becomes '<a>,<b>') so that the individual tags can be separated.
newsdf = df.withColumn(
    'Tags',
    regexp_replace(col('Tags'), "\\>\\<", ">,<"),
)
# Step 2: split the comma-separated string into a list of tags and keep only the
# 'Tags' and 'id' columns; the resulting DataFrame is 'TagsId'.
TagsId = (
    newsdf
    .withColumn("Tags", split("Tags", ","))
    .select("Tags", "id")
)
# For a clean display only: convert the DataFrame to an RDD of plain lists (TagsIdDisplay),
# leaving the DataFrame itself (TagsId) untouched.
TagsIdDisplay = TagsId.rdd.map(list)
TagsIdDisplay.take(10)

[[['<scala>', '<playframework>', '<sbt>'], 45673148],
 [['<ios>', '<iphone>', '<cordova>', '<itunesconnect>', '<ipa>'], 45673149],
 [['<apache-kafka>', '<apache-kafka-streams>'], 45673150],
 [None, 45673151],
 [None, 45673152],
 [['<node.js>', '<google-authentication>'], 45673153],
 [None, 45673154],
 [None, 45673155],
 [['<regex>', '<sed>'], 45673157],
 [None, 45673158]]

In [11]:
# A key/value pairing between tags and ids is obtained more directly with 'explode':
# starting again from the comma-separated DataFrame, each tag in the 'Tags' column becomes
# its own row, paired with the respective value of the 'id' column.
# Rows whose Tags value is null yield no output rows (explode_outer would keep them).
detailed = (
    newsdf
    .withColumn("Tags", explode(split("Tags", "\\,")))
    .select("Tags", "id")
)
# RDD of plain [tag, id] lists, used only for display.
detailedDisplay = detailed.rdd.map(list)
detailedDisplay.take(10)

[['<scala>', 45673148],
 ['<playframework>', 45673148],
 ['<sbt>', 45673148],
 ['<ios>', 45673149],
 ['<iphone>', 45673149],
 ['<cordova>', 45673149],
 ['<itunesconnect>', 45673149],
 ['<ipa>', 45673149],
 ['<apache-kafka>', 45673150],
 ['<apache-kafka-streams>', 45673150]]

In [12]:
# With the (tag, id) mapping complete, group the rows by tag and collect every id that
# carries the tag into one list: this is the inverted index (tag -> list of ids).
# The same groupby could also be applied the other way around, with 'id' as the key and
# 'Tags' as the values.
indexed = (
    detailed
    .groupby("Tags")
    .agg(collect_list("id"))
)
# The DataFrame is converted to an RDD of plain lists only for the sake of displaying the outcome.
indexedDisplay = indexed.rdd.map(list)
indexedDisplay.take(10)

[['<.net-core-1.1>', [45712067]],
 ['<8051>', [46722156]],
 ['<activemerchant>', [46720159]],
 ['<aframe>', [46713776, 46797635, 46799726, 46838749, 46860404]],
 ['<aliases>', [46792950]],
 ['<android-fusedlocation>', [46803119]],
 ['<application-structure>', [46796373]],
 ['<arm-template>', [46833296]],
 ['<aws-codepipeline>', [46720870]],
 ['<azure-iot-hub>', [45703990, 45716714, 46838666]]]

In [13]:
# Export the inverted index to a JSON file: one object mapping each tag to its list of ids.
# The collected data itself is dumped (not its str()); dumping the string form would write a
# single quoted string holding a Python repr instead of structured JSON.
# NOTE: open() writes to the local filesystem (the notebook's working directory), not to HDFS.
# Copy the file afterwards (e.g. `hdfs dfs -put FinalProject.json <target dir>`) if it must
# be stored in the Hadoop file system.
import json

# collect() brings the whole index to the driver, so it has to fit in driver memory.
inverted_index = {tag: ids for tag, ids in indexedDisplay.collect()}
with open('FinalProject.json', 'w') as outfile:
    json.dump(inverted_index, outfile)
