## Load property listings from MongoDB into Spark and derive features

In [24]:
# Import libraries
from data import data_management
from data.variables import MONGODB_COLLECTION_NAME
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, round, year
from pyspark.sql.types import DateType
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [27]:
# Create (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()

# Retrieve data from MongoDB: find() is called without a filter, so the whole
# collection is materialised in driver memory as a list of dicts.
mongo_db = data_management.get_database()
mongo_collection = mongo_db[MONGODB_COLLECTION_NAME]
mongo_data = list(mongo_collection.find())

# Fail early with an actionable message: Spark cannot infer a schema from an
# empty list, and createDataFrame would otherwise raise a much less helpful
# ValueError about schema inference.
if not mongo_data:
    raise ValueError(
        f"MongoDB collection {MONGODB_COLLECTION_NAME!r} returned no documents; "
        "cannot build a DataFrame from an empty result."
    )

# Fields removed from every document before it is handed to Spark.
# NOTE(review): the saved cell output still lists a `title` column, so it
# predates this exclusion list -- re-run the cell to refresh it.
EXCLUDED_FIELDS = {"_id", "ad_id", "title"}
mongo_data_excluded_id = [
    {k: v for k, v in doc.items() if k not in EXCLUDED_FIELDS}
    for doc in mongo_data
]

# Create a PySpark DataFrame from the MongoDB data; no schema is passed, so
# column names and types are inferred from the dicts.
df = spark.createDataFrame(mongo_data_excluded_id)

# Show the initial DataFrame
df.show(5)


+--------------------+-----------------+------------+--------------------+-----+--------------------+--------------+-----------+---------+-------+------------+-------------+--------------+--------------------+-----------+
|          about_home|construction_year|energy_label|          facilities|floor|            location|nr_of_bedrooms|nr_of_rooms|plot_area|  price|primary_area|property_type|renovated_year|               title|usable_area|
+--------------------+-----------------+------------+--------------------+-----+--------------------+--------------+-----------+---------+-------+------------+-------------+--------------+--------------------+-----------+
|RekkehusBruksarea...|             1969| E - Oransje|[Balkong/Terrasse...|    2|Åsenhagen 52C, 20...|             3|          4|    42459|4100000|          96|     Rekkehus|          null|Pent og velholdt ...|         96|
|Stue, kjøkken, 3 ...|             1925|     G - Rød|[Balkong/Terrasse...|    1|Gammel-lina 32C, ...|           

In [28]:
# Transformation: add two derived columns to df.
#   price_per_msquared -> price / primary_area, rounded to 2 decimals
#   property_age       -> current calendar year minus construction_year
# `round` here is pyspark.sql.functions.round (see the import cell), which
# operates on Column expressions -- it is not Python's builtin round.
price_per_area = round(col("price") / col("primary_area"), 2)
age_in_years = year(current_date()) - col("construction_year")

df = (
    df
    .withColumn("price_per_msquared", price_per_area)
    .withColumn("property_age", age_in_years)
)

# TODO: encode energy_label and property_type as categorical features
# TODO: turn facilities into a categorical variable
# TODO: derive a sentiment score from about_home

# Show the transformed DataFrame
df.show(5)

+--------------------+-----------------+------------+--------------------+-----+--------------------+--------------+-----------+---------+-------+------------+-------------+--------------+--------------------+-----------+------------------+------------+
|          about_home|construction_year|energy_label|          facilities|floor|            location|nr_of_bedrooms|nr_of_rooms|plot_area|  price|primary_area|property_type|renovated_year|               title|usable_area|price_per_msquared|property_age|
+--------------------+-----------------+------------+--------------------+-----+--------------------+--------------+-----------+---------+-------+------------+-------------+--------------+--------------------+-----------+------------------+------------+
|RekkehusBruksarea...|             1969| E - Oransje|[Balkong/Terrasse...|    2|Åsenhagen 52C, 20...|             3|          4|    42459|4100000|          96|     Rekkehus|          null|Pent og velholdt ...|         96|          42708.3