In [1]:
from src.ingestion.database.reader import IngestionReaderInterface
from src.ingestion.database.reader_psql import ConfigurePostgresSparkSession
from src.ingestion.database.reader_psql import PostgresIngestionReader
from pyspark.sql import SparkSession,Window
from pyspark.sql.functions import from_json, col, schema_of_json, get_json_object
from pyspark.sql import functions as F
from pyspark.sql import types as T
from src.ingestion.database.reader import ReadContents
from src.ingestion.database.reader import ReadUsers
from src.ingestion.database.reader import ReadRatingFeedbacks
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.linalg import SparseVector, DenseVector
import json
import numpy as np

In [4]:
spark_builder = SparkSession.builder.appName("Jupyter")
spark_builder = ConfigurePostgresSparkSession(spark_builder)
spark = spark_builder.getOrCreate()
reader = PostgresIngestionReader(db_host="127.0.0.1", db_user="minfei", db_password="FM199601060046gg", spark=spark)
content = ReadContents(reader)
content.cache()
user = ReadUsers(reader)
rating = ReadRatingFeedbacks(reader)


In [5]:
# %load /Users/minfei/Documents/SFU/CMPT732/movie-lens-recommender/src/feature_processor/features_core/content_features.py
from pyspark.sql import DataFrame

from pyspark.sql.functions import array_contains, col, explode
from src.ingestion.database.reader import IngestionReaderInterface
from src.ingestion.database.reader_psql import ConfigurePostgresSparkSession
from src.ingestion.database.reader_psql import PostgresIngestionReader
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, get_json_object, expr, avg, count,broadcast
from pyspark.sql import functions as F
from pyspark.sql import types as T
from src.ingestion.database.reader import ReadContents
from src.ingestion.database.reader import ReadUsers
from src.ingestion.database.reader import ReadRatingFeedbacks
from pyspark.sql.functions import array_contains, col, explode,  mean, stddev, substring,split,stddev_pop, avg, broadcast,regexp_replace
from pyspark.ml.feature import StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml.linalg import SparseVector, DenseVector
def VectorizeGenres(content_genres: DataFrame) -> DataFrame:
    """Encodes a list of genre strings into a multi-hot vector (a list of
    floats).

    Example input:
    ---------------------------
    | id | genres             |
    ---------------------------
    |  1 | ["Action", "IMAX"] |
    ---------------------------

    Example output:
    ---------------------------
    | id | genres             |
    ---------------------------
    |  1 | [0,1,...,0,1,0]    |
    ---------------------------

    Args:
        content_genres (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    df1 = content_genres.select('id','genres')
    # df1.first()['genres'] ##2 7 8
    genres_list = [x[0] for x in df1.select(explode("genres").alias("genres")).distinct().orderBy("genres").collect()]
    df_sep = df1.select("*" ,*[
        array_contains("genres", g).alias("g_{}".format(g)).cast("integer")
        for g in genres_list]
    ).drop('genres')
    selected_columns = [column for column in df_sep.columns if column.startswith("g_")] 
    assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_genres')
    df_sep = assembler.transform(df_sep).select('id','sparse_genres')
    def sparse_to_array(v):
        v = DenseVector(v)
        new_array = list([float(x) for x in v])
        return new_array
    sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
    res = df_sep.withColumn('genres', sparse_to_array_udf('sparse_genres')).select('id','genres')
    return res

def GetSpokenLanguages(content: DataFrame) -> DataFrame:
    """Extract list of languages strings from the column 'tmdb_primary_info', which is json object, 
        under 'spoken_languages' key

     Example input:
    -------------------------------
    | id | tmdb_primary_info      |
    -------------------------------
    |  1 | '{"id": 399168, "adult": false, "title": "The Mathematician and the Devil", "video":
             false, "budget": 0, "genres": [{"id": 35, "name": "Comedy"}, {"id": 18, "name": "Drama"}, 
             {"id": 14, "name": "Fantasy"}], "status": "Released", "imdb_id": "tt3154916", "revenue": 0, "runtime": 21, "tagline": "", "homepage": "", 
             "overview": "A mathematician offers to sell his soul to the devil for a proof or disproof of Fermat\'s Last Theorem. Based on \\"The Devil and Simon Flagg\\" by Arthur Porges.", "popularity": 0.6, 
             "vote_count": 6, "poster_path": "/5JCaWtCySRPy2JbHwgUAmYJBM8b.jpg", "release_date": "1972-06-06", "vote_average": 8.3,
             "backdrop_path": null, "original_title": "Математик и чёрт", 
             "spoken_languages": [{"name": "Pусский", "iso_639_1": "ru", "english_name": "Russian"}], 
             "original_language": "ru", "production_companies": [{"id": 88367, "name": "Centrnauchfilm", 
             "logo_path": "/8BGGqyuaxijzhqzmrgdCINWbPhj.png", "origin_country": "SU"}],
            "production_countries": [{"name": "Soviet Union", "iso_3166_1": "SU"}], "belongs_to_collection": null}') |
    -------------------------------

        Example output:
    -------------------------------
    | id | languages              |
    -------------------------------
    |  1 | ["English", "Spanish"] |
    -------------------------------
    """
    tmdb = content.select(["id","tmdb_primary_info"])
    lan = tmdb.withColumn('languages',get_json_object('tmdb_primary_info', '$.spoken_languages'))
    newdf= lan.withColumn('spoken_languages_all', F.from_json(
            F.col('languages'),
            T.ArrayType(T.StructType([
                T.StructField('name', T.StringType()),
                T.StructField('iso_639_1', T.StringType()),
                T.StructField('english_name', T.StringType()),
            ]))
        ))
    newdf = newdf.select('id','spoken_languages_all')
    spoken_languages = newdf.withColumn('languages' , expr("transform(spoken_languages_all, x -> x['english_name'])"))
    return spoken_languages.select('id','languages')

def VectorizeLanguages(spoken_languages: DataFrame) -> DataFrame:
    """Encodes a list of language strings into a multi-hot vector (a list of
    floats).

    Example input: (take input from GetSpokenLanguages)
    -------------------------------
    | id | languages              |
    -------------------------------
    |  1 | ["English", "Spanish"] |
    -------------------------------

    Example output:
    ---------------------------
    | id | languages          |
    ---------------------------
    |  1 | [0,1,...,0,1,0]    |
    ---------------------------

    Args:
        content_languages (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    df1 = spoken_languages.select('id','languages')
    languages_list = [x[0] for x in df1.select(explode("languages").alias("languages")).distinct().orderBy("languages").collect()]
    df_sep = df1.select("*" ,*[
        array_contains("languages", lan).alias("l_{}".format(lan)).cast("integer")
        for lan in languages_list]
    ).drop('languages')
    selected_columns = [column for column in df_sep.columns if column.startswith("l_")] 
    assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_languages',handleInvalid="skip")
    df_sep1 = assembler.transform(df_sep).select('id','sparse_languages').na.fill(value = 0, subset=["sparse_languages"])
    def sparse_to_array(v):
        v = DenseVector(v)
        new_array = list([float(x) for x in v])
        return new_array
    sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
    res = df_sep1.withColumn('languages', sparse_to_array_udf('sparse_languages')).select('id','languages')
    return res
def ComputeNormalizedAverageRating(
        user_rating_feebacks: DataFrame) -> DataFrame:
    """Computes the average rating each piece of content receives. Then it
    applies the following transformation to the average ratings:
        normalized_avg_ratings =
            (avg_ratings[content_id] - mean(avg_ratings))/std(avg_ratings)

    Example input:
    ------------------------
    | content_id | rating  |
    ------------------------
    |  1         |  3      |
    ------------------------
    |  1         |  5      |
    ------------------------
    |  2         |  3      |
    ------------------------
    |  3         |  2      |
    ------------------------
    |  3         |  2      |
    ------------------------

    Average ratings (intermediate result):
    ---------------------------
    | content_id | avg_rating |
    ---------------------------
    |  1         |  4         |
    ---------------------------
    |  2         |  3         |
    ---------------------------
    |  3         |  2         |
    ---------------------------
    mean = 3, std = sqrt(2/3)

    Example output:
    -------------------
    | id | avg_rating |
    -------------------
    |  1 |  1.2247    |
    -------------------
    |  2 |  0         |
    -------------------
    |  3 | -1.2247    |
    -------------------

    Args:
        user_rating_feebacks (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    
    content_rating = user_rating_feebacks.select('content_id', 'rating')
    content_rating = content_rating.groupBy('content_id').agg(avg("rating").alias("avg_rating_unscaled"))
    summary = content_rating.select([mean('avg_rating_unscaled').alias('mu'), stddev('avg_rating_unscaled').alias('sigma')]).collect().pop()
    dft = content_rating.withColumn('avg_rating', (content_rating['avg_rating_unscaled']-summary.mu)/summary.sigma).select(col("content_id").alias("id"), 'avg_rating')
    return dft
def ComputeNormalizedRatingCount(
        user_rating_feebacks: DataFrame) -> DataFrame:
    """Computes the number of ratings each piece of content receives. Then it
    applies the following transformation to the counts:
        normalized_count =
            (rating_count[content_id] - mean(rating_counts))/std(rating_counts)

    Example input:
    ------------------------
    | content_id | rating  |
    ------------------------
    |  1         |  3      |
    ------------------------
    |  1         |  5      |
    ------------------------
    |  2         |  3      |
    ------------------------
    |  3         |  2      |
    ------------------------
    |  3         |  2      |
    ------------------------
    |  3         |  1      |
    ------------------------

    Rating count (intermediate result):
    -----------------------------
    | content_id | rating_count |
    -----------------------------
    |  1         |  2           |
    -----------------------------
    |  2         |  1           |
    -----------------------------
    |  3         |  3           |
    -----------------------------
    mean = 2, std = sqrt(2/3)

    Example output:
    ---------------------
    | id | rating_count |
    ---------------------
    |  1 |  0           |
    ---------------------
    |  2 | -1.2247      |
    ---------------------
    |  3 |  1.2247      |
    ---------------------

    Args:
        user_rating_feebacks (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    
    content_rating = user_rating_feebacks.select('content_id', 'rating')
    content_rating = content_rating.groupBy('content_id').agg(count("*").alias("count_unscaled"))
    summary = content_rating.select([mean('count_unscaled').alias('mu'), stddev('count_unscaled').alias('sigma')]).collect().pop()
    rating_count_scaled = content_rating.withColumn('rating_count', (content_rating['count_unscaled']-summary.mu)/summary.sigma).select(col('content_id').alias('id'), 'rating_count')
    return rating_count_scaled

def GetBuget(content: DataFrame) -> DataFrame:
    """
    Extract budget from tmdb json column
    """
    content_budget = content.withColumn('budget_unscaled',
                   get_json_object('tmdb_primary_info', '$.budget')).select('id','budget_unscaled')
    
    return content_budget #some values are null
def NormalizeBudget(content_budget: DataFrame) -> DataFrame:
    """Transforms all budgets, so they distribute in a unit normal.

    Example input:
    -------------------
    | id | budget_unscaled |
    -------------------
    |  1 |  1,000,000 |
    -------------------
    |  3 |  3,000,000 |
    -------------------
    mean = 2,000,000, std = 1,000,000

    Example output:
    ---------------
    | id | budget |
    ---------------
    |  1 |  -1    |
    ---------------
    |  3 |   1    |
    ---------------

    Args:
        content_budget (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    budget_non0 = content_budget.filter(content_budget['budget_unscaled'] > 0) #null/0 budgets are left unprocessed
    summary = budget_non0.select([mean('budget_unscaled').alias('mu'), stddev('budget_unscaled').alias('sigma')]).collect().pop()
    budget_scaled = budget_non0.withColumn('budget', (budget_non0['budget_unscaled']-summary.mu)/summary.sigma).select('id', 'budget')
    res = content_budget.join(budget_scaled, ['id'], 'leftouter').select('id','budget')
    return res
def GetRuntime(content: DataFrame) -> DataFrame:
    """
    Extract runtime from tmdb json column
    """
    content_runtime = content.withColumn('runtime_unscaled',get_json_object('tmdb_primary_info', '$.runtime')).select('id','runtime_unscaled')
    return content_runtime
def NormalizeRuntime(content_runtime: DataFrame) -> DataFrame:
    """Transforms all runtimes, so they distribute in a unit normal.

    Example input:
    -------------------
    | id | runtime_unscaled    |
    -------------------
    |  1 |  115       |
    -------------------
    |  3 |  75        |
    -------------------
    mean = 95, std = 1,000,000

    Example output:
    ----------------
    | id | runtime |
    ----------------
    |  1 |   1     |
    ----------------
    |  3 |  -1     |
    ----------------

    Args:
        content_runtime (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    runtime_non0 = content_runtime.filter(content_runtime['runtime_unscaled'] > 0) #null/0 runtime are left unprocessed
    summary = runtime_non0.select([mean('runtime_unscaled').alias('mu'), stddev('runtime_unscaled').alias('sigma')]).collect().pop()
    runtime_scaled = runtime_non0.withColumn('runtime', (runtime_non0['runtime_unscaled']-summary.mu)/summary.sigma).select('id', 'runtime')
    res = content_runtime.join(runtime_scaled, ['id'], 'leftouter').select('id','runtime')
    return res

def GetReleaseYear(content: DataFrame) -> DataFrame:
    """
    Extract release year from tmdb json column
    """
    tmdb = content.select(["id","tmdb_primary_info"])
    content_release_year = tmdb.withColumn('release_date',
                   get_json_object('tmdb_primary_info', '$.release_date')).select('id','release_date')
    content_release_year = content_release_year.select('id', substring('release_date', 1,4).alias('release_year_str'))
    content_release_year= content_release_year.withColumn("release_year_unscaled", content_release_year["release_year_str"].cast(T.IntegerType())).select('id', 'release_year_unscaled')
    return content_release_year

def NormalizeReleaseYear(content_release_year: DataFrame) -> DataFrame:
    """Transform all the release years, so they distribute in a unit normal.

    Example input:
    ---------------------
    | id | release_year |
    ---------------------
    |  1 |  1980        |
    ---------------------
    |  2 |  2002        |
    ---------------------
    |  3 |  2012        |
    ---------------------
    mean = 1998, std = 178.67

    Example output:
    ---------------------
    | id | release_year |
    ---------------------
    |  1 |  -0.1        |
    ---------------------
    |  2 |   0.02       |
    ---------------------
    |  3 |   0.08       |
    ---------------------

    Args:
        content_release_year (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
   
    content_release_year_nonNull = content_release_year.filter(content_release_year['release_year_unscaled'].isNotNull()) #null/0 budgets are left unprocessed
    summary = content_release_year_nonNull.select([mean('release_year_unscaled').alias('mu'), stddev('release_year_unscaled').alias('sigma')]).collect().pop()
    release_year_scaled = content_release_year_nonNull.withColumn('release_year', (content_release_year_nonNull['release_year_unscaled']-summary.mu)/summary.sigma).select('id', 'release_year')
    res = content_release_year.join(release_year_scaled, ['id'], 'leftouter').select('id','release_year')
    return res

def ComputeCasts(content: DataFrame) -> DataFrame:
    '''
     It first computes
#     the absolute count for the number of people in each department under 'Casts', then it
#     normalizes the count based on the mean and the standard deviation.

    Intermidiate result:
|    id|   |Acting|Actors|Art|Camera|Costume & Make-Up|Creator|Crew|Directing|Editing|Lighting|Production|Sound|Visual Effects|Writing|
+------+---+------+------+---+------+-----------------+-------+----+---------+-------+--------+----------+-----+--------------+-------+
| 97216|  0|    54|     0|  0|     0|                0|      0|   1|        0|      0|       0|         0|    0|             0|      0|
| 71936|  0|    47|     0|  0|     0|                0|      0|   0|        0|      0|       0|         1|    0|             0|      0|
| 62526|  0|    22|     0|  0|     0|                0|      0|   0|        0|      0|       0|         0|    0|             0|      0|

#     Example output:
#      ------------------------------------------------
#     | id | cast_composition | crew_composition      |
#     -------------------------------------------------
#     | 1  | [1.0, 0.0, ...]  | [0.0, ..., -1.0, ...] |
#     -------------------------------------------------
#     | 2  | [-1.0, 0.0, ...] | [0.0, ..., 1.0, ...]  |
#     -------------------------------------------------
    '''
    casts= content.withColumn('cast',
                    get_json_object('tmdb_credits', '$.cast')).select('id','cast')
    casts = casts.filter(casts['cast'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['known_for_department'] for i in json.loads(x)]) ('cast')).select('id', 'departments')
    casts2= casts.withColumn(
        "department",
        split(regexp_replace(col("departments"), r"(^\[)|(\]$)|(')", ""), ", ")
    )
    casts2 = casts2.select('id', 'department')
    casts2 = casts2.selectExpr("id","explode(department) as department").groupby("id").pivot('department').count().na.fill(0)
    # casts2:  Intermidiate result
    # Normalize each column
    selected_columns = [column for column in casts2.columns if column!='id' and column != '' ] 
    stats = (casts2.groupBy().agg(
            *([stddev_pop(x).alias(x + '_stddev') for x in selected_columns] + 
            [avg(x).alias(x + '_avg') for x in selected_columns])))
    df2 = casts2.join(broadcast(stats))
    exprs = ['id']+[((df2[x] - df2[x + '_avg']) / df2[x + '_stddev']).alias(x) for x in selected_columns]
    df2=df2.select(exprs)
    #combine multiple columns into one feature:
    assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_casts',handleInvalid="skip")
    df_sep1 = assembler.transform(df2).select('id','sparse_casts').na.fill(value = 0, subset=["sparse_casts"])
    #convert sparse array to dense array
    def sparse_to_array(v):
            v = DenseVector(v)
            new_array = list([float(x) for x in v])
            return new_array
    sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
    res = df_sep1.withColumn('cast_composition', sparse_to_array_udf('sparse_casts')).select('id','cast_composition')
    return res


def ComputeCrews(content:DataFrame) -> DataFrame:
    """
         It first computes
#     the absolute count for the number of people in each department under 'crew', then it
#     normalizes the count based on the mean and the standard deviation.

    """
    crews= content.withColumn('crew',
                    get_json_object('tmdb_credits', '$.crew')).select('id','crew')
    crews = crews.filter(crews['crew'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['department'] for i in json.loads(x)]) ('crew')).select('id', 'departments')
    crews2= crews.withColumn(
        "department",
        split(regexp_replace(col("departments"), r"(^\[)|(\]$)", ""), ", ")
    )
    crews2 = crews2.select('id', 'department')
    crews2 = crews2.selectExpr("id","explode(department) as department").groupby("id").pivot('department').count().na.fill(0)
    selected_columns = [column for column in crews2.columns if column!='id' and column != '' ] 
    stats = (crews2.groupBy().agg(
            *([stddev_pop(x).alias(x + '_stddev') for x in selected_columns] + 
            [avg(x).alias(x + '_avg') for x in selected_columns])))
    df2 = crews2.join(broadcast(stats))
    exprs = ['id']+[((df2[x] - df2[x + '_avg']) / df2[x + '_stddev']).alias(x) for x in selected_columns]
    df2=df2.select(exprs)
    assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_crews',handleInvalid="skip")
    df_sep1 = assembler.transform(df2).select('id','sparse_crews').na.fill(value = 0, subset=["sparse_crews"])
    #convert sparse array to dense array
    def sparse_to_array(v):
            v = DenseVector(v)
            new_array = list([float(x) for x in v])
            return new_array
    sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
    res = df_sep1.withColumn('crew_composition', sparse_to_array_udf('sparse_crews')).select('id','crew_composition')
    return res
# def ComputeTeamComposition(
#         content_credits: DataFrame) -> DataFrame:
#     """Finds the composition of the content creation team. It first computes
#     the absolute count for the number of people in each department, then it
#     normalizes the count based on the mean and the standard deviation.

#     Example input:
#     --------------------------------------------------
#     | id | tmdb_credits                              |
#     --------------------------------------------------
#     | 1  | '"cast": [                                |
#     |    |  { "known_for_department": "Acting" },    |
#     |    |  { "known_for_department": "Acting" }     |
#     |    | ],                                        |
#     |    | "crew": [                                 |
#     |    |  { "known_for_department": "Directing" }, |
#     |    |  { "known_for_department": "Writing" }    |
#     |    | ]'                                        |
#     --------------------------------------------------
#     | 2  | '"cast": [                                |
#     |    |  { "known_for_department": "Acting" }     |
#     |    | ],                                        |
#     |    | "crew": [                                 |
#     |    |  { "known_for_department": "Directing" }, |
#     |    |  { "known_for_department": "Sound" }      |
#     |    | ]'                                        |
#     --------------------------------------------------

#     Intermediate result (count by department):
#     -------------------------------------------
#     | id | department          | cast | count |
#     -------------------------------------------
#     | 1  | "Acting"            | true | 2     |
#     -------------------------------------------
#     | 1  | "Directing"         | true | 0     |
#     -------------------------------------------
#     | 1  | "Writing"           | true | 0     |
#     -------------------------------------------
#     | 1  | "Production"        | true | 0     |
#     -------------------------------------------
#     | 1  | "Crew"              | true | 0     |
#     -------------------------------------------
#     | 1  | "Sound"             | true | 0     |
#     -------------------------------------------
#     | 1  | "Camera"            | true | 0     |
#     -------------------------------------------
#     | 1  | "Art"               | true | 0     |
#     -------------------------------------------
#     | 1  | "Costume & Make-Up" | true | 0     |
#     -------------------------------------------
#     | 1  | "Editing"           | true | 0     |
#     -------------------------------------------
#     | 1  | "Visual Effects"    | true | 0     |
#     -------------------------------------------
#     | 1  | "Lighting"          | true | 0     |
#     -------------------------------------------
#     | 1  | "Creator"           | true | 0     |
#     -------------------------------------------

#     ... ...


#     Example output:
#      ------------------------------------------------
#     | id | cast_composition | crew_composition      |
#     -------------------------------------------------
#     | 1  | [1.0, 0.0, ...]  | [0.0, ..., -1.0, ...] |
#     -------------------------------------------------
#     | 2  | [-1.0, 0.0, ...] | [0.0, ..., 1.0, ...]  |
#     -------------------------------------------------

#     Args:
#         content_credits (DataFrame): See the example input above.

#     Returns:
#         DataFrame: See the example output above.
#     """
#     content_credits.show()

def GetVoteCount(content: DataFrame) -> DataFrame:
    '''
    Extract Tmdb Vote count from content tmdb json column.
    '''
    tmdb = content.select(["id","tmdb_primary_info"])
    content_tmdb_vote_count= tmdb.withColumn('vote_count_unscaled',
                    get_json_object('tmdb_primary_info', '$.vote_count')).select('id','vote_count_unscaled')
    return content_tmdb_vote_count

def NormalizeTmdbVoteCount(content_tmdb_vote_count: DataFrame) -> DataFrame:
    """Transform all the TMDB vote counts, so they distribute in a unit normal.

    Example input:
    --------------------------
    | id | tmdb_vote_count   |
    --------------------------
    |  1 |  3000             |
    --------------------------
    |  2 |  2000             |
    --------------------------
    |  3 |  1000             |
    --------------------------
    mean = 2000, std = 816.5

    Example output:
    ------------------------
    | id | tmdb_vote_count |
    ------------------------
    |  1 |  1.2247         |
    ------------------------
    |  2 |  0              |
    ------------------------
    |  3 |  -1.2247        |
    ------------------------

    Args:
        content_tmdb_vote_count (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    
    content_tmdb_vote_count_nonNull = content_tmdb_vote_count.filter(content_tmdb_vote_count['vote_count_unscaled'].isNotNull()) #null vote count are left unprocessed
    summary = content_tmdb_vote_count_nonNull.select([mean('vote_count_unscaled').alias('mu'), stddev('vote_count_unscaled').alias('sigma')]).collect().pop()
    vote_count_scaled = content_tmdb_vote_count_nonNull.withColumn('tmdb_vote_count', (content_tmdb_vote_count_nonNull['vote_count_unscaled']-summary.mu)/summary.sigma).select('id', 'tmdb_vote_count')
    res = content_tmdb_vote_count.join(vote_count_scaled, ['id'], 'leftouter').select('id','tmdb_vote_count')
    return res
def GetTmdbAverageRating(content: DataFrame) -> DataFrame:
   """
   Extract tmdb average rating from content tmdb json volumn.
   """
   content_tmdb_avg_rating= content.withColumn('tmdb_avg_rating_unscaled',
                   get_json_object('tmdb_primary_info', '$.vote_average')).select('id','tmdb_avg_rating_unscaled')
   return content_tmdb_avg_rating

def NormalizeTmdbAverageRating(
        content_tmdb_avg_rating: DataFrame) -> DataFrame:
    """Transform all the TMDB average ratings, so they distribute in a unit
    normal.

    Example input:
    --------------------------
    | id | tmdb_avg_rating   |
    --------------------------
    |  1 |  9.5              |
    --------------------------
    |  2 |  7                |
    --------------------------
    |  3 |  2                |
    --------------------------
    mean = 6.2, std = 3.5

    Example output:
    ------------------------
    | id | tmdb_avg_rating |
    ------------------------
    |  1 |   0.94          |
    ------------------------
    |  2 |   0.23          |
    ------------------------
    |  3 |  -1.2           |
    ------------------------

    Args:
        content_tmdb_avg_rating (DataFrame): See the example input above.

    Returns:
        DataFrame: See the example output above.
    """
    tmdb_avg_rating_nonNull = content_tmdb_avg_rating.filter(content_tmdb_avg_rating['tmdb_avg_rating_unscaled'].isNotNull()) #null avg rating are left unprocessed
    summary = tmdb_avg_rating_nonNull.select([mean('tmdb_avg_rating_unscaled').alias('mu'), stddev('tmdb_avg_rating_unscaled').alias('sigma')]).collect().pop()
    tmdb_avg_rating_scaled = tmdb_avg_rating_nonNull.withColumn('tmdb_avg_rating', (tmdb_avg_rating_nonNull['tmdb_avg_rating_unscaled']-summary.mu)/summary.sigma).select('id', 'tmdb_avg_rating')
    res = content_tmdb_avg_rating.join(tmdb_avg_rating_scaled, ['id'], 'leftouter').select('id','tmdb_avg_rating')
    return res

def ComputeCoreContentFeatures(contents: DataFrame,
                               user_rating_feebacks: DataFrame) -> DataFrame:
    """Extracts core features from the content dataframe as well as from the
    user rating feedbacks. See below for the list of core features.

    Args:
        contents (DataFrame): The content dataframe with the schema as follows,
            root
                |-- id: long (nullable = false)
                |-- title: string (nullable = true)
                |-- genres: array (nullable = true)
                |    |-- element: string (containsNull = false)
                |-- genome_scores: json (nullable = true)
                |-- tags: json (nullable = true)
                |-- imdb_id: integer (nullable = true)
                |-- tmdb_id: integer (nullable = true)
                |-- imdb_primary_info: json (nullable = true)
                |-- tmdb_primary_info: json (nullable = true)
                |-- tmdb_credits: json (nullable = true)
        user_rating_feebacks (DataFrame): The user rating feedback dataframe
            with the schema as follows:
            root
                |-- user_id: long (nullable = false)
                |-- content_id: long (nullable = false)
                |-- rated_at: timestamp (nullable = false)
                |-- rating: double (nullable = false)

    Returns:
        DataFrame: A dataframe containing core content features, and the schema
            goes as below,
            root
                |-- id: long (nullable = false)
                |-- genres: array (nullable = false)
                |    |-- element: float (containsNull = false)
                |-- languages: array (nullable = false)
                |    |-- element: float (containsNull = false)
                |-- avg_rating: float (nullable = true)
                |-- rating_count: float (nullable = true)
                |-- budget: float (nullable = true)
                |-- runtime: float (nullable = true)
                |-- release_year: float (nullable = true)
                |-- cast_composition: array (nullable = true)
                |    |-- element: float (containsNull = false)
                |-- crew_composition: array (nullable = true)
                |    |-- element: float (containsNull = false)
                |-- tmdb_avg_rating: float (nullable = true)
                |-- tmdb_vote_count: float (nullable = true)
    """
    
    genres = VectorizeGenres(contents)
    spoken_language = GetSpokenLanguages(contents)
    languages = VectorizeLanguages(spoken_language)
    avg_rating = ComputeNormalizedAverageRating(user_rating_feebacks)
    rating_count = ComputeNormalizedRatingCount(user_rating_feebacks)
    dollar_budget =GetBuget(contents)
    budget = NormalizeBudget(dollar_budget)
    unscaled_runtime = GetRuntime(contents)
    runtime = NormalizeRuntime(unscaled_runtime)
    release_year_unscaled = GetReleaseYear(contents)
    release_year  = NormalizeReleaseYear(release_year_unscaled)
    cast_composition = ComputeCasts(contents)
    crew_composition = ComputeCrews(contents)
    vote_count_unscaled = GetVoteCount(contents) 
    tmdb_vote_count = NormalizeTmdbVoteCount(vote_count_unscaled)
    tmdb_avg_rating = GetTmdbAverageRating(contents)
    tmdb_avg_rating = NormalizeTmdbAverageRating(tmdb_avg_rating)

In [None]:
genres.join(languages, ['id'], 'outer').join(avg_rating, ['id'], 'outer').join(rating_count, ['id'], 'outer').take(1)
    .join(budget, ['id'], 'outer').join(runtime,['id'], 'outer').join(release_year, ['id'], 'outer')\
        .join(cast_composition,['id'], 'outer' ).join(crew_composition,['id'], 'outer' ).join(tmdb_avg_rating,['id'], 'outer' )\
            .join(tmdb_avg_rating, ['id'], 'outer' ).join(tmdb_vote_count,  ['id'], 'outer' ).show(10)

22/12/01 22:37:28 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 59:>   (0 + 1) / 1][Stage 60:>   (0 + 1) / 1][Stage 61:>   (0 + 1) / 1]

22/12/01 22:37:35 ERROR Executor: Exception in task 0.0 in stage 62.0 (TID 35)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3481)
	at java.base/java.util.ArrayList.grow(ArrayList.java:237)
	at java.base/java.util.ArrayList.grow(ArrayList.java:244)
	at java.base/java.util.ArrayList.add(ArrayList.java:454)
	at java.base/java.util.ArrayList.add(ArrayList.java:467)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2348)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
	at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:134)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
	at org.apac

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 51524)
Traceback (most recent call last):
  File "/Users/minfei/opt/anaconda3/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/minfei/opt/anaconda3/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/minfei/opt/anaconda3/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/minfei/opt/anaconda3/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/Users/minfei/opt/an

ConnectionRefusedError: [Errno 61] Connection refused

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/minfei/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 54] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [6]:
genres = VectorizeGenres(content)
spoken_language = GetSpokenLanguages(content)
languages = VectorizeLanguages(spoken_language)
avg_rating = ComputeNormalizedAverageRating(rating)
rating_count = ComputeNormalizedRatingCount(rating)
dollar_budget =GetBuget(content)
budget = NormalizeBudget(dollar_budget)
unscaled_runtime = GetRuntime(content)
runtime = NormalizeRuntime(unscaled_runtime)
release_year_unscaled = GetReleaseYear(content)
release_year  = NormalizeReleaseYear(release_year_unscaled)
cast_composition = ComputeCasts(content)
crew_composition = ComputeCrews(content)
vote_count_unscaled = GetVoteCount(content) 
tmdb_vote_count = NormalizeTmdbVoteCount(vote_count_unscaled)
tmdb_avg_rating = GetTmdbAverageRating(content)
tmdb_avg_rating = NormalizeTmdbAverageRating(tmdb_avg_rating)


                                                                                

                                                                                

In [41]:
tmdb_avg_rating.show()

[Stage 155:>                                                        (0 + 1) / 1]

+---+--------------------+
| id|     tmdb_avg_rating|
+---+--------------------+
|  1|   1.447818102074664|
|  2|  0.9296197423481904|
|  3| 0.36186689466973193|
|  4|  0.2684212888174166|
|  5|  0.2132034308137764|
|  6|  1.4010952991485068|
|  7| 0.16648062788761844|
|  8| -0.4048118260731246|
|  9|0.004366660158981151|
| 10|  0.6733522475107813|
| 11|  0.4701788469076421|
| 12| 0.07374294329176094|
| 13|  0.9706791752226923|
| 14|  0.7115799953594557|
| 15|-0.09686607951435985|
| 16|  1.4789666373587689|
| 17|  1.0931495525897414|
| 18|-0.15845522882611268|
| 19| 0.27479258012552926|
| 20|-0.08553933941104895|
+---+--------------------+
only showing top 20 rows



                                                                                

In [39]:
vote_count_unscaled.filter(col('id')==1).show()

+---+-------------------+
| id|vote_count_unscaled|
+---+-------------------+
|  1|              15929|
+---+-------------------+



In [71]:
content_rating = rating.select('user_id', 'rating')
content_rating.show(10)

[Stage 135:>                                                        (0 + 1) / 1]

+-------+------+
|user_id|rating|
+-------+------+
|  79679|   2.0|
|  79679|   4.0|
|  79679|   4.0|
|  79679|   4.0|
|  79738|   1.0|
|  79738|   4.0|
|  79738|   1.0|
|  79738|   1.0|
|  79738|   1.0|
|  79738|   1.0|
+-------+------+
only showing top 10 rows



                                                                                

In [279]:

content_rating = rating.select('user_id', 'rating')
content_rating = content_rating.groupBy('user_id').agg(count("*").alias("count_unscaled"))
summary = content_rating.select([mean('count_unscaled').alias('mu'), stddev('count_unscaled').alias('sigma')]).collect().pop()
dft = content_rating.withColumn('rating_count', (content_rating['count_unscaled']-summary.mu)/summary.sigma)
dft.show()

[Stage 598:>                                                        (0 + 1) / 1]

+-------+--------------+--------------------+
|user_id|count_unscaled|        rating_count|
+-------+--------------+--------------------+
| 274182|           410|  1.4664842070632833|
| 264195|           336|  1.1186756491278669|
| 247112|           143|   0.211553329107118|
| 274800|           316|  1.0246733361723488|
|  10422|          1326|   5.771790140426008|
|  15237|           599|  2.3548060644929283|
|  11619|           649|   2.589811846881723|
|   3764|            15| -0.3900614738081972|
|  15663|             5| -0.4370626302859562|
|  31762|            85|-0.06105337846388419|
| 268933|           481|  1.8001924180553723|
|  22429|           311|  1.0011727579334693|
|  23766|            96|-0.00935210633834...|
|  41862|           122| 0.11285090050382411|
|  35323|           248|  0.7050654721235876|
|  25207|            18| -0.3759611268648695|
|  30428|           315|  1.0199732205245728|
|  52743|           168|  0.3290562203015155|
|  47928|            15| -0.390061

                                                                                

In [276]:
# content_rating = rating.select('user_id', 'rating')
# content_rating = content_rating.groupBy('user_id').agg(count('*').alias("rating_unscaled"))
# summary = content_rating.select([mean('rating').alias('mu'), stddev('rating').alias('sigma')]).collect().pop()
# dft = content_rating.withColumn('avg_rating', (content_rating['rating']-summary.mu)/summary.sigma)
# dft.show()

AnalysisException: Column 'rating' does not exist. Did you mean one of the following? [user_id, rating_unscaled];
'Aggregate [avg('rating) AS mu#41376, stddev_samp('rating) AS sigma#41386]
+- Aggregate [user_id#37L], [user_id#37L, count(1) AS rating_unscaled#41372L]
   +- Project [user_id#37L, rating#40]
      +- Project [user_id#37L, content_id#38L, rated_at#39, rating#40]
         +- Relation [user_id#37L,content_id#38L,rated_at#39,rating#40,ingested_at#41] JDBCRelation(user_rating) [numPartitions=1]


In [203]:
# get the original_language of movies
# tmdb = content.select(["id","tmdb_primary_info"])
# lan = tmdb.withColumn('orginal_language',
#                    get_json_object('tmdb_primary_info', '$.original_language'))
# lan.show(10)

#get the original_language of movies
tmdb = content.select(["id","tmdb_primary_info"])
lan = tmdb.withColumn('languages',
                   get_json_object('tmdb_primary_info', '$.spoken_languages'))
newdf= lan.withColumn('spoken_languages_all', F.from_json(
        F.col('languages'),
        T.ArrayType(T.StructType([
            T.StructField('name', T.StringType()),
            T.StructField('iso_639_1', T.StringType()),
            T.StructField('english_name', T.StringType()),
        ]))
    ))
newdf = newdf.select('id','spoken_languages_all')
spoken_languages = newdf.withColumn('languages' , expr("transform(spoken_languages_all, x -> x['english_name'])"))
spoken_languages.select('id','languages')



DataFrame[id: bigint, languages: array<string>]

In [250]:
df1 = spoken_languages.select('id','languages')

df1 = spoken_languages.select('id','languages')
languages_list = [x[0] for x in df1.select(explode("languages").alias("languages")).distinct().orderBy("languages").collect()]
df_sep = df1.select("*" ,*[
    array_contains("languages", lan).alias("l_{}".format(lan)).cast("integer")
    for lan in languages_list]
).drop('languages')
selected_columns = [column for column in df_sep.columns if column.startswith("l_")] 
assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_languages',handleInvalid="skip")

df_sep1 = assembler.transform(df_sep).select('id','sparse_languages').na.fill(value = 0, subset=["sparse_languages"])

df_sep1.take(20)
def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
res = df_sep1.withColumn('languages', sparse_to_array_udf('sparse_languages')).select('id','languages')
#res.filter(res['id'] ==  147752) 



                                                                                

In [78]:
budget.show()

+------+---------------+
|    id|budget_unscaled|
+------+---------------+
|191689|              0|
|147752|              0|
|154984|              0|
|132725|              0|
|164819|           null|
|174751|              0|
|158094|           null|
|127419|              0|
|136824|              0|
| 25989|              0|
|156027|              0|
|160325|              0|
|  2477|              0|
|186437|              0|
| 65986|              0|
|140355|              0|
|167570|           null|
|109736|              0|
|174903|              0|
|167044|              0|
+------+---------------+
only showing top 20 rows



In [68]:
content_tmdb_vote_count.show()

[Stage 126:>                                                        (0 + 1) / 1]

+------+-------------------+
|    id|vote_count_unscaled|
+------+-------------------+
|191689|                  6|
|147752|                  6|
|154984|                 22|
|132725|                  0|
|164819|               null|
|174751|                  9|
|158094|               null|
|127419|                 46|
|136824|                  6|
| 25989|                 62|
|156027|                 67|
|160325|                 10|
|  2477|                133|
|186437|                 26|
| 65986|                 12|
|140355|                 11|
|167570|               null|
|109736|                  6|
|174903|                  1|
|167044|                  1|
+------+-------------------+
only showing top 20 rows



                                                                                

In [69]:
tmdb = content.select(["id","tmdb_primary_info"])
content_tmdb_vote_count= tmdb.withColumn('vote_count_unscaled',
                   get_json_object('tmdb_primary_info', '$.vote_count')).select('id','vote_count_unscaled')


content_tmdb_vote_count_nonNull = content_tmdb_vote_count.filter(content_tmdb_vote_count['vote_count_unscaled'].isNotNull()) #null vote count are left unprocessed

summary = content_tmdb_vote_count_nonNull.select([mean('vote_count_unscaled').alias('mu'), stddev('vote_count_unscaled').alias('sigma')]).collect().pop()
vote_count_scaled = content_tmdb_vote_count_nonNull.withColumn('tmdb_vote_count', (content_tmdb_vote_count_nonNull['vote_count_unscaled']-summary.mu)/summary.sigma).select('id', 'tmdb_vote_count')
res = content_tmdb_vote_count.join(vote_count_scaled, ['id'], 'leftouter').na.fill(value = 0, subset=["tmdb_vote_count"]).select('id','tmdb_vote_count')


[Stage 127:>                                                        (0 + 1) / 1]

+------+---------------+
|    id|tmdb_vote_count|
+------+---------------+
|164819|            0.0|
+------+---------------+



                                                                                

In [86]:
tmdb = content.select(["id","tmdb_primary_info"])
budget = tmdb.withColumn('budget_unscaled',
                   get_json_object('tmdb_primary_info', '$.budget')).select('id','budget_unscaled')
budget_non0 = budget.filter(budget['budget_unscaled'] > 0)
summary = budget_non0.select([mean('budget_unscaled').alias('mu'), stddev('budget_unscaled').alias('sigma')]).collect().pop()
budget_scaled = budget_non0.withColumn('budget', (budget_non0['budget_unscaled']-summary.mu)/summary.sigma).select('id', 'budget')
res = budget.join(budget_scaled, ['id'], 'leftouter').na.fill(value = 0, subset=["budget"]).select('id','budget')



                                                                                

+------+--------------------+
|    id|              budget|
+------+--------------------+
|177479| -0.5733302404941284|
|177081| -0.5742063429599672|
|102517| -0.5773316507706047|
|153961|-0.48346715899808546|
| 69873| 0.14325899781449788|
|138300| -0.3895990376582078|
|178637| -0.5757708116489652|
|  1188|-0.48346715899808546|
| 47525| -0.5084986580220529|
|   505|   0.674239670860406|
|120274|  -0.420888411438167|
| 49007|-0.42714628619415884|
| 81094|-0.45217778521812624|
|176751|  0.9871334086599983|
|  4148|  2.1448402385184897|
|  4578|-0.07670529985861549|
|  4967| -0.5456656467984561|
| 71239| -0.3676964760122363|
| 77818| -0.2644415425383709|
|187577|  0.5177928019606098|
+------+--------------------+
only showing top 20 rows



In [51]:
content_release_year = tmdb.withColumn('release_date',
                   get_json_object('tmdb_primary_info', '$.release_date')).select('id','release_date')
content_release_year = content_release_year.select('id', substring('release_date', 1,4).alias('release_year_str'))
content_release_year= content_release_year.withColumn("release_year_unscaled", content_release_year["release_year_str"].cast(T.IntegerType())).select('id', 'release_year_unscaled')

content_release_year.show()

+------+---------------------+
|    id|release_year_unscaled|
+------+---------------------+
|191689|                 1972|
|147752|                 2014|
|154984|                 1967|
|132725|                 1963|
|164819|                 null|
|174751|                 2017|
|158094|                 null|
|127419|                 2013|
|136824|                 2002|
| 25989|                 1955|
|156027|                 2012|
|160325|                 2011|
|  2477|                 1986|
|186437|                 1942|
| 65986|                 1950|
|140355|                 2013|
|167570|                 null|
|109736|                 2012|
|174903|                 2002|
|167044|                 2014|
+------+---------------------+
only showing top 20 rows



                                                                                

In [57]:
tmdb.take(1)

                                                                                

[Row(id=191689, tmdb_primary_info='{"id": 399168, "adult": false, "title": "The Mathematician and the Devil", "video": false, "budget": 0, "genres": [{"id": 35, "name": "Comedy"}, {"id": 18, "name": "Drama"}, {"id": 14, "name": "Fantasy"}], "status": "Released", "imdb_id": "tt3154916", "revenue": 0, "runtime": 21, "tagline": "", "homepage": "", "overview": "A mathematician offers to sell his soul to the devil for a proof or disproof of Fermat\'s Last Theorem. Based on \\"The Devil and Simon Flagg\\" by Arthur Porges.", "popularity": 0.6, "vote_count": 6, "poster_path": "/5JCaWtCySRPy2JbHwgUAmYJBM8b.jpg", "release_date": "1972-06-06", "vote_average": 8.3, "backdrop_path": null, "original_title": "Математик и чёрт", "spoken_languages": [{"name": "Pусский", "iso_639_1": "ru", "english_name": "Russian"}], "original_language": "ru", "production_companies": [{"id": 88367, "name": "Centrnauchfilm", "logo_path": "/8BGGqyuaxijzhqzmrgdCINWbPhj.png", "origin_country": "SU"}], "production_countrie

In [55]:
content_release_year = tmdb.withColumn('release_date',
                   get_json_object('tmdb_primary_info', '$.release_date')).select('id','release_date')
content_release_year = content_release_year.select('id', substring('release_date', 1,4).alias('release_year_str'))
content_release_year= content_release_year.withColumn("release_year_unscaled", content_release_year["release_year_str"].cast(T.IntegerType())).select('id', 'release_year_unscaled')


content_release_year_nonNull = content_release_year.filter(content_release_year['release_year_unscaled'].isNotNull()) #null/0 budgets are left unprocessed
#content_release_year_nonNull.show()
summary = content_release_year_nonNull.select([mean('release_year_unscaled').alias('mu'), stddev('release_year_unscaled').alias('sigma')]).collect().pop()
release_year_scaled = content_release_year_nonNull.withColumn('release_year', (content_release_year_nonNull['release_year_unscaled']-summary.mu)/summary.sigma).select('id', 'release_year')
res = content_release_year.join(release_year_scaled, ['id'], 'leftouter').na.fill(value = 0, subset=["release_year"]).select('id','release_year')
# return res
res.filter(res['release_year'] < 0).show(300)

[Stage 105:>                                                        (0 + 1) / 1]

+------+--------------------+
|    id|        release_year|
+------+--------------------+
|  2453| -0.2137536585794371|
|154986| -0.8532724309610892|
|148347| -1.7326107429858608|
|124609| -2.3321595920936598|
| 88228| -1.5327611266165944|
| 96734|-0.09384388875787733|
| 32954| -0.5335130447702631|
|121997| -2.0124002059028334|
|133907| -0.5335130447702631|
| 91056| -1.6926408197120075|
|  3764|-0.01390404221017...|
| 89881|  -1.612700973164301|
|183799| -0.7733325844133826|
|126753| -0.9332122775087957|
| 62526| -0.6534228145918228|
|169522|-0.17378373530558383|
| 63676| -1.0131521240565022|
|128283|  -1.612700973164301|
|161736| -0.8532724309610892|
| 89806| -1.5327611266165944|
| 79035| -2.0124002059028334|
|  2250|-0.05387396548402...|
|  7279| -0.6534228145918228|
|122045| -1.8125505895335672|
|104498| -0.7733325844133826|
|  6721|-0.01390404221017...|
|190709| -0.9332122775087957|
|124703| -1.6526708964381542|
| 40436| -1.6926408197120075|
|  4894| -0.3736333516748501|
| 81085|-0

                                                                                

In [15]:
tmdb = content.select(["id","tmdb_primary_info"])

In [12]:
content_runtime= content.withColumn('runtime_unscaled',
                   get_json_object('tmdb_primary_info', '$.runtime')).select('id','runtime_unscaled')
runtime_non0 = content_runtime.filter(content_runtime['runtime_unscaled'] > 0) #null/0 runtime are left unprocessed
summary = runtime_non0.select([mean('runtime_unscaled').alias('mu'), stddev('runtime_unscaled').alias('sigma')]).collect().pop()
runtime_scaled = runtime_non0.withColumn('runtime', (runtime_non0['runtime_unscaled']-summary.mu)/summary.sigma).select('id', 'runtime')
res = content_runtime.join(runtime_scaled, ['id'], 'leftouter').na.fill(value = 0, subset=["runtime"]).select('id','runtime')
res.show()

[Stage 49:>                                                         (0 + 1) / 1]

+---+--------------------+
| id|             runtime|
+---+--------------------+
|  1|-0.47434215919753714|
|  2|   0.308206248751211|
|  3| 0.20613471727963517|
|  4|   1.090754656699959|
|  5| 0.37625393639892823|
|  6|  2.5537799411258795|
|  7|   1.090754656699959|
|  8|  0.0700393419842007|
|  9| 0.37625393639892823|
| 10|   1.192826188171535|
| 11|  0.6484446869897972|
| 12|-0.23617525243052684|
| 13|  -0.576413690669113|
| 14|   3.302304505250769|
| 15|  0.9886831252283833|
| 16|   2.859994535540607|
| 17|  1.3969692511146867|
| 18| 0.10406318580805932|
| 19| -0.1681275647828096|
| 20|  0.5123493116943627|
+---+--------------------+
only showing top 20 rows



                                                                                

In [6]:
json.loads(df.select(['tags']).take(1)[0]["tags"])[0]

{'tag': 'Bridget Terry', 'timestamp_secs': 1422435184}

In [None]:
df1 = content_genres.select('id','cont')
genres_list = [x[0] for x in df1.select(explode("genres").alias("genres")).distinct().orderBy("genres").collect()]
df_sep = df1.select("*", *[
    array_contains("genres", g).alias("g_{}".format(g)).cast("integer")
    for g in genres_list]
    ).drop('genres')
selected_columns = [column for column in df_sep.columns if column.startswith("g_")] 
assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_genres')
df_sep = assembler.transform(df_sep).select('id','sparse_genres')
def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
res = df_sep.withColumn('genres', sparse_to_array_udf('sparse_genres')).select('id','genres')
return res

In [11]:
content_tmdb_avg_rating.show()

+------+------------------------+
|    id|tmdb_avg_rating_unscaled|
+------+------------------------+
|191689|                     8.3|
|147752|                     3.8|
|154984|                     5.5|
|132725|                     0.0|
|164819|                    null|
|174751|                     4.7|
|158094|                    null|
|127419|                   4.772|
|136824|                     7.8|
| 25989|                    7.21|
|156027|                     5.8|
|160325|                     5.5|
|  2477|                   5.308|
|186437|                     7.0|
| 65986|                     5.6|
|140355|                     6.9|
|167570|                    null|
|109736|                     8.7|
|174903|                     8.0|
|167044|                     6.5|
+------+------------------------+
only showing top 20 rows



                                                                                

In [25]:

content_tmdb_avg_rating= content.withColumn('tmdb_avg_rating_unscaled',
                   get_json_object('tmdb_primary_info', '$.vote_average')).select('id','tmdb_avg_rating_unscaled')

tmdb_avg_rating_nonNull = content_tmdb_avg_rating.filter(content_tmdb_avg_rating['tmdb_avg_rating_unscaled'].isNotNull()) #null avg rating are left unprocessed

summary = tmdb_avg_rating_nonNull.select([mean('tmdb_avg_rating_unscaled').alias('mu'), stddev('tmdb_avg_rating_unscaled').alias('sigma')]).collect().pop()
tmdb_avg_rating_scaled = tmdb_avg_rating_nonNull.withColumn('tmdb_avg_rating', (tmdb_avg_rating_nonNull['tmdb_avg_rating_unscaled']-summary.mu)/summary.sigma).select('id', 'tmdb_avg_rating')
res = content_tmdb_avg_rating.join(tmdb_avg_rating_scaled, ['id'], 'leftouter').select('id','tmdb_avg_rating')

res.filter(res['tmdb_avg_rating'].isNull()).show()

[Stage 57:>                                                         (0 + 1) / 1]

+------+---------------+
|    id|tmdb_avg_rating|
+------+---------------+
|103478|           null|
|157965|           null|
|163076|           null|
|192339|           null|
|112844|           null|
|115895|           null|
|176883|           null|
|182481|           null|
|   730|           null|
|101483|           null|
|106074|           null|
|141739|           null|
|150548|           null|
|158545|           null|
|176777|           null|
|104119|           null|
|109381|           null|
|166225|           null|
|166735|           null|
| 69201|           null|
+------+---------------+
only showing top 20 rows



                                                                                

In [23]:
json.loads(content.first()['tmdb_credits'])['crew']

                                                                                

[{'id': 1625694,
  'job': 'Story',
  'name': 'Arthur Porges',
  'adult': False,
  'gender': 0,
  'credit_id': '57458af992514129a0002537',
  'department': 'Writing',
  'popularity': 0.6,
  'profile_path': None,
  'original_name': 'Arthur Porges',
  'known_for_department': 'Writing'},
 {'id': 1786440,
  'job': 'Screenplay',
  'name': 'Semyon Raytburt',
  'adult': False,
  'gender': 2,
  'credit_id': '5b5f2df1c3a3684221056faa',
  'department': 'Writing',
  'popularity': 1.4,
  'profile_path': None,
  'original_name': 'Semyon Raytburt',
  'known_for_department': 'Directing'},
 {'id': 1786440,
  'job': 'Director',
  'name': 'Semyon Raytburt',
  'adult': False,
  'gender': 2,
  'credit_id': '5b5f2e04c3a368421e054d1a',
  'department': 'Directing',
  'popularity': 1.4,
  'profile_path': None,
  'original_name': 'Semyon Raytburt',
  'known_for_department': 'Directing'}]

In [33]:
content_tmdb_avg_rating= content.withColumn('crew',
                   get_json_object('tmdb_credits', '$.crew')).select('id','crew')
content_tmdb_avg_rating.first()

                                                                                

Row(id=191689, crew='[{"id":1625694,"job":"Story","name":"Arthur Porges","adult":false,"gender":0,"credit_id":"57458af992514129a0002537","department":"Writing","popularity":0.6,"profile_path":null,"original_name":"Arthur Porges","known_for_department":"Writing"},{"id":1786440,"job":"Screenplay","name":"Semyon Raytburt","adult":false,"gender":2,"credit_id":"5b5f2df1c3a3684221056faa","department":"Writing","popularity":1.4,"profile_path":null,"original_name":"Semyon Raytburt","known_for_department":"Directing"},{"id":1786440,"job":"Director","name":"Semyon Raytburt","adult":false,"gender":2,"credit_id":"5b5f2e04c3a368421e054d1a","department":"Directing","popularity":1.4,"profile_path":null,"original_name":"Semyon Raytburt","known_for_department":"Directing"}]')

In [48]:
j1

[{'id': 1625692,
  'name': 'Vsevolod Shestakov',
  'adult': False,
  'order': 0,
  'gender': 0,
  'cast_id': 0,
  'character': 'Математик',
  'credit_id': '57458a9c925141656200049c',
  'popularity': 0.6,
  'profile_path': '/sRiXFDtxrLnkYCXM235ywLjTbhm.jpg',
  'original_name': 'Vsevolod Shestakov',
  'known_for_department': 'Acting'},
 {'id': 28078,
  'name': 'Aleksandr Kaydanovskiy',
  'adult': False,
  'order': 1,
  'gender': 2,
  'cast_id': 1,
  'character': 'Черт',
  'credit_id': '57458aab92514165940004a4',
  'popularity': 2.883,
  'profile_path': '/yHYfkxQu3GVKxx5ibFmEom8qAS6.jpg',
  'original_name': 'Aleksandr Kaydanovskiy',
  'known_for_department': 'Acting'},
 {'id': 587911,
  'name': 'Alla Pokrovskaya',
  'adult': False,
  'order': 2,
  'gender': 1,
  'cast_id': 7,
  'character': '',
  'credit_id': '5b5f340f0e0a262e8d04fea5',
  'popularity': 0.753,
  'profile_path': '/flaYwCHvSzYhzm0bGFwyxhq6stS.jpg',
  'original_name': 'Alla Pokrovskaya',
  'known_for_department': 'Acting'}]

In [53]:
from collections import Counter

In [54]:
fruits = ["Apple", "Pear", "Peach", "Banana"]

Counter(fruits)

Counter({'Apple': 1, 'Pear': 1, 'Peach': 1, 'Banana': 1})

In [60]:
j1 = json.loads(content_tmdb_avg_rating.first()['cast'])

[i['known_for_department'] for i in json.loads(content_tmdb_avg_rating.first()['cast'])]

                                                                                

['Acting', 'Acting', 'Acting']

In [236]:
casts= content.withColumn('cast',
            get_json_object('tmdb_credits', '$.cast')).select('id','cast')
casts = casts.filter(casts['cast'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['known_for_department'] for i in json.loads(x)]) ('cast')).select('id', 'departments')
casts2= casts.withColumn(
"department",
split(regexp_replace(col("departments"), r"(^\[)|(\]$)|(')", ""), ", ")
)
casts2 = casts2.select('id', 'department')
casts2 = casts2.selectExpr("id","explode(department) as department").groupby("id").pivot('department').count().na.fill(0)
#

                                                                                

In [242]:
content.withColumn('crew',
                    get_json_object('tmdb_credits', '$.crew')).select('id','crew').take(1)

                                                                                

[Row(id=191689, crew='[{"id":1625694,"job":"Story","name":"Arthur Porges","adult":false,"gender":0,"credit_id":"57458af992514129a0002537","department":"Writing","popularity":0.6,"profile_path":null,"original_name":"Arthur Porges","known_for_department":"Writing"},{"id":1786440,"job":"Screenplay","name":"Semyon Raytburt","adult":false,"gender":2,"credit_id":"5b5f2df1c3a3684221056faa","department":"Writing","popularity":1.4,"profile_path":null,"original_name":"Semyon Raytburt","known_for_department":"Directing"},{"id":1786440,"job":"Director","name":"Semyon Raytburt","adult":false,"gender":2,"credit_id":"5b5f2e04c3a368421e054d1a","department":"Directing","popularity":1.4,"profile_path":null,"original_name":"Semyon Raytburt","known_for_department":"Directing"}]')]

In [255]:
crews= content.withColumn('crew',
                    get_json_object('tmdb_credits', '$.crew')).select('id','crew')
crews = crews.filter(crews['crew'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['department'] for i in json.loads(x)]) ('crew')).select('id', 'departments')
crews2= crews.withColumn(
    "department",
    split(regexp_replace(col("departments"), r"(^\[)|(\]$)", ""), ", ")
)
crews2 = crews2.select('id', 'department')
crews2 = crews2.selectExpr("id","explode(department) as department").groupby("id").pivot('department').count().na.fill(0)
selected_columns = [column for column in crews2.columns if column!='id' and column != '' ] 
stats = (crews2.groupBy().agg(
        *([stddev_pop(x).alias(x + '_stddev') for x in selected_columns] + 
        [avg(x).alias(x + '_avg') for x in selected_columns])))
df2 = crews2.join(broadcast(stats))
exprs = ['id']+[((df2[x] - df2[x + '_avg']) / df2[x + '_stddev']).alias(x) for x in selected_columns]
df2=df2.select(exprs)
assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_crews',handleInvalid="skip")
df_sep1 = assembler.transform(df2).select('id','sparse_crews').na.fill(value = 0, subset=["sparse_crews"])
#convert sparse array to dense array
def sparse_to_array(v):
        v = DenseVector(v)
        new_array = list([float(x) for x in v])
        return new_array
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
res = df_sep1.withColumn('crew_composition', sparse_to_array_udf('sparse_crews')).select('id','crew_composition')

                                                                                

In [257]:
df_sep1.take(1)

                                                                                

[Row(id=63676, sparse_crews=DenseVector([-0.0114, 0.4717, -0.1972, -0.1719, -0.2302, -0.4518, -0.0909, -0.1964, -0.152, -0.1449, -0.1408, -0.002]))]

In [None]:
res.show()

In [238]:

casts= content.withColumn('cast',
                    get_json_object('tmdb_credits', '$.crew')).select('id','cast')
casts = casts.filter(casts['cast'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['known_for_department'] for i in json.loads(x)]) ('cast')).select('id', 'departments')
casts2= casts.withColumn(
    "department",
    split(regexp_replace(col("departments"), r"(^\[)|(\]$)", ""), ", ")
)
casts2 = casts2.select('id', 'department')
casts2 = casts2.selectExpr("id","explode(department) as department").groupby("id").pivot('department').count().na.fill(0)
# casts2:  Intermidiate result
# Normalize each column
selected_columns = [column for column in casts2.columns if column!='id' and column != '' ] 
stats = (casts2.groupBy().agg(
        *([stddev_pop(x).alias(x + '_stddev') for x in selected_columns] + 
        [avg(x).alias(x + '_avg') for x in selected_columns])))
df2 = casts2.join(broadcast(stats))
exprs = ['id']+[((df2[x] - df2[x + '_avg']) / df2[x + '_stddev']).alias(x) for x in selected_columns]
df2=df2.select(exprs)
#combine multiple columns into one feature:
assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_casts',handleInvalid="skip")
df_sep1 = assembler.transform(df2).select('id','sparse_casts').na.fill(value = 0, subset=["sparse_casts"])
#convert sparse array to dense array
def sparse_to_array(v):
        v = DenseVector(v)
        new_array = list([float(x) for x in v])
        return new_array
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
res = df_sep1.withColumn('cast_composition', sparse_to_array_udf('sparse_casts')).select('id','cast_composition')
   

[Stage 391:>                                                        (0 + 1) / 1]

22/12/01 21:32:55 ERROR Executor: Exception in task 0.0 in stage 391.0 (TID 245)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/r_/g7vqnjzj64xb8jpg4qkh9yvr0000gn/T/ipykernel_10181/3485733914.py", line 3, in <lambda>
  File "/var/folders/r_/g7vqnjzj64xb8jpg4qkh9yvr0000gn/T/ipykernel_10181/3485733914.py", line 3, in <listcomp>
KeyError: 'known_for_department'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon

Traceback (most recent call last):
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 187, in manager
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 730, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/minfei/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/r_/g7vqnjzj64xb8jpg4qkh9yvr0000gn/T/ipykernel_10181/3485733914.py", line 3, in <lambda>
  File "/var/folders/r_/g7vqnjzj64xb8jpg4qkh9yvr0000gn/T/ipykernel_10181/3485733914.py", line 3, in <listcomp>
KeyError: 'known_for_department'


In [220]:
# #content_tmdb_avg_rating.filter(content_tmdb_avg_rating['cast'].isNull()).show()
# content_tmdb_avg_rating= content.withColumn('cast',
#                    get_json_object('tmdb_credits', '$.cast')).select('id','cast')
# content_tmdb_avg_rating.filter(col('id') ==97216 ).take(1)

In [234]:
casts= content.withColumn('cast',
                   get_json_object('tmdb_credits', '$.cast')).select('id','cast')
casts = casts.filter(casts['cast'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['known_for_department'] for i in json.loads(x)]) ('cast')).select('id', 'departments')
casts2= casts.withColumn(
    "department",
    split(regexp_replace(col("departments"), r"(^\[)|(\]$)|(')", ""), ", ")
)
casts2 = casts2.select('id', 'department')
casts2 = casts2.selectExpr("id","explode(department) as department").groupby("id").pivot('department').count().na.fill(0)
selected_columns = [column for column in casts2.columns if column!='id' and column != '' ] 
stats = (casts2.groupBy().agg(
        *([stddev_pop(x).alias(x + '_stddev') for x in selected_columns] + 
          [avg(x).alias(x + '_avg') for x in selected_columns])))
df2 = casts2.join(broadcast(stats))
exprs = ['id']+[((df2[x] - df2[x + '_avg']) / df2[x + '_stddev']).alias(x) for x in selected_columns]
df2=df2.select(exprs)
assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_casts',handleInvalid="skip")
df_sep1 = assembler.transform(df2).select('id','sparse_casts').na.fill(value = 0, subset=["sparse_casts"])
def sparse_to_array(v):
        v = DenseVector(v)
        new_array = list([float(x) for x in v])
        return new_array
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
res = df_sep1.withColumn('casts', sparse_to_array_udf('sparse_casts')).select('id','casts')


                                                                                

In [235]:
res.show()

                                                                                

+------+--------------------+
|    id|               casts|
+------+--------------------+
| 97216|[2.3061943, -0.01...|
| 71936|[1.8627309, -0.01...|
| 62526|[0.2789333, -0.01...|
|156976|[-0.101178154, -0...|
|  5409|[1.6093233, -0.01...|
| 32954|[1.3559157, -0.01...|
|172267|[-0.35458577, -0....|
|152541|[0.78574854, -0.0...|
| 89041|[-0.4812896, -0.0...|
| 88576|[-0.7346972, -0.0...|
|171935|[-0.861401, -0.01...|
| 27469|[-0.4812896, -0.0...|
|153306|[-0.7346972, -0.0...|
|128245|[-0.16453007, -0....|
|145301|[-0.101178154, -0...|
| 86941|[-0.9247529, -0.0...|
|104854|[-0.7980491, -0.0...|
|  5556|[1.0391561, -0.01...|
|  2509|[-0.16453007, -0....|
| 84120|[1.0391561, -0.01...|
+------+--------------------+
only showing top 20 rows



                                                                                

In [219]:
assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_casts',handleInvalid="skip")
df_sep1 = assembler.transform(casts2).select('id','sparse_casts').na.fill(value = 0, subset=["sparse_casts"])
def sparse_to_array(v):
        v = DenseVector(v)
        new_array = list([float(x) for x in v])
        return new_array
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
res = df_sep1.withColumn('languages', sparse_to_array_udf('sparse_languages')).select('id','languages')

[Stage 287:>                                                        (0 + 1) / 1]

+------+--------------------+
|    id|        sparse_casts|
+------+--------------------+
| 97216|(14,[0,6],[54.0,1...|
| 71936|(14,[0,10],[47.0,...|
| 62526|     (14,[0],[22.0])|
|156976|     (14,[0],[16.0])|
|  5409|(14,[0,6,13],[43....|
| 32954|(14,[0,7,10,13],[...|
|172267|     (14,[0],[12.0])|
|152541|     (14,[0],[30.0])|
| 89041|(14,[0,11],[10.0,...|
| 88576|(14,[0,3],[6.0,1.0])|
|171935|      (14,[0],[4.0])|
| 27469|     (14,[0],[10.0])|
|153306|      (14,[0],[6.0])|
|128245|     (14,[0],[15.0])|
|145301|     (14,[0],[16.0])|
| 86941|      (14,[0],[3.0])|
|104854|(14,[0,7],[5.0,1.0])|
|  5556|     (14,[0],[34.0])|
|  2509|     (14,[0],[15.0])|
| 84120|(14,[0,3,4,10,11]...|
+------+--------------------+
only showing top 20 rows



                                                                                

In [187]:

casts_list = [x[0] for x in casts2.select(explode("department").alias("departments")).distinct().orderBy("departments").collect()]
df_sep = casts2.select("*" ,*[
        array_contains("department", dep).alias("dep_{}".format(dep)).cast("integer")
        for dep in casts_list]
    ).drop('department')
selected_columns = [column for column in df_sep.columns if column.startswith("dep_")] 

assembler = VectorAssembler(inputCols=selected_columns, outputCol='sparse_casts',handleInvalid="skip")
df_sep1 = assembler.transform(df_sep).select('id','sparse_casts').na.fill(value = 0, subset=["sparse_casts"])
df_sep1.show(100)
# casts_list = [x[0] for x in casts.select(explode("departments").alias("all_departments")).distinct().orderBy("all_departments").collect()]
# casts_list

[Stage 189:>                                                        (0 + 1) / 1]

+------+--------------------+
|    id|        sparse_casts|
+------+--------------------+
|191689|      (15,[1],[1.0])|
|147752|      (15,[0],[1.0])|
|154984|      (15,[1],[1.0])|
|132725|      (15,[1],[1.0])|
|174751|      (15,[1],[1.0])|
|127419|      (15,[1],[1.0])|
|136824|      (15,[0],[1.0])|
| 25989|(15,[1,11,14],[1....|
|156027|      (15,[1],[1.0])|
|160325|      (15,[1],[1.0])|
|  2477|      (15,[1],[1.0])|
|186437|      (15,[1],[1.0])|
| 65986|      (15,[1],[1.0])|
|140355|      (15,[1],[1.0])|
|109736|      (15,[1],[1.0])|
|174903|      (15,[0],[1.0])|
|167044|      (15,[1],[1.0])|
|105359|(15,[1,11],[1.0,1...|
|110759|      (15,[1],[1.0])|
|148836|      (15,[0],[1.0])|
|142292|(15,[1,11],[1.0,1...|
|177479|      (15,[1],[1.0])|
|172483|      (15,[1],[1.0])|
| 92210|      (15,[1],[1.0])|
|177081|      (15,[1],[1.0])|
|178473|      (15,[0],[1.0])|
|102517|      (15,[1],[1.0])|
|184021|      (15,[1],[1.0])|
|154360|      (15,[1],[1.0])|
| 73492|      (15,[1],[1.0])|
|154510|  

                                                                                

In [167]:
selected_columns

['dep_',
 'dep_Acting',
 'dep_Actors',
 'dep_Art',
 'dep_Camera',
 'dep_Costume & Make-Up',
 'dep_Creator',
 'dep_Crew',
 'dep_Directing',
 'dep_Editing',
 'dep_Lighting',
 'dep_Production',
 'dep_Sound',
 'dep_Visual Effects',
 'dep_Writing']

In [133]:
casts= content.withColumn('cast',
                   get_json_object('tmdb_credits', '$.cast')).select('id','cast')
casts = casts.filter(casts['cast'].isNotNull()).withColumn('departments', F.udf(lambda x: [i['known_for_department'] for i in json.loads(x)]) ('cast')).select('id', 'departments')
casts.select(split(col('departments'), ',')).show()
# casts_list = [x[0] for x in casts.select(explode("departments").alias("all_departments")).distinct().orderBy("all_departments").collect()]
# casts_list

[Stage 116:>                                                        (0 + 1) / 1]

+------+--------------------+
|    id|         departments|
+------+--------------------+
|191689|[Acting, Acting, ...|
|147752|                  []|
|154984|[Acting, Acting, ...|
|132725|[Acting, Acting, ...|
|174751|[Acting, Acting, ...|
|127419|[Acting, Acting, ...|
|136824|                  []|
| 25989|[Acting, Acting, ...|
|156027|[Acting, Acting, ...|
|160325|[Acting, Acting, ...|
|  2477|[Acting, Acting, ...|
|186437|[Acting, Acting, ...|
| 65986|[Acting, Acting, ...|
|140355|            [Acting]|
|109736|[Acting, Acting, ...|
|174903|                  []|
|167044|            [Acting]|
|105359|[Acting, Acting, ...|
|110759|[Acting, Acting, ...|
|148836|                  []|
+------+--------------------+
only showing top 20 rows



                                                                                

In [134]:
casts.schema

StructType([StructField('id', LongType(), True), StructField('departments', StringType(), True)])