In [70]:
PYSPARK_PATH = '../spark/spark-2.4.3-bin-hadoop2.7/'

In [71]:
import os
import numpy as np

import findspark
findspark.init(PYSPARK_PATH)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.sql import *
from pyspark.sql.functions import col, lower, regexp_replace, split, size, UserDefinedFunction
from pyspark.sql.types import StringType, IntegerType
from functools import reduce
import re

In [134]:
# df_path = "csv_data/top-10-sample-output.csv"
df_path = "csv_data/sample-output-ores.csv"
df_out_path = "{}_features.csv".format(df_path[:-4])
df = spark.read.csv(df_path, inferSchema=True, header=True, multiLine=True, escape='"')

### EDA + Feature Engineering

In [135]:
"""
Columns filtering
    Useful: sha1 (as identifier),  timestamp, title, text
    Questionable: user, comment, ip, id (there are different articles with the same id), parentid, restrictions
    Not useful (no unique info): model, format, ns, contributor, revision, restrictions
""" 

print("All columns:", df.columns)
print("Unique values for..")
for column in ["format", "model", "ns", "contributor", "revision", "restrictions"]:
    print("\t", column, ":", df.select(column).distinct().rdd.map(lambda r: r[0]).collect())
    
ores_weights = {'Stub': 1, 'Start': 2, 'C': 3, 'B': 4, 'GA': 5, 'FA': 6}
ores_scores = list(ores_weights.keys())
useful_columns = ["sha1", "timestamp", "title", "text"] + ores_scores
print("Useful columns:", useful_columns)

All columns: ['_c0', 'Unnamed: 0', 'comment', 'contributor', 'format', 'id', 'ip', 'model', 'ns', 'parentid', 'restrictions', 'revision', 'sha1', 'text', 'timestamp', 'title', 'username', 'revid', 'B', 'C', 'FA', 'GA', 'Start', 'Stub']
Unique values for..
	 format : ['text/x-wiki']
	 model : ['wikitext']
	 ns : [12, 6, 4, 100, 118, 0]
	 contributor : [None, '  ', ' ']
	 revision : ['        ', '       ', '         ', '          ']
	 restrictions : ['edit=autoconfirmed:move=sysop', 'edit=autoconfirmed:move=autoconfirmed', 'edit=sysop:move=sysop', None, 'move=:edit=', 'sysop', 'move=sysop', 'move=sysop:edit=sysop']
Useful columns: ['sha1', 'timestamp', 'title', 'text', 'Stub', 'Start', 'C', 'B', 'GA', 'FA']


In [136]:
clean_df = df[useful_columns]
clean_df.printSchema()
print("Size of the DataFrame: {} records".format(clean_df.count()))

root
 |-- sha1: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- Stub: double (nullable = true)
 |-- Start: double (nullable = true)
 |-- C: double (nullable = true)
 |-- B: double (nullable = true)
 |-- GA: double (nullable = true)
 |-- FA: double (nullable = true)

Size of the DataFrame: 38842 records


In [137]:
df_features = clean_df

In [138]:
df_features.show(4)

+--------------------+-------------------+-----------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+
|                sha1|          timestamp|      title|                text|                Stub|               Start|                  C|                  B|                 GA|                 FA|
+--------------------+-------------------+-----------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+
|5ib24sew99ceyko33...|2019-04-11 16:19:39|The X-Files|{{About|the telev...|0.003073017103110...|0.007612710147196926|0.02019868704430509| 0.0510891495437035|0.31740231803291463| 0.6006241181287694|
|q2nngvea32gwowk9y...|2019-04-25 23:53:46|Third World|{{mergefrom|Third...|0.006592174737638895| 0.03438149756654297|  0.452702637198141|0.20820061897204512| 0.2271170545728272|0.07100601695280477|
|ihx6i3puo

In [139]:
def words_counts(df):
    return df.withColumn('n_words', size(split(col('text'), ' ')))

df_features = words_counts(df_features)

In [140]:
"""
Headings counting
Syntaxis:
    ==Level 2==
    ===Level 3===
    ====Level 4====
    =====Level 5=====
    ======Level 6======
"""

def single_head_level_count(text, level):
    assert level in range(2,7)
    pattern = "=" * level
    pattern = pattern + "[a-zA-Z0-9.,!? ]+" + pattern
    return size(split(text, pattern=pattern))-1

def count_headings(df):
    return reduce(
        lambda df, level: df.withColumn("level{}".format(level),
                                        single_head_level_count(col("text"), level)),
        range(2,7), df)
    
df_features = count_headings(df_features)

In [141]:
"""
Citation counting
Syntaxis:
    {{cite {book}(.*?)}}
    {{cite {journal}(.*?)}}
"""

def citation_counter(citation_source):
    def _count_citations(text):
        matches = re.findall(f"{{cite {citation_source}(.*?)}}", text, re.IGNORECASE)
        return len(matches)
    return _count_citations

book_citations_count = UserDefinedFunction(citation_counter("book"), IntegerType())
journal_citations_count = UserDefinedFunction(citation_counter("journal"), IntegerType())

df_features = df_features.withColumn("book_citations", book_citations_count("text"))\
  .withColumn("journal_citations", journal_citations_count("text"))

In [142]:
'''Internal Links:
    [[A]] -- internal reference to an article titled A
    [[A|B]] -- internal reference to an article titled A (written as B)
    [[A#C|B]] -- internal reference to a section C of an article titled A (written as B)'''

def count_internal_links(df):
    
    pattern = "\[\[[a-zA-Z0-9.,!? ]+\]\]"
    pattern += "|\[\[[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\]\]"
    pattern += "|\[\[[a-zA-Z0-9.,!? ]+#[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\]\]"
    
    return df.withColumn("n_internal_links",
                         size(split(col('text'), pattern=pattern))-1)

df_features = count_internal_links(df_features)

In [143]:
'''External Links:
    https://www.google.com -- simple link
    [https://www.google.com] -- link (reference)
    [https://www.google.com A] -- reference written as A
    <ref name="B">[https://www.google.com A]</ref> -- reference A written as B, can be referenced again like:
    <ref name="B" /> -- reference to the source B
    <ref>Lots of words</ref> -- reference without a link
    {{sfnm|1a1=Craig|1y=2005|1p=14|2a1=Sheehan|2y=2003|2p=85}} -- external reference
    Example:
        {{sfnm|1a1=McLaughlin|1y=2007|1p=59|2a1=Flint|2y=2009|2p=27}} -- McLaughlin 2007, p. 59; Flint 2009, p. 27.
        {{sfnm|1a1=Craig|1y=2005|1p=14|2a1=Sheehan|2y=2003|2p=85}} -- Craig 2005, p. 14; Sheehan 2003, p. 85.'''

def count_external_links(df):
    
    pattern = 'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    pattern += '|\[https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+\]'
    pattern += '|\[https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+\ [a-zA-Z0-9.,!? ]+]'
    pattern += '<ref name="[a-zA-Z0-9.,!? ]+">\[https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+\]'
    
    # template of the external reference
    #template = '\{\{sfnm\|1a1=[a-zA-Z]+\|1y=[0-9]+\|1p=[0-9]+\|2a1=[a-zA-Z]+\|2y=[0-9]+\|2p=[0-9]+\}\}'
    
    # <ref name="B" /> - this form use information from other reference, so we didn't count it again
    # <ref>Lots of words</ref> - reference without a link will be in the other feature
    
    return df.withColumn("n_external_links",
                         size(split(col('text'), pattern=pattern))-1)

df_features = count_external_links(df_features)

In [144]:
"""
Paragraphs
"""

def count_paragraphs(df):
    
    # filter the basic wikipedia syntaxis
    pattern_filtering = '\n\n\{\{.*\}\}\n\n|\n\n\[\[.*\]\]\n\n|\n\n={1,7}.*={1,7}\n\n'
    # split by two enters
    pattern_splitting = '\n\n'

    return df.withColumn('n_paragraphs', size(split(regexp_replace(col('text'), 
                                                                   pattern_filtering, ''), 
                                                    pattern_splitting))-1)

df_features = count_paragraphs(df_features)

In [145]:
'''<ref>Lots of words</ref> -- reference without a link
{{cn}} -- citation needed'''

def count_unreferenced(df):
    
    # citation needed and references without link
    pattern = '\{\{cn\}\}|<ref>[a-zA-Z0-9.,!? ]+</ref>'
    
    return df.withColumn('n_unreferenced', size(split(col('text'), pattern))-1)

df_features = count_unreferenced(df_features)

In [146]:
'''[[Category:Category name]]
[[:Category:Category name]]
[[:File:File name]]'''

def count_categories(df):
    
    #using template
    pattern = '\[\[:?Category:[a-zA-Z0-9.,\-!?\(\) ]+\]\]'
    
    return df.withColumn('n_categories', size(split(col('text'), pattern))-1)

df_features = count_categories(df_features)

In [147]:
'''
    [[File: | thumb  | upright | right | alt= | caption ]]
'''

def count_of_images(df):
    any_text = "[a-zA-Z0-9.,!? ]+ \] "
    pattern = "\[[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\|[a-zA-Z0-9.,!? ]+\]"
    return df.withColumn("n_images", size(split(col('text'), pattern=pattern))-1)

df_features = count_of_images(df_features)

In [148]:
df_features.printSchema()

root
 |-- sha1: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- Stub: double (nullable = true)
 |-- Start: double (nullable = true)
 |-- C: double (nullable = true)
 |-- B: double (nullable = true)
 |-- GA: double (nullable = true)
 |-- FA: double (nullable = true)
 |-- n_words: integer (nullable = false)
 |-- level2: integer (nullable = false)
 |-- level3: integer (nullable = false)
 |-- level4: integer (nullable = false)
 |-- level5: integer (nullable = false)
 |-- level6: integer (nullable = false)
 |-- book_citations: integer (nullable = true)
 |-- journal_citations: integer (nullable = true)
 |-- n_internal_links: integer (nullable = false)
 |-- n_external_links: integer (nullable = false)
 |-- n_paragraphs: integer (nullable = false)
 |-- n_unreferenced: integer (nullable = false)
 |-- n_categories: integer (nullable = false)
 |-- n_images: integer (nullable = false)



In [149]:
features_names = ['title',
                  'Stub',
                  'Start',
                  'C',
                  'B',
                  'GA',
                  'FA',
                  'n_words',
                  'level2',
                  'level3',
                  'level4',
                  'level5',
                  'level6',
                  'book_citations',
                  'journal_citations',
                  'n_internal_links',
                  'n_external_links',
                  'n_paragraphs',
                  'n_unreferenced',
                  'n_categories',
                  'n_images'
                 ]

df_features = df_features.select(list(map(lambda x: df_features[x].cast('double') if x != 'title' else df_features[x], 
                                          features_names)))

In [150]:
for feature in features_names:
    
    df_features = df_features.filter(df_features[feature].isNotNull())

In [25]:
df_features.toPandas().to_csv(df_out_path)

##  Clustering

In [103]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import PCA

from pyspark.ml.feature import VectorAssembler

In [151]:
vectorAssembler = VectorAssembler(inputCols=features_names[1:],
                                  outputCol="features")

df_features = vectorAssembler.setHandleInvalid("skip").transform(df_features)

In [None]:
df_features.select('features').count()

In [153]:
df_features.select('features').limit(20).collect()

[Row(features=DenseVector([0.0031, 0.0076, 0.0202, 0.0511, 0.3174, 0.6006, 19873.0, 34.0, 26.0, 5.0, 0.0, 0.0, 23.0, 0.0, 605.0, 283.0, 92.0, 0.0, 40.0, 0.0])),
 Row(features=DenseVector([0.0066, 0.0344, 0.4527, 0.2082, 0.2271, 0.071, 2140.0, 10.0, 0.0, 0.0, 0.0, 0.0, 9.0, 2.0, 81.0, 6.0, 23.0, 1.0, 6.0, 0.0])),
 Row(features=DenseVector([0.0035, 0.0086, 0.0409, 0.1002, 0.4848, 0.3621, 10813.0, 24.0, 14.0, 0.0, 0.0, 0.0, 7.0, 17.0, 332.0, 111.0, 94.0, 0.0, 53.0, 0.0])),
 Row(features=DenseVector([0.0049, 0.0119, 0.0648, 0.1765, 0.48, 0.2618, 5271.0, 18.0, 8.0, 1.0, 0.0, 0.0, 12.0, 30.0, 175.0, 51.0, 40.0, 0.0, 2.0, 0.0])),
 Row(features=DenseVector([0.0058, 0.0287, 0.2947, 0.4207, 0.0601, 0.19, 3030.0, 10.0, 0.0, 0.0, 0.0, 0.0, 3.0, 1.0, 146.0, 18.0, 24.0, 0.0, 1.0, 0.0])),
 Row(features=DenseVector([0.0045, 0.028, 0.5393, 0.3212, 0.0852, 0.0219, 1789.0, 12.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 55.0, 15.0, 28.0, 0.0, 2.0, 0.0])),
 Row(features=DenseVector([0.0051, 0.0204, 0.2895, 0.2569, 0.

In [154]:
# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(3).setSeed(1)
model = bkm.fit(df_features)

# Evaluate clustering.
cost = model.computeCost(df_features)
print("Within Set Sum of Squared Errors = " + str(cost))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)

Py4JJavaError: An error occurred while calling o2092.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 162.0 failed 1 times, most recent failure: Lost task 0.0 in stage 162.0 (TID 6127, localhost, executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.mllib.clustering.BisectingKMeans$.org$apache$spark$mllib$clustering$BisectingKMeans$$summarize(BisectingKMeans.scala:304)
	at org.apache.spark.mllib.clustering.BisectingKMeans.run(BisectingKMeans.scala:171)
	at org.apache.spark.ml.clustering.BisectingKMeans$$anonfun$fit$1.apply(BisectingKMeans.scala:272)
	at org.apache.spark.ml.clustering.BisectingKMeans$$anonfun$fit$1.apply(BisectingKMeans.scala:257)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.clustering.BisectingKMeans.fit(BisectingKMeans.scala:257)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)


In [None]:
#get predictions and collect into the list
labels = model.transform(df_features).select('prediction')
labels = labels.collect() 
#create vanilla list with ints 
# instead of list with element type Rows
labels = list(map(lambda x: x.prediction, labels))

## PCA compressing

In [None]:
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df_features)

compressed = model.transform(df_features).select("pcaFeatures")
compressed.show(10)

In [None]:
compressed = compressed.toPandas()
compressed.pcaFeatures = compressed.pcaFeatures.apply(lambda x: np.array(x))
compressed = compressed.pcaFeatures.values
compressed = np.stack(compressed, axis=0)

In [None]:
colors = {0: 'red', 1: 'green', 2: 'blue'}
colors = list(map(lambda x: colors[x], labels))