
# Load data

**source**:  Airbnb data for Athens at http://insideairbnb.com/get-the-data

**destination**: airbnb.raw schema 

In [0]:
%run ./setup

In [0]:
import io
import pandas as pd
import requests

def load_data_from_url_as_spark_df(url, **params):
    """Loads data from url. Optional keyword arguments are passed to pandas.read_csv."""
    response = requests.get(url)
    dx = pd.read_csv(io.BytesIO(response.content), **params)  
    return spark.createDataFrame(dx)

In [0]:
import json 
import pyspark.sql.functions as f
from pyspark.sql.types import StringType, TimestampType
from delta.tables import DeltaTable

def update_schema_with_metadata_fields(schema):
    """Helper method to add metadata fields to contact schema."""
    return schema\
            .add("processing_datetime", TimestampType(), True)\
            .add("area", StringType(), True)
        
def add_metadata_columns(df, area: str):
    """Helper method to add metadata columns to dataframe."""
    return (
        df.withColumn("processing_datetime", f.current_timestamp())
        .withColumn("area", f.lit(area))
    )


## Load Listings

In [0]:
# load contract schema 
with open(contracts_path + "/listing_schema.json", "r") as fin:
    listing_schema = StructType.fromJson(json.loads(fin.read()))

# update contract schema with metadata fields
raw_listing_schema = update_schema_with_metadata_fields(listing_schema)

# initialize delta table
listing_delta = (
    DeltaTable.createIfNotExists(spark)
    .addColumns(raw_listing_schema)
    .tableName('airbnb.raw.listings')
).execute()

In [0]:
listings_url = "http://data.insideairbnb.com/greece/attica/athens/2023-09-21/data/listings.csv.gz"

listings_df = load_data_from_url_as_spark_df(
    listings_url, sep=',', index_col=0, quotechar='"', compression='gzip'
)

In [0]:
_ = (listings_df
     .transform(lambda x: add_metadata_columns(x, "Athens"))
     .writeTo("airbnb.raw.listings")
     .append())


## Load calendar data 

In [0]:
# load contract schema 
with open(contracts_path + "/calendar_schema.json", "r") as fin:
    calendar_schema = StructType.fromJson(json.loads(fin.read()))

# update contract schema with metadata fields
raw_calendar_schema = update_schema_with_metadata_fields(calendar_schema)

# initialize delta table
calendar_delta = (
    DeltaTable.createIfNotExists(spark)
    .addColumns(raw_calendar_schema)
    .tableName('airbnb.raw.calendar')
).execute()

In [0]:
callendar_url = "http://data.insideairbnb.com/greece/attica/athens/2023-09-21/data/calendar.csv.gz"

callendar_df = load_data_from_url_as_spark_df(
    callendar_url, sep=',', index_col=0, quotechar='"', compression='gzip'
)

In [0]:
_ = (callendar_df
     .transform(lambda x: add_metadata_columns(x, "Athens"))
     .writeTo("airbnb.raw.calendar")
     .append())


## Load agg_listings

In [0]:
# load contract schema 
with open(contracts_path + "/agg_listing_schema.json", "r") as fin:
    agg_listing_schema = StructType.fromJson(json.loads(fin.read()))

# update contract schema with metadata fields
raw_agg_listing_schema = update_schema_with_metadata_fields(agg_listing_schema)

# initialize delta table
calendar_delta = (
    DeltaTable.createIfNotExists(spark)
    .addColumns(raw_agg_listing_schema)
    .tableName('airbnb.raw.agg_listing')
).execute()

In [0]:
agg_listing_url = "http://data.insideairbnb.com/greece/attica/athens/2023-09-21/visualisations/listings.csv"

agg_listing_df = load_data_from_url_as_spark_df(
    agg_listing_url, sep=',', index_col=0, quotechar='"'
)


In [0]:
_ = (agg_listing_df
     .transform(lambda x: add_metadata_columns(x, "Athens"))
     .writeTo("airbnb.raw.agg_listing")
     .append())


## Load geo json data 

In orde to load geo data we can use pandas-geojson package. 
But w need to install it first. Plase ```%pip install pandas-geojson ```in the first cell in notebook and rerul entire notebook. 

```
  import json
  from pandas_geojson import read_geojson_url

  neighbourhoods_geo_url = "http://data.insideairbnb.com/greece/attica/athens/2023-09-21/visualisations/neighbourhoods.geojson"

  # load geo data & save as json file.
  geo_json = read_geojson_url(neighbourhoods_geo_url)
  dbutils.fs.put(neighbourhoods_geo_path, json.dumps(geo_json))

  # load neighbourhoods_geo as spark data frame 
  neighbourhoods_geo_df = spark.read.format('json').load(neighbourhoods_geo_path)
```