In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
import nltk
from nltk.corpus import stopwords
import string
from nltk.stem.wordnet import WordNetLemmatizer
sqlContext = SQLContext(sc)

The OpenSubtitle dataset is stored in XML format. In order to process the dataset using spark, we will use the spark xml package from databricks (https://github.com/databricks/spark-xml). When using spark XML, you can either let spark-XML infer the schema of the XML file or you can specify it yourself. Letting spark-xml specify the schema for you is convenient however it is very expensive which we experienced ourselves. In order to speed up input pipeline, we will specify or own static xml schema below:

In [23]:
sourceSchema = StructType([\
                            StructField('genre', StringType()),
                            StructField('year', StringType()),
                          ])

subtitleSchema = StructType([\
                            StructField('duration', StringType())
                          ])

conversionSchema =  StructType([\
                            StructField('sentences', StringType()),
                            StructField('tokens', StringType())
                          ])

metaSchema = StructType([\
                            StructField('source', sourceSchema),
                            StructField('conversion', conversionSchema),
                            StructField('subtitle', subtitleSchema),
                          ])

wType = ArrayType(StructType([\
                       StructField('_VALUE', StringType())
                   ]))

sSchema = StructType([\
                      StructField('w', wType)
    
])

sentenceSchema = ArrayType(sSchema)

schema = StructType([
    StructField('_id', IntegerType()),
    StructField('meta', metaSchema)
])



In [24]:
df = sqlContext.read.format('com.databricks.spark.xml')\
    .options(rowTag='document', samplingRatio=0.0)\
    .load('[12][0-9][0-9][0-9]/**/*.xml.gz', schema=schema)
df.printSchema()
df.show()

root
 |-- _id: integer (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- source: struct (nullable = true)
 |    |    |-- genre: string (nullable = true)
 |    |    |-- year: string (nullable = true)
 |    |-- conversion: struct (nullable = true)
 |    |    |-- sentences: string (nullable = true)
 |    |    |-- tokens: string (nullable = true)
 |    |-- subtitle: struct (nullable = true)
 |    |    |-- duration: string (nullable = true)

+-------+--------------------+
|    _id|                meta|
+-------+--------------------+
|6950023|[[Action,Adventur...|
|3602733|[[Comedy,Drama,Ro...|
|3602678|[[Comedy,Drama,Ro...|
|3585549|[[Comedy,Drama,Ro...|
|3602669|[[Comedy,Drama,Ro...|
|3602753|[[Comedy,Drama,Ro...|
|3602659|[[Comedy,Drama,Ro...|
|6988060|[[Action,Adventur...|
|7008412|[[Action,Drama,Sc...|
|7070230|[[Crime,Drama, 20...|
|7070263|[[Crime,Drama, 20...|
|7070270|[[Crime,Drama, 20...|
|7070229|[[Crime,Drama, 20...|
+-------+--------------------+



In [25]:
"""
words_in_sentence = F.udf(lambda z: __builtins__.sum(list(map(lambda x: len(x), z))), IntegerType())

meta_data_df = df.withColumn("sentences", F.col('s.w'))
meta_data_df = meta_data_df.withColumn("number_of_sentences", F.size(F.col('sentences')))
meta_data_df = meta_data_df.withColumn("number_of_words", words_in_sentence(F.col('sentences')))
"""

'\nwords_in_sentence = F.udf(lambda z: __builtins__.sum(list(map(lambda x: len(x), z))), IntegerType())\n\nmeta_data_df = df.withColumn("sentences", F.col(\'s.w\'))\nmeta_data_df = meta_data_df.withColumn("number_of_sentences", F.size(F.col(\'sentences\')))\nmeta_data_df = meta_data_df.withColumn("number_of_words", words_in_sentence(F.col(\'sentences\')))\n'

Extract meta data about each movie:

In [27]:
movies = df.select(\
               F.col("_id"),\
               F.col("meta.source.genre").alias("genre"),\
               F.col("meta.source.year").alias("year"),\
               F.col("meta.conversion.sentences").alias("sentences"),\
               F.col("meta.conversion.tokens").alias("words"),\
               F.col("meta.subtitle.duration").alias("duration"))  
movies.show()

+-------+--------------------+----+---------+-----+------------+
|    _id|               genre|year|sentences|words|    duration|
+-------+--------------------+----+---------+-----+------------+
|3602669|Comedy,Drama,Romance|2020|      364| 3035|  00:00:2,00|
|6988060|Action,Adventure,...|2018|       21|  144|00:02:26,144|
|7070229|         Crime,Drama|2018|     1322| 7312|00:42:41,066|
+-------+--------------------+----+---------+-----+------------+



Extract the 10 most common words in each movies:

In [170]:
stop_words=set(stopwords.words('english'))
punctuation_list=list(string.punctuation)
lemmatizer = WordNetLemmatizer()

from nltk.stem import PorterStemmer
ps = PorterStemmer()

print("and" in stop_words)

True


In [180]:
"""
words_df = (
    df.select(F.explode(F.col('s.w')))
        .rdd
        .flatMap(lambda x: x['col'])
        .map(lambda x: x['_VALUE'].lower())
        .filter(lambda x: x not in punctuation_list and len(x) > 2)
        .map(lambda x: (lemmatizer.lemmatize(x,'v'), 1))
        .filter(lambda x: x[0] not in stop_words and len(x[0]) > 2)
        .reduceByKey(lambda x,y: x + y)
        .takeOrdered(20, key = lambda x: -x[1])
)
words_df
"""
lemmatize = F.udf(lambda x: lemmatizer.lemmatize(x, 'v'), StringType())

cc = df.select(F.explode(F.col('s.w')))
cc = cc.select(F.explode(F.col('col')['_VALUE']))
cc = cc.select(F.lower(F.col('col')).alias('word'))
cc = cc.filter((cc.word.isin(stop_words) == False) & (cc.word.isin(punctuation_list) == False))
cc = cc.groupby('word').count()
cc.show()

+-----------+-----+
|       word|count|
+-----------+-----+
|        ...|  226|
|      still|   49|
|     outfit|    3|
|     waters|    2|
|  connected|    1|
|  peacefume|    1|
|   randomly|    1|
|     spared|    1|
|       earl|    1|
|  recognize|    1|
|     online|    2|
|     harder|    3|
|       hope|   12|
|      inner|    1|
|    jewelry|    2|
|secondguess|    1|
|      trail|    2|
|      1970s|    3|
|  traveling|    1|
|      oscar|    5|
+-----------+-----+
only showing top 20 rows



In [28]:
df = sqlContext.read.parquet('movies_sample_v2.parquet')

In [34]:
df.count()

17702

In [38]:
df.groupby('year').count().sort(F.desc("year")).show(100)

+------+-----+
|  year|count|
+------+-----+
|  2020|    2|
|  2017| 1708|
|  2016| 2442|
|  2015| 1479|
|  2014|   75|
|  2013|  740|
|  2012|    4|
|  2011|  960|
|  2010|  670|
|  2009|    2|
|  2008| 1587|
|  2007| 1012|
|  2006| 1405|
|  2005|  305|
|  2004|   13|
|  2003|    8|
|  2000|  702|
|  1999|    4|
|  1997|    1|
|1996??|    7|
|  1996|  505|
|  1994|    1|
|  1993|  408|
|  1991|  289|
|  1990|  353|
|  1989|  275|
|  1988|  270|
|  1987|    1|
|  1984|    3|
|  1982|  169|
|  1978|    1|
|  1977|   71|
|  1975|  160|
|  1974|    2|
|  1970|  137|
|  1969|  158|
|  1968|  148|
|  1967|    6|
|  1966|  155|
|  1965|  127|
|  1964|  140|
|  1962|   72|
|  1961|   77|
|  1960|    1|
|  1959|   69|
|  1958|   93|
|  1956|   86|
|  1955|   67|
|  1954|    1|
|  1952|   50|
|  1948|   53|
|  1947|   35|
|  1946|   40|
|  1945|   38|
|  1944|   33|
|  1943|   35|
|  1940|   36|
|  1939|   43|
|  1938|   32|
|  1936|   39|
|  1935|   30|
|  1934|   24|
|  1929|    1|
|  1928|  