# Library Setup

In [None]:
!pip install duckdb --upgrade



In [None]:
!pip install polars --upgrade



In [None]:
!pip install gspread oauth2client --upgrade

Collecting gspread
  Downloading gspread-6.1.3-py3-none-any.whl.metadata (11 kB)
Downloading gspread-6.1.3-py3-none-any.whl (57 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: gspread
  Attempting uninstall: gspread
    Found existing installation: gspread 6.0.2
    Uninstalling gspread-6.0.2:
      Successfully uninstalled gspread-6.0.2
Successfully installed gspread-6.1.3


In [None]:
import os
import json
import time
import psutil
import polars as pl
import duckdb
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
from random import uniform
import gspread
from oauth2client.service_account import ServiceAccountCredentials

In [None]:
# Mount Google Drive so the dataset, credentials and outputs under MyDrive are reachable
from google.colab import drive

drive.mount(mountpoint='/content/drive/', force_remount=True)

Mounted at /content/drive/


# Case Study 1: SQL Query

**1. Initial Data Extraction and Table Creation**

In [None]:
# Connect to a fresh in-memory DuckDB database
conn = duckdb.connect()

# Temporary table holding one summary row per business.
# stars/useful/funny/cool receive AVG() results from the load cells, so they are
# DOUBLE: an INTEGER column would make DuckDB round every average on insert
# (e.g. an average of 3.5 stars would be stored as 4).
create_table_query = """
CREATE TEMPORARY TABLE business_reviews (
    business_id VARCHAR PRIMARY KEY,
    stars DOUBLE,
    useful DOUBLE,
    funny DOUBLE,
    cool DOUBLE,
    latest_date DATE
);
"""

# Execute the query to create the temporary table
conn.execute(create_table_query)

<duckdb.duckdb.DuckDBPyConnection at 0x7e5afdf87830>

In [None]:
# Summarise January 2018 reviews per business and load them into business_reviews
json_file_path = "/content/drive/MyDrive/Dealls Technical Test/yelp_academic_dataset_review.json"

# `date` is inferred as a TIMESTAMP, so the upper bound must be exclusive at
# 2018-02-01: `date <= '2018-01-31'` compares against midnight and would drop
# almost every review posted on January 31.
filter_and_summarize_query = f"""
WITH filtered_business_reviews AS (
    SELECT
        business_id,
        stars,
        useful,
        funny,
        cool,
        date AS review_date
    FROM read_json_auto('{json_file_path}')
    WHERE date >= '2018-01-01' AND date < '2018-02-01'
)
SELECT
    business_id,
    AVG(stars) AS stars,
    AVG(useful) AS useful,
    AVG(funny) AS funny,
    AVG(cool) AS cool,
    MAX(review_date) AS latest_date
FROM filtered_business_reviews
GROUP BY business_id;
"""

# Fetch the summarized data (one row per business)
summary_data = conn.execute(filter_and_summarize_query).fetchdf()

# Insert the whole frame with a single statement instead of one f-string INSERT
# per row: far fewer round trips, and no value is spliced into SQL text (a quote
# inside business_id would otherwise break the statement).
conn.register("summary_data_view", summary_data)
try:
    conn.execute("""
    INSERT INTO business_reviews (business_id, stars, useful, funny, cool, latest_date)
    SELECT business_id, stars, useful, funny, cool, CAST(latest_date AS DATE)
    FROM summary_data_view
    """)
finally:
    conn.unregister("summary_data_view")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [None]:
# Spot-check the freshly loaded table by showing its first five rows
result = (
    conn
    .execute("SELECT * FROM business_reviews LIMIT 5")
    .fetchdf()
)
print(result)

              business_id  stars  useful  funny  cool latest_date
0  lk9IwjZXqUMqqOhM774DtQ      4       4      3     5  2018-01-26
1  EQ-TZ2eeD_E0BHuvoaeG5Q      5       1      0     1  2018-01-29
2  TUH-FGpkOKCOVGIzmtTtfg      3       0      0     0  2018-01-22
3  2FnoLyEO0nq7g2Wz8x7Z5A      4       0      1     0  2018-01-24
4  PPJL9EbIM6B6r86Gf-H0nw      5       0      0     0  2018-01-22


**2. Daily Data Updates**

In [None]:
# Daily update: summarise the most recent day present in the source and upsert it
summarize_new_data_query = f"""
WITH new_filtered_reviews AS (
    SELECT
        business_id,
        stars,
        useful,
        funny,
        cool,
        date AS review_date
    FROM read_json_auto('{json_file_path}')
),
max_date_cte AS (
    -- Latest calendar day in the source. `date` is a TIMESTAMP, so compare on the
    -- DATE part: matching the exact MAX timestamp would keep only the reviews
    -- posted in that single second instead of the whole day.
    SELECT CAST(MAX(review_date) AS DATE) AS max_date FROM new_filtered_reviews
)
-- Now select only the reviews from that latest day
SELECT
    business_id,
    AVG(stars) AS stars,
    AVG(useful) AS useful,
    AVG(funny) AS funny,
    AVG(cool) AS cool,
    MAX(review_date) AS latest_date
FROM new_filtered_reviews
WHERE CAST(review_date AS DATE) = (SELECT max_date FROM max_date_cte)
GROUP BY business_id;
"""

# Fetch the summarized data into a Pandas DataFrame (one row per business)
summary_data = conn.execute(summarize_new_data_query).fetchdf()

# Upsert the whole frame in one statement (no per-row SQL built from values).
# NOTE(review): on conflict the stored averages are replaced by the new day's
# averages only; blending them with history would need a review-count column.
# This preserves the original cell's semantics.
conn.register("daily_summary_view", summary_data)
try:
    conn.execute("""
    INSERT INTO business_reviews (business_id, stars, useful, funny, cool, latest_date)
    SELECT business_id, stars, useful, funny, cool, CAST(latest_date AS DATE)
    FROM daily_summary_view
    ON CONFLICT (business_id) DO UPDATE SET
        stars = EXCLUDED.stars,
        useful = EXCLUDED.useful,
        funny = EXCLUDED.funny,
        cool = EXCLUDED.cool,
        latest_date = GREATEST(business_reviews.latest_date, EXCLUDED.latest_date);
    """)
finally:
    conn.unregister("daily_summary_view")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [None]:
# Re-read a few rows to confirm the upsert ran
result = (
    conn
    .execute("SELECT * FROM business_reviews LIMIT 5")
    .fetchdf()
)
print(result)

              business_id  stars  useful  funny  cool latest_date
0  lk9IwjZXqUMqqOhM774DtQ      4       4      3     5  2018-01-26
1  EQ-TZ2eeD_E0BHuvoaeG5Q      5       1      0     1  2018-01-29
2  TUH-FGpkOKCOVGIzmtTtfg      3       0      0     0  2018-01-22
3  2FnoLyEO0nq7g2Wz8x7Z5A      4       0      1     0  2018-01-24
4  PPJL9EbIM6B6r86Gf-H0nw      5       0      0     0  2018-01-22


**3. Scenarios to Handle**

In [None]:
# Whole-table totals: number of businesses plus the sum of each metric column
summary_metrics_query = """
SELECT
    COUNT(*) AS total_row_count,
    SUM(stars) AS total_sum_stars,
    SUM(useful) AS total_sum_useful,
    SUM(funny) AS total_sum_funny,
    SUM(cool) AS total_sum_cool
FROM business_reviews;
"""

summary_metrics = conn.execute(summary_metrics_query).fetchdf()
print("Summary Metrics (Total):", summary_metrics, sep="\n")

Summary Metrics (Total):
   total_row_count  total_sum_stars  total_sum_useful  total_sum_funny  \
0            30802         113875.0           36663.0           7879.0   

   total_sum_cool  
0         13808.0  


In [None]:
# Per-day breakdown: row count and metric sums grouped by each business's latest review date
detailed_metrics_query = """
SELECT
    latest_date,
    COUNT(*) AS row_count,
    SUM(stars) AS sum_stars,
    SUM(useful) AS sum_useful,
    SUM(funny) AS sum_funny,
    SUM(cool) AS sum_cool
FROM business_reviews
GROUP BY latest_date
ORDER BY latest_date;
"""

detailed_metrics = conn.execute(detailed_metrics_query).fetchdf()
print("\nDetailed Metrics Per Date:", detailed_metrics, sep="\n")


Detailed Metrics Per Date:
   latest_date  row_count  sum_stars  sum_useful  sum_funny  sum_cool
0   2018-01-01        492     1602.0       458.0      138.0     217.0
1   2018-01-02        596     2140.0       666.0      152.0     287.0
2   2018-01-03        677     2381.0      1056.0      206.0     416.0
3   2018-01-04        716     2590.0       981.0      225.0     366.0
4   2018-01-05        828     3059.0       862.0      159.0     300.0
5   2018-01-06        842     3043.0       845.0      191.0     346.0
6   2018-01-07        828     2976.0      1007.0      249.0     403.0
7   2018-01-08        752     2744.0       898.0      202.0     351.0
8   2018-01-09        675     2353.0       897.0      223.0     295.0
9   2018-01-10        725     2670.0       959.0      193.0     304.0
10  2018-01-11        744     2703.0       948.0      174.0     279.0
11  2018-01-12        820     3059.0       920.0      164.0     289.0
12  2018-01-13        865     3114.0       914.0      176.0   

# Case Study 2: Python Data Replication

**Python Code:** Include the functions for reading JSON files, converting them to tabular data, and writing them to Parquet files.

**Performance Metrics:**
* Runtime of the function (in seconds).
* Memory consumption during execution (in MB).

**Parquet File Details:**
* Size of the generated Parquet file (in bytes).
* Structure/schema of the Parquet file, detailing columns and their data types.

**Data Comparison Metrics:**
* Summary of metrics comparing the original JSON data to the data in the Parquet file, including total row count and sum of relevant numeric fields (stars, useful, funny, cool) for both formats.

In [None]:
# Configure logging for the conversion helpers below.
# force=True replaces any handler already attached to the root logger: without it
# basicConfig silently does nothing when a handler exists, and the INFO messages
# never appear (none show up in the conversion cell's captured output).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)

# Function to process a single chunk of data
def process_chunk(chunk, attempt=1, max_attempts=3):
    """Convert one Arrow batch to Polars and drop rows with null review metrics.

    A failed attempt is retried after a randomized exponential backoff until
    ``max_attempts`` attempts have been made in total.

    Args:
        chunk: Arrow data accepted by ``pl.from_arrow`` (a RecordBatch here).
        attempt: 1-based number of the first attempt (kept for backward compatibility).
        max_attempts: Total number of attempts before giving up. It is honoured on
            every retry (the earlier recursive version reset it to the default).

    Returns:
        The filtered Polars DataFrame, or None if every attempt failed.
    """
    while True:
        try:
            logging.info(f"Processing chunk (attempt {attempt})")

            # Convert to Polars DataFrame
            df_chunk = pl.from_arrow(chunk)

            # Keep only rows where every metric column is non-null
            return df_chunk.drop_nulls(subset=["stars", "useful", "funny", "cool"])
        except Exception as e:
            logging.warning(f"Error processing chunk: {e}")
            if attempt >= max_attempts:
                logging.error(f"Failed to process chunk after {max_attempts} attempts")
                return None
            sleep_time = uniform(0.5, 1.5) * 2 ** attempt  # Exponential backoff with jitter
            logging.info(f"Retrying chunk after {sleep_time:.2f} seconds (attempt {attempt + 1})")
            sleep(sleep_time)
            attempt += 1

# Numeric review columns compared between the JSON source and the Parquet output
_METRIC_COLUMNS = ("stars", "useful", "funny", "cool")


def _review_metrics(frame):
    """Return ``{"rows": row_count, "stars": sum, "useful": sum, "funny": sum, "cool": sum}``.

    Accepts a Polars DataFrame or LazyFrame; for a ``pl.scan_parquet`` LazyFrame the
    aggregation runs as one lazy query over the metric columns only.
    """
    totals = frame.lazy().select(
        pl.len().alias("rows"),
        *[pl.col(name).sum() for name in _METRIC_COLUMNS],
    )
    return totals.collect().row(0, named=True)


# Function to convert JSON to Parquet and report performance / comparison metrics
def convert_json_to_parquet(json_file_path, parquet_file_path):
    """Convert a JSON review file to a zstd-compressed Parquet file and report metrics.

    Rows with a null in any of stars/useful/funny/cool are dropped by ``process_chunk``.

    Args:
        json_file_path: Source JSON file, read with DuckDB's ``read_json``.
        parquet_file_path: Destination Parquet path.

    Returns:
        A dict with ``runtime_seconds``, ``memory_consumption_mb`` (RSS delta),
        ``parquet_file_size_bytes``, ``parquet_schema`` (read back from the written
        file) and ``data_comparison``. ``data_comparison`` holds the row count and
        metric sums of the raw JSON data (``*_json`` keys) and of the Parquet file
        as read back from disk (``*_parquet`` keys). Returns None when reading or
        writing fails; the error is logged.
    """
    start_time = time.time()

    # Track memory usage (resident set size, MB) at the start
    process = psutil.Process(os.getpid())
    start_mem = process.memory_info().rss / (1024 * 1024)

    try:
        json_data = duckdb.read_json(json_file_path)

        # NOTE: .arrow() materialises the whole file in memory; the batches below only
        # split that in-memory table, they do not lower peak memory.
        arrow_table = json_data.arrow()

        # Metrics of the source data, taken before any rows are filtered out
        json_metrics = _review_metrics(pl.from_arrow(arrow_table.select(list(_METRIC_COLUMNS))))

        chunk_size = 50_000
        arrow_batches = arrow_table.to_batches(chunk_size)

        # executor.map yields results in submission order, so the Parquet rows keep
        # the JSON order (as_completed would interleave chunks nondeterministically).
        with ThreadPoolExecutor(max_workers=4) as executor:
            polars_dfs = [
                chunk_df
                for chunk_df in executor.map(process_chunk, arrow_batches)
                if chunk_df is not None
            ]

        if not polars_dfs:
            logging.error("No valid data processed. Exiting.")
            return
        df_polars = pl.concat(polars_dfs)

    except Exception as e:
        logging.error(f"Error reading JSON: {e}")
        return

    # Write the Polars DataFrame to Parquet
    try:
        # "zstd" gives good compression; "lz4" would favour (de)compression speed.
        df_polars.write_parquet(parquet_file_path, compression="zstd")
        logging.info(f"Parquet file written to {parquet_file_path}")
    except Exception as e:
        logging.error(f"Error writing Parquet file: {e}")
        return

    # Performance metrics (conversion only; the read-back below is not timed)
    runtime = time.time() - start_time
    end_mem = process.memory_info().rss / (1024 * 1024)
    memory_consumed = end_mem - start_mem

    # Parquet details and metrics come from the written file, so the comparison
    # checks what actually landed on disk rather than the in-memory frame.
    parquet_size = os.path.getsize(parquet_file_path)
    parquet_scan = pl.scan_parquet(parquet_file_path)
    parquet_schema = parquet_scan.collect_schema()
    parquet_metrics = _review_metrics(parquet_scan)

    # Display results
    logging.info(f"Runtime: {runtime:.2f} seconds")
    logging.info(f"Memory consumption: {memory_consumed:.2f} MB")
    logging.info(f"Parquet file size: {parquet_size} bytes")
    logging.info(f"Parquet schema: {parquet_schema}")
    logging.info(f"Total rows in JSON data: {json_metrics['rows']}")
    logging.info(f"Sum of 'stars' in JSON data: {json_metrics['stars']}")
    logging.info(f"Sum of 'useful' in JSON data: {json_metrics['useful']}")
    logging.info(f"Sum of 'funny' in JSON data: {json_metrics['funny']}")
    logging.info(f"Sum of 'cool' in JSON data: {json_metrics['cool']}")
    logging.info(f"Total rows in Parquet data: {parquet_metrics['rows']}")
    logging.info(f"Sum of 'stars' in Parquet data: {parquet_metrics['stars']}")
    logging.info(f"Sum of 'useful' in Parquet data: {parquet_metrics['useful']}")
    logging.info(f"Sum of 'funny' in Parquet data: {parquet_metrics['funny']}")
    logging.info(f"Sum of 'cool' in Parquet data: {parquet_metrics['cool']}")
    if json_metrics["rows"] != parquet_metrics["rows"]:
        logging.warning("Row count differs between JSON and Parquet (rows with null metrics are dropped)")

    return {
        "runtime_seconds": runtime,
        "memory_consumption_mb": memory_consumed,
        "parquet_file_size_bytes": parquet_size,
        "parquet_schema": parquet_schema,
        "data_comparison": {
            "total_rows_json": json_metrics["rows"],
            "sum_json_stars": json_metrics["stars"],
            "sum_json_useful": json_metrics["useful"],
            "sum_json_funny": json_metrics["funny"],
            "sum_json_cool": json_metrics["cool"],
            "total_rows_parquet": parquet_metrics["rows"],
            "sum_parquet_stars": parquet_metrics["stars"],
            "sum_parquet_useful": parquet_metrics["useful"],
            "sum_parquet_funny": parquet_metrics["funny"],
            "sum_parquet_cool": parquet_metrics["cool"],
        },
    }

In [None]:
# Input JSON dump and the Parquet file it is converted into (same Drive folder)
json_file_path = "/content/drive/MyDrive/Dealls Technical Test/yelp_academic_dataset_review.json"
parquet_file_path = "/content/drive/MyDrive/Dealls Technical Test/yelp_academic_dataset_review.parquet"

# Run the conversion; the returned metrics dict is the cell's displayed output
convert_json_to_parquet(
    json_file_path=json_file_path,
    parquet_file_path=parquet_file_path,
)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

{'runtime_seconds': 116.58408403396606,
 'memory_consumption_mb': 6752.97265625,
 'parquet_file_size_bytes': 1863534888,
 'parquet_schema': Schema([('review_id', String),
         ('user_id', String),
         ('business_id', String),
         ('stars', Float64),
         ('useful', Int64),
         ('funny', Int64),
         ('cool', Int64),
         ('text', String),
         ('date', Datetime(time_unit='us', time_zone=None))]),
 'data_comparison': {'total_rows_json': 6990280,
  'sum_json_stars': 26203650.0,
  'sum_json_useful': 8280748,
  'sum_json_funny': 2282743,
  'sum_json_cool': 3485476}}

# Case Study 3: Python Data Export

In [None]:
# Function to load data from JSON into DuckDB using read_json_auto with error handling
def load_json_to_duckdb(json_file_path):
    """Load a JSON review file into a ``reviews`` table of a new in-memory DuckDB database.

    Args:
        json_file_path: Path to the JSON file; the schema is inferred by ``read_json_auto``.

    Returns:
        The open connection holding the ``reviews`` table, or None if loading failed
        (the error is printed and the connection is closed).
    """
    con = duckdb.connect()

    # Escape single quotes so a path such as "Alfian's Drive/..." stays a valid SQL literal
    escaped_path = json_file_path.replace("'", "''")
    try:
        con.execute(f"""
        CREATE TABLE reviews AS
        SELECT *
        FROM read_json_auto('{escaped_path}')
        """)
    except Exception as e:
        print(f"Error loading JSON: {e}")
        # The caller never receives the connection on failure, so release it here
        con.close()
        return None

    return con

# Function to clean and validate data (handling missing or malformed values)
def clean_data(con):
    """Normalise the metric columns of the ``reviews`` table in place.

    Casts ``stars`` to DOUBLE and ``useful``/``funny``/``cool`` to INTEGER, and
    replaces NULLs with 0. Zero-filling means later AVG() calls count missing
    values as 0 instead of ignoring them. Errors are printed, not raised.

    Args:
        con: Open DuckDB connection that holds a ``reviews`` table.
    """
    update_sql = """
        UPDATE reviews
        SET stars = COALESCE(CAST(stars AS DOUBLE), 0),
            useful = COALESCE(CAST(useful AS INTEGER), 0),
            funny = COALESCE(CAST(funny AS INTEGER), 0),
            cool = COALESCE(CAST(cool AS INTEGER), 0)
        """
    try:
        con.execute(update_sql)
    except Exception as e:
        print(f"Error cleaning data: {e}")

# Function to aggregate data for a specific business_id with additional checks
def aggregate_data_for_business(con, business_id):
    """Return review count and metric averages for one business as a one-row DataFrame.

    The query has no GROUP BY, so it always yields exactly one row; for an unknown
    ``business_id`` that row has ``total_rows`` 0 and NULL averages.

    Args:
        con: Open DuckDB connection holding a ``reviews`` table.
        business_id: Business to aggregate. It is bound as a query parameter,
            never interpolated into the SQL text.

    Returns:
        The result of ``fetchdf()``, or None if the query failed (the error is printed).
    """
    query = """
    SELECT
        COUNT(*) as total_rows,
        AVG(stars) as avg_stars,
        AVG(useful) as avg_useful,
        AVG(funny) as avg_funny,
        AVG(cool) as avg_cool
    FROM reviews
    WHERE business_id = ?
    """

    try:
        return con.execute(query, [business_id]).fetchdf()
    except Exception as e:
        print(f"Error during aggregation: {e}")
        return None

# Function to authenticate and connect to Google Sheets
def connect_to_google_sheets(cred_file, sheet_name):
    """Authenticate with a service-account key file and return the spreadsheet's first worksheet.

    Uses ``gspread.service_account`` (google-auth based) rather than the deprecated
    oauth2client ``ServiceAccountCredentials`` + ``gspread.authorize`` pair.

    Args:
        cred_file: Path to the service-account JSON key file.
        sheet_name: Title of the spreadsheet to open.

    Returns:
        The first worksheet (``sheet1``) of the spreadsheet.
    """
    # Same OAuth scopes as before: Sheets plus Drive (needed to open a spreadsheet by title)
    scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/spreadsheets",
             "https://www.googleapis.com/auth/drive.file", "https://www.googleapis.com/auth/drive"]

    client = gspread.service_account(filename=cred_file, scopes=scope)

    # Open the Google Sheet; only the first worksheet is used
    return client.open(sheet_name).sheet1

def _to_cell(value):
    """Convert one DataFrame value into a JSON-serialisable Google Sheets cell value."""
    import math

    if hasattr(value, "item"):
        # NumPy scalars (np.int64, np.float64, ...) -> native Python int/float
        value = value.item()
    if isinstance(value, float) and math.isnan(value):
        # NaN (e.g. AVG over zero matching reviews) is not valid JSON; write a blank cell
        return ""
    return value


# Function to store the results in Google Sheets with proper formatting
def store_results_in_google_sheets(data, sheet):
    """Replace the worksheet contents with ``data``: a header row followed by its rows.

    Integer columns stay integers (no float upcast from row-wise access) and NaN
    values are written as empty cells. Header and data go out in one API call.

    Args:
        data: pandas DataFrame to write; None or an empty frame writes nothing.
        sheet: gspread worksheet (anything with ``clear`` and ``append_rows``).
    """
    if data is None or data.empty:
        print("No data to store in Google Sheets")
        return

    # Clear the existing data in the sheet
    sheet.clear()

    headers = list(data.columns)
    rows = [
        [_to_cell(value) for value in record]
        for record in data.itertuples(index=False, name=None)
    ]
    sheet.append_rows([headers] + rows)

In [None]:
# Main process
def main(json_file_path, business_id, cred_file, sheet_name):
    """Load reviews, aggregate one business and write the result to Google Sheets.

    Args:
        json_file_path: JSON review file to load into DuckDB.
        business_id: Business whose reviews are aggregated.
        cred_file: Service-account key file used for Google Sheets.
        sheet_name: Title of the target spreadsheet (its first worksheet is overwritten).
    """
    # Load data into DuckDB
    con = load_json_to_duckdb(json_file_path)
    if con is None:
        print("Failed to load JSON data")
        return

    try:
        # Clean the data (handle missing or malformed fields)
        clean_data(con)

        # Aggregate data for the specific business_id (fetchdf() materialises the result)
        aggregated_data = aggregate_data_for_business(con, business_id)
    finally:
        # The reviews table holds the whole dataset in memory; release it even if a step
        # raises (a notebook traceback would otherwise keep the connection alive).
        con.close()

    if aggregated_data is None:
        print("Failed to aggregate data")
        return

    if not aggregated_data.empty and aggregated_data["total_rows"].iloc[0] == 0:
        print(f"Warning: no reviews found for business_id {business_id!r}")

    # Connect to Google Sheets
    sheet = connect_to_google_sheets(cred_file, sheet_name)

    # Store the aggregated results in Google Sheets
    store_results_in_google_sheets(aggregated_data, sheet)

    print("Data has been successfully stored in Google Sheets!")

In [None]:
# Export parameters: service-account key, target spreadsheet and the business to aggregate
cred_file = "/content/drive/MyDrive/Dealls Technical Test/alfianhid-2f8bbfb208ab.json"  # Google service account key
sheet_name = "Aggregated Business Data"
business_id = '7ATYjTIgM3jUlt4UM3IypQ'

# json_file_path is defined in an earlier cell
main(
    json_file_path=json_file_path,
    business_id=business_id,
    cred_file=cred_file,
    sheet_name=sheet_name,
)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Data has been successfully stored in Google Sheets!
