In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, count, first, substring_index
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

In [2]:
from os import environ, path
# Pull in the spark-xml data source via --packages. This only takes effect if
# it is set before the SparkSession (next cell) launches the JVM.
environ.update(PYSPARK_SUBMIT_ARGS='--packages com.databricks:spark-xml_2.10:0.4.1 pyspark-shell')

In [3]:
# Create the Spark session used by every following cell (getOrCreate reuses an
# existing session in this kernel if there is one).
spark = (
    SparkSession.builder
    .appName("Python Spark wordcount")
    .getOrCreate()
)

In [4]:
# URL of the Spark web UI for this application (displayed as the cell output).
(spark
    .sparkContext
    .uiWebUrl)

'http://192.168.178.66:4040'

In [5]:
# Python UDF: first element of an array column, returned as a string.
# Null or empty arrays yield null instead of failing the task with
# TypeError / IndexError (the previous `l[0]` crashed on both).
# NOTE(review): not used by the cells below; consider the built-in
# `col(...).getItem(0)`, which avoids Python-UDF serialization overhead.
take_first = udf(lambda values: values[0] if values else None, StringType())

In [6]:
# Read schema: keep only the repeated `w` child elements of each row, as an
# array of strings. All other elements are not loaded.
schema = StructType().add('w', ArrayType(StringType()))

In [7]:
# Load the .xml.gz subtitle files into a DataFrame with one row per <s> element,
# restricted to the columns declared in `schema`.
# The glob spells out every directory level (one `*` per level) instead of
# asking the reader to recurse.
# NOTE(review): Spark >= 3.0 offers a `recursiveFileLookup` reader option that
# may replace the fixed-depth glob — confirm it works with this data source.
# NOTE(review): spark-xml documents `rootTag` as a write option; it is likely
# ignored when reading.
df_xml = (
    spark.read
    .format("com.databricks.spark.xml")
    .option("rowTag", "s")
    .option("rootTag", "document")
    .schema(schema)
    .load("../subtitles/af/*/*/*/*/*/*.xml.gz")
)

In [12]:
# Word frequencies:
#   - one row per token in the `w` arrays, counted per distinct token;
#   - ordered most frequent first.
# Counting is case-sensitive and punctuation tokens are kept, so "Ek"/"ek" and
# "." appear as separate entries.
# Rows with a null `w` array produce no tokens (explode skips them).
df_wc = (
    df_xml
    .select(explode(col("w")).alias("word"))
    .groupBy("word")
    # Explicit alias so the sort below does not depend on Spark's
    # auto-generated column name. The header stays "count(word)" as before.
    .agg(count(col("word")).alias("count(word)"))
    # Tie-break on the word so equal counts come out in a reproducible order.
    .sort(col("count(word)").desc(), col("word").asc())
)

In [13]:
# Print the 20 most frequent tokens (20 is show()'s default row limit).
df_wc.show(n=20)

+----+-----------+
|word|count(word)|
+----+-----------+
|   .|      13115|
|   ,|       9955|
| die|       5915|
| nie|       4973|
|   ?|       4758|
|  is|       4630|
|   '|       3826|
| het|       3769|
|   !|       3582|
|   n|       3420|
|  Ek|       3011|
|  jy|       2987|
|  ek|       2940|
|   -|       2607|
|  my|       2297|
| dit|       2291|
| van|       1980|
|  en|       1879|
| jou|       1767|
|  in|       1604|
+----+-----------+
only showing top 20 rows



In [14]:
# Shut down Spark. SparkSession.stop() stops the underlying SparkContext and
# also clears the cached/active session. Re-running the notebook in the same
# kernel then builds a fresh session instead of getting back one whose
# context is already stopped.
spark.stop()