In [0]:
!pip install tqdm

Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.67.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython()

In [0]:
%fs ls /Volumes/workspace/default/assignment1/

path,name,size,modificationTime
dbfs:/Volumes/workspace/default/assignment1/2024/,2024/,0,1762853582136
dbfs:/Volumes/workspace/default/assignment1/__MACOSX/,__MACOSX/,0,1762853582136
dbfs:/Volumes/workspace/default/assignment1/filesystem_snapshot.parquet,filesystem_snapshot.parquet,141580806,1762851471000
dbfs:/Volumes/workspace/default/assignment1/recreated_cvelistV5/,recreated_cvelistV5/,0,1762853582136


In [0]:
# --- Code to Show First 5 Rows ---

# Define the path to your Parquet file
parquet_path = "/Volumes/workspace/default/assignment1/filesystem_snapshot.parquet"

# Read the Parquet file into a Spark DataFrame
df = spark.read.parquet(parquet_path)

# Show the first 5 rows of the DataFrame
# truncate=False ensures you can see the full content of each column
df.show(5, truncate=False)

+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
import os
import pandas as pd
from pyspark.sql.functions import col, spark_partition_id

# --- 1. Configuration ---
parquet_path = "/Volumes/workspace/default/assignment1/filesystem_snapshot.parquet"
output_root_path = "/Volumes/workspace/default/assignment1/recreated_cvelistV5"

# --- NEW: PARALLELISM CONFIGURATION ---
# We will force Spark to use this many parallel tasks.
NUM_PARALLEL_TASKS = 200

print(f"Reading snapshot from: {parquet_path}")
print(f"Will recreate the exact folder structure inside: {output_root_path}")

# --- 2. Read and REPARTITION the DataFrame ---
df = spark.read.parquet(parquet_path)

print(f"\nIncreasing parallelism to {NUM_PARALLEL_TASKS} tasks...")

# *** THIS IS THE KEY TO MORE SPEED ***
# Repartition the DataFrame into many more pieces for higher parallelism.
df_repartitioned = df.repartition(NUM_PARALLEL_TASKS)

# --- 3. Verify Parallelism (The Correct, Serverless-Compliant Way) ---
# This runs a quick, separate job to count the distinct partition IDs.
actual_tasks = df_repartitioned.select(spark_partition_id()).distinct().count()

print("\n--- Parallelism Plan ---")
print(f"The DataFrame is now split into {actual_tasks} partitions.")
print(f"Spark will run up to {actual_tasks} tasks in parallel to process this data.")
print("--------------------------\n")

# --- 4. Define the BATCHED File Writing Function ---
# This function receives an ITERATOR of Pandas DataFrames.
def write_files_in_batch(pdf_iterator: iter) -> pd.DataFrame:
    for pdf in pdf_iterator:
        for index, row in pdf.iterrows():
            clean_relative_path = row['relative_path'].replace('\\', '/')
            full_destination_path = os.path.join(output_root_path, clean_relative_path)
            destination_dir = os.path.dirname(full_destination_path)
            os.makedirs(destination_dir, exist_ok=True)
            with open(full_destination_path, "wb") as f:
                f.write(row['file_content'])
    return pd.DataFrame()

# --- 5. Run the BLAZING FAST, REPARTITIONED Job ---
print("\nStarting the HIGHLY PARALLEL, distributed file recreation.")
print("!!! MONITOR THE SPARK UI PROGRESS BAR BELOW THIS CELL !!!")

# We call mapInPandas on the REPARTITIONED DataFrame.
df_repartitioned.mapInPandas(write_files_in_batch, schema="result INT").collect()

print("\nFile structure recreated successfully!")

# --- 6. Final Verification in Databricks ---
print("\n--- Verifying recreated structure (listing top-level items) ---")
display(dbutils.fs.ls(output_root_path))

Reading snapshot from: /Volumes/workspace/default/assignment1/filesystem_snapshot.parquet
Will recreate the exact folder structure inside: /Volumes/workspace/default/assignment1/recreated_cvelistV5

Increasing parallelism to 200 tasks...

--- Parallelism Plan ---
The DataFrame is now split into 200 partitions.
Spark will run up to 200 tasks in parallel to process this data.
--------------------------


Starting the HIGHLY PARALLEL, distributed file recreation.
!!! MONITOR THE SPARK UI PROGRESS BAR BELOW THIS CELL !!!

File structure recreated successfully!

--- Verifying recreated structure (listing top-level items) ---


path,name,size,modificationTime
dbfs:/Volumes/workspace/default/assignment1/recreated_cvelistV5/README.md,README.md,11356,1762854883000
dbfs:/Volumes/workspace/default/assignment1/recreated_cvelistV5/cves/,cves/,0,1762855307165
