## Big Data Analytics
## Nimisha Asati 

### Install Java, Spark, and Findspark
This installs Apache Spark 2.4.1, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

### Set Environment Variables
Set the locations where Spark and Java are installed.

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

### Start a SparkSession
This starts a local Spark session that uses all available cores (`local[*]`).

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark import SparkConf
from pyspark.context import SparkContext

In [0]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer 
from pyspark.ml.clustering import KMeans

## K-Means with Spark MLlib


In [0]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.ml.clustering import KMeans

In [0]:
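# Reuse the SparkContext created above; calling SparkContext() again would raise an error.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)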

## Part 1: Convert files to lowercase


In [0]:
def toLower(inputList):
    """Return a list with every string in inputList lowercased."""
    return [x.lower() for x in inputList]

In [31]:
from google.colab import files
uploaded = files.upload()

Saving reut2-000.sgm to reut2-000 (1).sgm
Saving reut2-001.sgm to reut2-001 (1).sgm
Saving reut2-002.sgm to reut2-002 (1).sgm
Saving reut2-003.sgm to reut2-003 (1).sgm
Saving reut2-004.sgm to reut2-004 (1).sgm
Saving reut2-005.sgm to reut2-005 (1).sgm
Saving reut2-006.sgm to reut2-006 (1).sgm
Saving reut2-007.sgm to reut2-007 (1).sgm
Saving reut2-008.sgm to reut2-008 (1).sgm
Saving reut2-009.sgm to reut2-009.sgm
Saving reut2-010.sgm to reut2-010.sgm
Saving reut2-011.sgm to reut2-011.sgm
Saving reut2-012.sgm to reut2-012.sgm
Saving reut2-013.sgm to reut2-013.sgm
Saving reut2-014.sgm to reut2-014.sgm
Saving reut2-015.sgm to reut2-015.sgm
Saving reut2-016.sgm to reut2-016.sgm
Saving reut2-017.sgm to reut2-017.sgm
Saving reut2-018.sgm to reut2-018.sgm
Saving reut2-019.sgm to reut2-019.sgm
Saving reut2-020.sgm to reut2-020.sgm
Saving reut2-021.sgm to reut2-021.sgm


In [0]:
allFiles = sc.wholeTextFiles("*.sgm")

In [0]:
allFilesLower = allFiles.map(lambda x: toLower(x))
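
`sc.wholeTextFiles` yields one `(path, content)` pair per file, so `toLower` receives a two-element tuple and lowercases the path as well as the file body. A minimal plain-Python sketch of that behavior, using a made-up pair (the path and text below are hypothetical, not taken from the dataset) in place of a real RDD record:

```python
def to_lower(pair):
    """Lowercase every string in a (path, content) pair."""
    return [s.lower() for s in pair]

# Hypothetical record shaped like one element of sc.wholeTextFiles(...)
sample = ("file:/content/Example.SGM", "Some SAMPLE Text")
lowered = to_lower(sample)
print(lowered)
```

If the original case of the path matters later, one option is to lowercase only the second element of the pair.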

## Part 2: Transform the lowercased files to TF-IDF

In [0]:
# allFilesLower holds [fileName, data] pairs produced in Part 1.
df = sqlContext.createDataFrame(allFilesLower, ["fileName", "data"])
allFilesDF = (df.rdd.map(lambda x: (x.fileName, x.data.split(" "))).toDF().withColumnRenamed("_1", "fileName").withColumnRenamed("_2", "text"))
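
Note that `x.data.split(" ")` splits on single space characters only, so newlines and tabs stay attached to tokens and consecutive spaces produce empty strings. The sketch below contrasts it with the no-argument `split()`, which splits on any run of whitespace. The sample string is hypothetical, and whether the second tokenization suits the Reuters SGML files is an assumption to check against the data.

```python
sample = "oil  prices\nrise"  # hypothetical snippet, not taken from the dataset

by_space = sample.split(" ")    # splits at each single space only
by_whitespace = sample.split()  # splits on any run of whitespace

print(by_space)
print(by_whitespace)
```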

In [0]:
TF_matrix = MLHashingTF(inputCol="text", outputCol="tf",numFeatures=9500)
TF_value = TF_matrix.transform(allFilesDF)

In [0]:
IDF_matrix = MLIDF(inputCol="tf", outputCol="features")
TF_IDF = IDF_matrix.fit(TF_value).transform(TF_value)
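
To make the two stages concrete, here is a rough plain-Python sketch of what hashing TF followed by IDF computes. It is an illustration under assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function that Spark's `HashingTF` uses, so bucket indices will differ, and the three toy documents are invented. The IDF weight follows the smoothed formula documented for Spark MLlib, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term.

```python
import math
import zlib
from collections import Counter

def hashing_tf(tokens, num_features):
    """Count tokens per hash bucket (the hashing trick).

    zlib.crc32 is a stand-in hash chosen for determinism; it is an
    assumption of this sketch, not the hash Spark uses.
    """
    counts = Counter()
    for tok in tokens:
        counts[zlib.crc32(tok.encode("utf-8")) % num_features] += 1
    return dict(counts)

def idf_weights(tf_docs):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_docs)
    df = Counter(bucket for doc in tf_docs for bucket in doc)
    return {b: math.log((m + 1) / (d + 1)) for b, d in df.items()}

def tf_idf(tf_docs):
    """Multiply each bucket's term frequency by its IDF weight."""
    idf = idf_weights(tf_docs)
    return [{b: tf * idf[b] for b, tf in doc.items()} for doc in tf_docs]

# Invented toy corpus; "oil" occurs in every document.
docs = [["oil", "prices", "rise"], ["oil", "output", "falls"], ["oil", "oil"]]
tf_docs = [hashing_tf(d, num_features=9500) for d in docs]
weights = tf_idf(tf_docs)
print(weights[2])  # the only bucket here is "oil", which gets weight 0
```

A term that appears in every document receives an IDF of log(1) = 0, which is why the last toy document ends up with an all-zero vector. Hash collisions are possible with any `numFeatures`, so distinct words can share a bucket.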

## Part 3: K-Means algorithm implemented on the TF-IDF matrix

In [0]:
k_means = KMeans(k=4, seed=567)
cluster = k_means.fit(TF_IDF.select("features"))
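
For intuition about what `fit` does, the following is a small plain-Python sketch of Lloyd's algorithm, the assign-then-average loop at the core of k-means. It is not Spark's implementation: Spark's `KMeans` initializes centers with k-means|| by default and runs distributed, while this sketch samples the initial centers at random from the data. The 2-D points and the function names are invented for illustration.

```python
import random

def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length tuples."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def closest(point, centers):
    """Index of the center nearest to point (ties go to the lower index)."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))

def k_means(points, k, seed, max_iter=20):
    """Lloyd's algorithm with random initial centers (an assumption of this sketch)."""
    rng = random.Random(seed)
    centers = rng.sample(points, k)
    for _ in range(max_iter):
        # Assignment step: group points by nearest center.
        clusters = [[] for _ in range(k)]
        for p in points:
            clusters[closest(p, centers)].append(p)
        # Update step: move each center to the mean of its members;
        # an empty cluster keeps its previous center.
        new_centers = [
            tuple(sum(col) / len(members) for col in zip(*members)) if members else centers[i]
            for i, members in enumerate(clusters)
        ]
        if new_centers == centers:
            break
        centers = new_centers
    return centers, [closest(p, centers) for p in points]

# Invented data: two well-separated groups of three points each.
points = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
centers, labels = k_means(points, k=2, seed=567)
print(labels)
```

Cluster IDs are arbitrary labels: which group is called 0 and which is called 1 depends on the initial centers, so only the grouping itself is meaningful.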

## Part 4: Reporting cluster ID for each file

In [65]:
Cluster_ID_File = cluster.transform(TF_IDF).select("FILENAME", "PREDICTION")
Cluster_ID_File.show()

+--------------------+----------+
|            FILENAME|PREDICTION|
+--------------------+----------+
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         2|
|file:/content/reu...|         1|
|file:/content/reu...|         0|
|file:/content/reu...|         1|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
|file:/content/reu...|         0|
+--------------------+----------+
only showing top 20 rows



## Saving output to a file

In [59]:
!mkdir -p drive
!google-drive-ocamlfuse drive

fuse: mountpoint is not empty
fuse: if you are sure this is safe, use the 'nonempty' mount option


In [0]:
Final_cluster = Cluster_ID_File.rdd
# Keep only the file name (the text after the last '/') alongside the cluster ID.
Formatted_cluster = Final_cluster.map(lambda x: [x[0].rsplit('/', 1)[-1], x[1]])
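
The `rsplit('/', 1)[-1]` expression keeps whatever follows the last slash in the path. A quick standalone check, using a hypothetical path rather than one from the actual run:

```python
def base_name(uri):
    """Return the part of uri after the last '/', or uri itself if there is none."""
    return uri.rsplit("/", 1)[-1]

print(base_name("file:/content/example.sgm"))  # hypothetical path
print(base_name("no-slash.sgm"))
```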


In [0]:
df = sqlContext.createDataFrame(Formatted_cluster, ['FILENAME', 'PREDICTION'])

In [0]:
# Spark 2.x has a built-in CSV writer, so the external com.databricks.spark.csv package is not needed.
df.coalesce(1).write.format('csv').options(header='true').save('outputNim1')