In [1]:
import pandas as pd
import os
import re

In [2]:
# Location of the raw scrape to clean.
# NOTE(review): this is an absolute, machine-specific path; consider deriving it
# from a configurable data directory so the notebook runs elsewhere.
file_path = "c:/Users/User/Desktop/Project/data project/car_scrape_pipeline/dags/etl/data/raw/resale_car.csv"

# Target column names for the cleaned data set.
# NOTE(review): not referenced by any code visible in this section, and it lists
# "price" while the export step selects "old_price" - confirm which is intended.
columns_keyword = [
    "year_produce",
    "brand",
    "model",
    "car_type",
    "gear_type",
    "engine_cap",
    "mileage",
    "price",
]

In [3]:
# Load the raw listing file into `df`.
#
# Errors are deliberately allowed to propagate: if loading fails, `df` would be
# undefined (or, worse, left over from an earlier kernel run) and every later
# cell would operate on the wrong data. Failing here stops "Run All" at the
# real cause instead of a confusing NameError further down.
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File {file_path} does not exist.")

# Compare the extension case-insensitively so "X.CSV" / "X.Json" are accepted.
lowered_path = file_path.lower()
if lowered_path.endswith('.csv'):
    # The file is read as ISO-8859-1 (Latin-1), as in the original notebook.
    data = pd.read_csv(file_path, encoding='ISO-8859-1')
elif lowered_path.endswith('.json'):
    data = pd.read_json(file_path)
else:
    raise ValueError("Unsupported file format. Please provide a CSV or JSON file.")

df = data

In [4]:
# Work on an independent (deep) copy so the raw frame `df` is left untouched
# by the in-place cleaning steps that follow.
df_cleaned = df.copy(deep=True)
def get_year(datas):
    """Move the leading token of ``Description`` into a ``year_produce`` column.

    For every text description the first space-delimited token is stored in
    ``year_produce`` and removed (together with the following space) from
    ``Description``. A description consisting of a single token is copied to
    ``year_produce`` and left unchanged. The token is taken as-is; it is not
    validated as a year.

    Rows whose description is missing or not a string are left untouched and
    get a missing ``year_produce`` instead of raising.

    Args:
        datas: DataFrame with a ``Description`` column. Modified in place.

    Returns:
        The same DataFrame object, for call chaining.
    """
    years = []
    remainders = []

    for description in datas["Description"]:
        if not isinstance(description, str):
            # NaN / None / non-text cell: nothing to split.
            years.append(None)
            remainders.append(description)
            continue

        first_token, separator, rest = description.partition(' ')
        years.append(first_token)
        # Without a space there is nothing to strip, so keep the text as is.
        remainders.append(rest if separator else description)

    # Positional whole-column assignment: works for empty frames and for
    # frames whose index contains duplicate labels.
    datas["Description"] = remainders
    datas["year_produce"] = years

    return datas

# Extract the production year once if the frame has a description-like column.
# get_year strips a token from "Description" on every call, so it must not be
# invoked once per matching column name.
if any('description' in str(col).lower() for col in df_cleaned.columns):
    df_cleaned = get_year(df_cleaned)



In [5]:
def get_brand(datas):
    """Split ``Model`` into a ``brand`` column and the remaining model text.

    The first space-delimited token of ``Model`` becomes ``brand``; the rest
    (possibly an empty string) is written back to ``Model``. Rows with a
    missing model keep their ``Model`` value and receive a missing ``brand``.

    As a side effect every literal ``'N/A'`` cell in the frame is replaced by
    a missing value.

    Args:
        datas: DataFrame with a ``Model`` column. Modified in place.

    Returns:
        The same DataFrame object, for call chaining.
    """
    # Dict form on purpose: a bare ``replace('N/A', None)`` is interpreted as
    # "pad from the previous row" by older pandas releases.
    datas.replace({'N/A': None}, inplace=True)

    brands = []
    models = []

    for model in datas["Model"]:
        # pd.isna covers None as well as the NaN that read_csv produces for
        # empty cells; an ``is None`` test would let NaN become brand 'nan'.
        if pd.isna(model):
            brands.append(None)
            models.append(model)
            continue

        brand, _, remainder = str(model).partition(' ')
        brands.append(brand)
        models.append(remainder)

    # Whole-column assignment guarantees the 'brand' column exists even when
    # every model is missing.
    datas["brand"] = brands
    datas["Model"] = models

    return datas

# Split brand from model once if the frame has a model-like column.
# get_brand removes the leading token of "Model" on every call, so calling it
# once per matching column name would strip more than the brand.
if any('model' in str(col).lower() for col in df_cleaned.columns):
    df_cleaned = get_brand(df_cleaned)

In [6]:
# Tag every row with the constant vehicle type "used".
df_cleaned = df_cleaned.assign(car_type="used")

In [7]:
def get_cap(datas):
    """Extract an engine-capacity figure from ``Description`` into ``engine_cap``.

    The first standalone number with exactly one decimal digit (for example
    ``1.5``) found in the description is stored as a string. When no such
    number exists, or the description is missing / not text, the placeholder
    string ``"0.0"`` is stored.

    Args:
        datas: DataFrame with a ``Description`` column. Modified in place.

    Returns:
        The same DataFrame object, for call chaining.
    """
    capacities = []

    for description in datas["Description"]:
        # re.search raises TypeError on NaN/None, so only search real text.
        found = None
        if isinstance(description, str):
            found = re.search(r'\b\d+\.\d\b', description)
        capacities.append(found.group(0) if found else "0.0")

    datas["engine_cap"] = capacities

    return datas

# Derive engine capacity when any column name mentions "description".
has_description_column = any('description' in col.lower() for col in df_cleaned.columns)
if has_description_column:
    df_cleaned = get_cap(df_cleaned)

In [8]:
def _mileage_from_cell(cell):
    """Convert one raw ``Milleage`` cell to a float, or None when unparseable."""
    if not isinstance(cell, str):
        # Numeric cells (column parsed as numbers) pass through; NaN/None and
        # anything else that is not a number count as missing.
        try:
            value = float(cell)
        except (TypeError, ValueError):
            return None
        return None if pd.isna(value) else value

    numbers = [float(token.replace(",", ""))
               for token in re.findall(r'\d+(?:,\d+)*(?:\.\d+)?', cell)]
    if not numbers:
        return None

    # A range such as "50 - 60K KM" is represented by its midpoint.
    mileage = sum(numbers) / len(numbers)

    # A digit directly followed by an upper-case K marks thousands.
    if re.search(r'\d+K', cell):
        mileage *= 1000

    return mileage


def get_mileage(datas):
    """Parse the free-text ``Milleage`` column into a numeric ``mileage`` column.

    All numbers in the text are averaged (so a range yields its midpoint) and
    the result is multiplied by 1000 when a digit is immediately followed by
    ``K``. Cells that are missing or contain no number yield a missing value
    instead of raising.

    Args:
        datas: DataFrame with a ``Milleage`` column (source spelling).
            Modified in place.

    Returns:
        The same DataFrame object, for call chaining.
    """
    mileages = [_mileage_from_cell(cell) for cell in datas["Milleage"]]

    # Explicit float dtype keeps the column numeric even when every value is
    # missing; reusing the frame's own index avoids any realignment.
    datas["mileage"] = pd.Series(mileages, index=datas.index, dtype="float64")

    return datas

# Parse mileage when any column name mentions "milleage" (source spelling).
has_mileage_column = any('milleage' in col.lower() for col in df_cleaned.columns)
if has_mileage_column:
    df_cleaned = get_mileage(df_cleaned)



In [9]:
def _first_number(cell):
    """Return the first number found in a cell as a float, or None."""
    if not isinstance(cell, str):
        # Numeric cells pass through; NaN/None and other objects are missing.
        try:
            value = float(cell)
        except (TypeError, ValueError):
            return None
        return None if pd.isna(value) else value

    found = re.findall(r'\d+(?:,\d+)*(?:\.\d+)?', cell)
    return float(found[0].replace(',', '')) if found else None


def get_price(datas, installment_multiplier=77):
    """Derive a numeric ``old_price`` column from the listing price fields.

    When ``List_Price`` is present, its first number is used (missing if the
    text contains no number). When ``List_Price`` is missing, the first number
    in ``Monthly_Installment`` is multiplied by ``installment_multiplier``.
    Rows where the required field is missing or contains no number get a
    missing price instead of raising.

    Args:
        datas: DataFrame with ``List_Price`` and ``Monthly_Installment``
            columns. Modified in place.
        installment_multiplier: Factor applied to the monthly installment to
            estimate a price. The default 77 is the constant the notebook
            used originally; presumably an assumed number of monthly
            payments - TODO confirm.

    Returns:
        The same DataFrame object, for call chaining.
    """
    prices = []

    for list_price, installment in zip(datas["List_Price"], datas["Monthly_Installment"]):
        if pd.isna(list_price):
            monthly = _first_number(installment)
            prices.append(None if monthly is None else monthly * installment_multiplier)
        else:
            prices.append(_first_number(list_price))

    # Explicit float dtype keeps the column numeric even when all are missing.
    datas["old_price"] = pd.Series(prices, index=datas.index, dtype="float64")

    return datas

# Derive the numeric price column; unlike the other steps this one runs
# unconditionally, without checking which columns are present.
df_cleaned = get_price(datas=df_cleaned)

In [None]:
# Lower-case every header so columns such as "Model" can be selected by the
# lower-case names used below.
df_cleaned.columns = df_cleaned.columns.str.lower()

# Columns written to the cleaned file, in output order.
# NOTE(review): "gear_type" is not derived by any cell visible here; presumably
# it comes straight from the raw file - confirm.
output_columns = ["year_produce", "brand", "model", "car_type", "gear_type", "engine_cap", "mileage", "old_price"]

# Fail with an explicit message when an earlier step was skipped or the raw
# file lacks a column, rather than with pandas' generic KeyError.
missing_columns = [col for col in output_columns if col not in df_cleaned.columns]
if missing_columns:
    raise KeyError(f"Cleaned data is missing expected columns: {missing_columns}")

# .copy() makes `df` an independent frame rather than a slice of df_cleaned.
df = df_cleaned[output_columns].copy()

base_name = os.path.basename(file_path)
name, ext = os.path.splitext(base_name)
# The content is always CSV, so the output always gets a .csv extension
# (a JSON input previously produced CSV data inside a ".json" file).
new_file_name = f"old_{name}_clean.csv"
# NOTE(review): to_csv writes the index as a leading unnamed column by default;
# pass index=False if downstream readers do not need it.
df.to_csv(new_file_name)