In [0]:
# Import library

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector
from pyspark.sql.functions import when 
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import MultilayerPerceptronClassifier
import mlflow

In [0]:
# Load data
#
# Read the Twitter and Reddit CSVs with one shared set of options and stack them.

# File location and type
file_location_1 = "/FileStore/tables/Twitter_Data.csv"
file_location_2 = "/FileStore/tables/Reddit_Data.csv"
file_type = "csv"

# CSV options
# With inferSchema off, every column (including `category`) is read as a string.
infer_schema = "false"
# Both files start with a header row; the readers below take it from here.
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
csv_reader = (spark.read.format(file_type)
              .option("inferSchema", infer_schema)
              .option("header", first_row_is_header)
              .option("sep", delimiter))

# NOTE(review): this cell's displayed output shows one record split across two
# rows ("...become again," followed by ",0"). That suggests some quoted text
# fields contain line breaks. .option("multiLine", "true") may fix it; verify
# against the raw files before enabling.
file_1 = csv_reader.load(file_location_1)
file_2 = csv_reader.load(file_location_2)

# union() matches columns by position, not by name, so the result keeps
# file_1's column names (clean_text, category).
# NOTE(review): assumes Reddit_Data.csv also has the text column first and
# `category` second. Confirm its header.
full_data = file_1.union(file_2)

display(full_data)

clean_text,category
when modi promised “minimum government maximum governance” expected him begin the difficult job reforming the state why does take years get justice state should and not business and should exit psus and temples,-1.0
talk all the nonsense and continue all the drama will vote for modi,0.0
what did just say vote for modi welcome bjp told you rahul the main campaigner for modi think modi should just relax,1.0
asking his supporters prefix chowkidar their names modi did great service now there confusion what read what not now crustal clear what will crass filthy nonsensical see how most abuses are coming from chowkidars,1.0
answer who among these the most powerful world leader today trump putin modi may,1.0
kiya tho refresh maarkefir comment karo,0.0
surat women perform yagna seeks divine grace for narendra modi become again,
",0",
this comes from cabinet which has scholars like modi smriti and hema time introspect,0.0
with upcoming election india saga going important pair look current modi leads govt elected with deal brexit combination this weekly looks juicy bears imho,1.0


In [0]:
# Clean NA and build binary labels
#
# Rebuild from the raw per-file DataFrames (`file_1`, `file_2` from the load
# cell) instead of overwriting `full_data` in place. Overwriting was not
# idempotent: negatives (-1) are re-coded to 0, and 0 is also the neutral
# marker that gets filtered out. Re-running the cell therefore dropped every
# negative example.
raw_data = file_1.union(file_2)

non_null = raw_data.dropna()

# `category` is a string (inferSchema is off in the load cell). Cast it once so
# the comparisons below are numeric instead of relying on implicit string/int
# coercion. With ANSI mode off, unparsable values become null and are removed
# by the != 0 filter.
numeric = non_null.withColumn("category", non_null.category.cast("double"))

# Drop the neutral class (0), then map negative (-1) to 0; positive stays 1.
polar = numeric.filter(numeric.category != 0)
full_data = polar.withColumn(
    "category", when(polar.category == -1, 0.0).otherwise(polar.category))

# Sanity check: only the two binary labels should remain.
seen_labels = {row.category for row in full_data.select("category").distinct().collect()}
assert seen_labels <= {0.0, 1.0}, "unexpected category values: {0}".format(sorted(seen_labels))

display(full_data)
print(full_data.count())

clean_text,category
when modi promised “minimum government maximum governance” expected him begin the difficult job reforming the state why does take years get justice state should and not business and should exit psus and temples,0
what did just say vote for modi welcome bjp told you rahul the main campaigner for modi think modi should just relax,1
asking his supporters prefix chowkidar their names modi did great service now there confusion what read what not now crustal clear what will crass filthy nonsensical see how most abuses are coming from chowkidars,1
answer who among these the most powerful world leader today trump putin modi may,1
with upcoming election india saga going important pair look current modi leads govt elected with deal brexit combination this weekly looks juicy bears imho,1
gandhi was gay does modi,1
things like demonetisation gst goods and services tax…the upper castes would sort either view favourably say that need give this more time other castes like dalits the muslims were more against because that’ just not modi’ constituency2,1
hope tuthukudi people would prefer honest well behaved nationalist courageous likly minister modi cabinet vote benifit thuthukudi,1
calm waters wheres the modi wave,1
vote such party and leadershipwho can take fast and firm action none other than narendra damodardas modi and bjp party,0


In [0]:
# Split train test: seeded 80/20 random split, then report rows and columns of each part.
train_set, test_set = full_data.randomSplit([0.8, 0.2], seed=123)
for split_df in (train_set, test_set):
    print(split_df.count(), len(split_df.columns))

In [0]:
# Text preparation: tokenize -> hashed term frequencies -> IDF weighting,
# plus index `category` into a numeric `label` column.

tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol='tf', numFeatures=2**16)
# minDocFreq: remove sparse terms (seen in fewer than 5 training documents)
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

stages = [tokenizer, hashtf, idf, label_stringIdx]
pipeline = Pipeline(stages=stages)

# Fit the feature stages on the training split only, then apply them to both splits.
pipelineFit = pipeline.fit(train_set)
train_df, test_set_1 = (pipelineFit.transform(split_df) for split_df in (train_set, test_set))
train_df.show(5)

In [0]:
# Train first LR model (HashingTF + IDF features prepared above)

lr = LogisticRegression()

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.fitIntercept, [False, True])
             .build())

# Evaluate model
# ROC-AUC needs the continuous scores in `rawPrediction`. Scoring the hard 0/1
# `prediction` column gives a single-threshold ROC curve, so the "AUC" collapses
# to balanced accuracy. That affected hyper-parameter selection (this evaluator
# drives the TrainValidationSplits below and in later cells) as well as the
# reported metric.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')

# Seed the inner train/validation split so model selection is reproducible,
# matching the seeded outer randomSplit.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8,
                           seed=123)

lrModel = tvs.fit(train_df)

predictions = lrModel.transform(test_set_1)

accuracy = predictions.filter(predictions["label"] == predictions["prediction"]).count() / float(predictions.count())
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(auc))

In [0]:
# Text preparation (CountVectorizer variant): same tokenizer, IDF and label
# indexing as before, but term counts come from a learned vocabulary
# (capped at 2**16 terms) instead of feature hashing.

tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol='cv', vocabSize=2**16)
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)  # minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

In [0]:
# Train second LR model (CountVectorizer + IDF features; tuning runs inside the pipeline)

lr = LogisticRegression()

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.fitIntercept, [False, True])
             .build())

# `evaluator` comes from the first-model cell. Seeding the inner split makes
# the chosen hyper-parameters reproducible from run to run.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8,
                           seed=123)

pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, tvs])

# Fitting the whole pipeline on the raw training split fits every feature stage
# on training data only. The test split is then transformed with those fitted stages.
pipelineFit = pipeline.fit(train_set)

predictions = pipelineFit.transform(test_set)

accuracy = predictions.filter(predictions["label"] == predictions["prediction"]).count() / float(predictions.count())
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(auc))

In [0]:
# N-gram Implementation ver 1
# (n-gram TF-IDF -> chi-squared feature selection -> tuned LogisticRegression)

def build_trigrams(inputCol=None, n=3):
    """Build an unfitted n-gram LogisticRegression pipeline with chi-squared selection.

    Stages, in order:
      1. Tokenize `clean_text` into `words`.
      2. For each order i in 1..n: NGram -> `{i}_grams`, CountVectorizer
         (vocabulary capped at 2**14) -> `{i}_tf`, IDF (minDocFreq=5) -> `{i}_tfidf`.
      3. Concatenate every `{i}_tfidf` vector into `rawFeatures`.
      4. Index `category` into `label`.
      5. ChiSqSelector keeps the top 2**14 columns of `rawFeatures` as `features`.
      6. A seeded 80/20 TrainValidationSplit picks regParam / fitIntercept for
         LogisticRegression by ROC-AUC.

    Args:
        inputCol: unused. Column names are fixed (`clean_text`, `category`);
            the parameter is kept only so existing calls keep working.
        n: highest n-gram order to include (must be >= 1).

    Returns:
        An unfitted `pyspark.ml.Pipeline`.

    Raises:
        ValueError: if `n` < 1 (there would be nothing to assemble).
    """
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol="clean_text", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in orders]
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
           for i in orders]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="rawFeatures",
    )]
    label_stringIdx = [StringIndexer(inputCol="category", outputCol="label")]
    selector = [ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")]

    lr = LogisticRegression()
    paramGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.1, 0.01])
                 .addGrid(lr.fitIntercept, [False, True])
                 .build())

    # Tune on ROC-AUC from the continuous `rawPrediction` scores. A local
    # evaluator keeps this pipeline independent of the notebook-level
    # `evaluator` and its choice of score column.
    tuning_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')

    tvs = [TrainValidationSplit(estimator=lr,
                                estimatorParamMaps=paramGrid,
                                evaluator=tuning_evaluator,
                                trainRatio=0.8,
                                seed=123)]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + tvs)

In [0]:
# Train third model: n-gram TF-IDF + chi-squared selection + LogisticRegression

trigram_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_pipelineFit.transform(test_set)

# Accuracy = share of test rows whose predicted label equals the true label.
n_test = predictions.count()
n_correct = predictions.filter(predictions["label"] == predictions["prediction"]).count()
accuracy = n_correct / float(n_test)
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print(f"Accuracy Score: {accuracy:.4f}")
print(f"ROC-AUC: {auc:.4f}")

In [0]:
# N-gram Implementation ver 2
# (n-gram TF-IDF without chi-squared selection -> tuned LogisticRegression)

def build_ngrams_wocs(inputCol=None, n=3):
    """Build an unfitted n-gram LogisticRegression pipeline without feature selection.

    Stages, in order:
      1. Tokenize `clean_text` into `words`.
      2. For each order i in 1..n: NGram -> `{i}_grams`, CountVectorizer
         (vocabulary capped at 5460) -> `{i}_tf`, IDF (minDocFreq=5) -> `{i}_tfidf`.
      3. Concatenate every `{i}_tfidf` vector directly into `features`.
      4. Index `category` into `label`.
      5. A seeded 80/20 TrainValidationSplit picks regParam / fitIntercept for
         LogisticRegression by ROC-AUC.

    Args:
        inputCol: unused. Column names are fixed (`clean_text`, `category`);
            the parameter is kept only so existing calls keep working.
        n: highest n-gram order to include (must be >= 1).

    Returns:
        An unfitted `pyspark.ml.Pipeline`.

    Raises:
        ValueError: if `n` < 1 (there would be nothing to assemble).
    """
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol="clean_text", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in orders]
    cv = [
        CountVectorizer(vocabSize=5460, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
           for i in orders]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features",
    )]
    label_stringIdx = [StringIndexer(inputCol="category", outputCol="label")]

    lr = LogisticRegression()
    paramGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.1, 0.01])
                 .addGrid(lr.fitIntercept, [False, True])
                 .build())

    # Tune on ROC-AUC from the continuous `rawPrediction` scores. A local
    # evaluator keeps this pipeline independent of the notebook-level
    # `evaluator` and its choice of score column.
    tuning_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')

    tvs = [TrainValidationSplit(estimator=lr,
                                estimatorParamMaps=paramGrid,
                                evaluator=tuning_evaluator,
                                trainRatio=0.8,
                                seed=123)]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + tvs)

In [0]:
# Train fourth LR model: n-gram TF-IDF (no chi-squared selection) + LogisticRegression

trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions = trigramwocs_pipelineFit.transform(test_set)

is_correct = predictions["label"] == predictions["prediction"]
accuracy = predictions.filter(is_correct).count() / float(predictions.count())
auc_params = {evaluator.metricName: "areaUnderROC"}
auc = evaluator.evaluate(predictions, auc_params)

print(f"Accuracy Score: {accuracy:.4f}")
print(f"ROC-AUC: {auc:.4f}")

In [0]:
# N-gram Implementation GBT Classifier ver1
# (n-gram TF-IDF -> chi-squared feature selection -> tuned GBTClassifier)
# NOTE: this redefines `build_trigrams` from an earlier cell; the next cell uses this version.

def build_trigrams(inputCol=None, n=3):
    """Build an unfitted n-gram GBTClassifier pipeline with chi-squared selection.

    Stages, in order:
      1. Tokenize `clean_text` into `words`.
      2. For each order i in 1..n: NGram -> `{i}_grams`, CountVectorizer
         (vocabulary capped at 2**14) -> `{i}_tf`, IDF (minDocFreq=5) -> `{i}_tfidf`.
      3. Concatenate every `{i}_tfidf` vector into `rawFeatures`.
      4. Index `category` into `label`.
      5. ChiSqSelector keeps the top 2**14 columns of `rawFeatures` as `features`.
      6. A seeded 80/20 TrainValidationSplit picks maxDepth / maxIter for
         GBTClassifier by ROC-AUC.

    Args:
        inputCol: unused. Column names are fixed (`clean_text`, `category`);
            the parameter is kept only so existing calls keep working.
        n: highest n-gram order to include (must be >= 1).

    Returns:
        An unfitted `pyspark.ml.Pipeline`.

    Raises:
        ValueError: if `n` < 1 (there would be nothing to assemble).
    """
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol="clean_text", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in orders]
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
           for i in orders]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="rawFeatures",
    )]
    label_stringIdx = [StringIndexer(inputCol="category", outputCol="label")]
    selector = [ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")]

    gb = GBTClassifier()
    paramGrid = (ParamGridBuilder()
                 .addGrid(gb.maxDepth, [3, 7, 15])
                 .addGrid(gb.maxIter, [5, 10, 15])
                 .build())

    # Tune on ROC-AUC from the continuous `rawPrediction` scores. A local
    # evaluator keeps this pipeline independent of the notebook-level
    # `evaluator` and its choice of score column.
    tuning_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')

    tvs = [TrainValidationSplit(estimator=gb,
                                estimatorParamMaps=paramGrid,
                                evaluator=tuning_evaluator,
                                trainRatio=0.8,
                                seed=123)]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + tvs)

In [0]:
# GBT Classifier ver1: fit the chi-squared-selected n-gram GBT pipeline and score the test split

trigramwocs_pipelineFit = build_trigrams().fit(train_set)
predictions = trigramwocs_pipelineFit.transform(test_set)

total_rows = float(predictions.count())
matching_rows = predictions.filter(predictions["label"] == predictions["prediction"]).count()
accuracy = matching_rows / total_rows
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

for metric_name, metric_value in (("Accuracy Score", accuracy), ("ROC-AUC", auc)):
    print(f"{metric_name}: {metric_value:.4f}")

In [0]:
# N-gram Implementation GBT Classifier ver2
# (n-gram TF-IDF without chi-squared selection -> tuned GBTClassifier)
# NOTE: this redefines `build_ngrams_wocs` from an earlier cell; the next cell uses this version.

def build_ngrams_wocs(inputCol=None, n=3):
    """Build an unfitted n-gram GBTClassifier pipeline without feature selection.

    Stages, in order:
      1. Tokenize `clean_text` into `words`.
      2. For each order i in 1..n: NGram -> `{i}_grams`, CountVectorizer
         (vocabulary capped at 5460) -> `{i}_tf`, IDF (minDocFreq=5) -> `{i}_tfidf`.
      3. Concatenate every `{i}_tfidf` vector directly into `features`.
      4. Index `category` into `label`.
      5. A seeded 80/20 TrainValidationSplit picks maxDepth / maxIter for
         GBTClassifier by ROC-AUC.

    Args:
        inputCol: unused. Column names are fixed (`clean_text`, `category`);
            the parameter is kept only so existing calls keep working.
        n: highest n-gram order to include (must be >= 1).

    Returns:
        An unfitted `pyspark.ml.Pipeline`.

    Raises:
        ValueError: if `n` < 1 (there would be nothing to assemble).
    """
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol="clean_text", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in orders]
    cv = [
        CountVectorizer(vocabSize=5460, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
           for i in orders]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features",
    )]
    label_stringIdx = [StringIndexer(inputCol="category", outputCol="label")]

    gb = GBTClassifier()
    paramGrid = (ParamGridBuilder()
                 .addGrid(gb.maxDepth, [3, 7, 15])
                 .addGrid(gb.maxIter, [5, 10, 15])
                 .build())

    # Tune on ROC-AUC from the continuous `rawPrediction` scores. A local
    # evaluator keeps this pipeline independent of the notebook-level
    # `evaluator` and its choice of score column.
    tuning_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')

    tvs = [TrainValidationSplit(estimator=gb,
                                estimatorParamMaps=paramGrid,
                                evaluator=tuning_evaluator,
                                trainRatio=0.8,
                                seed=123)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + tvs)

In [0]:
# GBT Classifier ver 2: fit the n-gram GBT pipeline (no chi-squared selection) and score the test split

fitted_pipeline = build_ngrams_wocs()
Ngramwocs_pipelineFit = fitted_pipeline.fit(train_set)
predictions = Ngramwocs_pipelineFit.transform(test_set)

hits = predictions.filter(predictions["label"] == predictions["prediction"]).count()
accuracy = hits / float(predictions.count())
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print(f"Accuracy Score: {accuracy:.4f}")
print(f"ROC-AUC: {auc:.4f}")

In [0]:
# N-gram Implementation MLP PySpark ver 1
# (n-gram TF-IDF -> chi-squared feature selection -> MultilayerPerceptronClassifier)
# NOTE: this redefines `build_trigrams` from an earlier cell; the next cell uses this version.

def build_trigrams(inputCol=None, n=3):
    """Build an unfitted n-gram MLP pipeline with chi-squared selection.

    Stages, in order:
      1. Tokenize `clean_text` into `words`.
      2. For each order i in 1..n: NGram -> `{i}_grams`, CountVectorizer
         (vocabulary capped at 2**14) -> `{i}_tf`, IDF (minDocFreq=5) -> `{i}_tfidf`.
      3. Concatenate every `{i}_tfidf` vector into `rawFeatures`.
      4. Index `category` into `label`.
      5. ChiSqSelector keeps the top `num_top_features` columns as `features`.
      6. MultilayerPerceptronClassifier with layers
         [num_top_features, 64, 32, 2] (maxIter=300, blockSize=16, seed=1234).

    Args:
        inputCol: unused. Column names are fixed (`clean_text`, `category`);
            the parameter is kept only so existing calls keep working.
        n: highest n-gram order to include (must be >= 1).

    Returns:
        An unfitted `pyspark.ml.Pipeline`.

    Raises:
        ValueError: if `n` < 1 (there would be nothing to assemble).
    """
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))
    orders = range(1, n + 1)
    vocab_size = 2**14          # per-order CountVectorizer vocabulary cap
    num_top_features = 2**14    # columns kept by ChiSqSelector

    tokenizer = [Tokenizer(inputCol="clean_text", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in orders]
    cv = [
        CountVectorizer(vocabSize=vocab_size, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
           for i in orders]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="rawFeatures",
    )]
    label_stringIdx = [StringIndexer(inputCol="category", outputCol="label")]
    selector = [ChiSqSelector(numTopFeatures=num_top_features, featuresCol='rawFeatures', outputCol="features")]

    # The MLP input layer must equal the width of `features`, which is what the
    # selector keeps, so both are driven by the same constant.
    # NOTE(review): assumes `rawFeatures` has at least num_top_features columns.
    # With a smaller vocabulary the selector keeps fewer and this layer will not match.
    # The final layer has 2 units: one per class of the binary `label`.
    mlp = [MultilayerPerceptronClassifier(maxIter=300, blockSize=16, seed=1234,
                                          layers=[num_top_features, 64, 32, 2])]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + mlp)

In [0]:
# MLP PySpark ver 1: fit the chi-squared-selected n-gram MLP pipeline and score the test split

Tgramwocs_pipelineFit = build_trigrams().fit(train_set)
predictions_wocs = Tgramwocs_pipelineFit.transform(test_set)

correct_mask = predictions_wocs["label"] == predictions_wocs["prediction"]
n_correct = predictions_wocs.filter(correct_mask).count()
n_total = predictions_wocs.count()
accuracy = n_correct / float(n_total)
auc = evaluator.evaluate(predictions_wocs, {evaluator.metricName: "areaUnderROC"})

print(f"Accuracy Score: {accuracy:.4f}")
print(f"ROC-AUC: {auc:.4f}")

In [0]:
# N-gram Implementation MLP PySpark ver 2
# (n-gram TF-IDF without chi-squared selection -> MultilayerPerceptronClassifier)
# NOTE: this redefines `build_ngrams_wocs` from an earlier cell; the next cell uses this version.

def build_ngrams_wocs(inputCol=None, n=3):
    """Build an unfitted n-gram MLP pipeline without feature selection.

    Stages, in order:
      1. Tokenize `clean_text` into `words`.
      2. For each order i in 1..n: NGram -> `{i}_grams`, CountVectorizer
         (vocabulary capped at `vocab_size`) -> `{i}_tf`, IDF (minDocFreq=5) -> `{i}_tfidf`.
      3. Concatenate every `{i}_tfidf` vector directly into `features`.
      4. Index `category` into `label`.
      5. MultilayerPerceptronClassifier with layers
         [n * vocab_size, 64, 32, 2] (maxIter=300, blockSize=16, seed=1234).

    Args:
        inputCol: unused. Column names are fixed (`clean_text`, `category`);
            the parameter is kept only so existing calls keep working.
        n: highest n-gram order to include (must be >= 1).

    Returns:
        An unfitted `pyspark.ml.Pipeline`.

    Raises:
        ValueError: if `n` < 1 (there would be nothing to assemble).
    """
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))
    orders = range(1, n + 1)
    vocab_size = 5460  # per-order CountVectorizer vocabulary cap

    tokenizer = [Tokenizer(inputCol="clean_text", outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i)) for i in orders]
    cv = [
        CountVectorizer(vocabSize=vocab_size, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
           for i in orders]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features",
    )]
    label_stringIdx = [StringIndexer(inputCol="category", outputCol="label")]

    # VectorAssembler concatenates n vectors of up to vocab_size entries, so
    # the input layer is n * vocab_size (16380 for the default n=3). Hard-coding
    # 16380 broke every call with n != 3.
    # NOTE(review): assumes each order's vocabulary actually reaches vocab_size.
    # A smaller vocabulary shrinks `features` and would not match this layer.
    # The final layer has 2 units: one per class of the binary `label`.
    mlp = [MultilayerPerceptronClassifier(maxIter=300, blockSize=16, seed=1234,
                                          layers=[n * vocab_size, 64, 32, 2])]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + mlp)

In [0]:
# MLP PySpark ver 2: fit the n-gram MLP pipeline (no chi-squared selection) and score the test split

Ngramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = Ngramwocs_pipelineFit.transform(test_set)

row_count = float(predictions_wocs.count())
agree_count = predictions_wocs.filter(predictions_wocs["label"] == predictions_wocs["prediction"]).count()
accuracy = agree_count / row_count
auc = evaluator.evaluate(predictions_wocs, {evaluator.metricName: "areaUnderROC"})

for label_text, value in (("Accuracy Score", accuracy), ("ROC-AUC", auc)):
    print(f"{label_text}: {value:.4f}")