# Content features

## POMS

In [7]:
import requests
from pyspark.sql.functions import udf
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
from pyspark.sql.types import *

### Preparations

In [2]:
# Load the enriched POMS records from the "poms-enriched" parquet dataset.
poms = spark.read.parquet("gs://mit-processed-events-prod.npo-data.nl/poms-enriched/")
# Build the message with format(): under Python 2, print("label", value)
# prints the tuple ('label', value) instead of a readable line.
print("Total poms: {}".format(poms.count()))

('Total poms: ', 1468384)
root
 |-- age_rating: string (nullable = true)
 |-- av_type: string (nullable = true)
 |-- available_subtitles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- broadcasters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- countries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- creation_date: string (nullable = true)
 |-- credits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- family_name: string (nullable = true)
 |    |    |-- given_name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- descendant_of: array (nullable = true)
 |   

In [3]:
# Keep the broadcasts that can be streamed on NPO Start.
# The result has one row per (broadcast, location) pair whose location has an
# npo:// or npo+drm:// program_url and an INTERNETVOD or PLUSVOD platform.
poms = (
    poms
    .filter(sf.col("type") == "BROADCAST")
    .filter(sf.size("locations") > 0)
    .select('broadcaster', 'credits', 'descriptions', 'genres', 'mid', 'locations', 'seriesRef', 'titles')
    # One row per location; the location fields used for filtering are
    # lifted to top-level columns.
    .withColumn('locations', sf.explode('locations'))
    .withColumn('program_url', sf.col('locations.program_url'))
    .withColumn('platform', sf.col('locations.platform'))
    .withColumn('publish_start', sf.col('locations.publish_start'))
    .withColumn('publish_stop', sf.col('locations.publish_stop'))
    .filter(sf.col('program_url').startswith('npo+drm://') | sf.col('program_url').startswith('npo://'))
    .filter(sf.col('platform').isin('INTERNETVOD', 'PLUSVOD'))
    # NOTE(review): a location is kept when *either* date differs from '0';
    # confirm that '|' (rather than '&') is what is intended here.
    .filter((sf.col('publish_start') != '0') | (sf.col('publish_stop') != '0'))
    # Later cells key the features on seriesRef, so it must be present.
    .filter(sf.col('seriesRef').isNotNull())
)
# format() keeps the output readable under Python 2, where
# print("label", value) prints a tuple.
print("Filtered poms: {}".format(poms.count()))
poms.printSchema()

('Filtered poms: ', 105105)
root
 |-- broadcaster: string (nullable = true)
 |-- credits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- family_name: string (nullable = true)
 |    |    |-- given_name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- descriptions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- owner: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- terms: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- mid: string (nullable = true)
 |-- locations: struct (nullable = true)
 |    |-- av_file_format: string (nullable = true)
 |    |-- creation_date: string (nullable = true)
 |    |-- last_modified_date: string (nullable

In [5]:
# Keep only the content features plus the two identifiers (mid, seriesRef);
# the location helper columns are no longer needed.
content_columns = ['broadcaster', 'credits', 'descriptions', 'genres', 'mid', 'seriesRef', 'titles']
poms = poms.select(*content_columns)

+-----------+--------------------+--------------------+--------------------+-----------+------------------+--------------------+
|broadcaster|             credits|        descriptions|              genres|        mid|         seriesRef|              titles|
+-----------+--------------------+--------------------+--------------------+-----------+------------------+--------------------+
|        NOS|[[Meulens, Milous...|[[NEBO, MAIN, De ...|[[3.0.1.1, [Jeugd...| 9Jeugd1230| POMS_S_NOS_105306|[[NEBO, MAIN, NOS...|
|        RVU|                  []|[[NEBO, MAIN, Sim...|[[3.0.1.7, [Infor...| RVU_102368|        KN_1678993|[[NEBO, MAIN, Keu...|
|        RVU|                  []|[[BROADCASTER, SH...|                  []| RVU_102939|        KN_1678993|[[NEBO, MAIN, Keu...|
|        RVU|                  []|[[BROADCASTER, SH...|                  []| RVU_102944|        KN_1678993|[[NEBO, MAIN, Keu...|
|        RVU|                  []|[[BROADCASTER, SH...|                  []| RVU_103514|        K

### Preparation of textual features

In [10]:
# Pick the title and description as displayed on NPO Start.
#
# Title: the value of every title entry whose type is MAIN.
# Description: the first available type in the order MAIN, SHORT, KICKER.
# The ranking is spelled out explicitly; it is the same order that sorting on
# the length of the type name produced.
# NOTE(review): broadcasts without a MAIN title, or without any description of
# these three types, drop out of the frame here -- confirm that is intended.
description_rank = (
    sf.when(sf.col('description.type') == 'MAIN', 0)
    .when(sf.col('description.type') == 'SHORT', 1)
    .otherwise(2)
)
w = Window.partitionBy('mid').orderBy(description_rank)

poms = (
    poms
    .withColumn('titles', sf.explode('titles'))
    .withColumn('title_type', sf.col('titles.type'))
    .withColumn('title', sf.col('titles.value'))
    .filter(sf.col('title_type') == 'MAIN')
    .withColumn('description', sf.explode("descriptions"))
    .filter(sf.col('description.type').isin(["MAIN", "SHORT", "KICKER"]))
    # Every row of a broadcast receives the best-ranked description text.
    .withColumn('description', sf.first('description.value').over(w))
)

# The explodes leave one row per (location x title x description) combination,
# all carrying identical values once the helper columns are dropped. Collapse
# them so later steps (notably the per-row subtitle download) do not repeat
# work for the same broadcast.
poms = (
    poms
    .select('broadcaster', 'credits', 'description', 'genres', 'mid', 'seriesRef', 'title')
    .dropDuplicates()
)

+-----------+-------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+----------+--------------------+
|broadcaster|credits|        descriptions|              genres|                 mid|      seriesRef|              titles|         description|title_type|               title|
+-----------+-------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+----------+--------------------+
|        NOS|     []|[[MIS, MAIN, Live...|[[3.0.1.7.21, [In...|12Jnl1022AmVerkDebat|NOSAmerikaKiest|[NEBO, MAIN, NOS ...|Live uitzending v...|      MAIN|NOS Amerikaans Ve...|
|        NOS|     []|[[MIS, MAIN, Live...|[[3.0.1.7.21, [In...|12Jnl1022AmVerkDebat|NOSAmerikaKiest|[NEBO, MAIN, NOS ...|Live uitzending v...|      MAIN|NOS Amerikaans Ve...|
|        NOS|     []|[[MIS, MAIN, Live...|[[3.0.1.7.21, [In...|12Jnl1022AmVerkDebat|NOSAmerikaKiest|[MIS, MAIN, NOS A...|Live

In [None]:
# Subtitle extraction using the POMS API.

def spark_udf(data_type):
    """Decorator factory: turn a Python function into a Spark UDF returning ``data_type``."""
    def create_udf(f):
        return udf(f, data_type)
    return create_udf


@spark_udf(StringType())
def spark_sub(x):
    """Download the nl_NL caption file (WebVTT) for mid ``x`` and flatten it to text.

    Returns the lower-cased, ASCII-only cue text joined with single spaces, or
    None when ``x`` is null, the request fails, or the API does not answer
    with HTTP 200 (so an error page is never parsed as subtitle text).
    """
    if x is None:
        return None
    try:
        response = requests.get(
            "https://rs.poms.omroep.nl/v1/api/subtitles/" + x + "/nl_NL/CAPTION.vtt",
            timeout=30,  # never let a single stalled request hang a Spark task
        )
    except requests.exceptions.RequestException:
        return None
    if response.status_code != 200:
        return None

    # Drop non-ASCII characters, then decode back to text so the string
    # operations below behave the same on Python 2 and Python 3.
    sub = response.text.encode('ascii', 'ignore').decode('ascii')
    sub = sub.lower().split('\n\n')  # lower-case and split into cue blocks
    sub = sub[1:]  # remove the first entry, the 'webvtt' header
    # Strip the first two lines of each block (display time info) and join
    # the remaining subtitle lines with spaces.
    sub = [line.split('\n', 2)[-1].replace('\n', ' ') for line in sub]
    return u" ".join(sub)


# Call the API once per distinct mid and join the result back, instead of
# once per row of `poms` (which can contain the same mid several times).
# Note that the join moves 'mid' to the first column position.
mid = poms.select('mid').distinct()
subs = mid.withColumn('sub', spark_sub('mid'))
poms = poms.join(subs, on='mid', how='left')

In [None]:
# Persist the subtitle-enriched frame; the feature-encoding sections below
# read it back from this same path.
# NOTE(review): no save mode is given -- confirm what should happen when the
# path already exists on a re-run.
poms.write.parquet("gs://dataproc-jupyter-eileen.npo-data.nl/data/poms_stream/")

## Feature encoding
### Categorical features
#### Preparations

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pyspark.sql.functions as sf
from pyspark.sql.types import *

# Notebook-wide display settings: show up to 500 DataFrame columns and use
# the 'bmh' matplotlib style sheet.
pd.set_option('display.max_columns', 500)
plt.style.use('bmh')

In [None]:
# Reload the broadcast frame that the POMS section wrote to parquet.
poms = spark.read.parquet("gs://dataproc-jupyter-eileen.npo-data.nl/data/poms_stream/")
print(poms.count())
# printSchema() prints the schema itself and returns None; wrapping it in
# print() only adds a stray "None" line to the output.
poms.printSchema()
# NOTE(review): this collects the whole frame onto the driver, and `df` is
# reassigned in the textual-features section before any visible use --
# confirm it is still needed.
df = poms.toPandas()

#### Broadcaster

In [None]:
# Broadcaster side information: distinct (series, broadcaster) pairs in the
# long format (mid, value, feature), where 'mid' holds the seriesRef.
feature = 'broadcaster'
sideinfo = (
    poms
    .select(
        sf.col('seriesRef').alias('mid'),
        sf.col(feature).alias('value'),
        sf.lit(feature).alias('feature'),
    )
    .distinct()
)
sideinfo.show()
sideinfo.write.parquet("gs://dataproc-jupyter-eileen.npo-data.nl/data/content_features/" + feature)

#### Credits

In [None]:
# Credits side information: one row per distinct (series, credited person) in
# the long format (mid, value, feature), where 'mid' holds the seriesRef and
# 'value' is "family_name, given_name".
feature = 'credits'

sideinfo = (
    poms
    .select(sf.col('seriesRef').alias('mid'), sf.col(feature).alias('value'))
    .withColumn('value', sf.explode('value'))
    .withColumn('feature', sf.lit(feature))
    # concat() returns NULL as soon as one of its inputs is NULL, which turned
    # every credit lacking a given or family name into a NULL value.
    # concat_ws() skips NULL parts and keeps the name part that is present.
    .withColumn('value', sf.concat_ws(', ', sf.col('value.family_name'), sf.col('value.given_name')))
    # Both name parts missing yields an empty string: no usable feature value.
    .filter(sf.col('value') != '')
    .dropDuplicates()
)
sideinfo.show()
sideinfo.write.parquet("gs://dataproc-jupyter-eileen.npo-data.nl/data/content_features/" + feature)

#### Genres

In [None]:
# Genres side information: one row per distinct (series, genre) in the long
# format (mid, value, feature), where 'mid' holds the seriesRef and 'value'
# is the genre's `terms` array cast to a string.
feature = 'genres'

exploded_genres = poms.select(
    sf.col('seriesRef').alias('mid'),
    sf.explode(feature).alias('value'),
)
sideinfo = (
    exploded_genres
    .select(
        'mid',
        sf.col('value.terms').cast(StringType()).alias('value'),
        sf.lit(feature).alias('feature'),
    )
    .distinct()
)
sideinfo.printSchema()
sideinfo.show()
sideinfo.write.parquet("gs://dataproc-jupyter-eileen.npo-data.nl/data/content_features/" + feature)

### Textual features
#### Preparations

In [None]:
import pandas as pd
import numpy as np
import pyspark.sql.functions as sf
# Show up to 500 columns when a pandas DataFrame is displayed.
pd.set_option('display.max_columns', 500)

import re as re
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover, NGram, RegexTokenizer

import spacy
# NOTE(review): the attribute lookups on spacy.lang.nl / spacy.lang.en below
# presumably rely on these two imports having loaded those subpackages --
# keep them even though Dutch and English are not referenced by name.
from spacy.lang.nl import Dutch
from spacy.lang.en import English
# Union of spaCy's Dutch and English stop-word sets; passed to the
# StopWordsRemover stage further down.
stopwords = spacy.lang.nl.stop_words.STOP_WORDS.union(spacy.lang.en.stop_words.STOP_WORDS)

from pyspark.mllib.linalg import Vector, Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml import Pipeline

In [None]:
# Reload the broadcast frame written by the POMS section and report its
# row count and schema.
poms = spark.read.format("parquet").load("gs://dataproc-jupyter-eileen.npo-data.nl/data/poms_stream/")
row_count = poms.count()
print(row_count)
poms.printSchema()

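#### Perform TF-IDF per feature

The cells below are run once per textual feature. Before each run, set `text` and `n` in the next cell to the values in this table; `n` is the number of highest-scoring TF-IDF terms kept per series.

| Feature (`text`)  | Top TF-IDF terms kept (`n`) |
|-------------------|-----------------------------|
| title             | 3                           |
| description       | 10                          |
| sub (subtitles)   | 20                          |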

In [None]:
# Settings for this run: number of top terms to keep and the text column used.
n = 3 # 10, 20
text = 'title' #'description', 'sub'

# Gather the distinct non-null texts of every series (seriesRef, exposed as
# 'mid') and glue them into a single space-separated 'text' document.
collected_col = "collect_set(" + text + ")"  # name Spark gives the aggregate column
df = (
    poms
    .select(sf.col('seriesRef').alias('mid'), sf.col(text))
    .dropna(subset=text)
    .groupBy("mid")
    .agg(sf.collect_set(text))
    .withColumn("text", sf.concat_ws(" ", collected_col))
)

In [None]:
# ML pipeline with three stages -- tokenizer, stop-word remover and count
# vectorizer -- followed by an IDF model that is fitted separately on the
# pipeline output.
stopwordsList = [word.encode('utf-8') for word in stopwords]

tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern="\\W",
    minTokenLength=4,
)
remover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="filtered",
    stopWords=stopwordsList,
)
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="rawFeatures")

pipeline = Pipeline(stages=[tokenizer, remover, cv])
model = pipeline.fit(df)
df = model.transform(df)

# Re-weight the raw term counts by inverse document frequency.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df)
df = idfModel.transform(df)

In [None]:
# Get the positions of the top-n TF-IDF scores of every row.
def spark_udf(data_type):
    """Decorator factory: turn a Python function into a Spark UDF returning ``data_type``."""
    return lambda f: udf(f, data_type)


@spark_udf(ArrayType(IntegerType()))
def spark_argmaxes(vector):
    """Return positions within ``vector.values`` of its ``n`` largest entries.

    ``n`` is read from the notebook-level variable. All positions are returned
    when the vector stores fewer than ``n`` values, and None when it stores
    none. The positions index ``vector.values``; they are not ordered by score.
    """
    scores = vector.values
    stored = len(scores)
    if stored == 0:
        return None
    if stored < n:
        return np.argpartition(scores, -stored).tolist()
    return np.argpartition(scores, -n)[-n:].tolist()


df = df.withColumn('argmaxes', spark_argmaxes('features'))

In [None]:
# Extract the top-n words: one row per (series, selected position).
word_df = (
    df
    .withColumn('argmaxes', sf.explode('argmaxes'))
    .select('mid', 'features', 'argmaxes')
)

# Vocabulary of the fitted CountVectorizer (third pipeline stage), UTF-8 encoded.
vocabulary = [term.encode('UTF8') for term in model.stages[2].vocabulary]


@spark_udf(StringType())
def spark_vocab(vector, x):
    """Return the vocabulary word for position ``x`` of ``vector``.

    ``x`` indexes ``vector.indices``, whose entry is in turn the index into
    ``vocabulary``. Returns None when the vector stores no values.
    """
    if len(vector.values) == 0:
        return None
    return vocabulary[vector.indices[x]]


word_df = word_df.withColumn('value', spark_vocab('features', 'argmaxes'))
text_tfidf = word_df.select('mid', 'value', sf.lit(text).alias('feature'))
text_tfidf.show()

In [None]:
# Store the top-n words of the current text feature under
# content_features/<text>_tfidf.
# NOTE(review): no save mode is given -- confirm what should happen when the
# path already exists on a re-run.
text_tfidf.write.parquet("gs://dataproc-jupyter-eileen.npo-data.nl/data/content_features/" + text + "_tfidf")