In [1]:
import sqlite3
import pandas as pd
from datetime import datetime
import regex as re
from pathlib import Path
import requests
import time
import json
import os
from rapidfuzz import fuzz, process




In [2]:
current_dir = Path.cwd()  
project_root = current_dir.parent.parent 

SCRAPED_DB_PATH=project_root/'scraper'/'data'/'mobiles.db'

# today's date in YYYY-MM-DD format
today = datetime.now().strftime("%Y-%m-%d")


conn = sqlite3.connect(SCRAPED_DB_PATH)

#fetching only today's data
df = pd.read_sql_query(
    "SELECT * FROM mobiles WHERE Date = ?;",
    conn,
    params=(today,)
)

conn.close()

In [3]:
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags
    "\U00002500-\U00002BEF"  # misc symbols
    "\U00002700-\U000027BF"
    "\U0001F900-\U0001F9FF"
    "\U00002600-\U000026FF"
    "\U00002B00-\U00002BFF"
    "]+",
    flags=re.UNICODE
)

df['Description'] = df['Description'].astype(str).apply(lambda x: emoji_pattern.sub('', x))

In [4]:
df.columns

Index(['URL', 'Title', 'Price', 'Published_time', 'Published_Date',
       'Seller_name', 'Location', 'Division', 'Condition', 'Model', 'Brand',
       'Features', 'Description', 'Img_urls', 'Date'],
      dtype='object')

In [5]:
unwanted_keywords = [
    'sim', 'charger', 'চার্জার', 'সিম', 'watch', 'screen', 'bank',
    'case', 'cover', 'adapter', 'cable', 'headphone', 'earphone',
    'powerbank', 'keyboard', 'mouse',"vr","cooler"
]

pattern = '|'.join(unwanted_keywords)

df = df[
    ~df['Title'].str.contains(pattern, case=False, na=False)
]
df = df[df['Price'] >= 1000]
df = df[~(df['Model'] == 'Other Model')]



In [6]:

def to_english_digits(s):
        """Convert Bengali digits to English"""
        if s:
            return s.translate(str.maketrans("০১২৩৪৫৬৭৮৯", "0123456789"))
        return None
    

def extract_ram(text):
    if not isinstance(text, str):
        return None
    
    plus_slash_match = re.search(
        r'\b([০-৯0-9]{1,2})\s*\+\s*([০-৯0-9]{1,2})\s*/\s*[০-৯0-9]{2,4}\b',
        text
    )
    if plus_slash_match:
        ram1 = float(to_english_digits(plus_slash_match.group(1)))
        ram2 = float(to_english_digits(plus_slash_match.group(2)))
        ram_total = ram1 + ram2
        if 1 <= ram_total <= 24:  # Plausible RAM range
            return ram_total
    
    
    # PRIORITY 1: X/Y or X+Y format (capture FIRST number as RAM)
    # Matches: 8/128, 12/256, 6+64
    slash_match = re.search(
        r'\b([০-৯0-9]{1,2})\s*[/+]\s*[০-৯0-9]{2,4}\b'
        r'(?!\s*[/+])',  # Avoid 8/28/123
        text
    )
    
    if slash_match:
        ram = float(to_english_digits(slash_match.group(1)))
        if 1 <= ram <= 24:  # Valid RAM range
            return ram
    
    # PRIORITY 2: "XGB RAM" format
    # Matches: 8GB RAM, ৮ জিবি র‍্যাম
    ram_keyword_match = re.search(
        r'([০-৯0-9]+)\s*(?:GB|জিবি|gb)[,.:/ \t\-]*(?:RAM|র‍্যাম|রام)',
        text,
        flags=re.IGNORECASE
    )
    
    if ram_keyword_match:
        ram = float(to_english_digits(ram_keyword_match.group(1)))
        if 1 <= ram <= 24:
            return ram
    
    # PRIORITY 3: "RAM XGB" format (reverse)
    # Matches: RAM 8GB, র‍্যাম ৮জিবি
    keyword_ram_match = re.search(
        r'(?:RAM|র‍্যাম|राम)[,.:/ \t\-]*([০-৯0-9]+)\s*(?:GB|জিবি|gb)',
        text,
        flags=re.IGNORECASE
    )
    
    if keyword_ram_match:
        ram = float(to_english_digits(keyword_ram_match.group(1)))
        if 1 <= ram <= 24:
            return ram
    
    # PRIORITY 4: Standalone small number with GB (likely RAM)
    # Matches: 8GB, ৮জিবি (only if ≤16GB to avoid storage confusion)
    standalone_match = re.search(
        r'\b([০-৯0-9]{1,2})\s*(?:GB|জিবি|gb)\b',
        text,
        flags=re.IGNORECASE
    )
    
    if standalone_match:
        ram = float(to_english_digits(standalone_match.group(1)))
        if 1 <= ram <= 16:  # Only small values (likely RAM, not storage)
            return ram
    
    return None


# Apply to DataFrame
df['RAM'] = df.apply(lambda row: extract_ram(row['Title']) or extract_ram(row['Features']) or extract_ram(row['Description']), axis=1)



In [7]:
def extract_rom(text):
    if not isinstance(text, str):
        return None

    def to_english_digits(s):
        """Convert Bengali digits to English"""
        if s:
            return s.translate(str.maketrans("০১২৩৪৫৬৭৮৯", "0123456789"))
        return None
    
    
    # PRIORITY 2: X/YGB format (handles "8/12gb")
    # Matches: 8/128GB, 6/64gb, ৮/১২৮জিবি
    slash_gb_match = re.search(
        r'\b[০-৯0-9]{1,2}\s*[/+-]\s*([০-৯0-9]{2,4})\s*(?:GB|gb|জিবি|স্টোরেজ )',
        text,
        flags=re.IGNORECASE
    )
    
    if slash_gb_match:
        num = float(to_english_digits(slash_gb_match.group(1)))
        if 16 <= num <= 1024:
            return num
    

    # PRIORITY 1: X/Y or X+Y format (most reliable)
    # Matches: 8/128, 12/256, 6+64, ৮/১২৮
    # Avoids: 8/28/123 (stops at first /)
    slash_match = re.search(
        r'\b([০-৯0-9]{1,2})\s*[/+-]\s*([০-৯0-9]{2,4})\b'  # Changed: capture BOTH numbers
        r'(?!\s*[/+])',  # Negative lookahead: don't match if another / or + follows
        text
    )
    
    if slash_match:
        first_num = float(to_english_digits(slash_match.group(1)))
        second_num = float(to_english_digits(slash_match.group(2)))
        
        # Second number should be storage as its larger value
        if second_num >= 16 and second_num <= 1024:  # Valid storage range
            return second_num
    
    
    
    # PRIORITY 3: "XGB ROM/Storage" format
    # Matches: 128GB রম, 256 জিবি Storage
    storage_keyword_match = re.search(
        r'([০-৯0-9]+)\s*(?:GB|জিবি|gb)[,.:/ \t\-]*(?:ROM|Storage|রম|রোম)',
        text,
        flags=re.IGNORECASE
    )
    
    if storage_keyword_match:
        num = float(to_english_digits(storage_keyword_match.group(1)))
        if 16 <= num <= 1024:
            return num
    
    # PRIORITY 4: "ROM/Storage XGB" format (reverse order)
    # Matches: রম 128GB, Storage 256জিবি
    keyword_storage_match = re.search(
        r'(?:ROM|Storage|রম|রোম)[,.:/ \t\-]*([০-৯0-9]+)\s*(?:GB|জিবি|gb)',
        text,
        flags=re.IGNORECASE
    )
    
    if keyword_storage_match:
        num = float(to_english_digits(keyword_storage_match.group(1)))
        if 16 <= num <= 1024:
            return num
    
    # PRIORITY 5: Standalone large number with GB (likely storage)
    # Matches: 128GB, 256জিবি (only if >16GB to avoid RAM confusion)
    standalone_match = re.search(
        r'\b([০-৯0-9]{2,4})\s*(?:GB|জিবি|gb)\b',
        text,
        flags=re.IGNORECASE
    )
    
    if standalone_match:
        num = float(to_english_digits(standalone_match.group(1)))
        if 16 <= num <= 1024:  # Only large values (definitely storage, not RAM)
            return num
    
    return None

df['Storage'] = df.apply(lambda row: extract_rom(row['Title']) or extract_rom(row['Features']) or extract_rom(row['Description']), axis=1)


storage_options = ['64', '128', '256', '512']

df['Storage'] = df.apply(
    lambda row: row['Storage'] if pd.notna(row['Storage']) 
                else next(
                    (float(s) for s in storage_options 
                     if re.search(rf'(?<!\d){s}(?!\d)(?:\s*(?:GB|gb|GIG|Gig|জিবি))?\b', str(row['Title']))), 
                    None
                ),
    axis=1
)


In [8]:
df = df[df['RAM'] != df['Storage']]
df = df.dropna(subset=['RAM', 'Storage'], how='all')


In [9]:
df["Location"] = df["Location"].str.replace(",", "", regex=False)
df["Division"] = df["Division"].str.replace(" বিভাগ", "", regex=False)


In [10]:


results_file =project_root/'scraper'/'data'/'osm_results.json'

if os.path.exists(results_file):
    with open(results_file, "r", encoding="utf-8") as f:
        results = json.load(f)
else:
    results = {}


def get_lat_lon_osm(place, country="Bangladesh"):
    url = "https://nominatim.openstreetmap.org/search"
    params = {
        "q": f"{place}, {country}",
        "format": "json",
        "limit": 1
    }
    response = requests.get(url, params=params, headers={"User-Agent": "my-app"})
    data = response.json()
    if data:
        lat, lon = float(data[0]["lat"]), float(data[0]["lon"])
        print(f" Place: {place} => Lat: {lat}, Lon: {lon}")
        return lat, lon
    else:
        print(f"⚠️ Place: {place} => No result found")

        match = df[df["Location"] == place]
        if not match.empty:
            division = match.iloc[0]["Division"]
            print(f" Trying division '{division}' instead...")
            
            # try to get coordinates using division name
            params["q"] = f"{division}, {country}"
            response = requests.get(url, params=params, headers={"User-Agent": "my-app"})
            data = response.json()
            if data:
                lat, lon = float(data[0]["lat"]), float(data[0]["lon"])
                print(f" Division: {division} => Lat: {lat}, Lon: {lon}")
                return lat, lon



places = df['Location'].unique()


# -----------------------------
for i, place in enumerate(places, 1):
    if place not in results:
        print(f"\nRequest {i}/{len(places)}: Searching for '{place}'...")
        results[place] = get_lat_lon_osm(place)
        time.sleep(1)  # respect 1 request/sec


with open(results_file, "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)



df[["lat", "lon"]] = df["Location"].apply(lambda x: pd.Series(results.get(x, (None, None))))


In [11]:
df['Condition'] = df['Condition'].str.replace("ব্যবহৃত", "Used").str.replace("নতুন", "New")
df = df.dropna(subset=['Brand','Model'])


In [12]:
def extract_network(text):
    if not isinstance(text, str):
        return 'Unknown'
    text = text.lower()
    
    has_5g = bool(re.search(r'\b5g\b|৫জি', text))
    has_4g = bool(re.search(r'\b4g\b|৪জি', text))
    
    if has_5g:
        return '5G'
    elif has_4g:
        return '4G'
    else:
        return 'Unknown'
    
df['Network'] = df.apply(
    lambda row: extract_network(str(row['Features']) + ' ' + str(row['Description'])),
    axis=1
)

def extract_camera_type(text):
    if not isinstance(text, str):
        return None
    text = text.lower()
    
    if re.search(r'কোয়াড|quad|quad back camera ', text):
        return 'Quad'
    elif re.search(r'ট্রিপল ক্যামেরা|triple camera|triple back camera', text):
        return 'Triple'
    elif re.search(r'ডুয়েল ক্যামেরা|dual camera|dual back camera', text):
        return 'Dual'
    else:
        return None

df['Camera_Type'] = df.apply(
    lambda row: extract_camera_type(str(row['Features']) + ' ' + str(row['Description'])),
    axis=1
)


In [13]:

def fuzzy_contains(text, keywords, threshold=90):
    if not isinstance(text, str):
        return False
    for kw in keywords:
        if fuzz.partial_ratio(text.lower(), kw.lower()) >= threshold:
            return True
    return False

keywords = ['warranty', 'guarantee', 'গ্যারান্টি', 'ওয়ারেন্টি']

df['has_warranty'] = df['Description'].apply(
    lambda x: fuzzy_contains(x, keywords, threshold=90)
)


In [14]:
def extract_battery(text):
    """
    Extract battery capacity (in mAh) from text.
    Examples:
      - '5000 mAh battery'
      - '৬০০০ এমএএইচ ব্যাটারি'
      - 'Li-Po 4500mah non-removable'
    """
    if not isinstance(text, str):
        return None

    text = text.lower()

    # Common regex for capturing battery capacities
    battery_patterns = [
        # 1️⃣ "5000mAh battery", "৫০০০ এমএএইচ ব্যাটারি"
        r'([০-৯0-9]{3,5})\s*(?:mAh|এমএএইচ|এমএএইচ্|mah)\s*(?:battery|ব্যাটারি)?',

        # 2️⃣ "battery: 5000mah", "ব্যাটারি ৬০০০ এমএএইচ"
        r'(?:battery|ব্যাটারি)\s*[:\-]?\s*([০-৯0-9]{3,5})\s*(?:mAh|এমএএইচ|mah)?',

        # 3️⃣ Just standalone "5000 mAh" (no keyword)
        r'\b([০-৯0-9]{3,5})\s*(?:mAh|এমএএইচ|mah)\b',
    ]

    for pattern in battery_patterns:
        match = re.search(pattern, text, flags=re.IGNORECASE)
        if match:
            mah = int(to_english_digits(match.group(1)))
            if 500 <= mah <= 10000:  # Valid phone battery range
                return mah

    return None

df['Battery'] = df.apply(
    lambda row: extract_battery(row['Description']),
    axis=1
)

In [15]:
def extract_camera_pixel(text, ram=None, storage=None):
    """
    Extracts the FIRST camera specification from text.
    Handles formats like: 48mp, 12+3+5mp, 19mp+1mp, etc.
    Avoids matching RAM/Storage values like 6+128.
    """
    if not isinstance(text, str):
        return None
    
    text = text.strip()
    
    # Convert Bengali digits to English
    text = to_english_digits(text)
    
    # Comprehensive pattern to match camera specs
    # Matches: 48mp, 12+3+5mp, 19mp+1mp, 12+3+5 mp, etc.
    # Added \s* to allow flexible spacing before MP
    pattern = r'([0-9]+(?:\s*\+\s*[0-9]+)*)\s*(?:mp|MP|Mp|মেগাপিক্সেল|মেগা\s*পিক্সেল)(?:\b|$)'

    
    matches = re.finditer(pattern, text, re.IGNORECASE)
    
    for match in matches:
        camera_spec = match.group(1)
        # Clean up whitespace
        camera_spec_clean = re.sub(r'\s+', '', camera_spec)
        
        # Extract all numeric parts
        parts = [int(p) for p in camera_spec_clean.split('+') if p.isdigit()]
        
        if not parts:
            continue
            
        #  Skip if ANY part matches RAM or Storage exactly
        # This avoids matching "6+128" as camera specs
        if ram and any(p == ram for p in parts):
            continue
        if storage and any(p == storage for p in parts):
            continue
            
        #  Additional validation: camera MP values are usually < 200
        if all(p < 200 for p in parts):
            return camera_spec_clean  #  RETURN IMMEDIATELY on first match
    
    return None


df["Camera_Pixel"] = df.apply(
    lambda row: extract_camera_pixel(row["Description"], row["RAM"], row["Storage"]),
    axis=1
)

In [16]:
df['Condition'] = df['Condition'].apply(
    lambda x: 'New' if 'new' in str(x).lower() else x
)

In [17]:
# Function to fill with mode, excluding 'Unknown'
def fill_with_valid_mode(group):
    # Filter out 'Unknown' values
    valid_values = group[group != 'Unknown']
    if not valid_values.empty:
        mode_val = valid_values.mode()
        if not mode_val.empty:
            return group.fillna(mode_val[0]).replace('Unknown', mode_val[0])
    return group

df['Network'] = df.groupby(['Brand','Model'])['Network'].transform(fill_with_valid_mode)
df['Battery'] = df.groupby(['Brand','Model'])['Battery'].transform(fill_with_valid_mode)
df['Camera_Type'] = df.groupby(['Brand','Model'])['Camera_Type'].transform(fill_with_valid_mode)
df['Camera_Pixel'] = df.groupby(['Brand','Model'])['Camera_Pixel'].transform(fill_with_valid_mode)

In [18]:
df = df.dropna(subset=["RAM", "Storage", "Camera_Type", "Battery", "Camera_Pixel"])


In [19]:
seller_counts = df['Seller_name'].value_counts()
df['is_store'] = df['Seller_name'].map(lambda x: seller_counts[x] > 5)


In [20]:

# Clean models properly → remove None, NaN, empty string, "other model"
valid_models = [
    m for m in df['Model'].unique()
    if isinstance(m, str) and m.strip().lower() not in ["other model", "othermodel", "other", ""]
]


def detect_model_from_title(title, model_list):
    if not isinstance(title, str):
        return None
    
    title_lower = title.lower()

    for model in model_list:
        pattern = re.escape(model.lower())
        if re.search(pattern, title_lower):
            return model

    return None


def fix_models(df):
    # Replace various 'other' values safely
    df['Model'] = df['Model'].replace(
        [None, "None", "", "Other Model", "Other model"], 
        "other"
    )

    for idx, row in df.iterrows():

        if row['Model'] == "other":
            detected = detect_model_from_title(row['Title'], valid_models)

            if detected:
                df.at[idx, 'Model'] = detected

    return df


# Run on entire df
fix_models(df)
df=df[~df['Model'].str.contains('other')]

In [21]:
df = df[~(df['Brand'] == "Other Brand")]


In [22]:
CLEANED_DB_PATH=project_root/'scraper'/'data'/'cleaned_mobiles.db'


def init_cleaned_db():
    conn = sqlite3.connect(CLEANED_DB_PATH)
    c = conn.cursor()

    c.execute("""
        CREATE TABLE IF NOT EXISTS cleaned_mobiles (
            URL TEXT PRIMARY KEY,
            Title TEXT,
            Price REAL,
            Published_time TEXT,
            Published_Date TEXT,
            Seller_name TEXT,
            Location TEXT,
            Division TEXT,
            Condition TEXT,
            Model TEXT,
            Brand TEXT,
            Features TEXT,
            Description TEXT,
            Img_urls TEXT,
            Date TEXT,
            RAM REAL,
            Storage REAL,
            lat REAL,
            lon REAL,
            Network TEXT,
            Camera_Type TEXT,
            has_warranty INTEGER,
            Battery REAL,
            Camera_Pixel TEXT,
            is_store INTEGER
        )
    """)

    conn.commit()
    conn.close()

def get_existing_urls():
    conn = sqlite3.connect(CLEANED_DB_PATH)
    c = conn.cursor()
    c.execute("SELECT URL FROM cleaned_mobiles")
    urls = {row[0] for row in c.fetchall()}
    conn.close()
    return urls

def filter_new_rows(df):
    existing = get_existing_urls()
    return df[~df["URL"].isin(existing)]


def save_df_to_db(df):
    # convert list to JSON
    df["Img_urls"] = df["Img_urls"].apply(
        lambda x: json.dumps(x) if isinstance(x, list) else x
    )

    for col in ["has_warranty", "is_store"]:
        df[col] = df[col].astype(int)

    conn = sqlite3.connect(CLEANED_DB_PATH)
    df.to_sql("cleaned_mobiles", conn, if_exists="append", index=False)
    conn.close()


In [23]:
init_cleaned_db()  # Create DB if not exists

# Filter out rows that already exist
fresh_df = filter_new_rows(df)

save_df_to_db(fresh_df)

print(f"Inserted {len(fresh_df)} new rows into the database.")



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["Img_urls"] = df["Img_urls"].apply(
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].astype(int)


Inserted 1105 new rows into the database.
