<a href="https://colab.research.google.com/github/amien1410/colab-notebooks/blob/main/Colab_Pyspark_Utility_Functions_and_Visualizations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# install pyspark
!pip install pyspark

# initiate pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# download sample
!wget -O movies.csv "https://raw.githubusercontent.com/Apress/applied-data-science-using-pyspark/refs/heads/main/Ch02/Chapter2_Data/movie_data_part1.csv"
df = spark.read.csv("movies.csv", header=True, sep='|', inferSchema=False)
df.show(10)

--2025-06-11 03:16:15--  https://raw.githubusercontent.com/Apress/applied-data-science-using-pyspark/refs/heads/main/Ch02/Chapter2_Data/movie_data_part1.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 29227553 (28M) [text/plain]
Saving to: ‘movies.csv’


2025-06-11 03:16:17 (25.2 MB/s) - ‘movies.csv’ saved [29227553/29227553]

+---------------------+------+-----+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+-------+-------+--------+--------------------+--------------------+------------+
|belongs_to_collection|budget|   id|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|revenue|runtime|  status|     

In [2]:
#@title Casting Multiple Columns

#Importing necessary libraries
from pyspark.sql.types import *

#Identifying and assigning lists of variables
int_vars=['id']
float_vars=['budget', 'popularity', 'revenue']
date_vars=['release_date']

#Converting integer variables
for column in int_vars:
  df = df.withColumn(column,df[column].cast(IntegerType()))

# Converting float variables
for column in float_vars:
  df=df.withColumn(column,df[column].cast(FloatType()))

# Converting float variables
for column in date_vars:
  df=df.withColumn(column,df[column].cast(DateType()))

df.dtypes

[('belongs_to_collection', 'string'),
 ('budget', 'float'),
 ('id', 'int'),
 ('original_language', 'string'),
 ('original_title', 'string'),
 ('overview', 'string'),
 ('popularity', 'float'),
 ('production_companies', 'string'),
 ('production_countries', 'string'),
 ('release_date', 'date'),
 ('revenue', 'float'),
 ('runtime', 'string'),
 ('status', 'string'),
 ('tagline', 'string'),
 ('title', 'string'),
 ('vote_average', 'string')]

In [4]:
# using when function
df_with_newcols = df.select('id', 'budget','popularity').\
withColumn('budget_cat', when(df['budget']<10000000,'Small').when(df['budget']<100000000,'Medium').otherwise('Big')).\
withColumn('ratings', when(df['popularity']<3,'Low').
when(df['popularity']<5,'Mid').otherwise('High'))

df_with_newcols.show(15, False)

+-----+---------+----------+----------+-------+
|id   |budget   |popularity|budget_cat|ratings|
+-----+---------+----------+----------+-------+
|43000|0.0      |2.503     |Small     |Low    |
|43001|0.0      |5.51      |Small     |High   |
|43002|0.0      |5.62      |Small     |High   |
|43003|0.0      |7.159     |Small     |High   |
|43004|500000.0 |3.988     |Small     |Mid    |
|43006|0.0      |3.194     |Small     |Mid    |
|43007|0.0      |2.689     |Small     |Low    |
|43008|0.0      |6.537     |Small     |High   |
|43010|0.0      |4.297     |Small     |Mid    |
|43011|0.0      |4.417     |Small     |Mid    |
|43012|7000000.0|4.722     |Small     |Mid    |
|43013|0.0      |2.543     |Small     |Low    |
|43014|0.0      |4.303     |Small     |Mid    |
|43015|0.0      |3.493     |Small     |Mid    |
|43016|0.0      |2.851     |Small     |Low    |
+-----+---------+----------+----------+-------+
only showing top 15 rows



In [6]:
# Concatenating two variables
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category',concat(df_with_newcols.budget_cat,df_with_newcols.ratings))

# Changing the new variable to lowercase
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category',trim(lower(df_with_newcols.BudgetRating_Category)))
df_with_newcols.show()

+-----+---------+----------+----------+-------+---------------------+
|   id|   budget|popularity|budget_cat|ratings|BudgetRating_Category|
+-----+---------+----------+----------+-------+---------------------+
|43000|      0.0|     2.503|     Small|    Low|             smalllow|
|43001|      0.0|      5.51|     Small|   High|            smallhigh|
|43002|      0.0|      5.62|     Small|   High|            smallhigh|
|43003|      0.0|     7.159|     Small|   High|            smallhigh|
|43004| 500000.0|     3.988|     Small|    Mid|             smallmid|
|43006|      0.0|     3.194|     Small|    Mid|             smallmid|
|43007|      0.0|     2.689|     Small|    Low|             smalllow|
|43008|      0.0|     6.537|     Small|   High|            smallhigh|
|43010|      0.0|     4.297|     Small|    Mid|             smallmid|
|43011|      0.0|     4.417|     Small|    Mid|             smallmid|
|43012|7000000.0|     4.722|     Small|    Mid|             smallmid|
|43013|      0.0|   

In [8]:
#@title Registering Dataframes

# Registering temporary table
df_with_newcols.registerTempTable('temp_data')

# Applying the function to show the results
spark.sql('select ratings, count(ratings) from temp_data group by ratings').show(10, False)



+-------+--------------+
|ratings|count(ratings)|
+-------+--------------+
|High   |16856         |
|Low    |14865         |
|Mid    |12277         |
+-------+--------------+



In [9]:
#@title Window Functions

# Importing the window functions
from pyspark.sql.window import *

# Step 1: Filtering the missing values
df_with_newcols=df_with_newcols.filter( (df_with_newcols['popularity'].isNotNull()) & (~isnan(df_with_newcols['popularity'])) )

# Step 2: Applying the window functions for calculating deciles
df_with_newcols = df_with_newcols.select("id","budget","popularity", ntile(10).over(Window.partitionBy().orderBy(df_with_newcols['popularity'].desc())).alias("decile_rank"))

# Step 3:Dispalying the values
df_with_newcols.groupby("decile_rank").agg(min('popularity').alias('min_popularity'),max('popularity').alias('max_popularity'),count('popularity')).show()

+-----------+--------------+--------------+-----------------+
|decile_rank|min_popularity|max_popularity|count(popularity)|
+-----------+--------------+--------------+-----------------+
|          1|        10.185|         180.0|             4379|
|          2|         7.481|        10.182|             4379|
|          3|         5.841|         7.481|             4379|
|          4|         4.823|         5.841|             4378|
|          5|         4.054|         4.822|             4378|
|          6|         3.383|         4.054|             4378|
|          7|         2.747|         3.383|             4378|
|          8|         2.075|         2.747|             4378|
|          9|         1.389|         2.075|             4378|
|         10|           0.6|         1.389|             4378|
+-----------+--------------+--------------+-----------------+



In [10]:
#@title FInd the second most popular movie in the year 1970

# Step 1: Import the window functions
from pyspark.sql.window import *

# Step 2: Select the required subset of columns
df_second_best = df.select('id', 'popularity', 'release_date')

# Step 3: Create the year column from release date
df_second_best = df_second_best.withColumn('release_year',year('release_date')).drop('release_date')

# Step 4: Define partition function
year_window = Window.partitionBy(df_second_best['release_year']).orderBy(df_with_newcols['popularity'].desc())

# Step 5: Apply partition function
df_second_best = df_second_best.select('id', 'popularity', 'release_year',rank().over(year_window).alias("rank"))

# Step 6: Find the second best rating for the year 1970
df_second_best.filter((df_second_best['release_year'] == 1970) & (df_second_best['rank']==2)).show()

+-----+----------+------------+----+
|   id|popularity|release_year|rank|
+-----+----------+------------+----+
|11202|    14.029|        1970|   2|
+-----+----------+------------+----+



In [33]:
#@title what is the difference between the revenue of the highest-grossing film of the year and other films within that year?

from pyspark.sql.functions import col, to_date, year, max
from pyspark.sql.window import Window

# Convert release_date and extract year
df_revenue = df.select(
    'id',
    'revenue',
    to_date('release_date', 'yyyy-MM-dd').alias('release_date')
).withColumn('release_year', year('release_date'))

# Filter out rows with nulls in critical columns
df_revenue = df_revenue.filter(col('id').isNotNull() & col('revenue').isNotNull() & col('release_year').isNotNull())

# Define window
windowRev = Window.partitionBy('release_year')\
                  .orderBy(col('revenue').desc())\
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Add revenue difference
df_revenue = df_revenue.withColumn(
    'revenue_difference',
    max(col('revenue')).over(windowRev) - col('revenue')
)

# Show final result
df_revenue.select('id', 'revenue', 'release_year', 'revenue_difference').show(10, False)

+-----+---------+------------+------------------+
|id   |revenue  |release_year|revenue_difference|
+-----+---------+------------+------------------+
|70512|0.0      |1911        |0.0               |
|46751|0.0      |1911        |0.0               |
|96128|1800000.0|1913        |0.0               |
|28627|0.0      |1913        |1800000.0         |
|56511|0.0      |1913        |1800000.0         |
|56516|0.0      |1913        |1800000.0         |
|98078|0.0      |1914        |0.0               |
|28196|0.0      |1914        |0.0               |
|5153 |0.0      |1914        |0.0               |
|22943|0.0      |1914        |0.0               |
+-----+---------+------------+------------------+
only showing top 10 rows



In [31]:
from pyspark.sql.functions import col, to_date, year, max
from pyspark.sql.window import Window

# Convert release_date and extract year
df_revenue = df.select(
    'id',
    'revenue',
    to_date('release_date', 'yyyy-MM-dd').alias('release_date')
).withColumn('release_year', year('release_date'))

# Filter out rows with nulls in critical columns
df_revenue = df_revenue.filter(col('id').isNotNull() & col('revenue').isNotNull() & col('release_year').isNotNull())

# Define window
windowRev = Window.partitionBy('release_year')\
                  .orderBy(col('revenue').desc())\
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Add revenue difference
df_revenue = df_revenue.withColumn(
    'revenue_difference',
    max(col('revenue')).over(windowRev) - col('revenue')
)

# Show final result
df_revenue.select('id', 'revenue', 'release_year', 'revenue_difference').show(10, False)

+-----+---------+------------+------------------+
|id   |revenue  |release_year|revenue_difference|
+-----+---------+------------+------------------+
|70512|0.0      |1911        |0.0               |
|46751|0.0      |1911        |0.0               |
|96128|1800000.0|1913        |0.0               |
|28627|0.0      |1913        |1800000.0         |
|56511|0.0      |1913        |1800000.0         |
|56516|0.0      |1913        |1800000.0         |
|98078|0.0      |1914        |0.0               |
|28196|0.0      |1914        |0.0               |
|5153 |0.0      |1914        |0.0               |
|22943|0.0      |1914        |0.0               |
+-----+---------+------------+------------------+
only showing top 10 rows



In [32]:
#@title Collect List

# Step 1: Create the year column from release date
df = df.withColumn('release_year',year('release_date'))

# Step 2: Apply collect_list function to gather all occurrences
df.filter("title=='The Lost World'").groupby('title').agg(collect_list("release_year")).show(1,False)

+--------------+------------------------------------+
|title         |collect_list(release_year)          |
+--------------+------------------------------------+
|The Lost World|[1999, 2001, 1925, 1960, 1992, 1998]|
+--------------+------------------------------------+



In [34]:
from pyspark.sql import functions as F

(df
    # 1️⃣  Filter first – touch as few rows as possible
    .where(F.col("title") == "The Lost World")

    # 2️⃣  Project only the two columns you still need
    .select(
        "title",
        F.year("release_date").alias("release_year")   # compute year on-the-fly
    )

    # 3️⃣  Single shuffle: group → collect_list
    .groupBy("title")
    .agg(
        F.sort_array(            # optional: deterministic order
            F.collect_list("release_year")
        ).alias("release_years")
    )
    .show(1, False)
)


+--------------+------------------------------------+
|title         |release_years                       |
+--------------+------------------------------------+
|The Lost World|[1925, 1960, 1992, 1998, 1999, 2001]|
+--------------+------------------------------------+



In [35]:
# Cache if reused
sub = df.where(F.col("title") == "The Lost World").select("title", F.year("release_date").alias("release_year"))
sub.cache()

DataFrame[title: string, release_year: int]

In [36]:
# Repartition on the grouping key
sub = sub.repartition("title")

In [37]:
#@title Sampling

# Simple random sampling in PySpark without replacement
df_sample = df.sample(False, 0.4, seed=11)
print(f"Sample size: {df_sample.count()}")

Sample size: 17627
