Data Preparation

In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import col

# Start (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("CSV Import").getOrCreate()

# Semicolon-separated listings file; the first row is the header and Spark infers column types.
file_path = "C:/Users/afili/Desktop/Uni/Mestrado/AASE/apartments_for_rent_classified_10K.csv"
df = spark.read.csv(file_path, sep=";", header=True, inferSchema=True)

# Values that should be treated as NaN.
# NOTE(review): this list is defined but never passed to the reader in this cell.
na_values = ["NA", "null", "", " "]

# Number of rows with a missing price before filtering.
null_count = df.where(col('price').isNull()).count()
print(f"Número de valores nulos na coluna 'price': {null_count}")

# Drop the rows where 'price' is null.
df = df.where(col('price').isNotNull())

# Count again to confirm no null prices remain.
null_count = df.where(col('price').isNull()).count()
print(f"Número de valores nulos na coluna 'price': {null_count}")

print(f"Número de linhas: {df.count()}")

Número de valores nulos na coluna 'price': 0
Número de valores nulos na coluna 'price': 0
Número de linhas: 10000


Demo da tabela e Mudança de tipos de colunas

In [3]:
# Preview the first rows of the loaded listings.
df.show()

+----------+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-----+-------------+----------+-----------+--------------------+-------------+-----+--------+---------+------------+----------+
|        id|            category|               title|                body|           amenities|bathrooms|bedrooms|currency|fee|has_photo|pets_allowed|price|price_display|price_type|square_feet|             address|     cityname|state|latitude|longitude|      source|      time|
+----------+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-----+-------------+----------+-----------+--------------------+-------------+-----+--------+---------+------------+----------+
|5668626895|housing/rent/apar...|Studio apartment ...|This unit is loca...|                null|     null|       0|     USD| No|Thumbnail|        None|  790|      

In [4]:
# Print the column names and the types Spark inferred when reading the CSV.
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- fee: string (nullable = true)
 |-- has_photo: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- cityname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- source: string (nullable = true)
 |-- time: integer (nullable = true)



In [5]:
from pyspark.sql.types import FloatType

# Columns that were read as strings and must become floats (replace with your own column names).
columns_to_convert = ["bathrooms", "bedrooms", "latitude", "longitude"]

# Rebuild the frame in a single projection: listed columns are cast to float under
# the same name and position, every other column passes through unchanged.
df = df.select(
    *[
        col(name).cast(FloatType()).alias(name) if name in columns_to_convert else col(name)
        for name in df.columns
    ]
)

# Confirm the new column types.
df.printSchema()


root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- amenities: string (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- bedrooms: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- fee: string (nullable = true)
 |-- has_photo: string (nullable = true)
 |-- pets_allowed: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- price_display: string (nullable = true)
 |-- price_type: string (nullable = true)
 |-- square_feet: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- cityname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- source: string (nullable = true)
 |-- time: integer (nullable = true)



In [6]:
# Keep one row per listing id, then keep only apartment rentals priced per month.
df = (
    df.dropDuplicates(['id'])
    .where(
        (col('category') == 'housing/rent/apartment')
        & (col('price_type') == 'Monthly')
    )
)

In [7]:
# Number of rows left after de-duplicating ids and filtering by category and price type.
df.count()

9994

In [8]:
from pyspark.sql.functions import col, when
# Cities kept under their own name; every other city is grouped as 'Other'.
cities_to_keep = ['Dallas', 'Denver', 'Los Angeles', 'Las Vegas', 'Arlington', 'Atlanta', 'Charlotte']

is_kept_city = col('cityname').isin(cities_to_keep)
df = df.withColumn(
    'cityname_transformed',
    when(is_kept_city, col('cityname')).otherwise('Other'),
)

df.show()


+----------+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-----+-------------+----------+-----------+-------+-----------+-----+--------+---------+------------+----------+--------------------+
|        id|            category|               title|                body|           amenities|bathrooms|bedrooms|currency|fee|has_photo|pets_allowed|price|price_display|price_type|square_feet|address|   cityname|state|latitude|longitude|      source|      time|cityname_transformed|
+----------+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-----+-------------+----------+-----------+-------+-----------+-----+--------+---------+------------+----------+--------------------+
|5508654087|housing/rent/apar...|$1,287 / One BR -...|Square footage: 7...|Internet Access,P...|      1.0|     1.0|     USD| No|Thumbnail|       

In [9]:
from pyspark.sql.functions import when, col

# Collapse the pets_allowed values into a Yes/No flag: any value naming cats
# and/or dogs means "Yes"; everything else (including missing) means "No".
pet_friendly_values = ["Cats", "Dogs", "Cats,Dogs", "Cats,Dogs,None"]
allows_pets = col("pets_allowed").isin(pet_friendly_values)

df = df.withColumn(
    "pets_allowed_transformed",
    when(allows_pets, "Yes").otherwise("No"),
)

df.show(n=20)


+----------+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-----+-------------+----------+-----------+-------+-----------+-----+--------+---------+------------+----------+--------------------+------------------------+
|        id|            category|               title|                body|           amenities|bathrooms|bedrooms|currency|fee|has_photo|pets_allowed|price|price_display|price_type|square_feet|address|   cityname|state|latitude|longitude|      source|      time|cityname_transformed|pets_allowed_transformed|
+----------+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-----+-------------+----------+-----------+-------+-----------+-----+--------+---------+------------+----------+--------------------+------------------------+
|5508654087|housing/rent/apar...|$1,287 / One BR -...|Square footage: 

In [10]:
# Collapse has_photo into a Yes/No flag: "Yes" and "Thumbnail" both count as having a photo.
has_any_photo = col("has_photo").isin(["Yes", "Thumbnail"])

df = df.withColumn(
    "has_photo_transformed",
    when(has_any_photo, "Yes").otherwise("No"),
)

In [11]:
# Missing bathroom/bedroom counts become 0; rows without a price are discarded.
df = (
    df.fillna({"bathrooms": 0, "bedrooms": 0})
    .dropna(subset=['price'])
)

In [12]:
# List the distinct values present in each room-count column.
for column_name in ("bedrooms", "bathrooms"):
    df.select(column_name).distinct().show()

+--------+
|bedrooms|
+--------+
|     9.0|
|     5.0|
|     7.0|
|     2.0|
|     3.0|
|     1.0|
|     6.0|
|     8.0|
|     4.0|
|     0.0|
+--------+

+---------+
|bathrooms|
+---------+
|      5.0|
|      5.5|
|      2.5|
|      8.5|
|      7.0|
|      2.0|
|      3.0|
|      1.5|
|      3.5|
|      1.0|
|      6.0|
|      8.0|
|      4.5|
|      4.0|
|      0.0|
+---------+



In [13]:
# Narrow frame used to build the amenity indicators: listing id, raw amenities string and price.
df_amenities = df.select('id', 'amenities', 'price')

# show() prints the preview itself and returns None, so it is called directly
# instead of being wrapped in display(), which would render a stray "None".
df_amenities.show()

+----------+--------------------+-----+
|        id|           amenities|price|
+----------+--------------------+-----+
|5508654087|Internet Access,P...| 1287|
|5508654149|      Fireplace,Pool|  860|
|5508654460|AC,Clubhouse,Dish...|  850|
|5508654607|Gym,Pool,Washer D...| 2669|
|5508654638|Gym,Playground,Po...|  990|
|5508655416|                null| 2395|
|5508655471|Gym,Pool,Washer D...| 2184|
|5508655790|Gym,Parking,Pool,...|  853|
|5508657700|AC,Cable or Satel...|  615|
|5508657758|AC,Clubhouse,Gym,...| 2495|
|5508657770|AC,Cable or Satel...|  700|
|5508658636|AC,Clubhouse,Gym,...| 2095|
|5508658740|AC,Cable or Satel...|  670|
|5508658972|AC,Cable or Satel...|  499|
|5508662051|Fireplace,Gated,G...| 1060|
|5508662767|            Gym,Pool| 2424|
|5508664910|Parking,Pool,Refr...| 2560|
|5508665196|AC,Clubhouse,Dish...| 1930|
|5508665258|  Clubhouse,Gym,Pool| 1280|
|5508665899|AC,Clubhouse,Dish...| 1755|
+----------+--------------------+-----+
only showing top 20 rows



None

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Replace missing amenities with the placeholder 'None'. Both the literal string
# 'null' and a real null are handled: a real null would otherwise stay null and
# produce no rows when the column is later split and exploded.
df_amenities = df_amenities.withColumn(
    'amenities',
    F.when(
        F.col('amenities').isNull() | (F.col('amenities') == 'null'),
        'None',
    ).otherwise(F.col('amenities')),
)

In [15]:
# Split the comma-separated amenities string and emit one row per (listing, amenity).
df_amenities = df_amenities.withColumn('amenities', F.explode(F.split(F.col('amenities'), ',')))

# show() prints the preview itself and returns None, so it is called directly
# instead of being wrapped in display(), which would render a stray "None".
df_amenities.show()

+----------+----------------+-----+
|        id|       amenities|price|
+----------+----------------+-----+
|5508654087| Internet Access| 1287|
|5508654087|         Parking| 1287|
|5508654087|      Patio/Deck| 1287|
|5508654087|            Pool| 1287|
|5508654087|         Storage| 1287|
|5508654087|              TV| 1287|
|5508654087|    Washer Dryer| 1287|
|5508654149|       Fireplace|  860|
|5508654149|            Pool|  860|
|5508654460|              AC|  850|
|5508654460|       Clubhouse|  850|
|5508654460|      Dishwasher|  850|
|5508654460|Garbage Disposal|  850|
|5508654460|             Gym|  850|
|5508654460|         Parking|  850|
|5508654460|      Patio/Deck|  850|
|5508654460|      Playground|  850|
|5508654460|            Pool|  850|
|5508654607|             Gym| 2669|
|5508654607|            Pool| 2669|
+----------+----------------+-----+
only showing top 20 rows



None

In [16]:
# One row per listing (id, price) with one indicator column per distinct amenity:
# 1 when the listing has the amenity, 0 (filled) when it does not.
pivot_amenities = df_amenities.groupBy("id", "price").pivot("amenities").agg(F.lit(1)).na.fill(0)

# 'None' is the placeholder for listings without amenities, not a real amenity.
pivot_amenities = pivot_amenities.drop('None')

# show() prints the preview itself and returns None, so it is called directly
# instead of being wrapped in display(), which would render a stray "None".
pivot_amenities.show()

+----------+-----+---+-----+----------+------------------+---------+----------+-------+--------+---------+----------------+-----+----+---+-------+---------------+------+-------+----------+----------+----+------------+-------+---+------+----+------------+-----------+
|        id|price| AC|Alarm|Basketball|Cable or Satellite|Clubhouse|Dishwasher|Doorman|Elevator|Fireplace|Garbage Disposal|Gated|Golf|Gym|Hot Tub|Internet Access|Luxury|Parking|Patio/Deck|Playground|Pool|Refrigerator|Storage| TV|Tennis|View|Washer Dryer|Wood Floors|
+----------+-----+---+-----+----------+------------------+---------+----------+-------+--------+---------+----------------+-----+----+---+-------+---------------+------+-------+----------+----------+----+------------+-------+---+------+----+------------+-----------+
|5508654087| 1287|  0|    0|         0|                 0|        0|         0|      0|       0|        0|               0|    0|   0|  0|      0|              1|     0|      1|         1|         0|

None

In [17]:
# Attach the amenity indicator columns to the listings. The right join keeps
# every row of pivot_amenities, matched on id and price.
df_join = df.join(pivot_amenities, ['id', 'price'], "right")

# show() prints the preview itself and returns None, so it is called directly
# instead of being wrapped in display(), which would render a stray "None".
df_join.show()

+----------+-----+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-------------+----------+-----------+-------+------------+-----+--------+---------+------------+----------+--------------------+------------------------+---------------------+---+-----+----------+------------------+---------+----------+-------+--------+---------+----------------+-----+----+---+-------+---------------+------+-------+----------+----------+----+------------+-------+---+------+----+------------+-----------+
|        id|price|            category|               title|                body|           amenities|bathrooms|bedrooms|currency|fee|has_photo|pets_allowed|price_display|price_type|square_feet|address|    cityname|state|latitude|longitude|      source|      time|cityname_transformed|pets_allowed_transformed|has_photo_transformed| AC|Alarm|Basketball|Cable or Satellite|Clubhouse|Dishwasher|Doorman|Elevator|Fireplace

None

In [18]:
from pyspark.sql.window import Window

# Listing ids and prices to be split into two price classes. No explicit sort is
# needed here: the window below defines the ordering used for ranking.
df_classes = df_join.select('id', 'price')

# Half of the row count: the rank threshold that separates the two classes.
mediana = df_classes.count()/2

# Rank listings by price, using id as a tie-breaker so that listings with equal
# prices get the same rank (and therefore the same class) on every run.
X = Window.orderBy('price', 'id')

# Lower half of the ranking is class 'B', upper half is class 'A'.
df_classes = (
    df_classes
    .withColumn('row_num', F.row_number().over(X))
    .withColumn(
        "class",
        F.when(F.col("row_num") <= mediana, F.lit('B')).otherwise(F.lit('A')),
    )
    .drop("row_num")
)

In [19]:
# Attach the price class to every listing (left join keeps all rows of df).
df_join = df.join(df_classes, ['id', 'price'], "left")

# Row count of df before it is replaced by the joined frame.
display(df.count())

df = df_join.withColumnRenamed('class', 'price_class')

# show() prints the preview itself and returns None, so it is called directly
# instead of being wrapped in display(), which would render a stray "None".
df.show()

9994

+----------+-----+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-------------+----------+-----------+-------+-----------+-----+--------+---------+------------+----------+--------------------+------------------------+---------------------+-----------+
|        id|price|            category|               title|                body|           amenities|bathrooms|bedrooms|currency|fee|has_photo|pets_allowed|price_display|price_type|square_feet|address|   cityname|state|latitude|longitude|      source|      time|cityname_transformed|pets_allowed_transformed|has_photo_transformed|price_class|
+----------+-----+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-------------+----------+-----------+-------+-----------+-----+--------+---------+------------+----------+--------------------+------------------------+---

None

In [20]:
# Add the amenity indicator columns to the classified listings. The right join
# keeps every row of pivot_amenities, matched on id and price.
df = df.join(pivot_amenities, ['id', 'price'], "right")

# show() prints the preview itself and returns None, so it is called directly
# instead of being wrapped in display(), which would render a stray "None".
df.show()

+----------+-----+--------------------+--------------------+--------------------+--------------------+---------+--------+--------+---+---------+------------+-------------+----------+-----------+-------+------------+-----+--------+---------+------------+----------+--------------------+------------------------+---------------------+-----------+---+-----+----------+------------------+---------+----------+-------+--------+---------+----------------+-----+----+---+-------+---------------+------+-------+----------+----------+----+------------+-------+---+------+----+------------+-----------+
|        id|price|            category|               title|                body|           amenities|bathrooms|bedrooms|currency|fee|has_photo|pets_allowed|price_display|price_type|square_feet|address|    cityname|state|latitude|longitude|      source|      time|cityname_transformed|pets_allowed_transformed|has_photo_transformed|price_class| AC|Alarm|Basketball|Cable or Satellite|Clubhouse|Dishwasher|Do

None

In [21]:
from pyspark.sql.functions import col

# Listing ids to exclude from the final dataset (edit with the ids you want to drop).
ids_to_remove = [5648708029, 5667488036]

is_excluded = col('id').isin(ids_to_remove)
df = df.where(~is_excluded)

In [23]:
# Convert the PySpark DataFrame into a pandas DataFrame.
pandas_df = df.toPandas()

# Save it as a semicolon-separated UTF-8 CSV, without the pandas index column.
output_path = "C:/Users/afili/Desktop/Uni/Mestrado/AASE/apartments_for_rent_10k_final.csv"
pandas_df.to_csv(output_path, sep=';', index=False, encoding='utf-8')
