# Pre-processing before map-matching
### This is the pre-processing stage for map-matching. We filter out the points that fall outside the area of interest (AOI) to reduce the load on the map-matching engine.
### A work by Sepehr Rafeie and Amir Babaei

###### contact: pr.babayee@icloud.com

In [1]:
import os
import boto3
import gzip
import shutil
from tqdm import tqdm
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Load AWS credentials for the source bucket from the environment
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_BUCKET = "inrixprod-trip-reports"
S3_PREFIX = "user=MB/year=2025/month=01/day=01/date=2025-02-03/reportId=229480/v1/data/waypoints"

# Relut Org S3 for final upload
AWS_ACCESS_KEY_RELUT = os.getenv("AWS_ACCESS_KEY_RELUT")
AWS_SECRET_KEY_RELUT = os.getenv("AWS_SECRET_KEY_RELUT")
AWS_BUCKET_RELUT = os.getenv("AWS_BUCKET_RELUT")

# Define temporary storage location (use fast external drive)
local_dir = "/mnt/sda1/waypoints_temp"
os.makedirs(local_dir, exist_ok=True)

# Initialize S3 client
s3_client = boto3.client("s3", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)

# List all gzipped CSV files under the prefix.
# A single list_objects_v2 call returns at most 1,000 keys, so walk every
# page with a paginator; otherwise larger reports would be silently truncated.
gz_files = []
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=AWS_BUCKET, Prefix=S3_PREFIX):
    # "Contents" is absent on a page that matched no keys.
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".gz"):
            gz_files.append(obj["Key"])
print(f"Found {len(gz_files)} gzipped files.")

# Initialize PySpark with Optimized Settings
# The settings are kept in one table and applied in the order listed, so
# tuning a value means editing a single line here.
_spark_settings = {
    "spark.executor.memory": "32g",
    "spark.driver.memory": "32g",
    "spark.sql.shuffle.partitions": "96",
    "spark.default.parallelism": "48",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.5",
}

_builder = SparkSession.builder.appName("FCD-Processing")
for _setting, _setting_value in _spark_settings.items():
    _builder = _builder.config(_setting, _setting_value)

# Reuses an already-running session if one exists, otherwise starts a new one.
spark = _builder.getOrCreate()

# Define Schema
# (column name, Spark type) pairs listed in the order the columns are read
# from the CSV files; every column is declared nullable.
_waypoint_columns = [
    ("TripId", StringType()),
    ("WaypointSequence", IntegerType()),
    ("CaptureDate", TimestampType()),
    ("Latitude", DoubleType()),
    ("Longitude", DoubleType()),
    ("SegmentId", StringType()),
    ("ZoneName", StringType()),
    ("Frc", StringType()),
    ("DeviceId", StringType()),
    ("RawSpeed", DoubleType()),
    ("RawSpeedMetric", StringType()),
]

schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _waypoint_columns]
)

# Bounding Box Limits
MIN_LON, MAX_LON, MIN_LAT, MAX_LAT = 8.4, 8.8, 50.0, 50.3

# Process files in chunks (batch processing)
BATCH_SIZE = 10  # Number of files processed at a time

# Directory that receives one Parquet dataset per processed batch
OUTPUT_DIR = "/mnt/sda1/filtered_fcd/2025/1"

with tqdm(total=len(gz_files), desc="Processing CSV files", unit="file") as pbar:
    for i in range(0, len(gz_files), BATCH_SIZE):
        batch_files = gz_files[i:i + BATCH_SIZE]
        csv_files = []

        # Everything that touches the temp directory runs inside try/finally so
        # the extracted files are removed even when a download, extraction or
        # Spark job fails part-way through the batch.
        try:
            for s3_file in batch_files:
                local_file = os.path.join(local_dir, os.path.basename(s3_file))

                # Strip only the trailing ".gz" (str.replace would rewrite every
                # occurrence in the path) and avoid producing "name.csv.csv"
                # when the object is already called "name.csv.gz".
                stem = local_file[:-len(".gz")] if local_file.endswith(".gz") else local_file
                extracted_file = stem if stem.endswith(".csv") else stem + ".csv"

                # Download file
                s3_client.download_file(AWS_BUCKET, s3_file, local_file)

                # Extract GZ
                with gzip.open(local_file, "rb") as f_in, open(extracted_file, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)

                # The archive is no longer needed once extracted; dropping it
                # now keeps peak temp-disk usage down.
                os.remove(local_file)

                csv_files.append(extracted_file)
                pbar.update(1)  # Update progress bar for each file processed

            # Load extracted batch into PySpark.
            # ("maxFilesPerTrigger" was dropped: it is a Structured Streaming
            # option and has no effect on a batch read.)
            df = spark.read.csv(csv_files, schema=schema, header=False)

            # Apply BBOX filter (bounds are inclusive)
            df_filtered = df.filter(
                (col("Longitude") >= MIN_LON) & (col("Longitude") <= MAX_LON) &
                (col("Latitude") >= MIN_LAT) & (col("Latitude") <= MAX_LAT)
            )

            # Save batch as Parquet. Spark evaluates lazily, so the CSV files
            # must stay on disk until this write has finished.
            output_parquet = os.path.join(OUTPUT_DIR, f"filtered_batch_{i // BATCH_SIZE}.parquet")
            df_filtered.write.mode("overwrite").parquet(output_parquet)

            # NOTE(review): the upload step below is disabled. Its target key
            # prefix ("filtered_data/2024/11") does not match the 2025/01 data
            # processed here - confirm the intended prefix before re-enabling.
            # # Upload processed Parquet to Relut's S3
            # s3_client_relut = boto3.client("s3", aws_access_key_id=AWS_ACCESS_KEY_RELUT, aws_secret_access_key=AWS_SECRET_KEY_RELUT)

            # for file in tqdm(os.listdir(output_parquet), desc="Uploading to S3", unit="file"):
            #     file_path = os.path.join(output_parquet, file)
            #     s3_target_key = f"filtered_data/2024/11/{file}"

            #     s3_client_relut.upload_file(file_path, AWS_BUCKET_RELUT, s3_target_key)
        finally:
            # Cleanup temp files to free space
            shutil.rmtree(local_dir)
            os.makedirs(local_dir, exist_ok=True)

print("✅ Processing complete.")


Found 216 gzipped files.


25/03/25 14:47:09 WARN Utils: Your hostname, relut resolves to a loopback address: 127.0.1.1; using 10.3.0.143 instead (on interface enp3s0)
25/03/25 14:47:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Processing CSV files: 100%|██████████| 216/216 [1:30:43<00:00, 25.20s/file]     

✅ Processing complete.



