In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Initialize Spark session
spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

# Load credits.csv
credits_df = spark.read.csv("hdfs:///user/data/credits.csv", header=True, inferSchema=True)

# Define schema for parsing JSON in cast and crew columns
cast_schema = ArrayType(StructType([
    StructField("cast_id", IntegerType(), True),
    StructField("character", StringType(), True),
    StructField("name", StringType(), True)
]))

crew_schema = ArrayType(StructType([
    StructField("job", StringType(), True),
    StructField("name", StringType(), True)
]))

# Parse the JSON-like structure in the columns
credits_df = credits_df \
    .withColumn("cast", from_json(col("cast"), cast_schema)) \
    .withColumn("crew", from_json(col("crew"), crew_schema))

# Extract top 5 cast members (by name) as a list
credits_df = credits_df.withColumn("cast_names", col("cast.name").getItem(0)).drop("cast")

# Explode the crew array to get individual crew members as rows
crew_exploded_df = credits_df.withColumn("crew_member", explode(col("crew")))

# Filter for director and select relevant columns
credits_director_df = crew_exploded_df \
    .filter(col("crew_member.job") == "Director") \
    .select("id", "cast_names", col("crew_member.name").alias("director"))

credits_director_df.show()


[Stage 4:>                                                          (0 + 1) / 1]

+------+--------------------+------------------+
|    id|          cast_names|          director|
+------+--------------------+------------------+
| 16420|  Laurence Fishburne|     Oliver Parker|
| 31174|        Ian McKellen| Richard Loncraine|
| 48750|  Jean-Paul Belmondo|    Claude Lelouch|
| 46785|                NULL|      Jafar Panahi|
|188588|                NULL|      Henry Jaglom|
| 47475|     Ellen DeGeneres|       Nick Castle|
| 52856|                NULL|      Diane Keaton|
| 27985|                NULL|    Phillip Borsos|
| 43475|                NULL|        Art Clokey|
| 32631| Christopher Lambert|      J. F. Lawton|
| 17402|Sarah Jessica Parker|     David Frankel|
| 38722|                NULL|      Gregory Nava|
| 41478|     Kadeem Hardison| Mario Van Peebles|
|161495|          Peter Falk|       Peter Yates|
| 32502|                NULL|Wallace Wolodarsky|
|203119|                NULL|       Robert Wuhl|
|171857|   Nicholas Turturro|  Michael Corrente|
| 38129|     Martin 

                                                                                

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode, collect_list, concat_ws, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Initialize Spark session
spark = SparkSession.builder.appName("KeywordsFeatureEngineering").getOrCreate()

# Load keywords.csv
keywords_df = spark.read.csv("hdfs:///user/data/keywords.csv", header=True, inferSchema=True)

print(keywords_df)
keywords_df.show()


DataFrame[id: int, keywords: string]
+-----+--------------------+
|   id|            keywords|
+-----+--------------------+
|  862|['jealousy', 'toy...|
| 8844|"['board game', '...|
|15602|['fishing', 'best...|
|31357|['based on novel'...|
|11862|['baby', 'midlife...|
|  949|['robbery', 'dete...|
|11860|['paris', 'brothe...|
|45325|                  []|
| 9091|['terrorist', 'ho...|
|  710|['cuba', 'falsely...|
| 9087|['white house', '...|
|12110|['dracula', 'spoof']|
|21032|['wolf', 'dog-sle...|
|10858|['usa president',...|
| 1408|['exotic island',...|
|  524|['poker', 'drug a...|
| 4584|['bowling', 'base...|
|    5|"['hotel', ""new ...|
| 9273|['africa', 'indig...|
|11517|['brother brother...|
+-----+--------------------+
only showing top 20 rows



In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, collect_list, concat_ws, regexp_replace, split
from pyspark.sql.types import StringType

# Initialize Spark session
spark = SparkSession.builder.appName("KeywordsFeatureEngineering").getOrCreate()

# Load keywords.csv
keywords_df = spark.read.csv("hdfs:///user/data/keywords.csv", header=True, inferSchema=True)

# Display initial data to check its structure
print("Initial Data from keywords.csv:")
keywords_df.show(5, truncate=False)

# Clean up the `keywords` column by removing unwanted characters
keywords_df = keywords_df.withColumn(
    "keywords",
    regexp_replace(col("keywords"), r"^\[|\]$", "")  # Remove square brackets at start and end
)
keywords_df = keywords_df.withColumn(
    "keywords",
    regexp_replace(col("keywords"), r"'|\"", "")  # Remove single and double quotes
)

# Split the cleaned string into an array
keywords_df = keywords_df.withColumn("keywords", split(col("keywords"), ", "))

# Explode the keywords array to get each keyword in a separate row
keywords_exploded_df = keywords_df.withColumn("keyword_name", explode(col("keywords")))

# Select only movie_id and keyword name
keywords_exploded_df = keywords_exploded_df.select(
    col("id").alias("movie_id"),
    col("keyword_name")
)

# Check exploded keyword data
print("Exploded Keywords Data after cleaning:")
keywords_exploded_df.show(5, truncate=False)

# Group by movie_id and collect all keyword names into a list
keywords_aggregated_df = keywords_exploded_df.groupBy("movie_id").agg(
    collect_list("keyword_name").alias("keyword_names")
)

# Convert the array of keyword names to a comma-separated string
keywords_aggregated_df = keywords_aggregated_df.withColumn(
    "keyword_names", concat_ws(",", col("keyword_names"))
)

# Display final result for verification
print("Final Transformed Keywords Data:")
keywords_aggregated_df.show(5, truncate=False)

# Save the cleaned and transformed data to HDFS
output_path = "hdfs:///user/data/cleaned_keywords.csv"
keywords_aggregated_df.write.csv(output_path, header=True, mode="overwrite")

# Stop Spark session
spark.stop()


Initial Data from keywords.csv:
+-----+-------------------------------------------------------------------------------------------------------------------------------------------+
|id   |keywords                                                                                                                                   |
+-----+-------------------------------------------------------------------------------------------------------------------------------------------+
|862  |['jealousy', 'toy', 'boy', 'friendship', 'friends', 'rivalry', 'boy next door', 'new toy', 'toy comes to life']                            |
|8844 |"['board game', 'disappearance', ""based on children's book""                                                                              |
|15602|['fishing', 'best friend', 'duringcreditsstinger', 'old men']                                                                              |
|31357|['based on novel', 'interracial relationship', 'single mother', 'divorce'

                                                                                

+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movie_id|keyword_names                                                                                                                                                                                                                                                                                                          |
+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|3       |salesclerk,helsinki,g

                                                                                

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, array
from pyspark.ml.feature import CountVectorizer, IDF

# Initialize Spark session
spark = SparkSession.builder.appName("KeywordsFeatureEngineering").getOrCreate()

# Load cleaned keywords data
keywords_df = spark.read.csv("hdfs:///user/data/cleaned_keywords.csv", header=True, inferSchema=True)

# Split the comma-separated keyword names back into an array format
keywords_df = keywords_df.withColumn("keyword_names", split(col("keyword_names"), ","))

# Step 1: Remove NULL and empty keyword arrays
keywords_df = keywords_df.filter((col("keyword_names").isNotNull()) & (col("keyword_names") != array()))

# Step 2: Multi-Hot Encoding using CountVectorizer

# Use CountVectorizer to create multi-hot encoding
cv = CountVectorizer(inputCol="keyword_names", outputCol="keyword_features", binary=True)
cv_model = cv.fit(keywords_df)
multi_hot_keywords_df = cv_model.transform(keywords_df)

# Display the multi-hot encoded features
print("Multi-Hot Encoded Keywords Data:")
multi_hot_keywords_df.select("movie_id", "keyword_features").show(5, truncate=False)

# Step 3: TF-IDF Transformation

# Compute Term Frequency
tf_keywords_df = cv_model.transform(keywords_df)

# Apply IDF to get TF-IDF values
idf = IDF(inputCol="keyword_features", outputCol="tfidf_features")
idf_model = idf.fit(tf_keywords_df)
tfidf_keywords_df = idf_model.transform(tf_keywords_df)

# Display the TF-IDF features
print("TF-IDF Encoded Keywords Data:")
tfidf_keywords_df.select("movie_id", "tfidf_features").show(5, truncate=False)

# Save the transformed data if needed
tfidf_output_path = "hdfs:///user/data/tfidf_keywords.csv"


# Stop Spark session
spark.stop()


Multi-Hot Encoded Keywords Data:
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movie_id|keyword_features                                                                                                                                                                                                                     |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2       |(20107,[1,22,267,368,552,1495,2599],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                                                                                                                                                                   |
|11

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce

# Initialize Spark session
spark = SparkSession.builder.appName("CombineLinks").getOrCreate()

# Load datasets
links_df = spark.read.csv("hdfs:///user/data/links.csv", header=True, inferSchema=True)
links_small_df = spark.read.csv("hdfs:///user/data/links_small.csv", header=True, inferSchema=True)

# Step 1: Handle Missing Values in `tmdbId`
# Replace null values in 'tmdbId' with -1 as placeholder, then cast to integer
links_df = links_df.fillna({"tmdbId": -1}).withColumn("tmdbId", col("tmdbId").cast("int"))
links_small_df = links_small_df.fillna({"tmdbId": -1}).withColumn("tmdbId", col("tmdbId").cast("int"))

# Rename `tmdbId` in `links_small_df` to avoid ambiguity during join
links_small_df = links_small_df.withColumnRenamed("tmdbId", "tmdbId_small")

# Step 2: Perform an Outer Join
# Use coalesce to combine `tmdbId` values from both datasets, prioritizing `tmdbId_small` if available
combined_links_df = links_df.join(
    links_small_df,
    on=["movieId", "imdbId"],
    how="outer"
).select(
    col("movieId"),
    col("imdbId"),
    coalesce(col("tmdbId_small"), col("tmdbId")).alias("tmdbId")
)

# Step 3: Remove placeholder values if needed (optional)
# If you don’t want placeholder -1 values in the final combined DataFrame, you can filter them out
combined_links_df = combined_links_df.filter(col("tmdbId") != -1)

# Step 4: Save the Combined Data
combined_output_path = "hdfs:///user/data/combined_links.csv"
combined_links_df.write.csv(combined_output_path, header=True, mode="overwrite")

# Show combined data for verification
combined_links_df.show(10)

# Stop Spark session
spark.stop()


                                                                                

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
|      6|113277|   949|
|      7|114319| 11860|
|      8|112302| 45325|
|      9|114576|  9091|
|     10|113189|   710|
+-------+------+------+
only showing top 10 rows

