In [9]:
!pip3 install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
import time
import os

In [11]:
spark = SparkSession.builder \
    .appName("Comparative Analysis") \
    .getOrCreate()

In [12]:
def extract_spark(file_path):
    start_time = time.time()
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    duration = time.time() - start_time
    return df, duration

In [13]:
def transform_spark_rename_columns(df):
    start_time = time.time()
    for col_name in df.columns:
        df = df.withColumnRenamed(col_name, f"{col_name}_new")
    duration = time.time() - start_time
    return duration
    
def transform_spark_drop_na(df):
    start_time = time.time()
    df = df.dropna()
    duration = time.time() - start_time
    return duration

In [14]:
def get_file_size(path):
    hadoop_conf = spark._jsc.hadoopConfiguration()
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
    path = spark._jvm.org.apache.hadoop.fs.Path(path)
    
    if not fs.exists(path):
        raise FileNotFoundError(f"The folder path does not exist: {path}")

    def calculate_size(folder):
        file_status = fs.listStatus(folder)
        total_size = 0
        for status in file_status:
            if status.isFile():
                total_size += status.getLen()  
            elif status.isDirectory():
                total_size += calculate_size(status.getPath()) 
        return total_size

    total_size_bytes = calculate_size(path)

    total_size_mb = total_size_bytes / (1024 * 1024)
    return round(total_size_mb, 2)

In [15]:
def load_spark_csv(df, output_path):
    start_time = time.time()
    df.write.csv(output_path, header=True, mode='overwrite')
    duration = time.time() - start_time

    file_size = get_file_size(output_path)

    return duration, file_size


def load_spark_parquet(df, output_path):
    start_time = time.time()
    df.write.parquet(output_path, mode='overwrite')
    duration = time.time() - start_time

    file_size = get_file_size(output_path)

    return duration, file_size


def load_spark_orc(df, output_path):

    start_time = time.time()
    df.write.format("orc").mode("overwrite").option("mergeSchema", "true").save(output_path)
    duration = time.time() - start_time

    file_size = get_file_size(output_path)

    return duration, file_size


In [16]:
datasets = ['transactions_data.csv', 'titanic.csv', 'reviews.csv', 'locations.csv']

dim_datasets = []
fact_metrics = []

for index, dataset in enumerate(datasets):
    primary_key = index + 1
    path = dataset.split('.')[0]
    
    df_raw, extract_time = extract_spark(f"data/{dataset}")
    
    transform_rename_columns = transform_spark_rename_columns(df_raw)
    transform_dropna = transform_spark_drop_na(df_raw)
    
    load_time_csv, file_size_csv = load_spark_csv(df_raw, f"pyspark_analysis/{path}.csv")
    load_time_parquet, file_size_parquet = load_spark_parquet(df_raw, f"pyspark_analysis/{path}.parquet")
    load_time_orc, file_size_orc = load_spark_orc(df_raw, f"pyspark_analysis/{path}.orc")
    
    dim_datasets.append({
        "id": primary_key,
        "dataset_name": path,
        "number_of_rows": df_raw.count()
    })
    
    fact_metrics.append({
        "dataset_id": primary_key,
        "extract_time": round(extract_time,2),
        "transform_rename_columns_time": round(transform_rename_columns,2),
        "transform_dropna_time": round(transform_dropna,2),
        "load_time_csv": round(load_time_csv,2),
        "file_size_csv_mb": round(file_size_csv / (1024 * 1024),2),
        "load_time_parquet": round(load_time_parquet,2),
        "file_size_parquet_mb": round(file_size_parquet / (1024 * 1024),2),
        "load_time_orc": round(load_time_orc,2),
        "file_size_orc_mb": round(file_size_orc / (1024 * 1024),2)
    })
    


dim_datasets_df = spark.createDataFrame(dim_datasets)
fact_metrics_df = spark.createDataFrame(fact_metrics)

dim_datasets_pd = dim_datasets_df.toPandas()
fact_metrics_pd = fact_metrics_df.toPandas()

dim_datasets_pd.to_csv("pyspark_analysis/dim_datasets.csv", index=False)
fact_metrics_pd.to_csv("pyspark_analysis/fact_metrics_pyspark.csv", index=False)


print("Process finished successfully!")

24/12/10 18:22:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/10 18:22:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
  from pandas.core import (
                                                                                

Process finished successfully!


In [17]:
import pandas as pd

In [18]:
df_dim = pd.read_csv("pyspark_analysis/dim_datasets.csv", sep=',')
df_dim

Unnamed: 0,dataset_name,id,number_of_rows
0,transactions_data,1,13305915
1,titanic,2,891
2,reviews,3,703796
3,locations,4,845


In [19]:
df_fact = pd.read_csv("pyspark_analysis/fact_metrics_pyspark.csv", sep=',')
df_fact

Unnamed: 0,dataset_id,extract_time,file_size_csv_mb,file_size_orc_mb,file_size_parquet_mb,load_time_csv,load_time_orc,load_time_parquet,transform_dropna_time,transform_rename_columns_time
0,1,11.46,0.0,0.0,0.0,22.74,26.78,23.08,0.01,0.08
1,2,0.31,0.0,0.0,0.0,0.22,0.22,0.24,0.02,0.09
2,3,1.11,0.0,0.0,0.0,0.97,1.18,0.72,0.01,0.01
3,4,0.21,0.0,0.0,0.0,0.2,0.17,0.18,0.01,0.1
