In [1]:
from ollama import chat
import time
import os
import re
import pandas as pd
from dotenv import load_dotenv
from collections import defaultdict
import csv
from datetime import datetime

In [2]:
# Let python-dotenv populate the process environment before reading settings.
load_dotenv()

# Model name and prompt text used as the pipeline defaults; each is None
# when its environment variable is not set.
the_wizard = os.environ.get("WIZARD")
the_spell = os.environ.get("LEVIOSA")

In [3]:
def log_performance(start_time, end_time, image_count, log_csv='performance_log.csv'):
    """Append one timing record for a pipeline run to a CSV log.

    Args:
        start_time: Run start in seconds (e.g. a ``time.time()`` reading).
        end_time: Run end, in the same units as ``start_time``.
        image_count: Number of images handled during the run.
        log_csv: Path of the CSV log file; it is created if missing.

    Each row holds the current local timestamp, the image count and the
    elapsed seconds rounded to two decimals.
    """
    log_data = {
        'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'image_count': image_count,
        'duration_seconds': round(end_time - start_time, 2),
    }

    # The header is needed when the log is new *or* exists but is still empty
    # (e.g. a pre-created placeholder file); otherwise rows would be headerless.
    needs_header = not os.path.isfile(log_csv) or os.path.getsize(log_csv) == 0

    with open(log_csv, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=list(log_data))
        if needs_header:
            writer.writeheader()
        writer.writerow(log_data)

In [12]:
def load_image_paths(csv_path):
    """Return the values of the ``image_path`` column of a CSV as a list.

    Args:
        csv_path: Path of the CSV file to read.
    """
    listing = pd.read_csv(csv_path)
    image_column = listing['image_path']
    return image_column.tolist()

In [5]:
def load_processed_paths(processed_csv):
    """Return the set of paths recorded in the ``path`` column of a CSV.

    Args:
        processed_csv: Path of the bookkeeping CSV.

    Returns:
        A set of the recorded values, or an empty set when the file
        does not exist.
    """
    if not os.path.exists(processed_csv):
        return set()

    recorded = pd.read_csv(processed_csv)
    return set(recorded['path'].tolist())

In [6]:
def save_processed_paths(processed_csv, new_paths):
    """Record paths as processed in a CSV with a single ``path`` column.

    Rows already present in ``processed_csv`` are kept and the new paths are
    appended after them. Duplicate rows are dropped in every case, including
    the first write when the file does not exist yet.

    Args:
        processed_csv: Path of the bookkeeping CSV (rewritten in full).
        new_paths: Iterable of paths to add.
    """
    frames = []
    if os.path.exists(processed_csv):
        frames.append(pd.read_csv(processed_csv))
    frames.append(pd.DataFrame({'path': list(new_paths)}))

    # De-duplicate unconditionally so the first write matches later ones
    combined = pd.concat(frames, ignore_index=True).drop_duplicates()
    combined.to_csv(processed_csv, index=False)

In [24]:
def extract_data_from_images(image_paths, wizard, spell, content_response=None):
    """Send each image to the chat model and collect the raw replies.

    File names are expected to look like ``<statistic>_<country>.<ext>``: the
    text after the last underscore (extension removed) is the country and
    everything before it is the statistic name.

    Args:
        image_paths: Iterable of image file paths.
        wizard: Model name passed to ``chat``.
        spell: Prompt text sent together with every image.
        content_response: Optional dict filled in place as
            ``{country: {statistic: reply_text}}``. A new dict is created
            when it is omitted.

    Returns:
        The populated ``content_response`` dict.
    """
    if content_response is None:
        content_response = {}

    for path in image_paths:
        # splitext removes whatever extension the file has (.png, .PNG, .jpg...)
        # instead of deleting the literal substring '.png' only.
        stem, _ext = os.path.splitext(os.path.basename(path))
        # Without an underscore the statistic is '' and the stem is the country
        statistic, _, country = stem.rpartition('_')

        response = chat(
            model=wizard,
            messages=[
                {
                    'role': 'user',
                    'content': spell,
                    'images': [path],
                }
            ],
        )

        content_response.setdefault(country, {})[statistic] = response.message.content

    print(content_response)
    return content_response

In [8]:
def clean_ollama_response(text):
    """Normalise a model reply so that ``year: value`` pairs are easy to parse.

    Steps:
      1. Remove every asterisk (markdown bullets / bold markers).
      2. Collapse newlines and runs of whitespace into single spaces.
      3. Drop an introductory sentence, i.e. everything up to and including
         the first colon, unless that colon already belongs to the first
         ``year:`` entry.

    Args:
        text: Raw reply text.

    Returns:
        The cleaned, stripped text.
    """
    # Step 1: remove all asterisks
    text = text.replace('*', '')

    # Step 2: turn line breaks into spaces rather than deleting them, so that
    # "4.57\n2017: 4.60" cannot fuse into "4.572017: 4.60".
    text = ' '.join(text.split())

    # Step 3: strip the preamble before the first colon
    colon_index = text.find(':')
    if colon_index != -1:
        lead = text[:colon_index]
        # When only punctuation precedes a 4-digit number, the colon is part
        # of the first data entry and removing it would lose that year.
        if not re.fullmatch(r'\W*\d{4}', lead):
            text = text[colon_index + 1:]

    return text.strip()

In [9]:
def build_statistic_dataframes(data):
    """
    Builds a dictionary of DataFrames, one per statistic.
    Each DataFrame has years as index and countries as columns.
    Missing values are filled with NaN.

    Args:
        data: Mapping ``{country: {statistic: reply_text}}``.

    Returns:
        Dict ``{statistic: DataFrame}``. Every frame has one column per
        country in ``data`` (alphabetically ordered) and is sorted by year.
        A statistic with no parseable ``year: value`` pair gets no frame.
    """
    stats_data = defaultdict(lambda: defaultdict(dict))  # {stat: {year: {country: value}}}
    # Sorted list instead of a set: set iteration order varies between runs,
    # which made the column order non-deterministic.
    all_countries = sorted(data.keys())

    for country, stats in data.items():
        for stat_name, stat_text in stats.items():
            cleaned = clean_ollama_response(stat_text)
            # Optional leading minus so negative values are not silently dropped
            matches = re.findall(r"(\d{4}):\s*(-?[\d.]+)", cleaned)
            for year_str, value_str in matches:
                try:
                    year = int(year_str)
                    value = float(value_str)
                except ValueError:
                    continue  # Skip malformed entries such as "1.2.3"
                stats_data[stat_name][year][country] = value

    stat_dfs = {}
    for stat_name, year_country_values in stats_data.items():
        df = pd.DataFrame.from_dict(year_country_values, orient='index').sort_index()
        df = df.reindex(columns=all_countries)  # Ensure all countries are present
        stat_dfs[stat_name] = df

    return stat_dfs

In [35]:
def export_statistic_dataframes_to_parquet(stat_dfs, output_dir):
    """Write each statistic DataFrame to ``<output_dir>/<name>.parquet``.

    The output directory is created when missing. If a file for a statistic
    already exists, it is read and combined with the new frame through
    ``combine_first``, so values already on disk take precedence and the new
    frame only fills the gaps.

    Args:
        stat_dfs: Mapping of statistic name to DataFrame.
        output_dir: Directory receiving the Parquet files.
    """
    os.makedirs(output_dir, exist_ok=True)

    for name in stat_dfs:
        fresh = stat_dfs[name]
        target = os.path.join(output_dir, f"{name}.parquet")

        merged = fresh
        if os.path.exists(target):
            # Union of years (index) and countries (columns); stored data wins
            merged = pd.read_parquet(target).combine_first(fresh)

        merged.to_parquet(target, index=True)
        print(f"✅ Merged: {target}")

In [18]:
def run_extraction_pipeline(
    input_csv,
    processed_csv,
    model=the_wizard,
    spell=the_spell,
    batch_size=4,
    output_dir='parquet_exports'
):
    """Process the next batch of not-yet-processed images end to end.

    Args:
        input_csv: CSV listing every image in an ``image_path`` column.
        processed_csv: CSV tracking the images that were already handled.
        model: Model name passed on to the extraction step.
        spell: Prompt text passed on to the extraction step.
        batch_size: Maximum number of images handled in this call.
        output_dir: Directory receiving the per-statistic Parquet files.

    The batch is recorded in ``processed_csv`` only after the export step has
    completed, so a failure during extraction or export leaves those images
    eligible for the next run instead of losing their data.
    """
    start_time = time.time()

    # Step 1: Load paths
    all_paths = load_image_paths(input_csv)
    processed_paths = load_processed_paths(processed_csv)
    remaining_paths = [p for p in all_paths if p not in processed_paths]

    # Step 2: Select batch
    batch_paths = remaining_paths[:batch_size]
    print(f"Processing {len(batch_paths)} images...")

    # Step 3: Extract data
    content_response = {}
    extract_data_from_images(batch_paths, model, spell, content_response)

    # Step 4: Build DataFrames and export
    stat_dfs = build_statistic_dataframes(content_response)
    export_statistic_dataframes_to_parquet(stat_dfs, output_dir=output_dir)

    # Step 5: Mark the batch as processed, now that its data is on disk
    save_processed_paths(processed_csv, batch_paths)

    end_time = time.time()
    log_performance(start_time, end_time, len(batch_paths))
    print(f"The process for {len(batch_paths)} images has finished.")

In [38]:
# Run one batch using the default model, prompt, batch size and output directory.
run_extraction_pipeline(
    input_csv='data/image_paths.csv',
    processed_csv='data/processed_paths.csv')

Processing 4 images...
{'Austria': {'labor_force': 'Claro, aquí tienes los datos extraídos de la imagen:\n\n*   **2016:** 4.57\n*   **2017:** 4.60\n*   **2018:** 4.64\n*   **2019:** 4.66\n*   **2020:** 4.64\n*   **2021:** 4.69\n*   **2022:** 4.76\n*   **2023:** 4.83', 'livestock_production_index': 'Aquí están los años y valores extraídos del gráfico:\n\n*   **2015:** 101.2\n*   **2016:** 101.4\n*   **2017:** 101.4\n*   **2018:** 101.5\n*   **2019:** 101.1\n*   **2020:** 100.9\n*   **2021:** 100.3\n*   **2022:** 100.4', 'Percent_agricultural_land': 'Aquí están los años y valores extraídos de la imagen:\n\n*   **2015:** 32.94\n*   **2016:** 32.36\n*   **2017:** 32.17\n*   **2018:** 32.15\n*   **2019:** 32.13\n*   **2020:** 31.54\n*   **2021:** 31.54\n*   **2022:** 31.48', 'Percent_urban_population': 'Claro, aquí están los años y valores del gráfico que proporcionaste:\n\n*   2016: 57.90\n*   2017: 58.09\n*   2018: 58.30\n*   2019: 58.51\n*   2020: 58.75\n*   2021: 58.99\n*   2022: 59.26\

In [19]:
def load_parquet_files(directory):
    """Read every ``.parquet`` file in a directory into a DataFrame.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        Dict mapping file name to DataFrame, in alphabetical file-name order.
    """
    dataframes = {}
    # os.listdir returns entries in arbitrary order; sort for stable output
    for filename in sorted(os.listdir(directory)):
        if filename.endswith(".parquet"):
            dataframes[filename] = pd.read_parquet(os.path.join(directory, filename))
    return dataframes

In [37]:
# Read back every exported Parquet file, keyed by file name
parquet_data = load_parquet_files("parquet_exports")

# Preview the first rows of each table
for name in parquet_data:
    df = parquet_data[name]
    print(f"\n📄 File: {name}")
    print(df.head())


📄 File: labor_force.parquet
      Australia
2016      12.73
2017      13.01
2018      13.31
2019      13.59
2020      13.58

📄 File: livestock_production_index.parquet
      Australia
2015      102.4
2016       96.8
2017       91.0
2018       97.2
2019       97.7

📄 File: Percent_agricultural_land.parquet
      Australia
2015      45.31
2016      44.54
2017      48.34
2018      46.66
2019      47.12

📄 File: Percent_urban_population.parquet
      Australia
2016      85.80
2017      85.90
2018      86.01
2019      86.12
2020      86.24


In [39]:
# Reload the exports so the preview reflects the latest pipeline run
parquet_data = load_parquet_files("parquet_exports")

# Preview the first rows of each table
for name in parquet_data:
    df = parquet_data[name]
    print(f"\n📄 File: {name}")
    print(df.head())


📄 File: labor_force.parquet
      Australia  Austria
2016      12.73     4.57
2017      13.01     4.60
2018      13.31     4.64
2019      13.59     4.66
2020      13.58     4.64

📄 File: livestock_production_index.parquet
      Australia  Austria
2015      102.4    101.2
2016       96.8    101.4
2017       91.0    101.4
2018       97.2    101.5
2019       97.7    101.1

📄 File: Percent_agricultural_land.parquet
      Australia  Austria
2015      45.31    32.94
2016      44.54    32.36
2017      48.34    32.17
2018      46.66    32.15
2019      47.12    32.13

📄 File: Percent_urban_population.parquet
      Australia  Austria
2016      85.80    57.90
2017      85.90    58.09
2018      86.01    58.30
2019      86.12    58.51
2020      86.24    58.75


In [None]:
# Run a larger batch: up to 16 of the remaining images in one call.
run_extraction_pipeline(
    input_csv='data/image_paths.csv',
    processed_csv='data/processed_paths.csv',
    batch_size=16
)