# A Movie behind a Script


In [1]:
import os
import re
import findspark

findspark.init()
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import urllib.request
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.10:0.4.1 pyspark-shell'

In [2]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.session.timeZone', 'UTC')
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Overview of datasets

The OpenSubtitles dataset is a compressed cluster of folders containing XML files. Each XML file is split into a script portion with the subtitles of the movie and a metadata portion with additional information about the movie or show. The name of one of the parent folders of the XML file is the corresponding IMDb identifier of the movie or show, thus allowing us to extract additional information from the IMDb dataset.

## IMDb Dataset

We have at our disposal the IMDb ratings and basics dataset. For the moment we have downloaded the files locally, but we would like to scrape the data.

In [3]:
# TODO scrape data https://datasets.imdbws.com/
ratings_fn = "title.ratings.tsv.gz"
basics_fn = "title.basics.tsv.gz"

In [4]:
df_ratings = spark.read.option("header", "true")\
                       .option("sep", "\t")\
                       .csv("imdb_data/" + ratings_fn)
df_ratings = df_ratings.selectExpr("tconst", 
                                   "cast(averageRating as float) averageRating", 
                                   "cast(numVotes as int) numVotes")
df_ratings.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.8|    1440|
|tt0000002|          6.3|     172|
|tt0000003|          6.6|    1041|
|tt0000004|          6.4|     102|
|tt0000005|          6.2|    1735|
|tt0000006|          5.5|      91|
|tt0000007|          5.5|     579|
|tt0000008|          5.6|    1539|
|tt0000009|          5.6|      74|
|tt0000010|          6.9|    5127|
|tt0000011|          5.4|     214|
|tt0000012|          7.4|    8599|
|tt0000013|          5.7|    1318|
|tt0000014|          7.2|    3739|
|tt0000015|          6.2|     660|
|tt0000016|          5.9|     982|
|tt0000017|          4.8|     197|
|tt0000018|          5.5|     414|
|tt0000019|          6.6|      13|
|tt0000020|          5.1|     232|
+---------+-------------+--------+
only showing top 20 rows



'12'

In [5]:
# Function to split genres
udf_split = udf(str.split, ArrayType(StringType()))

df_basics = spark.read.option("header", "true")\
                      .option("sep", "\t")\
                      .csv("imdb_data/" + basics_fn)
df_basics = df_basics.withColumn("rttmp", df_basics.runtimeMinutes.cast(DoubleType()))\
                     .drop("runtimeMinutes")\
                     .withColumnRenamed("rttmp", "runtimeMinutes")\
                     .withColumn("gtmp", udf_split("genres"))\
                     .drop("genres")\
                     .withColumnRenamed("gtmp", "genres")
df_basics.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|           1.0| [Documentary,Short]|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|           5.0|   [Animation,Short]|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|           4.0|[Animation,Comedy...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|          null|   [Animation,Short]|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|           1.0|      [Comedy,

In [26]:
df_choice = df_basics.filter(df_basics.titleType == "movie").join(df_ratings.filter(df_ratings.numVotes > 5000), ["tconst"])
df_choice.filter(df_choice.startYear == "2016").show()

Py4JJavaError: An error occurred while calling o172.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 17, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
    length = stream.read(4)
  File "C:\Users\Martin\Anaconda3\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
    length = stream.read(4)
  File "C:\Users\Martin\Anaconda3\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


## OpenSubtitles dataset

The dataset consists of 31 GB of XML files distributed in the following file structure: 

```
├── opensubtitle
│   ├── OpenSubtitles2018
│   │   ├── Year
│   │   │   ├── Id
│   │   │   │   ├── #######.xml.gz
│   │   │   │   ├── #######.xml.gz
│   ├── en.tar.gz
│   ├── fr.tar.gz
│   ├── zh_cn.tar.gz
```
where
- `######` is a 6-digit unique identifier of the file on the OpenSubtitles dataset.
- `Year` is the year the movie or episode was made.
- `Id` is a 5 to 7 digit identifier (if it's 7-digit it's also an IMDb identifier).

The subtitles are provided in different languages. We only analyze the `OpenSubtitles2018` folder and it's the only folder we detail.

The decompressed XML files vary in size, ranging from 5KB to 9000KB sized files.

## XML Files

Each XML file is split into a `document` and `metadata` section.

### Subtitles

The `document` section contains all the subtitles and its general structure is the following:

```
├── s
│   ├── time: Integer
│   ├── w: String
```

An example snippet of an XML file:

```xml
  <s id="1">
    <time id="T1S" value="00:00:51,819" />
    <w id="1.1">Travis</w>
    <w id="1.2">.</w>
    <time id="T1E" value="00:00:53,352" />
  </s>
```

The subtitles in each XML file are stored by **blocks** denoted by `s` with a unique `id` attribute (integers in increasing order starting at 1).  

Each block (`<s id="1">` for instance) has a:  

1. Set of timestamps (denoted by `time`) with
 - A timestamp `id` attribute that can take two different formats: `T#S` or `T#E`, where _S_ indicates _start_, _E_ indicates _end_ and _#_ is an increasing integer. 
 - A `value` attribute which has the format `HH:mm:ss,fff`.

2. Set of words (denoted by `w`) with
 - an `id` attribute that is simply an increasing number of decimal numbers of the format `X.Y` where X is the string id and Y is the word id within the corresponding string
 - a non-empty `value` attribute that contains a token: a word or a punctuation character. 

It sometimes also has an `alternative`, `initial` and `emphasis` attribute.  

 - The `initial` attribute generally corresponds to slang words or mispronounced words because of an accent such as _lyin'_ instead of _lying_.  
 - The `alternative` attribute is another way of displaying the subtitle for example _HOW_ instead of _how_.
 - The `emphasis` attribute is a boolean.

### Metadata

The `metadata` section has the following structure:

```
├── Conversion
│   ├── corrected_words: Integer
│   ├── sentences: Integer
│   ├── tokens: Integer
│   ├── encoding: String (always utf-8)
│   ├── unknown_words: Integer
│   ├── ignored_blocks: Integer
│   ├── truecased_words: Integer
├── Subtitle
│   ├── language: String
│   ├── date: String
│   ├── duration: String
│   ├── cds: String (presented as #/# where # is an int)
│   ├── blocks: Integer
│   ├── confidence: Double
├── Source
│   ├── genre: String[] (up to 3 genres)
│   ├── year: Integer
│   ├── duration: Integer (in minutes)
│   ├── original: String
│   ├── country: String
```

We note that some XML files may not have all the entries. 
We can use the metadata to obtain additional information about the movie or show's subtitles and compute certain statistics. 

## Document dataframe

## Exploration

Going through the dataset we notice a few things:

1. The dataset has meaningless folders. For example, the folder 1858/ is empty.
2. Dataset contains XML files that are not related to movies or TV shows. For example, the folder 666/ contains Justin Bieber song subtitles.  
3. Trailer of films can be present in the dataset. For example, the folder 2018/ we found for example Black Panther teaser trailer subtitles.
4. Each movie might have more than 1 subtitle file.
5. Some subtitle files contain text that is not related to the movie, like credits to the person who made the subtitles.
6. The IDMDb folder name is not always a 7-digit number, meaning it is not always a valid IMDb identifer and we can't retrieve the IMDb info.
7. Each block may have an arbitrary number (including 0) of timestamps associated to it.

To solve points 1 and 2, we ignore all the folders which aren't inside the range of 1920-2018.

To solve point 3, we drop trailers by looking at the `duration` field in the metadata section.

To solve point 4, we simply take the first one.

To solve point 6, we keep movies that have a correct IMDb identifier. Hence, all the files in folders that don't have a 7-digit folder name are dropped.

To solve point 7, we decide not to associate a timestamp to each word for the moment.
 
For the moment, we take a sample of the dataset from the cluster (see python script `extract_sample_2.py`) by collecting 1 or 2 movies for each year in the range 1920-2018.

## Putting it all together

After doing an analysis of the files and considering the statistics we want to obtain taking the size of our data into account, we decide to load the metadata and subtitles directly into 1 dataframe where we manipulate it as before. We decide not to extract all tokens at first as it would induce into very heavy computations. We store the text in an array of subtitles where each subtitle is an array of tokens.

In [27]:
imdb_id = '6464116'
df_document_example = sqlContext.read.format('com.databricks.spark.xml')\
                                     .options(rowTag='document') \
                                     .load('sample_dataset/2017/6464116/6887453.xml.gz')
df_document_example.printSchema()
df_document_example.show()

root
 |-- _id: long (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- conversion: struct (nullable = true)
 |    |    |-- corrected_words: long (nullable = true)
 |    |    |-- encoding: string (nullable = true)
 |    |    |-- ignored_blocks: long (nullable = true)
 |    |    |-- sentences: long (nullable = true)
 |    |    |-- tokens: long (nullable = true)
 |    |    |-- truecased_words: long (nullable = true)
 |    |    |-- unknown_words: long (nullable = true)
 |    |-- source: struct (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- genre: string (nullable = true)
 |    |    |-- year: long (nullable = true)
 |    |-- subtitle: struct (nullable = true)
 |    |    |-- blocks: long (nullable = true)
 |    |    |-- cds: string (nullable = true)
 |    |    |-- confidence: double (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- duration: string (nullable = true)
 |    |    |-- language: string (nullable = true)
 

To avoid confusion, we will set some naming conventions. We will refer to certain attributes as follows:

- The `s` array as **blocks**
- An element of blocks, as a **block**.
- The `w` array as **elements**
- An element of elements, as **element**.
- `_VALUE` as a **token**
- A **subtitle** is a list of tokens

### Dataframe manipulation

We define a function that retrieves the tokens from the elements (`w` array) and returns an array of subtitles, where each subtitle is a list of tokens.

In [28]:
def to_subtitles_array(sentences):
    """Function to map the elements (a struct containing tokens)
    to a list of list of tokens """
    s_list = []
    if sentences is None:
        return s_list
    for words in sentences:
        w_list = []
        if words and "w" in words and words["w"]:
            for w in words["w"]:
                if '_VALUE' in w and w['_VALUE']:
                    w_list.append(w['_VALUE'])
                
            s_list.append(w_list)

    return s_list

Here we define a couple of udf functions we will later use for the manipulation of our dataset

In [29]:
# Transform to spark function
udf_subtitles_array = udf(to_subtitles_array, ArrayType(ArrayType(StringType())))
# Convert array of words into a single string
udf_sentence = udf(lambda x: ' '.join(x), StringType())


In [30]:
#checks if correct schema
def has_correct_schema(df):
    arguments = [
                 "meta.conversion.sentences",
                 "meta.source.year", 
                 "meta.subtitle.blocks",
                 "meta.subtitle.duration",
                 "meta.subtitle.language",
                 "s"]
    for col in arguments:
        try:
            df[col]
        except AnalysisException:
            return False
    return True

schema_films = StructType([StructField('tconst', StringType(), False),
                               StructField('num_subtitles', LongType(), True),
                               StructField('year', LongType(), True),
                               StructField('blocks', LongType(), True),
                               StructField('subtitle_mins', DoubleType(), True),
                               StructField('subtitles', ArrayType(ArrayType(StringType())), True)])

The function below structures our data to the format we want to then process all the queries we need: We link the movie with the proper imdbID, we get all the subtitles, change the subtitle duration to be in seconds (We assume for this that they all have the same format and after exploring the dataset we know the vast majority does).

In [31]:
has_correct_schema(df_document_example)

True

In [32]:
def clean_df(df_document, imdb_id):
    """Restructures and selects the columns of a dataframe of an XML
    file with its corresponding IMDB Id"""
    # Create IMDb ID and subtitles column
    df_film_sentences = df_document.withColumn("tconst", lit("tt" + imdb_id))\
                                   .withColumn("subtitles", udf_subtitles_array("s"))
    
    # Select metadata and previously created columns
    df_result = df_film_sentences.selectExpr("tconst",
                                             "meta.conversion.sentences as num_subtitles",
                                             "meta.source.year", 
                                             "meta.subtitle.blocks",
                                             "meta.subtitle.duration as subtitle_duration",
                                             "meta.subtitle.language",
                                             "subtitles")
    # Split genre column and convert subtitle duration to seconds
    df_result = df_result.withColumn("subtitle_mins", 
                                     unix_timestamp(df_result.subtitle_duration, "HH:mm:ss,SSS") / 60)
    # Discard redundant columns
    return df_result.select("tconst", "num_subtitles", "year", "blocks", "subtitle_mins", "subtitles")

Applying it on the example XML file, we see how our corresponding schema is much more simple.

In [33]:
df_document_example = clean_df(df_document_example, imdb_id)
df_document_example.printSchema()
df_document_example.show()

root
 |-- tconst: string (nullable = false)
 |-- num_subtitles: long (nullable = true)
 |-- year: long (nullable = true)
 |-- blocks: long (nullable = true)
 |-- subtitle_mins: double (nullable = true)
 |-- subtitles: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+---------+-------------+----+------+-------------+--------------------+
|   tconst|num_subtitles|year|blocks|subtitle_mins|           subtitles|
+---------+-------------+----+------+-------------+--------------------+
|tt6464116|          967|2017|   888|         40.7|[[[, shots, firin...|
+---------+-------------+----+------+-------------+--------------------+



We generalize what we have done so far. We would like to create a dataframe for several XML files, so we define a function that does this.

In [35]:
def load_df(path):
    """Load an XML subtitles file into a dataframe"""
    df_film = sqlContext.read.format('com.databricks.spark.xml')\
                             .options(rowTag='document')\
                             .load(path)
    return df_film

The code below creates a dataframe for all the XML files in the sample dataset. We will later expand it to cover a bigger quantity of films. We call it `df_films` because it contains all the information for each film/show in our datasets.

In [45]:
path = "sample_dataset/"
# Create empty dataframe with same schema


df_films = spark.createDataFrame([], schema_films)
film_list = []
for year in os.listdir(path):
    if not year.startswith('.'):
        for imdb_id in os.listdir(path + year):
            if not imdb_id.startswith('.'):
                current_path = path + year + "/" + imdb_id
                for idx, file in enumerate(os.listdir(current_path)):
                        # Create a dataframe for each file
                        df_document = load_df(current_path + '/' + file)
                        if has_correct_schema(df_document):
                            # Restructure dataframe and add it to df_films
                            film_list.append(clean_df(df_document, imdb_id))
                        

#             df_m.show()
#             print(current_path + "/" + file)

We now have a proper dataframe which will help us generate useful statistics. This is the resulting format

In [50]:
def unionAll(*dfs):
    first, rest = dfs[0], dfs[1:]  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )
df_films = unionAll(*film_list)

In [None]:
df_films.show()

We join our dataframe with the IMDb dataframe

In [None]:
df_films.join(df_ratings, ["tconst"]).show()

## Data analysis

Here we discuss how we want to proceed with our data.

We consider only films with more that 10000 reviews as IMDb considers this a good metric to estimate the public actual approval of a film.

We first start by analyzing the 1000 best films according to IMDB with more that 20000 reviews. We look into the resulting dataframe columns and  try to look for some kind of pattern. We look at the ratio of subtitle time per runtime, the most used words in the films, the ratio of distinct words per total number of words, average sentence length, number of sentences. We draw conclusions if we see any apparent relation. Afterwards if we can find a relation between rating and one of the resulting values, we do the same tests on the worst imdb films with more than 20000 reviews. If we find that a relation holds for both groups we make a statement assuming there exists a relationship

We know this is very vague and we cannot make a strong statement based on what weve found, if we have found anything because we haven't taken into consideration many variables such as the movie genre, the year of release, if it is an adult film or not. Also the country of origin would be a variable to take into account but we dont have it yet. We can also check the box office for the best films if we consider.

To proceed then, we ask what are the most popular genres and we look into how the statistics for the 10 most popular ones behave. Can we find any relationship between a genre and the statistics. We look aswell for relationships between genre and rating. Is the genre of a movie a variable which influences average rating. 

It is all interconnected, we ask ourselves for example, given an action movie, are the value of the statistics influential to its average rating. 

We havent taken TIME into account either! We look into different time periods, particular years where something occured (WW2 end or smth like that) and we try using the same statistics, to find relationships. We need to choose how to address the other variables such as genre and if it is an adult film or not.

After analyzing this data if we find that there are actually dependencies between the values, we decide to create a small learning algorithm to predict the rating of a film given its script.

In [None]:
df_filtered = df_ratings.filter(df_ratings.numVotes > 20000)
df_50_best = df_films.join(df_filtered, ["tconst"])\
                     .orderBy(df_ratings.averageRating.desc())\
                     .select("num_subtitles", 
                             "averageRating", 
                             "numVotes")\
                     .take(50)

In [None]:
#cest de la merde pour linstant vue la taille du dataset
df_pd_example = pd.DataFrame(df_50_best)

In [None]:
df_pd_example
