In [113]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [114]:
df_dataset1 = spark.read.csv('data/tmdb_5000_movies_fixed1_fix.csv')

In [115]:
# Optionally, rename columns for better clarity
df_dataset1 = df_dataset1.withColumnRenamed("_c0", "budget") \
       .withColumnRenamed("_c1", "id") \
       .withColumnRenamed("_c2", "original_language") \
       .withColumnRenamed("_c3", "popularity") \
       .withColumnRenamed("_c4", "revenue") \
       .withColumnRenamed("_c5", "runtime") \
       .withColumnRenamed("_c6", "status") \
       .withColumnRenamed("_c7", "title") \
       .withColumnRenamed("_c8", "vote_average") \
       .withColumnRenamed("_c9", "vote_count") \
       .withColumnRenamed("_c10", "genre") \
       .withColumnRenamed("_c11", "release_date") \
       .withColumnRenamed("_c12", "production_country")

In [116]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType, DoubleType, DateType

# Convert columns to the desired data types
df_dataset1 = df_dataset1.withColumn("budget", df_dataset1["budget"].cast(IntegerType()))
df_dataset1 = df_dataset1.withColumn("id", df_dataset1["id"].cast(IntegerType()))
df_dataset1 = df_dataset1.withColumn("popularity", df_dataset1["popularity"].cast(DoubleType()))
df_dataset1 = df_dataset1.withColumn("revenue", df_dataset1["revenue"].cast(IntegerType()))
df_dataset1 = df_dataset1.withColumn("runtime", df_dataset1["runtime"].cast(IntegerType()))
df_dataset1 = df_dataset1.withColumn("vote_average", df_dataset1["vote_average"].cast(DoubleType()))
df_dataset1 = df_dataset1.withColumn("vote_count", df_dataset1["vote_count"].cast(IntegerType()))
df_dataset1 = df_dataset1.withColumn("release_date", to_date(df_dataset1["release_date"], "yyyy-MM-dd"))

# Print the updated schema
df_dataset1.printSchema()

root
 |-- budget: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- original_language: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- production_country: string (nullable = true)



In [117]:
# Filter DataFrame to include only rows with release_date after '2005-01-01'
df_dataset1_clean = df_dataset1.filter(col("release_date") > '2005-01-01')

# Describe the release_date column of the cleaned DataFrame
df_dataset1_clean.select("release_date").describe().show()

# Show the cleaned DataFrame
df_dataset1_clean.describe().show(2)

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

+-------+-------------------+-----------------+-----------------+------------------+-------------------+------------+------+-------+-----------------+-----------------+-----+------------------+
|summary|             budget|               id|original_language|        popularity|            revenue|     runtime|status|  title|     vote_average|       vote_count|genre|production_country|
+-------+-------------------+-----------------+-----------------+------------------+-------------------+------------+------+-------+-----------------+-----------------+-----+------------------+
|  count|               2562|             2562|             2562|              2562|               2561|        2560|  2562|   2562|             2562|             2562| 2543|              2562|
|   mean|3.253420255542545E7|93351.39071038252|             null|24.567019620999194|9.112642305583756E7|104.64296875|  null|960.625|5

In [118]:
# Filter DataFrame to drop rows where budget or revenue is less than 50000
df_dataset1_clean = df_dataset1_clean.filter((col("budget") >= 50000) & (col("revenue") >= 50000))

# Describe the budget column of the cleaned DataFrame
df_dataset1_clean.select("budget").describe().show()
df_dataset1_clean.select("revenue").describe().show()

+-------+--------------------+
|summary|              budget|
+-------+--------------------+
|  count|                1649|
|   mean| 4.827350042146756E7|
| stddev|5.1542107119679816E7|
|    min|               65000|
|    max|           380000000|
+-------+--------------------+

+-------+-------------------+
|summary|            revenue|
+-------+-------------------+
|  count|               1649|
|   mean|1.400165642953305E8|
| stddev|2.054395901950808E8|
|    min|              53086|
|    max|         1519557910|
+-------+-------------------+



In [119]:
# Specify columns to drop
columns_to_drop = ["id", "original_language", "status", "title"]

# Drop the specified columns
df_dataset1_clean = df_dataset1_clean.drop(*columns_to_drop)

# Show the updated DataFrame
df_dataset1_clean.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+
|300000000|139.082615| 961000000|    169|         6.9|      4500|         Action|  2007-05-19|      united_state|
|245000000|107.376788| 880674609|    148|         6.3|      4466|         Action|  2015-10-26|    united_kingdom|
|250000000| 112.31295|1084939099|    165|         7.6|      9106|         Action|  2012-07-16|      united_state|
|260000000| 43.926995| 284139100|    132|         6.1|      2124|         Action|  2012-03-07|               USA|
|260000000| 48.681969| 591794936|    100|         7.4|      3330|      Animation|  2010-11-24|      united_state|
|280000000|134.279229|1405403694|    141|         7.3|      6767|         Action|  2015-

In [120]:
# Check unique values in the 'production_country' column
unique_values = df_dataset1_clean.select('production_country').distinct()

# Show unique values
unique_values.show()

+------------------+
|production_country|
+------------------+
|           finland|
|            greece|
|            israel|
|             #REF!|
|    czech republic|
|             malta|
|             japan|
|           austria|
|        luxembourg|
|          thailand|
|             india|
|          hongkong|
|         australia|
|       united_arab|
|         indonesia|
|       new_zealand|
|            brazil|
|           jamaica|
|       netherlands|
|      south_africa|
+------------------+
only showing top 20 rows



In [121]:
from pyspark.sql.functions import when

# Calculate the mode of the production_country column
mode_production_country = df_dataset1_clean.select('production_country') \
                                           .groupBy('production_country') \
                                           .count() \
                                           .orderBy(col('count').desc()) \
                                           .limit(1) \
                                           .collect()[0][0]

# Replace #REF! with the mode
df_dataset1_clean = df_dataset1_clean.withColumn('production_country', 
                                                 when(col('production_country') == '#REF!', mode_production_country)
                                                 .otherwise(col('production_country')))

# Check unique values in the 'production_country' column
unique_values = df_dataset1_clean.select('production_country').distinct()

# Show unique values
unique_values.show(70)

+------------------+
|production_country|
+------------------+
|           finland|
|            greece|
|            israel|
|    czech republic|
|             malta|
|             japan|
|           austria|
|        luxembourg|
|          thailand|
|             india|
|          hongkong|
|         australia|
|       united_arab|
|         indonesia|
|       new_zealand|
|            brazil|
|           jamaica|
|       netherlands|
|      south_africa|
|            canada|
|            france|
|        swizerland|
|           iceland|
|    united_kingdom|
|         argentina|
|               USA|
|       south_korea|
|           germany|
|            russia|
|             italy|
|           ireland|
|           belgium|
|           romania|
|            norway|
|           denmark|
|           bahamas|
|       philippines|
|             china|
|          bulgaria|
|      united_state|
|           hungary|
|             spain|
|            mexico|
+------------------+



In [122]:
from pyspark.sql.functions import col

# Filter rows with any NaN values
rows_with_nan = df_dataset1_clean.filter(
    (col("budget").isNull()) |
    (col("revenue").isNull()) |
    (col("runtime").isNull()) |
    (col("vote_average").isNull()) |
    (col("vote_count").isNull()) |
    (col("release_date").isNull()) |
    (col("production_country").isNull()) |
    (col("genre").isNull())
)

# Show the rows with NaN values
rows_with_nan.show()


+------+----------+-------+-------+------------+----------+-----+------------+------------------+
|budget|popularity|revenue|runtime|vote_average|vote_count|genre|release_date|production_country|
+------+----------+-------+-------+------------+----------+-----+------------+------------------+
+------+----------+-------+-------+------------+----------+-----+------------+------------------+



In [123]:
# Replace "USA" with "united_state" in the production_country column
df_dataset1_clean = df_dataset1_clean.withColumn("production_country",
                                                 when(df_dataset1_clean["production_country"] == "USA", "united_state")
                                                 .otherwise(df_dataset1_clean["production_country"]))

# Show unique values in the production_country column
df_dataset1_clean.select("production_country").distinct().show(70)

+------------------+
|production_country|
+------------------+
|           finland|
|            greece|
|            israel|
|    czech republic|
|             malta|
|             japan|
|           austria|
|        luxembourg|
|          thailand|
|             india|
|          hongkong|
|         australia|
|       united_arab|
|         indonesia|
|       new_zealand|
|            brazil|
|           jamaica|
|       netherlands|
|      south_africa|
|            canada|
|            france|
|        swizerland|
|           iceland|
|    united_kingdom|
|         argentina|
|       south_korea|
|           germany|
|            russia|
|             italy|
|           ireland|
|           belgium|
|           romania|
|            norway|
|           denmark|
|           bahamas|
|       philippines|
|             china|
|          bulgaria|
|      united_state|
|           hungary|
|             spain|
|            mexico|
+------------------+



## get ouliners 

In [124]:
from pyspark.sql.window import Window
from pyspark.sql.functions import stddev, mean

# Calculate the mean and standard deviation of the column
mean_val = df_dataset1_clean.select(mean(col("budget"))).first()[0]
stddev_val = df_dataset1_clean.select(stddev(col("budget"))).first()[0]

# Set the threshold for outliers (e.g., 3 standard deviations from the mean)
threshold = 3

# Calculate Z-score for each row
df_dataset1_clean = df_dataset1_clean.withColumn("z_score", (col("budget") - mean_val) / stddev_val)

# Filter rows with Z-score greater than the threshold (outliers)
outliers = df_dataset1_clean.filter(col("z_score") > threshold)

outliers.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+------------------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|           z_score|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+------------------+
|300000000|139.082615| 961000000|    169|         6.9|      4500|         Action|  2007-05-19|      united_state| 4.883900050768747|
|245000000|107.376788| 880674609|    148|         6.3|      4466|         Action|  2015-10-26|    united_kingdom|3.8168113523519165|
|250000000| 112.31295|1084939099|    165|         7.6|      9106|         Action|  2012-07-16|      united_state|3.9138194158443556|
|260000000| 43.926995| 284139100|    132|         6.1|      2124|         Action|  2012-03-07|      united_state| 4.107835542829234|
|260000000| 48.681969| 591794936|    100|         7.4|      3330|    

In [125]:
# Calculate the mean and standard deviation of the column
mean_val = df_dataset1_clean.select(mean(col("revenue"))).first()[0]
stddev_val = df_dataset1_clean.select(stddev(col("revenue"))).first()[0]

# Set the threshold for outliers (e.g., 3 standard deviations from the mean)
threshold = 3

# Calculate Z-score for each row
df_dataset1_clean = df_dataset1_clean.withColumn("z_score", (col("revenue") - mean_val) / stddev_val)

# Filter rows with Z-score greater than the threshold (outliers)
outliers = df_dataset1_clean.filter(col("z_score") > threshold)

outliers.show()


+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+------------------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|           z_score|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+------------------+
|300000000|139.082615| 961000000|    169|         6.9|      4500|         Action|  2007-05-19|      united_state| 3.996227966211781|
|245000000|107.376788| 880674609|    148|         6.3|      4466|         Action|  2015-10-26|    united_kingdom|3.6052352129468197|
|250000000| 112.31295|1084939099|    165|         7.6|      9106|         Action|  2012-07-16|      united_state| 4.599515282363017|
|280000000|134.279229|1405403694|    141|         7.3|      6767|         Action|  2015-04-22|      united_state| 6.159412255948751|
|250000000| 98.885637| 933959197|    153|         7.4|      5293|    

In [127]:
df_dataset1_clean = df_dataset1_clean.drop("z_score")
df_dataset1_clean.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|    profit|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+
|300000000|139.082615| 961000000|    169|         6.9|      4500|         Action|  2007-05-19|      united_state| 661000000|
|245000000|107.376788| 880674609|    148|         6.3|      4466|         Action|  2015-10-26|    united_kingdom| 635674609|
|250000000| 112.31295|1084939099|    165|         7.6|      9106|         Action|  2012-07-16|      united_state| 834939099|
|260000000| 43.926995| 284139100|    132|         6.1|      2124|         Action|  2012-03-07|      united_state|  24139100|
|260000000| 48.681969| 591794936|    100|         7.4|      3330|      Animation|  2010-11-24|      united_state| 331794936|


## create columns

In [126]:
# Add a new column 'profit' to the DataFrame
df_dataset1_clean = df_dataset1_clean.withColumn("profit", col("revenue") - col("budget"))

# Show the DataFrame
df_dataset1_clean.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+--------------------+----------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|             z_score|    profit|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+--------------------+----------+
|300000000|139.082615| 961000000|    169|         6.9|      4500|         Action|  2007-05-19|      united_state|   3.996227966211781| 661000000|
|245000000|107.376788| 880674609|    148|         6.3|      4466|         Action|  2015-10-26|    united_kingdom|  3.6052352129468197| 635674609|
|250000000| 112.31295|1084939099|    165|         7.6|      9106|         Action|  2012-07-16|      united_state|   4.599515282363017| 834939099|
|260000000| 43.926995| 284139100|    132|         6.1|      2124|         Action|  2012-03-07|      united_state|  0.7015324

## second dataset

In [103]:
df_dataset2 = spark.read.csv('data/IMDB Top 250 Movies.csv')

In [104]:
df_dataset2 = df_dataset2.withColumnRenamed("_c0", "rank") \
       .withColumnRenamed("_c1", "title") \
       .withColumnRenamed("_c2", "release_date") \
       .withColumnRenamed("_c3", "vote_average") \
       .withColumnRenamed("_c4", "genre") \
       .withColumnRenamed("_c5", "certificate") \
       .withColumnRenamed("_c6", "time") \
       .withColumnRenamed("_c7", "production_country") \
       .withColumnRenamed("_c8", "tagline") \
       .withColumnRenamed("_c9", "budget") \
       .withColumnRenamed("_c10", "revenue") \
       .withColumnRenamed("_c11", "casts") \
       .withColumnRenamed("_c12", "directors")\
       .withColumnRenamed("_c13", "writers")

# Drop specified columns
columns_to_drop = ["rank", "title", "certificate", "tagline", "casts", "directors", "writers",'_c14','_c15','_c16','_c17','_c18','_c19','_c20','_c21','_c22','_c23','_c24']
df_dataset2_clean = df_dataset2.drop(*columns_to_drop)

# Remove the first row
df_dataset2_clean = df_dataset2_clean.filter(df_dataset2_clean["release_date"] != "year")

# Show the updated DataFrame
df_dataset2_clean.show()

+------------+------------+---------+------+------------------+-------------+-------------+
|release_date|vote_average|    genre|  time|production_country|       budget|      revenue|
+------------+------------+---------+------+------------------+-------------+-------------+
|  16/10/2022|         8.3|   Action|2h 10m|               USA|    170000000|   1488732821|
|  30/06/2021|         8.2|   Action|2h 28m|               USA|    200000000|   1921847111|
|  24/06/2021|         8.8|    Crime|2h 44m|               USA|Not Available|Not Available|
|  18/12/2020|         8.4|Biography|2h 40m|               USA|Not Available|Not Available|
|  10/10/2020|         8.2|    Drama|1h 37m|               USA|      6000000|     24427162|
|  04/10/2019|         8.5|    Drama|2h 12m|      south korean|     11400000|    262676096|
|  15/03/2019|         8.4|    Crime| 2h 2m|               USA|     55000000|   1074458282|
|  18/06/2019|         8.4|   Action| 3h 1m|               USA|    356000000|   

In [105]:
from pyspark.sql.functions import regexp_extract
# Convert columns to the desired data types
df_dataset2_clean = df_dataset2_clean.withColumn("budget", df_dataset2_clean["budget"].cast(IntegerType()))
df_dataset2_clean = df_dataset2_clean.withColumn("revenue", df_dataset2_clean["revenue"].cast(IntegerType()))

# Extract numerical values from the "runtime" column
# Extract hours and minutes separately
hours = regexp_extract(df_dataset2_clean["time"], r"(\d+)h", 1)
minutes = regexp_extract(df_dataset2_clean["time"], r"(\d+)m", 1)

# Convert hours and minutes to minutes and sum them up
total_minutes = hours * 60 + minutes

# Add a new column with the total runtime in minutes
df_dataset2_clean = df_dataset2_clean.withColumn("runtime", total_minutes)

# Convert the extracted values to integers
df_dataset2_clean = df_dataset2_clean.withColumn("runtime", df_dataset2_clean["runtime"].cast(IntegerType()))

df_dataset2_clean = df_dataset2_clean.withColumn("vote_average", df_dataset2_clean["vote_average"].cast(DoubleType()))
df_dataset2_clean = df_dataset2_clean.withColumn("release_date", to_date(df_dataset2_clean["release_date"], "dd/MM/yyyy"))

# Show the resulting DataFrame
df_dataset2_clean.show()

+------------+------------+---------+------+------------------+---------+----------+-------+
|release_date|vote_average|    genre|  time|production_country|   budget|   revenue|runtime|
+------------+------------+---------+------+------------------+---------+----------+-------+
|  2022-10-16|         8.3|   Action|2h 10m|               USA|170000000|1488732821|    130|
|  2021-06-30|         8.2|   Action|2h 28m|               USA|200000000|1921847111|    148|
|  2021-06-24|         8.8|    Crime|2h 44m|               USA|     null|      null|    164|
|  2020-12-18|         8.4|Biography|2h 40m|               USA|     null|      null|    160|
|  2020-10-10|         8.2|    Drama|1h 37m|               USA|  6000000|  24427162|     97|
|  2019-10-04|         8.5|    Drama|2h 12m|      south korean| 11400000| 262676096|    132|
|  2019-03-15|         8.4|    Crime| 2h 2m|               USA| 55000000|1074458282|    122|
|  2019-06-18|         8.4|   Action| 3h 1m|               USA|3560000

In [107]:
from pyspark.sql.functions import to_date

# Filter rows based on the "release_date" column
df_dataset2_clean = df_dataset2_clean = df_dataset2_clean.filter(col("release_date") > '2015-01-01')

# drop time
df_dataset2_clean = df_dataset2_clean.drop("time")

df_dataset2_clean.show()


+------------+------------+---------+------------------+---------+----------+-------+
|release_date|vote_average|    genre|production_country|   budget|   revenue|runtime|
+------------+------------+---------+------------------+---------+----------+-------+
|  2022-10-16|         8.3|   Action|               USA|170000000|1488732821|    130|
|  2021-06-30|         8.2|   Action|               USA|200000000|1921847111|    148|
|  2021-06-24|         8.8|    Crime|               USA|     null|      null|    164|
|  2020-12-18|         8.4|Biography|               USA|     null|      null|    160|
|  2020-10-10|         8.2|    Drama|               USA|  6000000|  24427162|     97|
|  2019-10-04|         8.5|    Drama|      south korean| 11400000| 262676096|    132|
|  2019-03-15|         8.4|    Crime|               USA| 55000000|1074458282|    122|
|  2019-06-18|         8.4|   Action|               USA|356000000|      null|    181|
|  2019-03-03|         8.2|   Action|               US

In [109]:
#drop null
# Drop rows with null values in any column
df_dataset2_clean = df_dataset2_clean.dropna()

# Show the resulting DataFrame
df_dataset2_clean.show()

+------------+------------+---------+------------------+---------+----------+-------+
|release_date|vote_average|    genre|production_country|   budget|   revenue|runtime|
+------------+------------+---------+------------------+---------+----------+-------+
|  2022-10-16|         8.3|   Action|               USA|170000000|1488732821|    130|
|  2021-06-30|         8.2|   Action|               USA|200000000|1921847111|    148|
|  2020-10-10|         8.2|    Drama|               USA|  6000000|  24427162|     97|
|  2019-10-04|         8.5|    Drama|      south korean| 11400000| 262676096|    132|
|  2019-03-15|         8.4|    Crime|               USA| 55000000|1074458282|    122|
|  2019-03-03|         8.2|   Action|               USA| 95000000| 384479940|    119|
|  2019-06-30|         8.1|   Action|               USA| 97600000| 225508210|    152|
|  2018-03-03|         8.4|   Action|               USA|321000000|2052415039|    149|
|  2018-01-13|         8.2|Biography|               US

In [110]:
# Replace "USA" with "united_state" in the production_country column
df_dataset2_clean = df_dataset2_clean.withColumn("production_country", \
            when(df_dataset2_clean["production_country"] == "USA", "united_state") \
            .otherwise(df_dataset2_clean["production_country"]))

# Show the resulting DataFrame
df_dataset2_clean.show()

+------------+------------+---------+------------------+---------+----------+-------+
|release_date|vote_average|    genre|production_country|   budget|   revenue|runtime|
+------------+------------+---------+------------------+---------+----------+-------+
|  2022-10-16|         8.3|   Action|      united_state|170000000|1488732821|    130|
|  2021-06-30|         8.2|   Action|      united_state|200000000|1921847111|    148|
|  2020-10-10|         8.2|    Drama|      united_state|  6000000|  24427162|     97|
|  2019-10-04|         8.5|    Drama|      south korean| 11400000| 262676096|    132|
|  2019-03-15|         8.4|    Crime|      united_state| 55000000|1074458282|    122|
|  2019-03-03|         8.2|   Action|      united_state| 95000000| 384479940|    119|
|  2019-06-30|         8.1|   Action|      united_state| 97600000| 225508210|    152|
|  2018-03-03|         8.4|   Action|      united_state|321000000|2052415039|    149|
|  2018-01-13|         8.2|Biography|      united_stat

In [111]:
# Create a new column 'profit' based on 'revenue' and 'budget'
df_dataset2_clean = df_dataset2_clean.withColumn("profit", col("revenue") - col("budget"))

# Show the resulting DataFrame
df_dataset2_clean.show()

+------------+------------+---------+------------------+---------+----------+-------+----------+
|release_date|vote_average|    genre|production_country|   budget|   revenue|runtime|    profit|
+------------+------------+---------+------------------+---------+----------+-------+----------+
|  2022-10-16|         8.3|   Action|      united_state|170000000|1488732821|    130|1318732821|
|  2021-06-30|         8.2|   Action|      united_state|200000000|1921847111|    148|1721847111|
|  2020-10-10|         8.2|    Drama|      united_state|  6000000|  24427162|     97|  18427162|
|  2019-10-04|         8.5|    Drama|      south korean| 11400000| 262676096|    132| 251276096|
|  2019-03-15|         8.4|    Crime|      united_state| 55000000|1074458282|    122|1019458282|
|  2019-03-03|         8.2|   Action|      united_state| 95000000| 384479940|    119| 289479940|
|  2019-06-30|         8.1|   Action|      united_state| 97600000| 225508210|    152| 127908210|
|  2018-03-03|         8.4|   

In [131]:
df_dataset1_clean.printSchema()
df_dataset2_clean.printSchema()

root
 |-- budget: integer (nullable = true)
 |-- popularity: double (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- production_country: string (nullable = true)
 |-- profit: integer (nullable = true)

root
 |-- release_date: date (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- production_country: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- profit: integer (nullable = true)



In [137]:
from pyspark.sql.functions import lit

# Define the missing columns and their default values
missing_columns = ["popularity", "vote_count"]

# Add each missing column with default values to df_dataset2_clean
for column in missing_columns:
    df_dataset2_clean = df_dataset2_clean.withColumn(column, lit(None).cast(df_dataset1_clean.schema[column].dataType))

# Show the DataFrame with added columns
df_dataset2_clean.show()


+------------+------------+---------+------------------+---------+----------+-------+----------+----------+----------+
|release_date|vote_average|    genre|production_country|   budget|   revenue|runtime|    profit|popularity|vote_count|
+------------+------------+---------+------------------+---------+----------+-------+----------+----------+----------+
|  2022-10-16|         8.3|   Action|      united_state|170000000|1488732821|    130|1318732821|      null|      null|
|  2021-06-30|         8.2|   Action|      united_state|200000000|1921847111|    148|1721847111|      null|      null|
|  2020-10-10|         8.2|    Drama|      united_state|  6000000|  24427162|     97|  18427162|      null|      null|
|  2019-10-04|         8.5|    Drama|      south korean| 11400000| 262676096|    132| 251276096|      null|      null|
|  2019-03-15|         8.4|    Crime|      united_state| 55000000|1074458282|    122|1019458282|      null|      null|
|  2019-03-03|         8.2|   Action|      unite

In [141]:
# Define the column sequence from df_dataset1_clean
column_sequence = ["budget", "popularity", "revenue", "runtime", "vote_average", "vote_count", "genre", "release_date", "production_country", "profit"]

# Select columns in df_dataset2_clean in the order defined by column_sequence
df_dataset2_clean = df_dataset2_clean.select(*column_sequence)

# Show the DataFrame with swapped column sequences
df_dataset2_clean.show()

+---------+----------+----------+-------+------------+----------+---------+------------+------------------+----------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|    genre|release_date|production_country|    profit|
+---------+----------+----------+-------+------------+----------+---------+------------+------------------+----------+
|170000000|      null|1488732821|    130|         8.3|      null|   Action|  2022-10-16|      united_state|1318732821|
|200000000|      null|1921847111|    148|         8.2|      null|   Action|  2021-06-30|      united_state|1721847111|
|  6000000|      null|  24427162|     97|         8.2|      null|    Drama|  2020-10-10|      united_state|  18427162|
| 11400000|      null| 262676096|    132|         8.5|      null|    Drama|  2019-10-04|      south korean| 251276096|
| 55000000|      null|1074458282|    122|         8.4|      null|    Crime|  2019-03-15|      united_state|1019458282|
| 95000000|      null| 384479940|    119|       

### Merge two dataset

In [147]:
# Union the DataFrames
concatenated_df = df_dataset1_clean.union(df_dataset2_clean)

# Show the resulting DataFrame
concatenated_df.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|    profit|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+
|300000000|139.082615| 961000000|    169|         6.9|      4500|         Action|  2007-05-19|      united_state| 661000000|
|245000000|107.376788| 880674609|    148|         6.3|      4466|         Action|  2015-10-26|    united_kingdom| 635674609|
|250000000| 112.31295|1084939099|    165|         7.6|      9106|         Action|  2012-07-16|      united_state| 834939099|
|260000000| 43.926995| 284139100|    132|         6.1|      2124|         Action|  2012-03-07|      united_state|  24139100|
|260000000| 48.681969| 591794936|    100|         7.4|      3330|      Animation|  2010-11-24|      united_state| 331794936|


In [151]:
# Get the list of column names
columns = concatenated_df.columns

# Iterate over each column
for column in columns:
    print(f"Statistics for column '{column}':")
    
    # Calculate and display statistics for the current column
    concatenated_df.describe(column).show()


Statistics for column 'budget':
+-------+-------------------+
|summary|             budget|
+-------+-------------------+
|  count|               1665|
|   mean|4.871892023723724E7|
| stddev|5.221135091224088E7|
|    min|              65000|
|    max|          380000000|
+-------+-------------------+

Statistics for column 'popularity':
+-------+-----------------+
|summary|       popularity|
+-------+-----------------+
|  count|             1649|
|   mean|34.66885861613099|
| stddev|44.62214029434688|
|    min|         0.041651|
|    max|       875.581305|
+-------+-----------------+

Statistics for column 'revenue':
+-------+--------------------+
|summary|             revenue|
+-------+--------------------+
|  count|                1665|
|   mean|1.4499308936936936E8|
| stddev| 2.197874552841609E8|
|    min|               53086|
|    max|          2052415039|
+-------+--------------------+

Statistics for column 'runtime':
+-------+------------------+
|summary|           runtime|
+---

In [152]:
from pyspark.sql.functions import col

# Check for null values in each column
for col_name in concatenated_df.columns:
    null_count = concatenated_df.where(col(col_name).isNull()).count()
    print(f"Number of null values in column '{col_name}': {null_count}")

Number of null values in column 'budget': 0
Number of null values in column 'popularity': 16
Number of null values in column 'revenue': 0
Number of null values in column 'runtime': 0
Number of null values in column 'vote_average': 0
Number of null values in column 'vote_count': 16
Number of null values in column 'genre': 0
Number of null values in column 'release_date': 0
Number of null values in column 'production_country': 0
Number of null values in column 'profit': 0


In [154]:
from pyspark.sql.functions import rand

# Fill null values with a random value sampled from the column distribution
df_data = concatenated_df.withColumn("popularity", when(concatenated_df["popularity"].isNull(), rand()).otherwise(concatenated_df["popularity"]))
df_data = df_data.withColumn("vote_count", when(concatenated_df["vote_count"].isNull(), rand()).otherwise(concatenated_df["vote_count"]))


# Check again for null values
for col_name in df_data.columns:
    null_count = df_data.where(col(col_name).isNull()).count()
    print(f"Number of null values in column '{col_name}': {null_count}")

Number of null values in column 'budget': 0
Number of null values in column 'popularity': 0
Number of null values in column 'revenue': 0
Number of null values in column 'runtime': 0
Number of null values in column 'vote_average': 0
Number of null values in column 'vote_count': 0
Number of null values in column 'genre': 0
Number of null values in column 'release_date': 0
Number of null values in column 'production_country': 0
Number of null values in column 'profit': 0


In [160]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col

# Define the number of bins
num_bins = 2

# Define the column to be discretized
input_col = 'popularity'

# Quantile-based discretization strategy
quantile_discretizer = Bucketizer(splits=[float("-inf"), 0.5, float("inf")], inputCol=input_col, outputCol=input_col+'_rank')

# Apply discretization to the DataFrame
df_data_discretized = quantile_discretizer.transform(df_data)

# Assign the discretized column back to the original DataFrame
df_data = df_data_discretized.withColumnRenamed(input_col+'_rank', 'popularity_rank')

df_data

IllegalArgumentException: requirement failed: Column popularity_rank already exists.

In [162]:
df_data.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+---------------+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|    profit|popularity_rank|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+---------------+
|300000000|139.082615| 961000000|    169|         6.9|    4500.0|         Action|  2007-05-19|      united_state| 661000000|            1.0|
|245000000|107.376788| 880674609|    148|         6.3|    4466.0|         Action|  2015-10-26|    united_kingdom| 635674609|            1.0|
|250000000| 112.31295|1084939099|    165|         7.6|    9106.0|         Action|  2012-07-16|      united_state| 834939099|            1.0|
|260000000| 43.926995| 284139100|    132|         6.1|    2124.0|         Action|  2012-03-07|      united_state|  24139100|            1.0|
|260000000| 4

In [163]:
# Define the number of bins
num_bins = 3

# Define the column to be discretized
input_col = 'profit'

# Quantile-based discretization strategy
quantile_discretizer = Bucketizer(splits=[float("-inf"), 0.333, 0.666, float("inf")], inputCol=input_col, outputCol=input_col+'_bucket')

# Apply discretization to the DataFrame
df_data_discretized = quantile_discretizer.transform(df_data)

# Reverse the order of the bins
df_data_discretized = df_data_discretized.withColumn(input_col+'_bucket', (num_bins - 1 - col(input_col+'_bucket')).cast('integer'))

# Assign the discretized column back to the original DataFrame
df_data = df_data_discretized.withColumnRenamed(input_col+'_bucket', 'risk')

In [164]:
df_data.show()

+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+---------------+----+
|   budget|popularity|   revenue|runtime|vote_average|vote_count|          genre|release_date|production_country|    profit|popularity_rank|risk|
+---------+----------+----------+-------+------------+----------+---------------+------------+------------------+----------+---------------+----+
|300000000|139.082615| 961000000|    169|         6.9|    4500.0|         Action|  2007-05-19|      united_state| 661000000|            1.0|   0|
|245000000|107.376788| 880674609|    148|         6.3|    4466.0|         Action|  2015-10-26|    united_kingdom| 635674609|            1.0|   0|
|250000000| 112.31295|1084939099|    165|         7.6|    9106.0|         Action|  2012-07-16|      united_state| 834939099|            1.0|   0|
|260000000| 43.926995| 284139100|    132|         6.1|    2124.0|         Action|  2012-03-07|      united_state|  24139100|