In [25]:
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

pd.set_option("max_colwidth", 800)

In [2]:
# Run Spark on localhost
spark = SparkSession\
    .Builder()\
    .config("spark.driver.host", "127.0.0.1")\
    .appName("wiki_bias")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [100]:
import os
for dirname, _, filenames in os.walk("/Users/chriswallerstein/Development/python/wikipedia_bias/data"):
    for file in filenames:
        print(os.path.join(dirname, file))

wiki_data = spark.read\
    .option("mode", "dropmalformed")\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .option("multiline", "true")\
    .option("charset", "UTF-8")\
    .csv("/Users/chriswallerstein/Development/python/wikipedia_bias/data/*.csv")

/Users/chriswallerstein/Development/python/wikipedia_bias/data/wiki_corpus_2022_2_16.csv
/Users/chriswallerstein/Development/python/wikipedia_bias/data/wiki_corpus_2022_2_13.csv


In [101]:
# basic data cleansing
import pyspark.sql.functions as F

@F.udf
def ascii_ignore(x):
    return x.encode("ascii", "ignore").decode("ascii")

wiki_data = wiki_data.dropna()
wiki_data = wiki_data.withColumn("sentence_text", ascii_ignore("sentence"))

In [102]:
# tokenize
from pyspark.ml.feature import RegexTokenizer, Tokenizer

regex_tokenizer = RegexTokenizer(inputCol="sentence_text", outputCol="words", gaps=False, pattern="[a-z]+")
wiki_data = regex_tokenizer.transform(wiki_data)
wiki_data.show(5)

+--------------------+-------------+--------------------+--------------------+--------------------+
|                  id|       source|            sentence|       sentence_text|               words|
+--------------------+-------------+--------------------+--------------------+--------------------+
|20220216130833233044|conservapedia|right|thumb|the t...|right|thumb|the t...|[right, thumb, th...|
|20220216130833233089|conservapedia|a great player bo...|a great player bo...|[a, great, player...|
|20220216130833233103|conservapedia|in his career (18...|in his career (18...|[in, his, career,...|
|20220216130833404323|conservapedia|thumb|left|buildi...|thumb|left|buildi...|[thumb, left, bui...|
|20220216130833588118|conservapedia|robert lloyd dunc...|robert lloyd dunc...|[robert, lloyd, d...|
+--------------------+-------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [103]:
# vectorize words
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="features")
cv_model = cv.fit(wiki_data)
wiki_data = cv_model.transform(wiki_data)
wiki_data.select("words", "features").show(5)

+--------------------+--------------------+
|               words|            features|
+--------------------+--------------------+
|[right, thumb, th...|(19156,[0,1,2,5,8...|
|[a, great, player...|(19156,[1,2,5,8,1...|
|[in, his, career,...|(19156,[0,1,2,3,4...|
|[thumb, left, bui...|(19156,[0,2,4,5,6...|
|[robert, lloyd, d...|(19156,[0,1,2,3,4...|
+--------------------+--------------------+
only showing top 5 rows



In [104]:
# transform label column (source)
from pyspark.ml.feature import StringIndexer

si = StringIndexer(inputCol="source", outputCol="label")
si_model = si.fit(wiki_data)
wiki_data = si_model.transform(wiki_data)
wiki_data.sample(True, 0.1).select("source", "label").show(5)

+-------------+-----+
|       source|label|
+-------------+-----+
|conservapedia|  2.0|
|conservapedia|  2.0|
|conservapedia|  2.0|
|conservapedia|  2.0|
|conservapedia|  2.0|
+-------------+-----+
only showing top 5 rows



In [105]:
train, test = wiki_data.select("features", "label").randomSplit([0.9,0.1])
print(train.count())
print(test.count())

4421
467


In [106]:
# Try Logistic regression
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
lr_model = lr.fit(train)
lr_predictions = lr_model.transform(test)
lr_predictions.limit(5).toPandas()

Unnamed: 0,features,label,rawPrediction,probability,prediction
0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...)",1.0,"[10.199248346538464, 11.296519146393823, -21.495767492932288]","[0.2502516134988297, 0.749748386501166, 4.299385657656396e-15]",1.0
1,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...)",1.0,"[10.199248346538464, 11.296519146393823, -21.495767492932288]","[0.2502516134988297, 0.749748386501166, 4.299385657656396e-15]",1.0
2,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...)",1.0,"[10.199248346538464, 11.296519146393823, -21.495767492932288]","[0.2502516134988297, 0.749748386501166, 4.299385657656396e-15]",1.0
3,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...)",1.0,"[10.199248346538464, 11.296519146393823, -21.495767492932288]","[0.2502516134988297, 0.749748386501166, 4.299385657656396e-15]",1.0
4,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...)",1.0,"[10.199248346538464, 11.296519146393823, -21.495767492932288]","[0.2502516134988297, 0.749748386501166, 4.299385657656396e-15]",1.0


In [107]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()
print(f"Linear regression f1 {evaluator.evaluate(lr_predictions):.4f}")

Linear regression f1 0.8196


In [79]:
# create TF-IDF weightings - to be used later
from pyspark.ml.feature import IDF

# we created the term frequency vectors above with CountVectorizer
idf = IDF(inputCol="features", outputCol="features-idf")
idfModel = idf.fit(wiki_data)
wiki_data_idf = idfModel.transform(wiki_data)
wiki_data_idf.sample(True, 0.1)\
    .select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(20411,[0,1,2,3,4...|
|(20411,[0,1,8,120...|
|(20411,[0,1,3,10,...|
|(20411,[0,2,3,4,1...|
|(20411,[0,2,3,4,7...|
+--------------------+
only showing top 5 rows



In [108]:
# Remove stop words
from pyspark.ml.feature import StopWordsRemover

english_stop_words = StopWordsRemover.loadDefaultStopWords("english")
sw_remover = StopWordsRemover(
    inputCol="words",
    outputCol="sw_free_words",
    stopWords=english_stop_words)
sw_free_wiki_data = sw_remover.transform(wiki_data)

pyspark.sql.dataframe.DataFrame

In [109]:
# flatten -> (source, word)
sw_free_wiki_data_rdd = sw_free_wiki_data\
    .select("source", "sw_free_words").rdd\
    .flatMapValues(lambda x: x)

# convert each pair to (pair, 1) so that the occurrences can be counted
# the key is now (source, word), i.e. (conservapedia, obamacare)
sw_free_wiki_data_rdd = sw_free_wiki_data_rdd\
    .map(lambda x: (x, 1))\
    .reduceByKey(lambda x, y: x + y)

# flip (key, count) to (count, key) and sort
sw_free_wiki_data_rdd\
    .map(lambda x: (x[1], x[0]))\
    .sortByKey(ascending=False)\
    .filter(lambda x : x[1][0] == "conservapedia")\
    .take(10)



[(22, ('conservapedia', 'duncan')),
 (15, ('conservapedia', 'also')),
 (13, ('conservapedia', 'chancellor')),
 (13, ('conservapedia', 'texas')),
 (12, ('conservapedia', 'party')),
 (11, ('conservapedia', 'state')),
 (11, ('conservapedia', 'states')),
 (9, ('conservapedia', 'former')),
 (9, ('conservapedia', 'system')),
 (8, ('conservapedia', 'category'))]

In [110]:
sw_free_wiki_data_rdd\
    .map(lambda x: (x[1], x[0]))\
    .sortByKey(ascending=False)\
    .filter(lambda x : x[1][0] == "wikipedia")\
    .take(10)

[(162, ('wikipedia', 'style')),
 (119, ('wikipedia', 'episode')),
 (103, ('wikipedia', 'align')),
 (103, ('wikipedia', 'width')),
 (93, ('wikipedia', 'faj')),
 (90, ('wikipedia', 'treaties')),
 (83, ('wikipedia', 'new')),
 (83, ('wikipedia', 'also')),
 (80, ('wikipedia', 'text')),
 (80, ('wikipedia', 'left'))]

In [111]:
sw_free_wiki_data_rdd\
    .map(lambda x: (x[1], x[0]))\
    .sortByKey(ascending=False)\
    .filter(lambda x : x[1][0] == "rational")\
    .take(10)

[(311, ('rational', 'right')),
 (239, ('rational', 'one')),
 (233, ('rational', 'align')),
 (231, ('rational', 'also')),
 (211, ('rational', 'de')),
 (205, ('rational', 'people')),
 (171, ('rational', 'px')),
 (159, ('rational', 'comfort')),
 (154, ('rational', 'even')),
 (154, ('rational', 'war'))]