In [1]:
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

# Location for Spark SQL managed tables; the relative name is resolved
# against the directory the kernel is currently running in.
warehouse_location = abspath('spark-warehouse')

# Build (or reuse an already-running) Hive-enabled Spark session.
spark = (
    SparkSession.builder
    .appName("Listings processing")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)



In [14]:
from pyspark.sql.functions import explode

In [29]:
# Load the listings query response into a DataFrame; no schema is passed, so
# Spark infers it from the JSON content.
listings_json_path = 'data_pipelines/mnt/airflow/dags/files/listings_query.json'
df = spark.read.json(listings_json_path)

In [20]:
# Print the schema Spark inferred for the raw listings frame.
df.printSchema()

root
 |-- meta: struct (nullable = true)
 |    |-- build: string (nullable = true)
 |    |-- matching_rows: long (nullable = true)
 |    |-- returned_rows: long (nullable = true)
 |    |-- schema: string (nullable = true)
 |    |-- tracking: string (nullable = true)
 |    |-- tracking_params: struct (nullable = true)
 |    |    |-- channel: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- county: string (nullable = true)
 |    |    |-- listingActivity: string (nullable = true)
 |    |    |-- neighborhood: string (nullable = true)
 |    |    |-- propertyStatus: string (nullable = true)
 |    |    |-- propertyType: string (nullable = true)
 |    |    |-- searchBathrooms: string (nullable = true)
 |    |    |-- searchBedrooms: string (nullable = true)
 |    |    |-- searchCityState: string (nullable = true)
 |    |    |-- searchCoordinates: string (nullable = true)
 |    |    |-- searchHouseSqft: string (nullable = true)
 |    |    |-- searchLotSqft: s

In [53]:
# Keep the query metadata next to every exploded element of `properties`;
# the exploded struct is exposed under Spark's default column name `col`.
df_test = df.select(
    df['meta'],
    explode(df['properties']),
)

In [54]:
# Print the schema of the metadata + exploded-properties frame.
df_test.printSchema()

root
 |-- meta: struct (nullable = true)
 |    |-- build: string (nullable = true)
 |    |-- matching_rows: long (nullable = true)
 |    |-- returned_rows: long (nullable = true)
 |    |-- schema: string (nullable = true)
 |    |-- tracking: string (nullable = true)
 |    |-- tracking_params: struct (nullable = true)
 |    |    |-- channel: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- county: string (nullable = true)
 |    |    |-- listingActivity: string (nullable = true)
 |    |    |-- neighborhood: string (nullable = true)
 |    |    |-- propertyStatus: string (nullable = true)
 |    |    |-- propertyType: string (nullable = true)
 |    |    |-- searchBathrooms: string (nullable = true)
 |    |    |-- searchBedrooms: string (nullable = true)
 |    |    |-- searchCityState: string (nullable = true)
 |    |    |-- searchCoordinates: string (nullable = true)
 |    |    |-- searchHouseSqft: string (nullable = true)
 |    |    |-- searchLotSqft: s

In [40]:
# Explode `properties` on its own (without the metadata) so the structure of a
# single element can be inspected.
df_properties = df.select(explode(df['properties']))

In [41]:
# Print the schema of the exploded elements (a single struct column named `col`).
df_properties.printSchema()

root
 |-- col: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- county: string (nullable = true)
 |    |    |-- fips_code: string (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- line: string (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |    |-- neighborhood_name: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- state_code: string (nullable = true)
 |    |-- agents: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- advertiser_id: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- photo: struct (nullable = true)
 |    |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- primary: boolean (nullable = true)
 |    

In [55]:
# Register the frames as temporary views so they can be queried with spark.sql.
# `listings_query` is the raw frame; `properties_table` is the metadata +
# exploded-properties frame (`df_test`).
df.createOrReplaceTempView('listings_query')
df_test.createOrReplaceTempView('properties_table')

In [64]:
"""
property_id STRING,
query_date TIMESTAMP,
query_city STRING,
query_state STRING,

prop_type STRING,
address_line STRING,
address_city STRING,
zip_code INT,
fips_code INT,
lat DECIMAL(9,6),
long DECIMAL(9,6),
neighborhood STRING,

listing_price DOUBLE,
baths DOUBLE,
beds INT,
building_size INT,
building_size_units STRING,
lot_size INT,
lot_size_units STRING,
last_update TIMESTAMP,
url STRING
"""

# Flatten each exploded listing in `properties_table` into the flat layout
# listed in the string literal directly above this statement.
#
# Every column whose declared type is not STRING is cast explicitly, so the
# resulting frame carries the declared types instead of relying on implicit
# conversion at write time:
#   * query_date: current_date() yields a DATE; cast to TIMESTAMP (midnight).
#   * zip_code / fips_code: the source fields are strings; cast to INT.
#   * lat / long: the source fields are doubles; cast to DECIMAL(9,6).
#   * the longitude column is aliased `long` to match the declared name
#     (it was previously aliased `lon`).
#
# NOTE(review): casting postal/FIPS codes to INT drops leading zeros; that is
# what the declared layout asks for, but confirm it is intended.
# NOTE(review): the printed schema of `meta.tracking_params` is truncated, so
# the presence of a `state` field could not be confirmed here.
# NOTE(review): the source types of price, baths, beds, the size fields and
# last_update are not visible in the printed schema; the casts assume they
# are convertible to the declared types.
df_to_insert = spark.sql('''
    SELECT
        col.`property_id`,
        CAST(current_date() AS TIMESTAMP) AS query_date,
        meta.`tracking_params`.`city` AS query_city,
        meta.`tracking_params`.`state` AS query_state,
        col.`prop_type`,
        col.`address`.`line` AS address_line,
        col.`address`.`city` AS address_city,
        CAST(col.`address`.`postal_code` AS INT) AS zip_code,
        CAST(col.`address`.`fips_code` AS INT) AS fips_code,
        CAST(col.`address`.`lat` AS DECIMAL(9,6)) AS lat,
        CAST(col.`address`.`lon` AS DECIMAL(9,6)) AS `long`,
        col.`address`.`neighborhood_name` AS neighborhood,
        CAST(col.`price` AS DOUBLE) AS listing_price,
        CAST(col.`baths` AS DOUBLE) AS baths,
        CAST(col.`beds` AS INT) AS beds,
        CAST(col.`building_size`.`size` AS INT) AS building_size,
        col.`building_size`.`units` AS building_size_units,
        CAST(col.`lot_size`.`size` AS INT) AS lot_size,
        col.`lot_size`.`units` AS lot_size_units,
        CAST(col.`last_update` AS TIMESTAMP) AS last_update,
        col.`rdc_web_url` AS url
    FROM
        properties_table
''')

In [68]:
# Sanity check: show the Python type of the object returned by spark.sql.
type(df_to_insert)

pyspark.sql.dataframe.DataFrame

In [67]:
# Sanity check: show the Python type of the raw frame loaded from JSON.
type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
# Project only the `properties` column of the raw frame.
# NOTE(review): the recorded output for this cell is a Column repr and its
# execution count is out of order, so the output looks stale relative to this
# call — re-run on a fresh kernel to confirm.
df.select('properties')

Column<b'properties'>