# ETL Process for order_info_by_city

## Extract and Transforming using PySpark

### Start Spark Session

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType

In [2]:
# running local spark
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("spark.driver.memory", "12g")\
    .appName("neighborhoods_and_city") \
    .getOrCreate()

sc = spark.sparkContext

24/04/28 03:52:43 WARN Utils: Your hostname, Kun-Mac.local resolves to a loopback address: 127.0.0.1; using 172.20.23.178 instead (on interface en0)
24/04/28 03:52:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/28 03:52:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
print("Using Apache Spark Version", spark.version)
web_ui_url = sc.uiWebUrl
print(f"Spark UI is available at: {web_ui_url}")

Using Apache Spark Version 3.5.0
Spark UI is available at: http://172.20.23.178:4040


### Extract Data

In [4]:
calendar = spark.read.option("header", "true") \
                    .option("delimiter", ",") \
                    .option("inferSchema", "true") \
                    .option("multiLine", "true")\
                    .option("escape", "\"")\
                    .csv("../Data/calendar.csv")
calendar = calendar.drop(*['minimum_nights','maximum_nights'])

24/04/28 03:52:54 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

## Transforming process

In [5]:
def money_to_float(money_str):
    if money_str is None:
        return None
    else:
        cleaned_str = money_str[1:].replace(',', '')
        return float(cleaned_str)
spark.udf.register("money_to_float_udf", money_to_float, FloatType())

<function __main__.money_to_float(money_str)>

In [6]:
calendar.createOrReplaceTempView('calendar')
sql = '''

with calendar_cleaned as ( 
select
    listing_id
    ,date
    ,if(available='t',1,0) as is_available
    ,coalesce(money_to_float_udf(adjusted_price),money_to_float_udf(price)) as price
    ,state
    ,city
    ,data_date as data_download_date
from calendar
)

select
    state
    ,city
    ,date
    ,sum(is_available)/count(is_available) as occupancy_rate
    ,avg(price) as avg_price
from calendar_cleaned t1
group by 
    state
    ,city
    ,date
'''
result_city = spark.sql(sql)

In [7]:
print(result_city.count())
result_city.limit(5).toPandas()

                                                                                

12535


                                                                                

Unnamed: 0,state,city,date,occupancy_rate,avg_price
0,ny,albany,2024-03-16,0.809756,142.746341
1,ny,albany,2024-05-26,0.736585,142.746341
2,nc,asheville,2024-12-22,0.5,229.0
3,tx,austin,2024-09-19,0.323238,375.446479
4,ma,boston,2024-02-20,0.657231,234.703853


# Insert into Database
- result_city to PostgreSQL table "order_info_by_city"

In [8]:
from sqlalchemy import create_engine
from sqlalchemy.sql import text

# Pass the connection string to a variable, conn_url
conn_url = 'postgresql://postgres:123@localhost:5432/airbnb'

# Create an engine that connects to PostgreSQL server
engine = create_engine(conn_url)

# Establish a connection
connection = engine.connect()

### Loading Data

In [9]:
# creating table
# all the column except primary key can take null value
sql = """
CREATE TABLE IF NOT EXISTS order_info_by_city (
    state VARCHAR(10), 
    city VARCHAR(255),
    date DATE,
    occupancy_rate DOUBLE PRECISION,
    avg_price DOUBLE PRECISION,
    PRIMARY KEY (state, city, date),
    FOREIGN KEY (city, state) REFERENCES city(city, state)
);
"""
connection.execute(text(sql))
connection.commit()

In [10]:
pd_df = result_city.toPandas()
pd_df.to_sql(name='order_info_by_city', con=engine, if_exists='append', index=False)

                                                                                

535