In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [2]:
import os
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"

In [3]:
from pyspark import SparkContext
import pyspark
conf = pyspark.SparkConf()
conf.set('spark.local.dir', '/home/jimmy/spark_tmp')
conf.set('spark.executor.memory', '15G')
conf.set('spark.driver.memory', '15G')
conf.set('spark.driver.maxResultSize', '15G')
conf.set("spark.driver.host", "localhost")
#conf.set('spark.cores.max', '8')
#conf.set("spark.default.parallelism", 8)
sc = SparkContext(appName="Train classifier", conf=conf)
sc.setCheckpointDir('checkpoint/')
sc

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)
spark

In [5]:
files = sc.textFile("/home/jimmy/Documents/courses/spark/data/*/part-00[0-9]*")
data = spark.read.json(files)

In [6]:
vandal = data.filter(data.label == "vandal")
vandal_count = data.filter(data.label == "vandal").count()
vandal_count

1977

In [7]:
unsafe_count = 5_000
total_unsafe_count = data.filter(data.label == "unsafe").count()
unsafe = data.filter(data.label == "unsafe").sample(withReplacement = False, fraction= unsafe_count/total_unsafe_count)

unsafe_count

5000

In [8]:
safe_count = 10_000
total_count = data.count()
total_safe_count = total_count - vandal_count - total_unsafe_count
safe = data.filter(data.label == "safe").sample(withReplacement = False, fraction= safe_count/total_safe_count)

In [9]:
data = vandal.union(unsafe).union(safe)

In [None]:
#data.write.save(f'/home/jimmy/Documents/courses/spark/notebooks/processed_data')

In [None]:
import gc

del vandal
del unsafe
del safe

gc.collect()

In [10]:
from difflib import unified_diff

def make_diff(old, new):
    additions = []
    deletions = []
    generator = unified_diff(old.split('\n'), new.split('\n'))
    for l in generator:
        if l.startswith('+'):
            additions.append(l[1:])
        elif l.startswith('-'):
            deletions.append(l[1:])
    additions = ' '.join(additions)
    deletions = ' '.join(deletions)
    return (additions, deletions)

In [11]:
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit, lower
from pyspark.sql.types import StringType, LongType
from pyspark.sql.functions import udf

profanities = [s.strip() for s in open("/home/jimmy/Documents/courses/spark/profanities.txt", "r").readlines()]
profanities = sc.broadcast(profanities)

@udf("string")
def additions(old, new):
    (additions, _) = make_diff(old, new)
    return additions

@udf("string")
def deletions(old, new):
    (_, deletions) = make_diff(old, new)
    return deletions

@udf("long")
def longest_same_character_sequence(additions):
    ans, curr = 0, 1
    previous = None
    for c in additions:
        if c == previous:
            curr += 1
        else:
            curr = 1
        if curr > ans:
            ans = curr
        previous = c
    return ans

@udf("long")
def count_profanities(additions):
    count = 0
    additions = additions
    for profanity in profanities.value:
        count += additions.count(profanity)
    return count

drop_list = ["text_old", "text_new", 'url_page', 'title_page', 'name_user', 'comment']

def process_dataframe(df):
    return df \
        .withColumn("additions", lower(additions("text_old", "text_new"))) \
        .withColumn("deletions", lower(deletions("text_old", "text_new"))) \
        .drop(*drop_list) \
        .withColumn("profanities", count_profanities("additions")) \
        .withColumn("longest_same_character_sequence", longest_same_character_sequence("additions"))

In [12]:
data = process_dataframe(data)
data.printSchema()
data.show()

root
 |-- label: string (nullable = true)
 |-- additions: string (nullable = true)
 |-- deletions: string (nullable = true)
 |-- profanities: long (nullable = true)
 |-- longest_same_character_sequence: long (nullable = true)

+------+--------------------+--------------------+-----------+-------------------------------+
| label|           additions|           deletions|profanities|longest_same_character_sequence|
+------+--------------------+--------------------+-----------+-------------------------------+
|vandal|++ 
 | name= jona...|-- 
 | name= jona...|          0|                              2|
|vandal|++ 
 * bucko is t...|-- 
 * [[bucko (c...|          3|                              3|
|vandal|++ 
 the only nat...|-- 
 the only nat...|          3|                              2|
|vandal|++ 
 throughout h...|-- 
 throughout h...|          1|                              2|
|vandal|++ 
 fan chung wa...|-- 
 fan chung wa...|          2|                              3|
|vandal|++ 
 

In [13]:
data.write.save(f'/home/jimmy/Documents/courses/spark/notebooks/processed_data')

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, Word2Vec, StringIndexer, IndexToString, HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import expr, concat, lit
from pyspark import StorageLevel

text_columns = ['additions', 'deletions']
input_columns = text_columns + ['profanities','longest_same_character_sequence']
pipelines = {}

for col in text_columns:
    #regexTokenizer = RegexTokenizer(inputCol=col, outputCol="temp1")
    tokenizer = Tokenizer(inputCol=col, outputCol="temp1")
    stopwordsRemover = StopWordsRemover(inputCol='temp1', outputCol='temp2')
    #word2vec = Word2Vec(inputCol="temp2", outputCol="temp3", numPartitions=16, minCount=10)
    #countVectorizer = CountVectorizer(inputCol="temp2", outputCol="temp3")
    tf = HashingTF(inputCol="temp2", outputCol="temp3")
    idf = IDF(inputCol="temp3", outputCol="temp4")
    pipeline = Pipeline(stages=[tokenizer, stopwordsRemover, tf, idf])
    pipelines[col] = pipeline.fit(data)
    data = pipelines[col].transform(data) \
            .drop(col, 'temp1', 'temp2', 'temp3') \
            .withColumnRenamed('temp4', col)
    
label_indexer = StringIndexer(inputCol = "label", outputCol = "target")
label_indexer = label_indexer.fit(data)
pipelines['label'] = label_indexer
data = pipelines['label'].transform(data)

data.show(5)
pipelines

In [None]:
for col in text_columns:
    pipelines[col].save(f'/home/jimmy/Documents/courses/spark/notebooks/pipeline_{col}')
pipelines['label'].save(f'/home/jimmy/Documents/courses/spark/notebooks/pipeline_label')

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=input_columns,outputCol="features")
data = assembler.transform(data).drop(*input_columns) #.persist(StorageLevel.DISK_ONLY).collect()
data.show(5)

In [None]:
total_count = vandal_count + unsafe_count + safe_count
vandal_ratio = vandal_count / total_count
unsafe_ratio = unsafe_count / total_count
safe_ratio = safe_count / total_count
(vandal_ratio, unsafe_ratio, safe_ratio)

In [None]:
from pyspark.sql import functions as F

data = data.withColumn('weight', F.when(F.col('label')=='safe', safe_ratio).when(F.col('label')=='unsafe', unsafe_ratio).otherwise(vandal_ratio))
data.show()

In [None]:
data.repartition(10).write.save(f'/home/jimmy/Documents/courses/spark/notebooks/processed_data_2')

In [None]:
data = spark.load.read(f'/home/jimmy/Documents/courses/spark/notebooks/processed_data_2')
from pyspark.ml.feature import StringIndexerModel, IndexToString
pipelines = {}
pipelines["label"] = StringIndexerModel.load(f'/home/jimmy/Documents/courses/spark/notebooks/pipeline_label')

In [None]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

scaler = scaler.fit(data)
data = scaler.transform(data).drop("features").withColumnRenamed("scaledFeatures", "features")
data.count()

In [None]:
# set seed for reproducibility
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed = 100)
from pyspark import StorageLevel
trainingData.persist(StorageLevel.MEMORY_ONLY)
testData.persist(StorageLevel.MEMORY_ONLY)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [None]:
del data
gc.collect()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='target', weightCol="weight", maxIter=20, regParam=0.3, elasticNetParam=0)

label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=pipelines['label'].labels)

pipeline = Pipeline(stages=[lr, label_converter])

results = pipeline.fit(trainingData).transform(testData)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction")

evaluator.evaluate(results)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build()) # Create 10-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=10)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator.evaluate(predictions)

In [None]:
cvModel.save(f'/home/jimmy/Documents/courses/spark/notebooks/logistic_regression_classifier')

In [None]:
pipeline = Pipeline(stages=[cvModel, label_converter])
pipeline.save(f'/home/jimmy/Documents/courses/spark/notebooks/pipeline_logistic_regression_classifier')

In [None]:
sc.stop()