PySpark: IMDb titles per director

In [None]:
python -m ipykernel install --user --name=pyspark_env --display-name "Python (pyspark_env)"

In [13]:
# findspark is imported and initialised in the cell that precedes the pyspark imports.
# NOTE(review): presumably this makes the local Spark installation importable
# from this kernel — confirm against the findspark documentation.
import findspark
findspark.init()

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col
import os
import requests
import gzip
import shutil

In [17]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")   # driver host pinned to localhost
    .config("spark.driver.memory", "4g")        # driver memory setting
    .config("spark.executor.memory", "4g")      # executor memory setting
    .appName("mr_prac")
    .getOrCreate()
)

In [9]:
# Bare expression: as the last line of the cell, the notebook renders the
# session object's repr.
spark

In [19]:
def download_and_unzip(url, output_dir=".", timeout=60):
    """Download a gzip archive and decompress it next to the archive.

    Either step is skipped when its output file already exists, so the
    function is cheap to re-run. Each output is first written to a ``.part``
    file and renamed into place only once it is complete, so an interrupted
    run never leaves a truncated file that a later run would mistake for a
    finished one.

    Args:
        url: Address of the archive. The text after the last "/" becomes the
            local file name.
        output_dir: Directory that receives both the archive and the
            extracted file. It is created when missing.
        timeout: Value forwarded to ``requests.get`` as its ``timeout``.

    Returns:
        Path of the decompressed file: the archive path with a trailing
        ".gz" removed (identical to the archive path when the file name does
        not end in ".gz").

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    gz_suffix = ".gz"
    filename = url.split("/")[-1]
    local_gz_path = os.path.join(output_dir, filename)

    # Strip only the trailing extension. A plain str.replace would also
    # rewrite ".gz" occurring in a directory name or earlier in the file name.
    if filename.endswith(gz_suffix):
        local_tsv_path = local_gz_path[:-len(gz_suffix)]
    else:
        local_tsv_path = local_gz_path

    # An empty output_dir means the current directory; makedirs("") would raise.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Skip download if already exists
    if not os.path.exists(local_gz_path):
        print(f"Downloading {filename}...")
        partial_gz_path = local_gz_path + ".part"
        # The context manager releases the connection even if the copy fails.
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Fail loudly instead of saving an HTTP error page as the archive.
            response.raise_for_status()
            with open(partial_gz_path, 'wb') as f:
                shutil.copyfileobj(response.raw, f)
        os.replace(partial_gz_path, local_gz_path)
        print(f"Downloaded: {local_gz_path}")
    else:
        print(f"File already exists: {local_gz_path}")

    # Unzip .gz to .tsv
    if not os.path.exists(local_tsv_path):
        print(f"Unzipping {filename}...")
        partial_tsv_path = local_tsv_path + ".part"
        with gzip.open(local_gz_path, 'rb') as f_in:
            with open(partial_tsv_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.replace(partial_tsv_path, local_tsv_path)
        print(f"Unzipped to: {local_tsv_path}")
    else:
        print(f"TSV already exists: {local_tsv_path}")

    return local_tsv_path

# IMDb dataset URLs
urls = [
    "https://datasets.imdbws.com/title.crew.tsv.gz",
    "https://datasets.imdbws.com/name.basics.tsv.gz"
]

# Download and unzip all, keeping the extracted paths so the reads below use
# exactly the files that were produced instead of re-typing their names.
crew_tsv_path, names_tsv_path = [download_and_unzip(url) for url in urls]

# Reader options shared by both files: tab-separated, header row, "\N" marks
# a missing value. quote="" switches off Spark's default '"' quote handling.
# NOTE(review): the IMDb dataset notes describe these files as unquoted TSV,
# in which case a field starting with '"' would otherwise be parsed as a
# quoted value — confirm against the dataset documentation.
imdb_read_options = dict(sep="\t", header=True, inferSchema=True, nullValue="\\N", quote="")

# Load IMDb crew data
crew_df = spark.read.csv(crew_tsv_path, **imdb_read_options)

# Load name.basics for mapping nconst -> primaryName
names_df = spark.read.csv(names_tsv_path, **imdb_read_options)

File already exists: ./title.crew.tsv.gz
TSV already exists: ./title.crew.tsv
File already exists: ./name.basics.tsv.gz
TSV already exists: ./name.basics.tsv


                                                                                

In [21]:
# Clean and extract directors (some titles have multiple directors).
# `directors` is split on "," and exploded, giving one row per
# (title, director id) pair; rows containing nulls are then dropped.
crew_directors = (
    crew_df
    .select("tconst", explode(split(col("directors"), ",")).alias("director_id"))
    .na.drop()
)

# Count number of titles per director. groupBy().count() names its result
# column "count". The former trailing .alias("title_count") aliased the
# DataFrame, not that column, so it renamed nothing and has been removed;
# the column is still called "count", as the select below expects.
director_counts = crew_directors.groupBy("director_id").count()

# Join with names to get actual director names, most titles first
director_with_names = (
    director_counts
    .join(
        names_df.select("nconst", "primaryName"),
        director_counts["director_id"] == names_df["nconst"],
        "inner",
    )
    .select("primaryName", "count")
    .orderBy(col("count").desc())
)

# Show top 20 directors
director_with_names.show(20, truncate=False)

                                                                                

+---------------------+-----+
|primaryName          |count|
+---------------------+-----+
|Johnny Manahan       |13154|
|Saibal Banerjee      |12613|
|Nivedita Basu        |12368|
|Bert De Leon         |10365|
|Anil v Kumar         |8984 |
|Santosh Bhatt        |8495 |
|Danie Joubert        |8282 |
|Conrado Lumabas      |8024 |
|Duma Ndlovu          |7986 |
|Silvia Abravanel     |7437 |
|Malu London          |7434 |
|Henrique Martins     |7173 |
|Shashank Bali        |7125 |
|Mário Márcio Bandarra|6974 |
|Paul Alter           |6888 |
|Walter Avancini      |6421 |
|Bruno De Paola       |6360 |
|Kevin McCarthy       |6264 |
|S. Kumaran           |6263 |
|Dilip Kumar          |6112 |
+---------------------+-----+
only showing top 20 rows



[6, 5, 4, 3, 2, 1, 0]
