## InvertedIndex - Using RDD
1. View the contents of the file hortonworks.txt. It contains semi-structured
content: the first string in each line is a Web page URL, followed by a
comma-separated list of terms that appear on that Web page.

2. Write a Spark application using RDDs that takes a set of <key,value> pairs (the key is a
Web page URL and the value is the list of terms on that page) and inverts the index, so
that each term becomes a key and its value is the list of Web page URLs on which that
term appears. The output file shows how this information could serve as the beginnings
of a Web search engine.

3. Here is a subset of what the output should look like:

download http://hortonworks.com/community/, http://hortonworks.com/,http://hortonworks.com/download/

downloads http://hortonworks.com/get-started/,http://hortonworks.com/events/

enterprise http://hortonworks.com/products/hortonworksdataplatform/, http://hortonworks.com/about-us/contact-us/,http://hortonworks.com/

feed http://hortonworks.com/products/, http://hortonworks.com/kb,http://hortonworks.com/resources/

founders http://hortonworks.com/about-us/,http://hortonworks.com/get-started/

hadoop http://hortonworks.com/,http://hortonworks.com/download/, http://hortonworks.com/community/,http://hortonworks.com/kb,http://hortonworks.com/about-us/,http://hortonworks.com/resources/,http://hortonworks.com/webinars/,http://hortonworks.com/resources/,http://hortonworks.com/hadoop-rooting/

Note: The inverted index problem is one of the earliest and most common use cases of
MapReduce: invert the data so that the words on each Web page become the keys, and the
Web page URLs become the values.
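The inversion itself can be sketched in plain Python before moving to Spark. This is a minimal illustration, not the lab solution: the function name `invert_index` and the `example.com` sample lines are made up, and only the "URL first, then comma-separated terms" layout is taken from the task description.

```python
from collections import defaultdict

def invert_index(lines):
    """Map each term to the list of URLs whose line mentions it."""
    index = defaultdict(list)
    for line in lines:
        fields = [field.strip() for field in line.split(",")]
        url, terms = fields[0], fields[1:]
        for term in terms:
            if term:  # skip empty fields such as a trailing comma
                index[term].append(url)
    return dict(index)

# Hypothetical sample lines in the "url,term,term,..." layout described above.
sample_lines = [
    "http://example.com/a,hadoop,download",
    "http://example.com/b,hadoop,feed",
]

inverted = invert_index(sample_lines)
for term in sorted(inverted):
    # One output line per term: the term, then its comma-joined URLs.
    print(term, ",".join(inverted[term]))
```

The RDD version follows the same shape: the inner loop corresponds to emitting (term, url) pairs, and the dictionary update corresponds to combining values by key.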

In [1]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
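The initialization cell hard-codes the py4j archive name (`py4j-0.10.7-src.zip`), which changes between Spark releases. One possible alternative is to look the archive up by pattern. The helper below is a sketch: `find_py4j_zip` is a hypothetical name, and it assumes the archive follows the `py4j-*-src.zip` naming seen in the cell above.

```python
import glob
import os

def find_py4j_zip(pylib_dir):
    """Return the path of the py4j source archive inside pylib_dir."""
    pattern = os.path.join(pylib_dir, "py4j-*-src.zip")
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError("no py4j-*-src.zip found in " + pylib_dir)
    # If several archives are present, take the lexicographically last one.
    return matches[-1]

# Possible use in the initialization cell (left commented out here):
# sys.path.insert(0, find_py4j_zip(os.environ["PYLIB"]))
```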

In [2]:
# Entry point for Spark 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [3]:
file_path = "file:///home/talentum/test-jupyter/EvaluationLabs/Lab1/hortonworks.txt"

# Create a baseRDD from the file path
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD
splitRDD = baseRDD.map(lambda x: x.split(','))

# Emit one (term, url) pair for every term on a line; line[0] is the URL
termUrlRDD = splitRDD.flatMap(lambda line: [(word, line[0]) for word in line[1:]])

# Concatenate the URLs of each term into one ' , '-separated string
resultRDD = termUrlRDD.reduceByKey(lambda x, y: x + ' , ' + y)

# Display outputs of inverted key-value pairs
for key, value in resultRDD.collect():
    print(key,value)

hadoop http://hortonworks.com/ , http://hortonworks.com/download/ , http://hortonworks.com/community/ , http://hortonworks.com/kb , http://hortonworks.com/about-us/ , http://hortonworks.com/resources/ , http://hortonworks.com/webinars/ , http://hortonworks.com/resources/ , http://hortonworks.com/hadoop-training/
webinars http://hortonworks.com/
enterprise http://hortonworks.com/ , http://hortonworks.com/products/hortonworksdataplatform/ , http://hortonworks.com/about-us/contact-us/
team http://hortonworks.com/
reliability http://hortonworks.com/ , http://hortonworks.com/resources/
feed http://hortonworks.com/products/ , http://hortonworks.com/kb , http://hortonworks.com/resources/
board http://hortonworks.com/products/
password http://hortonworks.com/products/hortonworksdataplatform/ , http://hortonworks.com/community/
hdp http://hortonworks.com/get-started/ , http://hortonworks.com/download/ , http://hortonworks.com/about-us/contact-us/ , http://hortonworks.com/events/ , http://horton
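Two details of this output differ from the expected sample: the terms are not in alphabetical order, and the hadoop entry lists http://hortonworks.com/resources/ twice. The sketch below shows one way both could be handled. The helper names (`line_to_pairs`, `merge_unique`, `format_entry`) and the `example.com` sample lines are illustrative assumptions. The helpers are plain Python, so they are exercised here with a local stand-in for the shuffle; the commented line shows how they might be wired into the RDD pipeline.

```python
from functools import reduce
from itertools import groupby

def line_to_pairs(line):
    """Turn 'url,term1,term2,...' into [(term, [url]), ...]."""
    fields = [field.strip() for field in line.split(",")]
    url, terms = fields[0], fields[1:]
    return [(term, [url]) for term in terms if term]

def merge_unique(left, right):
    """Merge two URL lists, keeping first-seen order and dropping repeats."""
    merged = list(left)
    for url in right:
        if url not in merged:
            merged.append(url)
    return merged

def format_entry(term, urls):
    """Render one output line: the term, a space, then comma-joined URLs."""
    return term + " " + ",".join(urls)

# With Spark (not run here), the helpers could be combined roughly as:
#   baseRDD.flatMap(line_to_pairs).reduceByKey(merge_unique).sortByKey()

# Local stand-in for the shuffle: sort the pairs by term, then group them.
sample_lines = [
    "http://example.com/resources/,hadoop,feed",
    "http://example.com/,hadoop",
    "http://example.com/resources/,hadoop",
]
pairs = sorted(
    (pair for line in sample_lines for pair in line_to_pairs(line)),
    key=lambda pair: pair[0],
)
entries = [
    format_entry(term, reduce(merge_unique, (urls for _, urls in group)))
    for term, group in groupby(pairs, key=lambda pair: pair[0])
]
for entry in entries:
    print(entry)
```

Keeping each value as a list until the final formatting step avoids searching inside a concatenated string for duplicates. In Spark, the order in which `reduceByKey` merges partial results is not guaranteed, so the URL order within an entry may vary between runs.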

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

# Define a schema for creating a DataFrame
urls_schema = StructType([
  StructField('Key (Path)', StringType(), True),
  StructField('Value (Domain)', StringType(), True)
])

# resultRDD already holds (term, urls) tuples, so it can be passed directly
# without collecting it to the driver and parallelizing it again
df = spark.createDataFrame(resultRDD, schema=urls_schema)
df.show() # Prints the first 20 rows by default; show() itself returns None

+--------------+--------------------+
|    Key (Path)|      Value (Domain)|
+--------------+--------------------+
|        hadoop|http://hortonwork...|
|      webinars|http://hortonwork...|
|    enterprise|http://hortonwork...|
|          team|http://hortonwork...|
|   reliability|http://hortonwork...|
|          feed|http://hortonwork...|
|         board|http://hortonwork...|
|      password|http://hortonwork...|
|           hdp|http://hortonwork...|
| presentations|http://hortonwork...|
|       connect|http://hortonwork...|
| knowledgebase|http://hortonwork...|
|      platform|http://hortonwork...|
|       support|http://hortonwork...|
|      training|http://hortonwork...|
|       webinar|http://hortonwork...|
|instructor-led|http://hortonwork...|
|      articles|http://hortonwork...|
|      download|http://hortonwork...|
|   hortonworks|http://hortonwork...|
+--------------+--------------------+
only showing top 20 rows
