In [None]:
import os
import glob
import zipfile
import shutil
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta  # pip install python-dateutil

# Base directory containing all ZIP files
crime_data_base_dir = r"C:/Studie/SSML/project/police_crime_archive"
# A single target directory to collect all extracted folders
extraction_target_dir = os.path.join(crime_data_base_dir, "extracted_2013_2025")
os.makedirs(extraction_target_dir, exist_ok=True)

# 1. Helper function to list all months between start_ym and end_ym
def year_month_range(start_ym, end_ym):
    """
    Yields year-month strings (YYYY-MM) from start_ym up to and including end_ym.
    Example:
      start_ym = '2014-12', end_ym = '2015-02'
      yields '2014-12', '2015-01', '2015-02'
    """
    start_dt = datetime.strptime(start_ym, "%Y-%m")
    end_dt = datetime.strptime(end_ym, "%Y-%m")

    curr = start_dt
    while curr <= end_dt:
        yield curr.strftime("%Y-%m")
        curr += relativedelta(months=1)

# 2. Define which folders each ZIP covers via (start_ym, end_ym)
zip_ranges = {
    "2014-12.zip": ("2010-12", "2014-12"),
    "2017-11.zip": ("2014-12", "2017-11"),
    "2020-11.zip": ("2017-12", "2020-11"),
    "2022-12.zip": ("2020-01", "2022-12"),
    "2025-01.zip": ("2022-02", "2025-01"),
}

# 3. Find all ZIP files under crime_data_base_dir
zip_files = []
for root, dirs, files in os.walk(crime_data_base_dir):
    for f in files:
        if f.endswith(".zip"):
            zip_files.append(os.path.join(root, f))

extracted_folders = []

# 4. Extract only valid subfolders for each ZIP
for zip_path in zip_files:
    zip_filename = os.path.basename(zip_path)

    # If this ZIP is in zip_ranges, we know which range to extract
    if zip_filename in zip_ranges:
        start_ym, end_ym = zip_ranges[zip_filename]
        months_to_extract = list(year_month_range(start_ym, end_ym))
        # e.g. if start_ym='2014-12' and end_ym='2017-11', 
        # months_to_extract might be ['2014-12','2015-01','2015-02',...,'2017-11']
        
        print(f"\nExtracting from {zip_filename}: {start_ym} to {end_ym}")
        
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zip_contents = zf.namelist()  # all files/folders in the zip

            # For each month in that range, extract that folder if present
            for ym in months_to_extract:
                # We look for files in the ZIP whose paths start with "YYYY-MM/"
                prefix = ym + "/"
                matching_files = [f for f in zip_contents if f.startswith(prefix)]

                if matching_files:
                    # We'll extract these to a subfolder in extraction_target_dir
                    target_folder = os.path.join(extraction_target_dir, ym)

                    # If that folder already exists, we remove it to overwrite
                    if os.path.exists(target_folder):
                        shutil.rmtree(target_folder)

                    # Extract matching files into that target folder
                    for fpath in matching_files:
                        # Remove the 'ym/' prefix from the path, 
                        # so it extracts into target_folder
                        relative_path = fpath[len(prefix):]

                        # Build final path on disk
                        out_path = os.path.join(target_folder, relative_path)

                        if fpath.endswith("/"):
                            # It's just a directory entry
                            os.makedirs(out_path, exist_ok=True)
                        else:
                            # Extract the file
                            os.makedirs(os.path.dirname(out_path), exist_ok=True)
                            with zf.open(fpath) as src, open(out_path, "wb") as dst:
                                dst.write(src.read())

                    extracted_folders.append(target_folder)
                    print(f" Extracted {ym} → {target_folder}")
                else:
                    # That month folder doesn't exist in this ZIP
                    pass
    else:
        print(f"\nSkipping {zip_filename}: no known date range mapping.")

# 5. Save extracted folders to CSV (optional)
output_csv = os.path.join(extraction_target_dir, "extracted_folders.csv")
df = pd.DataFrame({"folder_path": sorted(set(extracted_folders))})
df.to_csv(output_csv, index=False)

# Summary
print("\nDone!")
print(f"Total ZIPs processed: {len(zip_files)}")
print(f"Total folder paths extracted: {len(extracted_folders)}")
print(f"Unique extracted folders: {len(set(extracted_folders))}")
print(f"Folder paths saved to: {output_csv}")



Skipping 2013-12.zip: no known date range mapping.

Skipping 2014-01.zip: no known date range mapping.

Skipping 2014-02.zip: no known date range mapping.

Skipping 2014-03.zip: no known date range mapping.

Skipping 2014-04.zip: no known date range mapping.

Skipping 2014-05.zip: no known date range mapping.

Skipping 2014-06.zip: no known date range mapping.

Skipping 2014-07.zip: no known date range mapping.

Skipping 2014-08.zip: no known date range mapping.

Skipping 2014-09.zip: no known date range mapping.

Skipping 2014-10.zip: no known date range mapping.

Skipping 2014-11.zip: no known date range mapping.

Extracting from 2014-12.zip: 2010-12 to 2014-12
 Extracted 2010-12 → C:/Studie/SSML/project/police_crime_archive\extracted_2013_2025\2010-12
 Extracted 2011-01 → C:/Studie/SSML/project/police_crime_archive\extracted_2013_2025\2011-01
 Extracted 2011-02 → C:/Studie/SSML/project/police_crime_archive\extracted_2013_2025\2011-02
 Extracted 2011-03 → C:/Studie/SSML/project/poli

In [None]:
import os
import pandas as pd

# 1) Define the root directory containing month-folders
root_dir = r"C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025"

# 2) Initialize an empty DataFrame for concatenation
combined_df = pd.DataFrame()

# 3) Iterate over each subfolder (YYYY-MM)
for folder_name in sorted(os.listdir(root_dir)):
    folder_path = os.path.join(root_dir, folder_name)
    print(fold)
    # Ensure we're processing only directories
    if os.path.isdir(folder_path):
        # Grab all CSV files within this folder
        csv_files = [
            f for f in os.listdir(folder_path)
            if f.lower().endswith('.csv')
        ]
        
        # Read each CSV and append to combined_df
        for csv_file in csv_files:
            csv_path = os.path.join(folder_path, csv_file)
            
            # Read the CSV
            df = pd.read_csv(csv_path)
            
            # Optionally, track which folder (year-month) this data originated from
            df['year_month'] = folder_name

            # Concatenate
            combined_df = pd.concat([combined_df, df], ignore_index=True)

# 4) Write the combined DataFrame to a single CSV file
output_csv = os.path.join(root_dir, "combined_data.csv")
combined_df.to_csv(output_csv, index=False)

print(f"Finished combining CSV files into: {output_csv}")
print(f"Total rows in the combined DataFrame: {len(combined_df)}")


KeyboardInterrupt: 

In [5]:
import os
import pyarrow as pa
import pyarrow.csv as arrow_csv
import pyarrow.parquet as pq

def gather_csv_paths_by_pattern(root_dir: str, substring: str):
    """
    Recursively finds all CSV files under root_dir whose filenames contain the given substring.
    Returns a sorted list of absolute file paths.
    """
    matched_paths = []
    for folder_name in sorted(os.listdir(root_dir)):
        folder_path = os.path.join(root_dir, folder_name)
        if os.path.isdir(folder_path):
            csv_files = [
                f for f in os.listdir(folder_path)
                if f.lower().endswith(".csv") and substring in f.lower()
            ]
            for csv_file in csv_files:
                csv_path = os.path.join(folder_path, csv_file)
                matched_paths.append(csv_path)
    return sorted(matched_paths)

def infer_union_schema(csv_paths):
    """
    Reads each CSV (using default inference) and collects its schema.
    For each CSV that contains a 'Date' column, force cast that column to 
    timestamp[ns, tz=UTC]. Then unifies the list of schemas into one union schema.
    """
    schemas = []
    for path in csv_paths:
        table = arrow_csv.read_csv(path)
        # If the CSV has a 'Date' column, force it to timestamp[ns, tz=UTC]
        if "Date" in table.column_names:
            date_idx = table.column_names.index("Date")
            try:
                # Cast the Date column to ns resolution
                casted_date = table.column(date_idx).cast(pa.timestamp("ns", tz="UTC"))
                table = table.set_column(date_idx, "Date", casted_date)
            except Exception as e:
                print(f"Error casting 'Date' in {path}: {e}")
        schemas.append(table.schema)
    if not schemas:
        return None
    union_schema = pa.unify_schemas(schemas)
    return union_schema

def write_csvs_with_inferred_schema(csv_paths, union_schema, output_parquet):
    """
    Second pass: For each CSV path in csv_paths,
    - Reads the CSV with dictionary encoding enabled,
    - Adds missing columns as null arrays,
    - Drops extra columns,
    - Reorders columns to match union_schema,
    - Casts the table to union_schema,
    - And writes/appends the table to a single Parquet file.
    """
    if os.path.exists(output_parquet):
        os.remove(output_parquet)
    writer = None
    for path in csv_paths:
        table = arrow_csv.read_csv(
            path,
            convert_options=arrow_csv.ConvertOptions()
        )
        # If 'Date' is present, cast it to timestamp[ns, tz=UTC] as before
        if "Date" in table.column_names:
            date_idx = table.column_names.index("Date")
            try:
                table = table.set_column(date_idx, "Date", table.column(date_idx).cast(pa.timestamp("ns", tz="UTC")))
            except Exception as e:
                print(f"Error casting 'Date' in {path}: {e}")

        # 1) Add missing columns as null arrays based on union_schema
        needed_cols = [f.name for f in union_schema]
        current_cols = table.column_names
        for field in union_schema:
            if field.name not in current_cols:
                null_arr = pa.array([None] * table.num_rows, type=field.type)
                table = table.append_column(field.name, null_arr)

        # 2) Drop columns not in union_schema
        keep_indices = [i for i, cname in enumerate(table.column_names) if cname in needed_cols]
        table = table.select(keep_indices)

        # 3) Reorder columns to match union_schema exactly
        reorder_indices = []
        for field in union_schema:
            reorder_indices.append(table.column_names.index(field.name))
        table = table.select(reorder_indices)

        # 4) Cast table to union_schema for consistency
        table = table.cast(union_schema)

        # 5) Write the table incrementally using ParquetWriter
        if writer is None:
            writer = pq.ParquetWriter(output_parquet, schema=union_schema)
        else:
            if not table.schema.equals(writer.schema, check_metadata=False):
                table = table.cast(writer.schema)
        writer.write_table(table)
        print(f"Appended {table.num_rows} rows from {os.path.basename(path)} to {output_parquet}")
    if writer is not None:
        writer.close()
        print(f"\nWrote {output_parquet} with data from {len(csv_paths)} CSVs.")
    else:
        print("No CSVs to write.")

def process_csv_type(root_dir: str, substring: str, output_parquet: str):
    """
    For a given CSV type (determined by the substring in the filename):
    1. Gather CSV paths,
    2. Infer the union schema across these CSVs (forcing the 'Date' column type),
    3. Read each CSV again, unify its columns, cast to the union schema, and write incrementally to one Parquet file.
    """
    csv_paths = gather_csv_paths_by_pattern(root_dir, substring)
    if not csv_paths:
        print(f"No files matched substring '{substring}'. Nothing to do.")
        return
    union_schema = infer_union_schema(csv_paths)
    if not union_schema:
        print(f"Could not infer schema for substring '{substring}'. No CSV data?")
        return
    write_csvs_with_inferred_schema(csv_paths, union_schema, output_parquet)

# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # Define the root directory containing subfolders (e.g., year-month folders)
    root_dir = r"C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025"

    # Define patterns and corresponding output Parquet filenames.
    # This will process CSVs whose filenames contain:
    #   '-street', '-stop-and-search', and '-outcomes'
    patterns = [
       # ("-street", "combined_street.parquet"),
        ("-stop-and-search", "combined_stop_and_search.parquet"),
        ("-outcomes", "combined_outcomes.parquet"),
    ]

    for pattern_substring, out_name in patterns:
        out_path = os.path.join(root_dir, out_name)
        print(f"\n=== Processing CSV type: '{pattern_substring}' ===")
        process_csv_type(root_dir, pattern_substring, out_path)



=== Processing CSV type: '-stop-and-search' ===
Appended 627 rows from 2015-01-avon-and-somerset-stop-and-search.csv to C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025\combined_stop_and_search.parquet
Appended 488 rows from 2015-01-cleveland-stop-and-search.csv to C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025\combined_stop_and_search.parquet
Appended 554 rows from 2015-01-dorset-stop-and-search.csv to C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025\combined_stop_and_search.parquet
Appended 360 rows from 2015-01-dyfed-powys-stop-and-search.csv to C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025\combined_stop_and_search.parquet
Appended 1012 rows from 2015-01-greater-manchester-stop-and-search.csv to C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025\combined_stop_and_search.parquet
Appended 108 rows from 2015-01-gwent-stop-and-search.csv to C:/Studie/SSML/project/police_crime_archive/extracted_2013_2025\combi

In [3]:
import os
import pyarrow as pa
import pyarrow.dataset as ds
import geopandas as gpd
import pandas as pd

def chunked_parquet_to_gpkg(parquet_file, gpkg_file, layer_name, chunksize=1_000_000):
    """
    Streams the Parquet in batches, building partial GeoDataFrames,
    writing each chunk to a TEMP GPKG, then merges at the end.
    
    Args:
        parquet_file (str): Path to your Parquet file.
        gpkg_file (str): Path to the final GeoPackage to create.
        layer_name (str): Layer name within the GeoPackage.
        chunksize (int): Number of rows (approx) per chunk. 
                         Adjust to fit your memory constraints.
    """
    # 1) Build a PyArrow Dataset from the Parquet file
    dataset = ds.dataset(parquet_file, format="parquet")

    # We'll gather partial GPKGs in a list for merging
    temp_files = []
    chunk_idx = 0

    # 2) Read in batches using to_batches
    for record_batch in dataset.to_batches(batch_size=chunksize):
        # Convert record_batch -> a small PyArrow Table -> Pandas
        table = pa.Table.from_batches([record_batch])
        df = table.to_pandas()

        # 3) Create geometry column from (Longitude, Latitude)
        gdf = gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df['Longitude'], df['Latitude']),
            crs="EPSG:4326"
        )

        # 4) Write chunk to a temporary GPKG
        temp_gpkg = f"{gpkg_file}_temp_{chunk_idx}.gpkg"
        gdf.to_file(temp_gpkg, layer=layer_name, driver="GPKG")
        temp_files.append(temp_gpkg)

        print(f"Wrote chunk {chunk_idx} with {len(gdf)} rows to {temp_gpkg}")
        chunk_idx += 1

    # 5) Merge partial GPKGs if needed
    # NOTE: This step can be memory-heavy if all chunks combined are large.
    #       If your data is huge, consider merging with an external tool
    #       or storing in a DB (e.g., SpatiaLite/PostGIS) instead.
    if temp_files:
        merged_gdf = None
        for i, temp_path in enumerate(temp_files):
            chunk_gdf = gpd.read_file(temp_path, layer=layer_name)
            if merged_gdf is None:
                merged_gdf = chunk_gdf
            else:
                merged_gdf = gpd.GeoDataFrame(
                    pd.concat([merged_gdf, chunk_gdf], ignore_index=True),
                    crs=chunk_gdf.crs
                )
        
        # Finally, write merged result to the final GPKG
        merged_gdf.to_file(gpkg_file, layer=layer_name, driver="GPKG")
        print(f"\nFinal GPKG with {len(merged_gdf)} rows: {gpkg_file}")

        # Clean up temporary chunk files
        for f in temp_files:
            os.remove(f)
    else:
        print("No data to write (no chunks).")



# Usage: chunked approach
street_parquet = r"C:\Studie\SSML\project\police_crime_archive\extracted_2013_2025\combined_street.parquet"
gpkg_path = r"crime_data.gpkg"
chunked_parquet_to_gpkg(street_parquet, gpkg_path, layer_name="street_data")


Wrote chunk 0 with 12201 rows to crime_data.gpkg_temp_0.gpkg
Wrote chunk 1 with 5393 rows to crime_data.gpkg_temp_1.gpkg
Wrote chunk 2 with 1470 rows to crime_data.gpkg_temp_2.gpkg
Wrote chunk 3 with 5916 rows to crime_data.gpkg_temp_3.gpkg
Wrote chunk 4 with 4363 rows to crime_data.gpkg_temp_4.gpkg
Wrote chunk 5 with 635 rows to crime_data.gpkg_temp_5.gpkg
Wrote chunk 6 with 6195 rows to crime_data.gpkg_temp_6.gpkg
Wrote chunk 7 with 4083 rows to crime_data.gpkg_temp_7.gpkg
Wrote chunk 8 with 8316 rows to crime_data.gpkg_temp_8.gpkg
Wrote chunk 9 with 9829 rows to crime_data.gpkg_temp_9.gpkg
Wrote chunk 10 with 6190 rows to crime_data.gpkg_temp_10.gpkg
Wrote chunk 11 with 2119 rows to crime_data.gpkg_temp_11.gpkg
Wrote chunk 12 with 3330 rows to crime_data.gpkg_temp_12.gpkg
Wrote chunk 13 with 11736 rows to crime_data.gpkg_temp_13.gpkg
Wrote chunk 14 with 4581 rows to crime_data.gpkg_temp_14.gpkg
Wrote chunk 15 with 25968 rows to crime_data.gpkg_temp_15.gpkg
Wrote chunk 16 with 6258 r

KeyboardInterrupt: 

In [12]:
import os
import time
import logging
import pyarrow as pa
import pyarrow.dataset as ds
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

def chunked_parquet_to_gpkg(
    parquet_file,
    gpkg_file,
    layer_name,
    chunksize=1_000_000,
    x_col='Longitude',
    y_col='Latitude',
    crs="EPSG:4326"
):
    """
    Streams a large Parquet file in chunks, converts each chunk to a GeoDataFrame,
    and writes/appends it to a single GeoPackage layer.
    
    Args:
        parquet_file (str): Path to the large Parquet file.
        gpkg_file (str): Path to output GeoPackage.
        layer_name (str): The name of the layer in the GeoPackage.
        chunksize (int): Approx. number of rows per chunk.
        x_col (str): Column name for x-coordinates (longitude).
        y_col (str): Column name for y-coordinates (latitude).
        crs (str): Coordinate reference system string (e.g., "EPSG:4326").
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
    logger = logging.getLogger(__name__)

    logger.info(f"Starting optimized chunked Parquet-to-GPKG conversion...")
    start_time = time.perf_counter()

    # Create dataset
    try:
        dataset = ds.dataset(parquet_file, format="parquet")
        total_rows = dataset.count_rows()
        logger.info(f"Total rows in dataset: {total_rows}")
    except Exception as e:
        logger.error(f"Failed to open Parquet dataset: {e}")
        return

    # Get schema information for the first chunk to ensure consistency
    try:
        # Get the first batch to determine schema
        scanner = dataset.scanner(batch_size=1)
        first_batch = next(scanner.to_batches())
        schema = first_batch.schema
        logger.info(f"Dataset schema: {schema}")
    except Exception as e:
        logger.error(f"Error accessing schema: {e}")
        return

    # Process in chunks
    chunk_idx = 0
    processed_rows = 0
    
    # Create scanner with the desired batch size
    scanner = dataset.scanner(batch_size=chunksize)
    
    # Delete existing file if it exists to avoid append issues
    if os.path.exists(gpkg_file) and chunk_idx == 0:
        try:
            if os.path.exists(gpkg_file):
                logger.info(f"Removing existing GeoPackage: {gpkg_file}")
                os.remove(gpkg_file)
        except Exception as e:
            logger.error(f"Error removing existing GeoPackage: {e}")
    
    for batch in scanner.to_batches():
        chunk_start = time.perf_counter()
        
        try:
            # Convert batch to DataFrame
            df = batch.to_pandas()
            processed_rows += len(df)
            
            # Create geometry column
            geometry = [Point(x, y) for x, y in zip(df[x_col], df[y_col])]
            
            # Create GeoDataFrame
            gdf = gpd.GeoDataFrame(df, geometry=geometry, crs=crs)
            
            # Write to GeoPackage
            if chunk_idx == 0:
                # First chunk: create new file
                gdf.to_file(gpkg_file, layer=layer_name, driver="GPKG")
                logger.info(f"Created new GeoPackage with {len(gdf)} rows in layer '{layer_name}'")
            else:
                # Subsequent chunks: append to existing layer
                gdf.to_file(gpkg_file, layer=layer_name, driver="GPKG", mode="a")
                logger.info(f"Appended {len(gdf)} rows to layer '{layer_name}'")
                
            chunk_idx += 1
            chunk_time = time.perf_counter() - chunk_start
            elapsed = time.perf_counter() - start_time
            
            # Log progress
            percent = processed_rows / total_rows * 100
            est_total_time = (elapsed / processed_rows) * total_rows
            remaining_time = est_total_time - elapsed
            
            logger.info(
                f"Chunk {chunk_idx} processed in {chunk_time:.2f}s, "
                f"{processed_rows}/{total_rows} rows ({percent:.2f}%). "
                f"Estimated remaining time: {remaining_time:.2f}s"
            )
            
        except Exception as e:
            logger.error(f"Error processing chunk {chunk_idx}: {e}")
            continue

    # Final summary
    total_elapsed = time.perf_counter() - start_time
    logger.info(f"Finished writing {chunk_idx} chunks to '{gpkg_file}' layer='{layer_name}'.")
    logger.info(f"Total rows processed: {processed_rows}")
    logger.info(f"Total time: {total_elapsed:.2f}s")
    logger.info(f"Average processing speed: {processed_rows / total_elapsed:.2f} rows/second")

# -----------------------------------------------------------------------------
if __name__ == "__main__":
    parquet_file = r"C:\Studie\SSML\project\police_crime_archive\extracted_2013_2025\combined_street.parquet"
    gpkg_file   = r"C:\Studie\SSML\project\police_crime_archive\extracted_2013_2025\crime_data_uk.gpkg"
    layer_name  = "street_data"

    # Adjust chunk size based on available memory
    chunk_size = 500_000  # Reduced for better memory management

    # Run the chunk-based conversion
    chunked_parquet_to_gpkg(
        parquet_file=parquet_file,
        gpkg_file=gpkg_file,
        layer_name=layer_name,
        chunksize=chunk_size
    )

2025-03-17 20:58:18,092 [INFO] Starting optimized chunked Parquet-to-GPKG conversion...
2025-03-17 20:58:18,361 [INFO] Total rows in dataset: 87463152
2025-03-17 20:58:18,491 [INFO] Dataset schema: Crime ID: string
Month: string
Reported by: string
Falls within: string
Longitude: double
Latitude: double
Location: string
LSOA code: string
LSOA name: string
Crime type: string
Last outcome category: string
Context: string
2025-03-17 20:58:20,002 [INFO] Created 12,201 records
2025-03-17 20:58:20,269 [INFO] Created new GeoPackage with 12201 rows in layer 'street_data'
2025-03-17 20:58:20,273 [INFO] Chunk 1 processed in 1.29s, 12201/87463152 rows (0.01%). Estimated remaining time: 15584.66s
2025-03-17 20:58:20,629 [INFO] Created 5,393 records
2025-03-17 20:58:20,654 [INFO] Appended 5393 rows to layer 'street_data'
2025-03-17 20:58:20,655 [INFO] Chunk 2 processed in 0.38s, 17594/87463152 rows (0.02%). Estimated remaining time: 12705.80s
2025-03-17 20:58:20,741 [INFO] Created 1,470 records
202