In [1]:
import firebase_admin as fa
from firebase_admin import firestore
import pandas as pd
import os
from datetime import datetime, timedelta
from typing import TypedDict
from datetime import datetime
from google.cloud.firestore_v1 import FieldFilter

In [6]:
# Authenticate with the service-account key file (path is relative to the kernel's
# working directory) and open a Firestore client on the default Firebase app.
# NOTE(review): keep service-account.json out of version control.
secret = fa.credentials.Certificate("service-account.json")
# NOTE(review): re-running this cell in the same kernel presumably fails because the
# default app is already initialized — confirm and guard if the notebook is re-run.
app = fa.initialize_app(secret)
db = firestore.client()

In [None]:
# Firestore collections: one document per car (brand/model/city/condition) and one
# sales document per (car_id, date) pair.
carsRef = db.collection("cars_db")
salesRef = db.collection("sales_db")

In [8]:
class Car(TypedDict):
    """Fields of a document in the ``cars_db`` collection.

    A car is identified by the combination of all four fields.
    """

    brand_name: str
    model_name: str
    city_name: str
    car_condition: str


class Sales(TypedDict):
    """Fields of a document in the ``sales_db`` collection (one per car and date)."""

    car_id: str  # Firestore document id of the matching ``cars_db`` entry
    # Week-start date as a "%Y-%m-%d" string: ``run`` formats the date with
    # strftime before it reaches Firestore, so this is not a ``datetime``.
    date: str
    sales_count: int
    total_sum: float  # sum of sale prices for the week
    average_price: float  # total_sum / sales_count rounded to 2 decimals; 0 when no sales

In [9]:
# Cities and brands whose rows are imported into Firestore (used by filterDataFrame);
# the commented-out cities are currently excluded.
filtered_cities = [
    "Atlanta",
    "Washington",
    "New York",
    "Chicago",
    "Las Vegas",
    "Seattle",
    "Boston",
    "San Francisco",
    "Los Angeles",
    # "Charlotte",
    # "San Diego",
    # "Tampa",
    # "Houston",
    # "Sacramento",
    # "Phoenix",
    # "Dallas",
    # "Miami",
]

filtered_brands = [
    "BMW",
    "Ferrari",
    "Ford",
    "Lamborghini",
    "Rolls-Royce",
    "Porsche",
    "Toyota",
    "Tesla",
]

# Memo of car key (brand_name, city_name, car_condition, model_name) -> Firestore
# document id, filled by get_or_create_car_id so each car is queried at most once.
cached_car_ids = dict()
# NOTE(review): `count` is not referenced by any visible code.
count = 0


def get_date_from_week(year, week):
    """Return the Monday (as a midnight ``datetime``) that starts week ``week`` of ``year``.

    Week 1 is the Monday-Sunday week containing the year's first Thursday, so its
    Monday can fall in late December of the previous year (2020 -> 2019-12-30).
    The week number is not range-checked: 0 or a value past the year's last week
    just moves backwards/forwards by whole weeks.
    """
    jan_first = datetime(year, 1, 1)
    weekday = jan_first.weekday()  # Monday == 0 ... Sunday == 6

    # Jan 1 on Mon-Thu: week 1 began on the Monday on/before it.
    # Jan 1 on Fri-Sun: week 1 starts on the following Monday.
    shift = -weekday if weekday <= 3 else 7 - weekday
    week_one_monday = jan_first + timedelta(days=shift)

    return week_one_monday + timedelta(days=(week - 1) * 7)


def filterDataFrame(
    data_frame: pd.DataFrame,
    max_rows_per_model: int = 20,
    allowed_cities=None,
    allowed_brands=None,
) -> pd.DataFrame:
    """Keep only rows for the configured cities/brands, capped per model.

    Args:
        data_frame: weekly sales frame with "brand", "city" and "model" columns.
        max_rows_per_model: maximum number of rows kept for each distinct "model"
            value (the first ones in the frame's existing row order).
        allowed_cities: accepted "city" values; defaults to ``filtered_cities``.
        allowed_brands: accepted "brand" values; defaults to ``filtered_brands``.

    Returns:
        A new DataFrame with a fresh 0..n-1 index; ``data_frame`` is not modified.
    """
    if allowed_cities is None:
        allowed_cities = filtered_cities
    if allowed_brands is None:
        allowed_brands = filtered_brands

    in_scope = data_frame["brand"].isin(allowed_brands) & data_frame["city"].isin(
        allowed_cities
    )
    return (
        data_frame[in_scope]
        .groupby("model")
        .head(max_rows_per_model)  # first N rows of each model, original order kept
        .reset_index(drop=True)
    )


def get_or_create_car_id(car_data: Car):
    """Return the ``cars_db`` document id for ``car_data``, creating the document if absent.

    A car is identified by (brand_name, city_name, car_condition, model_name). Ids are
    memoised in the module-level ``cached_car_ids`` so each distinct car is looked up
    in Firestore at most once per kernel session.

    Raises:
        Exception: ``Exception("DuplicateRecordsFound", "cars")`` when more than one
            document matches; ``parseDataFrame`` checks for these exact args.
    """
    car_key = (
        car_data["brand_name"],
        car_data["city_name"],
        car_data["car_condition"],
        car_data["model_name"],
    )
    if car_key in cached_car_ids:
        return cached_car_ids[car_key]

    # Query construction is deliberately not wrapped in try/except: a failure here
    # should surface immediately rather than leave `query` unbound.
    query = (
        carsRef.where(filter=FieldFilter("city_name", "==", car_data["city_name"]))
        .where(filter=FieldFilter("brand_name", "==", car_data["brand_name"]))
        .where(filter=FieldFilter("car_condition", "==", car_data["car_condition"]))
        .where(filter=FieldFilter("model_name", "==", car_data["model_name"]))
        # Two hits are enough to detect a duplicate; no need to stream more.
        .limit(2)
    )
    matching_ids = [doc.id for doc in query.stream()]

    if len(matching_ids) > 1:
        raise Exception("DuplicateRecordsFound", "cars")
    if matching_ids:
        car_id = matching_ids[0]
    else:
        # CollectionReference.add returns (write_time, document_reference).
        _, car_ref = carsRef.add(car_data)
        car_id = car_ref.id

    cached_car_ids[car_key] = car_id
    return car_id


def get_or_create_or_update_sale_data(sale_data: Sales):
    """Upsert the ``sales_db`` document for (sale_data["car_id"], sale_data["date"]).

    - no match: a new document is added;
    - one match: ``sale_data`` is merged into that document (``merge=True``);
    - several matches: ``Exception("DuplicateRecordsFound", "sales")`` is raised.

    Returns:
        The id of the created or updated document.
    """
    # FieldFilter keyword form, consistent with get_or_create_car_id.
    query = (
        salesRef.where(filter=FieldFilter("car_id", "==", sale_data["car_id"]))
        .where(filter=FieldFilter("date", "==", sale_data["date"]))
        .limit(2)  # two hits are enough to detect a duplicate
    )
    matching_ids = [doc.id for doc in query.stream()]

    if len(matching_ids) > 1:
        raise Exception("DuplicateRecordsFound", "sales")
    if matching_ids:
        # Update the document that was actually found.
        salesRef.document(matching_ids[0]).set(sale_data, merge=True)
        return matching_ids[0]

    _, sale_ref = salesRef.add(sale_data)
    return sale_ref.id


def parseDataFrame(x: pd.Series, date: str) -> str:
    """Write one filtered parquet row to Firestore as a car document plus a sales document.

    Args:
        x: one row of the weekly frame; reads ``brand``, ``model``, ``itemCondition``,
            ``city``, ``sales_sum`` and ``sales_count``.
        date: week-start date string produced by ``run`` ("%Y-%m-%d").

    Returns:
        "success" when both documents were written or updated; "error" when a
        duplicate car or sales record was found (the row is reported and skipped).

    Raises:
        Any exception other than the "DuplicateRecordsFound" ones is re-raised so
        that real failures surface instead of being reported as "success".
    """
    car_data = Car(
        brand_name=x.brand,
        model_name=x.model,
        car_condition=x.itemCondition,
        city_name=x.city,
    )

    total_sales_price: float = x.sales_sum
    sales_count: int = x.sales_count

    if sales_count == 0:
        # No sales that week: store zeros and avoid dividing by zero.
        sale_data = Sales(average_price=0, sales_count=0, total_sum=0)
    else:
        sale_data = Sales(
            total_sum=total_sales_price,
            sales_count=sales_count,
            average_price=round(total_sales_price / sales_count, 2),
        )
    sale_data["date"] = date

    try:
        sale_data["car_id"] = get_or_create_car_id(car_data)
        get_or_create_or_update_sale_data(sale_data)
    except Exception as e:
        # The helpers signal duplicates as Exception("DuplicateRecordsFound", <collection>).
        if e.args and e.args[0] == "DuplicateRecordsFound":
            print(
                "Failed to add record of car: ",
                car_data,
                " sale: ",
                sale_data,
                e.args[1],
            )
            return "error"
        raise
    return "success"


def run(folder_path: str, year: int = 2024) -> None:
    """Import every weekly ``*.parquet`` file in ``folder_path`` into Firestore.

    The week number is taken from the last "_"-separated part of the file name,
    skipping its first character (e.g. "x_w43.parquet" -> 43); files whose week
    part is not all digits are ignored. Each file is filtered with
    ``filterDataFrame`` and every remaining row is written with ``parseDataFrame``,
    using the week's start date formatted as "%Y-%m-%d".

    Args:
        folder_path: directory containing the parquet files.
        year: calendar year the week numbers belong to.
    """
    # Sorted so files are processed in a deterministic order (os.listdir's is arbitrary).
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith(".parquet"):
            continue
        week_str = file_name.split("_")[-1][1:].replace(".parquet", "")
        if not week_str.isdigit():
            continue

        date = get_date_from_week(year, int(week_str)).date().strftime("%Y-%m-%d")

        parquet_data = pd.read_parquet(os.path.join(folder_path, file_name))
        filtered_data = filterDataFrame(parquet_data)

        # Rows are written for their side effects, so iterate explicitly.
        for _, row in filtered_data.iterrows():
            parseDataFrame(row, date)

In [None]:
# Bulk import: issues Firestore reads/writes for every filtered row of every weekly file in data/.
run('data/')

In [None]:
# Ad-hoc check: fetch a handful of car documents by id and group them by brand.

# Sample car record for manual testing (e.g. with carsRef.add(car_data)).
car_data = {
    "brand_name": "Toyota",
    "model_name": "Corolla",
    "car_condition": "Used",
    "city_name": "Atlanta",
}

# NOTE: despite the name, these are document ids in carsRef ("cars_db"), not sales ids.
saleDb = [
    "6vL702kyrp2KPYh7TvIq",
    "rzAzTgc6NqBlZbnkRBK8",
    "BNRAjOFxWzm4ubk5rzc5",
    "tcMe57KuA7mcnJYe40fD",
    "x2RA215QUJWctcbtK7zz",
    "74qeNWT9QowVcRGbUzqR",
    "YsVPRsswenS2zB9qC6g2",
    "6nU9rneJxOgNmrhFWpNo",
    "nE0zNMxux66ldsF73pvy",
    "12oYEuawCeiiS1WqlKYR",
]

# brand_name -> list of car documents (as dicts) for that brand
brands = {}
for sale in saleDb:
    snapshot = carsRef.document(sale).get()
    # to_dict() returns None for a missing document; report and skip it instead of
    # failing with a TypeError on the "brand_name" lookup.
    if not snapshot.exists:
        print("car document not found: ", sale)
        continue
    car = snapshot.to_dict()
    brands.setdefault(car["brand_name"], []).append(car)

In [None]:
# List the fetched models grouped under their brand.
for brand_name, cars_of_brand in brands.items():
    print("brand: ", brand_name)
    for car in cars_of_brand:
        print("     model_name: ", car["model_name"])

### Testing

1. Only brands: select these 5 brands:
   - "BMW"
   - "Ford"
   - "Rolls-Royce"
   - "Porsche"
   - "Toyota"
2. Only models: *(TODO: not yet specified)*


### Metrics

In [4]:


# Running totals over every parquet file in data/ (unfiltered: all brands/cities).
# NOTE: this rebinds `brands`, replacing the brand -> cars dict built earlier.
brands = set()
models = set()
cities = set()
total_sales = 0
total_sum = 0


def collectMetrics(x: pd.Series):
    """Fold one row into the module-level metric accumulators.

    Adds the row's "brand", "model" and "city" to the ``brands``/``models``/``cities``
    sets and its "sales_count"/"sales_sum" to ``total_sales``/``total_sum``.
    Returns "" (presumably so it can be used with ``DataFrame.apply(..., axis=1)``).

    NOTE(review): not called anywhere in the visible notebook; the file loop below
    aggregates per file with vectorised pandas calls instead.
    """
    global total_sum
    global total_sales

    # No per-row print: dumping every row floods the notebook output.
    brands.add(x["brand"])
    models.add(x["model"])
    cities.add(x["city"])

    total_sales += x["sales_count"]
    total_sum += x["sales_sum"]
    return ""


# Scan every weekly parquet file in data/ and accumulate notebook-wide metrics
# (all brands, models and cities, not just the filtered ones).
for file_name in os.listdir("data/"):
    if not file_name.endswith(".parquet"):
        continue
    # Same file-name rule as run(): the last "_" part minus its first character
    # must be all digits (the week number).
    week_str = file_name.split("_")[-1][1:].replace(".parquet", "")
    if not week_str.isdigit():
        continue

    parquet_data = pd.read_parquet(os.path.join("data/", file_name))

    # dropna(): a missing value is not a brand/model/city and must not inflate
    # the distinct counts.
    cities.update(parquet_data["city"].dropna().unique().tolist())
    brands.update(parquet_data["brand"].dropna().unique().tolist())
    models.update(parquet_data["model"].dropna().unique().tolist())

    total_sales += parquet_data["sales_count"].sum()
    total_sum += parquet_data["sales_sum"].sum()

In [5]:
# Distinct brands, models, cities, then total units sold and total sales amount.
print(len(brands), len(models), len(cities), total_sales, total_sum)

103 1883 3393 64124613 2540004029542.0


In [11]:
# Start date of week 43 of 2024; matches the metrics document id used below.
get_date_from_week(2024, 43).date().strftime('%Y-%m-%d')


'2024-10-21'

In [16]:
# Collection of aggregate-metrics documents, keyed by a "%Y-%m-%d" date string.
metricsRef = db.collection("metrics_db")


def format_number(num):
    """Abbreviate ``num`` with a B/M/K suffix and two decimals, e.g. 64124613 -> "64.12M".

    The suffix is chosen from the magnitude, so negative values are abbreviated as
    well (-2_500_000 -> "-2.50M"). Values below one thousand in magnitude are
    returned as ``str(num)``.
    """
    magnitude = abs(num)
    for threshold, suffix in ((1e9, "B"), (1e6, "M"), (1e3, "K")):
        if magnitude >= threshold:
            return f"{num / threshold:.2f}{suffix}"
    return str(num)


# Persist the metrics snapshot. The document id is the start date of week 43 of 2024
# (the get_date_from_week(2024, 43) cell above); update it for a new snapshot.
# set() without merge replaces any existing document with this id.
# Counts are stored as abbreviated display strings (e.g. "64.12M"), not raw numbers.
metricsRef.document("2024-10-21").set(
    {
        "brands": len(brands),
        "models": len(models),
        "cities": len(cities),
        "sales_count": format_number(total_sales),
        "sales_sum": format_number(total_sum),
    }
)

update_time {
  seconds: 1731991624
  nanos: 199048000
}

In [18]:
# Spot-check format_number on the total sales count printed above.
format_number(64124613)

'64.12M'