In [3]:
import boto3
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime
import pandas as pd
pd.set_option('display.max_columns', None)
import statistics
import requests
from bs4 import BeautifulSoup

In [4]:
with open("AWS_credentials.txt", "r") as creds:
    creds_list = json.loads(creds.read())
s3 = boto3.resource('s3', aws_access_key_id=creds_list[0], aws_secret_access_key=creds_list[1])
my_bucket = s3.Bucket('hadis-s3-project-bucket')
for object_summary in my_bucket.objects.filter(Prefix="Real-Estate/input/"):
    print(object_summary.key)

Real-Estate/input/
Real-Estate/input/2024-04-19_6901.parquet.gzip
Real-Estate/input/2024-04-30_6901.parquet.gzip


In [None]:
my_bucket.download_file("Real-Estate/input/2024-04-30_6901.parquet.gzip", "2024-04-30_6901.parquet.gzip")

In [5]:
my_spark = SparkSession.builder.getOrCreate()
print(my_spark.catalog.listTables())

[]


In [8]:
path = '2024-04-30_6901.parquet.gzip'
df = my_spark.read.format('parquet').options(header=True,inferSchema=True).load(path)
unreq_cols = ["productLabel","rentFrequency","projectNumber","title_l1","title_l2","title_l3","coverPhoto","photoCount","videoCount","panoramaCount","randBoostScore","randBoostScore_l1",
              "randBoostScore_l2","randBoostScore_l3","floorPlanID","amenities_l1","amenities_l2","amenities_l3","indyScore","indyScore_l1","indyScore_l2","indyScore_l3","photoIDs",
              "coverVideo","slug_l1","slug_l2","slug_l3","category","_highlightResult","id","ownerID","userExternalID","sourceID","state","_geoloc","purpose","product","productScore","referenceNumber",
              "permitNumber","title","externalID","slug","score","score_l1","score_l2","score_l3","hash","isVerified","verification","veridiedScore","type","ownerAgent","amenities","citylevelScore",
              "hasMatchingFloorPlans","hidePrice","locationPurposeTier","plotArea","objectID","project","offplanDetails","completionDetails","paymentPlanSummaries","hasTransactionHistory","hasDldBuildingNk",
              "availabilityStatus","occupancyStatus","keywords","verifiedScore"]
x = datetime.datetime.timestamp(datetime.datetime.now() - datetime.timedelta(days=14))
df = df.drop(*unreq_cols).filter((df.createdAt > x) | (df.updatedAt > x))

df = df.withColumn("createdAt", to_timestamp(col("createdAt").cast("timestamp"), "yyyyMMddHHmmSS"))
df = df.withColumn("updatedAt", to_timestamp(col("updatedAt").cast("timestamp"), "yyyyMMddHHmmSS"))
df = df.withColumn("reactivatedAt", to_timestamp(col("reactivatedAt").cast("timestamp"), "yyyyMMddHHmmSS"))

df = df.withColumn("loc_size",size("location"))
df = df.withColumn("country", lower(col("location")[0]["name"]))
df = df.withColumn("city", lower(col("location")[1]["name"]))
df = df.withColumn("neighborhood", lower(col("location")[2]["name"]))
df = df.withColumn("neighborhood", regexp_replace(col("neighborhood"),"[\\s]","-"))
df = df.withColumn("neighborhood", regexp_replace(col("neighborhood"),"[\\.]",""))
df = df.withColumn('sub_neighborhood', lower(when(col('loc_size') == 6, col("location")[3]["name"]).otherwise(lit(None))))
df = df.withColumn("sub_neighborhood", regexp_replace(col("sub_neighborhood"),"[\\s]","-"))
df = df.withColumn("sub_neighborhood", regexp_replace(col("sub_neighborhood"),"[\\.]",""))
df = df.withColumn('building_series', lower(when(col('loc_size') >= 5, element_at(col('location'), -2)["name"]).otherwise(lit(None))))
df = df.withColumn("building_series", regexp_replace(col("building_series"),"[\\s]","-"))
df = df.withColumn("building_series", regexp_replace(col("building_series"),"[\\.]",""))
df = df.withColumn("building_name", element_at(col('location'), -1)["name"])
# df = df.withColumn("building_name", lower(element_at(col('location'), -1)["name"]))
# df = df.withColumn("building_name", regexp_replace(col("building_name"),"[\\s]","-"))
# df = df.withColumn("building_name", regexp_replace(col("building_name"),"[\\.]",""))

df = df.drop("loc_size","location")

df = df.withColumn("phone_number", col("phoneNumber")["mobile"])
df = df.withColumn("whatsapp", col("phoneNumber")["whatsapp"])
df = df.drop("phoneNumber")

df = df.withColumn("agency", col("agency")["name"])

df = df.withColumn("geography_lat",col("geography")["lat"])
df = df.withColumn("geography_lat",round(df["geography_lat"],3))
df = df.withColumn("geography_lng",col("geography")["lng"])
df = df.withColumn("geography_lng",round(df["geography_lng"],3))
df = df.drop("geography")

df = df.withColumn("area",col("area")*10.7639)

df.sort(df.updatedAt.desc()).show(10)

+---------+-------------------+-------------------+-------------------+-----+-----+------------------+--------------------+--------------------+----------------+------------------+-------+-----+--------------+----------------+---------------+--------------------+-------------+------------+-------------+-------------+
|    price|          createdAt|          updatedAt|      reactivatedAt|rooms|baths|              area|         contactName|              agency|furnishingStatus|  completionStatus|country| city|  neighborhood|sub_neighborhood|building_series|       building_name| phone_number|    whatsapp|geography_lat|geography_lng|
+---------+-------------------+-------------------+-------------------+-----+-----+------------------+--------------------+--------------------+----------------+------------------+-------+-----+--------------+----------------+---------------+--------------------+-------------+------------+-------------+-------------+
|3929989.0|2024-04-26 11:54:27|2024-04-30 1

In [19]:
def add_listing(units_in_building):
    units_in_building = units_in_building.reset_index(drop=True)
    listings_cols = ["price","contactName","agency","phone_number","whatsapp","createdAt","updatedAt","reactivatedAt"]
    listings = {}
    for index, unit_in_building in units_in_building.iterrows():
        listing = {}
        for listing_col in listings_cols:
            listing[listing_col] = unit_in_building[listing_col]
        listings["listing_"+str(index+1)] = listing
    return listings


In [33]:
pdf = df.select("*").toPandas()
pdf.sort_values("building_name",inplace = True, ignore_index=True)

pdf_new = pd.DataFrame(columns=pdf.columns)
pdf_new.rename(columns = {'price':'price_avg', 'area':'area_avg'}, inplace = True)
shared_cols = ["building_name","building_series","sub_neighborhood","neighborhood","furnishingStatus","completionStatus","rooms","baths","geography_lat","geography_lng","city","country"]
pdf_new = pdf_new.drop(columns=["createdAt","updatedAt","reactivatedAt","contactName","agency","phone_number","whatsapp"])
pdf_new["listings"] = ""
pdf_new["num_listings"] = ""

for units_in_building in pdf.groupby(["building_name"]):
    units_in_building = units_in_building[1].reset_index(drop=True)

    units_range = list(range(len(units_in_building)))
    for i in units_range:
        similar_listings = [i]
        for j in range(i+1,len(units_in_building)):
            if (((units_in_building.iloc[j]["price"] <= (units_in_building.iloc[i]["price"]+200000)) and (units_in_building.iloc[j]["price"] >= (units_in_building.iloc[i]["price"]-200000))) and
                ((units_in_building.iloc[j]["area"] <= (units_in_building.iloc[i]["area"]+10)) and (units_in_building.iloc[j]["area"] >= (units_in_building.iloc[i]["area"]-10))) and
                (units_in_building.iloc[j]["furnishingStatus"] == units_in_building.iloc[i]["furnishingStatus"]) and
                (units_in_building.iloc[j]["baths"] == units_in_building.iloc[i]["baths"])):
                similar_listings.append(j)
                units_range.remove(j)

        # new_row = pd.DataFrame()
        new_row =  units_in_building.iloc[i:i+1][shared_cols]
        new_row["price_avg"] = statistics.mean(units_in_building.iloc[similar_listings]["price"])
        new_row["area_avg"] = statistics.mean(units_in_building.iloc[similar_listings]["area"])
        listings = add_listing(units_in_building.iloc[similar_listings])
        new_row["listings"] = [listings]
        new_row["num_listings"] = len(listings)
        
        pdf_new = pd.concat([pdf_new,new_row],ignore_index=True)


pdf_new.head()

  pdf_new = pd.concat([pdf_new,new_row],ignore_index=True)


Unnamed: 0,price_avg,rooms,baths,area_avg,furnishingStatus,completionStatus,country,city,neighborhood,sub_neighborhood,building_series,building_name,geography_lat,geography_lng,listings,num_listings
0,3500000.0,2,3,1429.998616,unfurnished,under-construction,uae,dubai,downtown-dubai,,,25h-heimat,25.191,55.27,"{'listing_1': {'price': 3500000.0, 'contactNam...",1
1,3929989.0,2,3,1389.998655,unfurnished,under-construction,uae,dubai,downtown-dubai,,,25h-heimat,25.191,55.27,"{'listing_1': {'price': 3929989.0, 'contactNam...",1
2,3500000.0,2,2,1349.998694,furnished,completed,uae,dubai,downtown-dubai,,29-boulevard,29-boulevard-1,25.191,55.273,"{'listing_1': {'price': 3500000.0, 'contactNam...",1
3,3200000.0,2,2,1438.998607,unfurnished,completed,uae,dubai,downtown-dubai,,29-boulevard,29-boulevard-2,25.192,55.272,"{'listing_1': {'price': 3200000.0, 'contactNam...",1
4,3055000.0,2,3,1404.99864,unfurnished,completed,uae,dubai,downtown-dubai,,29-boulevard,29-boulevard-2,25.192,55.272,"{'listing_1': {'price': 3055000.0, 'contactNam...",1


In [49]:
def get_url(row):
    url = url = "https://www.bayut.com/property-market-analysis/sale/"
    for url_item in row[["city","neighborhood","sub_neighborhood","building_series","building_name"]]:
        if url_item is not None:
            url = url + url_item + "/"
    area = row["area_avg"]*0.092903
    if area >= 92.949 and area <= 185.85:
        url = url +"?area_min=92.949&area_max=185.85&beds="+str(row["rooms"])
    elif area <= 92.949:
        url = url +"?area_max=92.948&beds="+str(row["rooms"])
    elif area >= 185.851 and area <= 278.752:
        url = url +"?area_min=185.851&area_max=278.752&beds="+str(row["rooms"])
    elif area >= 278.753 and area <= 371.655:
        url = url +"?area_min=278.753&area_max=371.655&beds="+str(row["rooms"])
    elif area >= 371.656 and area <= 464.557:
        url = url +"?area_min=371.656&area_max=464.557&beds="+str(row["rooms"])
    elif area >= 464.558 and area <= 696.813:
        url = url +"?area_min=464.558&area_max=696.813&beds="+str(row["rooms"])
    elif area >= 696.814 and area <= 929.068:
        url = url +"?area_min=696.814&area_max=929.068&beds="+str(row["rooms"])
    elif area >= 929.069:
        url = url +"?area_min=929.069&beds="+str(row["rooms"])
    return url

In [48]:
for index, row in pdf_new.iterrows():      
    response = requests.get(get_url(row))
    break

In [75]:
t = pdf_new[pdf_new["building_name"]=="boulevard-point"].iloc[0]
response = requests.get(get_url(t)).text
soup = BeautifulSoup(response, 'html.parser')

In [151]:
find = soup.find_all('ul', attrs={"role":True})


In [152]:
find

[]