### HADOOP COMMANDS
### 1. START HADOOP
##### allstart.sh
### 2. COPY FILE FROM LOCAL TO HADOOP
#### hadoop fs -copyFromLocal Corona_NLP_train.csv
### 3. OPEN JUPYTER NOTEBOOK 
##### pysparknb

### Importing Libraries

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session for this notebook (or reuse an existing one),
# registered under the application name 'nlp'.
session_builder = SparkSession.builder.appName('nlp')
spark = session_builder.getOrCreate()

In [None]:
import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.feature import CountVectorizer

In [None]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time

### Import Data

In [None]:
# Load the training CSV: comma-separated, first row is the header, and the
# column types are inferred from the data.
# NOTE(review): if quoted tweets contain embedded newlines, the reader may
# also need multiLine=True -- confirm against the raw file.
csv_path = "dbfs:/FileStore/shared_uploads/joel.ignatius@centurylink.com/Corona_NLP_train.csv"
df = spark.read.csv(
    csv_path,
    sep=",",
    header=True,
    inferSchema=True,
)

### Data Exploration

In [None]:
# Print a preview of the freshly loaded DataFrame.
df.show()

In [None]:
# List the column names.
df.columns

In [None]:
# Number of columns in the DataFrame.
len(df.columns)

In [None]:
# Print the schema; the column types were inferred when the CSV was read.
df.printSchema()

In [None]:
# Fetch the first row of the DataFrame.
df.head()

In [None]:
# Fetch the last three rows of the DataFrame.
df.tail(3)

In [None]:
# Total number of rows.
df.count()

In [None]:
# Print summary statistics for the columns.
df.describe().show()

In [None]:
# Preview only the three columns that the rest of the notebook works with.
preview_cols = ['TweetAt', 'OriginalTweet', 'Sentiment']
df.select(*preview_cols).show()

In [None]:
# Restrict the DataFrame to the three columns of interest.
kept_columns = ['TweetAt', 'OriginalTweet', 'Sentiment']
df = df.select(*kept_columns)

In [None]:
# Count the rows whose OriginalTweet is missing.
# The count is done inside Spark: the previous toPandas() call collected the
# entire DataFrame onto the driver just to count nulls in one column.
df.filter(col('OriginalTweet').isNull()).count()

In [None]:
# Drop every row whose OriginalTweet is missing.
df = df.na.drop(subset=['OriginalTweet'])

In [None]:
# Preview the DataFrame after dropping rows with a missing OriginalTweet.
df.show()

In [None]:
# Count the rows whose Sentiment is missing.
# The count is done inside Spark: the previous toPandas() call collected the
# entire DataFrame onto the driver just to count nulls in one column.
df.filter(col('Sentiment').isNull()).count()

In [None]:
# Drop every row whose Sentiment is missing.
df = df.na.drop(subset=['Sentiment'])

In [None]:
# Preview the DataFrame after dropping rows with a missing Sentiment.
df.show()

In [None]:
import re
from pyspark.sql.functions import regexp_replace

In [None]:
# Strip every character that is not a letter, digit, space or tab from the
# tweet text.
# - withColumn returns a new DataFrame, so the result has to be assigned back;
#   without the assignment the cleaning is silently discarded.
# - Spark evaluates the pattern as a Java regex, which has no /.../ delimiters:
#   surrounding slashes would be matched as literal '/' characters.
# - The tab is written as \t inside the character class.
df = df.withColumn(
    "OriginalTweet",
    regexp_replace(col("OriginalTweet"), r"[^0-9A-Za-z \t]+", ""),
)

### Feature Engineering

In [None]:
import pyspark.ml.feature
from pyspark.ml.feature import IDF

In [None]:
# Text-processing stages, chained through their input/output column names:
#   OriginalTweet -> words -> filtered_words -> vector_words -> vectorized_features

# Split the tweet text into words.
tokenizer = Tokenizer(
    inputCol='OriginalTweet',
    outputCol='words',
)

# Remove stop words from the token list.
stopwords_remover = StopWordsRemover(
    inputCol='words',
    outputCol='filtered_words',
)

# Turn the remaining tokens into term-count vectors.
vectorizer = CountVectorizer(
    inputCol='filtered_words',
    outputCol='vector_words',
)

# Re-weight the count vectors by inverse document frequency.
idf = IDF(
    inputCol='vector_words',
    outputCol='vectorized_features',
)

In [None]:
# Fit a StringIndexer that maps each Sentiment string to a numeric 'label'.
sentiment_indexer = StringIndexer(inputCol='Sentiment', outputCol='label')
labelEncoder = sentiment_indexer.fit(df)

In [None]:
# Preview the first 10 rows with the numeric label attached (df itself is
# not modified here).
labelled_preview = labelEncoder.transform(df)
labelled_preview.show(10)

In [None]:
# Attach the numeric 'label' column to the working DataFrame.
df = labelEncoder.transform(df)

In [None]:
# Preview the DataFrame, which now carries the 'label' column.
df.show()

In [None]:
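### Label mapping observed for this dataset (verify with labelEncoder.labels,
### where the list position of each sentiment is its index):
### Positive = 0.0
### Negative = 1.0
### Neutral  = 2.0
### Extremely Positive = 3.0
### Extremely Negative = 4.0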

### Model

In [None]:
# 60/40 train/test split. A fixed seed keeps the split -- and therefore the
# reported accuracy -- reproducible across notebook runs.
train, test = df.randomSplit([0.6, 0.4], seed=42)

In [None]:
# Classifier stage: reads the 'vectorized_features' column and is trained
# against the numeric 'label' column.
lr = LogisticRegression(
    featuresCol='vectorized_features',
    labelCol='label',
)

### Pipeline

In [None]:
# Chain the text-processing stages and the classifier, in execution order.
pipeline_stages = [tokenizer, stopwords_remover, vectorizer, idf, lr]
pipeline = Pipeline(stages=pipeline_stages)

### Building Model

In [None]:
# Fit the whole pipeline (all stages, not only the logistic regression) on
# the training split.
lr_model = pipeline.fit(train)

In [None]:
### Testing Model
# Run the fitted pipeline over the held-out test split.
predictions = lr_model.transform(test)

In [None]:
# Preview the scored test rows.
predictions.show()

### Model Evaluation

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Evaluator that compares the 'prediction' column against 'label' and
# reports accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
# Compute the accuracy of the predictions made on the test split.
accuracy = evaluator.evaluate(predictions)

In [None]:
# Display the accuracy as the cell output.
accuracy