In [40]:
from dagster_acled.acled_request_config import * 
from dagster_acled.secrets_config import *
import os
import polars as pl

sm = SecretManager(region_name=os.environ['REGION_NAME'])
client = AcledClientConfig()
token = await client.get_oauth_manager().get_access_token()

username = client.get_oauth_manager().username
password = client.get_oauth_manager().password

def mprint(df: pl.DataFrame) -> None: 
    with pl.Config(tbl_rows=df.shape[0], tbl_cols=df.shape[1]): 
        print(df)

In [27]:
async def test_token(username: str, password: str, date: str):
    """Minimal test: get token, make API call."""
    
    # Get token
    data = aiohttp.FormData()
    data.add_field('username', username)
    data.add_field('password', password)
    data.add_field('grant_type', 'password')
    data.add_field('client_id', 'acled')
    
    async with aiohttp.ClientSession() as session:
        # Get token
        async with session.post('https://acleddata.com/oauth/token', data=data) as resp:
            token = (await resp.json())['access_token']
            print(f"Token: {token[:20]}...")
        
        headers = {'Authorization': f'Bearer {token}'}
        async with session.get(f'https://acleddata.com/api/acled/read?limit=5000&region=1&event_date={date}', headers=headers) as resp:
            data = await resp.json()
            return data

In [28]:
import asyncio
import polars as pl

data = await test_token(username, password, date = '2025-09-01')
df = pl.DataFrame(data['data'])

with pl.Config(tbl_rows=df.shape[0], tbl_cols=df.shape[1]): 
    print(pl.DataFrame(df.collect_schema()))

Token: eyJ0eXAiOiJKV1QiLCJh...
shape: (1, 31)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ eve ┆ eve ┆ yea ┆ tim ┆ dis ┆ eve ┆ sub ┆ act ┆ ass ┆ int ┆ act ┆ ass ┆ int ┆ int ┆ civ ┆ iso ┆ reg ┆ cou ┆ adm ┆ adm ┆ adm ┆ loc ┆ lat ┆ lon ┆ geo ┆ sou ┆ sou ┆ not ┆ fat ┆ tag ┆ tim │
│ nt_ ┆ nt_ ┆ r   ┆ e_p ┆ ord ┆ nt_ ┆ _ev ┆ or1 ┆ oc_ ┆ er1 ┆ or2 ┆ oc_ ┆ er2 ┆ era ┆ ili ┆ --- ┆ ion ┆ ntr ┆ in1 ┆ in2 ┆ in3 ┆ ati ┆ itu ┆ git ┆ _pr ┆ rce ┆ rce ┆ es  ┆ ali ┆ s   ┆ est │
│ id_ ┆ dat ┆ --- ┆ rec ┆ er_ ┆ typ ┆ ent ┆ --- ┆ act ┆ --- ┆ --- ┆ act ┆ --- ┆ cti ┆ an_ ┆ obj ┆ --- ┆ y   ┆ --- ┆ --- ┆ --- ┆ on  ┆ de  ┆ ude ┆ eci ┆ --- ┆ _sc ┆ --- ┆ tie ┆ --- ┆ amp │
│ cnt ┆ e   ┆ obj ┆ isi ┆ typ ┆ e   ┆ _ty ┆ obj ┆ or_ ┆ obj ┆ obj ┆ or_ ┆ obj ┆ on  ┆ tar ┆ ect ┆ obj ┆ --- ┆ obj ┆ obj ┆ obj ┆ --- ┆ --- ┆ --- ┆ sio ┆ obj ┆ ale ┆ obj ┆ s   ┆ obj ┆ --- │
│ y   ┆ --- ┆ 

In [38]:
actors = df.select('actor1').unique()

In [43]:
mprint(actors)

shape: (15, 1)
┌─────────────────────────────────┐
│ actor1                          │
│ ---                             │
│ str                             │
╞═════════════════════════════════╡
│ Military Forces of Nigeria (20… │
│ Military Forces of Burkina Fas… │
│ Military Forces of Mali (2021-… │
│ Dozo Communal Militia (Mali)    │
│ Sokoto Communal Militia (Niger… │
│ Protesters (Guinea)             │
│ Zamfara Communal Militia (Nige… │
│ Protesters (Senegal)            │
│ Protesters (Nigeria)            │
│ JNIM: Group for Support of Isl… │
│ Katsina Communal Militia (Nige… │
│ Rioters (Nigeria)               │
│ Protesters (Benin)              │
│ Unidentified Armed Group (Nige… │
│ Protesters (Mauritania)         │
└─────────────────────────────────┘


In [23]:
def _cast_to_proper_types(df: pl.DataFrame) -> pl.DataFrame:
        """
        Cast DataFrame columns to proper types before storing in S3.
        This ensures efficient storage and correct querying later.
        """
        return df.with_columns([
            pl.col("event_date").str.strptime(pl.Date, "%Y-%m-%d", strict=False),
            
            pl.col("year").cast(pl.Int16, strict=False),
            pl.col("time_precision").cast(pl.Int16, strict=False),
            pl.col("iso").cast(pl.Int16, strict=False),
            pl.col("geo_precision").cast(pl.Int16, strict=False),
            pl.col("fatalities").cast(pl.Int16, strict=False),
            
            pl.col("latitude").cast(pl.Float64, strict=False),
            pl.col("longitude").cast(pl.Float64, strict=False),
            pl.col("timestamp").cast(pl.Int64).cast(pl.Datetime("ms"), strict=False),
        ])

result = _cast_to_proper_types(df)
result.collect_schema()

Schema([('event_id_cnty', String),
        ('event_date', Date),
        ('year', Int16),
        ('time_precision', Int16),
        ('disorder_type', String),
        ('event_type', String),
        ('sub_event_type', String),
        ('actor1', String),
        ('assoc_actor_1', String),
        ('inter1', String),
        ('actor2', String),
        ('assoc_actor_2', String),
        ('inter2', String),
        ('interaction', String),
        ('civilian_targeting', String),
        ('iso', Int16),
        ('region', String),
        ('country', String),
        ('admin1', String),
        ('admin2', String),
        ('admin3', String),
        ('location', String),
        ('latitude', Float64),
        ('longitude', Float64),
        ('geo_precision', Int16),
        ('source', String),
        ('source_scale', String),
        ('notes', String),
        ('fatalities', Int16),
        ('tags', String),
        ('timestamp', Datetime(time_unit='ms', time_zone=None))])

In [61]:
import polars as pl
import json

# Get individual file paths from the glob pattern
test_Df = pl.scan_parquet(
        "s3://dagster-acled-bucket/acled/acled_daily_data/acled_daily_data/partition_2025-09-26.parquet"
    ).collect()

In [None]:


async def test_cookie_auth(username: str, password: str):
    """Minimal test: login with cookies, make API call."""
    
    async with aiohttp.ClientSession() as session:
        login_data = {
            "name": username,
            "pass": password
        }
        
        async with session.post(
            'https://acleddata.com/user/login?_format=json',
            json=login_data,
            headers={'Content-Type': 'application/json'}
        ) as login_resp:
            login_result = await login_resp.json()
            print(f"Login response: {login_result}")
            print(f"Cookies after login: {len(session.cookie_jar)} cookies")
        
        async with session.get(
            'https://acleddata.com/api/acled/read?limit=1&event_date=2024-12-31'
        ) as api_resp:
            data = await api_resp.json()
            print(f"API response status: {api_resp.status}")
            return data

result = await test_cookie_auth(username=username,password=password)
print(result)

Login response: {'current_user': {'uid': '59469', 'name': 's.a.bojilov@umail.leidenuniv.nl'}, 'csrf_token': '3_d7dxkGreXNTYwht65QrU3SZFXGkjF0m0OwW5temUY', 'logout_token': 'Vd3lt9wRVESord-vBM0uqWrkcoDD-BGI2UB6a6nhasQ'}
Cookies after login: 1 cookies
API response status: 200
{'status': 200, 'success': True, 'count': 0, 'total_count': 0, 'messages': [], 'data': [], 'filename': 'results.json', 'data_query_restrictions': {'countries': [], 'event_types': [], 'regions': [], 'history': [], 'recency': [], 'date_recency': {'quantity': 12, 'unit': 'Months', 'description': '12 Months old', 'timestamp': 1726586733, 'date': '2024-09-17'}}}


In [1]:
import duckdb
import boto3
# Get AWS credentials
session = boto3.Session()
credentials = session.get_credentials()

conn = duckdb.connect()
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")

# Set credentials from boto3
conn.execute(f"SET s3_access_key_id='{credentials.access_key}';")
conn.execute(f"SET s3_secret_access_key='{credentials.secret_key}';")
conn.execute("SET s3_region='eu-north-1';")
if credentials.token:
    conn.execute(f"SET s3_session_token='{credentials.token}';")

In [1]:
import polars as pl
import duckdb
from datetime import datetime, timedelta

# Setup DuckDB
conn = duckdb.connect()
conn.execute("INSTALL httpfs; LOAD httpfs;")
conn.execute("CALL load_aws_credentials();")
conn.execute("SET s3_region='eu-north-1';")

# Define dates
end_date = datetime.now().date()
start_date = end_date - timedelta(days=365)

# Convert to ISO format strings
start_date_str = start_date.isoformat()
end_date_str = end_date.isoformat()

result = pl.from_arrow(
    conn.execute("""
        SELECT 
            disorder_type,
            event_type,
            sub_event_type,
            actor1,
            actor2,
            inter1,
            inter2,
            interaction,
            admin1,
            admin2, 
            admin3,
            CAST(latitude AS DOUBLE) as latitude,
            CAST(longitude AS DOUBLE) as longitude,
            CAST(fatalities AS INTEGER) as fatalities,
            CAST(event_date AS DATE) as event_date
        FROM read_parquet('s3://dagster-acled-bucket/acled/acled_daily_data/partition_*.parquet')
        WHERE CAST(event_date AS DATE) >= CAST(? AS DATE)
            AND CAST(event_date AS DATE) <= CAST(? AS DATE)
            AND CAST(fatalities AS INTEGER) IS NOT NULL
            AND CAST(fatalities AS INTEGER) > 0
            AND CAST(latitude AS DOUBLE) IS NOT NULL 
            AND CAST(longitude AS DOUBLE) IS NOT NULL
        ORDER BY event_date
    """, [start_date_str, end_date_str]).arrow()
)

print(f"Loaded {len(result):,} records")
print(result.head())

Loaded 10,317 records
shape: (5, 15)
┌───────────┬───────────┬───────────┬───────────┬───┬──────────┬───────────┬───────────┬───────────┐
│ disorder_ ┆ event_typ ┆ sub_event ┆ actor1    ┆ … ┆ latitude ┆ longitude ┆ fatalitie ┆ event_dat │
│ type      ┆ e         ┆ _type     ┆ ---       ┆   ┆ ---      ┆ ---       ┆ s         ┆ e         │
│ ---       ┆ ---       ┆ ---       ┆ str       ┆   ┆ f64      ┆ f64       ┆ ---       ┆ ---       │
│ str       ┆ str       ┆ str       ┆           ┆   ┆          ┆           ┆ i32       ┆ date      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪══════════╪═══════════╪═══════════╪═══════════╡
│ Political ┆ Explosion ┆ Shelling/ ┆ Military  ┆ … ┆ 47.8479  ┆ 35.1602   ┆ 6         ┆ 2024-12-3 │
│ violence  ┆ s/Remote  ┆ artillery ┆ Forces of ┆   ┆          ┆           ┆           ┆ 0         │
│           ┆ violence  ┆ /missile  ┆ Ukraine   ┆   ┆          ┆           ┆           ┆           │
│           ┆           ┆ att…      ┆ (20…      ┆   ┆ 

In [44]:
result = (
    pl.scan_parquet(
        "s3://dagster-acled-bucket/acled/acled_daily_data/acled_africa/*",
    )
    .select([
        pl.all(), 
    ])
    .sort("event_type")
    .collect(engine="streaming") 
)

In [61]:
result.collect_schema()

Schema([('event_id_cnty', String),
        ('event_date', Date),
        ('year', Int16),
        ('time_precision', Int16),
        ('disorder_type', String),
        ('event_type', String),
        ('sub_event_type', String),
        ('actor1', String),
        ('assoc_actor_1', String),
        ('inter1', String),
        ('actor2', String),
        ('assoc_actor_2', String),
        ('inter2', String),
        ('interaction', String),
        ('civilian_targeting', String),
        ('iso', Int16),
        ('region', String),
        ('country', String),
        ('admin1', String),
        ('admin2', String),
        ('admin3', String),
        ('location', String),
        ('latitude', Float64),
        ('longitude', Float64),
        ('geo_precision', Int16),
        ('source', String),
        ('source_scale', String),
        ('notes', String),
        ('fatalities', Int16),
        ('tags', String),
        ('timestamp', Datetime(time_unit='ms', time_zone=None))])

In [57]:
actor_counts = result.select(pl.col('actor1'), pl.col('country')).group_by('country').n_unique().sort('actor1', descending=True)
actors = result.group_by('actor1').agg(pl.count().alias('event_count')).sort(by='event_count', descending=True)
mprint(actors)

shape: (1_149, 2)
┌─────────────────────────────────┬─────────────┐
│ actor1                          ┆ event_count │
│ ---                             ┆ ---         │
│ str                             ┆ u32         │
╞═════════════════════════════════╪═════════════╡
│ Protesters (Morocco)            ┆ 2220        │
│ Rapid Support Forces            ┆ 1784        │
│ Al Shabaab                      ┆ 1581        │
│ JNIM: Group for Support of Isl… ┆ 1561        │
│ Military Forces of Sudan (2019… ┆ 1408        │
│ Protesters (Kenya)              ┆ 1058        │
│ Protesters (South Africa)       ┆ 970         │
│ M23: March 23 Movement          ┆ 966         │
│ Rioters (Kenya)                 ┆ 959         │
│ Ambazonian Separatists (Camero… ┆ 698         │
│ Military Forces of Somalia (20… ┆ 668         │
│ Fano Youth Militia              ┆ 609         │
│ Military Forces of Ethiopia (2… ┆ 602         │
│ Boko Haram - Jamaatu Ahli is-S… ┆ 536         │
│ Military Forces of Mali (2021-

(Deprecated in version 0.20.5)
  actors = result.group_by('actor1').agg(pl.count().alias('event_count')).sort(by='event_count', descending=True)


In [70]:
# Get country info for each actor
country_info = (
    result
    .select(['actor1', 'assoc_actor_1', 'actor2', 'assoc_actor_2', 'country'])
    .melt(
        id_vars=['country'],
        value_vars=['actor1', 'assoc_actor_1', 'actor2', 'assoc_actor_2'],
        variable_name='actor_role',
        value_name='actor_name'
    )
    .filter(pl.col('actor_name').is_not_null())
    .group_by('actor_name')
    .agg([
        pl.col('country').n_unique().alias('n_countries'),
        pl.col('country').unique().sort().alias('countries')
    ])
)

# Get role breakdown
actors_detailed = (
    result
    .select(['actor1', 'assoc_actor_1', 'actor2', 'assoc_actor_2'])
    .melt(variable_name='actor_role', value_name='actor_name')
    .filter(pl.col('actor_name').is_not_null())
    .group_by(['actor_name', 'actor_role'])
    .agg(pl.count().alias('event_count'))
)

# Pivot role breakdown
actors_by_role = (
    actors_detailed
    .pivot(
        values='event_count',
        index='actor_name',
        columns='actor_role',
    )
    .with_columns(
        (
            pl.col('actor1').fill_null(0) +
            pl.col('assoc_actor_1').fill_null(0) +
            pl.col('actor2').fill_null(0) +
            pl.col('assoc_actor_2').fill_null(0)
        ).alias('total_events')
    )
)

# Combine everything
final_result = (
    actors_by_role
    .join(country_info, on='actor_name', how='left')
    .select([
        'actor_name',
        'total_events',
        'n_countries',
        'countries',
        'actor1',
        'assoc_actor_1',
        'actor2',
        'assoc_actor_2',
    ])
    .sort('total_events', descending=True)
)

mprint(final_result)

  result
  result
(Deprecated in version 0.20.5)
  .agg(pl.count().alias('event_count'))
  actors_detailed


shape: (4_930, 8)
┌─────────────┬─────────────┬─────────────┬────────────┬────────┬────────────┬────────┬────────────┐
│ actor_name  ┆ total_event ┆ n_countries ┆ countries  ┆ actor1 ┆ assoc_acto ┆ actor2 ┆ assoc_acto │
│ ---         ┆ s           ┆ ---         ┆ ---        ┆ ---    ┆ r_1        ┆ ---    ┆ r_2        │
│ str         ┆ ---         ┆ u32         ┆ list[str]  ┆ u32    ┆ ---        ┆ u32    ┆ ---        │
│             ┆ u32         ┆             ┆            ┆        ┆ u32        ┆        ┆ u32        │
╞═════════════╪═════════════╪═════════════╪════════════╪════════╪════════════╪════════╪════════════╡
│             ┆ 62112       ┆ 56          ┆ ["Algeria" ┆ null   ┆ 23767      ┆ 10803  ┆ 27542      │
│             ┆             ┆             ┆ ,          ┆        ┆            ┆        ┆            │
│             ┆             ┆             ┆ "Angola",  ┆        ┆            ┆        ┆            │
│             ┆             ┆             ┆ … "eSwat…  ┆        ┆        