# In [None]:
import boto3
import csv
from ..celery import app
import pandas as pd
import numpy as np
from itertools import groupby
from io import StringIO
import logging
import os
from datetime import datetime
from dotenv import load_dotenv
from service.common.other import create_property_name_clean, get_latest_csv_group,get_specific_csv_group
from service.common.s3_utils import (log_error_to_csv)
from service.common.pipeline_utils import (insert_pipeline_run, update_pipeline_run_by_pipeline_run_id,insert_pipeline_error_log)
import psycopg2
from psycopg2.extras import execute_batch
from sqlalchemy import create_engine
import json

# Load .env first so every os.getenv() below (including SCRAPED_DATE) sees its values.
load_dotenv()

scraped_date = os.getenv('SCRAPED_DATE')

ACCESS_ID = os.getenv('ACCESS_ID')
ACCESS_KEY = os.getenv('ACCESS_KEY')
LISTINGS_INPUT_BUCKET_NAME = os.getenv('LISTINGS_INPUT_BUCKET_NAME')
LISTINGS_OUTPUT_BUCKET_NAME = os.getenv('LISTINGS_OUTPUT_BUCKET_NAME')
REGION_NAME = os.getenv('REGION_NAME')
INPUT_BUCKET_PREFIX = os.getenv('INPUT_BUCKET_PREFIX')
DLD_OUTPUT_BUCKET = os.getenv('DLD_OUTPUT_BUCKET')

# Column used as the join key between the normalised and the "other" frames.
INDEX_FIELD = 'listing_url'

DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_URI = os.getenv('DB_URI')


s3 = boto3.resource('s3', aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY, region_name=REGION_NAME)
s3_client = boto3.client('s3', aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY, region_name=REGION_NAME)

# NOTE: the source bucket name is hard-coded (LISTINGS_INPUT_BUCKET_NAME and
# INPUT_BUCKET_PREFIX are read above but not used for it); the process_* helpers
# use the same literal bucket name in their S3 paths.
bucket = s3.Bucket("prop-ai-source-files")

# Accumulators shared by the process_* functions: each appends its DataFrame(s)
# here, and process_result_frames concatenates them.
propai_result_frames: list = []   # frames in the normalised (propai) schema
other_result_frames: list = []    # frames holding the remaining source columns

marketplace_list = ["PropertyFinder", "Bayut", "Dubizzle"]

# ---- shared by all marketplaces -------------------------------------------
# Collapses a property type label into a broader family; "Bungalow" maps to "".
PROPERTY_TYPE_MAP = {
    "Studio": "Apartment",
    "Flat": "Apartment",
    "Penthouse": "Apartment",
    "Hotel apartments": "Hotel Apartment",
    "Complex Villas": "Villa",
    "Villa": "Villa",
    "Duplex": "Apartment",
    "Bungalow": "",
    "Townhouse": "Villa",
}

# Raw completion-status spellings -> "Off-Plan" / "Existing".
# An empty status string is treated as "Existing".
STATUS_MAP = {
    **dict.fromkeys(
        ("off_plan", "Off_Plan", "off_plan_primary", "Off_Plan_Primary",
         "under-construction", "Off_plan"),
        "Off-Plan",
    ),
    **dict.fromkeys(
        ("completed_primary", "completed", "Completed", "Completed_primary",
         "", "Ready"),
        "Existing",
    ),
}


# bayut
# Normalised display name for each supported Bayut property sub-type. Keys are
# lower-case because process_bayut lower-cases 'property_type' before using them.
bayut_dic = {
    'apartments': 'Apartment',
    'apartment': 'Apartment',
    'penthouses': 'Penthouse',
    'penthouse': 'Penthouse',
    'hotel apartments': 'Hotel Apartment',
    'hotel apartment': 'Hotel Apartment',
    'townhouses': 'Townhouse',
    'townhouse': 'Townhouse',
    'villas': 'Villa',
    'villa': 'Villa',
    'buildings': 'Building',
    'building': 'Building',
    'plots': 'Land',
    'plot': 'Land',
}

# Sub-types kept by process_bayut; derived from bayut_dic so the two cannot drift.
supported_types_bayut = list(bayut_dic)

# Bayut source column -> common listing column (applied with DataFrame.rename).
# Several sources share a target (e.g. realtor_name / agent_name ->
# list_agent_full_name); presumably only one of each pair is present in a given
# CSV — if both are, rename() produces duplicate column names.
# agency_orn is produced by agency_orn_fillna_for_markerplace, not renamed here.
column_map_bayut = {
    'url': 'listing_url',
    'website': 'listing_source',
    'country': 'country_name',
    'city': 'city_name',
    'listing_name': 'property_name',
    'price': 'asking_price',
    'price_currency': 'asking_price_currency',
    'property_type': 'property_category',
    'property_sub_type': 'property_type',
    'address': 'property_address',
    'area': 'city_region',
    'latitude': 'map_coordinates_latitude',
    'longitude': 'map_coordinates_longitude',
    'description': 'description',
    'for_sale': 'for_sale',
    'for_rent': 'for_rent',
    'plot_area': 'total_area',
    'plot_area_units': 'total_area_units',
    'bathroom_count': 'bathrooms_total',
    'room_count': 'rooms_total',
    'amenities': 'amenities_text',
    'date_listed': 'date_listed',
    'bayut_reference_number': 'listing_id',
    'completion_status': 'completion_status',
    'realtor_name': 'list_agent_full_name',
    'realtor_email': 'list_agent_email',
    'realtor_cell_phone': 'list_agent_mobile_phone',
    'realtor_phone': 'list_agent_direct_phone',
    'realtor_permit_number': 'list_agent_mls_id',
    'agent_name': 'list_agent_full_name',
    'agent_email': 'list_agent_email',
    'agent_cell_phone': 'list_agent_mobile_phone',
    'agent_phone': 'list_agent_direct_phone',
    'agency_name': 'list_office_name',
    'furnishing_status': 'furnished_info',
    'agent_whatsapp': 'agent_whatsapp',
    'agent_orn': 'agent_orn',
    'trakheesi_permit_number': 'trakheesi_permit_number',
    'rera_trakheesi_listing_permit': 'list_agent_mls_id',
    'building_name': 'building_name',
    'trucheck_verified': 'is_verified',
    'dubailand_listing_validation_url': 'dubailand_listing_validation_url',
    'building_built_date': 'building_built_date',
    'building_floor_count': 'building_floor_count',
    'building_builtup_area_sqm': 'building_builtup_area_sqm',
    'developer_project_name': 'developer_project_name',
    'listing_floor_number': 'listing_floor_number',
    'trakheesi_start': 'trakheesi_start',
    'trakheesi_end': 'trakheesi_end',
    'is_qr_code_link_available': 'is_qr_code_link_available',
    'regulatory_status': 'regulatory_status',
    'is_permit_delisted': 'is_permit_delisted',
    'orn_license': 'orn_license',
    'ded_license': 'ded_license',
    'rera_license': 'rera_license',
    'service_charge_sqft': 'service_charge_sqft',
}


# propertyfinder
# Display name for each supported Property Finder sub-type (lower-case keys,
# matched after process_propertyfinder lower-cases 'property_type').
propertyfinder_dic = {
    "apartment": "Apartment",
    "duplex": "Duplex",
    "penthouse": "Penthouse",
    "hotel & hotel apartment": "Hotel Apartment",
    "townhouse": "Townhouse",
    "villa": "Villa",
    "bungalow": "Bungalow",
    "whole building": "Building",
    "land": "Land",
}

# Sub-types kept by process_propertyfinder: exactly the keys above, in order.
supported_types_propertyfinder = list(propertyfinder_dic)

# Property Finder source column -> common listing column (used with DataFrame.rename).
column_map_propertyfinder = {
    # listing identity & location
    "url": "listing_url",
    "website": "listing_source",
    "country": "country_name",
    "city": "city_name",
    "listing_name": "property_name",
    "price": "asking_price",
    "price_currency": "asking_price_currency",
    "property_type": "property_category",
    "property_sub_type": "property_type",
    "address": "property_address",
    "area": "city_region",
    "latitude": "map_coordinates_latitude",
    "longitude": "map_coordinates_longitude",
    "description": "description",
    "for_sale": "for_sale",
    "for_rent": "for_rent",
    # size & layout
    "built_area": "total_area",
    "built_area_units": "total_area_units",
    "bathroom_count": "bathrooms_total",
    "room_count": "rooms_total",
    "amenities": "amenities_text",
    "date_listed": "date_listed",
    # agent / broker
    "agent_id": "list_agent_mls_id",
    "agent_call": "list_agent_mobile_phone",
    "agent_name": "list_agent_full_name",
    "email": "list_agent_email",
    "broker_id": "list_office_mls_id",
    "broker_name": "list_office_name",
    "reference": "listing_id",
    "agent_phone": "list_agent_mobile_phone",
    "agent_email": "list_agent_email",
    "agency_id": "list_office_mls_id",
    "propertyfinder_reference_number": "listing_id",
    # status & misc (agency_orn is filled by agency_orn_fillna_for_markerplace)
    "completion_status": "completion_status",
    "building_name": "building_name",
    "furnishing_status": "furnished_info",
    "is_furnished": "furnished_info",
    "agent_whatsapp": "agent_whatsapp",
    "is_verified": "is_verified",
    "agent_orn_number": "agent_orn",
    "dubailand_listing_validation_url": "dubailand_listing_validation_url",
}


# dubizzle
# Display name for each supported Dubizzle sub-type. Keys are lower-case because
# process_dubizzle lower-cases 'property_type' before filtering and replacing;
# capitalised keys would never match.
dubizzle_dic = {
    'apartment': 'Apartment',
    'commercial villa': 'Commercial Villa',
    'penthouse': 'Penthouse',
    'residential building': 'Residential Building',
    'townhouse': 'Townhouse',
    'villa': 'Villa',
    'villa compound': 'Villa Compound',
}

# Sub-types kept by process_dubizzle; derived from dubizzle_dic so they stay in sync.
supported_types_dubizzle = list(dubizzle_dic)

# Dubizzle source column -> common listing column (used with DataFrame.rename).
# 'location' and 'address' share a target; process_dubizzle renames 'location'
# to 'address' before this map is applied.
column_map_dubizzle = {
    'url': 'listing_url',
    'country': 'country_name',
    'city': 'city_name',
    'region': 'city_region',
    'location': 'property_address',
    'address': 'property_address',
    'property_category': 'property_category',
    'agency_id': 'list_office_mls_id',
    'agency_name': 'list_office_name',
    'agent_name': 'list_agent_full_name',
    'agent_email': 'list_agent_email',
    'agent_id': 'list_agent_mls_id',
    'agent_phone': 'list_agent_mobile_phone',
    'agent_brn': 'agent_brn_number',
    'listing_price': 'asking_price',
    'listing_price_currency': 'asking_price_currency',
    'listing_size': 'total_area',
    'listing_bathrooms': 'bathrooms_total',
    'listing_bedrooms': 'rooms_total',
    'amenities': 'amenities_text',
    'lat': 'map_coordinates_latitude',
    'lon': 'map_coordinates_longitude',
    'listing_description': 'description',
    'listing_name': 'property_name',
    'trakheesi_permit_number': 'trakheesi_permit_number',
    'property_sub_category': 'property_type',
    'property_furnished': 'furnished_info',
    'property_completion_status': 'completion_status',
    'for_sale': 'for_sale',
    'for_rent': 'for_rent',
    'property_updated_at': 'date_listed',
    'property_listing_id': 'listing_id',
    'dubailand_listing_validation_url': 'dubailand_listing_validation_url',
    'photos_main_urls': 'photo_urls',
    'building_name': 'building_name',
    # agency_orn is filled by agency_orn_fillna_for_markerplace (which also
    # consumes and drops rera_registration_number), so it is not mapped here.
}


# In [None]:

@app.task
def process(task_chain_name, scraped_date=None, marketplace=None, task_id=None):
    """Celery task: check the scraped CSVs exist in S3 and run listing processing.

    Records a pipeline run via ``insert_pipeline_run``, verifies that for every
    required marketplace at least one ``.csv`` key under
    ``propai/<scraped_date>/`` contains the marketplace name, then passes all
    those keys to ``process_cvs`` as a single group. Any exception raised after
    the run is recorded is printed, logged via ``log_error_to_csv`` /
    ``insert_pipeline_error_log``, and stored as a FAILED run; it is not
    re-raised.

    Args:
        task_chain_name: Stored as ``pipeline_name_code`` on the pipeline run.
        scraped_date: Folder name under ``propai/`` in the source bucket (the
            process_* helpers parse it with format ``%Y-%m-%d``). Required: when
            missing, an error is logged and the task returns ``None``.
        marketplace: Marketplace to process (e.g. "PropertyFinder", "Bayut",
            "Dubizzle"); ``None`` means all three.
        task_id: Sent as ``pipeline_run_id``; afterwards replaced by the value
            returned from ``insert_pipeline_run``.
    """
    if not scraped_date:
        logging.error("scraped_date is required but not provided.")
        return

    task_name = f'Active Listing - {marketplace}'
    pipeline_details = {
        "pipeline_name": task_name,
        "pipeline_name_code": task_chain_name,
        "pipeline_run_id": task_id,
        "scraped_date": scraped_date,
        "marketplace": marketplace,
    }
    task_id = insert_pipeline_run(pipeline_details)

    try:
        print(f"marketplace: {marketplace} , scraped_date: {scraped_date}")
        # Marketplace names are matched (lower-case) as substrings of the S3 keys.
        if marketplace:
            logging.info(f'CSV processing triggered for marketplace: {marketplace}')
            required_marketplaces = [marketplace.lower()]
        else:
            logging.info('CSV processing function triggered for all marketplaces')
            required_marketplaces = ['bayut', 'dubizzle', 'propertyfinder']

        marketplace_files = dict.fromkeys(required_marketplaces, False)

        # Only the object keys are needed here: the CSV contents are read later
        # by the process_* helpers, so no object bodies are downloaded now.
        all_csvs = []
        for obj in bucket.objects.filter(Prefix=f"propai/{scraped_date}/"):
            if not obj.key.endswith('.csv'):
                continue
            all_csvs.append(obj.key)
            for required_marketplace in required_marketplaces:
                if required_marketplace in obj.key:
                    marketplace_files[required_marketplace] = True

        missing_marketplaces = [name for name, exists in marketplace_files.items() if not exists]
        if missing_marketplaces:
            raise ValueError(f"Missing CSV files for marketplaces: {', '.join(missing_marketplaces)}")
        print("All required marketplaces are present. Proceeding with the next steps.")

        logging.info(f'CSV processing started for marketplace: {marketplace}')
        process_cvs([all_csvs], marketplace)
        logging.info('CSV processing finished successfully')
        update_pipeline_run_by_pipeline_run_id(task_id, {"run_status": 'SUCCESS'})

    except Exception as e:
        print(f"Error occurred: {e}")
        error_file = log_error_to_csv(task_name, str(e), LISTINGS_OUTPUT_BUCKET_NAME)
        insert_pipeline_error_log(task_id, {"error_message": str(e)})
        update_pipeline_run_by_pipeline_run_id(task_id, {"run_status": 'FAILED', "run_status_info": str(e), "error_file": error_file})


def agency_orn_fillna_for_markerplace(df):
    """Populate ``agency_orn`` from the first non-missing of several source columns.

    Missing values in ``agency_orn`` are filled, in order, from
    ``agency_orn_number``, ``orn_license`` and ``rera_registration_number``.
    Any of these columns (including ``agency_orn`` itself) may be absent; an
    absent column contributes only missing values. ``rera_registration_number``
    is dropped afterwards.

    Args:
        df: Listing DataFrame (any index). Modified in place.

    Returns:
        The same DataFrame object, now with an ``agency_orn`` column.
    """
    def column_or_missing(name):
        # The all-missing fallback must share df's index: fillna() and column
        # assignment align on the index, so a default 0..n-1 index would leave
        # every row missing whenever df's index differs (e.g. after filtering).
        if name in df.columns:
            return df[name]
        return pd.Series(pd.NA, index=df.index, dtype="object")

    df['agency_orn'] = (
        column_or_missing('agency_orn')
        .fillna(column_or_missing('agency_orn_number'))
        .fillna(column_or_missing('orn_license'))
        .fillna(column_or_missing('rera_registration_number'))
    )

    if 'rera_registration_number' in df.columns:
        df.drop(columns=['rera_registration_number'], inplace=True)

    return df


# process_cvs
def process_cvs(grouped_objects, marketplace=None, task_id=None):
    for csvs in grouped_objects:
        csv_folder_name = csvs[0].split('/')[1] 
        try:
            for csv_name in csvs:
                bayut_key = 'ae_bayut_listing_' + \
                    csv_name.split('/')[1].replace('-', '') + '.csv'
                propertyfinder_key = 'ae_propertyfinder_listing_' + \
                    csv_name.split('/')[1].replace('-', '') + '.csv'
                zoom_key = 'ae_zoomproperty_listing_' + \
                    csv_name.split('/')[1].replace('-', '') + '.csv'
                dubizzle_key = 'ae_dubizzle_listing_' + \
                    csv_name.split('/')[1].replace('-', '') + '.csv' 
        except:
            raise ValueError("incorrect values")
        
        logging.info(f'CSVs to be processed for marketplace: {marketplace}')

        # Load the correct source columns based on the date and marketplace
        if marketplace == "PropertyFinder":
            source_column_name_list_propertyfinder = get_source_column_name_lists_dicts_by_date(csv_folder_name)["propertyfinder"]                            
        elif marketplace == "Bayut":    
            source_column_name_list_bayut = get_source_column_name_lists_dicts_by_date(csv_folder_name)["bayut"]        
        elif marketplace == "Dubizzle":
            source_column_name_list_dubizzle = get_source_column_name_lists_dicts_by_date(csv_folder_name)["dubizzle"]
        else: 
           source_column_name_list_dubizzle = get_source_column_name_lists_dicts_by_date(csv_folder_name)["dubizzle"]
           source_column_name_list_propertyfinder = get_source_column_name_lists_dicts_by_date(csv_folder_name)["propertyfinder"]                            
           source_column_name_list_bayut = get_source_column_name_lists_dicts_by_date(csv_folder_name)["bayut"]       
        

        # Process only the marketplace specified
        if marketplace == "Bayut":
            process_bayut(csv_folder_name, bayut_key, source_column_name_list_bayut)
        elif marketplace == "PropertyFinder":
            process_propertyfinder(csv_folder_name, propertyfinder_key, source_column_name_list_propertyfinder)
        elif marketplace == "Dubizzle":
            process_dubizzle(csv_folder_name, dubizzle_key, source_column_name_list_dubizzle)
        else:
            process_dubizzle(csv_folder_name, dubizzle_key, source_column_name_list_dubizzle)
            process_bayut(csv_folder_name, bayut_key, source_column_name_list_bayut)
            process_propertyfinder(csv_folder_name, propertyfinder_key, source_column_name_list_propertyfinder)
            
        process_result_frames(csv_folder_name, marketplace)

def process_bayut(csv_folder_name, bayut_key, source_column_name_list_bayut):
    """Normalise one Bayut listing CSV and append the results to the module-level frame lists.

    Reads ``propai/<csv_folder_name>/<bayut_key>`` from the source bucket and,
    on a copy of the selected columns:
      * fills ``agency_orn`` and renames columns via ``column_map_bayut``;
      * keeps residential rows whose sub-type is in ``supported_types_bayut``
        and that are flagged for sale or for rent;
      * derives ``total_area_sqm`` / ``total_area_sqft`` from ``total_area`` and
        ``total_area_units``;
      * maps sub-types through ``bayut_dic``, reformats ``date_listed`` as
        ``dd/mm/YYYY`` and adds ``date_scraped``;
      * best effort: attaches ``photo_urls`` from the matching photo CSV.

    The normalised frame goes to ``propai_result_frames``; the unselected source
    columns (with ``url`` renamed to ``INDEX_FIELD``) go to
    ``other_result_frames``. Any exception is logged and not re-raised.

    Args:
        csv_folder_name: Date folder under ``propai/`` (parsed as ``%Y-%m-%d``).
        bayut_key: Listing CSV file name inside that folder.
        source_column_name_list_bayut: Source columns to keep (expected to
            include ``url``, which is used for the photo merge).
    """
    source_dir = f's3://prop-ai-source-files/propai/{csv_folder_name}'
    storage_options = {"key": ACCESS_ID, "secret": ACCESS_KEY}
    try:
        df = pd.read_csv(f'{source_dir}/{bayut_key}', header=0, storage_options=storage_options)
        if 'listing_floor_count' in df.columns:
            df.rename(columns={'listing_floor_count': 'listing_floor_number'}, inplace=True)
        print(df.info())

        # Explicit copies: df2 is modified below and must not be a view of df.
        df2 = df[source_column_name_list_bayut].copy()
        print("Number of records Bayut:", df2.shape[0])

        df2 = agency_orn_fillna_for_markerplace(df2)
        df2 = df2.rename(columns=column_map_bayut)
        df2['listing_source'] = "Bayut"
        df2['country_name'] = "UAE"
        print("Number of records Bayut:", df2.shape[0])

        df2['property_category'] = df2['property_category'].str.lower()
        df2 = df2[df2['property_category'] == "residential"].copy()
        print("Number of records Bayut:", df2.shape[0])

        df2['property_type'] = df2['property_type'].str.lower()
        df2 = df2[df2['property_type'].isin(supported_types_bayut)].copy()
        print("Number of records Bayut:", df2.shape[0])
        print(df2['for_sale'].unique())
        print(df2['for_rent'].unique())
        print(df2[['for_sale', 'for_rent']].dtypes)

        df2 = df2[(df2['for_sale'] == True) | (df2['for_rent'] == True)].copy()
        print("Number of records Bayut:", df2.shape[0])

        df2['property_category'] = df2['property_category'].str[:1].str.upper() + df2['property_category'].str[1:]

        # Coerce to numbers before copying, so neither area column keeps raw values.
        df2['total_area'] = pd.to_numeric(df2['total_area'], errors='coerce')
        df2['total_area_sqm'] = df2['total_area']
        df2['total_area_sqft'] = df2['total_area']
        area_units = df2['total_area_units'].str.lower()
        df2.loc[area_units == 'sqm', 'total_area_sqft'] = df2['total_area'] * 10.7639
        df2.loc[area_units == 'sqft', 'total_area_sqm'] = df2['total_area'] * 0.092903
        print("Number of records Bayut:", df2.shape[0])

        # Assign back: an inplace replace on a selected column is chained
        # assignment and has no effect under pandas Copy-on-Write.
        df2['property_type'] = df2['property_type'].replace(bayut_dic)

        df2['date_listed'] = pd.to_datetime(df2["date_listed"], dayfirst=True, format='mixed').dt.strftime('%d/%m/%Y')
        df2["date_scraped"] = csv_folder_name
        df2['date_scraped'] = pd.to_datetime(df2['date_scraped'], format='%Y-%m-%d', errors='coerce')
        df2 = df2.drop(['total_area', 'total_area_units'], axis=1)

        df2 = df2.assign(furnished_yn=(df2["furnished_info"] == "furnished"))
        print("Number of records Bayut:", df2.shape[0])

        try:
            # Merge photo URLs (one list per listing url).
            bayut_photo_key = bayut_key.replace('listing', 'photo')
            df_photo = pd.read_csv(f'{source_dir}/{bayut_photo_key}', header=0, storage_options=storage_options)
            photos = (df_photo.groupby('url')['photo_url'].agg(list)
                      .reset_index().rename(columns={'photo_url': 'photo_urls'}))
            print("Number of records Bayut:", df2.shape[0])
            # NOTE(review): default inner join drops listings that have no photo
            # rows — confirm this filtering is intended.
            df2 = pd.merge(df2, photos, left_on="listing_url", right_on="url").drop("url", axis=1)
        except Exception as e:
            logging.exception('Error while merging photo URLs: %s', str(e))

        # Remaining source columns, keyed by INDEX_FIELD.
        propai_columns_bayut = [x for x in source_column_name_list_bayut if x != 'url']
        df = df.drop(propai_columns_bayut, axis=1)
        df = df.rename(columns={'url': INDEX_FIELD})

        other_result_frames.append(df)
        propai_result_frames.append(df2)
    except Exception as e:
        logging.exception('Error processing Bayut: %s', str(e))

def process_propertyfinder(csv_folder_name, propertyfinder_key, source_column_name_list_propertyfinder):
    """Normalise one Property Finder listing CSV and append the results to the module-level frame lists.

    Reads ``propai/<csv_folder_name>/<propertyfinder_key>`` from the source
    bucket and, on a copy of the selected columns:
      * fills ``agency_orn``, renames via ``column_map_propertyfinder`` and
        drops ``agency_orn_number`` if present;
      * keeps ``residential`` rows whose sub-type is in
        ``supported_types_propertyfinder`` and that are for sale or for rent;
      * derives ``total_area_sqm`` / ``total_area_sqft``;
      * maps sub-types through ``propertyfinder_dic``, reformats
        ``date_listed`` as ``dd/mm/YYYY`` and adds ``date_scraped``;
      * maps boolean ``furnished_info`` to 'furnished' / 'unfurnished' and sets
        ``furnished_yn`` to whether it equals 'furnished';
      * best effort: attaches ``photo_urls`` from the matching photo CSV.

    The normalised frame goes to ``propai_result_frames``; the unselected source
    columns (with ``url`` renamed to ``INDEX_FIELD``) go to
    ``other_result_frames``. Any exception is logged and not re-raised.

    Args:
        csv_folder_name: Date folder under ``propai/`` (parsed as ``%Y-%m-%d``).
        propertyfinder_key: Listing CSV file name inside that folder.
        source_column_name_list_propertyfinder: Source columns to keep.
    """
    source_dir = f's3://prop-ai-source-files/propai/{csv_folder_name}'
    storage_options = {"key": ACCESS_ID, "secret": ACCESS_KEY}
    try:
        df3 = pd.read_csv(f'{source_dir}/{propertyfinder_key}', header=0, storage_options=storage_options)
        print(df3.info())

        if 'dubailand_listing_validation_url' not in df3.columns:
            df3['dubailand_listing_validation_url'] = ''

        # Explicit copies: df4 is modified below and must not be a view of df3.
        df4 = df3[source_column_name_list_propertyfinder].copy()
        print("Number of records Propertyfinder:", df4.shape[0])

        df4 = agency_orn_fillna_for_markerplace(df4)
        df4 = df4.rename(columns=column_map_propertyfinder)
        # agency_orn_number has been folded into agency_orn; tolerate its absence
        # instead of aborting the whole marketplace with a KeyError.
        df4 = df4.drop(columns=['agency_orn_number'], errors='ignore')
        df4['listing_source'] = "PropertyFinder"
        print("Number of records Propertyfinder:", df4.shape[0])

        df4 = df4[df4['property_category'] == "residential"].copy()
        print("Number of records Propertyfinder:", df4.shape[0])

        df4['property_type'] = df4['property_type'].str.lower()
        df4 = df4[df4['property_type'].isin(supported_types_propertyfinder)]
        print("Number of records Propertyfinder:", df4.shape[0])
        df4 = df4[(df4['for_sale'] == True) | (df4['for_rent'] == True)].copy()
        print("Number of records Propertyfinder:", df4.shape[0])

        df4['property_category'] = df4['property_category'].str[:1].str.upper() + df4['property_category'].str[1:]

        # Coerce to numbers before copying / multiplying (as process_bayut does).
        df4['total_area'] = pd.to_numeric(df4['total_area'], errors='coerce')
        df4['total_area_sqm'] = df4['total_area']
        df4['total_area_sqft'] = df4['total_area']

        df4.loc[df4['country_name'] == 'United Arab Emirates', 'country_name'] = 'UAE'

        area_units = df4['total_area_units'].str.lower()
        df4.loc[area_units == 'sqm', 'total_area_sqft'] = df4['total_area'] * 10.7639
        df4.loc[area_units == 'sqft', 'total_area_sqm'] = df4['total_area'] * 0.092903
        print("Number of records Propertyfinder:", df4.shape[0])

        # Assign back: an inplace replace on a selected column is chained
        # assignment and has no effect under pandas Copy-on-Write.
        df4['property_type'] = df4['property_type'].replace(propertyfinder_dic)

        df4['date_listed'] = pd.to_datetime(df4["date_listed"], dayfirst=True, format='mixed').dt.strftime('%d/%m/%Y')
        df4["date_scraped"] = csv_folder_name
        df4['date_scraped'] = pd.to_datetime(df4['date_scraped'], format='%Y-%m-%d', errors='coerce')
        df4 = df4.drop(['total_area', 'total_area_units'], axis=1)
        df4['furnished_info'] = df4['furnished_info'].map({True: 'furnished', False: 'unfurnished'})
        print("Number of records Propertyfinder:", df4.shape[0])

        # Boolean flag, consistent with process_bayut (not the text label itself).
        df4 = df4.assign(furnished_yn=(df4["furnished_info"] == "furnished"))

        try:
            # Merge photo URLs (one list per listing url).
            propertyfinder_photo_key = propertyfinder_key.replace('listing', 'photo')
            df_photo = pd.read_csv(f'{source_dir}/{propertyfinder_photo_key}', header=0, storage_options=storage_options)
            photos = (df_photo.groupby('url')['photo_url'].agg(list)
                      .reset_index().rename(columns={'photo_url': 'photo_urls'}))
            # NOTE(review): default inner join drops listings that have no photo
            # rows — confirm this filtering is intended.
            df4 = pd.merge(df4, photos, left_on="listing_url", right_on="url").drop("url", axis=1)
            print("Number of records Propertyfinder:", df4.shape[0])
        except Exception as e:
            logging.exception('Error while merging photo URLs: %s', str(e))

        # Remaining source columns, keyed by INDEX_FIELD. This runs whether or
        # not the photo merge succeeded.
        propai_columns_propertyfinder = [x for x in source_column_name_list_propertyfinder if x != 'url']
        df3 = df3.drop(propai_columns_propertyfinder, axis=1)
        df3 = df3.rename(columns={'url': INDEX_FIELD})

        other_result_frames.append(df3)
        propai_result_frames.append(df4)
    except Exception as e:
        logging.exception('Error processing PropertyFinder: %s', str(e))

def process_dubizzle(csv_folder_name, dubizzle_key, source_column_name_list_dubizzle):
    """Normalise one Dubizzle listing CSV and append the results to the module-level frame lists.

    Reads ``propai/<csv_folder_name>/<dubizzle_key>`` from the source bucket
    (renaming ``location`` to ``address`` and adding an empty
    ``photos_main_urls`` column if missing) and, on a copy of the selected
    columns:
      * fills ``agency_orn`` and renames via ``column_map_dubizzle``;
      * keeps ``residential`` rows whose lower-cased sub-type is in
        ``supported_types_dubizzle`` and that are for sale or for rent;
      * treats ``total_area`` as square feet and derives ``total_area_sqm``;
      * maps sub-types through ``dubizzle_dic`` (matched case-insensitively),
        reformats ``date_listed`` as ``dd/mm/YYYY`` and adds ``date_scraped``;
      * parses ``photo_urls`` from its ``[a, b, ...]`` text form into a list.

    The full source frame goes to ``other_result_frames`` and the normalised
    frame to ``propai_result_frames``. Any exception is logged and not re-raised.

    Args:
        csv_folder_name: Date folder under ``propai/`` (parsed as ``%Y-%m-%d``).
        dubizzle_key: Listing CSV file name inside that folder.
        source_column_name_list_dubizzle: Source columns to keep.
    """
    source_path = f's3://prop-ai-source-files/propai/{csv_folder_name}/{dubizzle_key}'
    try:
        print(source_path)
        df7 = pd.read_csv(source_path, header=0, storage_options={"key": ACCESS_ID, "secret": ACCESS_KEY})
        print(len(df7))

        if 'location' in df7.columns:
            df7.rename(columns={'location': 'address'}, inplace=True)
        if 'photos_main_urls' not in df7.columns:
            df7['photos_main_urls'] = ''
        print("Dubizzle")
        print(df7.info())

        # Explicit copies: df8 is modified below and must not be a view of df7.
        df8 = df7[source_column_name_list_dubizzle].copy()
        df8 = agency_orn_fillna_for_markerplace(df8)

        df8 = df8.rename(columns=column_map_dubizzle)
        df8['listing_source'] = "Dubizzle"
        df8 = df8[df8['property_category'] == "residential"].copy()
        print("Unique values and their counts:")
        print(df8['property_type'].value_counts())

        df8['property_type'] = df8['property_type'].str.lower()
        df8 = df8[df8['property_type'].isin(supported_types_dubizzle)]
        df8 = df8[(df8['for_sale'] == True) | (df8['for_rent'] == True)].copy()

        df8['property_category'] = df8['property_category'].str[:1].str.upper() + df8['property_category'].str[1:]

        # Dubizzle sizes are taken to be square feet.
        df8['total_area'] = pd.to_numeric(df8['total_area'], errors='coerce')
        df8['total_area_sqm'] = df8['total_area'] * 0.092903
        df8['total_area_sqft'] = df8['total_area']

        # property_type is lower-case at this point, so match the mapping keys
        # case-insensitively; capitalised keys alone would never match.
        dubizzle_type_map = {key.lower(): value for key, value in dubizzle_dic.items()}
        df8['property_type'] = df8['property_type'].replace(dubizzle_type_map)

        # Replace ":" after the date part with a space
        df8['date_listed'] = df8['date_listed'].str.replace(r'(\d{4}-\d{2}-\d{2}):', r'\1 ', regex=True)
        df8['date_listed'] = pd.to_datetime(df8["date_listed"], dayfirst=True, format='mixed').dt.strftime('%d/%m/%Y')
        df8["date_scraped"] = csv_folder_name
        df8['date_scraped'] = pd.to_datetime(df8['date_scraped'], format='%Y-%m-%d', errors='coerce')
        df8 = df8.drop(columns=['total_area', 'total_area_units'], errors='ignore')

        # "[u1, u2]" -> ["u1", "u2"]; missing values (NaN) become an empty list
        # instead of raising AttributeError on .strip().
        df8['photo_urls'] = df8['photo_urls'].apply(
            lambda x: x.strip("[]").split(", ") if isinstance(x, str) else []
        )
        df8 = df8.assign(furnished_yn=(df8["furnished_info"] == "Furnished"))

        print(df8.info())
        print("Dubizzle finished")

        other_result_frames.append(df7)
        propai_result_frames.append(df8)
    except Exception as e:
        logging.exception('Error processing Dubizzle: %s', str(e))

def process_result_frames(csv_folder_name, marketplace):
    """Merge the accumulated marketplace frames, enrich them and publish the Dubai subset.

    Steps:
      1. Concatenate the module-level ``propai_result_frames`` (and
         ``other_result_frames``).
      2. Normalise ``property_address_clean`` and left-join the old and new
         address-mapping CSVs from S3, then the DLD agent CSV on
         ``agent_brn_number``.
      3. Derive the presentation columns ("Property Type", "Area in SQM", ...).
      4. Keep rows whose ``city_name`` is "Dubai", clean them with
         ``clean_and_transform_dataframe_v2`` and upload the CSV both to the DLD
         bucket and to the ``testing-output`` prefix of the listings bucket.
      5. Read the DLD copy back and, if it is non-empty, replace this scrape's
         rows in the ``dubai_active_listings`` table.

    Args:
        csv_folder_name: Folder/date label used in the S3 keys.
        marketplace: Optional marketplace name. When truthy it is appended to
            the CSV file name and restricts the table cleanup to that
            ``listing_source``; when falsy the whole scrape date is replaced.

    NOTE(review): ``scraped_date`` is the module-level value, which is read
    from SCRAPED_DATE *before* ``load_dotenv()`` runs -- a value that only
    exists in .env is not picked up. Confirm this is intended.
    """
    propai_output_frame = pd.concat(propai_result_frames, ignore_index=True)

    unique_listing_sources = propai_output_frame['listing_source'].unique()
    print("unique_listing_sources in propai_output_frame:")
    print(unique_listing_sources)

    # NOTE(review): built but not used further in this function.
    other_output_frame = pd.concat(other_result_frames, ignore_index=True)
    propai_output_frame['photo_urls'] = propai_output_frame["photo_urls"].apply(create_photo_url_objs)
    propai_output_frame["property_address_clean"] = propai_output_frame["property_address"].apply(clean_address)

    all_listings_frame = propai_output_frame.copy()

    # Ensure that 'is_verified' is present in all_listings_frame
    if 'is_verified' not in all_listings_frame.columns:
        all_listings_frame['is_verified'] = False

    # Strip country tokens so listing addresses line up with the mapping sheets.
    address_clean = all_listings_frame['property_address_clean']
    address_clean = address_clean.str.replace('UNITED ARAB EMIRATES', '', regex=False)
    address_clean = address_clean.str.replace('UAE', '', regex=False)
    address_clean = address_clean.str.replace(r'\bDUBAI\s+DUBAI\b', 'DUBAI', regex=True)
    all_listings_frame['property_address_clean'] = address_clean.str.strip()

    storage_options = {"key": ACCESS_ID, "secret": ACCESS_KEY}

    # ------------------------------ address mappings ------------------------------
    cheat_df_old = pd.read_csv(f's3://{LISTINGS_OUTPUT_BUCKET_NAME}/extra/old_address_mapping.csv', storage_options=storage_options, header=0)
    cheat_df_old = cheat_df_old.rename(columns={'Address': 'address'})
    cheat_df_old = _normalize_mapping_addresses(cheat_df_old)

    all_listings_frame = pd.merge(all_listings_frame, cheat_df_old, how="left", left_on=["property_address_clean"], right_on=["address"])
    all_listings_frame = all_listings_frame.drop_duplicates(subset='listing_url')

    cheat_df_new = pd.read_csv(f's3://{LISTINGS_OUTPUT_BUCKET_NAME}/extra/new_address_mapping.csv', storage_options=storage_options, header=0)
    cheat_df_new = _normalize_mapping_addresses(cheat_df_new)

    all_listings_frame = pd.merge(all_listings_frame, cheat_df_new, how="left", left_on=["property_address_clean"], right_on=["address"])
    all_listings_frame = all_listings_frame.drop_duplicates(subset='listing_url')
    # Both mapping sheets carry 'address' and '_id'; keep the old sheet's address.
    all_listings_frame = all_listings_frame.drop(columns=['address_y', '_id_x', '_id_y'])
    all_listings_frame = all_listings_frame.rename(columns={'address_x': 'address'})

    # -------------------------------- agent details --------------------------------
    agent_details = pd.read_csv(f's3://{LISTINGS_OUTPUT_BUCKET_NAME}/extra/dld_agents_public.csv', storage_options=storage_options, header=0)
    all_listings_frame['agent_brn_number'] = _normalize_brn(all_listings_frame['agent_brn_number'])
    # One DLD record per BRN, and none without a BRN. Previously the duplicates
    # were only blanked to NaN in place; astype(str) turned them into "nan",
    # which matched every listing whose own BRN was missing ("nan") and
    # multiplied those listings in the merge below.
    agent_details = (
        agent_details.dropna(subset=['agent_brn_number'])
        .assign(agent_brn_number=lambda frame: _normalize_brn(frame['agent_brn_number']))
        .drop_duplicates(subset='agent_brn_number')
    )
    all_listings_frame = pd.merge(all_listings_frame, agent_details, how="left", on="agent_brn_number")
    # Missing BRNs surface as NaN or the string "nan"; store them as "".
    all_listings_frame['agent_brn_number'] = all_listings_frame['agent_brn_number'].fillna("").replace("nan", "")

    all_listings_frame = all_listings_frame.rename(columns={
        'project_name_prop-ai': 'project_name_prop_ai',
        'master_project_prop-ai': 'master_project_prop_ai',
        'area_name_prop-ai': 'area_name_prop_ai',
        'development_name_prop-ai': 'development_name_prop_ai',
    })

    object_cols = all_listings_frame.select_dtypes(include=['object']).columns.tolist()
    all_listings_frame[object_cols] = all_listings_frame[object_cols].fillna('')

    all_listings_frame['is_verified'] = all_listings_frame['is_verified'].apply(map_values)
    all_listings_frame["Property Type"] = all_listings_frame["property_type"].replace(PROPERTY_TYPE_MAP)
    all_listings_frame["Number of Bedrooms"] = all_listings_frame["rooms_total"]
    all_listings_frame["Development Status"] = all_listings_frame["completion_status"].replace(STATUS_MAP)
    all_listings_frame["Area in SQM"] = all_listings_frame["total_area_sqft"] / 10.7639
    all_listings_frame['asking_price'] = pd.to_numeric(all_listings_frame['asking_price'], errors='coerce')
    all_listings_frame['Area in SQM'] = pd.to_numeric(all_listings_frame['Area in SQM'], errors='coerce')
    all_listings_frame["Price per SQM"] = all_listings_frame.apply(lambda row: calculate_price_per_sqm(row['asking_price'], row['total_area_sqft']), axis=1)
    all_listings_frame['property_name_clean'] = all_listings_frame.apply(lambda row: create_property_name_clean(row['area_name_prop_ai'], row['development_name_prop_ai'], row['Property Type'], row['Number of Bedrooms']), axis=1)

    # Explicit copy: the later column assignments must not target a view of
    # all_listings_frame (SettingWithCopy).
    dubai_active_listings_frame = all_listings_frame.loc[all_listings_frame["city_name"] == "Dubai"].copy()
    dubai_active_listings_frame = dubai_active_listings_frame.rename(columns={'photo_urls': 'photo_url_list'})
    dubai_active_listings_frame['photo_url_list'] = dubai_active_listings_frame['photo_url_list'].apply(convert_to_json)

    print(dubai_active_listings_frame.columns)
    new_dubai_active_listings_frame = clean_and_transform_dataframe_v2(dubai_active_listings_frame)

    unique_listing_sources = new_dubai_active_listings_frame['listing_source'].unique()
    print("unique_listing_sources in new_dubai_active_listings_frame:")
    print(unique_listing_sources)

    # Keep only the value after the last "=" of the validation URL.
    new_dubai_active_listings_frame['dubailand_listing_validation_url_cleaned'] = new_dubai_active_listings_frame['dubailand_listing_validation_url'].str.extract(r'=(.*)$')

    print('data before creating csv')
    print(new_dubai_active_listings_frame.info())
    duplicate_rows_count = int(new_dubai_active_listings_frame.duplicated(subset=['listing_url']).sum())
    print(f"Number of Duplicate Rows: {duplicate_rows_count}")
    total_listings = len(new_dubai_active_listings_frame)
    unique_listings = new_dubai_active_listings_frame['listing_url'].nunique()
    duplicate_listings = total_listings - unique_listings
    print(total_listings, unique_listings, duplicate_listings)

    # ------------------------------------ upload ------------------------------------
    file_suffix = f"_{marketplace}" if marketplace else ""
    file_name = f"dubai_active_listings_{csv_folder_name}{file_suffix}.csv"
    dld_key = f"dld-non-compliant-listings-and-sm-ads/non-compliant-listings/active-listings/{csv_folder_name}/{file_name}"
    testing_key = f"testing-output/{csv_folder_name}/{file_name}"

    # Serialise once; the same body goes to both destinations.
    csv_buffer = StringIO()
    new_dubai_active_listings_frame.to_csv(csv_buffer, index=False)
    csv_body = csv_buffer.getvalue()
    s3.Object(DLD_OUTPUT_BUCKET, dld_key).put(Body=csv_body)
    s3.Object(LISTINGS_OUTPUT_BUCKET_NAME, testing_key).put(Body=csv_body)

    # Re-read the uploaded CSV so the DB load sees the same values and dtypes
    # as the published file.
    weekly_listings_s3 = pd.read_csv(
        f's3://{DLD_OUTPUT_BUCKET}/{dld_key}',
        header=0,
        storage_options=storage_options,
    )

    print('weekly_listings_s3')
    print(weekly_listings_s3.info())
    print(weekly_listings_s3.empty)

    if not weekly_listings_s3.empty:
        logging.info('Adding Active Listing to RDS Started')
        clean_table_data('dubai_active_listings', marketplace)
        weekly_listings_s3['scraped_date'] = scraped_date
        insert_dataframe_to_postgresql(weekly_listings_s3, 'dubai_active_listings', if_exists="append")
        logging.info('Adding Active Listing to RDS Finished')
    else:
        print("No data to insert")


def _normalize_mapping_addresses(mapping_df):
    """Dedupe an address-mapping frame on 'address' and strip country tokens from it."""
    # Dedupe on the raw values (before normalisation), keeping the first row.
    mapping_df = mapping_df.drop_duplicates(subset='address')
    address = mapping_df['address']
    address = address.str.replace('UAE', '', regex=False)
    address = address.str.replace('UNITED ARAB EMIRATES', '', regex=False)
    address = address.str.replace(r'\bDUBAI\s+DUBAI\b', 'DUBAI', regex=True)
    return mapping_df.assign(address=address.str.strip())


def _normalize_brn(series):
    """Render BRNs as trimmed strings, dropping a float artefact '.0' (12345.0 -> '12345')."""
    # A BRN column with missing values is read as float; without this, "12345.0"
    # on one side would never match "12345" on the other.
    return series.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)


In [None]:

def get_source_column_name_lists_dicts_by_date(date):
    """Return the expected raw column names of each scraper's CSV for a scrape date.

    The scrapers' output schemas changed over time. Explicitly listed
    historical dates get their historical schema; any other date falls
    through to the latest schema.

    Args:
        date: Scrape date string (``YYYY-MM-DD``), compared verbatim against
            the hard-coded date lists below.

    Returns:
        dict mapping "bayut", "propertyfinder", "zoom" and "dubizzle" to a
        freshly built list of column names, or ``None`` when no schema is
        defined for that source on that date ("dubizzle" is only defined in
        the latest schema).
    """
    source_column_name_lists_dict = {
        "bayut": None,
        "propertyfinder": None,
        "zoom": None,
        "dubizzle":None
    }

    # Late-May / June 2023 scrapes.
    if (date in ["2023-06-28","2023-06-26","2023-06-19","2023-06-12","2023-06-07","2023-06-05","2023-05-29","2023-05-22"]):
        source_column_name_lists_dict["zoom"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type', 'area',
                                                'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'built_area', 'built_area_units', 'bathroom_count', 'room_count',
                                                'amenities', 'date_listed', 'email', 'phone', 'reference_number', 'building_name','building_id','is_verified']

        source_column_name_lists_dict["propertyfinder"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type',
                                                        'property_sub_type', 'address', 'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'built_area',
                                                        'built_area_units', 'bathroom_count', 'room_count', 'amenities', 'date_listed', 'agent_id', 'agent_call', 'agent_name',
                                                        'email', 'broker_id', 'broker_name', 'reference', 'completion_status', 'building_name','dubailand_listing_validation_url','is_verified']

        source_column_name_lists_dict["bayut"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type', 'address',
                                                'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'plot_area', 'plot_area_units', 'bathroom_count',
                                                'room_count', 'amenities', 'date_listed', 'bayut_reference_number', 'completion_status','building_name', 'realtor_name', 'realtor_email',
                                                'realtor_cell_phone', 'realtor_phone', 'realtor_permit_number', 'agency_name','dubailand_listing_validation_url','trucheck_verified']
        
    # July 2023 scrapes: zoom gains agency/agent columns; the propertyfinder and
    # bayut lists are the same as the June schema.
    elif (date in ["2023-07-04","2023-07-11","2023-07-17","2023-07-24","2023-07-31"]):
        source_column_name_lists_dict["zoom"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type', 'area',
                                                'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'built_area', 'built_area_units', 'bathroom_count', 'room_count',
                                                'amenities', 'date_listed', 'email', 'phone', 'reference_number', 'agency_id', 'agency_name', 'agent_id', 'agent_name','building_name','building_id','is_verified']


        source_column_name_lists_dict["propertyfinder"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type',
                                                        'property_sub_type', 'address', 'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'built_area',
                                                        'built_area_units', 'bathroom_count', 'room_count', 'amenities', 'date_listed', 'agent_id', 'agent_call', 'agent_name',
                                                        'email', 'broker_id', 'broker_name', 'reference', 'completion_status', 'building_name','dubailand_listing_validation_url','is_verified']

        source_column_name_lists_dict["bayut"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type', 'address',
                                                'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'plot_area', 'plot_area_units', 'bathroom_count',
                                                'room_count', 'amenities', 'date_listed', 'bayut_reference_number', 'completion_status','building_name', 'realtor_name', 'realtor_email',
                                                'realtor_cell_phone', 'realtor_phone', 'realtor_permit_number', 'agency_name','dubailand_listing_validation_url','trucheck_verified']
    # 2023-08-07 scrape: contact columns renamed to agent_* and source-specific
    # reference-number names.
    elif (date in ["2023-08-07"]):
        source_column_name_lists_dict["zoom"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type',
                                    'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'built_area', 'built_area_units',
                                    'bathroom_count', 'room_count', 'amenities', 'date_listed', 'agent_email', 'agent_phone', 'zoomproperty_reference_number',
                                    'agency_id', 'agency_name', 'agent_id', "agent_name",'building_name','building_id','is_verified']

        source_column_name_lists_dict["propertyfinder"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type',
                                                'property_sub_type', 'address', 'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent',
                                                'built_area', 'built_area_units', 'bathroom_count', 'room_count', 'amenities', 'date_listed', 'agent_id',
                                                'agent_phone', 'agent_name', 'agent_email', 'propertyfinder_reference_number',
                                                'completion_status', 'building_name', 'is_furnished','dubailand_listing_validation_url','is_verified']


        source_column_name_lists_dict["bayut"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type',
                                    'address', 'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'plot_area', 'plot_area_units',
                                    'bathroom_count', 'room_count', 'amenities', 'date_listed', 'bayut_reference_number', 'completion_status','building_name', 'agent_name',
                                    'agent_email', 'agent_cell_phone', 'agent_phone', 'trakheesi_permit_number', 'agency_name', 'furnishing_status','dubailand_listing_validation_url','trucheck_verified']
    else:
        # Latest schema: the default for every date not listed above. It adds
        # WhatsApp/BRN/permit columns, bayut drops 'website'/'country', and it
        # is the only schema that defines "dubizzle".
        source_column_name_lists_dict["zoom"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type',
                                    'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'built_area', 'built_area_units',
                                    'bathroom_count', 'room_count', 'amenities', 'date_listed', 'agent_email', 'agent_phone', 'zoomproperty_reference_number','agent_whatsapp',
                                    'agency_id', 'agency_name', 'agent_id', 'agent_name','building_name','trakheesi_permit_number','agency_orn','agent_orn','building_id','is_verified']

        source_column_name_lists_dict["propertyfinder"] = ['url', 'website', 'country', 'city', 'listing_name', 'price', 'price_currency', 'property_type',
                                                'property_sub_type', 'address', 'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent',
                                                'built_area', 'built_area_units', 'bathroom_count', 'room_count', 'amenities', 'date_listed', 'agent_id',
                                                'agent_phone', 'agent_name', 'agent_email', 'agency_id', 'agency_name', 'propertyfinder_reference_number','agent_whatsapp',
                                                'completion_status', 'building_name', 'furnishing_status','agent_brn_number', 'trakheesi_permit_number','agency_orn_number','dubailand_listing_validation_url','is_verified']


        source_column_name_lists_dict["bayut"] = ['url','city', 'listing_name', 'price', 'price_currency', 'property_type', 'property_sub_type',
                                    'address', 'area', 'latitude', 'longitude', 'description', 'for_sale', 'for_rent', 'plot_area', 'plot_area_units',
                                    'bathroom_count', 'room_count', 'amenities', 'date_listed', 'bayut_reference_number', 'completion_status','building_name', 'agent_name',
                                    'agent_email', 'agent_cell_phone', 'agent_phone', 'trakheesi_permit_number', 'agency_name', 'furnishing_status','community_name', 'agent_whatsapp',
                                    'agent_brn_number','dubailand_listing_validation_url','rera_trakheesi_listing_permit',
                                    "building_built_date", "building_floor_count", "building_builtup_area_sqm", "developer_project_name", "listing_floor_number", "trakheesi_start", "trakheesi_end", "is_qr_code_link_available", "regulatory_status", "is_permit_delisted", "orn_license", "ded_license", "rera_license", "service_charge_sqft",'trucheck_verified']

        source_column_name_lists_dict["dubizzle"] =  [
                                        'url', 'country', 'city', 'region', 'address', 'property_category', 
                                        'agency_id', 'agency_name', 'agent_name', 'agent_email',
                                        'agent_id', 'agent_phone', 'agent_brn', 'listing_price', 'listing_price_currency',
                                        'listing_size', 'listing_bathrooms', 'listing_bedrooms', 'amenities', 'lat', 'lon',
                                        'listing_description', 'listing_name', 'trakheesi_permit_number',
                                        'property_sub_category', 'property_furnished', 'property_completion_status', 'for_sale',
                                        'for_rent', 'property_updated_at', 'property_listing_id', 'dubailand_listing_validation_url',
                                        'photos_main_urls', 'building_name','rera_registration_number'
                                    ]

    return source_column_name_lists_dict

# data processing function section 
def create_photo_url_objs(urls, max_selected=5):
    """Wrap photo URLs in dicts, flagging the first *max_selected* as selected.

    Args:
        urls: Sequence of photo URLs. A float (NaN from pandas, including
            ``np.float64``) or ``None`` means "no photos".
        max_selected: How many leading photos get ``propai_selected=True``
            (default 5, the previously hard-coded count).

    Returns:
        list of ``{"url": <url>, "propai_selected": <bool>}`` dicts, in input
        order; an empty list when there are no photos.

    NOTE(review): a plain string is iterated character by character, as
    before -- confirm upstream always provides a list.
    """
    # isinstance (not ``type(...) == float``) also catches numpy float NaNs.
    if urls is None or isinstance(urls, float):
        return []
    return [
        {"url": url, "propai_selected": position < max_selected}
        for position, url in enumerate(urls)
    ]

def clean_address(address):
    """Normalise a free-text address for matching against the address-mapping sheets.

    Upper-cases the text, removes commas, collapses every run of whitespace
    to a single space and trims the ends. Non-string input (NaN, None,
    numbers) yields ''.

    NOTE(review): the previous implementation passed a regex pattern to
    ``str.replace``, which only matches literal text, so whitespace runs were
    never collapsed. The mapping CSVs are joined against this output
    verbatim; re-check match rates if they were generated from the old output.
    """
    import re  # the module does not import re at top level

    if not isinstance(address, str):
        return ''
    # Remove commas first so "A , B" cannot leave a double space behind.
    without_commas = address.upper().replace(',', '')
    return re.sub(r'\s+', ' ', without_commas).strip()
 
def merge_columns(row):
    """Join master project and project names as "master, project", skipping empty parts.

    Args:
        row: Mapping/row exposing 'master_project_en' and 'project_name_en'.

    Returns:
        "master, project" when both are non-empty, otherwise whichever one is
        non-empty (the project name when the master name is empty).
    """
    master = row['master_project_en']
    project = row['project_name_en']
    if master == "":
        return project
    if project == "":
        return master
    return f"{master}, {project}"

def calculate_price_per_sqm(asking_price, total_area_sqft, sqft_per_sqm=10.7639):
    """Return the asking price per square metre.

    Args:
        asking_price: Listing price (numeric; NaN allowed).
        total_area_sqft: Floor area in square feet.
        sqft_per_sqm: Square feet in one square metre; 10.7639 matches the
            conversion factor used elsewhere in this module.

    Returns:
        ``asking_price / area_in_sqm``, or ``np.nan`` when either input is
        missing or the area is zero.
    """
    # The previous version divided by the square-foot area directly, so the
    # "Price per SQM" column actually held a price per square foot.
    if pd.isna(asking_price) or pd.isna(total_area_sqft) or total_area_sqft == 0:
        return np.nan
    area_sqm = total_area_sqft / sqft_per_sqm
    return asking_price / area_sqm

def clean_and_transform_dataframe_v2(df):
    """Coerce listing columns to the types expected by the output CSV / table.

    Missing columns are added with a neutral default, so the result always
    has the full column set:

    * text columns: missing values (NaN/None/NaT) become '', everything else
      is cast to ``str`` and stripped;
    * numeric columns: ``pd.to_numeric(errors='coerce')``, with unparsable or
      missing values set to 0;
    * boolean columns: missing values become False, everything else is cast
      with ``bool()``.

    Any NaN still left in other columns is finally replaced with 0.

    Args:
        df: Listings frame. It is not modified; a cleaned copy is returned.

    Returns:
        The cleaned copy.
    """
    # Work on a copy so the caller's frame (possibly a slice of a larger frame)
    # is not mutated and no SettingWithCopyWarning is raised.
    df = df.copy()

    text_columns = ['listing_url', 'listing_source', 'country_name', 'city_name', 'property_name',
                    'asking_price_currency', 'property_category', 'property_type',
                    'property_address',
                    'city_region', 'listing_id', 'completion_status', 'list_agent_full_name',
                    'list_agent_email', 'list_office_name',
                    'agent_whatsapp', 'description', 'trakheesi_permit_number', 'trakheesi_start', 'trakheesi_end', 'date_listed', 'date_scraped',
                    'property_name_clean', 'master_project_en', 'address',
                    'Property Type', 'developer_project_name', 'regulatory_status', 'orn_license', 'ded_license', 'rera_license', 'dubailand_listing_validation_url', 'building_built_date',
                    'project_name_prop_ai', 'master_project_prop_ai', 'area_name_prop_ai', 'development_name_prop_ai',
                    "dld_agent_full_name", "dld_broker_office_name", "dld_agent_email", "dld_agent_mobile", "dld_agent_landline"]
    for col in text_columns:
        if col not in df.columns:
            df[col] = ''
        values = df[col]
        # Blank the missing values *after* casting: astype(str) would otherwise
        # turn them into the literal strings "nan"/"None"/"NaT", while casting
        # first keeps the native string form of datetime columns.
        df[col] = values.astype(str).str.strip().where(values.notna(), '')

    numeric_columns = ['asking_price', 'map_coordinates_latitude', 'map_coordinates_longitude',
                       'bathrooms_total', 'rooms_total', 'list_agent_mobile_phone', 'total_area_sqm',
                       'total_area_sqft', 'building_floor_count', 'building_builtup_area_sqm', 'listing_floor_number', 'service_charge_sqft', 'list_agent_direct_phone',
                       "Number of Bedrooms", "Area in SQM", "Price per SQM"]
    for col in numeric_columns:
        if col not in df.columns:
            df[col] = 0
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

    boolean_columns = ['for_sale', 'for_rent', 'is_verified', 'furnished_yn', 'is_qr_code_link_available', 'is_permit_delisted']
    for col in boolean_columns:
        if col not in df.columns:
            df[col] = False
        values = df[col]
        # bool(NaN) is True, so missing values must be set to False before casting.
        df[col] = values.where(values.notna(), False).astype(bool)

    return df.fillna(0)

def fetch_data_ordered_by_date(connection, table_name):
    """Return every row of *table_name*, newest ``date_scraped`` first.

    Args:
        connection: Open DB-API connection (e.g. from ``get_postgres_connection``).
        table_name: Table to read. It is interpolated into the SQL text, so it
            must be a trusted identifier, never user input.

    Returns:
        The rows as returned by ``cursor.fetchall()``, or ``None`` if the query
        failed. Failures are logged with their traceback and the transaction
        is rolled back, so *connection* remains usable for the caller.
    """
    fetch_query = f"SELECT * FROM {table_name} ORDER BY date_scraped DESC;"
    cursor = connection.cursor()
    try:
        cursor.execute(fetch_query)
        return cursor.fetchall()
    except Exception:
        # Best-effort read: callers still get None, but the error is logged
        # and the failed transaction is cleared. psycopg2 otherwise rejects
        # further statements on this connection until a rollback.
        logging.exception("Failed to fetch rows from %s", table_name)
        try:
            connection.rollback()
        except Exception:
            logging.exception("Rollback after failed fetch from %s also failed", table_name)
        return None
    finally:
        cursor.close()

def combine_photo_urls(row):
    """Combine a row's single 'photo_url' and its 'photo_urls' into one value.

    ``None`` and the string '[]' both count as "no photos".

    Returns:
        None when both are empty, the non-empty one when only one is set,
        otherwise "<photo_url>, <photo_urls>".
    """
    no_photo = (None, '[]')
    single = row['photo_url']
    many = row['photo_urls']
    single_missing = single in no_photo
    many_missing = many in no_photo

    if single_missing:
        return None if many_missing else many
    if many_missing:
        return single
    return f"{single}, {many}"  # Concatenating URLs
    
def convert_to_json(data):
    """Serialise dicts and lists to a JSON string; return any other value unchanged."""
    if not isinstance(data, (dict, list)):
        return data
    return json.dumps(data)

def map_values(value):
    """Return True exactly when *value* compares equal to 1 (so 1, 1.0 and True)."""
    return bool(value == 1)


# database functions section 
def get_postgres_connection():
    """Open and return a new psycopg2 connection built from the module-level DB_* settings.

    The caller owns the returned connection and must close it.
    """
    connection_settings = dict(
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME,
        user=DB_USERNAME,
        password=DB_PASSWORD,
    )
    return psycopg2.connect(**connection_settings)


In [None]:

def create_table_if_not_exists(connection, table_name):
    """Create *table_name* with the listing schema unless it already exists.

    ``listing_url`` is the primary key; the upsert in
    ``insert_into_postgresql_unique_listing_url`` relies on it for its
    ``ON CONFLICT (listing_url)`` clause. Because of ``IF NOT EXISTS``, an
    existing table is left unchanged even if its columns differ from this list.

    Args:
        connection: Open psycopg2 connection. It is committed on success and
            rolled back on failure, but never closed here.
        table_name: Table to create. It is interpolated into the SQL text, so
            it must be a trusted identifier.

    Raises:
        Whatever ``cursor.execute`` raises, after rolling back.

    NOTE(review): the schema has no "scraped_date" column, although
    ``clean_table_data`` deletes rows by it -- confirm which tables are created
    through this function.
    """
    cursor = connection.cursor()
    # Column names containing spaces ("Property Type", ...) are quoted here and
    # must be quoted wherever they are referenced (see quote_col).
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
    listing_url TEXT PRIMARY KEY,
    date_scraped VARCHAR,   
    listing_source VARCHAR,
    country_name VARCHAR,
    city_name VARCHAR,
    property_name TEXT,
    asking_price NUMERIC,
    asking_price_currency VARCHAR,
    property_category VARCHAR,
    property_type VARCHAR,
    property_address TEXT,
    photo_url_list TEXT,
    description TEXT,
    city_region VARCHAR,
    map_coordinates_latitude NUMERIC,
    map_coordinates_longitude NUMERIC,
    for_sale BOOLEAN,
    for_rent BOOLEAN,
    bathrooms_total NUMERIC,
    rooms_total NUMERIC,
    amenities_text TEXT,
    date_listed VARCHAR,
    listing_id VARCHAR,
    completion_status VARCHAR,
    list_agent_full_name VARCHAR,
    list_agent_email VARCHAR,
    agent_whatsapp VARCHAR,
    list_agent_mobile_phone NUMERIC,
    list_agent_direct_phone NUMERIC,
    list_office_name VARCHAR,
    furnished_info VARCHAR,
    community_name VARCHAR,
    agent_brn_number VARCHAR,
    trakheesi_permit_number VARCHAR,
    is_verified BOOLEAN,
    total_area_sqm NUMERIC,
    total_area_sqft NUMERIC,
    furnished_yn BOOLEAN,
    list_agent_mls_id VARCHAR,
    list_office_mls_id VARCHAR,
    agency_name VARCHAR,
    building_name VARCHAR,
    property_address_clean VARCHAR,
    agency_orn VARCHAR,
    address TEXT,
    project_name_en VARCHAR,
    master_project_en TEXT,
    area_name_en VARCHAR,
    development_name VARCHAR,
    "Property Type" VARCHAR,
    "Number of Bedrooms" NUMERIC,
    "Development Status" VARCHAR,
    "Area in SQM" NUMERIC,
    "Price per SQM" NUMERIC,
    property_name_clean TEXT,
    building_built_date VARCHAR,
    building_floor_count NUMERIC,
    building_builtup_area_sqm NUMERIC,
    developer_project_name VARCHAR,
    listing_floor_number NUMERIC,
    trakheesi_start VARCHAR,
    trakheesi_end VARCHAR,
    is_qr_code_link_available BOOLEAN,
    regulatory_status VARCHAR,
    is_permit_delisted BOOLEAN,
    orn_license VARCHAR,
    ded_license VARCHAR,
    rera_license VARCHAR,
    service_charge_sqft NUMERIC,
    dubailand_listing_validation_url VARCHAR,
    project_name_prop_ai VARCHAR,
    master_project_prop_ai VARCHAR,
    area_name_prop_ai VARCHAR,
    development_name_prop_ai VARCHAR,
    dld_agent_full_name VARCHAR(255),
    dld_broker_office_name VARCHAR(255),
    dld_agent_email VARCHAR(255),
    dld_agent_mobile VARCHAR(20),
    dld_agent_landline VARCHAR(20)
    ); """
    # Commit the DDL on success; on failure roll back so the connection stays
    # usable, then propagate the error. The cursor is always closed.
    try:
        cursor.execute(create_table_query)
        connection.commit()
    except Exception as e:
        connection.rollback()
        raise e
    finally:
        cursor.close()

def insert_into_postgresql_unique_listing_url(data, table_name):
    """Upsert a list of row dicts into *table_name*, keyed on ``listing_url``.

    The table is created first if needed (``create_table_if_not_exists``).
    Rows whose ``listing_url`` already exists have all their other columns
    overwritten with the new values.

    Args:
        data: Non-empty list of dicts. The column set and order come from the
            first dict; every dict must contain those keys.
        table_name: Target table. It is interpolated into the SQL text, so it
            must be a trusted identifier.

    Raises:
        ValueError: if *data* is empty or not a list.
        KeyError: if a record lacks one of the first record's keys.
        Any database error, after the transaction has been rolled back.
    """
    if not data or not isinstance(data, list):
        raise ValueError("Data is empty or not in the expected format (list)")

    # Build the statement and parameters before connecting, so malformed input
    # fails without opening a connection.
    columns = list(data[0].keys())
    quoted_columns = [quote_col(col) for col in columns]
    placeholders = ', '.join(['%s'] * len(columns))
    insert_query = f"INSERT INTO {table_name} ({','.join(quoted_columns)}) VALUES ({placeholders}) "

    update_columns = ', '.join(
        f"{quote_col(col)} = EXCLUDED.{quote_col(col)}"
        for col in columns
        if col != 'listing_url'
    )
    if update_columns:
        conflict_action = "ON CONFLICT (listing_url) DO UPDATE SET " + update_columns
    else:
        # "DO UPDATE SET" with nothing after it is a syntax error; when only
        # listing_url is supplied there is nothing to update.
        conflict_action = "ON CONFLICT (listing_url) DO NOTHING"
    query = insert_query + conflict_action

    values = [tuple(item[col] for col in columns) for item in data]

    conn = get_postgres_connection()
    try:
        # Inside the outer try so the connection is closed even if the
        # table creation fails.
        create_table_if_not_exists(conn, table_name)
        cursor = conn.cursor()
        try:
            execute_batch(cursor, query, values)
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()

def insert_dataframe_to_postgresql(df, table_name, if_exists="append"):
    """Write *df* to *table_name* via SQLAlchemy, best-effort.

    Args:
        df: Frame to write; its index is not stored.
        table_name: Destination table.
        if_exists: Passed through to ``DataFrame.to_sql`` ("append" by default).

    Returns:
        True if the write succeeded, False if it failed. Failures are logged
        with their traceback and not re-raised, so callers that ignore the
        return value behave as before.
    """
    engine = None
    try:
        engine = create_engine(DB_URI)
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
        return True
    except Exception as e:
        logging.exception(f"Error while inserting dataframe to PostgreSQL: {e}")
        return False
    finally:
        # A new engine (with its own connection pool) is created on every
        # call; dispose of it so pooled connections are not left open.
        if engine is not None:
            engine.dispose()

def quote_col(col):
    """Return *col* as a SQL identifier, double-quoting it when it needs quoting.

    Names made only of letters, digits and underscores are returned unchanged,
    so PostgreSQL still folds them to lower case as before. Any other name
    (spaces, hyphens, quotes, ...) is wrapped in double quotes, and embedded
    double quotes are doubled so the result is a valid quoted identifier.

    >>> quote_col('listing_url')
    'listing_url'
    >>> quote_col('Property Type')
    '"Property Type"'
    """
    if all(ch == '_' or ch.isalnum() for ch in col):
        return col
    return '"' + col.replace('"', '""') + '"'

def clean_table_data(table_name, marketplace):
    """Delete the current scrape's rows from *table_name* before they are re-inserted.

    Rows are matched on the module-level ``scraped_date``. When *marketplace*
    is truthy, only rows whose ``listing_source`` equals it are removed.
    Errors (including a failure to connect) are logged with their traceback
    and swallowed, so a failed cleanup does not stop the caller.

    Args:
        table_name: Table to clean. It is interpolated into the SQL text, so it
            must be a trusted identifier.
        marketplace: Optional ``listing_source`` value to restrict the delete.
    """
    # Initialise first so the finally block works even when connecting fails
    # (previously that raised UnboundLocalError and hid the real error).
    conn = None
    cur = None
    try:
        conn = get_postgres_connection()
        cur = conn.cursor()
        if marketplace:
            query = f'DELETE FROM {table_name} WHERE "listing_source" = %s AND "scraped_date" = %s;'
            params = (marketplace, scraped_date)
            done_message = f"Records matching {scraped_date} & {marketplace}  have been deleted from {table_name}."
        else:
            query = f'DELETE FROM {table_name} WHERE "scraped_date" = %s;'
            params = (scraped_date,)
            done_message = f"Records matching {scraped_date}  have been deleted from {table_name}."
        cur.execute(query, params)
        conn.commit()  # Commit the changes to make sure records are deleted
        print(done_message)
    except Exception as e:
        # Closing without a commit (below) discards the uncommitted delete.
        logging.exception(f"Error while clearing data from {table_name}: {e}")
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()