In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
# Build the session with a parenthesised chain; getOrCreate() hands back the
# already-running session when one exists.
spark = (
    SparkSession.builder
    .master("local")
    .appName('project-pipeline')
    .getOrCreate()
)

24/03/12 01:38:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load every CSV file of the 2024-03-10 snapshot: first row is the header,
# column types are inferred, and multi-line records are enabled.
df = (
    spark.read
    .options(header=True, inferSchema=True, multiLine=True)
    .csv('gs://peru-real-state-datalake/2024-03-10/*.csv')
)

                                                                                

In [4]:
from pyspark.sql.functions import when
from pyspark.sql.functions import expr
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import FloatType, IntegerType
from pyspark.sql.functions import col, substring

In [5]:
# Map each plural / hyphenated "property_type" label to its canonical label.
# Replacements are applied one at a time, in the order listed.
property_type_aliases = {
    "casas": "casa",
    "departamentos": "departamento",
    "oficinas": "oficina",
    "casas-playa": "casa de playa",
    "locales-comerciales": "local",
    "casas-condominio": "condo",
    "terrenos": "terreno",
}
for raw_label, canonical_label in property_type_aliases.items():
    df = df.withColumn(
        "property_type",
        when(df["property_type"] == raw_label, canonical_label).otherwise(df["property_type"]),
    )

In [6]:
# Strip newlines, spaces, "$" signs and commas from "price" (in that order),
# then cast what is left to float. This is where `clean` is first derived from `df`.
price_text = col('price')
for junk_pattern in ('\n', ' ', '\\$', ','):
    price_text = regexp_replace(price_text, junk_pattern, '')
clean = df.withColumn('price', price_text.cast(FloatType()))

In [7]:
# Remove the "m2" unit from "size_m2" and cast the remainder to float
size_without_unit = regexp_replace(col("size_m2"), "m2", "")
clean = clean.withColumn("size_m2", size_without_unit.cast(FloatType()))

In [8]:
# Remove the stray "-departamento/list" fragment that appears in some "city" values
city_without_fragment = regexp_replace(col("city"), "-departamento/list", "")
clean = clean.withColumn("city", city_without_fragment)

In [9]:
# Cast both coordinate columns to float
for coordinate in ("longitude_x", "latitude_y"):
    clean = clean.withColumn(coordinate, clean[coordinate].cast(FloatType()))

In [10]:
# Strip the text suffix from "rooms" and "bathrooms", then cast both to integer.
# Each column is cleaned from its OWN values: building "bathrooms" from the
# "rooms" column would overwrite every bathroom count with the room count.
# NOTE(review): a value with any other suffix (e.g. a singular form) would not be
# stripped and would cast to null -- confirm against the raw data.
clean = clean.withColumn("rooms", regexp_replace("rooms", " Habitaciones", ""))
clean = clean.withColumn("bathrooms", regexp_replace("bathrooms", " Baños", ""))
clean = clean.withColumn("rooms", clean["rooms"].cast(IntegerType()))
clean = clean.withColumn("bathrooms", clean["bathrooms"].cast(IntegerType()))

In [11]:
# Tidy "specs": remove newlines, squeeze runs of 2+ whitespace characters into a
# single space, then drop the first character (substring starts at position 2).
specs_single_line = regexp_replace(col('specs'), '\n', '')
specs_squeezed = regexp_replace(specs_single_line, '\\s{2,}', ' ')
clean = clean.withColumn('specs', substring(specs_squeezed, 2, 1000000))

In [12]:
# Delete the "https://www." text from the urls.
# The pattern is a regular expression, so the trailing dot is escaped to match a
# literal "." only; left unescaped it is a wildcard and would also strip
# e.g. "https://wwwX".
clean = clean.withColumn('url', regexp_replace("url", "https://www\\.", ""))

In [13]:
# Print the schema of `clean` to check the column types after the casts
clean.printSchema()

root
 |-- scraped_on: timestamp (nullable = true)
 |-- city: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: float (nullable = true)
 |-- longitude_x: float (nullable = true)
 |-- latitude_y: float (nullable = true)
 |-- size_m2: float (nullable = true)
 |-- rooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- adress: string (nullable = true)
 |-- description: string (nullable = true)
 |-- specs: string (nullable = true)
 |-- url: string (nullable = true)



In [14]:
# Row count of `clean` (counted through the "operation" column) before duplicates are dropped
clean.select("operation").count()

                                                                                

20932

In [15]:
# Rows that agree on all of these columns are treated as the same listing;
# only one row per combination is kept.
drops = [
    "city",
    "longitude_x",
    "latitude_y",
    "size_m2",
    "rooms",
    "bathrooms",
    "price",
]
clean = clean.drop_duplicates(subset=drops)

In [16]:
# Replace missing values in the numeric columns with 0
fillnas = ["size_m2", "price", "rooms", "bathrooms"]
clean = clean.na.fill(0, subset=fillnas)

In [17]:
# Print the schema again to see how de-duplication and the null fill affected nullability
clean.printSchema()

root
 |-- scraped_on: timestamp (nullable = true)
 |-- city: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: float (nullable = false)
 |-- longitude_x: float (nullable = true)
 |-- latitude_y: float (nullable = true)
 |-- size_m2: float (nullable = false)
 |-- rooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- adress: string (nullable = true)
 |-- description: string (nullable = true)
 |-- specs: string (nullable = true)
 |-- url: string (nullable = true)



In [20]:
# Price per square metre.
# size_m2 can be 0 (missing sizes are filled with 0 earlier in this notebook), so
# the division is guarded: those rows get NULL explicitly instead of relying on
# Spark's non-ANSI divide-by-zero behaviour, and the cell no longer fails when
# spark.sql.ansi.enabled is true.
clean = clean.withColumn(
    "price_per_m2",
    when(col("size_m2") != 0, col("price") / col("size_m2")),
)

In [24]:
# Preview the first 50 rows of the cleaned DataFrame
clean.show(50)



Py4JJavaError: An error occurred while calling o233.showString.
: org.apache.spark.SparkException: Job 11 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1195)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1193)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1193)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2940)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:2834)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1485)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2834)
	at org.apache.spark.SparkContext.$anonfun$stop$11(SparkContext.scala:2158)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1485)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2158)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2112)
	at org.apache.spark.SparkContext.$anonfun$new$37(SparkContext.scala:710)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2067)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [25]:
# Row count of `clean` after cleaning, de-duplication and the null fill
clean.count()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o233.count