In [2]:
# imports
import requests
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
import json

In [3]:
# BQ 
PROJECT_ID = "nimbus-479211"
DATASET_ID = "nimbus_assignment"
TABLE_ID = "stg_ev_chargepoints"
CREDENTIALS_PATH = "credentials.json" # must be downlaoded from GCP environment

# Open Charge Map API 
OCM_API_URL = "https://api.openchargemap.io/v3/poi/"
API_KEY = "f561c179-7f48-4705-a7af-012fdab7f987"  
USER_AGENT = "NimbusAssignment/1.0"

In [19]:
# Strict Zone 1 Postcodes
CENTRAL_LONDON_PREFIXES = ['EC1', 'EC2', 'EC3', 'EC4', 'WC1', 'WC2', 'SE1', 'SW1', 'W1']

def run_pipeline():

    # 1. EXTRACT
    params = {
        "output": "json", "countrycode": "GB", "maxresults": 5000, "compact": False, 
        "key": API_KEY, "latitude": 51.5074, "longitude": -0.1278, "distance": 5, "distanceunit": "Miles"
    }
    print("Fetching API data...")
    response = requests.get(OCM_API_URL, params=params, headers={"User-Agent": USER_AGENT})
    response.raise_for_status()
    raw_data = response.json()
    print(f"Fetched {len(raw_data)} records.")

    # 2. TRANSFORM
    clean_rows = []
    
    for device in raw_data:
        addr = device.get('AddressInfo') or {}
        postcode = (addr.get('Postcode') or '').strip().upper()
        
        # Central London Filter
        matched = False
        for area in CENTRAL_LONDON_PREFIXES:
            if postcode.startswith(area):
                suffix = postcode[len(area):]
                if not suffix or suffix[0] == " " or not suffix[0].isdigit():
                    matched = True
                    break
        if not matched: continue

        # Safe Nested Objects
        operator_info = device.get('OperatorInfo') or {}
        usage_info = device.get('UsageType') or {}
        status_info = device.get('StatusType') or {}
        conns = device.get('Connections') or []

        # Aggregations & Tech Specs
        max_kw = max([float(c.get('PowerKW') or 0) for c in conns], default=0.0)
        conn_types = {c.get('ConnectionType', {}).get('Title', 'Unknown') for c in conns if c.get('ConnectionType')}
        
        # Current Type Logic
        curr_types = {c.get('CurrentType', {}).get('Title', '') for c in conns if c.get('CurrentType')}
        primary_current = "DC" if any("DC" in t for t in curr_types) else "AC"

        operational_count = sum(1 for c in conns if (c.get('StatusType') or {}).get('IsOperational', False))

        clean_rows.append({
            'charge_device_id': str(device.get('ID')),
            'name': (addr.get('Title') or 'Unknown'),
            'latitude': float(addr.get('Latitude') or 0),
            'longitude': float(addr.get('Longitude') or 0),
            'postcode': postcode,
            'town': (addr.get('Town') or 'London'),
            'district': postcode.split()[0][:4], 
            
            # Business & Access
            'operator': (operator_info.get('Title') or 'Unknown'),
            'usage_type': (usage_info.get('Title') or 'Unknown'),
            'status': (status_info.get('Title') or 'Unknown'),
            'pay_at_location': bool(usage_info.get('IsPayAtLocation', False)),
            'is_membership_required': bool(usage_info.get('IsMembershipRequired', False)),
            
            # Technical Specs
            'max_power_kw': max_kw,
            'connector_types': ", ".join(conn_types),
            'current_type': primary_current,
            'total_plugs': len(conns),
            'operational_plugs': operational_count,
            
            # Metadata / Quality
            'date_created': pd.to_datetime(device.get('DateCreated'), errors='coerce'),
            'data_quality_level': int(device.get('DataQualityLevel') or 1),
            'last_verified': pd.to_datetime(device.get('DateLastVerified'), errors='coerce'),
            'data_source': 'OpenChargeMap',
            'ingested_at': pd.Timestamp.now()
        })

    df = pd.DataFrame(clean_rows)
    if df.empty: 
        print("No data matched filters. Exiting.")
        return None

    print(f"Identified {len(df)} records.")

    # 3. LOAD
    print("Loading to BigQuery...")
    creds = service_account.Credentials.from_service_account_file(CREDENTIALS_PATH)
    client = bigquery.Client(credentials=creds, project=PROJECT_ID)
    
    table_full_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    
    schema = [
        bigquery.SchemaField("charge_device_id", "STRING"),
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("latitude", "FLOAT"),
        bigquery.SchemaField("longitude", "FLOAT"),
        bigquery.SchemaField("postcode", "STRING"),
        bigquery.SchemaField("town", "STRING"),
        bigquery.SchemaField("district", "STRING"),
        
        bigquery.SchemaField("operator", "STRING"),
        bigquery.SchemaField("usage_type", "STRING"),
        bigquery.SchemaField("status", "STRING"),
        bigquery.SchemaField("pay_at_location", "BOOLEAN"),
        bigquery.SchemaField("is_membership_required", "BOOLEAN"),
        
        bigquery.SchemaField("max_power_kw", "FLOAT"),
        bigquery.SchemaField("connector_types", "STRING"),
        bigquery.SchemaField("current_type", "STRING"),
        bigquery.SchemaField("total_plugs", "INTEGER"),
        bigquery.SchemaField("operational_plugs", "INTEGER"),
        
        bigquery.SchemaField("date_created", "TIMESTAMP"),
        bigquery.SchemaField("data_quality_level", "INTEGER"),
        bigquery.SchemaField("last_verified", "TIMESTAMP"),
        bigquery.SchemaField("data_source", "STRING"),
        bigquery.SchemaField("ingested_at", "TIMESTAMP"),
    ]

    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE", schema=schema)
    job = client.load_table_from_dataframe(df, table_full_id, job_config=job_config)
    job.result()
    print(f"SUCCESS: Loaded {len(df)} rows into {table_full_id}")
    
    # Return the dataframe so you can inspect locally
    return df

if __name__ == "__main__":
    # If run as a script, this just executes the pipeline
    df = run_pipeline()

Fetching API data...
Fetched 5000 records.
Identified 790 records.
Loading to BigQuery...
SUCCESS: Loaded 790 rows into nimbus-479211.nimbus_assignment.stg_ev_chargepoints


In [20]:
df = run_pipeline()

Fetching API data...
Fetched 5000 records.
Identified 790 records.
Loading to BigQuery...
SUCCESS: Loaded 790 rows into nimbus-479211.nimbus_assignment.stg_ev_chargepoints


In [21]:
df.head(2)

Unnamed: 0,charge_device_id,name,latitude,longitude,postcode,town,district,operator,usage_type,status,...,max_power_kw,connector_types,current_type,total_plugs,operational_plugs,date_created,data_quality_level,last_verified,data_source,ingested_at
0,4396,Masterpark Trafalgar Square Car Park,51.507291,-0.128896,SW1A 2BN,London,SW1A,(Unknown Operator),Unknown,Operational,...,0.0,,AC,0,0,2011-05-17 17:23:00+00:00,1,2011-05-17 17:23:00+00:00,OpenChargeMap,2025-11-25 17:21:42.190224
1,52877,Q-Park Trafalgar Car Park,51.507099,-0.130117,SW1A 2TS,London,SW1A,BP Pulse (UK),Public - Membership Required,Operational,...,7.0,"Type 2 (Socket Only), BS1363 3 Pin 13 Amp",AC,2,2,2015-09-13 17:55:00+00:00,1,2023-04-03 16:58:00+00:00,OpenChargeMap,2025-11-25 17:21:42.190713


In [22]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 790 entries, 0 to 789
Data columns (total 22 columns):
 #   Column                  Non-Null Count  Dtype              
---  ------                  --------------  -----              
 0   charge_device_id        790 non-null    object             
 1   name                    790 non-null    object             
 2   latitude                790 non-null    float64            
 3   longitude               790 non-null    float64            
 4   postcode                790 non-null    object             
 5   town                    790 non-null    object             
 6   district                790 non-null    object             
 7   operator                790 non-null    object             
 8   usage_type              790 non-null    object             
 9   status                  790 non-null    object             
 10  pay_at_location         790 non-null    bool               
 11  is_membership_required  790 non-null    bool 

In [23]:
df.to_csv('ev_charge_points.csv')

In [24]:
df.isnull().sum()

charge_device_id          0
name                      0
latitude                  0
longitude                 0
postcode                  0
town                      0
district                  0
operator                  0
usage_type                0
status                    0
pay_at_location           0
is_membership_required    0
max_power_kw              0
connector_types           0
current_type              0
total_plugs               0
operational_plugs         0
date_created              0
data_quality_level        0
last_verified             0
data_source               0
ingested_at               0
dtype: int64

In [25]:
df.duplicated().sum()

0

In [26]:
min(df['last_verified'])

Timestamp('2011-05-17 17:23:00+0000', tz='UTC')

In [27]:
max(df['last_verified'])

Timestamp('2025-08-17 22:57:00+0000', tz='UTC')

In [28]:
df['status'].value_counts()

status
Unknown                       436
Operational                   346
Partly Operational (Mixed)      4
Not Operational                 2
Planned For Future Date         2
Name: count, dtype: int64

In [29]:
df['max_power_kw'].value_counts()

max_power_kw
3.7      200
5.0      161
5.1      161
7.0      128
22.0      44
50.0      29
0.0       21
4.0       12
3.0       10
3.5        8
4.8        7
3.6        3
150.0      1
12.0       1
2.3        1
4.6        1
13.0       1
120.0      1
Name: count, dtype: int64

In [30]:
# poorly formatted
df['postcode'].dropna().apply(lambda x: x.split()[0]).unique().tolist()

['SW1A',
 'WC2R',
 'WC2H',
 'WC2N',
 'SW1Y',
 'WC2E',
 'W1F',
 'SE1',
 'SW1H',
 'WC2B',
 'W1D',
 'W1S',
 'WC1A',
 'SW1P',
 'W1J',
 'WC1B',
 'WC2A',
 'W1T',
 'SW1E',
 'W1B',
 'W1K',
 'SW1W',
 'WC1R',
 'WC1E',
 'W1G',
 'W1W',
 'WC1N',
 'SW1',
 'WC1H',
 'EC4V',
 'SW1X',
 'WC1X',
 'SW1V',
 'EC1N',
 'W1U',
 'W1X',
 'W1H',
 'EC1M',
 'EC1A',
 'SW1X8PS',
 'EC1R',
 'EC4M',
 'EC4R',
 'EC2V',
 'EC2Y',
 'EC2Y8BY',
 'EC1V',
 'EC1Y',
 'EC2A',
 'EC3P',
 'EC3R',
 'EC3N',
 'EC2M']