# 1. Spotahome Data Cleaning

In [0]:
import pandas as pd
import numpy as np
import dateparser

In [0]:
London = spark.table("silver.edgar_sarto_revenue.spotahome_london")
Milan = spark.table("silver.edgar_sarto_revenue.spotahome_milan")
Rome = spark.table("silver.edgar_sarto_revenue.spotahome_rome")

London = London.toPandas()
Milan = Milan.toPandas()
Rome = Rome.toPandas()

London['City'] = 'LONDON'
Milan['City'] = 'MILAN'
Rome['City'] = 'ROME'

London['Size_Metric'] = 'ft'
Milan['Size_Metric'] = 'm2'
Rome['Size_Metric'] = 'm2'

## 1.1. City / Brough / Zone (Postal Code) / Agency

In [0]:
dataframes = [London, Milan, Rome]

SPH = pd.concat(dataframes, ignore_index=True)

SPH['City'].value_counts()

MILAN     2788
LONDON    1971
ROME      1507
Name: City, dtype: int64

In [0]:
keywords = [
    'Flat', 'Apartment', 'Penthouse', 'Suite', 'Condominium', 
    'Villa', 'Apartamento', 'Piso', 'Condominio', 'Chalet', 
    'Casa', 'Casa rural', 'Casa en condominio'
]

def delete_values(row):
    if pd.notna(row['Text15']) and any(keyword in row['Text15'] for keyword in keywords):
        row['Text15'] = None
    return row

SPH = SPH.apply(delete_values, axis=1)

SPH = SPH.rename(columns={'Text15':'Street'})

In [0]:
# Cleaning Type, Beds, Baths & Size

SPH[['Type', 'Bedrooms', 'Bedrooms_1', 'Bathrooms', 'Bathrooms_1', 'Size', 'Size_1']] = SPH['Text1'].str.split('\n', expand=True)

baths = {'bathrooms':'0', 'Baños': '0'}

SPH['Bathrooms'].replace(baths, regex=True, inplace=True)

def shift_values(row):
    if pd.notna(row['Bathrooms']):
        try:
            if float(row['Bathrooms']) >= 10:
                row['Bathrooms'], row['Size'] = 0, row['Bathrooms']
        except ValueError:
            pass
    return row

SPH = SPH.apply(shift_values, axis=1)

SPH['Size'].replace('m2', None, inplace=True)

SPH = SPH.drop(columns=['Text1', 'Bedrooms_1', 'Bathrooms_1', 'Size_1'])

In [0]:
# CLEANING PRICE

SPH[['Price_1', 'Monthly_Price']] = SPH['Text2'].str.split('\n\n', expand=True)

SPH = SPH.drop(columns=['Text2', 'Price_1'])

SPH['Currency'] = None

def shift_values(row):
    if pd.notna(row['Monthly_Price']) and ('€' in row['Monthly_Price']):
        row['Currency'] = 'EUR'
    if pd.notna(row['Monthly_Price']) and ('£' in row['Monthly_Price']):
        row['Currency'] = 'GBP'
    return row

SPH = SPH.apply(shift_values, axis=1)

SPH['Max_Price'] = None
SPH['Min_Price'] = SPH['Monthly_Price']

def shift_values(row):
    if pd.notna(row['Monthly_Price']) and '-' in row['Monthly_Price']:
        min_price, max_price = row['Monthly_Price'].split('-')
        row['Min_Price'] = min_price.strip()
        row['Max_Price'] = max_price.strip()
    elif pd.notna(row['Monthly_Price']):
        row['Max_Price'] = row['Monthly_Price'].strip()
    return row

SPH = SPH.apply(shift_values, axis=1)

currency = {' €':'', '£': ''}

SPH['Max_Price'].replace(currency, regex=True, inplace=True)
SPH['Min_Price'].replace(currency, regex=True, inplace=True)

SPH['Max_Price'] = SPH['Max_Price'].fillna(0).astype(int)
SPH['Min_Price'] = SPH['Min_Price'].fillna(0).astype(int)


In [0]:
# CLEANING DEPOSIT

SPH[['Deposit_1', 'Deposit']] = SPH['Text9'].str.split('\n\n', expand=True)

SPH = SPH.drop(columns=['Text9', 'Deposit_1'])

replace_1 = {' mes de alquiler': '', " month's rent": '', ' meses de alquiler':'', '1/2': '0.5'}

SPH['Deposit'].replace(replace_1, regex=True, inplace=True)

currency = {'€': '', '£': ''}

def shift_values(row):
    if pd.notna(row['Deposit']):
        deposit_str = str(row['Deposit'])
        if '€' in deposit_str or '£' in deposit_str:
            for symbol in ['€', '£']:
                deposit_str = deposit_str.replace(symbol, '')
            try:
                deposit_num = float(deposit_str)
            except ValueError:
                deposit_num = 0
        else:
            try:
                deposit_num = float(deposit_str)
            except ValueError:
                deposit_num = 0
            deposit_num = row['Min_Price'] * deposit_num
        row['Deposit'] = deposit_num
    return row

SPH = SPH.apply(shift_values, axis=1)

In [0]:
# AVAILABILITY

SPH[['Available_1', 'Available']] = SPH['Text11'].str.split('\n\n', expand=True)

SPH.drop(columns=['Text11', 'Available_1'], inplace=True)

def process_dates(row):
    if pd.notna(row['Available']):
        if ('Now' in row['Available']) or ('ahora' in row['Available']):
            return pd.Timestamp.now().strftime('%Y-%m-%d')
        else:
            parser = dateparser.parse(row['Available'], languages=['es', 'en'])
            if parser:
                return parser.strftime('%Y-%m-%d')
            else:
                return pd.NaT
    return None 

SPH['Available'] = SPH.apply(process_dates, axis=1)


In [0]:
# MIN LOS

SPH[['Min_Stay_1', 'Min_Stay']] = SPH['Text12'].str.split('\n\n', expand=True)

SPH.drop(columns=['Text12', 'Min_Stay_1'], inplace=True)

SPH['Min_Stay'] = SPH['Min_Stay'].str.extract(r'(\d+)\s*(?:días?|days?)')[0]

##########################################
# MAX LOS

SPH[['Max_Stay_1', 'Max_Stay']] = SPH['Text13'].str.split('\n\n', expand=True)

SPH.loc[SPH['Max_Stay'].str.contains('xim', na=False), 'Max_Stay'] = None

SPH.loc[SPH['Max_Stay'].str.contains('xim', na=False) == False, 'Max_Stay'] = SPH['Max_Stay'].str.extract(r'(\d+)\s*(?:días?|days?)')[0]

In [0]:
#CLEANING

SPH['Cleaning'] = None

mask_unique = SPH['Text3'].str.contains('único', na=False) | SPH['Text3'].str.contains('unique', na=False)
SPH.loc[mask_unique, 'Cleaning'] = SPH.loc[mask_unique, 'Text3'].str.extract(r'(\d+)\s*€ pago único')[0]

mask_no_available = SPH['Text3'].str.contains('No', na=False) | SPH['Text3'].str.contains('Included', na=False)
SPH.loc[mask_no_available, 'Cleaning'] = 0

mask_monthly = SPH['Text3'].notna() & ~mask_unique & ~mask_no_available
SPH.loc[mask_monthly, 'Cleaning'] = SPH.loc[mask_monthly, 'Text3'].str.extract(r'(\d+)\s*(?:€/mes?|/month)')[0]

SPH['Cleaning'] = pd.to_numeric(SPH['Cleaning'], errors='coerce')

In [0]:
#CLEANING PAYMENT

SPH['Cleaning_Payment'] = None

def shift_values(row):
    if pd.notna(row['Text3']) and ('único' in row['Text3'] or 'unique' in row['Text3']):
        row['Cleaning_Payment'] = 'Unique Payment'
    elif pd.notna(row['Text3']) and ('/month' in row['Text3'] or '/mes' in row['Text3']):
        row['Cleaning_Payment'] = 'Monthly'
    elif pd.notna(row['Cleaning']) and '0' in str(row['Cleaning']):
        row['Cleaning_Payment'] = 'Included'
    return row

SPH = SPH.apply(shift_values, axis=1)

In [0]:
SPH['Utilities_Included'] = '1'

mask_no_utilities = SPH['Text5'].str.contains('No', na=False) | \
                    SPH['Text6'].str.contains('No', na=False) | \
                    SPH['Text7'].str.contains('No', na=False) | \
                    SPH['Text8'].str.contains('No', na=False)

SPH.loc[mask_no_utilities, 'Utilities_Included'] = '0'

SPH['Utilities_Included'].value_counts()

1    4337
0    1929
Name: Utilities_Included, dtype: int64

In [0]:
SPH.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6266 entries, 0 to 6265
Data columns (total 31 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   Current_time        6266 non-null   datetime64[ns]
 1   Page_URL            6266 non-null   object        
 2   Text                6266 non-null   object        
 3   Text10              6266 non-null   object        
 4   Text13              6266 non-null   object        
 5   Text14              6266 non-null   object        
 6   Street              5946 non-null   object        
 7   Text3               6266 non-null   object        
 8   Text4               6266 non-null   object        
 9   Text5               6266 non-null   object        
 10  Text6               6266 non-null   object        
 11  Text7               6266 non-null   object        
 12  Text8               6266 non-null   object        
 13  City                6266 non-null   object      

In [0]:
SPH['Bathrooms'].value_counts()

1    4046
0    1250
2     851
3      92
4      12
5       4
0       3
Name: Bathrooms, dtype: int64

In [0]:
SPH.drop(columns=['Text','Text10','Text13','Text14','Text3','Text4','Text5','Text6',
                  'Text7','Text8', 'Monthly_Price', 'Max_Stay_1'],inplace=True)

SPH['Bathrooms'] = SPH['Bathrooms'].fillna(0).astype(int)
SPH['Bedrooms'] = SPH['Bedrooms'].fillna(0).astype(int)
SPH['Cleaning'] = SPH['Cleaning'].fillna(0.0).astype(float)
SPH['Size'] = SPH['Size'].fillna(0.0).astype(float)
SPH['Max_Stay'] = SPH['Max_Stay'].fillna(0).astype(int)
SPH['Min_Stay'] = SPH['Min_Stay'].fillna(0).astype(int)
SPH['Utilities_Included'] = SPH['Utilities_Included'].fillna(0).astype(int)

In [0]:
SPH.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6266 entries, 0 to 6265
Data columns (total 19 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   Current_time        6266 non-null   datetime64[ns]
 1   Page_URL            6266 non-null   object        
 2   Street              5946 non-null   object        
 3   City                6266 non-null   object        
 4   Size_Metric         6266 non-null   object        
 5   Type                6266 non-null   object        
 6   Bedrooms            6266 non-null   int64         
 7   Bathrooms           6266 non-null   int64         
 8   Size                6266 non-null   float64       
 9   Currency            6258 non-null   object        
 10  Max_Price           6266 non-null   int64         
 11  Min_Price           6266 non-null   int64         
 12  Deposit             6259 non-null   float64       
 13  Available           6034 non-null   object      

In [0]:
from geopy.geocoders import Nominatim
import time
geolocator = Nominatim(user_agent="my_custom_application_spotahome", timeout=8000)

In [0]:
data = [] 

Test_Geo = pd.DataFrame(data)

Test_Geo['Street'] = SPH['Street'].iloc[0:8000]

place = [Test_Geo['Street']]

In [0]:
from functools import lru_cache
import concurrent.futures

@lru_cache(maxsize=1000)
def geocode_address(address):
    try:
        location = geolocator.geocode(address, timeout=10)
        if location:
            return location.address
        else:
            return None
    except Exception as e:
        return None

def geocode_with_delay(address):
    result = geocode_address(address)
    time.sleep(1)
    return result

addresses = Test_Geo['Street'].tolist()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(geocode_with_delay, addresses))

Test_Geo['GeoAddress'] = results


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
Test_Geo_5 = Test_Geo

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:457)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:733)
	at com.data

In [0]:
Test_Geo_SPH = [Test_Geo_5]

#Test_Geo_SPH = [Test_Geo_1, Test_Geo_2, Test_Geo_3, Test_Geo_4]

Test_Geo_SPH = pd.concat(Test_Geo_SPH, ignore_index=True)

In [0]:
SPH_Geo = pd.merge(SPH, Test_Geo_SPH, left_index=True, right_index=True)

In [0]:
def invertir_orden_columna(df, columna):
    df[columna] = df[columna].apply(lambda x: ", ".join(str(x).split(", ")[::-1]))
    return df

SPH_Geo = invertir_orden_columna(SPH_Geo, "GeoAddress")


In [0]:
SPH_Geo[['GeoAddress_1', 'GeoAddress_2', 'GeoAddress_3', 'GeoAddress_4', 'GeoAddress_5', 'GeoAddress_6', 'GeoAddress_7',
         'GeoAddress_8', 'GeoAddress_9', 'GeoAddress_10']] = SPH_Geo['GeoAddress'].str.split(',', n=9, expand=True)

In [0]:
def delete_unnecesary(row):
    if pd.notna(row['GeoAddress_1']) and ('Italia' not in row['GeoAddress_1'] and 'United Kingdom' not in row['GeoAddress_1']):
        row['GeoAddress_1'] = None
    return row

SPH_Geo = SPH_Geo.apply(delete_unnecesary, axis=1)

SPH_Geo['GeoAddress_1'].value_counts()

Italia            4516
United Kingdom    1251
Name: GeoAddress_1, dtype: int64

In [0]:
def delete_unnecesary(row):
    if row['GeoAddress_1'] is None:
        row['GeoAddress_2'] = None
        row['GeoAddress_3'] = None
        row['GeoAddress_4'] = None
    return row

SPH_Geo = SPH_Geo.apply(delete_unnecesary, axis=1)

In [0]:
def delete_unnecesary(row):
    if pd.notna(row['GeoAddress_1']) and ('Italia' not in row['GeoAddress_1'] and 'United Kingdom' not in row['GeoAddress_1']):
        row['GeoAddress_1'] = None
    return row

SPH_Geo = SPH_Geo.apply(delete_unnecesary, axis=1)

In [0]:
def delete_unnecesary(row):
    if pd.notna(row['GeoAddress_2']) and any(x in row['GeoAddress_2'] for x in ['Lazio', 'Lombardia', 'England']):
        row['GeoAddress_3'], row['GeoAddress_4'], row['GeoAddress_5'] = row['GeoAddress_2'],row['GeoAddress_3'], row['GeoAddress_4']
    return row

SPH_Geo = SPH_Geo.apply(delete_unnecesary, axis=1)

def delete_unnecesary(row):
    if pd.notna(row['GeoAddress_3']) and not any(x in row['GeoAddress_3'] for x in ['Lazio', 'Lombardia', 'England']):
        row['GeoAddress_3'], row['GeoAddress_4'], row['GeoAddress_5'] = None, row['GeoAddress_3'], row['GeoAddress_4']
    return row

SPH_Geo = SPH_Geo.apply(delete_unnecesary, axis=1)

In [0]:
SPH_Geo = SPH_Geo.drop(columns={'GeoAddress_4', 'GeoAddress_5', 'GeoAddress_6', 'GeoAddress_7', 'GeoAddress_8', 'GeoAddress_9', 'GeoAddress_10'})
SPH_Geo = SPH_Geo.rename(columns={'GeoAddress_1': 'Country','GeoAddress_2': 'Posal_Code', 'GeoAddress_3': 'Region'})

In [0]:
def delete_unnecesary(row):
    if pd.notna(row['Posal_Code']) and any(x in row['Posal_Code'] for x in [' Northern Ireland / Tuaisceart Éireann', ' Toscana', ' Lombardia', 
                                                                            ' England', ' Piemonte']):
        row['Posal_Code'] = None
    return row

SPH_Geo = SPH_Geo.apply(delete_unnecesary, axis=1)

In [0]:
SPH_Spark = spark.createDataFrame(SPH_Geo)

In [0]:
SPH_Spark.write.mode("overwrite").saveAsTable("silver.edgar_sarto_revenue.Spotahome_CLEAN")

In [0]:
data = ['Milano, Pagano'] 

Testing = pd.DataFrame(data)

Testing['Street'] = data

Testing

Unnamed: 0,0,Street
0,"Milano, Pagano","Milano, Pagano"


In [0]:
from functools import lru_cache
import concurrent.futures

@lru_cache(maxsize=1000)
def geocode_address(address):
    try:
        location = geolocator.geocode(address, timeout=10)
        if location:
            return location.address
        else:
            return None
    except Exception as e:
        return None

def geocode_with_delay(address):
    result = geocode_address(address)
    time.sleep(1)
    return result

addresses = Testing['Street'].tolist()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(geocode_with_delay, addresses))

Testing['Street_1'] = results


In [0]:
Testing['Street_1']

0    Pagano, Via Mario Pagano, Pagano, Municipio 7,...
Name: Street_1, dtype: object