
# Load data

**source**: Inside Airbnb data for Athens, from http://insideairbnb.com/get-the-data

**destination**: the `airbnb.raw` schema (table `airbnb.raw.listings`)

In [0]:
# Surnames of the users that receive grants; email_for() expands each one
# to <surname>.<SUFFIX>@<DOMAIN>.
USERS = [
    "pelekh",
    "samoilenko",
    "yaremko",
    "pavliuk",
]
SUFFIX = "pn"
DOMAIN = "ucu.edu.ua"

# Catalog, schema and volume targeted by the setup and grant statements below.
CATALOG = "airbnb"
SCHEMA = "raw"
VOLUME = "vol"

def email_for(surname: str) -> str:
    """Build the principal name ``<surname>.<SUFFIX>@<DOMAIN>`` for a surname."""
    return "{}.{}@{}".format(surname, SUFFIX, DOMAIN)

def run(sql):
    """Echo the SQL statement to stdout, execute it with ``spark.sql`` and return the result."""
    print("SQL >", sql)
    result = spark.sql(sql)
    return result

def try_run(sql):
    """Best-effort wrapper around ``run``: report the outcome instead of raising.

    Prints "✅ OK" when the statement goes through. Any ``Exception`` is
    printed and swallowed so the caller can carry on with the next statement.
    Returns ``None`` in both cases.
    """
    try:
        run(sql)
        print("✅ OK")
    except Exception as err:
        print("↪️ Skipped/failed:", err)

# Best-effort bootstrap: every statement is attempted, failures are only reported.
for bootstrap_sql in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
    f"USE CATALOG {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}",
    f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.{VOLUME}",
):
    try_run(bootstrap_sql)

# These two are not wrapped in try_run: if either fails, the exception
# propagates and the cell stops here.
run(f"USE CATALOG {CATALOG}")
run(f"USE SCHEMA {SCHEMA}")

# Names of the tables currently in the schema; grant_for_principal() iterates over them.
table_rows = run(f"SHOW TABLES IN {CATALOG}.{SCHEMA}").collect()
tables = [row.tableName for row in table_rows]
print("Tables found:", tables)

def grant_for_principal(principal: str):
    """Attempt every catalog, schema, volume and table grant for one principal.

    Each GRANT is best-effort: a failing statement is reported on stdout and
    the remaining grants are still attempted. Table-level grants cover every
    name in the module-level ``tables`` list.

    Args:
        principal: Principal name; it is wrapped in backticks inside each
            GRANT statement.
    """
    schema_fqn = f"{CATALOG}.{SCHEMA}"
    volume_fqn = f"{schema_fqn}.{VOLUME}"

    print("\n" + "="*80)
    print(f"Granting for: {principal}")
    print("="*80)

    try_run(f"GRANT USE CATALOG ON CATALOG {CATALOG} TO `{principal}`")

    try:
        run(f"GRANT USE SCHEMA ON SCHEMA {schema_fqn} TO `{principal}`")
        print("✅ OK")
    except Exception as e:
        # Report why USE SCHEMA failed before falling back to USAGE. Only the
        # first line is shown, since the message can carry a long stack trace.
        reason = str(e).partition("\n")[0]
        print("↪️ USE SCHEMA failed, try USAGE:", reason)
        try_run(f"GRANT USAGE ON SCHEMA {schema_fqn} TO `{principal}`")

    try_run(f"GRANT CREATE VOLUME ON SCHEMA {schema_fqn} TO `{principal}`")

    try_run(f"GRANT READ VOLUME  ON VOLUME {volume_fqn} TO `{principal}`")
    try_run(f"GRANT WRITE VOLUME ON VOLUME {volume_fqn} TO `{principal}`")

    # SELECT and MODIFY on each table found in the schema.
    for t in tables:
        try_run(f"GRANT SELECT ON TABLE {schema_fqn}.{t} TO `{principal}`")
        try_run(f"GRANT MODIFY ON TABLE {schema_fqn}.{t} TO `{principal}`")

# Expand every surname into its full e-mail principal.
principals = list(map(email_for, USERS))
print("Principals:", principals)

for p in principals:
    grant_for_principal(p)

# Afterwards, display the grants currently present on the volume and on the
# first table. Skipped entirely when there are no principals.
if len(principals) > 0:
    first = principals[0]

    print("\n--- CURRENT GRANTS ON VOLUME ---")
    try:
        volume_grants = run(f"SHOW GRANTS ON VOLUME {CATALOG}.{SCHEMA}.{VOLUME}")
        display(volume_grants)
    except Exception as err:
        print("SHOW GRANTS (volume) failed:", err)

    if len(tables) > 0:
        print("\n--- CURRENT GRANTS ON FIRST TABLE ---")
        try:
            table_grants = run(f"SHOW GRANTS ON TABLE {CATALOG}.{SCHEMA}.{tables[0]}")
            display(table_grants)
        except Exception as err:
            print("SHOW GRANTS (table) failed:", err)

print("\nAll admin grants attempted for all principals.")


SQL > CREATE CATALOG IF NOT EXISTS airbnb
✅ OK
SQL > USE CATALOG airbnb
✅ OK
SQL > CREATE SCHEMA IF NOT EXISTS raw
✅ OK
SQL > CREATE VOLUME IF NOT EXISTS airbnb.raw.vol
✅ OK
SQL > USE CATALOG airbnb
SQL > USE SCHEMA raw
SQL > SHOW TABLES IN airbnb.raw
Tables found: ['listings']
Principals: ['pelekh.pn@ucu.edu.ua', 'samoilenko.pn@ucu.edu.ua', 'yaremko.pn@ucu.edu.ua', 'pavliuk.pn@ucu.edu.ua']

Granting for: pelekh.pn@ucu.edu.ua
SQL > GRANT USE CATALOG ON CATALOG airbnb TO `pelekh.pn@ucu.edu.ua`
↪️ Skipped/failed: (com.databricks.sql.managedcatalog.acl.UnauthorizedAccessException) PERMISSION_DENIED: User does not have MANAGE on Catalog 'airbnb'.

JVM stacktrace:
com.databricks.sql.managedcatalog.acl.UnauthorizedAccessException
	at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException(ErrorDetailsHandler.scala:98)
	at com.databricks.managedcatalog.ErrorDetailsHandler.wrapServiceException$(ErrorDetailsHandler.scala:69)
	at com.databricks.managedcatalog.ManagedCatalogClientI

Principal,ActionType,ObjectType,ObjectKey
samoilenko.pn@ucu.edu.ua,READ VOLUME,VOLUME,airbnb.raw.vol
samoilenko.pn@ucu.edu.ua,WRITE VOLUME,VOLUME,airbnb.raw.vol



--- CURRENT GRANTS ON FIRST TABLE ---
SQL > SHOW GRANTS ON TABLE airbnb.raw.listings


Principal,ActionType,ObjectType,ObjectKey
samoilenko.pn@ucu.edu.ua,MODIFY,TABLE,airbnb.raw.listings
samoilenko.pn@ucu.edu.ua,SELECT,TABLE,airbnb.raw.listings



All admin grants attempted for all principals.


In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from delta.tables import DeltaTable
import json

# Catalog, schema and volume this cell works in.
CATALOG, SCHEMA, VOLUME = "airbnb", "raw", "vol"

# Switch the session to that catalog and schema.
for use_stmt in (f"USE CATALOG {CATALOG}", f"USE SCHEMA {SCHEMA}"):
    spark.sql(use_stmt)

def update_schema_with_metadata_fields(schema: StructType) -> StructType:
    """Return a copy of ``schema`` extended with the metadata fields.

    ``StructType.add`` appends to the instance it is called on, so the input's
    fields are first copied into a fresh ``StructType``. The caller's schema
    is left untouched and calling this twice on the same input does not add
    the metadata fields twice.

    Args:
        schema: Contract schema to extend.

    Returns:
        A new ``StructType`` with the nullable fields ``processing_datetime``
        (timestamp) and ``area`` (string) appended.
    """
    return (StructType(list(schema.fields))
            .add("processing_datetime", TimestampType(), True)
            .add("area", StringType(), True))

def add_metadata_columns(df, area: str):
    """Return ``df`` with the metadata columns added.

    ``processing_datetime`` is filled from ``F.current_timestamp()`` and
    ``area`` holds the given value as a literal column.
    """
    with_timestamp = df.withColumn("processing_datetime", F.current_timestamp())
    return with_timestamp.withColumn("area", F.lit(area))

# Contract schema for the listings data. Every field is declared nullable.
# This cell serializes it to listing_schema.json in the contracts folder, and
# the "Load Listings" cell reads it back to create the Delta table.
# Fields such as last_scraped, host_since, price and host_is_superhost are kept
# as StringType here, i.e. no parsing into dates, numbers or booleans.
# NOTE(review): there is no `id` field. The load cell reads the CSV with
# index_col=0, so the first CSV column becomes the pandas index — presumably
# that column is the listing id; confirm it is meant to be absent from the table.
listing_schema = StructType([
    # --- scrape / listing description ---
    StructField("listing_url", StringType(), True),
    StructField("scrape_id", LongType(), True),
    StructField("last_scraped", StringType(), True),
    StructField("source", StringType(), True),
    StructField("name", StringType(), True),
    StructField("description", StringType(), True),
    StructField("neighborhood_overview", StringType(), True),
    StructField("picture_url", StringType(), True),
    # --- host ---
    StructField("host_id", LongType(), True),
    StructField("host_url", StringType(), True),
    StructField("host_name", StringType(), True),
    StructField("host_since", StringType(), True),
    StructField("host_location", StringType(), True),
    StructField("host_about", StringType(), True),
    StructField("host_response_time", StringType(), True),
    StructField("host_response_rate", StringType(), True),
    StructField("host_acceptance_rate", StringType(), True),
    StructField("host_is_superhost", StringType(), True),
    StructField("host_thumbnail_url", StringType(), True),
    StructField("host_picture_url", StringType(), True),
    StructField("host_neighbourhood", StringType(), True),
    StructField("host_listings_count", DoubleType(), True),
    StructField("host_total_listings_count", DoubleType(), True),
    StructField("host_verifications", StringType(), True),
    StructField("host_has_profile_pic", StringType(), True),
    StructField("host_identity_verified", StringType(), True),
    # --- location ---
    StructField("neighbourhood", StringType(), True),
    StructField("neighbourhood_cleansed", StringType(), True),
    # NOTE(review): DoubleType although the name suggests text — presumably
    # inferred from a column that is empty in this dataset; confirm.
    StructField("neighbourhood_group_cleansed", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    # --- property ---
    StructField("property_type", StringType(), True),
    StructField("room_type", StringType(), True),
    StructField("accommodates", LongType(), True),
    StructField("bathrooms", DoubleType(), True),
    StructField("bathrooms_text", StringType(), True),
    StructField("bedrooms", DoubleType(), True),
    StructField("beds", DoubleType(), True),
    StructField("amenities", StringType(), True),
    StructField("price", StringType(), True),
    # --- stay length limits ---
    StructField("minimum_nights", LongType(), True),
    StructField("maximum_nights", LongType(), True),
    StructField("minimum_minimum_nights", LongType(), True),
    StructField("maximum_minimum_nights", LongType(), True),
    StructField("minimum_maximum_nights", LongType(), True),
    StructField("maximum_maximum_nights", LongType(), True),
    StructField("minimum_nights_avg_ntm", DoubleType(), True),
    StructField("maximum_nights_avg_ntm", DoubleType(), True),
    # --- availability ---
    # NOTE(review): DoubleType although the name suggests a date or text —
    # presumably inferred from a column that is empty in this dataset; confirm.
    StructField("calendar_updated", DoubleType(), True),
    StructField("has_availability", StringType(), True),
    StructField("availability_30", LongType(), True),
    StructField("availability_60", LongType(), True),
    StructField("availability_90", LongType(), True),
    StructField("availability_365", LongType(), True),
    StructField("calendar_last_scraped", StringType(), True),
    # --- reviews ---
    StructField("number_of_reviews", LongType(), True),
    StructField("number_of_reviews_ltm", LongType(), True),
    StructField("number_of_reviews_l30d", LongType(), True),
    StructField("first_review", StringType(), True),
    StructField("last_review", StringType(), True),
    StructField("review_scores_rating", DoubleType(), True),
    StructField("review_scores_accuracy", DoubleType(), True),
    StructField("review_scores_cleanliness", DoubleType(), True),
    StructField("review_scores_checkin", DoubleType(), True),
    StructField("review_scores_communication", DoubleType(), True),
    StructField("review_scores_location", DoubleType(), True),
    StructField("review_scores_value", DoubleType(), True),
    # --- licensing / host listing counts ---
    StructField("license", StringType(), True),
    StructField("instant_bookable", StringType(), True),
    StructField("calculated_host_listings_count", LongType(), True),
    StructField("calculated_host_listings_count_entire_homes", LongType(), True),
    StructField("calculated_host_listings_count_private_rooms", LongType(), True),
    StructField("calculated_host_listings_count_shared_rooms", LongType(), True),
    StructField("reviews_per_month", DoubleType(), True),
])

# Folder inside the volume where the schema contracts (JSON files) are kept.
contracts_path = f"dbfs:/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/contracts"
dbutils.fs.mkdirs(contracts_path)  # requires WRITE VOLUME
schema_json_path = f"{contracts_path}/listing_schema.json"

# Third argument True: overwrite the file if it already exists, so the cell can be re-run.
dbutils.fs.put(schema_json_path, listing_schema.json(), True)
print("✅ Wrote:", schema_json_path)

print("Contracts dir listing:")
# The loop variable is deliberately not named `f`: a top-level loop variable
# stays in the notebook namespace, and a later cell uses `f` as the alias for
# pyspark.sql.functions.
for file_info in dbutils.fs.ls(contracts_path):
    print(" -", file_info.path)


Wrote 5527 bytes.
✅ Wrote: dbfs:/Volumes/airbnb/raw/vol/contracts/listing_schema.json
Contracts dir listing:
 - dbfs:/Volumes/airbnb/raw/vol/contracts/agg_listing_schema.json
 - dbfs:/Volumes/airbnb/raw/vol/contracts/calendar_schema.json
 - dbfs:/Volumes/airbnb/raw/vol/contracts/listing_schema.json


In [0]:
import io
import pandas as pd
import requests

def load_data_from_url_as_spark_df(url, timeout=60, **params):
    """Download a CSV from ``url`` and return it as a Spark DataFrame.

    The response body is parsed with ``pandas.read_csv`` and the resulting
    pandas DataFrame is converted with ``spark.createDataFrame``.

    Args:
        url: Address of the CSV file.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled server cannot hang the notebook indefinitely.
        **params: Optional keyword arguments passed to ``pandas.read_csv``.

    Returns:
        A Spark DataFrame built from the parsed CSV.

    Raises:
        requests.HTTPError: If the server answers with an error status.
            Without this check the error page would be handed to
            ``read_csv`` and fail with a misleading parse error.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    pandas_df = pd.read_csv(io.BytesIO(response.content), **params)
    return spark.createDataFrame(pandas_df)

In [0]:
import json 
import pyspark.sql.functions as f
from pyspark.sql.types import StringType, TimestampType
from delta.tables import DeltaTable

def update_schema_with_metadata_fields(schema):
    """Helper method to add metadata fields to a contract schema.

    ``add`` appends to the schema object it is called on, so the fields are
    added to a deep copy. The schema passed in is left unchanged and calling
    this twice on the same input does not add the metadata fields twice.

    Returns:
        A copy of ``schema`` with the nullable fields ``processing_datetime``
        (timestamp) and ``area`` (string) appended.
    """
    import copy  # local import: keeps this cell's import list unchanged

    extended = copy.deepcopy(schema)
    return extended\
            .add("processing_datetime", TimestampType(), True)\
            .add("area", StringType(), True)
        
def add_metadata_columns(df, area: str):
    """Helper that returns ``df`` with the metadata columns added.

    ``processing_datetime`` comes from ``f.current_timestamp()`` and ``area``
    is the given value as a literal column.
    """
    stamped = df.withColumn("processing_datetime", f.current_timestamp())
    return stamped.withColumn("area", f.lit(area))


## Load Listings

In [0]:
CATALOG = "airbnb"
SCHEMA  = "raw"
VOLUME  = "vol"
TABLE   = "listings"
# Built from the constants above (same construction as in the cell that writes
# the contract), so the location is defined in one way only.
contracts_path = f"dbfs:/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/contracts"  # your UC Volume folder
schema_file = f"{contracts_path}/listing_schema.json"

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {SCHEMA}")

import json
from pyspark.sql.types import StructType, TimestampType, StringType
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_schema_with_metadata_fields(schema: StructType) -> StructType:
    """Return a copy of ``schema`` extended with the metadata fields.

    ``StructType.add`` appends to the instance it is called on, so the input's
    fields are first copied into a fresh ``StructType``. The schema passed in
    stays as loaded from the contract file.

    Args:
        schema: Contract schema to extend.

    Returns:
        A new ``StructType`` with the nullable fields ``processing_datetime``
        (timestamp) and ``area`` (string) appended.
    """
    return (StructType(list(schema.fields))
            .add("processing_datetime", TimestampType(), True)
            .add("area", StringType(), True))

def add_metadata_columns(df, area: str):
    """Return ``df`` with the ``processing_datetime`` and ``area`` columns added.

    ``processing_datetime`` comes from ``F.current_timestamp()`` and ``area``
    is the given value as a literal column.
    """
    with_timestamp = df.withColumn("processing_datetime", F.current_timestamp())
    return with_timestamp.withColumn("area", F.lit(area))

print("Contracts dir listing:")
print("\n".join(entry.path for entry in dbutils.fs.ls(contracts_path)))

# Read the whole contract file. dbutils.fs.head() returns only the first bytes
# of a file (64 KB unless a larger limit is passed), which would silently
# truncate a bigger contract and make json.loads fail on incomplete JSON.
schema_json_text = spark.read.text(schema_file, wholetext=True).first()[0]
listing_schema = StructType.fromJson(json.loads(schema_json_text))

# Contract fields plus the processing_datetime / area metadata fields.
raw_listing_schema = update_schema_with_metadata_fields(listing_schema)

# Create the Delta table from the schema only if it does not exist yet.
(
    DeltaTable.createIfNotExists(spark)
    .tableName(f"{CATALOG}.{SCHEMA}.{TABLE}")
    .addColumns(raw_listing_schema)        # StructType is accepted
    # .comment("Airbnb listings (raw) with metadata")   # optional
    # .partitionedBy("neighbourhood_cleansed")          # optional
    .execute()
)

print(f"✅ Delta table ensured: {CATALOG}.{SCHEMA}.{TABLE}")


Contracts dir listing:
dbfs:/Volumes/airbnb/raw/vol/contracts/agg_listing_schema.json
dbfs:/Volumes/airbnb/raw/vol/contracts/calendar_schema.json
dbfs:/Volumes/airbnb/raw/vol/contracts/listing_schema.json
✅ Delta table ensured: airbnb.raw.listings


In [0]:
# Gzip-compressed listings CSV for Athens; the snapshot date in the URL is 2023-09-21.
listings_url = "http://data.insideairbnb.com/greece/attica/athens/2023-09-21/data/listings.csv.gz"

# Everything after the URL is forwarded to pandas.read_csv.
# NOTE(review): index_col=0 makes the first CSV column the pandas index —
# presumably the listing id; confirm it is not needed as a column in the table.
listings_df = load_data_from_url_as_spark_df(
    listings_url,
    compression='gzip',
    sep=',',
    quotechar='"',
    index_col=0,
)

In [0]:
# Add the metadata columns (area = "Athens"), then append the rows to the raw table.
# NOTE(review): this is an append, so every execution of the cell adds the
# rows again — confirm before re-running.
_ = add_metadata_columns(listings_df, "Athens").writeTo("airbnb.raw.listings").append()