In [None]:
import csv
import json


def make_json(csvFilePath, jsonFilePath):

    # create a dictionary
    data = {}

    # Open a csv reader called DictReader
    with open(csvFilePath, encoding="utf-8") as csvf:
        csvReader = csv.DictReader(csvf)

        # Convert each row into a dictionary
        # and add it to data
        for rows in csvReader:

            key = rows["metadataid"]
            data[key] = rows

    # Open a json writer, and use the json.dumps()
    # function to dump data
    with open(jsonFilePath, "w", encoding="utf-8") as jsonf:
        jsonf.write(json.dumps(data, indent=4))


# Driver Code

# Decide the two file paths according to your
# computer system
csvFilePath = r"./metadata_category_clothing_shoes_and_jewelry_only.csv"
jsonFilePath = r"./metadata.json"

# Call the make_json function
make_json(csvFilePath, jsonFilePath)

In [13]:
import csv
import json
import ast  # To safely convert string representations of dictionaries/lists


def convert_value(value):
    """Try to convert a string into a dictionary, list, number, or keep it as a string."""
    value = value.strip()  # Remove leading/trailing spaces

    # Skip conversion if the value is empty
    if not value:
        return value

    # Try JSON parsing first
    try:
        return json.loads(value)  # Works for dicts, lists, numbers, booleans, and null
    except (json.JSONDecodeError, TypeError):
        pass  # If it fails, move to the next step

    # Try converting to a number (int or float)
    if value.replace(".", "", 1).isdigit():
        return float(value) if "." in value else int(value)

    return value  # Keep as a string if nothing works


def make_json(csvFilePath, jsonFilePath):
    """Convert a CSV file to JSON, ensuring proper data types."""
    data = []

    with open(csvFilePath, encoding="utf-8") as csvf:
        csvReader = csv.DictReader(csvf)

        for row in csvReader:
            # Convert all columns dynamically
            row = {key: convert_value(value) for key, value in row.items()}
            data.append(row)

    with open(jsonFilePath, "w", encoding="utf-8") as jsonf:
        json.dump(data, jsonf, indent=4)


# Paths
csvFilePath = r"./reviews_Clothing_Shoes_and_Jewelry_5.csv"
jsonFilePath = r"./reviews.json"

# Convert
make_json(csvFilePath, jsonFilePath)

In [None]:
import csv
import psycopg2
import io
import json
from datetime import datetime
from itertools import islice

# Database connection settings
DB_STAGE1 = {
    "dbname": "your_db",
    "user": "your_user",
    "password": "your_password",
    "host": "your_host",
    "port": "your_port",
}

# Max lengths for columns (based on your table definition)
MAX_LENGTHS = {
    "R_Reviewer_Source_Key": 14,
    "R_Product_Key": 10,
    "R_Reviewer_Name": 100,
    "R_Review_Text": 225,
    "R_Review_Title": 150,
    "R_Review_DateTime": 19,  # Timestamp format length (yyyy-mm-dd hh:mm:ss)
}


def ingest_reviews(csvFilePath):
    chunk_size = 1000  # Process in chunks of 1000
    chunk_count = 0  # Track chunks

    failed_rows = []  # To collect failed rows for logging to JSON

    with open(csvFilePath, encoding="utf-8") as csvf, psycopg2.connect(
        **DB_STAGE1
    ) as conn:

        cursor = conn.cursor()
        csvReader = csv.DictReader(csvf)

        while True:
            chunk = list(islice(csvReader, chunk_size))  # Read 1000 rows at a time
            if not chunk:
                break  # Stop when no more data

            transformed_rows = []
            for row in chunk:
                try:
                    # Extract and transform data
                    reviewer_id = str(row["reviewerID"])
                    product_key = str(row["asin"])
                    reviewer_name = (
                        str(row["reviewerName"])
                        if row["reviewerName"]
                        else "*Unknown Username"
                    )

                    # Validate string lengths
                    if len(reviewer_id) > MAX_LENGTHS["R_Reviewer_Source_Key"]:
                        failed_rows.append(
                            {"row": row, "error": "reviewer_id too long"}
                        )
                        continue
                    if len(product_key) > MAX_LENGTHS["R_Product_Key"]:
                        failed_rows.append(
                            {"row": row, "error": "product_key too long"}
                        )
                        continue
                    if len(reviewer_name) > MAX_LENGTHS["R_Reviewer_Name"]:
                        failed_rows.append(
                            {"row": row, "error": "reviewer_name too long"}
                        )
                        continue

                    review_text = (
                        str(row["reviewText"])
                        if row["reviewText"]
                        else "*Unknown review text"
                    )
                    if len(review_text) > MAX_LENGTHS["R_Review_Text"]:
                        failed_rows.append(
                            {"row": row, "error": "review_text too long"}
                        )
                        continue

                    review_title = (
                        str(row["summary"])
                        if row["summary"]
                        else "*Unknown review title"
                    )
                    if len(review_title) > MAX_LENGTHS["R_Review_Title"]:
                        failed_rows.append(
                            {"row": row, "error": "review_title too long"}
                        )
                        continue

                    # Handle review score
                    try:
                        review_score = float(row["overall"])
                    except:
                        review_score = 0.0

                    # Handle helpfulness rating
                    try:
                        rating_array = convert_value(row["helpful"])
                        helpfullness_rating = float(
                            round(rating_array[0] / rating_array[1], 2)
                        )
                    except:
                        helpfullness_rating = 0.0

                    # Handle review date
                    try:
                        review_datetime = datetime.utcfromtimestamp(
                            int(row["unixReviewTime"])
                        )
                    except:
                        review_datetime = "1900-01-01 00:00:00"

                    # Add the transformed row
                    transformed_rows.append(
                        [
                            reviewer_id,
                            product_key,
                            reviewer_name,
                            helpfullness_rating,
                            review_text,
                            review_score,
                            review_title,
                            review_datetime,
                        ]
                    )

                except Exception as e:
                    failed_rows.append({"row": row, "error": str(e)})
                    continue  # Skip this row and continue with others

            if transformed_rows:
                # Convert to CSV format (in-memory) for COPY
                output = io.StringIO()
                for row in transformed_rows:
                    output.write("\t".join(map(str, row)) + "\n")  # Tab-separated
                output.seek(0)

                try:
                    cursor.copy_from(
                        output,
                        "S1_Review",
                        sep="\t",
                        columns=[
                            "R_Reviewer_Source_Key",
                            "R_Product_Key",
                            "R_Reviewer_Name",
                            "R_Helpfulness_Rating",
                            "R_Review_Text",
                            "R_Review_Score",
                            "R_Review_Title",
                            "R_Review_DateTime",
                        ],
                    )
                    conn.commit()
                    chunk_count += 1
                    print(
                        f"Loaded chunk {chunk_count} ({len(transformed_rows)} records)"
                    )
                except Exception as e:
                    failed_rows.append({"chunk_error": str(e)})

    # Log the failed rows into a JSON file after processing
    if failed_rows:
        with open("failed_rows.json", "w", encoding="utf-8") as jsonf:
            json.dump(failed_rows, jsonf, indent=4)
            print(f"Logged {len(failed_rows)} failed rows to 'failed_rows.json'")

    print("Data ingestion complete.")


# Run the ingestion
csvFilePath = r"./metadata_category_clothing_shoes_and_jewelry_only.csv"
ingest_reviews(csvFilePath)

In [None]:
print(0 / 0)

ZeroDivisionError: division by zero