
# IMDB Movie Analysis using PySpark

Group Assignment <br>
ALY 6110 Big Data - MPS Analytics <br>
Northeastern University <br>
March 23, 2023

# Introduction


Dataset - https://www.kaggle.com/datasets/rajugc/imdb-movies-dataset-based-on-genre/code

# Installing PySpark and creating a Spark session

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 2.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 KB 11.0 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=954f2445fcce2c0e2f9ffa998bd6a6a1c5131518489441a52761f9cb38a8b7b2
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MyApp').getOrCreate()

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml.feature import StringIndexer

In [4]:
# Mount Google Drive in Google Colab. Upload "clean.csv" to the top level of My Drive first, since it is read from /content/drive/MyDrive/clean.csv
from google.colab import drive
drive.mount('/content/drive')

# Get the Spark session; getOrCreate() returns the session already started in In [2] rather than creating a second one
spark = SparkSession.builder.appName('clean').getOrCreate()

Mounted at /content/drive


In [5]:
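# Load the dataset into a PySpark DataFrame, then preview the first 5 rows without truncation.
# show() prints the rows and returns None, so it must not be chained into the assignment (otherwise df would be None).
df = spark.read.csv('/content/drive/MyDrive/clean.csv', header=True, multiLine=True, inferSchema=True)
df.show(5, truncate=False)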

+---------+----------------+----+-----------+-------+--------------------------+------+------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+------------+------------------------------------------------------------------------+---------------------------------------+
|movie_id |movie_name      |year|certificate|runtime|genre                     |rating|votes |gross_revenue|description                                                                                                                                                                                                             |director        |director_id |star                                                                    |star_id                                |
+---------+----------------+----+-----------+-------+---------

In [6]:
# Reload the dataset (no inferSchema here, so every column is read as a string)

In [7]:
df = spark.read \
         .option("header", "true") \
         .option("multiLine", "true") \
         .csv("/content/drive/MyDrive/clean.csv")


In [8]:
# Display the first 25 rows (show() truncates values longer than 20 characters by default)
df.show(25)

+----------+--------------------+----+-----------+-------+--------------------+------+------+-------------+--------------------+--------------------+------------+--------------------+--------------------+
|  movie_id|          movie_name|year|certificate|runtime|               genre|rating|votes |gross_revenue|         description|            director| director_id|                star|             star_id|
+----------+--------------------+----+-----------+-------+--------------------+------+------+-------------+--------------------+--------------------+------------+--------------------+--------------------+
| tt1825683|       Black Panther|2018|      PG-13|    134|Action, Adventure...|     7| 12730|      1018400|T'Challa, heir to...|        Ryan Coogler|  nm3363032 |Chadwick Boseman,...|nm1569276,nm04301...|
| tt0092099|             Top Gun|1986|         PG|    109|       Action, Drama|     7| 10355|       828400|As students at th...|          Tony Scott|  nm0001716 |Tom Cruise, \nTim.

In [9]:
# Check if the DataFrame loaded correctly
if df is not None:
    # Display summary statistics for all columns
    print("Summary statistics for all columns:")
    df.describe().show()
else:
    print("Error: DataFrame is None.")

Summary statistics for all columns:
+-------+---------+--------------------+------------------+-----------+------------------+------------+------------------+------------------+------------------+--------------------+--------------------+------------+--------------------+--------------------+
|summary| movie_id|          movie_name|              year|certificate|           runtime|       genre|            rating|            votes |     gross_revenue|         description|            director| director_id|                star|             star_id|
+-------+---------+--------------------+------------------+-----------+------------------+------------+------------------+------------------+------------------+--------------------+--------------------+------------+--------------------+--------------------+
|  count|    19258|               19258|             19258|      18487|             19256|       19258|             19258|             19258|             19258|               19258|         

In [10]:
# Display the schema and column names
print("Schema:")
df.printSchema()
print("\nColumn names:")
print(df.columns)

Schema:
root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- certificate: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- votes : string (nullable = true)
 |-- gross_revenue: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- director_id: string (nullable = true)
 |-- star: string (nullable = true)
 |-- star_id: string (nullable = true)


Column names:
['movie_id', 'movie_name', 'year', 'certificate', 'runtime', 'genre', 'rating', 'votes ', 'gross_revenue', 'description', 'director', 'director_id', 'star', 'star_id']
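The column list shows that the votes header carries a trailing space ('votes '), so every later reference has to spell it "votes " (as the VectorAssembler cell does). A minimal sketch, assuming you would rather work with clean names: strip surrounding whitespace from every header with a small hypothetical helper, `clean_column_names`, and check that no two names collide afterwards.

```python
def clean_column_names(columns):
    """Return column names with surrounding whitespace removed.

    Raises ValueError if stripping would make two names identical,
    because duplicate names would make column references ambiguous.
    """
    cleaned = [name.strip() for name in columns]
    if len(set(cleaned)) != len(cleaned):
        raise ValueError("stripping whitespace produces duplicate column names")
    return cleaned


# Column names as printed by df.columns above
columns = ['movie_id', 'movie_name', 'year', 'certificate', 'runtime', 'genre',
           'rating', 'votes ', 'gross_revenue', 'description', 'director',
           'director_id', 'star', 'star_id']
print(clean_column_names(columns)[7])  # votes
```

Applied to the notebook's DataFrame this would be `df = df.toDF(*clean_column_names(df.columns))`, after which the column can be referenced as "votes".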


In [11]:
# Check the number of rows and columns
print("\nNumber of rows:", df.count())
print("Number of columns:", len(df.columns))


Number of rows: 19258
Number of columns: 14


In [12]:
# Display summary statistics for numeric columns
numeric_cols = [col_name for col_name, col_type in df.dtypes if col_type.startswith('double') or col_type.startswith('int')]
if numeric_cols:
    numeric_df = df.select(numeric_cols)
    print("\nSummary statistics for numeric columns:")
    numeric_df.describe().show()
else:
    print("\nNo numeric columns found.")


No numeric columns found.


In [13]:
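# limit() is lazy: it returns a new DataFrame (echoed below by its schema) instead of displaying rows; chain .show() to print them
df.limit(5)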

DataFrame[movie_id: string, movie_name: string, year: string, certificate: string, runtime: string, genre: string, rating: string, votes : string, gross_revenue: string, description: string, director: string, director_id: string, star: string, star_id: string]

In [14]:
df.columns

['movie_id',
 'movie_name',
 'year',
 'certificate',
 'runtime',
 'genre',
 'rating',
 'votes ',
 'gross_revenue',
 'description',
 'director',
 'director_id',
 'star',
 'star_id']

In [15]:
df.dtypes

[('movie_id', 'string'),
 ('movie_name', 'string'),
 ('year', 'string'),
 ('certificate', 'string'),
 ('runtime', 'string'),
 ('genre', 'string'),
 ('rating', 'string'),
 ('votes ', 'string'),
 ('gross_revenue', 'string'),
 ('description', 'string'),
 ('director', 'string'),
 ('director_id', 'string'),
 ('star', 'string'),
 ('star_id', 'string')]

In [16]:
df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- certificate: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- votes : string (nullable = true)
 |-- gross_revenue: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- director_id: string (nullable = true)
 |-- star: string (nullable = true)
 |-- star_id: string (nullable = true)



In [17]:
# The schema above shows every column as a string because spark.read.csv() only infers column types when inferSchema=True (the default is False).
# Re-reading with inferSchema=True makes Spark scan the data and choose a type per column: runtime, rating, votes and gross_revenue are now integers,
# while year remains a string (presumably because some year values are not purely numeric).
# sep="," is already the default separator, so passing it only makes the intent explicit.
df = spark.read.csv('/content/drive/MyDrive/clean.csv', header=True, sep=",", inferSchema=True)
df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- certificate: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- votes : integer (nullable = true)
 |-- gross_revenue: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- director_id: string (nullable = true)
 |-- star: string (nullable = true)
 |-- star_id: string (nullable = true)



# Summary Statistics

In [18]:
# Display summary statistics for numeric columns
numeric_cols = [col_name for col_name, col_type in df.dtypes if col_type.startswith('double') or col_type.startswith('int')]
if numeric_cols:
    numeric_df = df.select(numeric_cols)
    print("\nSummary statistics for numeric columns:")
    numeric_df.describe().show()
else:
    print("\nNo numeric columns found.")

# The runtime mean is about 107.5 minutes with a standard deviation of 21.73 (2 of the 19,258 rows have no runtime).
# The rating mean is about 6.29 with a standard deviation of 1.06.
# The votes mean is about 10,214 with a standard deviation of about 2,067.
# The gross revenue mean is about 817,136 (in $) with a standard deviation of about 165,321.


Summary statistics for numeric columns:
+-------+------------------+------------------+------------------+------------------+
|summary|           runtime|            rating|            votes |     gross_revenue|
+-------+------------------+------------------+------------------+------------------+
|  count|             19256|             19258|             19258|             19258|
|   mean|107.52908184461985|6.2920344791774845|10214.201890123584| 817136.1512098868|
| stddev| 21.72623474623741|1.0612835607599416|  2066.50875449799|165320.70035983936|
|    min|                40|                 2|                 0|                 0|
|    max|               808|                 9|             76760|           6140800|
+-------+------------------+------------------+------------------+------------------+
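The rows that describe() prints can be reproduced for a single column in plain Python, which makes the null handling explicit: runtime shows a count of 19256 rather than 19258 because missing values are skipped. The helper below is a hypothetical illustration (the name `describe` and the sample values are made up), assuming, as in Spark's `stddev`/`stddev_samp`, that the reported standard deviation is the sample standard deviation with an n - 1 denominator.

```python
import statistics


def describe(values):
    """Mimic the count/mean/stddev/min/max rows of DataFrame.describe() for one column.

    None values are skipped, as Spark's aggregate functions skip nulls.
    stddev is the sample standard deviation (n - 1 denominator).
    """
    present = [v for v in values if v is not None]
    n = len(present)
    return {
        "count": n,
        "mean": statistics.fmean(present) if n else None,
        "stddev": statistics.stdev(present) if n > 1 else None,
        "min": min(present) if n else None,
        "max": max(present) if n else None,
    }


# Hypothetical runtime values in minutes, including one missing entry
runtimes = [134, 109, None, 95, 120]
print(describe(runtimes))
```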



# Deriving a new column from an existing one

In [19]:
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

In [20]:
# Deriving a new column from an existing one
# Add a column 'movie_certificate' that joins the movie name and the certificate with a space in between.
# Caution: concat() returns null if any input is null, and certificate has missing values (the In [9] summary counts 18,487 certificates in 19,258 rows);
# concat_ws(" ", col("movie_name"), col("certificate")) would skip the nulls instead.
from pyspark.sql.functions import concat
df = df.withColumn('movie_certificate', concat(col("movie_name"), lit(" "), col("certificate")))
# lit() ("literal") wraps a constant Python value in a Column so it can be combined with other columns;
# here it supplies the space separator.
df.show(5,truncate=False)

+-----------------+---------------------------------------+----+-----------+-------+-------------------------+------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------+------------------+-------+-------------------+
|movie_id         |movie_name                             |year|certificate|runtime|genre                    |rating|votes |gross_revenue|description                                                                                                                                                                                                      |director    |director_id |star              |star_id|movie_certificate  |
+-----------------+---------------------------------------+----+-----------+-------+-------------------------+------+------+-------------+------------------

# Grouping By Columns

In [21]:
# Group By a column in PySpark
df.groupBy('certificate').count().show(5)

+--------------------+-----+
|         certificate|count|
+--------------------+-----+
|           Not Rated| 1955|
|             Unrated|  289|
|nm7248321,nm72510...|    2|
|            Approved|  347|
|              Passed|  328|
+--------------------+-----+
only showing top 5 rows



In [22]:
# Group By a column in PySpark
df.groupBy('genre').count().show(5)

+--------------------+-----+
|               genre|count|
+--------------------+-----+
|    Action, Thriller|  161|
|Adventure, Horror...|    2|
|Action, Drama, Music|    9|
|Crime, Drama, Action|    2|
|     Crime, Thriller|   88|
+--------------------+-----+
only showing top 5 rows
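Because genre holds comma-separated labels, grouping on the raw string treats "Action, Drama" and "Drama, Action" as different groups, and a film counts toward only one combination. A sketch in plain Python, assuming the labels are always separated by a comma plus optional spaces, of counting each genre individually (the helper name and sample labels are hypothetical):

```python
import re
from collections import Counter


def count_individual_genres(genre_strings):
    """Count each genre separately from comma-separated genre labels.

    "Action, Drama" and "Drama, Action" both add one to Action and one
    to Drama instead of forming two distinct groups.
    None values and empty pieces are ignored.
    """
    counts = Counter()
    for label in genre_strings:
        if label is None:
            continue
        for genre in re.split(r",\s*", label.strip()):
            if genre:
                counts[genre] += 1
    return counts


# Hypothetical labels in the same format as the genre column
labels = ["Action, Thriller", "Crime, Drama, Action", "Drama, Action", None]
print(count_individual_genres(labels).most_common(2))  # [('Action', 3), ('Drama', 2)]
```

In PySpark the same idea could be expressed with `split` and `explode` from `pyspark.sql.functions`, e.g. `df.select(F.explode(F.split("genre", r",\s*")).alias("single_genre")).groupBy("single_genre").count()`, assuming `from pyspark.sql import functions as F`.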



In [23]:
!pip install pyspark_dist_explore
from pyspark.sql import SparkSession
import pyspark_dist_explore

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark_dist_explore
  Downloading pyspark_dist_explore-0.1.8-py3-none-any.whl (7.2 kB)
Installing collected packages: pyspark_dist_explore
Successfully installed pyspark_dist_explore-0.1.8


In [34]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Create a SparkSession
spark = SparkSession.builder.appName("ClusteringExample").getOrCreate()

# Reuse the DataFrame df loaded above; there is no need to read the file again

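# Prepare the data for clustering: combine runtime, rating and votes into a single vector column "features".
# Note the trailing space in the "votes " column name; handleInvalid="skip" drops rows with a null in any input column.
assembler = VectorAssembler(inputCols=["runtime", "rating", "votes "], outputCol="features", handleInvalid="skip")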
data = assembler.transform(df).select("features")
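The assembled features sit on very different scales: per the summary statistics, votes has a standard deviation of about 2,066 while rating's is about 1.06, so the Euclidean distances k-means relies on are driven almost entirely by votes. StandardScaler is imported in In [3] but never applied. The plain-Python sketch below, on hypothetical rows, shows what standard scaling does before distances are computed. Note that Spark's StandardScaler defaults to withStd=True and withMean=False, i.e. it only divides by the standard deviation unless centering is requested.

```python
import statistics


def standardize_columns(rows, center=True):
    """Scale each feature column to unit sample standard deviation.

    rows: list of equal-length numeric tuples (one tuple per movie).
    center=True also subtracts the column mean (like withMean=True in
    Spark's StandardScaler); center=False only divides by the stddev.
    Zero-variance columns are left unscaled to avoid division by zero.
    """
    scaled_columns = []
    for column in zip(*rows):
        mean = statistics.fmean(column)
        std = statistics.stdev(column)
        scale = std if std > 0 else 1.0
        shift = mean if center else 0.0
        scaled_columns.append([(x - shift) / scale for x in column])
    return [tuple(values) for values in zip(*scaled_columns)]


def euclidean(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b)) ** 0.5


# Hypothetical (runtime, rating, votes) rows
rows = [(134, 7, 12730), (109, 7, 10355), (95, 4, 10300), (120, 8, 9000)]
scaled = standardize_columns(rows)

# In raw units the distance between the first two rows is roughly their votes gap (2,375);
# after scaling, every feature difference is measured in its own standard deviations.
print(round(euclidean(rows[0], rows[1]), 1), round(euclidean(scaled[0], scaled[1]), 2))
```

In the notebook, the equivalent step would presumably be `StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True).fit(data).transform(data)`, followed by `KMeans(featuresCol="scaled_features", k=2, seed=1)`.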



In [36]:
# Train a k-means clustering model
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(data)

In [43]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [47]:
# Evaluate the model.
# KMeansModel.computeCost() was removed in Spark 3.0; the training summary exposes the same cost
# (sum of squared distances of the training points to their nearest center) as trainingCost.


wssse = model.summary.trainingCost
print("Within Set Sum of Squared Errors = " + str(wssse))

# Assign each row to its nearest cluster center (here on the training data itself, not on new data)
predictions = model.transform(data)

# In the cells above we assemble the features, train the KMeans model and read its cost from model.summary.trainingCost; the clustering is then evaluated with the ClusteringEvaluator class in the next cell.
# The Within Set Sum of Squared Errors (WSSSE) is the total squared distance between each point and the center of the cluster it is assigned to. For a fixed k, a lower WSSSE means tighter clusters. Here the WSSSE is 38959147433.03747.
# On its own this number is hard to interpret: it is expressed in squared feature units, and because the features are not scaled it is dominated by votes, the feature with by far the largest spread.
# It is mainly useful for comparison, e.g. between runs on the same data or across values of k (the "elbow" method). WSSSE tends to fall as k grows, so the lowest value alone does not identify the best k.

Within Set Sum of Squared Errors = 38959147433.03747
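To make the WSSSE definition concrete, the sketch below runs a plain Lloyd's k-means on hypothetical 2-D points and sums, for each point, the squared Euclidean distance to its assigned center. It is an illustration, not Spark's implementation: initial centers are simply the first k points, whereas Spark's KMeans uses k-means|| initialisation by default. Printing WSSSE for k = 1, 2, 3 shows the "elbow" pattern: a large drop from 1 to 2 clusters, then only a small gain.

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def assign(points, centers):
    """Index of the nearest center (squared Euclidean distance) for each point."""
    return [min(range(len(centers)), key=lambda c: squared_distance(p, centers[c]))
            for p in points]


def lloyd_kmeans(points, k, iterations=20):
    """Plain Lloyd's algorithm; the first k points serve as initial centers."""
    centers = [tuple(p) for p in points[:k]]
    for _ in range(iterations):
        labels = assign(points, centers)
        new_centers = []
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:
                # Move the center to the mean of its members
                new_centers.append(tuple(sum(dim) / len(members) for dim in zip(*members)))
            else:
                new_centers.append(centers[c])  # keep an empty cluster's center
        if new_centers == centers:
            break
        centers = new_centers
    return centers, assign(points, centers)


def compute_wssse(points, centers, labels):
    """Within Set Sum of Squared Errors."""
    return sum(squared_distance(p, centers[label]) for p, label in zip(points, labels))


# Hypothetical points forming two well-separated groups
points = [(1.0, 1.0), (1.5, 2.0), (1.2, 0.8), (8.0, 8.0), (9.0, 9.5), (8.5, 9.0)]
for k in (1, 2, 3):
    centers, labels = lloyd_kmeans(points, k)
    print(k, round(compute_wssse(points, centers, labels), 2))
```

With Spark, the same comparison would fit `KMeans(k=k, seed=1)` for several values of k and read `model.summary.trainingCost` from each fitted model.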


In [49]:
predictions = model.transform(data)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette Score:", silhouette)

# The silhouette score measures how close each point is to its own cluster compared with the nearest other cluster. It ranges from -1 to 1, and higher values indicate better-separated clusters.
# ClusteringEvaluator computes it with squared Euclidean distance by default.

Silhouette Score: 0.696324522863174
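The silhouette coefficient follows directly from its definition: for each point, a is the mean distance to the other members of its own cluster, b is the smallest mean distance to the members of any other cluster, and s = (b - a) / max(a, b); the score is the mean of s over all points. The plain-Python sketch below uses ordinary Euclidean distance on hypothetical points, whereas ClusteringEvaluator defaults to squared Euclidean distance, so values from the two are not directly comparable. The function name `silhouette_score` is hypothetical.

```python
def silhouette_score(points, labels):
    """Mean silhouette coefficient with Euclidean distance.

    Points in singleton clusters get s = 0. Requires at least two clusters.
    """
    def dist(p, q):
        return sum((x - y) ** 2 for x, y in zip(p, q)) ** 0.5

    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")

    scores = []
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)
            continue
        # own contains p itself at distance 0, so divide by len(own) - 1
        a = sum(dist(p, q) for q in own) / (len(own) - 1)
        b = min(sum(dist(p, q) for q in members) / len(members)
                for other, members in clusters.items() if other != label)
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)


# Hypothetical points: a sensible labelling versus a mixed-up one
points = [(1.0, 1.0), (1.5, 2.0), (1.2, 0.8), (8.0, 8.0), (9.0, 9.5), (8.5, 9.0)]
good = [0, 0, 0, 1, 1, 1]
bad = [0, 1, 0, 1, 0, 1]
print(round(silhouette_score(points, good), 3), round(silhouette_score(points, bad), 3))
```

The well-separated labelling scores close to 1, while the mixed-up one scores far lower, which is the contrast the Spark score of about 0.70 should be read against.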

