In [1]:
# install boto3 for AWS connection
dbutils.library.installPyPI("boto3", version="1.9.157")
dbutils.library.restartPython()

In [2]:
# import AWS credentials
# import config.py ##for local
%run "/dbfs/FileStore/tables/config" ##for databricks

In [3]:
import pandas as pd
pd.set_option('display.max_columns', None)

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [5]:
# get or create Spark session
app_name = "spark-airbnb"
spark = SparkSession.builder.appName(app_name).getOrCreate()

In [6]:
# raw file to transform and load after uploading to DBFS
neighbourhoods_file = "/FileStore/tables/neighbourhoods.csv"
reviews_full_file   = "/FileStore/tables/reviews_full.csv"
calendar_file       = "/FileStore/tables/calendar.csv"
listings_full_file  = "/FileStore/tables/listings_full.csv"

In [7]:
## Read CSV files to Spark DataFrame and print schema

# neighbourhoods Spark DataFrame
neighbourhoods_sdf = spark.read \
    .format("com.databricks.spark.csv") \
    .options(header='true', inferSchema="true") \
    .load(neighbourhoods_file)
neighbourhoods_sdf.printSchema()


# reviews_full Spark DataFrame
# reviews have new line characters in the comments that need to be cleaned up before loading
reviews_full_pdf = pd.read_csv("/dbfs/FileStore/tables/reviews_full.csv").replace({r'\\n': ''}, regex=True)
reviews_full_sdf = sqlContext.createDataFrame(reviews_full_pdf)
reviews_full_sdf.printSchema()


# calendar Spark DataFrame
calendar_sdf = spark.read \
    .format("com.databricks.spark.csv") \
    .options(header='true', inferSchema="true") \
    .load(calendar_file)
calendar_sdf.printSchema()


# listings_full Spark DataFrame
# listings needs to be cleaned up before conversion to Spark DataFrame
listings_full_pdf = pd.read_csv("/dbfs/FileStore/tables/listings_full.csv").replace({r'\\n': ''}, regex=True)

def cleanup_string_to_float(col, extra_char, divisor=1):
    ''' Remove extraneous character, convert string to float, divide float if neccessary '''
    return col.str.replace(extra_char,"").astype('float') / divisor

price_columns = ["price", "weekly_price", "monthly_price", "security_deposit", "cleaning_fee", "extra_people"]
for pc in price_columns:
    listings_full_pdf[pc] = cleanup_string_to_float(col=listings_full_pdf[pc].str.replace(",",""), extra_char='$')

# cleanup certain columns
listings_full_pdf["host_response_rate"] = cleanup_string_to_float(col=listings_full_pdf["host_response_rate"], extra_char='%', divisor=100)
listings_full_pdf["jurisdiction_names"] = listings_full_pdf["jurisdiction_names"].str.replace("{","[").str.replace("}","]")
listings_full_pdf["amenities"] = listings_full_pdf["amenities"].str.replace('"','').str.replace(',','","').str.replace('{','["').str.replace('}','"]')

# cast all columns to StringType() initially to avoid initial schema issues
listings_full_sdf = spark.createDataFrame(listings_full_pdf.astype(str))
listings_full_sdf.printSchema()

In [8]:
## Transform data to schema specifications if needed

# reviews_full table
reviews_full_sdf = reviews_full_sdf \
          .withColumn('date', F.col('date').cast(DateType()))

# calendar table
calendar_sdf = calendar_sdf \
          .withColumn('available', F.col('available').cast(BooleanType())) \
          .withColumn('date', F.col('date').cast(DateType())) \
          .withColumn('price', F.regexp_replace(F.col('price'), '\$', '').cast(FloatType())) \
          .withColumn('adjusted_price', F.regexp_replace(F.col('adjusted_price'), '\$', '').cast(FloatType()))

# listings_full table
# if StringType, then find and convert null values
listings_full_sdf = listings_full_sdf \
          .withColumn('id',F.col('id').cast(IntegerType())) \
          .withColumn("listing_url",F.when(F.col("listing_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("listing_url"))) \
          .withColumn('scrape_id',F.col('scrape_id').cast(LongType())) \
          .withColumn('last_scraped',F.col('last_scraped').cast(DateType())) \
          .withColumn("name",F.when(F.col("name").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("name"))) \
          .withColumn("summary",F.when(F.col("summary").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("summary"))) \
          .withColumn("space",F.when(F.col("space").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("space"))) \
          .withColumn("description",F.when(F.col("description").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("description"))) \
          .withColumn("experiences_offered",F.when(F.col("experiences_offered").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("experiences_offered"))) \
          .withColumn("neighborhood_overview",F.when(F.col("neighborhood_overview").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("neighborhood_overview"))) \
          .withColumn("notes",F.when(F.col("notes").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("notes"))) \
          .withColumn("transit",F.when(F.col("transit").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("transit"))) \
          .withColumn("access",F.when(F.col("access").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("access"))) \
          .withColumn("interaction",F.when(F.col("interaction").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("interaction"))) \
          .withColumn("house_rules",F.when(F.col("house_rules").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("house_rules"))) \
          .withColumn("thumbnail_url",F.when(F.col("thumbnail_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("thumbnail_url"))) \
          .withColumn("medium_url",F.when(F.col("medium_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("medium_url"))) \
          .withColumn("picture_url",F.when(F.col("picture_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("picture_url"))) \
          .withColumn("xl_picture_url",F.when(F.col("xl_picture_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("xl_picture_url"))) \
          .withColumn('host_id',F.col('host_id').cast(IntegerType())) \
          .withColumn("host_url",F.when(F.col("host_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_url"))) \
          .withColumn("host_name",F.when(F.col("host_name").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_name"))) \
          .withColumn('host_since',F.col('host_since').cast(DateType())) \
          .withColumn("host_location",F.when(F.col("host_location").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_location"))) \
          .withColumn("host_about",F.when(F.col("host_about").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_about"))) \
          .withColumn("host_response_time",F.when(F.col("host_response_time").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_response_time"))) \
          .withColumn('host_response_rate',F.col('host_response_rate').cast(FloatType())) \
          .withColumn('host_acceptance_rate',F.col('host_acceptance_rate').cast(FloatType())) \
          .withColumn('host_is_superhost',F.col('host_is_superhost').cast(BooleanType())) \
          .withColumn("host_thumbnail_url",F.when(F.col("host_thumbnail_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_thumbnail_url"))) \
          .withColumn("host_picture_url",F.when(F.col("host_picture_url").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_picture_url"))) \
           .withColumn("host_neighbourhood",F.when(F.col("host_neighbourhood").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_neighbourhood"))) \
          .withColumn('host_listings_count',F.col('host_listings_count').cast(IntegerType())) \
          .withColumn('host_total_listings_count',F.col('host_total_listings_count').cast(IntegerType())) \
          .withColumn("host_verifications",F.when(F.col("host_verifications").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("host_verifications"))) \
          .withColumn('host_has_profile_pic',F.col('host_has_profile_pic').cast(BooleanType())) \
          .withColumn('host_identity_verified',F.col('host_identity_verified').cast(BooleanType())) \
          .withColumn("street",F.when(F.col("street").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("street"))) \
          .withColumn("neighbourhood",F.when(F.col("neighbourhood").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("neighbourhood"))) \
          .withColumn("neighbourhood_cleansed",F.when(F.col("neighbourhood_cleansed").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("neighbourhood_cleansed"))) \
          .withColumn("neighbourhood_group_cleansed",F.when(F.col("neighbourhood_group_cleansed").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("neighbourhood_group_cleansed"))) \
          .withColumn("city",F.when(F.col("city").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("city"))) \
          .withColumn("state",F.when(F.col("state").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("state"))) \
          .withColumn("zipcode",F.when(F.col("zipcode").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("zipcode"))) \
          .withColumn("market",F.when(F.col("market").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("market"))) \
          .withColumn("smart_location",F.when(F.col("smart_location").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("smart_location"))) \
          .withColumn("country_code",F.when(F.col("country_code").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("country_code"))) \
          .withColumn("country",F.when(F.col("country").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("country"))) \
          .withColumn('latitude',F.col('latitude').cast(FloatType())) \
          .withColumn('longitude',F.col('longitude').cast(FloatType())) \
          .withColumn('is_location_exact',F.col('is_location_exact').cast(BooleanType())) \
          .withColumn("property_type",F.when(F.col("property_type").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("property_type"))) \
          .withColumn("room_type",F.when(F.col("room_type").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("room_type"))) \
          .withColumn('accommodates',F.col('accommodates').cast(IntegerType())) \
          .withColumn('bathrooms',F.col('bathrooms').cast(FloatType())) \
          .withColumn('bedrooms',F.col('bedrooms').cast(IntegerType())) \
          .withColumn('beds',F.col('beds').cast(IntegerType())) \
          .withColumn("bed_type",F.when(F.col("bed_type").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("bed_type"))) \
          .withColumn("amenities",F.when(F.col("amenities").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("amenities"))) \
          .withColumn('square_feet',F.col('square_feet').cast(IntegerType())) \
          .withColumn('price',F.col('price').cast(FloatType())) \
          .withColumn('weekly_price',F.col('weekly_price').cast(FloatType())) \
          .withColumn('monthly_price',F.col('monthly_price').cast(FloatType())) \
          .withColumn('security_deposit',F.col('security_deposit').cast(FloatType())) \
          .withColumn('cleaning_fee',F.col('cleaning_fee').cast(FloatType())) \
          .withColumn('guests_included',F.col('guests_included').cast(IntegerType())) \
          .withColumn('extra_people',F.col('extra_people').cast(FloatType())) \
          .withColumn('minimum_nights',F.col('minimum_nights').cast(IntegerType())) \
          .withColumn('maximum_nights',F.col('maximum_nights').cast(IntegerType())) \
          .withColumn('minimum_minimum_nights',F.col('minimum_minimum_nights').cast(IntegerType())) \
          .withColumn('maximum_minimum_nights',F.col('maximum_minimum_nights').cast(IntegerType())) \
          .withColumn('minimum_maximum_nights',F.col('minimum_maximum_nights').cast(IntegerType())) \
          .withColumn('maximum_maximum_nights',F.col('maximum_maximum_nights').cast(IntegerType())) \
          .withColumn('minimum_nights_avg_ntm',F.col('minimum_nights_avg_ntm').cast(FloatType())) \
          .withColumn('maximum_nights_avg_ntm',F.col('maximum_nights_avg_ntm').cast(FloatType())) \
          .withColumn("calendar_updated",F.when(F.col("calendar_updated").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("calendar_updated"))) \
          .withColumn('has_availability',F.col('has_availability').cast(BooleanType())) \
          .withColumn('availability_30',F.col('availability_30').cast(IntegerType())) \
          .withColumn('availability_60',F.col('availability_60').cast(IntegerType())) \
          .withColumn('availability_90',F.col('availability_90').cast(IntegerType())) \
          .withColumn('availability_365',F.col('availability_365').cast(IntegerType())) \
          .withColumn('calendar_last_scraped',F.col('calendar_last_scraped').cast(DateType())) \
          .withColumn('number_of_reviews',F.col('number_of_reviews').cast(IntegerType())) \
          .withColumn('number_of_reviews_ltm',F.col('number_of_reviews_ltm').cast(IntegerType())) \
          .withColumn('first_review',F.col('first_review').cast(DateType())) \
          .withColumn('last_review',F.col('last_review').cast(DateType())) \
          .withColumn('review_scores_rating',F.col('review_scores_rating').cast(IntegerType())) \
          .withColumn('review_scores_accuracy',F.col('review_scores_accuracy').cast(IntegerType())) \
          .withColumn('review_scores_cleanliness',F.col('review_scores_cleanliness').cast(IntegerType())) \
          .withColumn('review_scores_checkin',F.col('review_scores_checkin').cast(IntegerType())) \
          .withColumn('review_scores_communication',F.col('review_scores_communication').cast(IntegerType())) \
          .withColumn('review_scores_location',F.col('review_scores_location').cast(IntegerType())) \
          .withColumn('review_scores_value',F.col('review_scores_value').cast(IntegerType())) \
          .withColumn('requires_license',F.col('requires_license').cast(BooleanType())) \
          .withColumn("license",F.when(F.col("license").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("license"))) \
          .withColumn("jurisdiction_names",F.when(F.col("jurisdiction_names").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("jurisdiction_names"))) \
          .withColumn('instant_bookable',F.col('instant_bookable').cast(BooleanType())) \
          .withColumn('is_business_travel_ready',F.col('is_business_travel_ready').cast(BooleanType())) \
          .withColumn("cancellation_policy",F.when(F.col("cancellation_policy").isin('nan', 'NaN', 'n/a', 'N/A'),None).otherwise(F.col("cancellation_policy"))) \
          .withColumn('require_guest_profile_picture',F.col('require_guest_profile_picture').cast(BooleanType())) \
          .withColumn('require_guest_phone_verification',F.col('require_guest_phone_verification').cast(BooleanType())) \
          .withColumn('calculated_host_listings_count',F.col('calculated_host_listings_count').cast(IntegerType())) \
          .withColumn('calculated_host_listings_count_entire_homes',F.col('calculated_host_listings_count_entire_homes').cast(IntegerType())) \
          .withColumn('calculated_host_listings_count_private_rooms',F.col('calculated_host_listings_count_private_rooms').cast(IntegerType())) \
          .withColumn('calculated_host_listings_count_shared_rooms',F.col('calculated_host_listings_count_shared_rooms').cast(IntegerType())) \
          .withColumn('reviews_per_month',F.col('reviews_per_month').cast(FloatType()))


In [9]:
import boto3

secret_name = my_secret_name
region_name = my_region_name
access_key  = my_access_key
secret_key  = my_secret_key

session      = boto3.session.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name)
client       = session.client('secretsmanager')
secret_value = client.get_secret_value(SecretId=secret_name)

In [10]:
import json
def get_connection(secret_value):
    return json.loads(secret_value['SecretString'])

In [11]:
connection = get_connection(secret_value)

# Postgres credentials
jdbcHostname = connection['host']
jdbcPort     = connection['port']
jdbcDatabase = "postgres"
dialect      = "postgresql"
jdbcUsername = connection['username']
jdbcPassword = connection['password']

jdbcUrl = f"jdbc:{dialect}://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
connectionProperties = {
  "user"     : jdbcUsername,
  "password" : jdbcPassword,
  "driver"   : "org.postgresql.Driver" 
}

In [12]:
# write out Spark DataFrames to PostgreSQL tables

mode = "append" # options are: error, append, overwrite

neighbourhoods_sdf.write.jdbc(jdbcUrl, "neighbourhoods", mode, connectionProperties)
listings_full_sdf.write.jdbc(jdbcUrl, "listings_full", mode, connectionProperties)
reviews_full_sdf.write.jdbc(jdbcUrl, "reviews_full", mode, connectionProperties)
calendar_sdf.write.jdbc(jdbcUrl, "calendar", mode, connectionProperties)