In [None]:
import requests  # API calls
import json      # JSON handling
import time      # For retry backoff
import logging   # Structured logging
from typing import Optional, Dict, Any, List  # Type hints for clarity
from copy import deepcopy


# Setup logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("pipeline.log")
    ]
)


# API ingestion class

class APIIngestor:
    def __init__(self, base_url: str, max_retries: int = 5, backoff_factor: float = 0.5):
        """
        Initialize the API ingestor.

        Args:
            base_url (str): Base URL of the API.
            max_retries (int): Number of retries for failed requests.
            backoff_factor (float): Backoff multiplier for retries.
        """
        self.base_url = base_url
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    def _make_request(self, url: str, params: Optional[Dict[str, Any]] = None) -> Dict:
        """
        Make a GET request with retry logic.

        Args:
            url (str): Full API endpoint URL.
            params (dict, optional): Query parameters.

        Returns:
            dict: Parsed JSON response.
        """
        retries = 0
        while retries < self.max_retries:
            try:
                response = requests.get(url, params=params, timeout=10)
                if response.status_code == 200:
                    return response.json()
                else:
                    logging.warning(f"Request failed [{response.status_code}]: {response.text}")
            except requests.RequestException as e:
                logging.warning(f"Request exception: {e}")

            retries += 1
            sleep_time = self.backoff_factor * (2 ** (retries - 1))
            logging.info(f"Retrying in {sleep_time:.1f}s...")
            time.sleep(sleep_time)

        raise Exception(f"Failed to fetch data from {url} after {self.max_retries} retries")

    #fetch_paginated → “I want all users, but I can only carry 100 at a time.”



    def fetch_paginated(
            self,
            endpoint: str,
            params: Optional[Dict[str, Any]] = None,
            page_param: str = "page",
            per_page: int = 100,
            max_pages: Optional[int] = None,
            months: Optional[List[str]] = None  # NEW: optional list of months
        ) -> List[Dict]:
        """
        Fetch data from APIs that paginate results.

        Args:
            endpoint (str): API endpoint path.
            params (dict, optional): Query parameters.
            page_param (str): Parameter name for page number.
            per_page (int): Number of results per page.
            max_pages (int, optional): Maximum number of pages to fetch.
            months (list of str, optional): List of months to fetch (e.g., ["2025-01", "2025-02"]).

        Returns:
            List[Dict]: List of all records fetched.
        """
        results = []

        # If no months are provided, default to single month in params
        if months is None:
            months = [params.get("date")] if params and "date" in params else [None]

        for month in months:
            page = 1
            while True:
                paged_params = deepcopy(params) if params else {}
                if month:
                    paged_params["date"] = month
                paged_params.update({page_param: page, "results": per_page})

                logging.info(f"Fetching page {page} from {endpoint} for month {month}")
                data = self._make_request(f"{self.base_url}{endpoint}", paged_params)

                # Support APIs that return either a list or a dict with 'results'
                items = data.get("results") if isinstance(data, dict) and "results" in data else data
                if not items:
                    break

                results.extend(items)

                if max_pages and page >= max_pages:
                    break
                if len(items) < per_page:
                    break

                page += 1

        return results

        

    #fetch_incremental → “I want only the users added or updated since yesterday.”

    def fetch_incremental(
        self,
        endpoint: str,
        since: Optional[str] = None,
        params: Optional[Dict[str, Any]] = None
    ) -> List[Dict]:
        """
        Fetch only new or updated records since a given timestamp or ID.

        Args:
            endpoint (str): API endpoint path.
            since (str, optional): Timestamp or ID to fetch incremental data from.
            params (dict, optional): Additional query parameters.

        Returns:
            List[Dict]: Incremental data.
        """
        query_params = params.copy() if params else {}
        if since:
            query_params["since"] = since

        logging.info(f"Fetching incremental data from {endpoint} since {since}")
        data = self._make_request(f"{self.base_url}{endpoint}", query_params)
        return data.get("results") if isinstance(data, dict) and "results" in data else data

    def batch_process(self, data: List[Dict], batch_size: int = 50):
        """
        Yield data in batches.

        Args:
            data (List[Dict]): List of records.
            batch_size (int): Size of each batch.

        Yields:
            List[Dict]: Batch of records.
        """
        for i in range(0, len(data), batch_size):
            yield data[i:i + batch_size]



##Instantiting the Class
if __name__ == "__main__":                       #This line ensures the code only runs when the script is executed directly, not when imported as a module in another script.
   
    ingestor = APIIngestor(base_url="https://data.police.uk/api/")

    # Example: Fetch multiple pages from Police UK API
    all_crimes = ingestor.fetch_paginated(
        endpoint="/api/crimes-street/all-crime",
        params = {
            "date": f"2025-{month:02d}",
        "lat": 52.629729,
        "lng": -1.131592},
            per_page=200,
            max_pages=20
    )
    logging.info(f"Total users fetched: {len(all_crimes)}")

    # Process in batches
    for batch in ingestor.batch_process(all_crimes, batch_size=100):
        logging.info(f"Processing batch of {len(batch)} crimes")
        logging.info(f"Batch type: {type(batch)} | Sample: {batch[:2] if isinstance(batch, list) else batch}")
        logging.info(batch[1]["location"]["street"]["name"])



#Light Transformation



2025-10-31 21:51:16,935 [INFO] Fetching page 1 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:17,193 [INFO] Fetching page 2 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:17,324 [INFO] Fetching page 3 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:17,713 [INFO] Fetching page 4 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:18,194 [INFO] Fetching page 5 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:18,315 [INFO] Fetching page 6 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:18,692 [INFO] Fetching page 7 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:19,205 [INFO] Fetching page 8 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:19,695 [INFO] Fetching page 9 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:20,191 [INFO] Fetching page 10 from /api/crimes-street/all-crime for month None
2025-10-31 21:51:20,692 [INFO] Fetching

In [3]:
import requests

url = "https://data.police.uk/api/crimes-street/all-crime"
params = {
    "date": "2024-01",
    "lat": 52.629729,
    "lng": -1.131592
}

response = requests.get(url, params=params)

if response.status_code == 200:
    data = response.json()
    print(f"Returned {len(data)} records")
    print(data[0])  # sample record
else:
    print(f"Error: {response.status_code}")


Returned 1472 records
{'category': 'anti-social-behaviour', 'location_type': 'Force', 'location': {'latitude': '52.639558', 'street': {'id': 1738364, 'name': 'On or near Nightclub'}, 'longitude': '-1.132473'}, 'context': '', 'outcome_status': None, 'persistent_id': '', 'id': 116208928, 'location_subtype': '', 'month': '2024-01'}


In [2]:
import pandas as pd

# Convert the raw list of crime dictionaries to a flattened DataFrame
df = pd.json_normalize(all_crimes)

df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 34860 entries, 0 to 34859
Data columns (total 14 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   category                 34860 non-null  object 
 1   location_type            34860 non-null  object 
 2   context                  34860 non-null  object 
 3   outcome_status           0 non-null      float64
 4   persistent_id            34860 non-null  object 
 5   id                       34860 non-null  int64  
 6   location_subtype         34860 non-null  object 
 7   month                    34860 non-null  object 
 8   location.latitude        34860 non-null  object 
 9   location.street.id       34860 non-null  int64  
 10  location.street.name     34860 non-null  object 
 11  location.longitude       34860 non-null  object 
 12  outcome_status.category  30220 non-null  object 
 13  outcome_status.date      30220 non-null  object 
dtypes: float64(1), int64(2

In [None]:
# Replace NaN in all columns with 'unknown'
columns_to_drop = ['outcome_status']
df = (
    df.drop(columns=columns_to_drop).rename(columns={
    'location.street.name': 'street_name',
    'location.latitude': 'latitude',
    'location.longitude': 'longitude',
    'outcome_status.category': 'outcome_status_category',
     'outcome_status.date': 'outcome_status_date'  
})
)
df.head()

Unnamed: 0,category,location_type,context,persistent_id,id,location_subtype,month,latitude,location.street.id,street_name,longitude,outcome_status_category,outcome_status_date
0,anti-social-behaviour,Force,,,130066740,,2025-07,52.621857,1737629,On or near Quainton Road,-1.146029,unknown,unknown
1,anti-social-behaviour,Force,,,130066726,,2025-07,52.627798,1737705,On or near Tarragon Road,-1.144613,unknown,unknown
2,anti-social-behaviour,Force,,,130066724,,2025-07,52.627798,1737705,On or near Tarragon Road,-1.144613,unknown,unknown
3,anti-social-behaviour,Force,,,130067358,,2025-07,52.628462,1738792,On or near De Montfort Square,-1.126029,unknown,unknown
4,anti-social-behaviour,Force,,,130067331,,2025-07,52.636041,1738351,On or near East Gates,-1.133444,unknown,unknown


In [42]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17240 entries, 0 to 17239
Data columns (total 13 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   category                 17240 non-null  object
 1   location_type            17240 non-null  object
 2   context                  17240 non-null  object
 3   persistent_id            17240 non-null  object
 4   id                       17240 non-null  int64 
 5   location_subtype         17240 non-null  object
 6   month                    17240 non-null  object
 7   latitude                 17240 non-null  object
 8   location.street.id       17240 non-null  int64 
 9   street_name              17240 non-null  object
 10  longitude                17240 non-null  object
 11  outcome_status_category  17240 non-null  object
 12  outcome_status_date      17240 non-null  object
dtypes: int64(2), object(11)
memory usage: 1.7+ MB


In [3]:
import duckdb


In [4]:
query = """
select month
from df
group by month
"""

result = duckdb.query(query).to_df()

result

Unnamed: 0,month
0,2025-08


In [46]:
import snowflake.connector

conn = snowflake.connector.connect(
    user='dbt',
    password= 'dbtPassword123',
    account='sskjybh-yw79994',          # e.g., xy12345.us-east-1
    warehouse='COMPUTE_WH',
    database='GA_ANALYTICS',
    schema='GA_EXPORT'
)

cursor = conn.cursor()

2025-10-31 20:31:46,951 [INFO] Snowflake Connector for Python Version: 4.0.0, Python Version: 3.12.12, Platform: macOS-14.6-arm64-arm-64bit
2025-10-31 20:31:46,952 [INFO] Connecting to GLOBAL Snowflake domain


In [55]:
df.to_csv("all_crimes.csv", index=False)

AttributeError: module 'plotly.express' has no attribute '__version__'