In [280]:
from google.cloud import documentai_v1 as documentai
from google.api_core.client_options import ClientOptions
from google.cloud import storage
import pandas as pd
import time
from pathlib import Path
import json

# === CONFIG ===
# Coordinates of the Document AI processor; all three values are interpolated
# into the processor resource name used for the batch request.
project_id = "vercillopersonal"
location = "us"
processor_id = "fe61eee8945a8018"

# === INPUT/OUTPUT PATHS ===
gcs_input_uri = "gs://vercillo_projects/transactions/amex/2023/2023-03-03.pdf"
# Path is used only to split off the last URI segment; nothing is read from disk.
pdf_filename = Path(gcs_input_uri).name
# .stem removes just the final extension (any case). str.replace(".pdf", "")
# would also strip ".pdf" appearing elsewhere in the name and would miss ".PDF".
pdf_prefix = Path(pdf_filename).stem
gcs_output_uri = "gs://vercillo_projects/transactions/amex/exports/"

# === Setup Client ===
# The API endpoint is derived from `location`, as is the processor resource name.
api_endpoint = f"{location}-documentai.googleapis.com"
client = documentai.DocumentProcessorServiceClient(
    client_options=ClientOptions(api_endpoint=api_endpoint)
)
name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"

# === GCS input/output config ===
# One PDF in; results are written beneath gcs_output_uri.
source_pdf = documentai.GcsDocument(gcs_uri=gcs_input_uri, mime_type="application/pdf")
input_config = documentai.BatchDocumentsInputConfig(
    gcs_documents=documentai.GcsDocuments(documents=[source_pdf])
)

gcs_destination = documentai.DocumentOutputConfig.GcsOutputConfig(gcs_uri=gcs_output_uri)
output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_destination)

# === Submit batch process ===
request = documentai.BatchProcessRequest(
    name=name,
    input_documents=input_config,
    document_output_config=output_config,
)
operation = client.batch_process_documents(request)

# Block on the long-running operation, giving up after 300 seconds.
print("Waiting for operation to finish...")
operation.result(timeout=300)

print("Document AI processing complete.")

Waiting for operation to finish...
Document AI processing complete.


In [281]:
import json

# === Locate this run's JSON output ===
# The exports prefix is shared by every run, so "newest JSON under the prefix"
# can belong to a different document. Prefer the destination that the finished
# operation reports for its own output, and only fall back to the configured
# prefix if the operation reported none.
storage_client = storage.Client()

batch_metadata = documentai.BatchProcessMetadata(operation.metadata)
output_destinations = [
    status.output_gcs_destination
    for status in batch_metadata.individual_process_statuses
    if status.output_gcs_destination
]
search_uri = output_destinations[0] if output_destinations else gcs_output_uri

# Split "gs://bucket/some/prefix" into bucket name and object prefix.
output_bucket_name, _, output_prefix = search_uri.replace("gs://", "", 1).partition("/")
# A trailing slash keeps ".../0" from also matching sibling folders like ".../01".
if output_prefix and not output_prefix.endswith("/"):
    output_prefix += "/"

blobs = list(storage_client.list_blobs(output_bucket_name, prefix=output_prefix))
json_blobs = [b for b in blobs if b.name.endswith(".json")]

if not json_blobs:
    raise ValueError("No JSON output found. Wait a few more seconds or check if the processor ran successfully.")

# Sort and take the most recent JSON (usually only one).
# NOTE(review): if the processor ever writes several JSON files for one input,
# only the newest is read here and the rest are ignored.
json_blobs = sorted(json_blobs, key=lambda b: b.updated, reverse=True)
output_blob = json_blobs[0]

# Download and parse
json_str = output_blob.download_as_text()
doc = json.loads(json_str)


In [282]:
# Raw document text from the parsed JSON (not referenced again in the cells shown here).
full_text = doc.get("text", "")
# Per-page layout data; find_y_from_tokens scans each page's "tokens" list.
pages = doc.get("pages", [])
# Accumulates one dict per entity; turned into the long-format `df` below.
rows = []

def find_y_from_tokens(start_idx, doc_pages=None):
    """Locate the token whose first text segment starts at ``start_idx``.

    Args:
        start_idx: Character offset to look for (compared as an int).
        doc_pages: Page dicts to search. Defaults to the notebook-level
            ``pages`` list, so existing single-argument calls are unchanged.

    Returns:
        ``(y, page_number)`` where ``y`` is the first normalized vertex's y
        coordinate rounded to 4 decimals, or ``(None, None)`` if no token
        starts at ``start_idx``.
    """
    if doc_pages is None:
        doc_pages = pages

    for page in doc_pages:
        for token in page.get("tokens", []):
            layout = token.get("layout", {})
            segs = layout.get("textAnchor", {}).get("textSegments", [])
            if not segs:
                continue
            if int(segs[0].get("startIndex", -1)) != start_idx:
                continue

            vertices = layout.get("boundingPoly", {}).get("normalizedVertices", [])
            # proto3 JSON omits default-valued fields, so a vertex at the very
            # top of the page may carry no "y" key; treat that as 0.0 rather
            # than raising KeyError.
            y = round(vertices[0].get("y", 0.0), 4) if vertices else None
            return y, page.get("pageNumber")
    return None, None

# Flatten every entity into one row: type, text, confidence, character span,
# plus the page and vertical position of the token it starts on.
for entity in doc.get("entities", []):
    type_ = entity.get("type")
    value = entity.get("mentionText")
    confidence = round(entity.get("confidence", 0), 2)

    # `or [{}]` also covers a present-but-empty textSegments list, which would
    # otherwise raise IndexError on [0].
    text_segments = entity.get("textAnchor", {}).get("textSegments") or [{}]
    first_segment = text_segments[0]
    start_index = int(first_segment.get("startIndex", -1))
    end_index = int(first_segment.get("endIndex", -1))

    # Get Y and page by matching entity start index to token
    y_pos, page = find_y_from_tokens(start_index)

    rows.append({
        "type": type_,
        "value": value,
        "confidence": confidence,
        "page": page,
        "start_index": start_index,
        "end_index": end_index,
        "y_position": y_pos
    })

df = pd.DataFrame(rows)
# DataFrame.info() prints its own report and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 202 entries, 0 to 201
Data columns (total 7 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   type         202 non-null    object 
 1   value        202 non-null    object 
 2   confidence   202 non-null    float64
 3   page         202 non-null    int64  
 4   start_index  202 non-null    int64  
 5   end_index    202 non-null    int64  
 6   y_position   202 non-null    float64
dtypes: float64(2), int64(3), object(2)
memory usage: 11.2+ KB
None


In [283]:
import re

date_regex = re.compile(r"^[A-Za-z]{3,9} \d{1,2}$")   # e.g. Dec 4
amount_regex = re.compile(r"-?\$?[\d,]+\.\d{2}$")     # e.g. -2,481.67

# Fixed column set so an empty result still has the columns filtered on below.
PAYMENT_COLUMNS = ["transaction_date", "posting_date", "Vendor", "amount", "location"]


def _parse_payment_value(value):
    """Split one multi-line "payment" entity into a flat record.

    Args:
        value: The entity text; anything that is not a string (e.g. a missing
            mentionText) is treated as empty.

    Returns:
        A dict keyed by PAYMENT_COLUMNS. ``amount`` is the last line with
        "," and "$" removed when that line looks like an amount, else None.
        ``transaction_date`` / ``posting_date`` are the first and second
        date-looking lines (posting falls back to transaction).
        ``Vendor`` is the remaining lines joined with " | ".
    """
    text = value if isinstance(value, str) else ""
    lines = [line.strip() for line in text.splitlines() if line.strip()]

    # === Extract amount (only recognised when it is the final line)
    amount = None
    if lines and amount_regex.match(lines[-1]):
        amount = lines[-1].replace(",", "").replace("$", "")
        lines = lines[:-1]

    # === Extract dates
    dates = [line for line in lines if date_regex.match(line)]
    transaction_date = dates[0] if dates else None
    posting_date = dates[1] if len(dates) > 1 else transaction_date
    vendor_lines = [line for line in lines if line not in dates]

    # === Clean vendor
    vendor_clean = " | ".join(vendor_lines).strip()
    if "payment received" in vendor_clean.lower():
        vendor_clean = "PAYMENT RECEIVED"

    return {
        "transaction_date": transaction_date,
        "posting_date": posting_date,
        "Vendor": vendor_clean,
        "amount": amount,
        "location": None,
    }


# Filter for payment entities
df_payment_entities = df[df["type"] == "payment"].sort_values(by="start_index").reset_index(drop=True)

payment_rows = [_parse_payment_value(value) for value in df_payment_entities["value"]]

# Passing columns= keeps "transaction_date" present even when no payment
# entities were found; without it the filter below raises KeyError.
df_payment_rows = pd.DataFrame(payment_rows, columns=PAYMENT_COLUMNS)
df_payment_rows = df_payment_rows[df_payment_rows["transaction_date"].notnull()].reset_index(drop=True)


In [284]:
# === Clean up all JSONs in the exports folder (after processing) ===
# Bucket and prefix come from gcs_output_uri so cleanup always targets the
# location the batch job was configured to write to, not a second hard-coded copy.
export_bucket_name, _, export_prefix = gcs_output_uri.replace("gs://", "", 1).partition("/")

# An empty prefix would match every object in the bucket; refuse to delete then.
if not export_prefix:
    raise ValueError(f"Refusing to clean up: no folder prefix in {gcs_output_uri!r}")

for blob in storage_client.list_blobs(export_bucket_name, prefix=export_prefix):
    if blob.name.endswith(".json"):
        print(f"Deleting {blob.name}")
        blob.delete()

Deleting transactions/amex/exports/2346962065558724734/0/2023-03-03-0.json


In [285]:
# Display the parsed payment rows (only entries with a transaction date are kept).
df_payment_rows 

Unnamed: 0,transaction_date,posting_date,Vendor,amount,location
0,Feb 10,Feb 10,PAYMENT RECEIVED,-1500.0,
1,Feb 22,Feb 22,PAYMENT RECEIVED,-3644.1,
2,Feb 22,Feb 22,Use Points for Purchases | Reference S00023053...,-205.81,


In [286]:
# Peek at the first ten extracted entities in long format (one row per entity).
df.head(10)

Unnamed: 0,type,value,confidence,page,start_index,end_index,y_position
0,Vendor,HOMESENSE 089,0.97,2,2332,2345,0.4056
1,Vendor,MOBIL@ - 4361 0324,0.99,2,2365,2383,0.4228
2,Vendor,THE SYMPOSIUM CAFE 2084,1.0,2,2404,2427,0.4413
3,Vendor,ΑΜΖΝ ΜΚΤР СА*Z85480UV3,0.99,2,2453,2475,0.4598
4,Vendor,AMZN MKTP CA*UO2JQ9U03,1.0,2,2508,2530,0.4788
5,Vendor,TICKETMASTER CANADA HOS,0.98,2,2569,2592,0.4977
6,Vendor,DOORDASH*OSMOWS,0.99,2,2636,2651,0.5162
7,Vendor,EUREST-EUREST-TJX-23102,1.0,2,2671,2694,0.5347
8,Vendor,EUREST-EUREST-TJX-23102,1.0,2,2719,2742,0.5524
9,Vendor,AMZ*MOSWAG,1.0,2,2779,2789,0.5717


In [287]:
target_types = [
    "Vendor", "amount", "location", "payment",
    "posting_date", "transaction_date"
]

# Split: these types are de-duplicated, everything else passes through untouched.
is_target = df["type"].isin(target_types)
df_dedup_target = df[is_target].copy()
df_other = df[~is_target].copy()

# Only filter low-confidence 'location' values
mask_location = df_dedup_target["type"] == "location"
df_dedup_target = df_dedup_target[~mask_location | (df_dedup_target["confidence"] >= 0.90)]

# Deduplicate by type + page + y_position.
# Missing values compare as equal during de-duplication, so entities with no
# matched token (page / y_position are None) would all collapse into a single
# row per type. Only rows that actually have a position may be dropped.
has_position = df_dedup_target["y_position"].notna()
is_repeat = df_dedup_target.duplicated(subset=["type", "page", "y_position"])
df_dedup_target = df_dedup_target[~(has_position & is_repeat)]

# Combine both
df_cleaned = pd.concat([df_dedup_target, df_other], ignore_index=True)

df_cleaned


Unnamed: 0,type,value,confidence,page,start_index,end_index,y_position
0,Vendor,HOMESENSE 089,0.97,2,2332,2345,0.4056
1,Vendor,MOBIL@ - 4361 0324,0.99,2,2365,2383,0.4228
2,Vendor,THE SYMPOSIUM CAFE 2084,1.00,2,2404,2427,0.4413
3,Vendor,ΑΜΖΝ ΜΚΤР СА*Z85480UV3,0.99,2,2453,2475,0.4598
4,Vendor,AMZN MKTP CA*UO2JQ9U03,1.00,2,2508,2530,0.4788
...,...,...,...,...,...,...,...
196,transaction_date,Feb 24,1.00,3,4561,4567,0.8128
197,closing_date,"Mar 03, 2023",0.97,1,631,643,0.1615
198,opening_date,"Feb 04, 2023",0.99,1,618,630,0.1611
199,points_earned,2248,0.99,6,12254,12259,0.2705


In [288]:
anchor_types = ["transaction_date", "posting_date", "Vendor", "amount", "location"]

# Number each type's entities in document order (by start_index). The n-th
# entity of every type shares row_id n and is later pivoted into one row.
# NOTE(review): this pairing is purely positional. If one type has fewer
# entities than the others, every later value of that type shifts up a row.
anchored_parts = []

for type_ in anchor_types:
    df_type = (
        df_cleaned[df_cleaned["type"] == type_]
        .sort_values(by="start_index")
        .reset_index(drop=True)
    )
    df_type["row_id"] = range(len(df_type))
    anchored_parts.append(df_type[["type", "value", "start_index", "row_id"]])

df_anchor = pd.concat(anchored_parts).sort_values(by=["row_id", "type"]).reset_index(drop=True)

df_anchor.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 194 entries, 0 to 193
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   type         194 non-null    object
 1   value        194 non-null    object
 2   start_index  194 non-null    int64 
 3   row_id       194 non-null    int64 
dtypes: int64(2), object(2)
memory usage: 6.2+ KB


In [289]:
# Rebuild the wide-format table: one row per row_id, one column per entity type.
# aggfunc="first" keeps the first value when a (row_id, type) pair repeats.
wide_by_row = df_anchor.pivot_table(
    index="row_id",
    columns="type",
    values="value",
    aggfunc="first",
)
df_ouput = wide_by_row.reset_index()

# Preview final result
df_ouput.head()


type,row_id,Vendor,amount,location,posting_date,transaction_date
0,0,HOMESENSE 089,278.5,GUELPH,Feb 5,Feb 4
1,1,MOBIL@ - 4361 0324,52.62,TORONTO,Feb 5,Feb 4
2,2,THE SYMPOSIUM CAFE 2084,55.1,GUELPH,Feb 5,Feb 4
3,3,ΑΜΖΝ ΜΚΤР СА*Z85480UV3,29.36,WWW.AMAZON.CA,Feb 7,Feb 6
4,4,AMZN MKTP CA*UO2JQ9U03,40.66,WWW.AMAZON.CA,Feb 7,Feb 7


In [290]:
# Helper for fields that occur once per statement
def extract_single_value(df, field_name):
    """Return the first ``value`` whose ``type`` equals ``field_name``.

    Returns None when no row of that type exists.
    """
    values = df.loc[df["type"] == field_name, "value"]
    if values.empty:
        return None
    return values.iloc[0]

# Statement-level dates: looked up once, then repeated on every output row.
closing_date = extract_single_value(df_cleaned, "closing_date")
opening_date = extract_single_value(df_cleaned, "opening_date")

df_ouput = df_ouput.assign(closing_date=closing_date, opening_date=opening_date)

df_ouput.head()

type,row_id,Vendor,amount,location,posting_date,transaction_date,closing_date,opening_date
0,0,HOMESENSE 089,278.5,GUELPH,Feb 5,Feb 4,"Mar 03, 2023","Feb 04, 2023"
1,1,MOBIL@ - 4361 0324,52.62,TORONTO,Feb 5,Feb 4,"Mar 03, 2023","Feb 04, 2023"
2,2,THE SYMPOSIUM CAFE 2084,55.1,GUELPH,Feb 5,Feb 4,"Mar 03, 2023","Feb 04, 2023"
3,3,ΑΜΖΝ ΜΚΤР СА*Z85480UV3,29.36,WWW.AMAZON.CA,Feb 7,Feb 6,"Mar 03, 2023","Feb 04, 2023"
4,4,AMZN MKTP CA*UO2JQ9U03,40.66,WWW.AMAZON.CA,Feb 7,Feb 7,"Mar 03, 2023","Feb 04, 2023"


In [291]:
# Points totals are appended as two synthetic rows after the last transaction;
# the field name goes in "Vendor" and the points value in "amount".
points_earned = extract_single_value(df_cleaned, "points_earned")
points_redeemed = extract_single_value(df_cleaned, "points_redeemed")

last_row_id = df_ouput["row_id"].max()
points_fields = [("points_earned", points_earned), ("points_redeemed", points_redeemed)]

points_rows = pd.DataFrame([
    {
        "row_id": last_row_id + offset,
        "Vendor": label,
        "amount": points,
        "location": None,
        "posting_date": None,
        "transaction_date": None,
        "closing_date": closing_date,
        "opening_date": opening_date,
    }
    for offset, (label, points) in enumerate(points_fields, start=1)
])

df_ouput = pd.concat([df_ouput, points_rows], ignore_index=True)

In [292]:
# Add row_id and meta fields before final export
# Payment rows continue the row_id sequence after the current last output row.
base_row_id = df_ouput["row_id"].max() + 1
# These assignments modify df_payment_rows in place.
df_payment_rows["row_id"] = range(base_row_id, base_row_id + len(df_payment_rows))
df_payment_rows["closing_date"] = closing_date
df_payment_rows["opening_date"] = opening_date

# NOTE: df_ouput is rebuilt from itself, so re-running this cell appends the
# payment rows again.
df_ouput = pd.concat([df_ouput, df_payment_rows], ignore_index=True)

In [293]:
# Preview the combined table after the points and payment rows were appended
# (head() shows only the first five rows).
df_ouput.head()

Unnamed: 0,row_id,Vendor,amount,location,posting_date,transaction_date,closing_date,opening_date
0,0,HOMESENSE 089,278.5,GUELPH,Feb 5,Feb 4,"Mar 03, 2023","Feb 04, 2023"
1,1,MOBIL@ - 4361 0324,52.62,TORONTO,Feb 5,Feb 4,"Mar 03, 2023","Feb 04, 2023"
2,2,THE SYMPOSIUM CAFE 2084,55.1,GUELPH,Feb 5,Feb 4,"Mar 03, 2023","Feb 04, 2023"
3,3,ΑΜΖΝ ΜΚΤР СА*Z85480UV3,29.36,WWW.AMAZON.CA,Feb 7,Feb 6,"Mar 03, 2023","Feb 04, 2023"
4,4,AMZN MKTP CA*UO2JQ9U03,40.66,WWW.AMAZON.CA,Feb 7,Feb 7,"Mar 03, 2023","Feb 04, 2023"


In [294]:
# Local export path
local_csv = rf"C:\Users\jverc\OneDrive\02.DataScienceOD\test_files\{pdf_prefix}_cleansed.csv"

# Create the target folder if it is missing; to_csv does not create parent
# directories and would otherwise fail on a fresh machine.
Path(local_csv).parent.mkdir(parents=True, exist_ok=True)

# Save locally
df_ouput.to_csv(local_csv, index=False)
print(f"Exported CSV saved to: {local_csv}")

Exported CSV saved to: C:\Users\jverc\OneDrive\02.DataScienceOD\test_files\2023-03-03_cleansed.csv


In [295]:
from io import StringIO

# Destination object for the cleansed CSV
gcs_output_csv_path = f"transactions/amex/data/{pdf_prefix}_cleansed.csv"
bucket = storage_client.bucket("vercillo_projects")
blob = bucket.blob(gcs_output_csv_path)

# Serialise the DataFrame in memory; getvalue() returns the whole buffer
# regardless of the current stream position.
csv_buffer = StringIO()
df_ouput.to_csv(csv_buffer, index=False)
csv_payload = csv_buffer.getvalue()

# Upload to GCS
blob.upload_from_string(csv_payload, content_type="text/csv")
print(f"✅ CSV uploaded to: gs://vercillo_projects/{gcs_output_csv_path}")

✅ CSV uploaded to: gs://vercillo_projects/transactions/amex/data/2023-03-03_cleansed.csv


In [296]:
# %% ✅ Data Quality Check — using df_cleaned
from io import StringIO

required_fields = ["Vendor", "amount", "location", "posting_date", "transaction_date"]
# Number of entities of each required type; a complete extraction has the same
# count for every field.
record_counts = {field: df_cleaned[df_cleaned["type"] == field].shape[0] for field in required_fields}

print("\n🔍 Data Quality Check (based on df_cleaned entity counts):")
for field, count in record_counts.items():
    print(f"{field}: {count}")

# Only upload if mismatch is detected
if len(set(record_counts.values())) != 1:
    print("⚠️ Data quality issue detected: exporting issue file...")

    # Pivot df_cleaned to wide format for easier review.
    # Each entity gets its ordinal position within its own type (cumcount), and
    # that ordinal is the pivot index, so the n-th value of every type lands on
    # the same row. Pivoting on a plain 0..N-1 index instead yields one
    # non-null cell per row.
    df_required = (
        df_cleaned[df_cleaned["type"].isin(required_fields)]
        .sort_values(by="start_index")
    )
    df_quality_check = (
        df_required
        .assign(row_id=df_required.groupby("type").cumcount())
        .pivot(index="row_id", columns="type", values="value")
    )

    # Prepare export path
    gcs_output_csv_path = f"transactions/amex/quality_checks/{pdf_prefix}_issues.csv"
    bucket = storage_client.bucket("vercillo_projects")
    blob = bucket.blob(gcs_output_csv_path)

    # Convert to CSV and upload
    csv_buffer = StringIO()
    df_quality_check.to_csv(csv_buffer, index=False)
    csv_buffer.seek(0)
    blob.upload_from_string(csv_buffer.getvalue(), content_type="text/csv")
    print(f"✅ Issue CSV uploaded to: gs://vercillo_projects/{gcs_output_csv_path}")
else:
    print("✅ Data quality passed — No issue file exported.")



🔍 Data Quality Check (based on df_cleaned entity counts):
Vendor: 39
amount: 39
location: 38
posting_date: 39
transaction_date: 39
⚠️ Data quality issue detected: exporting issue file...


  .apply(lambda g: g.reset_index(drop=True))


✅ Issue CSV uploaded to: gs://vercillo_projects/transactions/amex/quality_checks/2023-03-03_issues.csv
