# Introduction

## Architecture

I solved this challenge on Azure. Azure Blob Storage holds the JSON file, Azure Key Vault manages secrets securely, and Azure Databricks serves as the data engineering platform because it provides managed Spark clusters and notebooks for large-scale data processing. Since LATAM Airlines operates on GCP, I would adapt this setup by replacing Blob Storage with Google Cloud Storage and Key Vault with Google Secret Manager. Databricks is a multi-cloud platform available on Azure, AWS, and GCP, so it remains suitable for this solution on GCP.

## Setup

First, I install the required libraries. In Databricks, `%pip` installs notebook-scoped libraries, and the `-q` flag keeps pip's output to a minimum.

In [0]:
%pip install -q emoji polars memray

Next, I import all libraries needed to run this notebook.

In [0]:
# Standard library
import logging
import os
import time

# Third-party libraries
import memray
import polars as pl

# Challenge solutions
from src.q1_memory import q1_memory
from src.q1_time import q1_time
from src.q2_memory import q2_memory
from src.q2_time import q2_time
from src.q3_memory import q3_memory
from src.q3_time import q3_time

Then I define some variables that I will use later.

In [0]:
storage_account_name = dbutils.secrets.get(scope="kv-scope", key="storage-account-name")
container_name = "data"
mount_point = "/mnt/data"
file_path = "farmers-protest-tweets-2021-2-4"

Now I mount the Azure Blob Storage container into the Databricks environment so that I can access the JSON file directly from Databricks.

In [0]:
# Create the mount only if it does not already exist
if not any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs={
            f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net":
                dbutils.secrets.get(scope="kv-scope", key="access-key")
        },
    )
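The mount cell above depends on two strings derived from the storage account and container names: the `wasbs://` source URI and the Hadoop configuration key that carries the account access key. The small helper below only makes that naming pattern explicit; `build_wasbs_mount_args` is a hypothetical name used for illustration, and `mystorageacct` is a placeholder account name.

```python
def build_wasbs_mount_args(storage_account_name: str, container_name: str) -> tuple:
    """Return (source URI, account-key config name) for a Blob Storage mount.

    Hypothetical helper: it formats the same strings the mount cell builds inline.
    """
    host = f"{storage_account_name}.blob.core.windows.net"
    source = f"wasbs://{container_name}@{host}/"
    config_key = f"fs.azure.account.key.{host}"
    return source, config_key


# Placeholder account name; the notebook reads the real one from Key Vault.
source, config_key = build_wasbs_mount_args("mystorageacct", "data")
print(source)      # wasbs://data@mystorageacct.blob.core.windows.net/
print(config_key)  # fs.azure.account.key.mystorageacct.blob.core.windows.net
```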

To improve performance and efficiency, I convert the JSON file to Parquet. Parquet is a columnar storage format optimized for analytical queries: because values are stored column by column, a query can read only the columns it needs, and per-column compression reduces storage costs. This speeds up processing, especially for queries that touch only a subset of the columns, which is highly beneficial for large datasets like this one.

In [0]:
# Set up logging; INFO level so the progress messages below are actually emitted
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("JSON_to_Parquet")

def convert_json_to_parquet(input_path: str, output_path: str, overwrite: bool = True):
    """
    Converts a JSON file to Parquet format using PySpark.

    Parameters
    ----------
    input_path : str
        The path to the input JSON file.
    output_path : str
        The path where the Parquet file will be saved.
    overwrite : bool, optional
        If True, overwrites the existing Parquet output; if False, the write
        fails when the output already exists. Defaults to True.
    """
    # Spark save modes: "overwrite" replaces existing output, "errorifexists" raises
    mode = "overwrite" if overwrite else "errorifexists"
    try:
        logger.info(f"Reading JSON file from {input_path}")
        json_df = spark.read.json(input_path)

        logger.info(f"Writing DataFrame to Parquet at {output_path}")
        json_df.write.mode(mode).parquet(output_path)

        logger.info("Conversion completed successfully.")

    except Exception:
        # Log with traceback, then re-raise so later cells do not run on missing data
        logger.exception("An error occurred during JSON to Parquet conversion")
        raise

In [0]:
convert_json_to_parquet(
    input_path=f"{mount_point}/{file_path}.json",
    output_path=f"{mount_point}/{file_path}.parquet"
)

In [0]:
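# Local file-API paths (through the /dbfs FUSE mount) for Polars and plain Python I/O.
# Spark wrote the Parquet output as a directory of part files, so glob over them.
json_file_path = f"/dbfs{mount_point}/{file_path}.json"
parquet_file_path = f"/dbfs{mount_point}/{file_path}.parquet/*.parquet"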

# Challenge

For data manipulation and transformation, I generally chose Polars over PySpark and Pandas because of its performance on large datasets. Pandas is widely used for data analysis, but it is more memory-intensive and slower on large files, which makes it a poor fit here. PySpark excels at distributed processing, but in this single-file context its initialization and processing overhead slowed things down. Polars, with its lightweight and efficient DataFrame operations, proved both faster and more memory-efficient.

For the memory-optimized functions, I additionally read the JSON file line by line instead of loading the entire file into memory at once. Combined with Polars’ efficiency, this row-by-row approach let me meet both the speed and the memory constraints.
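A minimal, standard-library sketch of the row-by-row idea, assuming the input is JSON Lines (one JSON object per line, which is also the layout `spark.read.json` expects by default). The function name `top_values_streaming` and the `user` key are hypothetical; this is not the implementation in the `src/q*_memory.py` modules.

```python
import io
import json
from collections import Counter
from typing import Iterable, List, Tuple


def top_values_streaming(lines: Iterable[str], key: str, n: int = 10) -> List[Tuple[str, int]]:
    """Count values of `key` across JSON Lines input, one record at a time.

    Only the running Counter stays in memory, never the full list of records.
    """
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)  # parse a single record, then discard it
        value = record.get(key)
        if value is not None:
            counts[value] += 1
    return counts.most_common(n)


# In-memory stand-in for an open file handle; a real run would use
# `with open(json_file_path, encoding="utf-8") as f:` and pass `f`.
sample = io.StringIO(
    '{"user": "a", "text": "x"}\n'
    '{"user": "b", "text": "y"}\n'
    '{"user": "a", "text": "z"}\n'
)
print(top_values_streaming(sample, "user", n=2))  # [('a', 2), ('b', 1)]
```

Peak memory then scales with the number of distinct keys rather than with the file size.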

## q1_memory

In [0]:
result = q1_memory(json_file_path)
result

## q1_time

In [0]:
result = q1_time(parquet_file_path)
result

## q2_memory

In [0]:
result = q2_memory(json_file_path)
result

## q2_time

In [0]:
result = q2_time(parquet_file_path)
result

## q3_memory

In [0]:
result = q3_memory(json_file_path)
result

## q3_time

In [0]:
result = q3_time(parquet_file_path)
result

# Metrics Monitoring

To provide clear insights into each function’s performance, I created a DataFrame summarizing two key metrics: peak memory usage and execution time. Each function is represented as a row, making the results easy to compare.

The approach runs each function twice. In the first run, Memray tracks allocations and reports the peak memory consumption, that is, the highest memory demand at any point during execution. In the second run, I measure execution time with no profiler attached, because Memray's tracking adds overhead that would inflate the timings. Keeping the two measurements separate ensures that neither metric distorts the other.
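The same two-run pattern can be sketched with the standard library alone. This swaps Memray for `tracemalloc`, which only traces allocations made through Python's own allocators, so it would largely miss native memory allocated inside Polars' Rust code; that limitation is why the notebook uses Memray. The helper names and the `build_list` workload are hypothetical.

```python
import time
import tracemalloc


def peak_python_memory_mib(func, *args, **kwargs) -> float:
    """First run: peak memory traced by tracemalloc, in MiB."""
    tracemalloc.start()
    try:
        func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()  # (current, peak) in bytes
    finally:
        tracemalloc.stop()
    return peak / (1024 ** 2)


def wall_time_seconds(func, *args, **kwargs) -> float:
    """Second, unprofiled run: elapsed wall-clock time in seconds."""
    start = time.perf_counter()
    func(*args, **kwargs)
    return time.perf_counter() - start


def build_list(n: int) -> list:
    # Toy workload standing in for a challenge function
    return list(range(n))


peak_mib = peak_python_memory_mib(build_list, 1_000_000)
elapsed = wall_time_seconds(build_list, 1_000_000)
print(f"peak={peak_mib:.2f} MiB, time={elapsed:.3f} s")
```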

In [0]:
memory_functions = [q1_memory, q2_memory, q3_memory]
time_functions = [q1_time, q2_time, q3_time]

In [0]:
def measure_memory(func, *args, **kwargs):
    """Run func under Memray and return its peak memory usage in MiB."""
    # Define the output file for Memray
    output_file = f"{func.__name__}_memray.bin"

    # Memray will not overwrite an existing capture file, so remove stale ones
    if os.path.exists(output_file):
        os.remove(output_file)

    # Run the function with Memray tracking (native_traces also records native stack frames)
    with memray.Tracker(output_file, native_traces=True):
        func(*args, **kwargs)

    # The capture metadata stores the high-water mark of memory in use.
    # The maximum of individual record sizes would only give the largest
    # single allocation, not the peak amount of memory held at once.
    reader = memray.FileReader(output_file)
    peak_memory = reader.metadata.peak_memory

    os.remove(output_file)

    # Convert bytes to MiB
    return peak_memory / (1024 ** 2)

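def measure_time(func, *args, **kwargs):
    """Run func once without profiling and return its wall-clock time in seconds."""
    # perf_counter is monotonic and high-resolution, unlike time.time()
    start_time = time.perf_counter()
    func(*args, **kwargs)
    return time.perf_counter() - start_time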

# Pair each function with the input it is designed for:
# memory-optimized functions stream the JSON file, time-optimized ones read Parquet
runs = [(func, json_file_path) for func in memory_functions] + [
    (func, parquet_file_path) for func in time_functions
]

# Initialize list to store results
metrics_list = []

for func, path in runs:
    peak_memory = measure_memory(func, path)   # Run 1: Memray-profiled
    execution_time = measure_time(func, path)  # Run 2: unprofiled timing
    metrics_list.append({
        "Function Name": func.__name__,
        "Peak Memory Usage (MiB)": round(peak_memory, 2),
        "Execution Time (seconds)": round(execution_time, 2),
    })

# Convert the list of metrics to a Polars DataFrame
metrics_df = pl.DataFrame(metrics_list)
print(metrics_df)