In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 11:41:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.sparkContext.uiWebUrl

'http://de-zoomcamp.australia-southeast1-b.c.western-triode-447803-t3.internal:4040'

In [8]:
!wc -l domain_properties.csv

11160 domain_properties.csv


In [10]:
df = spark.read \
    .option("header", "true") \
    .csv('domain_properties.csv')

                                                                                

In [13]:
df.schema

StructType([StructField('price', StringType(), True), StructField('date_sold', StringType(), True), StructField('suburb', StringType(), True), StructField('num_bath', StringType(), True), StructField('num_bed', StringType(), True), StructField('num_parking', StringType(), True), StructField('property_size', StringType(), True), StructField('type', StringType(), True), StructField('suburb_population', StringType(), True), StructField('suburb_median_income', StringType(), True), StructField('suburb_sqkm', StringType(), True), StructField('suburb_lat', StringType(), True), StructField('suburb_lng', StringType(), True), StructField('suburb_elevation', StringType(), True), StructField('cash_rate', StringType(), True), StructField('property_inflation_index', StringType(), True), StructField('km_from_cbd', StringType(), True)])

In [14]:
!head -n 1001 domain_properties.csv > head.csv

In [15]:
import pandas as pd

In [16]:
df_pandas = pd.read_csv('head.csv')

In [17]:
df_pandas.dtypes

price                         int64
date_sold                    object
suburb                       object
num_bath                      int64
num_bed                       int64
num_parking                   int64
property_size                 int64
type                         object
suburb_population             int64
suburb_median_income          int64
suburb_sqkm                 float64
suburb_lat                  float64
suburb_lng                  float64
suburb_elevation              int64
cash_rate                   float64
property_inflation_index    float64
km_from_cbd                 float64
dtype: object

In [18]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('price', LongType(), True), StructField('date_sold', StringType(), True), StructField('suburb', StringType(), True), StructField('num_bath', LongType(), True), StructField('num_bed', LongType(), True), StructField('num_parking', LongType(), True), StructField('property_size', LongType(), True), StructField('type', StringType(), True), StructField('suburb_population', LongType(), True), StructField('suburb_median_income', LongType(), True), StructField('suburb_sqkm', DoubleType(), True), StructField('suburb_lat', DoubleType(), True), StructField('suburb_lng', DoubleType(), True), StructField('suburb_elevation', LongType(), True), StructField('cash_rate', DoubleType(), True), StructField('property_inflation_index', DoubleType(), True), StructField('km_from_cbd', DoubleType(), True)])

In [19]:
from pyspark.sql import types

In [28]:
schema = types.StructType([
    types.StructField('price', types.StringType(), True), 
    types.StructField('date_sold', types.StringType(), True), 
    types.StructField('suburb', types.StringType(), True), 
    types.StructField('num_bath', types.IntegerType(), True), 
    types.StructField('num_bed', types.IntegerType(), True), 
    types.StructField('num_parking', types.IntegerType(), True), 
    types.StructField('property_size', types.IntegerType(), True), 
    types.StructField('type', types.StringType(), True), 
    types.StructField('suburb_population', types.IntegerType(), True), 
    types.StructField('suburb_median_income', types.IntegerType(), True), 
    types.StructField('suburb_sqkm', types.DoubleType(), True), 
    types.StructField('suburb_lat', types.DoubleType(), True), 
    types.StructField('suburb_lng', types.DoubleType(), True), 
    types.StructField('suburb_elevation', types.IntegerType(), True), 
    types.StructField('cash_rate', types.DoubleType(), True), 
    types.StructField('property_inflation_index', types.DoubleType(), True), 
    types.StructField('km_from_cbd', types.DoubleType(), True)
])

In [29]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('domain_properties.csv')

In [30]:
df = df.repartition(24)

In [32]:
df.write.parquet('domain2016-2021',mode='overwrite')

                                                                                

In [33]:
df = spark.read.parquet('domain2016-2021')

In [34]:
df.printSchema()

root
 |-- price: string (nullable = true)
 |-- date_sold: string (nullable = true)
 |-- suburb: string (nullable = true)
 |-- num_bath: integer (nullable = true)
 |-- num_bed: integer (nullable = true)
 |-- num_parking: integer (nullable = true)
 |-- property_size: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- suburb_population: integer (nullable = true)
 |-- suburb_median_income: integer (nullable = true)
 |-- suburb_sqkm: double (nullable = true)
 |-- suburb_lat: double (nullable = true)
 |-- suburb_lng: double (nullable = true)
 |-- suburb_elevation: integer (nullable = true)
 |-- cash_rate: double (nullable = true)
 |-- property_inflation_index: double (nullable = true)
 |-- km_from_cbd: double (nullable = true)



In [35]:
from pyspark.sql import functions as F

In [39]:
df = df.withColumn("date_sold", F.to_date(F.col("date_sold"), "d/M/yy"))

In [40]:
df.show()

+-------+----------+----------------+--------+-------+-----------+-------------+--------------------+-----------------+--------------------+-----------+----------+----------+----------------+---------+------------------------+-----------+-------------+
|  price| date_sold|          suburb|num_bath|num_bed|num_parking|property_size|                type|suburb_population|suburb_median_income|suburb_sqkm|suburb_lat|suburb_lng|suburb_elevation|cash_rate|property_inflation_index|km_from_cbd|date_sold_new|
+-------+----------+----------------+--------+-------+-----------+-------------+--------------------+-----------------+--------------------+-----------+----------+----------+----------------+---------+------------------------+-----------+-------------+
| 670000|2020-10-03|      Lalor Park|       1|      3|          4|          550|               House|             7667|               31200|      2.701| -33.76065| 150.93136|              77|     0.43|                   165.9|      28.27|   