# SHUBHAM SHAILESH PANDEY, UBID - spandey4

### The method below reads the text files from the subfolders inside the Data folder and builds a DataFrame from them. The DataFrame contains 4 columns after this - 
### 'value' - This column contains an entire article
### 'filename' - This column contains the filepath of the article 
### 'Category'- The category to which the article belongs to (Business,Politics,Sports or Technology)
### 'label' - This column maps a unique integer based on the article's category


In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name,col,lit,split,lower
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql import functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import regexp_extract

def make_df(base_dir="./Data"):
    """Build one labelled DataFrame from the per-category article folders.

    Reads the text files under ``<base_dir>/<Category>`` for each of the four
    categories and unions them, in the order Technology, Sports, Business,
    Politics.

    Args:
        base_dir: Folder that contains the ``Technology``, ``Sports``,
            ``Business`` and ``Politics`` subfolders. Defaults to ``"./Data"``.

    Returns:
        A DataFrame with the columns ``value`` (text read from the file),
        ``filename`` (path of the source file), ``Category`` (category name)
        and ``label`` (0=Technology, 1=Sports, 2=Business, 3=Politics).

    Note:
        Relies on the module-level ``spark`` session, which must exist before
        this function is called. Spark's text reader emits one row per line,
        so ``value`` holds a whole article only when each file is a single
        line.
    """
    # The position in this tuple is the integer label of the category.
    categories = ("Technology", "Sports", "Business", "Politics")

    combined = None
    for label, category in enumerate(categories):
        part = (
            spark.read.text(base_dir + "/" + category)
            .withColumn("filename", input_file_name())
            .withColumn("Category", lit(category))
            .withColumn("label", lit(label))
        )
        combined = part if combined is None else combined.union(part)
    return combined

### The method below lists some regex rules needed to clean the article words

In [20]:
def regex_rules(df):
    """Clean the raw article text and tokenize it into a ``words`` column.

    Args:
        df: DataFrame with a string column ``value`` holding the raw text.

    Returns:
        The DataFrame with an added ``words`` column: an array of lower-cased
        tokens obtained by splitting the cleaned text on whitespace.
    """
    # Patterns are raw strings so the backslashes reach the regex engine
    # without relying on Python's handling of invalid escape sequences.

    # Remove URLs (and any spaces that follow them).
    df = df.withColumn('words', F.regexp_replace('value', r'http+s*\:+\/\/[a-zA-Z\.\/0-9]+ *', ''))
    # Remove a trailing word that was cut off with an ellipsis character.
    df = df.withColumn('words', F.regexp_replace('words', r' {1}[a-zA-Z0-9]+…$', ''))
    # Replace numbers delimited by space/bracket/slash/dot/dash with a space.
    df = df.withColumn('words', F.regexp_replace('words', r'[\)\( \/\.-][0-9]+[ \)\/\.\(-]', ' '))
    df = df.withColumn('words', lower(col('words')))
    # Drop possessive "'s" before punctuation is turned into spaces.
    df = df.withColumn('words', F.regexp_replace('words', "'s", ''))
    # Everything that is not a letter, digit or space becomes a space.
    df = df.withColumn('words', F.regexp_replace('words', '[^a-zA-Z0-9 ]+', ' '))
    # Remove stand-alone numbers. Replace with a single space (not an empty
    # string) so the words on either side are not glued into one token.
    df = df.withColumn('words', F.regexp_replace('words', '[ ][0-9]+ ', ' '))
    df = df.withColumn('words', split('words', r"\s+"))
    return df

### The method below removes the stop words mentioned in the array. They are not essential to the data we need to classify. I used the StopWordsRemover library to do this.

In [21]:
def stopwords_remove(df):
    """Filter the custom stop-word list out of the tokenized articles.

    Args:
        df: DataFrame with an array-of-strings column ``words``.

    Returns:
        The DataFrame with an added ``filtered_words`` column holding the
        tokens of ``words`` that are not in the stop-word list below.
    """
    # Hand-picked list of words to discard (some entries appear twice).
    custom_stopwords = [
        'a','an','the','and','is','are','was','were','what','them','had','some','ca',
        'why','when','where','who','whose','which','that','off','ever','many','ve',
        'those','this','those','but','so','thus','again','therefore','its','both',
        'like','in','on','up','down','under','over','i','we','they','while','okay',
        'he','them','their','there','us','of','you','your','us','our','mine','mr',
        'such','am','to','too','for','from','since','until','between','she','own',
        'my','not','if','as', 'well','youre','hadnt','havent','wont','q','se','ok',
        'very','have','it','be','been','has','having','his', 'her','never','above',
        'should','would', 'could','just', 'about','do','doing','does','did','la','ha',
        'go','going','goes','being','with', 'yes', 'no','how','before','than','d',
        'after','any','here','out','now','then','got','into','all','cant','or','ya',
        'despite','beyond','further','wanna', 'want','gonna','isnt', 'at','also','lo',
        'because','due','heres','try','said','says','will','shall','link','asked',
        'more','less','often','lol','maybe','perhaps','quite','even','him','by','n',
        'among','can','may','most','took','during','me','told','might','hi','es','l',
        'theyll','use','u','whats','couldnt','wouldnt','see','im','dont','x','de',
        'doesnt','shouldnt', 'hes','thats','let','lets','get','gets','en','co','k',
        'whats','s','say','via','youll','wed','theyd','youd','w','m','hey','hello',
        'youve','theyve','weve','theyd','youd','ive','were','ill','yet','b','rt',
        'id','o','r','z','um','em','seen','didnt','r','e','t','c','y','only','v',
        'arent','werent','hasnt','mostly','much','ago','wasnt','aint','nope','p',
        'll','ja','al','el','gt','cs','si','didnt','re','f','fo','j','ni','tr','il',
        'rt','http','https','amp',
    ]
    filter_stage = StopWordsRemover(
        inputCol="words",
        outputCol="filtered_words",
        stopWords=custom_stopwords,
    )
    return filter_stage.transform(df)

### The drop_cols method below drops unnecessary columns from the generated data frame.
### The tf_idf method generates a column of features based on the word and document frequencies.
### These features will be used for training the various classification models.
### After a lot of parameter tuning, I observed the best results when using 13100 features.

In [236]:
def drop_cols(df):
    """Replace the full file path with the bare file name and drop leftovers.

    Args:
        df: DataFrame with a ``filename`` column (and optionally
            ``rawFeatures``).

    Returns:
        The DataFrame with a new ``file`` column (the part of ``filename``
        after the last ``/``) and without ``filename`` and ``rawFeatures``.
    """
    # Greedy '^.*/' consumes everything up to the last slash; group 1 is the rest.
    basename = regexp_extract('filename', '^.*/(.*)', 1)
    with_file = df.withColumn('file', basename)
    return with_file.drop('filename').drop('rawFeatures')

def tf_idf(df, num_features=13100):
    """Add TF-IDF feature vectors computed from ``filtered_words``.

    Args:
        df: DataFrame with an array-of-strings column ``filtered_words``.
        num_features: Number of hash buckets used by ``HashingTF``, i.e. the
            size of the resulting vectors. Defaults to 13100, the value the
            original author settled on after tuning.

    Returns:
        The DataFrame with two added columns: ``rawFeatures`` (hashed term
        frequencies) and ``features`` (the IDF-rescaled vectors).

    Note:
        The IDF weights are fitted on ``df`` itself on every call, so two
        DataFrames passed through this function separately are scaled with
        different weights.
    """
    hashing_tf = HashingTF(
        inputCol="filtered_words",
        outputCol="rawFeatures",
        numFeatures=num_features,
    )
    featurized = hashing_tf.transform(df)

    idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(featurized)
    return idf_model.transform(featurized)

### The method below classifies data using Random Forest Classifier. I show 30 predictions to show how much they conformed to the actual labels.

In [237]:
def random_forest(trainingData,testData):
    """Train a random forest on ``trainingData`` and report accuracy on ``testData``.

    Prints 30 prediction rows followed by the accuracy as a percentage.

    Args:
        trainingData: DataFrame with ``features`` and ``label`` columns.
        testData: DataFrame with ``value``, ``Category``, ``features`` and
            ``label`` columns.
    """
    from pyspark.ml.classification import RandomForestClassifier
    print("Random Forest Classifier")
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")
    model = rf.fit(trainingData)
    predictions = model.transform(testData)
    # Select example rows to display.
    predictions.select("value","Category","probability","label","prediction") \
        .orderBy("probability", ascending=False) \
        .show(n = 30, truncate = 30)
    # The evaluator's default metric is F1; request accuracy explicitly so the
    # printed number matches its "Accuracy" label.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    print("Accuracy :- " + str(100*evaluator.evaluate(predictions)) +" %"+"\n")
    
    

### The method below classifies data using Naive Bayes Classifier. I show 30 predictions to show how much they conformed to the actual labels.

In [243]:
def naive_bayes(trainingData,testData):
    """Train Naive Bayes on ``trainingData`` and report accuracy on ``testData``.

    Prints 30 prediction rows followed by the accuracy as a percentage.

    Args:
        trainingData: DataFrame with ``features`` and ``label`` columns.
        testData: DataFrame with ``value``, ``Category``, ``features`` and
            ``label`` columns.
    """
    from pyspark.ml.classification import NaiveBayes
    print("Naive Bayes Classifier")
    nb = NaiveBayes(smoothing=1)
    model = nb.fit(trainingData)
    predictions = model.transform(testData)
    # Select example rows to display.
    predictions.select("value","Category","probability","label","prediction") \
        .orderBy("probability", ascending=False) \
        .show(n = 30, truncate = 30)
    # The evaluator's default metric is F1; request accuracy explicitly so the
    # printed number matches its "Accuracy" label.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    print("Accuracy :- " + str(100*evaluator.evaluate(predictions)) +" %"+"\n")
    


### The method classifies data using Logistic Regression. I show 30 predictions to show how much they conformed to the actual labels.

In [244]:
def logistic_regression(trainingData,testData):
    """Train logistic regression on ``trainingData`` and report accuracy on ``testData``.

    Prints 30 prediction rows followed by the accuracy as a percentage.

    Args:
        trainingData: DataFrame with ``features`` and ``label`` columns.
        testData: DataFrame with ``value``, ``Category``, ``features`` and
            ``label`` columns.
    """
    from pyspark.ml.classification import LogisticRegression
    print("Logistic Regression")
    # elasticNetParam=0 selects pure L2 regularisation.
    lr = LogisticRegression(maxIter=10, regParam=0.32, elasticNetParam=0)
    lrModel = lr.fit(trainingData)
    predictions = lrModel.transform(testData)
    # Select example rows to display.
    predictions.select("value","Category","probability","label","prediction") \
        .orderBy("probability", ascending=False) \
        .show(n = 30, truncate = 30)
    # The evaluator's default metric is F1; request accuracy explicitly so the
    # printed number matches its "Accuracy" label.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    print("Accuracy :- " + str(100*evaluator.evaluate(predictions)) +" %"+"\n")

### The script below sets up a spark session, makes a dataframe from the articles read, applies regex rules to the dataframe to clean the articles, removes stop words, extracts features using TF-IDF algorithm, drops unnecessary columns, splits the data into training and test data in [80:20] ratio, fits the training data using Naive Bayes and Logistic Regression models and finally predicts the labels for test data.

In [248]:
# Create (or reuse) the Spark session used by every function in this file.
spark = SparkSession.builder.getOrCreate()

# Preprocessing pipeline, innermost call first:
# load -> regex cleaning -> stop-word removal -> TF-IDF -> column cleanup.
df = drop_cols(tf_idf(stopwords_remove(regex_rules(make_df()))))
df.show(5)

# 80/20 train/test split; the fixed seed keeps the split reproducible.
trainingData, testData = df.randomSplit([0.8, 0.2], seed=100000)

print("Training Data Count: " + str(trainingData.count()))
print("Test Data Count: " + str(testData.count())+"\n")

# Random forest is currently disabled.
#random_forest(trainingData,testData)
naive_bayes(trainingData, testData)
logistic_regression(trainingData, testData)

+--------------------+----------+-----+--------------------+--------------------+--------------------+----------------+
|               value|  Category|label|               words|      filtered_words|            features|            file|
+--------------------+----------+-----+--------------------+--------------------+--------------------+----------------+
|SAN FRANCISCO — T...|Technology|    0|[san, francisco, ...|[san, francisco, ...|(13100,[1,13,18,2...|Technology52.txt|
|It may not qualif...|Technology|    0|[it, may, not, qu...|[qualify, lightni...|(13100,[1,3,8,9,1...|Technology22.txt|
|Robert O. Work, t...|Technology|    0|[robert, o, work,...|[robert, work, ve...|(13100,[12,48,108...| Technology2.txt|
|Andrew S. Grove, ...|Technology|    0|[andrew, s, grove...|[andrew, grove, l...|(13100,[8,13,36,5...| Technology7.txt|
|SEATTLE — When Gl...|Technology|    0|[seattle, when, g...|[seattle, glenn, ...|(13100,[13,38,80,...|Technology24.txt|
+--------------------+----------+-----+-

### The same method for making a dataframe is repeated for the unknown data set read from Unknown folder

In [249]:
def make_df_unknown(base_dir="./Unknown"):
    """Build one labelled DataFrame from the held-out "unknown" article folders.

    Reads the text files under ``<base_dir>/<Category>`` for each of the four
    categories and unions them, in the order Technology, Sports, Business,
    Politics.

    Args:
        base_dir: Folder that contains the ``Technology``, ``Sports``,
            ``Business`` and ``Politics`` subfolders. Defaults to
            ``"./Unknown"``.

    Returns:
        A DataFrame with the columns ``value`` (text read from the file),
        ``filename`` (path of the source file), ``Category`` (category name)
        and ``label`` (0=Technology, 1=Sports, 2=Business, 3=Politics).

    Note:
        Relies on the module-level ``spark`` session, which must exist before
        this function is called.
    """
    # The position in this tuple is the integer label of the category.
    categories = ("Technology", "Sports", "Business", "Politics")

    combined = None
    for label, category in enumerate(categories):
        part = (
            spark.read.text(base_dir + "/" + category)
            .withColumn("filename", input_file_name())
            .withColumn("Category", lit(category))
            .withColumn("label", lit(label))
        )
        combined = part if combined is None else combined.union(part)
    return combined

### The same script for making predictions is repeated for the unknown dataframe.

In [251]:
# Run the unknown articles through the same preprocessing pipeline,
# innermost call first: load -> regex -> stop words -> TF-IDF -> cleanup.
# NOTE(review): tf_idf() fits a fresh IDF model on the unknown articles, so
# their feature weights are not the ones trainingData was scaled with.
# Consider reusing the IDF model fitted for the training data.
unknownData = drop_cols(tf_idf(stopwords_remove(regex_rules(make_df_unknown()))))
unknownData.show(5)
print(unknownData.count())

# Re-train on the earlier training split and score the unknown articles.
naive_bayes(trainingData, unknownData)
logistic_regression(trainingData, unknownData)

# Shut the session down; nothing below this point can use Spark.
spark.stop()

+--------------------+----------+-----+--------------------+--------------------+--------------------+----------------+
|               value|  Category|label|               words|      filtered_words|            features|            file|
+--------------------+----------+-----+--------------------+--------------------+--------------------+----------------+
|The scene opened ...|Technology|    0|[the, scene, open...|[scene, opened, r...|(13100,[29,35,92,...|Technology61.txt|
|WASHINGTON — Thre...|Technology|    0|[washington, thre...|[washington, thre...|(13100,[35,94,108...|Technology80.txt|
|For the past seve...|Technology|    0|[for, the, past, ...|[past, several, y...|(13100,[33,35,77,...|Technology78.txt|
|Here’s a question...|Technology|    0|[here, s, a, ques...|[question, hoping...|(13100,[35,95,151...|Technology77.txt|
|Each Friday, Farh...|Technology|    0|[each, friday, fa...|[each, friday, fa...|(13100,[8,13,35,9...|Technology62.txt|
+--------------------+----------+-----+-

### References used for the project - 
### https://spark.apache.org/docs/2.2.0/ml-features.html
### https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#naive-bayes
### https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forests
### https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#logistic-regression