In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, trim, col, regexp_replace

In [2]:
# initialize SparkSession
spark = SparkSession.builder \
    .appName("Streaming Platform Analyze") \
    .getOrCreate()

24/11/12 14:54:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Data Preparation

In [3]:
# paths to CSVs
csv_apple_data = "gs://raw-platform-data/apple_data.csv"
csv_netflix_data = "gs://raw-platform-data/netflix_data.csv"
csv_hbo_data = "gs://raw-platform-data/hbo_data.csv"
csv_amazon_data = "gs://raw-platform-data/amazon_data.csv"

In [4]:
# function to load a CSV into df
def load_csv_to_df(file_path):
    try:
        df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("quote", '"') \
            .option("escape", '"') \
            .load(file_path)
        return df
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return None

In [5]:
# load data to df
df_apple = load_csv_to_df(csv_apple_data)
df_netflix = load_csv_to_df(csv_netflix_data)
df_hbo = load_csv_to_df(csv_hbo_data)
df_amazon = load_csv_to_df(csv_amazon_data)

                                                                                

In [6]:
df_apple = df_apple.withColumn("platform", lit("Apple TV+"))
df_netflix = df_netflix.withColumn("platform", lit("Netflix"))
df_hbo = df_hbo.withColumn("platform", lit("HBO Max"))
df_amazon = df_amazon.withColumn("platform", lit("Amazon Prime"))

In [7]:
combined_df = df_netflix.unionByName(df_apple) \
                           .unionByName(df_hbo) \
                           .unionByName(df_amazon)

### Data exploration and cleaning

In [8]:
combined_df.show(10)

+--------------------+-----+--------------------+-----------+---------+-----------------+------------+--------------------+--------+
|               title| type|              genres|releaseYear|   imdbId|imdbAverageRating|imdbNumVotes|  availableCountries|platform|
+--------------------+-----+--------------------+-----------+---------+-----------------+------------+--------------------+--------+
|        Forrest Gump|movie|      Drama, Romance|       1994|tt0109830|              8.8|     2316975|                  MX| Netflix|
|   The Fifth Element|movie|Action, Adventure...|       1997|tt0119116|              7.6|      517225|          AT, CH, DE| Netflix|
|   Kill Bill: Vol. 1|movie|Action, Crime, Th...|       2003|tt0266697|              8.2|     1222077|AE, AL, AO, AT, A...| Netflix|
|             Jarhead|movie|Biography, Drama,...|       2005|tt0418763|              7.0|      211593|AD, AE, AG, AL, A...| Netflix|
|          Unforgiven|movie|      Drama, Western|       1992|tt010569

In [9]:
combined_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- imdbAverageRating: double (nullable = true)
 |-- imdbNumVotes: integer (nullable = true)
 |-- availableCountries: string (nullable = true)
 |-- platform: string (nullable = false)



In [10]:
# convert releaseYear to int and imdbAverageRating to double
combined_df = combined_df.withColumn("releaseYear", col("releaseYear").cast("int")) \
                            .withColumn("imdbAverageRating", col("imdbAverageRating").cast("double"))

In [11]:
combined_df.select('type').distinct().show()



+-----+
| type|
+-----+
|   tv|
|movie|
+-----+



                                                                                

In [12]:
# verifing rows with "'Tis Time for" in the title for specific movie 
# to confirm quoted fields were handled correctly during csv parsing

# combined_df.filter(trim(combined_df['title']) == '"" Princess"').show()
combined_df.filter(combined_df["title"].like("%'Tis Time for%")).show()

                                                                                

+--------------------+----+--------------------+-----------+----------+-----------------+------------+--------------------+------------+
|               title|type|              genres|releaseYear|    imdbId|imdbAverageRating|imdbNumVotes|  availableCountries|    platform|
+--------------------+----+--------------------+-----------+----------+-----------------+------------+--------------------+------------+
|'Tis Time for "To...|  tv|Animation, Comedy...|       2024|tt30643998|              6.4|         221|HK, ID, IN, JP, M...|     Netflix|
|'Tis Time for "To...|  tv|Animation, Comedy...|       2024|tt30643998|              6.4|         221|                  JP|Amazon Prime|
+--------------------+----+--------------------+-----------+----------+-----------------+------------+--------------------+------------+



In [13]:
# replace '&' with ',' in the genres column
combined_df = combined_df.withColumn("genres", regexp_replace("genres", "&", ","))
combined_df.select("genres").distinct().show()



+--------------------+
|              genres|
+--------------------+
|Animation, Comedy...|
|Documentary, Hist...|
|       Family, Short|
|Drama, Musical, R...|
|Drama, Romance, T...|
|   Documentary, News|
|      Music, Romance|
|Drama, Family, Th...|
|      Musical, Short|
|Adventure, Horror...|
|Drama, Fantasy, M...|
|Romance, Family, ...|
|  Family, Reality-TV|
|      Family, Sci-Fi|
|Adventure, Family...|
|Drama, Sci-Fi , F...|
|Comedy, Music, Mu...|
| Crime, Drama, Sport|
|Comedy, Horror, T...|
|Comedy, Family, M...|
+--------------------+
only showing top 20 rows



                                                                                

In [14]:
# validating columns imdbAverageRating and releaseYear
invalid_imdb_rating = combined_df.filter((combined_df["imdbAverageRating"] < 0) | (combined_df["imdbAverageRating"] > 10))
invalid_imdb_rating.show()

                                                                                

+-----+----+------+-----------+------+-----------------+------------+------------------+--------+
|title|type|genres|releaseYear|imdbId|imdbAverageRating|imdbNumVotes|availableCountries|platform|
+-----+----+------+-----------+------+-----------------+------------+------------------+--------+
+-----+----+------+-----------+------+-----------------+------------+------------------+--------+



In [15]:
invalid_year = combined_df.filter((combined_df["releaseYear"] < 1900) | (combined_df["releaseYear"].isNull()))
# invalid_year.count() # 219
invalid_year.show()



+--------------------+-----+--------------------+-----------+------+-----------------+------------+--------------------+---------+
|               title| type|              genres|releaseYear|imdbId|imdbAverageRating|imdbNumVotes|  availableCountries| platform|
+--------------------+-----+--------------------+-----------+------+-----------------+------------+--------------------+---------+
|Rudra: Secret of ...|movie|                NULL|       NULL|  NULL|             NULL|        NULL|AE, AG, AL, AO, A...|  Netflix|
|  Ahead of the Curve|movie|                NULL|       NULL|  NULL|             NULL|        NULL|AU, CA, FJ, GB, G...|  Netflix|
|                NULL|   tv|                NULL|       NULL|  NULL|             NULL|        NULL|          PH, SG, TW|  Netflix|
|                NULL|   tv|Drama, Crime, Mys...|       NULL|  NULL|             NULL|        NULL|                  CA|  Netflix|
|                NULL|   tv|           Animation|       NULL|  NULL|             NU

                                                                                

In [16]:
# drop rows where all specified columns are null, because they don't give us enough information
# columns_to_drop = ["genres", "releaseYear", "imdbId", "imdbAverageRating"]
# cleaned_df = combined_df.dropna(subset=columns_to_drop, how="all")

In [17]:
# split combined_df into two datasets based on the 'type' column
movies_df = combined_df.filter(combined_df["type"] == "movie")
tv_df = combined_df.filter(combined_df["type"] == "tv")

In [18]:
# drop type column
movies_df = movies_df.drop("type")
tv_df = tv_df.drop("type")

In [19]:
movies_df.show()

+--------------------+--------------------+-----------+---------+-----------------+------------+--------------------+--------+
|               title|              genres|releaseYear|   imdbId|imdbAverageRating|imdbNumVotes|  availableCountries|platform|
+--------------------+--------------------+-----------+---------+-----------------+------------+--------------------+--------+
|        Forrest Gump|      Drama, Romance|       1994|tt0109830|              8.8|     2316975|                  MX| Netflix|
|   The Fifth Element|Action, Adventure...|       1997|tt0119116|              7.6|      517225|          AT, CH, DE| Netflix|
|   Kill Bill: Vol. 1|Action, Crime, Th...|       2003|tt0266697|              8.2|     1222077|AE, AL, AO, AT, A...| Netflix|
|             Jarhead|Biography, Drama,...|       2005|tt0418763|              7.0|      211593|AD, AE, AG, AL, A...| Netflix|
|          Unforgiven|      Drama, Western|       1992|tt0105695|              8.2|      443878|AU, BA, BG, CZ,

In [20]:
tv_df.show()

+--------------------+--------------------+-----------+---------+-----------------+------------+--------------------+--------+
|               title|              genres|releaseYear|   imdbId|imdbAverageRating|imdbNumVotes|  availableCountries|platform|
+--------------------+--------------------+-----------+---------+-----------------+------------+--------------------+--------+
|                NULL|              Comedy|       1996|     NULL|             NULL|        NULL|          HK, JP, TW| Netflix|
|    Sex and the City|Comedy, Drama, Ro...|       1998|tt0159206|              7.3|      148475|AG, AR, BA, BB, B...| Netflix|
|             7 vidas|              Comedy|       1999|tt0192877|              7.1|        1537|                  ES| Netflix|
|              Martin|Comedy, Drama, Ro...|       1992|tt0103488|              7.5|       11234|                  US| Netflix|
|            The Game|Comedy, Drama, Ro...|       2006|tt0772137|              6.8|        4892|               

[Stage 24:>                                                         (0 + 1) / 1]                                                                                

### Data analysis and insights