In [149]:
# Python cell
# Load the detailed Netflix movies table from ADLS Gen2.
file_path = "abfss://netflixanalysis@netflixanalysis.dfs.core.windows.net/netflix_movies_detailed_up_to_2025.csv"

# Free-text columns such as `description` and `cast` can contain commas, quotes and
# line breaks inside quoted fields. Without `multiLine`, Spark splits a record at an
# embedded newline and shifts text into later columns.
# NOTE(review): the previous output read `release_year`, `rating`, etc. as strings
# despite `inferSchema`, which is consistent with such mis-split rows — confirm
# against the raw file.
# `escape='"'` assumes quotes inside fields are doubled ("") RFC 4180-style rather
# than backslash-escaped (Spark's default) — TODO confirm for this export.
df_movies = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiLine", True)
    .option("escape", '"')
    .csv(file_path)
)

# Sanity check: head() with no argument returns only the first Row.
df_movies.head()

StatementMeta(sparknetflix, 10, 2, Finished, Available, Finished)

Row(show_id='10192', type='Movie', title='Shrek Forever After', director='Mike Mitchell', cast='Mike Myers, Eddie Murphy, Cameron Diaz, Antonio Banderas, Walt Dohrn', country='United States of America', date_added='2010/5/16', release_year='2010', rating='6.38', duration=None, genres='Comedy, Adventure, Fantasy, Animation, Family', language='en', description="A bored and domesticated Shrek pacts with deal-maker Rumpelstiltskin to get back to feeling like a real ogre again, but when he's duped and sent to a twisted version of Far Far Away—where Rumpelstiltskin is king, ogres are hunted, and he and Fiona have never met—he sets out to restore his world and reclaim his true love.", popularity='203.893', vote_count='7449', vote_average='6.38', budget='165000000', revenue='752600867')

In [151]:
# Python cell
# Load the detailed Netflix TV-shows table from ADLS Gen2.
file_path = "abfss://netflixanalysis@netflixanalysis.dfs.core.windows.net/netflix_tv_shows_detailed_up_to_2025.csv"

# Read with the same options as the movies table.
# Previously this used `enforceSchema`, which only controls whether a supplied or
# inferred schema is forced onto the file. It already defaults to true, so setting it
# did nothing, and every column came back as a string. `inferSchema` was the intent.
# `multiLine` keeps quoted fields that contain line breaks (e.g. `description`) in one
# record.
# `escape='"'` assumes RFC 4180 doubled quotes ("") inside fields — TODO confirm.
df_tv = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiLine", True)
    .option("escape", '"')
    .csv(file_path)
)

# Sanity check: head() with no argument returns only the first Row.
df_tv.head()

StatementMeta(sparknetflix, 10, 4, Finished, Available, Finished)

Row(show_id='32415', type='TV Show', title='Conan', director=None, cast="Conan O'Brien, Andy Richter", country='United States of America', date_added='2010/11/8', release_year='2010', rating='7.035', duration='1 Seasons', genres='Talk, Comedy, News', language='en', description="A late night television talk show hosted by  Conan O'Brien.", popularity='1670.58', vote_count='229', vote_average='7.035')

In [152]:
from pyspark.sql import functions as F

# Columns present in both tables. Movie-only fields such as budget and revenue are
# left out so the two frames can be stacked by column name.
common_cols = [
    "show_id", "country", "rating", "duration",
    "genres", "language", "release_year", "date_added", "popularity",
]

# Project each source onto the shared columns and tag every row with its content type
# (movies first, then TV shows).
df_movies_labeled, df_tv_labeled = (
    source.select(*common_cols).withColumn("platform_type", F.lit(tag))
    for source, tag in ((df_movies, "Movie"), (df_tv, "TV Show"))
)

# Stack the two labeled frames into one, matching columns by name rather than position.
df_all = df_movies_labeled.unionByName(df_tv_labeled)


StatementMeta(sparknetflix, 10, 5, Finished, Available, Finished)

In [153]:
# Peek at the first row of the combined frame (DataFrame.first() is defined as head()).
df_all.first()

StatementMeta(sparknetflix, 10, 6, Finished, Available, Finished)

Row(show_id='10192', country='United States of America', rating='6.38', duration=None, genres='Comedy, Adventure, Fantasy, Animation, Family', language='en', release_year='2010', date_added='2010/5/16', popularity='203.893', platform_type='Movie')

In [159]:
from pyspark.sql import functions as F

# Normalise types and drop rows that cannot be used downstream.
#
# * `rating` and `popularity` are cast to 64-bit double. The previous cast to 32-bit
#   float lost precision: 203.893 becomes 203.89300537109375 once widened again.
# * A failed cast yields NULL, so rows whose rating, popularity or release_year are
#   not numeric are dropped.
# * Line breaks inside `country` are collapsed to a single space so the CSV written
#   later keeps one record per line.
# * `platform_type` is a literal label set in the union step; its NULL check is kept
#   only as a defensive guard.
df_clean = (
    df_all
    .withColumn("rating", F.col("rating").cast("double"))
    .withColumn("popularity", F.col("popularity").cast("double"))
    .withColumn("release_year", F.col("release_year").cast("int"))
    .filter(
        F.col("rating").isNotNull()
        & F.col("popularity").isNotNull()
        & F.col("release_year").isNotNull()
    )
    .withColumn("country", F.regexp_replace(F.col("country"), r"[\r\n]+", " "))
    .filter(
        F.col("country").isNotNull()
        & F.col("language").isNotNull()
        & F.col("platform_type").isNotNull()
    )
    .select(
        "show_id", "country", "rating", "duration", "genres",
        "language", "release_year", "date_added", "popularity", "platform_type",
    )
)


StatementMeta(sparknetflix, 10, 12, Finished, Available, Finished)

In [160]:
# Persist the cleaned data to the staging area as CSV, with a header row.
# Quote and escape are both `"`, so a quote inside a value is written doubled ("")
# instead of breaking the column layout. Existing output at the path is overwritten.
(
    df_clean.write
    .options(header=True, quote='"', escape='"')
    .mode("overwrite")
    .csv("abfss://netflixanalysis@netflixanalysis.dfs.core.windows.net/synapse/staging/a1_csv/")
)

StatementMeta(sparknetflix, 10, 13, Finished, Available, Finished)