# Wikipedia data transformation

In [1]:
# install a magic to reload the src directory
%load_ext autoreload
%autoreload 2

In [2]:
!tree ../data/

[38;5;33m../data/[0m
├── [38;5;33mexternal[0m
├── [38;5;33minterim[0m
├── [38;5;33mprocessed[0m
└── [38;5;33mraw[0m
    └── [38;5;40menwiki-20080103.main.bz2[0m

4 directories, 1 file


Read data into Spark. Reading through all the data by counting takes ~2 hours with 8 cores.

In [3]:
from pyspark.sql import SparkSession, functions as F
import os

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 32)
sc = spark.sparkContext
conf = sc._jsc.hadoopConfiguration()
conf.set("textinputformat.record.delimiter", "\n\n")


input_file = 'enwiki-20080103.main.bz2'
rdd = sc.textFile(os.path.join("../data/raw", input_file))

In [189]:
%time rdd.count()

CPU times: user 231 ms, sys: 210 ms, total: 440 ms
Wall time: 1h 3min 53s


116590880

Apply the schema and create a temporary table.

In [69]:
rdd.map(impwiki.process_record).take(1)

[(u'6 233188 AmericanSamoa 2001-01-19T01:12:51Z ip:office.bomis.com ip:office.bomis.com',
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  u'*',
  u'0',
  u'1516')]

In [79]:
enwiki_df = spark.createDataFrame(
    rdd.map(impwiki.process_edit), 
    schema=impwiki.wikipedia_schema
)
enwiki_df = impwiki.transform(enwiki_df)
enwiki_df.createOrReplaceTempView('enwiki')
enwiki_df.show(truncate=False, vertical=True, n=1)

-RECORD 0----------------------------
 article_id    | 6                   
 rev_id        | 233188              
 article_title | AmericanSamoa       
 timestamp     | 2001-01-18 17:12:51 
 username      | ip:office.bomis.com 
 user_id       | ip:office.bomis.com 
 category      | null                
 image         | null                
 main          | null                
 talk          | null                
 user          | null                
 user_talk     | null                
 other         | null                
 external      | null                
 template      | null                
 comment       | *                   
 minor         | false               
 textdata      | 1516                
 year          | 2001                
 month         | 1                   
only showing top 1 row



Let there be light. Here's the beginning of Wikipedia using Spark as a query engine. But if we take a second look, the timestamps are out of order and we're starting off at revision 233188.

In [80]:
spark.sql("""
SELECT 
    timestamp,
    article_id, 
    rev_id,
    user_id,
    username,
    minor,
    textdata
FROM 
    enwiki
""").show()

+-------------------+----------+---------+--------------------+--------------------+-----+--------+
|          timestamp|article_id|   rev_id|             user_id|            username|minor|textdata|
+-------------------+----------+---------+--------------------+--------------------+-----+--------+
|2001-01-18 17:12:51|         6|   233188| ip:office.bomis.com| ip:office.bomis.com|false|    1516|
|2007-05-24 07:41:33|         6|133180191|             4477979|            Ngaiklin| true|       5|
|2001-01-20 07:01:12|         8|   233189|ip:pD950754B.dip....|ip:pD950754B.dip....| true|       9|
|2007-05-24 07:41:48|         8|133180238|             4477979|            Ngaiklin| true|       6|
|2001-01-20 18:12:21|        10|   233192|                  99|           RoseParks|false|       8|
|2007-05-24 07:41:58|        10|133180268|             4477979|            Ngaiklin| true|       6|
|2002-02-25 07:00:22|        12|    18201|ip:Conversion_script|ip:Conversion_script| true|    1214|


In [75]:
enwiki_df.select("timestamp", F.year("timestamp").alias("year"), F.month("timestamp")).limit(10000).orderBy("timestamp").show()

+-------------------+----+----------------+
|          timestamp|year|month(timestamp)|
+-------------------+----+----------------+
|2001-01-18 17:12:51|2001|               1|
|2001-01-20 07:01:12|2001|               1|
|2001-01-20 18:12:21|2001|               1|
|2001-10-11 13:18:47|2001|              10|
|2001-11-28 05:32:25|2001|              11|
|2001-12-02 07:08:12|2001|              12|
|2002-02-25 07:00:22|2002|               2|
|2002-02-25 07:43:11|2002|               2|
|2002-02-27 09:34:09|2002|               2|
|2002-02-27 09:36:41|2002|               2|
|2002-02-28 16:13:17|2002|               2|
|2002-04-02 01:51:25|2002|               4|
|2002-04-02 01:53:06|2002|               4|
|2002-04-02 01:54:12|2002|               4|
|2002-04-02 01:55:36|2002|               4|
|2002-04-02 23:36:30|2002|               4|
|2002-04-26 00:02:43|2002|               4|
|2002-05-01 07:18:43|2002|               5|
|2002-05-01 13:54:35|2002|               5|
|2002-05-01 14:24:53|2002|      


The original version ran in 25 minutes...
```
CPU times: user 67.2 ms, sys: 114 ms, total: 181 ms
Wall time: 25min 24s
```

In [82]:
%time enwiki_df.select("timestamp", "user_id", "article_id", "minor", "textdata").limit(10**5).describe().show()

Py4JJavaError: An error occurred while calling o1777.describe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 223 in stage 71.0 failed 1 times, most recent failure: Lost task 223.0 in stage 71.0 (TID 2826, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/home/amiyaguchi/Academics/wikipedia-retention/src/data/import_wikipedia.py", line 55, in process_edit
    values = process(edit)
  File "/home/amiyaguchi/Academics/wikipedia-retention/src/data/import_wikipedia.py", line 47, in process
    return value
NameError: global name 'value' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.stat.StatFunctions$.aggResult$lzycompute$1(StatFunctions.scala:273)
	at org.apache.spark.sql.execution.stat.StatFunctions$.org$apache$spark$sql$execution$stat$StatFunctions$$aggResult$1(StatFunctions.scala:273)
	at org.apache.spark.sql.execution.stat.StatFunctions$$anonfun$summary$2.apply$mcVI$sp(StatFunctions.scala:286)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.spark.sql.execution.stat.StatFunctions$.summary(StatFunctions.scala:285)
	at org.apache.spark.sql.Dataset.summary(Dataset.scala:2478)
	at org.apache.spark.sql.Dataset.describe(Dataset.scala:2417)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/amiyaguchi/.local/share/virtualenvs/Academics-HOerVHCU/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/home/amiyaguchi/Academics/wikipedia-retention/src/data/import_wikipedia.py", line 55, in process_edit
    values = process(edit)
  File "/home/amiyaguchi/Academics/wikipedia-retention/src/data/import_wikipedia.py", line 47, in process
    return value
NameError: global name 'value' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
enwiki_df.limit(10**5).where('user_id is null').count()