# Data Dictionary Generation
Analyze the actual Parquet file to understand the data structure and generate an accurate data dictionary.

In [2]:
import pandas as pd
import pyarrow.parquet as pq
import numpy as np
from pathlib import Path

# Load the parquet file
file_path = 'parquet/v2x_messages_20250820_202303_167720.parquet'
df = pd.read_parquet(file_path)

print('=== BASIC INFO ===')
print(f'Shape: {df.shape}')
print(f'Columns: {list(df.columns)}')
print()

print('=== DATA TYPES ===')
print(df.dtypes)
print()

print('=== SAMPLE DATA (First Row) ===')
for col in df.columns:
    sample_val = df[col].iloc[0] if len(df) > 0 else 'N/A'
    # Handle potential binary data safely
    if isinstance(sample_val, bytes):
        print(f'{col}: <bytes, length={len(sample_val)}>')
    else:
        print(f'{col}: {sample_val}')
print()

print('=== DETAILED COLUMN ANALYSIS ===')
for col in df.columns:
    print(f'\n{col}:')
    print(f'  Type: {df[col].dtype}')
    non_null_count = df[col].notna().sum()
    print(f'  Non-null: {non_null_count}/{len(df)} ({non_null_count/len(df)*100:.1f}%)')
    
    if df[col].dtype == 'object':
        unique_count = df[col].nunique()
        print(f'  Unique values: {unique_count}')
        
        # Check if it's binary data
        sample_vals = df[col].dropna().head(3)
        has_bytes = any(isinstance(val, bytes) for val in sample_vals)
        
        if has_bytes:
            print(f'  Data type: Binary/bytes')
            # Show byte lengths
            byte_lengths = [len(val) if isinstance(val, bytes) else 0 for val in sample_vals]
            print(f'  Byte lengths: {byte_lengths}')
        else:
            # Regular string analysis
            if unique_count <= 15:
                print(f'  Values: {sorted(df[col].dropna().unique())}')
            else:
                print(f'  Sample values: {list(df[col].dropna().unique()[:5])}')
                
    elif pd.api.types.is_numeric_dtype(df[col]):
        if non_null_count > 0:
            print(f'  Range: {df[col].min()} to {df[col].max()}')
            print(f'  Sample values: {list(df[col].dropna().head(3).values)}')
    elif pd.api.types.is_datetime64_any_dtype(df[col]):
        if non_null_count > 0:
            print(f'  Range: {df[col].min()} to {df[col].max()}')

=== BASIC INFO ===
Shape: (1000, 6)
Columns: ['mf_bytes', 'TimeStamp', 'MessageType', 'Geohash', 'Latitude', 'Longitude']

=== DATA TYPES ===
mf_bytes        object
TimeStamp      float64
MessageType     object
Geohash         object
Latitude       float64
Longitude      float64
dtype: object

=== SAMPLE DATA (First Row) ===
mf_bytes: <bytes, length=512>
TimeStamp: 1755720883.157014
MessageType: BasicSafetyMessage
Geohash: 9t9p75yd
Latitude: 32.2329212
Longitude: -110.9528807

=== DETAILED COLUMN ANALYSIS ===

mf_bytes:
  Type: object
  Non-null: 1000/1000 (100.0%)
  Unique values: 501
  Data type: Binary/bytes
  Byte lengths: [512, 512, 512]

TimeStamp:
  Type: float64
  Non-null: 1000/1000 (100.0%)
  Range: 1755720883.157014 to 1755721383.156842
  Sample values: [np.float64(1755720883.157014), np.float64(1755720884.15119), np.float64(1755720884.152675)]

MessageType:
  Type: object
  Non-null: 1000/1000 (100.0%)
  Unique values: 1
  Values: ['BasicSafetyMessage']

Geohash:
  Type: ob

In [3]:
# Additional analysis for data dictionary
print('=== TIMESTAMP CONVERSION ===')
timestamp_val = df['TimeStamp'].iloc[0]
from datetime import datetime
dt = datetime.fromtimestamp(timestamp_val)
print(f'Timestamp {timestamp_val} = {dt} UTC')

print('\n=== COORDINATE ANALYSIS ===')
lat, lon = df['Latitude'].iloc[0], df['Longitude'].iloc[0]
print(f'Location: {lat}, {lon}')


print('\n=== MESSAGE BYTES ANALYSIS ===')
msg_bytes = df['mf_bytes'].iloc[0]
print(f'Message bytes length: {len(msg_bytes)}')
print(f'First 20 bytes (hex): {msg_bytes[:20].hex()}')

# This matches the notebook expectation for 'mf_bytes' field
print('\n=== SCHEMA COMPATIBILITY ===')
print('✓ mf_bytes: Binary message data (matches notebook expectation)')
print('✓ TimeStamp: Unix timestamp as float64') 
print('✓ Geohash: String for spatial indexing')
print('✓ MessageType: String identifier')
print('✓ Latitude/Longitude: Float64 coordinates in decimal degrees')

=== TIMESTAMP CONVERSION ===
Timestamp 1755720883.157014 = 2025-08-20 13:14:43.157014 UTC

=== COORDINATE ANALYSIS ===
Location: 32.2329212, -110.9528807

=== MESSAGE BYTES ANALYSIS ===
Message bytes length: 512
First 20 bytes (hex): 0014251e5e9355a56a08a46da1be1493e08c1689

=== SCHEMA COMPATIBILITY ===
✓ mf_bytes: Binary message data (matches notebook expectation)
✓ TimeStamp: Unix timestamp as float64
✓ Geohash: String for spatial indexing
✓ MessageType: String identifier
✓ Latitude/Longitude: Float64 coordinates in decimal degrees


# BSM Parquet Analysis

This notebook processes and analyzes Basic Safety Message (BSM) data stored in Parquet files. It demonstrates how to load, filter, decode, and visualize BSM messages, focusing on specific geohashes and vehicle movement over time.

The workflow includes:
- Loading and concatenating Parquet files
- Time and geohash-based grouping
- Focusing on specific geohashes
- Decoding BSM messages using a C binary
- Extracting BSM IDs and timestamps
- Identifying repeated BSMs
- Visualizing vehicle movement with Plotly

## Import Required Libraries
This cell imports essential Python libraries for data manipulation, visualization, and file handling, including pandas, matplotlib, seaborn, pyarrow, and os.

In [4]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pyarrow.parquet as pq
import os

## Load and Concatenate Parquet Files
This cell locates all Parquet files in the specified directory, reads them into pandas DataFrames, concatenates them into a single DataFrame, and displays basic information about the combined data.

In [5]:
# Load and concatenate parquet files safely
import os
import glob
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path

# Set the directory containing your parquet files
parquet_dir = "./parquet"  # Update if needed

# Find all .parquet files in the directory (defensive: ensure dir exists)
if not os.path.isdir(parquet_dir):
    print(f"Directory '{parquet_dir}' does not exist. Adjust parquet_dir and re-run.")
    df_all = pd.DataFrame()
else:
    parquet_files = sorted(glob.glob(os.path.join(parquet_dir, "*.parquet")))
    print(f"Found {len(parquet_files)} Parquet files.")
    if parquet_files:
        try:
            df_all = pd.concat([pd.read_parquet(f) for f in parquet_files], ignore_index=True)
            print('\n=== DATAFRAME INFO ===')
            print(df_all.info())
            display(df_all.head())
        except Exception as e:
            print(f"Error reading parquet files: {e}")
            df_all = pd.DataFrame()
    else:
        print("No Parquet files found in the directory.")
        df_all = pd.DataFrame()

Found 2 Parquet files.

=== DATAFRAME INFO ===
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2000 entries, 0 to 1999
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   mf_bytes     2000 non-null   object 
 1   TimeStamp    2000 non-null   float64
 2   MessageType  2000 non-null   object 
 3   Geohash      2000 non-null   object 
 4   Latitude     2000 non-null   float64
 5   Longitude    2000 non-null   float64
dtypes: float64(3), object(3)
memory usage: 93.9+ KB
None


Unnamed: 0,mf_bytes,TimeStamp,MessageType,Geohash,Latitude,Longitude
0,b'\x00\x14%\x1e^\x93U\xa5j\x08\xa4m\xa1\xbe\x1...,1755721000.0,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881
1,b'\x00\x14%\x1e\x9e\x93U\xa5k\x02dm\xa1\xbe\x1...,1755721000.0,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881
2,b'\x00\x14%\x1e\x9e\x93U\xa5k\x02dm\xa1\xbe\x1...,1755721000.0,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881
3,b'\x00\x14%\x1e\xde\x93U\xa5k\xfb\xa4m\xa1\xbe...,1755721000.0,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881
4,b'\x00\x14%\x1e\xde\x93U\xa5k\xfb\xa4m\xa1\xbe...,1755721000.0,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881


## Group by Time and Geohash
This cell converts timestamps to datetime, buckets them by minute, groups the data by time bucket and geohash, and displays the largest groups.

In [6]:
import pandas as pd

# Convert float timestamp to datetime
df_all['Time'] = pd.to_datetime(df_all['TimeStamp'], unit='s')

# Optional: round to nearest 1 minute (use '1min' to avoid deprecation warning)
df_all['TimeBucket'] = df_all['Time'].dt.floor('1min')  # Change to '1min', '30s' etc. as needed

grouped = df_all.groupby(['TimeBucket', 'Geohash'])

group_sizes = grouped.size().reset_index(name='Count')
group_sizes.sort_values('Count', ascending=False).head(10)


Unnamed: 0,TimeBucket,Geohash,Count
1,2025-08-20 20:15:00,9t9p75yd,120
2,2025-08-20 20:16:00,9t9p75yd,120
10,2025-08-20 20:24:00,9t9p75yd,120
3,2025-08-20 20:17:00,9t9p75yd,120
4,2025-08-20 20:18:00,9t9p75yd,120
5,2025-08-20 20:19:00,9t9p75yd,120
6,2025-08-20 20:20:00,9t9p75yd,120
7,2025-08-20 20:21:00,9t9p75yd,120
8,2025-08-20 20:22:00,9t9p75yd,120
9,2025-08-20 20:23:00,9t9p75yd,120


## Filter for Focus Geohashes
This cell filters the DataFrame to include only rows with geohashes of interest, sorts the results for readability, and displays message counts per geohash.

In [10]:
focus_geohashes = ['9t9p75yd']
df_focus = df_all[df_all['Geohash'].isin(focus_geohashes)]

print("Message count per geohash:")
print(df_focus['Geohash'].value_counts())

# Sort by geohash and time for better readability
df_focus_sorted = df_focus.sort_values(by=["Geohash", "Time"])
df_focus_sorted.reset_index(drop=True, inplace=True)
df_focus_sorted



Message count per geohash:
Geohash
9t9p75yd    2000
Name: count, dtype: int64


Unnamed: 0,mf_bytes,TimeStamp,MessageType,Geohash,Latitude,Longitude,Time,TimeBucket
0,b'\x00\x14%\x1e^\x93U\xa5j\x08\xa4m\xa1\xbe\x1...,1.755721e+09,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881,2025-08-20 20:14:43.157013893,2025-08-20 20:14:00
1,b'\x00\x14%\x1e\x9e\x93U\xa5k\x02dm\xa1\xbe\x1...,1.755721e+09,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881,2025-08-20 20:14:44.151190042,2025-08-20 20:14:00
2,b'\x00\x14%\x1e\x9e\x93U\xa5k\x02dm\xa1\xbe\x1...,1.755721e+09,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881,2025-08-20 20:14:44.152674913,2025-08-20 20:14:00
3,b'\x00\x14%\x1e\xde\x93U\xa5k\xfb\xa4m\xa1\xbe...,1.755721e+09,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881,2025-08-20 20:14:45.151259899,2025-08-20 20:14:00
4,b'\x00\x14%\x1e\xde\x93U\xa5k\xfb\xa4m\xa1\xbe...,1.755721e+09,BasicSafetyMessage,9t9p75yd,32.232921,-110.952881,2025-08-20 20:14:45.152271032,2025-08-20 20:14:00
...,...,...,...,...,...,...,...,...
1995,b'\x00\x14%\x17\xde\x93U\xa5T\x8a\xe4m\xa1\xc0...,1.755722e+09,BasicSafetyMessage,9t9p75yd,32.232922,-110.952881,2025-08-20 20:31:21.136713982,2025-08-20 20:31:00
1996,b'\x00\x14%\x17\xde\x93U\xa5T\x8a\xe4m\xa1\xc0...,1.755722e+09,BasicSafetyMessage,9t9p75yd,32.232922,-110.952881,2025-08-20 20:31:21.138233900,2025-08-20 20:31:00
1997,b'\x00\x14%\x18\x1e\x93U\xa5U\x84\xa4m\xa1\xc0...,1.755722e+09,BasicSafetyMessage,9t9p75yd,32.232922,-110.952881,2025-08-20 20:31:22.136763096,2025-08-20 20:31:00
1998,b'\x00\x14%\x18\x1e\x93U\xa5U\x84\xa4m\xa1\xc0...,1.755722e+09,BasicSafetyMessage,9t9p75yd,32.232922,-110.952881,2025-08-20 20:31:22.137904882,2025-08-20 20:31:00


## Decode BSM Messages Using C Binary
This cell locates the C binary decoder, prepares the focus DataFrame, converts message bytes to hex, and decodes each BSM message using the external decoder. It also prints a few decoded outputs for inspection.

In [11]:
import os
import subprocess
import pandas as pd
from pathlib import Path

# 1. Detect repo root and set decoder path RELATIVE to repo root
notebook_dir = Path.cwd()
repo_root = None

# Traverse up to find .git as marker for root
for parent in notebook_dir.parents:
    if (parent / ".git").exists():
        repo_root = parent
        break
if repo_root is None:
    repo_root = notebook_dir  # fallback if not using git

DECODER_PATH = repo_root / "libsm/b2v-libsm/build/bin/decodeToJER"
print("Detected repo root:", repo_root)
print("Decoder Path:", DECODER_PATH)
if not DECODER_PATH.exists():
    raise FileNotFoundError(f"decodeToJER not found at {DECODER_PATH}")

# 2. Focused geohashes
focus_geohashes = ['9t9p75yd']  # Update as needed
df_focus = df_all[df_all['Geohash'].isin(focus_geohashes)].copy()
print("BSMs in focus:", len(df_focus))

# 3. Convert mf_bytes to hex
def mf_bytes_to_hex(val):
    if isinstance(val, (bytes, bytearray)):
        return val.hex()
    if isinstance(val, str) and val.startswith("b'"):  # as string repr
        return eval(val).hex()
    return None

df_focus["mf_hex"] = df_focus["mf_bytes"].apply(mf_bytes_to_hex)

# 4. Decode each BSM using the C binary
def decode_bsm_hex(hex_str):
    try:
        result = subprocess.run(
            [str(DECODER_PATH), "-i", hex_str],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
            timeout=3,
        )
        return result.stdout
    except Exception as e:
        print(f"[DecodeError] {e}")
        return None

df_focus["jer"] = df_focus["mf_hex"].apply(decode_bsm_hex)
print("Decoded BSMs:", df_focus['jer'].notnull().sum())

# 5. (Optional) Show a few decoded outputs for inspection
for jer in df_focus["jer"].dropna().head(3):
    print(jer)


Detected repo root: /home/muhid/Downloads/repo/driveAZ_Analytics
Decoder Path: /home/muhid/Downloads/repo/driveAZ_Analytics/libsm/b2v-libsm/build/bin/decodeToJER
BSMs in focus: 2000
Decoded BSMs: 2000
{
    "messageId": 20,
    "value": {
        "BasicSafetyMessage": {
            "coreData": {
                "msgCnt": 121,
                "id": "7A4D5695",
                "secMark": 43042,
                "lat": 322329212,
                "long": -1109528807,
                "elev": 7443,
                "accuracy": {
                    "semiMajor": 255,
                    "semiMinor": 255,
                    "orientation": 65535
                },
                "transmission": "unavailable",
                "speed": 0,
                "heading": 17672,
                "angle": 127,
                "accelSet": {
                    "long": 100,
                    "lat": -2,
                    "vert": 0,
                    "yaw": -21
                },
                "brakes

## Extract BSM ID and Identify Repeated Messages
This cell parses the decoded JER output to extract BSM IDs and secMark values, sorts by ID and timestamp, computes time differences between consecutive messages with the same ID, and identifies BSMs that are repeated within a short time window.

In [12]:
import json
import numpy as np

# 1. Extract id and secMark from the decoded JER string (for each row)
def extract_id_secmark(jer_str):
    try:
        jer = json.loads(jer_str)
        bsm = jer["value"]["BasicSafetyMessage"]["coreData"]
        return bsm.get("id"), bsm.get("secMark")
    except Exception as e:
        return None, None

df_focus[["bsm_id", "bsm_secMark"]] = df_focus["jer"].apply(lambda x: pd.Series(extract_id_secmark(x)))

# 2. Check which BSMs have the same id and nearby timestamps (TimeStamp or secMark)
# Sort for easier comparison
df_focus_sorted = df_focus.sort_values(["bsm_id", "TimeStamp"])

# Compute time difference (in seconds) to previous message with same id
df_focus_sorted["prev_TimeStamp"] = df_focus_sorted.groupby("bsm_id")["TimeStamp"].shift(1)
df_focus_sorted["dt_sec"] = df_focus_sorted["TimeStamp"] - df_focus_sorted["prev_TimeStamp"]

# Show BSMs with dt_sec < threshold (e.g., 2 seconds)
threshold = 2
nearby = df_focus_sorted[df_focus_sorted["dt_sec"].notnull() & (df_focus_sorted["dt_sec"] < threshold)]

print(f"BSMs with repeated id within {threshold} seconds:")
display(nearby[["bsm_id", "TimeStamp", "dt_sec", "Geohash", "Latitude", "Longitude"]].head(10))


BSMs with repeated id within 2 seconds:


Unnamed: 0,bsm_id,TimeStamp,dt_sec,Geohash,Latitude,Longitude
1,7A4D5695,1755721000.0,0.994176,9t9p75yd,32.232921,-110.952881
2,7A4D5695,1755721000.0,0.001485,9t9p75yd,32.232921,-110.952881
3,7A4D5695,1755721000.0,0.998585,9t9p75yd,32.232921,-110.952881
4,7A4D5695,1755721000.0,0.001011,9t9p75yd,32.232921,-110.952881
5,7A4D5695,1755721000.0,0.998994,9t9p75yd,32.232921,-110.952881
6,7A4D5695,1755721000.0,0.001628,9t9p75yd,32.232921,-110.952881
7,7A4D5695,1755721000.0,0.983692,9t9p75yd,32.232921,-110.952881
8,7A4D5695,1755721000.0,0.0008,9t9p75yd,32.232921,-110.952881
9,7A4D5695,1755721000.0,0.999238,9t9p75yd,32.232921,-110.952881
10,7A4D5695,1755721000.0,0.001012,9t9p75yd,32.232921,-110.952881


## Visualize BSM Movement by Vehicle ID
This cell prepares the data for animation, then uses Plotly to create an animated map showing the movement of vehicles (by BSM ID) over time, with one frame per second.

In [13]:
import plotly.express as px

# Ensure Time is datetime and round/floor to seconds
df_focus_sorted["Time"] = pd.to_datetime(df_focus_sorted["Time"])
df_focus_sorted["Time_sec"] = df_focus_sorted["Time"].dt.floor("s")

# (Optional) Convert bsm_id to string for display
df_focus_sorted["bsm_id"] = df_focus_sorted["bsm_id"].astype(str)

# Sort by Time for animation
df_anim = df_focus_sorted.sort_values("Time_sec")

# Use scatter_mapbox for better basemap support
fig = px.scatter_mapbox(
    df_anim,
    lat="Latitude",
    lon="Longitude",
    color="bsm_id",      # Color by vehicle
    animation_frame=df_anim["Time_sec"].dt.strftime('%Y-%m-%d %H:%M:%S'),
    hover_name="bsm_id",
    zoom=12,
    height=600,
)

fig.update_layout(
    mapbox_style="open-street-map",
    title="BSM Movement by Vehicle ID (Per Second)",
    margin={"r":0, "t":30, "l":0, "b":0},
)
fig.show()


  fig = px.scatter_mapbox(


## Generate data dictionary from current DataFrame

This section derives a package-level data dictionary CSV from the DataFrame currently in memory.
- Uses observed dtypes, min/max, and sample values
- Writes to `BSM_analysis/dictionary/data_dictionary.csv`
- You can re-run after filtering to produce a dictionary for a subset if needed

In [15]:
# Derive data dictionary from df and write CSV
import os
import pandas as pd
from datetime import datetime, timezone
from pathlib import Path

# Choose the DataFrame source: df (possibly filtered) or df_all (raw)
_df = df if 'df' in globals() else df_all

# Column mappings and friendly metadata
friendly = {
    'mf_bytes': ('binary', 'Raw BSM message bytes for decoding', 'raw bytes', '>0 length'),
    'TimeStamp': ('float', 'Unix timestamp of BSM message', 'epoch seconds (UTC)', 'Valid Unix timestamps'),
    'MessageType': ('string', 'SAE J2735 message type identifier', 'enum', 'BasicSafetyMessage'),
    'Geohash': ('string', 'Geohash spatial index for location', 'Base32 geohash string', 'Valid geohash codes'),
    'Latitude': ('float', 'Vehicle latitude position', 'Decimal degrees (WGS84)', '[-90, 90]'),
    'Longitude': ('float', 'Vehicle longitude position', 'Decimal degrees (WGS84)', '[-180, 180]'),
}

rows = []
for col in _df.columns:
    meta = friendly.get(col)
    if meta:
        col_type, desc, units, allowed = meta
    else:
        col_type, desc, units, allowed = (str(_df[col].dtype), '', '', '')
    req = 'Y'
    non_null = _df[col].dropna()
    if not non_null.empty:
        sample = non_null.iloc[0]
        if isinstance(sample, (bytes, bytearray)):
            ex_val = 'b"<...>"'
        else:
            ex_val = str(sample)[:64]
    else:
        ex_val = ''
    rows.append({
        'Variable': col,
        'Type': col_type,
        'Description': desc,
        'Units/Domain': units,
        'Example': ex_val,
        'Required (Y/N)': req,
        'Allowed Values/Range': allowed,
        'Notes': 'Auto-generated from notebook on ' + datetime.now(timezone.utc).isoformat()
    })

# Resolve output path relative to the notebook directory
nb_dir = notebook_dir if 'notebook_dir' in globals() else Path.cwd()
out_path = Path(nb_dir) / 'dictionary' / 'data_dictionary.csv'
out_path.parent.mkdir(parents=True, exist_ok=True)

pd.DataFrame(
    rows,
    columns=['Variable','Type','Description','Units/Domain','Example','Required (Y/N)','Allowed Values/Range','Notes']
).to_csv(out_path, index=False)

print(f'Wrote data dictionary with {len(rows)} rows to {out_path}')

Wrote data dictionary with 6 rows to /home/muhid/Downloads/repo/driveAZ_Analytics/BSM_analysis/dictionary/data_dictionary.csv
