<a href="https://colab.research.google.com/github/Ahmed8aa/az_sc_tr/blob/main/EG_SC%26TR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install aiohttp nest-asyncio pandas beautifulsoup4 azure-storage-blob

Collecting azure-storage-blob
  Downloading azure_storage_blob-12.23.1-py3-none-any.whl.metadata (26 kB)
Collecting azure-core>=1.30.0 (from azure-storage-blob)
  Downloading azure_core-1.31.0-py3-none-any.whl.metadata (39 kB)
Collecting isodate>=0.6.1 (from azure-storage-blob)
  Downloading isodate-0.7.2-py3-none-any.whl.metadata (11 kB)
Downloading azure_storage_blob-12.23.1-py3-none-any.whl (405 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m405.6/405.6 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading azure_core-1.31.0-py3-none-any.whl (197 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m197.4/197.4 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading isodate-0.7.2-py3-none-any.whl (22 kB)
Installing collected packages: isodate, azure-core, azure-storage-blob
Successfully installed azure-core-1.31.0 azure-storage-blob-12.23.1 isodate-0.7.2


In [2]:
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
import aiohttp
import asyncio
from urllib.parse import urljoin
import re
from datetime import datetime
import random
import logging
import nest_asyncio

# from google.colab import drive
# drive.mount('/content/drive')

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Apply the async fix for Colab
nest_asyncio.apply()

class WebScraper:
    def __init__(self):
        # Define a list of user agents
        self.user_agents = [
            'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:91.0) Gecko/20100101 Firefox/91.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36 Edg/93.0.961.47',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36 OPR/79.0.4143.50',
            'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36 Vivaldi/4.1',
            'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7',
        ]
        self.current_user_agent_index = 0  # Track current user agent index

    def get_next_user_agent(self):
        user_agent = self.user_agents[self.current_user_agent_index]
        self.current_user_agent_index = (
            self.current_user_agent_index + 1) % len(self.user_agents)
        return user_agent

    async def fetch_page(self, session, url, max_retries=1, initial_delay=2):
        user_agent = self.get_next_user_agent()  # Store the updated user agent
        headers = {
            "User-Agent": user_agent,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1"
        }
        captcha_indicators = [
            "captcha", "i am not a robot", "robot",
            "prove you are human", "Enter the characters"
        ]
        delay = initial_delay
        for attempt in range(max_retries):
            try:
                async with session.get(url, headers=headers) as response:
                    # First check the status code
                    if response.status == 403:
                        print("CAPTCHA detected via HTTP status 403.")
                        return None  # You can handle CAPTCHA solution here if needed
                    html = await response.text()  # Directly fetch content

                    if any(indicator in str(response.url).lower() for indicator in captcha_indicators):
                        print("CAPTCHA detected!")

                    logging.info(f"Successfully fetched page from {url[-4:]}")
                    return html  # Return the fetched HTML

            except asyncio.TimeoutError:
                logging.error(f"Timeout error occurred while fetching {url}. Retrying...")
                await asyncio.sleep(delay)
                delay *= 1.5  # Exponential backoff
            except aiohttp.ClientError as e:
                logging.error(f"Error fetching {url}: {str(e)}")
                await asyncio.sleep(delay)
                delay *= 1.5  # Exponential backoff

        logging.error(f"Max retries reached for {url}")
        return None

    def clean_text(self, text):
        text = re.sub(r'[\u200f\u200e]', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def remove_key_from_value(self, key, value):
        key_cleaned = self.clean_text(key)
        value_cleaned = self.clean_text(value)
        if value_cleaned.startswith(key_cleaned):
            return value_cleaned[len(key_cleaned):].strip(" :")
        return value_cleaned

    async def scrape_product_data(self, session, product_url, region):
        html = await self.fetch_page(session, product_url)
        if not html:
            return None
        soup = BeautifulSoup(html, 'html.parser')

        # Extracting Price
        price_selector_1 = "#corePriceDisplay_desktop_feature_div .a-price-whole"
        price_selector_2 = "div.a-section.a-spacing-micro span.a-price.a-text-price.a-size-medium span.a-offscreen"

        price_element = soup.select_one(price_selector_1)
        if price_element:
            price = price_element.get_text(strip=True)
        else:
            price_element = soup.select_one(price_selector_2)
            price = price_element.get_text(
                strip=True) if price_element else np.nan

        # Extract discount
        discount = None

        # Try various selectors
        possible_selectors = [
            "span.a-color-price",
            ".savingsPercentage"
        ]

        for selector in possible_selectors:
            discount_elements = soup.select(selector)
            for element in discount_elements:
                discount_text = element.get_text(strip=True)

                # Flexible regex to capture both negative and non-negative percentages
                discount_match = re.search(r'(-?\d+%)', discount_text)

                if discount_match:
                    discount = discount_match.group(1)
                    break
            if discount:
                break

        # Rating extraction logic
        rate_element = soup.select_one("span.a-icon-alt")
        if rate_element and "out of 5 stars" in rate_element.text:
            rate = rate_element.text.replace("out of 5 stars", "").strip()
        else:
            rate = np.nan
        site = f"amazon_{region.lower()}"
        product_data = {
            "date_column": datetime.today().strftime('%Y-%m-%d'),
            "product_url": product_url,
            "site": site,  # Changed from amazon_sa to amazon_us
            "category": "mobile phones",
            "Title": soup.select_one("#productTitle").text.strip() if soup.select_one("#productTitle") else np.nan,
            "Rate": rate,
            "Price": price,
            "Discount": discount,
            "Image URL": soup.select_one("#imgTagWrapperId img")['src'] if soup.select_one("#imgTagWrapperId img") else np.nan,
            "Description": soup.select_one("#feature-bullets").text.strip() if soup.select_one("#feature-bullets") else np.nan
        }

        tables = {
            'first_table': '.a-normal.a-spacing-micro',
            'tech_specs': '#productDetails_techSpec_section_1',
            'right_table': '#productDetails_detailBullets_sections1',
            'new_table': 'ul.a-unordered-list.a-nostyle.a-vertical.a-spacing-none.detail-bullet-list'
        }

        for table_name, selector in tables.items():
            table = soup.select_one(selector)
            if table:
                if table_name == 'new_table':
                    items = table.find_all('li')
                    for item in items:
                        key_element = item.select_one('span.a-text-bold')
                        value_element = item.find(
                            'span', class_=lambda x: x != 'a-text-bold')
                        if key_element and value_element:
                            key = self.clean_text(
                                key_element.text.strip().replace(':', ''))
                            value = self.clean_text(value_element.text.strip())
                            value = self.remove_key_from_value(key, value)
                            product_data[key] = value
                else:
                    rows = table.find_all('tr')
                    for row in rows:
                        key_element = row.find(['th', 'td'])
                        value_element = row.find_all(
                            'td')[-1] if row.find_all('td') else None
                        if key_element and value_element:
                            key = self.clean_text(
                                key_element.get_text(strip=True))
                            value = self.clean_text(
                                value_element.get_text(strip=True))
                            product_data[key] = value

        reviews = []
        review_cards = soup.select("div[data-hook='review']")
        for review in review_cards[:5]:
            reviewer_name = review.select_one(
                "span.a-profile-name").text.strip()
            review_rating = review.select_one(
                "i.a-icon-star span.a-icon-alt").text.strip().replace("out of 5 stars", "")
            review_date = review.select_one("span.review-date").text.strip()
            review_text = review.select_one(
                "span[data-hook='review-body']").text.strip()
            reviews.append({
                "Reviewer": reviewer_name,
                "Rating": review_rating,
                "Date": review_date,
                "Review": review_text
            })

        product_data['reviews'] = reviews
        return product_data

    async def scrape_page_products(self, session, page_url, region):
        html = await self.fetch_page(session, page_url)
        if not html:
            return [], None

        soup = BeautifulSoup(html, 'html.parser')

        product_links = soup.find_all(
            'a', class_='a-link-normal s-underline-text s-underline-link-text s-link-style a-text-normal')
        if region == 'eg':
            base_url = 'https://www.amazon.eg'
        elif region == 'sa':
            base_url = 'https://www.amazon.sa'
        elif region == 'us':
            base_url = 'https://www.amazon.com'
        elif region == 'jp':
            base_url = 'https://www.amazon.co.jp'
        else:
            raise ValueError(f"Unsupported region: {region}")

        list_products_links = [urljoin(base_url, link.get('href'))
                               for link in product_links if link.get('href')]

        next_button = soup.select_one("a.s-pagination-next")
        next_page_url = urljoin(
            base_url, next_button['href']) if next_button and next_button.get('href') else None

        return list_products_links, next_page_url

    async def scrape_all_products(self, start_page_url, region, max_pages=17):
        all_product_links = set()
        current_page_url = start_page_url
        page_number = 1
        pages_scraped = 0
        async with aiohttp.ClientSession() as session:
            while current_page_url and page_number <= max_pages:
                logging.info(f"Scraping page {page_number}: {current_page_url}")
                products, next_page = await self.scrape_page_products(session, current_page_url, region)

                if products:
                    all_product_links.update(products)
                    logging.info(f"Found {len(products)} product links on page {page_number}.")
                else:
                    logging.info(f"No products found on page {page_number}. Retrying...")
                    # Wait for 5 seconds before retrying
                    await asyncio.sleep(5)
                    continue
                # Increment the pages scraped counter
                pages_scraped += 1

                if pages_scraped >= 10:  # After every 10 pages
                    # Random delay between 10 to 30 seconds
                    random_delay = random.uniform(5, 12)
                    logging.info(f"Pausing for {random_delay:.2f} seconds after scraping {pages_scraped} pages.")
                    await asyncio.sleep(random_delay)
                    pages_scraped = 0  # Reset the counter after the delay

                if not next_page:
                    logging.info("No more pages to scrape.")
                    break

                current_page_url = next_page
                page_number += 1
                await asyncio.sleep(random.uniform(3, 6))

            logging.info(f"Total product links found: {len(all_product_links)}")

            tasks = [self.scrape_product_data(
                session, url, region) for url in all_product_links]
            all_product_data = await asyncio.gather(*tasks)

        product_df = pd.DataFrame(
            [data for data in all_product_data if data is not None])

        # product_df.to_csv(f'/content/drive/My Drive/Egypt-Data/Amazon_{region.upper()}_{page_number-1}_pages-Row.csv', index=False)
        return product_df


def main():
    start_page_url = "https://www.amazon.eg/s?i=electronics&rh=n%3A21832883031%2Cp_123%3A110955%7C1500397%7C329744%7C338933%7C339703%7C367594%7C380758%7C46655%7C559198%7C568349&dc&fs=true&language=en&qid=1726254193&rnid=91049076031&ref=sr_pg_1"
    scraper = WebScraper()
    product_df = asyncio.run(scraper.scrape_all_products(
        start_page_url, region="eg", max_pages=25))
    logging.info(f"Script completed.")
    return product_df



In [3]:
# Call the main function
df_f = main()

In [4]:
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import os
# Get today's date
today_date = datetime.now().date()
# Azure Storage connection string
connect_str = "DefaultEndpointsProtocol=https;AccountName=ynwa;AccountKey=aTLtGZuymmaAirlsL1/39g3dDjaQvBFu3lHzQaUrY0o6LDdGduAW8Wbk8qI7fuilCKS8chuiklq7+AStBN3UdA==;EndpointSuffix=core.windows.net"

# Initialize BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Name of the container and blob
container_name = "huayra"  # Replace with your container name
folder_name = "raw"

# Name of the blob (file) with today's date
blob_name = f"{folder_name}/amazon_eg_raw{today_date}.csv"  # Example: scraped_data_2024-10-07.csv

# Create a container if it doesn't exist
container_client = blob_service_client.get_container_client(container_name)
try:
    container_client.create_container()
except Exception as e:
    print(f"Container already exists: {e}")


# Save the CSV locally (you can skip this if you already have the file path)
csv_file_path = f"scraped_data_{today_date}.csv"
df_f.to_csv(csv_file_path, index=False)

# Upload the CSV file to Blob Storage with today's date in the name
with open(csv_file_path, "rb") as data:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    blob_client.upload_blob(data, overwrite=True)
    print(f"CSV file {blob_name} uploaded successfully.")


Container already exists: The specified container already exists.
RequestId:58290efb-d01e-0026-31b2-20b181000000
Time:2024-10-17T16:33:27.9589888Z
ErrorCode:ContainerAlreadyExists
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>ContainerAlreadyExists</Code><Message>The specified container already exists.
RequestId:58290efb-d01e-0026-31b2-20b181000000
Time:2024-10-17T16:33:27.9589888Z</Message></Error>
CSV file raw/amazon_eg_raw2024-10-17.csv uploaded successfully.


In [16]:
import pandas as pd
import numpy as np
from datetime import datetime
import re


df=df_f.copy()
# Drop rows where the 'Title' column has no values
df = df.dropna(subset=["Title"], how='all', axis=0)

# Define keywords related to phones that may appear in 'Title' or 'Description'
phone_keywords = ['phone', 'smartphone', 'g', 'gb',]

# Filter out rows that are likely not phones based on the absence of phone-related keywords
non_phone_df = df[~(
    df['Title'].str.lower().str.contains('|'.join(phone_keywords), case=False, na=False) |
    df['Description'].str.lower().str.contains('|'.join(phone_keywords), case=False, na=False)
)]

# Display the non_phone_df DataFrame "For Debugging"
# non_phone_df

# Keep rows that are likely phones based on the presence of phone-related keywords
filtered_df = df[
    df['Title'].str.lower().str.contains('|'.join(phone_keywords), case=False, na=False) |
    df['Description'].str.lower().str.contains('|'.join(phone_keywords), case=False, na=False)
]

# Drop columns where all values are NaN
filtered_df = filtered_df.dropna(axis=1, how='all')

# Fill NaN values in 'column' with values from 'same column with different name'
if 'OS' in filtered_df.columns:
    filtered_df['Operating System'] = filtered_df['Operating System'].fillna(filtered_df['OS'])

filtered_df.loc[:,'Connectivity technologies'] = filtered_df['Connectivity technologies'].fillna(filtered_df['Connectivity Technology'])
# filtered_df.loc[:,'Screen Size'] = filtered_df['Screen Size'].fillna(filtered_df['Standing screen display size'])
filtered_df.loc[:,'Wireless Provider'] = filtered_df['Wireless Provider'].fillna(filtered_df['Wireless communication technologies'])
# filtered_df.loc[:,'Other display features'] = filtered_df['Other display features'].fillna(filtered_df['Display resolution'])

# Define a function to fill in 'Brand Name'
def fill_brand_name(row):
    if pd.notna(row['Brand Name']):
        return row['Brand Name']
    elif pd.notna(row['Manufacturer']):
        return row['Manufacturer']
    else:
        return row['Title'].split()[0]  # Take the first word of the title

# Apply the function to each row in the DataFrame
filtered_df.loc[:,'Brand Name'] = filtered_df.apply(fill_brand_name, axis=1)

# Function to extract RAM and Storage
def extract_memory_capacity(row):
    title = row['Title'].lower()
    description = row['Description'].lower() if pd.notna(row['Description']) else "" # Handle Nan

    text = title + " " + description

    # Find all 'number GB/gb' in the title
    matches = re.findall(r'(\d+)\s*(GB|gb)', text)

    # Initialize variables for RAM and storage
    ram_capacity = None
    storage_capacity = None

    for match in matches:
        value, unit = match  # Get the value and unit

        # Construct the full capacity string
        capacity_str = f"{value}{unit}"

        # Check proximity context for RAM
        if 'ram' in title:
            ram_match = re.search(r'(\d+)\s*(GB|gb)\s*ram', title)
            if ram_match and f"{ram_match.group(1)}{ram_match.group(2)}" == capacity_str:
                ram_capacity = capacity_str

        # Check proximity context for Storage (storage/rom)
        if 'storage' in title or 'rom' in title:
            storage_match = re.search(r'(\d+)\s*(GB|gb)\s*(storage|rom)', title)
            if storage_match and f"{storage_match.group(1)}{storage_match.group(2)}" == capacity_str:
                storage_capacity = capacity_str

        # If no explicit mention of 'ram' or 'storage', make an educated guess
        if not ram_capacity and int(value) < 16:
            ram_capacity = capacity_str
        elif not storage_capacity and int(value) >= 16:
            storage_capacity = capacity_str

    # Update the 'storage' column if it's null and storage value is found
    if pd.isnull(row['Memory Storage Capacity']) and storage_capacity:
        row['Memory Storage Capacity'] = storage_capacity

    # Update the 'ram_gb' column if it's null and RAM value is found
    if pd.isnull(row['RAM Memory Installed']) and ram_capacity:
        row['RAM Memory Installed'] = ram_capacity

    return row

# Apply the function to rows with missing storage values
filtered_df = filtered_df.apply(extract_memory_capacity, axis=1)


# Define a dictionary for popular operating systems based on the brand
popular_os = {
    'realme': 'Android',
    'oppo': 'Android',
    'nokia': 'Android',  # Newer Nokia smartphones
    'samsung': 'Android',
    'xiaomi': 'Android'
}

# Function to fill missing OS based on brand
def fill_operating_system(row):
    if pd.isnull(row['Operating System']) and row['Brand Name'].lower() in popular_os:
        row['Operating System'] = popular_os[row['Brand Name'].lower()]  # Fill with popular OS
    return row

# Apply the function to the DataFrame
filtered_df = filtered_df.apply(fill_operating_system, axis=1)

# Check the result
filtered_df[['Operating System']].isna().value_counts()

# Drop unnecessary columns
columns_to_drop = ['OS', 'Connectivity Technology',
                   'Wireless communication technologies']

# # Get the range of columns between 'Country of origin' and 'Display Type'
# start_series1 = filtered_df.columns.get_loc('Country of origin')
# end_series1 = filtered_df.columns.get_loc('Display Type')


# # Get the range of columns between 'Product Dimensions' and 'Number Of Lithium Ion Cells'
# start_series2 = filtered_df.columns.get_loc('Product Dimensions')
# end_series12 = filtered_df.columns.get_loc('Number Of Lithium Ion Cells')

# # Add those columns to the drop list
# columns_to_drop.extend(filtered_df.columns[start_series1:end_series1+1])
# columns_to_drop.extend(filtered_df.columns[start_series2:end_series12+1])

# Drop the columns
filtered_df = filtered_df.drop(columns=columns_to_drop)


# Filter rows where 'Model Name' is NaN
filtered_model_df = filtered_df[filtered_df['Model Name'].isna()]
filtered_model_df[['Title', 'Model Name', 'Brand Name']].head(15)


# Function to clean the text by stripping spaces and converting to lower or upper case
def clean_text(df, columns):
    for col in columns:
        df.loc[:, col] = df[col].str.strip().str.lower()
    return df

# Generalized function to extract the model name using regex and specific word counts
def extract_model_name(title, regex_pattern=None, word_count=3, capitalize=True):
    if pd.isna(title):
        return None

    title = title.lower() if capitalize else title.upper()

    if regex_pattern:
        model_match = re.search(regex_pattern, title)
        if model_match:
            return model_match.group(0).title() if capitalize else model_match.group(0).upper()

    # Default case: take the first few words
    return ' '.join(title.split()[:word_count]).title() if capitalize else ' '.join(title.split()[:word_count]).upper()

# Define brand-specific model extraction logic
def get_extraction_rule(brand, title):
    title_lower = title.lower()

    if brand == 'oppo':
        return {'regex': None, 'word_count': 4 if 'reno' in title_lower else 2}
    elif brand == 'realme':
        return {'regex': None, 'word_count': 4 if 'pro' in title_lower else 3}
    elif brand == 'nokia':
        return {'regex': r'^(nokia\s*\w+|\w+)', 'word_count': 3}
    elif brand == 'redmi':
        return {'regex': r'redmi\s*\d+[a-z]*', 'word_count': 3}
    elif brand == 'samsung':
        return {'regex': r'(galaxy\s+\w+\s*\d*\s*(5g|4g)?)', 'word_count': 3}
    elif brand == 'honor':
        return {'regex': r'(HONOR [\w\s]+?)(\s*\d+GB|\s*(?:4G|5G|LTE)|$)', 'word_count': 3, 'capitalize': False}
    elif brand == 'xiaomi':
        return {'regex': None, 'word_count': 5 if 'redmi' in title_lower else 3}
    elif brand == 'apple':
        return {'regex': r'^([a-z\s]+?)(\d+)\s*(\w*)\s*(pro|plus)?', 'word_count': 3}
    else:
        return {'regex': None, 'word_count': 3}

# Apply the model extraction rules based on the brand
def apply_model_extraction(row):
    brand = row['Brand Name']
    title = row['Title']

    if pd.notna(row['Model Name']):
        return row['Model Name']  # Keep existing model name if available

    rule = get_extraction_rule(brand, title)
    return extract_model_name(title, regex_pattern=rule.get('regex'), word_count=rule.get('word_count', 3), capitalize=rule.get('capitalize', True))

# Apply the cleaning function and the model extraction to the DataFrame
filtered_df = clean_text(filtered_df, ['Title', 'Brand Name'])
filtered_df.loc[:,'Model Name'] = filtered_df.apply(apply_model_extraction, axis=1)


# List of columns to exclude from conversion
exclude_columns = ['Image URL', 'product_url']

# Get object columns excluding the specified ones
object_columns = filtered_df.select_dtypes(include=['object']).columns
columns_to_clean = [col for col in object_columns if col not in exclude_columns]

# Converting all applicable object columns to lowercase and stripping whitespace
for col in columns_to_clean:
    # Ensure the column is of string type before applying string methods
    filtered_df.loc[:, col] = filtered_df[col].astype(str).str.lower().str.strip()


# Select relevant columns
final_df = filtered_df[['date_column','site','category','Brand Name', 'Model Name', 'Title', 'Price', 'Operating System',
                                 'RAM Memory Installed', 'Memory Storage Capacity', 'Screen Size',
                                 'Resolution', 'Refresh Rate', 'CPU Speed', 'Connectivity technologies',
                                 'CPU Model', 'Color', 'Wireless Provider',
                                 'Cellular Technology', 'reviews', 'Rate', 'Discount','product_url','Image URL','ASIN','Batteries','Item model number']]

final_df = final_df.copy()

final_df.rename(columns={
    'Brand Name':'brand', 'Model Name':'model_name', 'Title':'product_title',
    'Price':'price_egp', 'Operating System':'os','RAM Memory Installed' : 'ram_gb',
    'Memory Storage Capacity':'storage', 'Screen Size':'screen_size_in',
    'Resolution':'resolution', 'Refresh Rate':'refresh_rate_hz', 'CPU Speed':'cpu_speed_ghz',
    'Connectivity technologies':'connectivity_technology','CPU Model':'cpu_model',
    'Color':'color','Wireless Provider':'wireless_carrier',
    'Cellular Technology':'cellular_technology', 'reviews':'all_reviews',
    'Rate':'rate', 'Discount':'discount','Image URL':'image_url','ASIN':'asin','Batteries':'batteries','Item model number':'model_number'
},inplace=True)



def clean_col_val(df, col, remove_patterns=None, strip=True):
    # Remove the provided patterns
    if remove_patterns:
        for pattern in remove_patterns:
            df.loc[:,col] = df[col].str.replace(pattern, "", regex=True)

    # Optionally strip whitespace
    if strip:
        df.loc[:,col] = df[col].str.strip()

    return df

final_df = clean_col_val(final_df, "refresh_rate_hz", remove_patterns=["hz|hertz|ghz"])
final_df = clean_col_val(final_df, "discount", remove_patterns=["-|%"])


def clean_price(price):
    if pd.isna(price):
        return price  # If the price is NaN return as is

    # Define a list of unwanted characters and words to remove
    unwanted_chars = [',', '₹', '$', '£', '€', '₣', '¥']
    unwanted_words = ['egp', 'usd', 'inr', 'eur', 'gbp', 'aud', 'cny']

    # Remove unwanted words and characters
    original_price = price  # original price for checking
    price = price.lower()
    for word in unwanted_words:
        price = re.sub(r'\b' + re.escape(word) + r'\b', '', price)
    for char in unwanted_chars:
        price = price.replace(char, '')

    # Remove trailing '.00' or similar patterns for prices with currency codes
    if any(word in original_price.lower() for word in unwanted_words):
        # Remove trailing .00 or .xx
        price = re.sub(r'(\d+)(?:,\d{3})*(?:\.\d{2})$', r'\1', price)
    else:
        # For other prices, remove only commas
        price = price.replace(',', '')

    # Ensure only numeric characters are kept
    price = re.sub(r'\D', '', price)

    # Remove any leading or trailing whitespace
    price = price.strip()

    return price

# Apply the function to the 'price_egp' column
final_df['price_egp'] = final_df['price_egp'].apply(clean_price)

# Fill NaN values of network_type from product_title
final_df["cellular_technology"] = final_df["cellular_technology"].fillna(final_df["product_title"].str.extract('( 4g| 5g)', flags=re.IGNORECASE).squeeze())
final_df["cellular_technology"] = final_df["cellular_technology"].fillna(final_df["model_name"].str.extract('( 4g| 5g)', flags=re.IGNORECASE).squeeze())
# Remove network_type from the mode_name
final_df.loc[:,"model_name"] = final_df["model_name"].str.replace('( 4g| 5g)',"",regex=True)

def format_samsung_model(model):
    model = model.lower()

    # Extract special terms like FE, Ultra, Plus, 5G, 4G
    special_terms = re.findall(r'\b(fe|ultra|plus|5g|4g)\b', model, re.IGNORECASE)
    special_terms = list(dict.fromkeys(special_terms))  # Remove duplicates while maintaining order
    special_terms_str = ' '.join(special_terms).upper()

    # Remove unwanted words and characters
    model = re.sub(r'\b(samsung|galaxy)\b', '', model).strip()
    model = re.sub(r'\b(5g|4g)\b', '', model).strip()  # Remove 5G/4G here to prevent duplication

    # Extract core model name
    if 'note' in model:
        match = re.search(r'note\s*(\d*)\s*(\w*)', model)
        if match:
            note_num, note_suffix = match.groups()
            core_model = f"Note {note_num} {note_suffix}".strip().title()
    else:
        match = re.search(r'([a-z]+\s*\d+(?:\s*[a-z]+)?)', model)
        if match:
            core_model = match.group(1).strip().title()
        else:
            core_model = model.strip().title()

    # Remove special terms from core_model if they're already present
    for term in special_terms:
        core_model = re.sub(rf'\b{term}\b', '', core_model, flags=re.IGNORECASE).strip()

    # Combine core model and special terms
    result = f'{core_model} {special_terms_str}'.strip()

    return result.lower()

def format_realme_model(model):
    model = model.lower()

    # Remove brand name 'realme'
    model = re.sub(r'\b(realme)\b', '', model).strip()

    # Replace '+' with 'plus'
    model = re.sub(r'\+', ' plus', model)

    # Remove extra descriptors like "dual-sim" and "dual"
    model = re.sub(r'\bdual[-\s]?sim\b', '', model)
    model = re.sub(r'\bdual\b', '', model)

    # Remove any extra descriptive information in parentheses or after commas
    model = re.sub(r'\(.*?\)', '', model).strip()
    model = re.sub(r',.*$', '', model).strip()

    # Handle hyphens between numbers and specs
    model = re.sub(r'(\d+)-(\d+)', r'\1 \2', model)

    # Extract the core model name, keeping '4G' or '5G' if present
    model = re.sub(r'\b4g\b', '4g', model)
    model = re.sub(r'\b5g\b', '5g', model)

    return model.strip()

def format_xiaomi_brand(model):
    model = model.lower()

    # Ensure 'xiaomi' is removed, but not 'redmi'
    model = re.sub(r'\b(xiaomi|mi|xioami)\b', '', model).strip()

    # Handle hyphens between numbers and specs  (e.g., 12-128 becomes 12 128)
    model = re.sub(r'(\d+)-(\d+)', r'\1 \2', model)

    # Remove extra descriptors like "dual-sim" and "dual"
    model = re.sub(r'\bdual[-\s]?sim\b', '', model)
    model = re.sub(r'\bdual\b', '', model)

    # Replace '+' with 'plus'
    model = re.sub(r'\+', ' plus', model)

    # Remove colors (e.g., "midnight black", "shiny gold")
    model = re.sub(r'\b(?:midnight|black|blue|gold|silver|white|gray|green|red|orange|pink|purple|yellow)\b', '', model)

    # Remove memory specs (e.g., "8gb", "128gb")
    model = re.sub(r'\b\d+gb\b', '', model)

    # Remove any extra descriptive information in parentheses, after commas, or after hyphens
    model = re.sub(r'\(.*?\)', '', model).strip()
    model = re.sub(r',.*$', '', model).strip()
    model = re.sub(r'[\-\/].*$', '', model).strip()

    # Handle hyphens between numbers and specs (e.g., 12-128 becomes 12 128)
    model = re.sub(r'(\d+)-(\d+)', r'\1 \2', model)

    # Remove extra years (e.g., '2022') or other numeric descriptors after core model
    model = re.sub(r'\b\d{4}\b', '', model).strip()

    # Keep '4G', '5G', 'NE', etc. if present
    model = re.sub(r'\b4g\b', '4g', model)
    model = re.sub(r'\b5g\b', '5g', model)
    model = re.sub(r'\bne\b', 'ne', model)

    # Remove any remaining trailing/leading spaces and return cleaned model
    return model.strip()


def format_honor_model(model):
    model = model.lower().strip()

    # Remove the brand name 'honor'
    model = re.sub(r'\bhonor\b', '', model).strip()

    # Remove any extra descriptive information in parentheses, after commas, or after hyphens
    model = re.sub(r'\(.*?\)', '', model).strip()
    model = re.sub(r',.*$', '', model).strip()
    model = re.sub(r'[\-\/].*$', '', model).strip()

    # Replace '+' with 'plus'
    model = re.sub(r'\+', ' plus', model)

    # Handle 'pro' and 'plus'
    model = re.sub(r'\bpro\b', 'pro', model)
    model = re.sub(r'\bplus\b', 'plus', model)

    # Remove memory specs, colors, and other unnecessary information
    model = re.sub(r'\b\d+gb\b', '', model).strip()  # Remove '4gb', '8gb', etc.
    model = re.sub(r'\b\d+\+\d+\b', '', model).strip()  # Remove '12+512'
    model = re.sub(r'\b(?:titanium|silver|gold|black|blue|white|red|green|pink|grey|purple|yellow)\b', '', model).strip()  # Remove colors

    # Ensure 'plus' is included in the model name if present
    model = re.sub(r'\bplus\b', 'plus', model)

    # Final cleanup: remove extra spaces and return formatted model name
    model = re.sub(r'\s+', ' ', model)  # Replace multiple spaces with a single space

    return model.strip()


def format_apple_model(model):
    """
    Clean and format the model name for Apple products, including core model number and important descriptors.
    """
    model = model.lower().strip()

    # Remove the brand name 'apple' and 'iphone'
    model = re.sub(r'\bapple\b', '', model).strip()
    model = re.sub(r'\biphone\b', '', model).strip()

    # Replace hyphens with spaces
    model = re.sub(r'-', ' ', model)

    # Remove memory specs and additional descriptors
    model = re.sub(r'\b\d+gb\b', '', model)
    model = re.sub(r'\b(?:blue|starlight|facetime|gre|with|new)\b', '', model)

    # Remove extra descriptive information in parentheses or after hyphens
    model = re.sub(r'\(.*?\)', '', model).strip()
    model = re.sub(r',.*$', '', model).strip()

    # Split by spaces to extract core model name and descriptors
    parts = model.split()

    # Extract the core model number or name and important descriptors ('pro', 'max')
    core_model = []
    for part in parts:
        if re.match(r'^\d+', part):  # Check if part starts with a number
            core_model.append(part)
        elif part in ['pro', 'max', 'mini', 'plus']:  # Important descriptors
            core_model.append(part)

    return ' '.join(core_model).strip()

def format_oppo_model(model):
    model = model.lower()

    # Remove brand name 'oppo'
    model = re.sub(r'\b(oppo)\b', '', model).strip()

    # Remove any descriptive words
    model = re.sub(r'(android|smartphone|dual sim|mobile|\.\.\.|glowing|black|blue|uae|version)', '', model, flags=re.IGNORECASE)

    # Remove numbers followed by 'gb' or 'ram'
    model = re.sub(r'\d+\s*(gb|ram)', '', model, flags=re.IGNORECASE)

    # Initialize core_model with cleaned model string
    core_model = model.strip()

    # Extract the core model name (Reno series or A series)
    reno_match = re.search(r'(reno\s*\d+\s*\w*)', model, re.IGNORECASE)
    a_series_match = re.search(r'(a\d+\s*\w*)', model, re.IGNORECASE)

    if reno_match:
        core_model = reno_match.group(1)
    elif a_series_match:
        core_model = a_series_match.group(1)

    # Check for and append 4G/5G if present
    network_match = re.search(r'(4g|5g)', model, re.IGNORECASE)
    if network_match:
        core_model += ' ' + network_match.group(1).lower()

    return core_model.strip()

def format_nokia_model(model, product_title):
    model = model.lower()

    if model == 'nokia':
        product_title = product_title.lower()

        # Replace '+' with 'plus'
        product_title = re.sub(r'\+', ' plus', product_title)

        # Remove 'nokia' and split the title into words
        product_title = re.sub(r'\bnokia\b', '', product_title).strip()
        words = product_title.split()

        if len(words) == 0:
            return ''  # Return empty if no words are left after removing 'nokia'

        # Handle cases where we have fewer than 2 words remaining
        if len(words) == 1:
            return words[0]

        # Extract the second word and the third if it's '4G' or '5G'
        base_model = words[0] + ' ' + words[1]

        if len(words) > 2 and words[2] in ['4g', '5g']:
            base_model += ' ' + words[2]
        return base_model.strip().lower()
    else:
        # Replace '+' with 'plus'
        model = re.sub(r'\+', ' plus', model)
        # Remove 'nokia'
        model = re.sub(r'\bnokia\b', '', model).strip()
        return model.strip()


def format_infinix_model(model):
    model = model.lower()

    # Replace '+' with 'plus'
    model = re.sub(r'\+', ' plus', model)

    # Remove the brand name 'infinix' if present
    model = re.sub(r'\binfinix\b', '', model).strip()

    # Remove any extra descriptive information in parentheses or after commas
    model = re.sub(r'\(.*?\)', '', model).strip()
    model = re.sub(r',.*$', '', model).strip()

    # Handle cases with just the core model name
    model = model.strip().lower()

    return model

def format_redmi_model(model):
    """
    Ensure that the model_name for the brand 'Redmi' includes 'Redmi' at the beginning and clean the brand name without removing 'Redmi'.
    """
    model = model.lower()  # Normalize case

    # Check if 'redmi' is not at the beginning, add it if necessary
    if not model.startswith('redmi'):
        model = 'redmi ' + model

    # Replace '+' with 'plus'
    model = re.sub(r'\+', ' plus', model)

    # Remove any extra descriptive information in parentheses or after commas
    model = re.sub(r'\(.*?\)', '', model).strip()
    model = re.sub(r',.*$', '', model).strip()
    model = re.sub(r'[\-\/].*$', '', model).strip()

    # Handle hyphens between numbers and specs
    model = re.sub(r'(\d+)-(\d+)', r'\1 \2', model)

    # Clean up unnecessary extra text but keep 'redmi'
    model = re.sub(r'\bredmi\b', 'redmi', model).strip()

    # Extract the core model name, keeping '4G' or '5G' if present
    model = re.sub(r'\b4g\b', '4g', model)
    model = re.sub(r'\b5g\b', '5g', model)

    return model.strip()


# Function to apply based on brand
def process_model(row):
    brand = row['brand'].lower()
    model_name = row['model_name']
    product_title = row['product_title']

    if brand == 'samsung':
        return format_samsung_model(model_name)
    elif brand == 'xiaomi':
        return format_xiaomi_brand(model_name)
    elif brand == 'apple':
        return format_apple_model(model_name)
    elif brand == 'honor':
        return format_honor_model(model_name)
    elif brand == 'oppo':
        return format_oppo_model(model_name)
    elif brand == 'nokia':
        return format_nokia_model(model_name,product_title)
    elif brand == 'infinix':
        return format_infinix_model(model_name)
    elif brand == 'realme':
        return format_realme_model(model_name)
    elif brand == 'redmi':
        return format_redmi_model(model_name)
    else:
        return model_name


# Apply function to the DataFrame
final_df['model_name'] = final_df.apply(process_model, axis=1)



final_df = final_df.dropna(subset=["ram_gb", "storage", "screen_size_in", "resolution", "cpu_speed_ghz", "cpu_model"], how='all')

# Convert `date_column` to datetime
final_df['date_column'] = pd.to_datetime(final_df['date_column'], errors='coerce')

# Convert `price_usd` to numeric
final_df['price_egp'] = pd.to_numeric(final_df['price_egp'], errors='coerce')

# final_df.to_csv(f'/content/drive/My Drive/Egypt-Data/Amazon_EG_11_pages-Cleaned.csv', index=False)
final_df['currency']='egp'


In [17]:
# Function to extract and remove network type
def extract_network(text):
    # Check if text is a string, otherwise return it unchanged
    if isinstance(text, str):
        match = re.search(r'(5g|4g)', text)
        if match:
            network = match.group(0)
            # Remove 5G/4G from the original text
            text = re.sub(r'\s?(5g|4g)\s?', '', text).strip()
            return text, network
    return text, None

# Initialize 'network' column as None
final_df['network'] = None

# Apply the extraction logic to each column and update 'network' column
for col in ['model_name', 'product_title', 'cellular_technology']:
    # Apply the function to extract network and updated column text
    final_df[col], extracted_networks = zip(*final_df[col].apply(extract_network))

    # Fill the 'network' column where it is None with the extracted network
    final_df['network'] = final_df['network'].combine_first(pd.Series(extracted_networks))

In [18]:
# # Fill any remaining None values in 'network' using 'cellular_technology'
final_df['network'] = final_df['network'].fillna(final_df['cellular_technology'])

In [19]:
def get_review_url( asin):
        if asin:
            return f"https://www.amazon.com/product-reviews/{asin}"
        return None
final_df['review_url']=final_df['asin'].apply(get_review_url)

In [20]:
final_df.rename(columns={'product_url': 'link','rate':'rating','Brand Name':'brand',
          'Model Name':'model_name','price_egp':'price','date_column':'date','asin':'ASIN'}, inplace=True)
final_df=final_df[['link', 'site', 'category', 'rating', 'image_url', 'brand','discount',
       'model_name', 'product_title', 'price', 'currency', 'os', 'ram_gb',
       'storage', 'screen_size_in', 'resolution', 'refresh_rate_hz',
       'cpu_speed_ghz', 'connectivity_technology', 'cpu_model', 'color',
       'wireless_carrier', 'date', 'all_reviews', 'ASIN', 'review_url',
       'network']]

In [21]:
# Extract numbers from 'storage', 'ram_gb', and 'screen_size_in' columns
final_df['storage'] = final_df['storage'].str.extract('(\d+)', expand=False).astype(float)
final_df['ram_gb'] = final_df['ram_gb'].str.extract('(\d+)', expand=False).astype(float)
final_df['screen_size_in'] = final_df['screen_size_in'].str.extract('(\d+(\.\d+)?)', expand=False)[0].astype(float)
final_df['refresh_rate_hz'] = final_df['refresh_rate_hz'].str.extract('(\d+)', expand=False).astype(float)
final_df['cpu_speed_ghz'] = final_df['cpu_speed_ghz'].str.extract('(\d+)', expand=False).astype(float)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['storage'] = final_df['storage'].str.extract('(\d+)', expand=False).astype(float)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['ram_gb'] = final_df['ram_gb'].str.extract('(\d+)', expand=False).astype(float)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['screen_size_in'

In [22]:
final_df['ram_gb'] = pd.to_numeric(final_df['ram_gb'], errors='coerce')
final_df['storage'] = pd.to_numeric(final_df['storage'], errors='coerce')
final_df['screen_size_in'] = pd.to_numeric(final_df['screen_size_in'], errors='coerce')
final_df['rating'] = pd.to_numeric(final_df['rating'], errors='coerce')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['ram_gb'] = pd.to_numeric(final_df['ram_gb'], errors='coerce')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['storage'] = pd.to_numeric(final_df['storage'], errors='coerce')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['screen_size_in'] = pd.to_numeric(final_df['screen

In [23]:
# Replace 'none' with 0 and convert the column to float
final_df['discount'] = final_df['discount'].replace('none', 1).astype(np.int64)
final_df['discount'] = pd.to_numeric(final_df['discount'], errors='coerce')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['discount'] = final_df['discount'].replace('none', 1).astype(np.int64)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  final_df['discount'] = pd.to_numeric(final_df['discount'], errors='coerce')


In [24]:
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import os

# Azure Storage connection string
connect_str = "DefaultEndpointsProtocol=https;AccountName=ynwa;AccountKey=aTLtGZuymmaAirlsL1/39g3dDjaQvBFu3lHzQaUrY0o6LDdGduAW8Wbk8qI7fuilCKS8chuiklq7+AStBN3UdA==;EndpointSuffix=core.windows.net"

# Initialize BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Name of the container and blob
container_name = "huayra"  # Replace with your container name
folder_name = "transformed"

# Name of the blob (file) with today's date
blob_name = f"{folder_name}/amazon_eg_tr{today_date}.csv"  # Example: scraped_data_2024-10-07.csv

# Create a container if it doesn't exist
container_client = blob_service_client.get_container_client(container_name)
try:
    container_client.create_container()
except Exception as e:
    print(f"Container already exists: {e}")


# Save the CSV locally (you can skip this if you already have the file path)
csv_file_path = f"scraped_data_{today_date}.csv"
final_df.to_csv(csv_file_path, index=False)

# Upload the CSV file to Blob Storage with today's date in the name
with open(csv_file_path, "rb") as data:
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    blob_client.upload_blob(data, overwrite=True)
    print(f"CSV file {blob_name} uploaded successfully.")


Container already exists: The specified container already exists.
RequestId:89587ec3-301e-0063-37b2-206462000000
Time:2024-10-17T16:37:25.9973348Z
ErrorCode:ContainerAlreadyExists
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>ContainerAlreadyExists</Code><Message>The specified container already exists.
RequestId:89587ec3-301e-0063-37b2-206462000000
Time:2024-10-17T16:37:25.9973348Z</Message></Error>
CSV file transformed/amazon_eg_tr2024-10-17.csv uploaded successfully.


In [25]:
df=final_df.copy()

In [26]:
import pandas as pd
import numpy as np
from datetime import datetime
from azure.storage.blob import BlobServiceClient
import pyarrow as pa
import pyarrow.parquet as pq
import io
import json
import hashlib

def generate_unique_product_id(row):
    unique_string = f"{row['brand']}_{row['model_name']}_{row['storage']}_{row['ram_gb']}_{row['network']}"
    return hashlib.md5(unique_string.encode()).hexdigest()

def assign_partition(row, id_column, num_partitions=7):
    return int(hashlib.md5(str(row[id_column]).encode()).hexdigest(), 16) % num_partitions

def read_parquet_from_blob(blob_service_client, container_name, folder_name, file_name):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{folder_name}/{file_name}")
    stream = blob_client.download_blob()
    bytes_data = stream.readall()
    return pd.read_parquet(io.BytesIO(bytes_data))

def read_partitioned_parquet_from_blob(blob_service_client, container_name, folder_name, file_name):
    all_data = []
    for i in range(7):  # Assuming 7 partitions
        try:
            blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{folder_name}/{file_name}/{i}_{file_name}")
            stream = blob_client.download_blob()
            bytes_data = stream.readall()
            partition_data = pd.read_parquet(io.BytesIO(bytes_data))
            all_data.append(partition_data)
        except Exception as e:
            print(f"Error reading partition {i}: {e}")
    return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()

def write_to_parquet(data, blob_service_client, container_name, folder_name, file_name, partition_strategy=None):
    if partition_strategy is None:
        table = pa.Table.from_pandas(data)
        buf = io.BytesIO()
        pq.write_table(table, buf)
        buf.seek(0)

        blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{folder_name}/{file_name}")
        blob_client.upload_blob(buf.getvalue(), overwrite=True)
    else:
        data['partition'] = data.apply(partition_strategy, axis=1)
        for partition, partition_data in data.groupby('partition'):
            partition_data = partition_data.drop('partition', axis=1)
            table = pa.Table.from_pandas(partition_data)
            buf = io.BytesIO()
            pq.write_table(table, buf)
            buf.seek(0)

            blob_client = blob_service_client.get_blob_client(container=container_name,
                                                              blob=f"{folder_name}/{file_name}/{partition}_{file_name}")
            blob_client.upload_blob(buf.getvalue(), overwrite=True)

def update_fact_table(existing_fact, new_data):
    new_fact = process_fact_table(new_data)
    updated_fact = pd.concat([existing_fact, new_fact]).drop_duplicates(subset=['product_id','site_id','date','url'], keep='last')
    return updated_fact

def update_dim_device_specification(existing_dim, new_data):
    new_dim = process_dim_device_specification(new_data)
    updated_dim = pd.concat([existing_dim, new_dim]).drop_duplicates(subset=['product_id'], keep='last')
    return updated_dim

def update_product_mapping(existing_mapping, new_data):
    new_mapping = process_product_mapping(new_data)
    updated_mapping = pd.concat([existing_mapping, new_mapping]).drop_duplicates(subset=['product_id'], keep='last')
    return updated_mapping

def update_dim_site(existing_site, new_data):
    new_site = process_dim_site(new_data)
    updated_site = pd.concat([existing_site, new_site]).drop_duplicates(subset=['site_name'], keep='last')
    updated_site['site_id'] = range(len(updated_site))  # Reassign site_ids
    return updated_site

def update_dim_review(existing_review, new_data):
    new_review = process_dim_review(new_data)
    updated_review = pd.concat([existing_review, new_review]).drop_duplicates(subset=['review_id'], keep='last')
    return updated_review

def update_dim_date(existing_date, new_data):
    new_date = process_dim_date(new_data)
    updated_date = pd.concat([existing_date, new_date]).drop_duplicates(subset=['date_ID'], keep='last')
    return updated_date

def process_fact_table(df):
    fact_table = df[['product_id', 'site', 'date', 'price', 'currency', 'rating','link']].copy()
    fact_table['site_id'] = 2
    fact_table['discount'] = df['discount']
    fact_table['d_p'] = df['price']/df['discount']
    fact_table['price_without_discount'] = fact_table['price'] +fact_table['d_p']
    fact_table['rating_avg'] = fact_table['rating']
    fact_table['url']=fact_table['link']
    return fact_table[['product_id', 'url','site_id', 'date',  'price', 'currency', 'discount', 'price_without_discount', 'rating_avg']]

def process_dim_device_specification(df):
    return df[['product_id', 'product_title', 'image_url', 'os', 'screen_size_in', 'resolution', 'refresh_rate_hz', 'cpu_speed_ghz', 'cpu_model', 'color', 'wireless_carrier', 'category']].copy()

def process_product_mapping(df):
    return df[['product_id', 'model_name', 'brand', 'network', 'ram_gb', 'storage']].copy()

def process_dim_site(df):
    site_df = pd.DataFrame({'site_name': df['site'].unique()})
    site_df['site_id'] = range(len(site_df))
    return site_df

def process_dim_review(df):
    reviews_list = []

    for idx, row in df.iterrows():
        # Ensure that 'all_reviews' exists and is a list
        if isinstance(row['all_reviews'], list):
            for review in row['all_reviews']:
                # Ensure that 'review' is a dictionary
                if isinstance(review, dict):
                    # Use .get() to safely access dictionary keys
                    review_id = hashlib.md5(f"{row['product_id']}_{review.get('Date', '')}_{row['site']}".encode()).hexdigest()
                    review_text = review.get('Review Body', None)
                    review_rating = review.get('Rating', None).split()[0] if review.get('Rating') else None
                    review_date = pd.to_datetime(review.get('Date', '').split('on ')[-1], format='%d %B %Y', errors='coerce')

                    reviews_list.append({
                        'review_id': review_id,  # Unique review ID
                        'product_id': row['product_id'],
                        'product_reviews_url': row['review_url'],
                        'review_text': review_text,
                        'review_rating': review_rating,
                        'review_date': review_date,
                        'site': row['site']  # Ensure site column is properly handled
                    })



    # Convert the list of review dictionaries into a DataFrame
    review_df = pd.DataFrame(reviews_list)

    # # Convert the 'site' column to string explicitly to avoid ArrowInvalid issues
    # review_df['site'] = review_df['site'].astype(str)

    return review_df

def process_dim_date(df):
    dates = pd.to_datetime(df['date'].unique())
    date_df = pd.DataFrame({
        'date_ID': dates,
        'day': dates.day,
        'month': dates.month,
        'year': dates.year
    })
    return date_df

def main():
    # Azure Storage connection string
    connect_str = "DefaultEndpointsProtocol=https;AccountName=ynwa;AccountKey=aTLtGZuymmaAirlsL1/39g3dDjaQvBFu3lHzQaUrY0o6LDdGduAW8Wbk8qI7fuilCKS8chuiklq7+AStBN3UdA==;EndpointSuffix=core.windows.net"

    # Initialize BlobServiceClient
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Name of the container and folder
    container_name = "huayra"
    folder_name = "model"

    # Read new data (replace this with your actual data loading method)
    new_df = df.copy()

    # Read existing product ID mapping
    existing_product_id_mapping = read_parquet_from_blob(blob_service_client, container_name, folder_name, "product_id_mapping.parquet")

    # Generate unique product IDs for new data, using existing IDs where possible
    new_df['temp_id'] = new_df.apply(generate_unique_product_id, axis=1)
    new_df = pd.merge(new_df, existing_product_id_mapping[['brand', 'model_name', 'storage', 'ram_gb', 'network', 'product_id']],
                      on=['brand', 'model_name', 'storage', 'ram_gb', 'network'], how='left')
    new_df['product_id'] = new_df['product_id'].fillna(new_df['temp_id'])
    new_df = new_df.drop('temp_id', axis=1)

    # Update product ID mapping
    updated_product_id_mapping = pd.concat([existing_product_id_mapping, new_df[['brand', 'model_name', 'storage', 'ram_gb', 'network', 'product_id']]])
    updated_product_id_mapping = updated_product_id_mapping.drop_duplicates(subset=['brand', 'model_name', 'storage', 'ram_gb', 'network'], keep='last')

    # Read existing data from Azure Blob Storage
    existing_fact = read_partitioned_parquet_from_blob(blob_service_client, container_name, folder_name, "fact_table.parquet")
    existing_dim_device = read_parquet_from_blob(blob_service_client, container_name, folder_name, "dim_device_specification.parquet")
    existing_product_mapping = read_parquet_from_blob(blob_service_client, container_name, folder_name, "product_mapping.parquet")
    existing_dim_site = read_parquet_from_blob(blob_service_client, container_name, folder_name, "dim_site.parquet")
    existing_dim_review = read_partitioned_parquet_from_blob(blob_service_client, container_name, folder_name, "dim_review.parquet")
    existing_dim_date = read_parquet_from_blob(blob_service_client, container_name, folder_name, "dim_date.parquet")

    # Update tables
    updated_fact = update_fact_table(existing_fact, new_df)
    updated_dim_device = update_dim_device_specification(existing_dim_device, new_df)
    updated_product_mapping = update_product_mapping(existing_product_mapping, new_df)
    updated_dim_site = update_dim_site(existing_dim_site, new_df)
    updated_dim_review = update_dim_review(existing_dim_review, new_df)
    updated_dim_date = update_dim_date(existing_dim_date, new_df)

    # Define partition strategies
    fact_partition_strategy = lambda row: assign_partition(row, 'product_id')
    review_partition_strategy = lambda row: assign_partition(row, 'review_id')

    # Write updated tables back to Azure Blob Storage
    write_to_parquet(updated_fact, blob_service_client, container_name, folder_name, "fact_table.parquet", partition_strategy=fact_partition_strategy)
    write_to_parquet(updated_dim_device, blob_service_client, container_name, folder_name, "dim_device_specification.parquet")
    write_to_parquet(updated_product_mapping, blob_service_client, container_name, folder_name, "product_mapping.parquet")
    write_to_parquet(updated_dim_site, blob_service_client, container_name, folder_name, "dim_site.parquet")
    write_to_parquet(updated_dim_review, blob_service_client, container_name, folder_name, "dim_review.parquet", partition_strategy=review_partition_strategy)
    write_to_parquet(updated_dim_date, blob_service_client, container_name, folder_name, "dim_date.parquet")
    write_to_parquet(updated_product_id_mapping, blob_service_client, container_name, folder_name, "product_id_mapping.parquet")

if __name__ == "__main__":
    main()