In [1]:
import os

# Show where the kernel started (pure-Python equivalent of the %pwd magic).
os.getcwd()

'/Users/kzc0l4/arcdemo/notebooks'

In [12]:
# Move the kernel to the project root, i.e. the directory that holds `confs/`.
# Searching upward instead of a bare os.chdir("../") keeps this cell safe to
# re-run: repeating it no longer walks one more level up each time, and it also
# works when the kernel is currently in src/.
PROJECT_ROOT = os.getcwd()
while not os.path.isdir(os.path.join(PROJECT_ROOT, "confs")):
    parent = os.path.dirname(PROJECT_ROOT)
    if parent == PROJECT_ROOT:
        # Reached the filesystem root without finding `confs/`: stay where we are.
        PROJECT_ROOT = os.getcwd()
        break
    PROJECT_ROOT = parent
os.chdir(PROJECT_ROOT)
os.getcwd()

'/Users/kzc0l4/arcdemo'

In [46]:
import pandas as pd

# Path is relative to the project root, where the previous cell leaves the kernel.
# (The old "../data/..." path resolved outside the project on a fresh Run All.)
df = pd.read_csv("data/call_logs.csv")
df.head()

Unnamed: 0,lat,lon,timestamp
0,37.775217,-122.418489,2025-02-16 15:30:37.020819
1,37.775497,-122.418479,2025-02-16 15:18:21.020819
2,37.775283,-122.419299,2025-02-16 15:20:29.020819
3,37.774932,-122.418852,2025-02-16 15:20:16.020819
4,37.775481,-122.418644,2025-02-16 15:31:15.020819


In [49]:
# Keep only calls from the last HISTORY_IN_HOURS hours
# (same rule as DataPreparation.truncate_call_logs).
HISTORY_IN_HOURS = 3
now = pd.Timestamp.now()
oldest_valid_value = now - pd.Timedelta(hours=HISTORY_IN_HOURS)
df = df.assign(timestamp=pd.to_datetime(df["timestamp"]))
df = df.loc[df["timestamp"] >= oldest_valid_value]

In [24]:
import geohash

# Geohash length used in this exploration.
# NOTE(review): DataPreparation.geohash_call_logs uses precision=5 — confirm which is intended.
GEOHASH_PRECISION = 6

# A list comprehension instead of a row-wise `apply` is faster. It also still works
# when the time filter leaves no rows: `apply(axis=1)` on an empty frame can return a
# DataFrame, which cannot be assigned to a single column. `assign` avoids writing
# into the filtered slice produced by the previous cell.
df = df.assign(
    geohash=[
        geohash.encode(lat, lon, precision=GEOHASH_PRECISION)
        for lat, lon in zip(df["lat"], df["lon"])
    ]
)

# Sort by geohash and timestamp
df = df.sort_values(by=["geohash", "timestamp"])
df.head()

Unnamed: 0,lat,lon,timestamp,geohash
2375,37.770439,-122.446066,2025-02-16 16:18:30.090597,9q8yve
3174,37.770225,-122.449171,2025-02-16 16:22:57.297603,9q8yve
2662,37.770213,-122.443306,2025-02-16 16:23:51.090597,9q8yve
3970,37.770083,-122.447129,2025-02-16 16:23:58.297603,9q8yve
3451,37.770995,-122.446312,2025-02-16 16:26:05.297603,9q8yve


In [None]:
TIME_GAP = 15 * 60  # bucket width in seconds (15 minutes)

# Build a new timestamp-indexed frame instead of mutating `df` in place, so the cell
# can be re-run. The in-place set_index turned "timestamp" into the index, so a
# second run failed with KeyError on df["timestamp"].
calls_by_time = (
    df.assign(timestamp=pd.to_datetime(df["timestamp"]))
    .sort_values(by=["geohash", "timestamp"])
    .set_index("timestamp")
)

# Count calls per geohash in TIME_GAP-second buckets. Selecting the geohash column
# after groupby avoids resampling the grouping column, which pandas >= 2.2 deprecates.
agg_df = (
    calls_by_time.groupby("geohash")["geohash"]
    .resample(f"{TIME_GAP}s")
    .count()
    .rename("count")
    .reset_index()
)
agg_df

   geohash           timestamp  count
0   9q8yve 2025-02-16 16:15:00      6
1   9q8yvg 2025-02-16 16:15:00     12
2   9q8yvg 2025-02-16 16:30:00      1
3   9q8yvs 2025-02-16 16:15:00     30
4   9q8yvs 2025-02-16 16:30:00      5
..     ...                 ...    ...
78  9q8zn7 2025-02-16 16:30:00      4
79  9q8znh 2025-02-16 16:15:00     53
80  9q8znh 2025-02-16 16:30:00      4
81  9q8znk 2025-02-16 16:15:00     40
82  9q8znk 2025-02-16 16:30:00      2

[83 rows x 3 columns]


In [5]:
# The config paths (e.g. "../data/call_logs.csv") appear to be relative to src/, so work
# from there. Guarded so that re-running the cell does not try to enter src/src.
if os.path.basename(os.getcwd()) != "src":
    os.chdir("src")

In [6]:
from arcdemo.constants import CONFIG_FILE_PATH
from arcdemo.utils.common import read_config
from dataclasses import dataclass

# data_preparation:
#   input_data_file: ../data/call_logs.csv
#   history_in_hours: 3
#   output_data_file: ../data/call_logs_prepared.csv


@dataclass
class DataPreparationConfig:
    """Settings for the data-preparation step.

    Built by ConfigurationManager.get_data_preparation_config from the
    `data_preparation` section of the config file. Field order is the positional
    constructor order.
    """

    input_data_file: str  # raw call-log CSV; DataPreparation.truncate_call_logs also overwrites it
    history_in_hours: int  # size of the window of recent calls to keep, in hours
    output_data_file: str  # where DataPreparation writes the aggregated CSV
    # NOTE(review): example paths ("../data/...") look relative to src/ — confirm the expected cwd.


class ConfigurationManager:
    """Load the project config file and build typed config objects from its sections."""

    def __init__(self, config_filepath=CONFIG_FILE_PATH):
        # Parsed config; sections and keys are read as attributes (e.g. `.data_preparation`).
        self.config = read_config(config_filepath)

    def get_data_preparation_config(self) -> DataPreparationConfig:
        """Return the `data_preparation` section as a DataPreparationConfig."""
        section = self.config.data_preparation
        return DataPreparationConfig(
            input_data_file=section.input_data_file,
            history_in_hours=section.history_in_hours,
            output_data_file=section.output_data_file,
        )

In [30]:
# Sanity-check that the data_preparation section of the config loads as expected.
config = ConfigurationManager()
data_preparation_config = config.get_data_preparation_config()
print(data_preparation_config)

[2025-02-16 16:52:06,332: INFO: common: Config file loaded: /Users/kzc0l4/arcdemo/confs/config.yaml]
DataPreparationConfig(input_data_file='../data/call_logs.csv', history_in_hours=3, output_data_file='../data/call_logs_prepared.csv')


In [38]:
# Make sure the kernel is in src/ before running the pipeline. Normally it already is,
# and an unconditional os.chdir("src") would then fail looking for src/src.
if os.path.basename(os.getcwd()) != "src":
    os.chdir("src")
os.getcwd()

'/Users/kzc0l4/arcdemo/src'

In [57]:
# Description: Truncate call logs to the recent history window, geohash each call, and count calls per geohash and TIME_GAP interval.
import pandas as pd
from arcdemo.constants import TIME_GAP
# from arcdemo.entity.config_entity import DataPreparationConfig


class DataPreparation:
    """
    Prepare call logs: keep only the recent history window, generate a geohash
    per call, and count calls per geohash and TIME_GAP interval.

    Intermediate results are kept on the instance (``df``, ``geohashed_df``,
    ``aggregated_df``). The truncated frame is written back to
    ``config.input_data_file``; the aggregated frame goes to ``config.output_data_file``.
    """

    # The annotation is a string so the class can be defined even when
    # DataPreparationConfig is not in scope (it lives in another cell/module).
    def __init__(self, config: "DataPreparationConfig"):
        self.config = config
        self.df = pd.read_csv(self.config.input_data_file)
        self.geohashed_df = None  # set by geohash_call_logs()
        self.aggregated_df = None  # set by aggregate_call_logs()

    def truncate_call_logs(self) -> pd.DataFrame:
        """
        Keep only calls from the last ``config.history_in_hours`` hours.

        The truncated frame replaces ``self.df``. It is written back to
        ``config.input_data_file`` (overwriting that file) and returned.

        NOTE(review): the cutoff uses the naive local ``pd.Timestamp.now()``;
        this assumes logged timestamps are naive local times too — confirm.
        """
        now = pd.Timestamp.now()
        # assign() instead of item assignment: self.df may already be a filtered
        # slice (e.g. on a second call), and writing into it would warn.
        self.df = self.df.assign(timestamp=pd.to_datetime(self.df["timestamp"]))
        oldest_valid_value = now - pd.to_timedelta(self.config.history_in_hours, unit="hours")
        self.df = self.df[self.df["timestamp"] >= oldest_valid_value]
        self.df.to_csv(self.config.input_data_file, index=False)
        return self.df

    def geohash_call_logs(self, precision: int = 5) -> pd.DataFrame:
        """
        Add a ``geohash`` column computed from the ``lat``/``lon`` columns.

        The result is stored in ``self.geohashed_df`` and returned; ``self.df``
        is left unchanged.

        Args:
            precision: geohash length in characters (default 5, as before).
        """
        # Imported here so the class does not depend on `geohash` having been
        # imported in some other notebook cell.
        import geohash

        self.geohashed_df = self.df.copy()
        # A list comprehension (rather than a row-wise `apply`) also handles an
        # empty frame, where apply(axis=1) can return a DataFrame instead of a Series.
        self.geohashed_df["geohash"] = [
            geohash.encode(lat, lon, precision=precision)
            for lat, lon in zip(self.geohashed_df["lat"], self.geohashed_df["lon"])
        ]
        return self.geohashed_df

    def aggregate_call_logs(self) -> pd.DataFrame:
        """
        Count calls per geohash in consecutive TIME_GAP-second intervals.

        Returns a frame with columns ``geohash``, ``timestamp`` (interval start)
        and ``count``. The frame is also stored in ``self.aggregated_df`` and
        written to ``config.output_data_file``.

        Raises:
            ValueError: if geohash_call_logs() has not been run yet.
        """
        if self.geohashed_df is None:
            raise ValueError("Error: Call geohash_call_logs() before aggregate_call_logs()")

        # Ensure timestamp is in datetime format. resample() needs a DatetimeIndex,
        # and the column is still text when truncate_call_logs() was skipped.
        df = self.geohashed_df.assign(timestamp=pd.to_datetime(self.geohashed_df["timestamp"]))

        if df.empty:
            # Nothing in the history window. Still write a header-only file so the
            # output never keeps stale data from an earlier run.
            self.aggregated_df = pd.DataFrame(columns=["geohash", "timestamp", "count"])
        else:
            df = df.sort_values(by=["geohash", "timestamp"]).set_index("timestamp")
            # Select the geohash column explicitly so the resample does not operate on
            # the grouping column (deprecated in pandas >= 2.2). count() yields 0 for
            # empty intervals between a geohash's first and last call.
            self.aggregated_df = (
                df.groupby("geohash")["geohash"]
                .resample(f"{TIME_GAP}s")
                .count()
                .rename("count")
                .reset_index()
            )

        # Save the aggregated data
        self.aggregated_df.to_csv(self.config.output_data_file, index=False)
        return self.aggregated_df

    def process_data(self) -> pd.DataFrame:
        """
        Run the full pipeline: truncate → geohash → aggregate.

        Returns the aggregated frame (also available as ``self.aggregated_df``).
        """
        self.truncate_call_logs()
        self.geohash_call_logs()
        return self.aggregate_call_logs()

In [59]:
# Run the whole preparation pipeline with the settings from the config file.
config = ConfigurationManager()
data_preparation_config = config.get_data_preparation_config()
data_preparation = DataPreparation(config=data_preparation_config)

# process_data() truncates (overwriting the input CSV), geohashes, and aggregates
# (writing the output CSV). The aggregated result is kept on the instance.
data_preparation.process_data()
data_preparation.aggregated_df.head()

[2025-02-16 17:04:42,477: INFO: common: Config file loaded: /Users/kzc0l4/arcdemo/confs/config.yaml]
