# DIMENSION TABLES

In [1]:
!pip install s3fs
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

from sqlalchemy import create_engine
import pandas as pd

from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql.functions import expr

from time import time
import configparser

import s3fs
import matplotlib.pyplot as plt



In [2]:

# Load AWS credentials and Redshift connection settings from the local
# (untracked) config file. The context manager closes the file handle, which
# the previous bare open() call left open.
config = configparser.ConfigParser()
with open('dl.cfg') as config_file:
    config.read_file(config_file)

KEY = config.get('AWS', 'AWS_ACCESS_KEY_ID')
SECRET = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

DWH_DB = config.get("DWH", "DB_NAME")
DWH_DB_USER = config.get("DWH", "DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DB_PASSWORD")
DWH_PORT = config.get("DWH", "DB_PORT")

In [3]:
# Redshift cluster host name used for the SQL and JDBC connections below.
DWH_ENDPOINT = "dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com"
# IAM role passed to COPY so Redshift can read from S3.
DWH_ROLE_ARN = "arn:aws:iam::451089474087:role/dwhRole"

In [4]:
%load_ext sql

In [5]:
import os 
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone


'Connected: dwhuser@capstone'

In [6]:
%%sql

CREATE SCHEMA IF NOT EXISTS zillow;

SET search_path TO zillow;

-- Drop any previous copies so the cell can be re-run from scratch.
DROP TABLE IF EXISTS heating_or_system_type;
DROP TABLE IF EXISTS property_land_use_type;
DROP TABLE IF EXISTS story_type;
DROP TABLE IF EXISTS air_conditioning_type;
DROP TABLE IF EXISTS architectural_style_type;
DROP TABLE IF EXISTS type_construction_type;
DROP TABLE IF EXISTS building_class_type;

-- Lookup (dimension) tables: one id column plus one description column each.
CREATE TABLE heating_or_system_type (
    id                  INTEGER      NOT NULL,
    heating_or_system   VARCHAR(30)  NOT NULL
)
DISTSTYLE ALL;

CREATE TABLE property_land_use_type (
    id                  INTEGER      NOT NULL,
    property_land_use   VARCHAR(100) NOT NULL
)
DISTSTYLE ALL;

CREATE TABLE story_type (
    id                  INTEGER      NOT NULL,
    story               VARCHAR(100) NOT NULL
)
DISTSTYLE ALL;

CREATE TABLE air_conditioning_type (
    id                  INTEGER      NOT NULL,
    air_conditioning    VARCHAR(30)  NOT NULL
)
DISTSTYLE ALL;

CREATE TABLE architectural_style_type (
    id                  INTEGER      NOT NULL,
    architectural_style VARCHAR(30)  NOT NULL
)
DISTSTYLE ALL;

CREATE TABLE type_construction_type (
    id                  INTEGER      NOT NULL,
    type_construction   VARCHAR(30)  NOT NULL
)
DISTSTYLE ALL;

CREATE TABLE building_class_type (
    id                  INTEGER      NOT NULL,
    building_class      VARCHAR(300) NOT NULL
)
DISTSTYLE ALL;

 * postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

In [8]:
# Raw-data bucket and the workbook holding every dimension lookup sheet.
input_data = "s3://capstone-rawdata/"
dimension_workbook_name = "zillow_data_dimension_dictionary.xlsx"
dimension_data = os.path.join(input_data, dimension_workbook_name)
dimension_data


's3://capstone-rawdata/zillow_data_dimension_dictionary.xlsx'

In [9]:
import pandas as pd
import s3fs

# Open the dimension workbook and list the sheets it contains.
xl = pd.ExcelFile(dimension_data)
sheet_names = xl.sheet_names
print(sheet_names)

['HeatingOrSystemType', 'PropertyLandUseType', 'StoryType', 'AirConditioningType', 'ArchitecturalStyleType', 'TypeConstructionType', 'BuildingClassType']


In [10]:
xl = pd.ExcelFile(dimension_data)
# NOTE(review): this reopens the same workbook that the previous cell already
# bound to `xl`; the cell looks redundant and can probably be removed.

In [11]:
# Explicit sheet -> table mapping. Pairing sheets with tables by list position
# silently loads the wrong table if the workbook's sheet order ever changes.
SHEET_TO_TABLE = {
    'HeatingOrSystemType': 'heating_or_system_type',
    'PropertyLandUseType': 'property_land_use_type',
    'StoryType': 'story_type',
    'AirConditioningType': 'air_conditioning_type',
    'ArchitecturalStyleType': 'architectural_style_type',
    'TypeConstructionType': 'type_construction_type',
    'BuildingClassType': 'building_class_type',
}

d = {}  # sheet name -> DataFrame, kept for later inspection
conn = create_engine(conn_string)
tables = list(SHEET_TO_TABLE.values())
for sheet in xl.sheet_names:
    if sheet not in SHEET_TO_TABLE:
        raise KeyError("No target table configured for sheet '{}'".format(sheet))
    table = SHEET_TO_TABLE[sheet]
    df = pd.read_excel(xl, sheet_name=sheet)
    d[sheet] = df
    print(sheet)
    print(table)
    # Append into the tables created by the DDL cell (schema zillow).
    df.to_sql(table, conn, index=False, if_exists='append', schema='zillow')

HeatingOrSystemType
heating_or_system_type
PropertyLandUseType
property_land_use_type
StoryType
story_type
AirConditioningType
air_conditioning_type
ArchitecturalStyleType
architectural_style_type
TypeConstructionType
type_construction_type
BuildingClassType
building_class_type


# TRANSACTION TABLE

In [None]:
#spark.stop()

In [11]:
# Re-read the config and export the AWS credentials as environment variables.
config = configparser.ConfigParser()
config.read('dl.cfg')

for credential_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[credential_name] = config['AWS'][credential_name]

In [12]:
# Extra JARs placed on the Spark driver classpath (Redshift JDBC driver).
jars = ["/home/workspace/RedshiftJDBC42-no-awssdk-1.2.51.1078.jar"]

In [13]:
# Build (or reuse) the Spark session used for all S3 and Redshift I/O.
spark = (
    SparkSession.builder
    # hadoop-aws supplies the s3a:// filesystem implementation.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.driver.extraClassPath", ":".join(jars))
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # S3A reads its credentials from fs.s3a.access.key / fs.s3a.secret.key;
    # the former awsAccessKeyId / awsSecretAccessKey names are not S3A options.
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("com.amazonaws.services.s3.enableV4", True)
    .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .getOrCreate()
)

In [None]:
spark.sparkContext.getConf().getAll()

In [15]:
# Smoke-test the Redshift JDBC connection by reading one dimension table.
# The URL is derived from the config constants instead of being hard-coded.
redshift_jdbc_url = "jdbc:redshift://{}:{}/{}".format(DWH_ENDPOINT, DWH_PORT, DWH_DB)
test = (
    spark.read.format("jdbc")
    .option("url", redshift_jdbc_url)
    .option("driver", "com.amazon.redshift.jdbc42.Driver")
    .option("dbtable", "zillow.air_conditioning_type")
    .option("user", DWH_DB_USER)
    .option("password", DWH_DB_PASSWORD)
    .load()
)
test.head()

Row(id=1, air_conditioning='Central')

## Read from S3, write back to S3

In [126]:
import boto3
s3 = boto3.resource('s3')

# Raw-data bucket to inspect
bucket = s3.Bucket('capstone-rawdata')
print(bucket)
json = []
print(type(json))
csv = []
# Walk the top-level objects once and collect the keys of interest.
# NOTE(review): `json` and `csv` shadow the stdlib module names; they are kept
# because the following cells display them by these names.
for s3_object in bucket.objects.filter(Delimiter='/'):
    object_key = s3_object.key
    if 'json' in object_key:
        json.append(object_key)
    if 'sample1.csv' in object_key:
        csv.append(object_key)

s3.Bucket(name='capstone-rawdata')
<class 'list'>


In [127]:
json

['transactions_2016.json', 'transactions_2017.json']

In [128]:
csv

['2sample1.csv', 'sample1.csv']

In [141]:

# Match every yearly transactions file (transactions_2016.json, ...).
# The previous pattern '_*.json' matched none of the files in the bucket.
transactions_data = os.path.join(input_data, 'transactions_*.json')
print(transactions_data)

# Read transaction data (multi-line JSON documents).
tdf = spark.read.option("multiline", "true").json(transactions_data)

# Clean data: parse the 'yyyy-MM-dd' string into a proper date column.
tdf = tdf.withColumn(
    "transaction_date",
    expr("TO_DATE(CAST(UNIX_TIMESTAMP(transactiondate, 'yyyy-MM-dd') AS TIMESTAMP))"),
)

# Keep only the columns loaded into Redshift, renamed to snake_case.
tdf = tdf.select(
    col("parcelid").alias("parcel_id"),
    col("logerror").alias("log_error"),
    col("transaction_date"),
)

# Save to S3 as bucketed, sorted parquet.
# NOTE(review): `output_data` must already be defined when this cell runs.
(
    tdf.write.format('parquet')
    .bucketBy(10, "parcel_id")
    .sortBy("parcel_id")
    .option("path", os.path.join(output_data, 'transactions'))
    .saveAsTable('zillowwwtransactions')
)

s3a://capstone-rawdata/transactions_*.json


In [7]:
# Same raw-data bucket, addressed through the s3a:// scheme that Spark reads.
input_data = "s3a://capstone-rawdata/"
transactions_file_name = "transactions_2016.json"
transactions_data = os.path.join(input_data, transactions_file_name)
transactions_data


's3a://capstone-rawdata/transactions_2016.json'

Row(parcel_id=11016594, log_error=0.0276, transaction_date=datetime.date(2016, 1, 1))

In [19]:
tdf.head(5)

[Row(parcel_id=11016594, log_error=0.0276, transaction_date=datetime.date(2016, 1, 1)),
 Row(parcel_id=14366692, log_error=-0.1684, transaction_date=datetime.date(2016, 1, 1)),
 Row(parcel_id=12098116, log_error=-0.004, transaction_date=datetime.date(2016, 1, 1)),
 Row(parcel_id=12643413, log_error=0.0218, transaction_date=datetime.date(2016, 1, 2)),
 Row(parcel_id=14432541, log_error=-0.005, transaction_date=datetime.date(2016, 1, 2))]

In [28]:
# Destination bucket for the cleaned parquet output written by Spark.
# NOTE(review): cells that write to S3 read this name, so it must be set
# before they run.
output_data = "s3a://zillowanalytics"

## Copy from S3 to Redshift

In [22]:
%%sql

CREATE SCHEMA IF NOT EXISTS zillow;

SET search_path TO zillow;

-- Recreate the transactions table from scratch.
DROP TABLE IF EXISTS property_transactions;

CREATE TABLE property_transactions (
    parcel_id        BIGINT NOT NULL,
    log_error        FLOAT  NOT NULL,
    transaction_date DATE   NOT NULL
)
DISTSTYLE ALL;

 * postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone
Done.
Done.
Done.
Done.


[]

In [23]:
qry = """
    copy property_transactions 
    from 's3://zillowanalytics/transactions/'
    iam_role '{}'
    format as parquet;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone
Done.


[]

# PROPERTY DETAILS TABLE

In [8]:
# Glob covering every properties sample CSV in the raw-data bucket.
properties_pattern = "*sample1.csv"
properties_data = os.path.join(input_data, properties_pattern)
properties_data

's3a://capstone-rawdata/*sample1.csv'

In [14]:
# read property data
# Read the property CSVs, taking column names from the header row.
properties_reader = spark.read.option("header", "true")
pdf = properties_reader.csv(properties_data)


In [40]:
pdf.head()

Row(parcelid=10754147, airconditioningtypeid=None, architecturalstyletypeid=None, bathroomcnt=0, bedroomcnt=0, buildingclasstypeid=None, calculatedfinishedsquarefeet=None, heatingorsystemtypeid=None, propertylandusetypeid=269, regionidcity=37688, regionidcounty=3101, regionidzip=96337, roomcnt=0, storytypeid=None, typeconstructiontypeid=None, yearbuilt=None, taxvaluedollarcnt=9)

In [16]:
# Source columns that are discarded before the data is loaded.
drop_columns = [
    'basementsqft', 'buildingqualitytypeid', 'calculatedbathnbr', 'decktypeid',
    'finishedfloor1squarefeet', 'finishedsquarefeet12', 'finishedsquarefeet13',
    'finishedsquarefeet15', 'finishedsquarefeet50', 'finishedsquarefeet6',
    'fips', 'fireplacecnt', 'fullbathcnt', 'garagecarcnt', 'garagetotalsqft',
    'hashottuborspa', 'latitude', 'longitude', 'lotsizesquarefeet', 'poolcnt',
    'poolsizesum', 'pooltypeid10', 'pooltypeid2', 'pooltypeid7',
    'propertycountylandusecode', 'propertyzoningdesc', 'rawcensustractandblock',
    'regionidneighborhood', 'threequarterbathnbr', 'unitcnt',
    'yardbuildingsqft17', 'yardbuildingsqft26', 'numberofstories',
    'fireplaceflag', 'structuretaxvaluedollarcnt', 'assessmentyear',
    'landtaxvaluedollarcnt', 'taxamount', 'taxdelinquencyflag',
    'taxdelinquencyyear', 'censustractandblock',
]

In [17]:
from functools import reduce
from pyspark.sql import DataFrame

# Remove every column named in drop_columns in a single call.
pdf = pdf.drop(*drop_columns)

In [18]:
pdf.head()

Row(parcelid='10754147', airconditioningtypeid=None, architecturalstyletypeid=None, bathroomcnt='0', bedroomcnt='0', buildingclasstypeid=None, calculatedfinishedsquarefeet=None, heatingorsystemtypeid=None, propertylandusetypeid='269', regionidcity='37688', regionidcounty='3101', regionidzip='96337', roomcnt='0', storytypeid=None, typeconstructiontypeid=None, yearbuilt=None, taxvaluedollarcnt='9')

In [35]:
# Prevent truncation of wide columns in Jupyter.
# NOTE(review): newer pandas releases reportedly reject -1 here in favour of
# None — confirm against the pandas version this notebook runs on.
pd.set_option('max_colwidth', -1) # to prevent truncating of columns in jupyter

def count_column_types(spark_df):
    """Count the number of columns per data type.

    Parameters
    ----------
    spark_df : object exposing ``dtypes`` as a list of ``(name, type)`` pairs
        (a Spark DataFrame in this notebook).

    Returns
    -------
    pandas.DataFrame
        One row per distinct type, sorted by type, with the columns
        ``type`` and ``count``.
    """
    columns = pd.DataFrame(spark_df.dtypes, columns=["name", "type"])
    # A plain count()/reset_index avoids the dict-renaming form of
    # SeriesGroupBy.agg, which recent pandas versions reject.
    return columns.groupby("type")["name"].count().reset_index(name="count")

count_column_types(pdf)

Unnamed: 0,type,count
0,int,17


In [20]:
# Columns expected to hold integer values after cleaning.
int_columns = [
    'parcelid',
    'airconditioningtypeid',
    'architecturalstyletypeid',
    'bathroomcnt',
    'bedroomcnt',
    'buildingclasstypeid',
    'calculatedfinishedsquarefeet',
    'heatingorsystemtypeid',
    'propertylandusetypeid',
    'regionidcity',
    'regionidcounty',
    'regionidzip',
    'roomcnt',
    'storytypeid',
    'typeconstructiontypeid',
    'yearbuilt',
    'taxvaluedollarcnt',
]

#float_columns = ['calculatedfinishedsquarefeet','tax_value_dollar_count']

In [21]:
from pyspark.sql.functions import col

# Cast every remaining column to int, keeping the original column names.
int_casts = [col(name).cast("int").alias(name) for name in pdf.columns]
pdf = pdf.select(*int_casts)

In [22]:
#check shape
print((pdf.count(), len(pdf.columns)))

(34, 17)


In [23]:
# Drop rows whose parcelid is null.
pdf = pdf.dropna(subset=["parcelid"])

In [51]:
count_column_types(pdf)['count'][0] == 17

True

In [52]:
count_column_types(pdf)['type'][0] == 'int'

True

In [None]:
count_column_types(pdf)

In [33]:
null_count = pdf.filter(pdf.parcelid.isNull()).count()

In [34]:
null_count

0

In [41]:
def count_null_value(df):
    """Return how many rows of ``df`` have a null ``parcelid``."""
    parcel_is_missing = df.parcelid.isNull()
    rows_without_parcel = df.filter(parcel_is_missing)
    return rows_without_parcel.count()

In [42]:
count_null_value(pdf)

0

In [53]:
import sys

# Data quality gate: write to S3 only when the cleaned frame has exactly the
# expected 17 columns, all of type int, and no null parcelid.
# The checks read pdf directly: looking only at the first row of the type
# summary would let extra non-int columns slip through unnoticed.
column_type_names = {dtype for _, dtype in pdf.dtypes}

if len(pdf.columns) != 17:
    sys.exit("Did not have 17 columns as defined")
elif column_type_names != {'int'}:
    sys.exit("Did not have int type")
elif count_null_value(pdf) != 0:
    sys.exit("Parcelid contains null value")
else:
    # Save the clean result back to S3 as bucketed, sorted parquet.
    # NOTE(review): `output_data` must already be defined when this cell runs.
    (
        pdf.write.format('parquet')
        .bucketBy(10, "parcelid")
        .sortBy("parcelid")
        .option("path", os.path.join(output_data, 'propdetails'))
        .saveAsTable('properties')
    )


NameError: name 'output_data' is not defined

In [88]:
# Write the cleaned property details to S3 as bucketed, sorted parquet.
propdetails_path = os.path.join(output_data, 'propdetails')
properties_writer = (
    pdf.write.format('parquet')
    .bucketBy(10, "parcelid")
    .sortBy("parcelid")
    .option("path", propdetails_path)
)
properties_writer.saveAsTable('ppproperties')

In [89]:
%%sql

CREATE SCHEMA IF NOT EXISTS zillow;

SET search_path TO zillow;

-- Recreate the property details table from scratch.
DROP TABLE IF EXISTS property_details;

CREATE TABLE property_details (
    parcel_id                       BIGINT NOT NULL,
    air_conditioning_type_id        INT,
    architectural_style_type_id     INT,
    bathroom_count                  INT,
    bedroom_count                   INT,
    building_class_type_id          INT,
    calculated_finished_square_feet INT,
    heating_or_system_type_id       INT,
    property_land_use_type_id       INT,
    region_id_city                  INT,
    region_id_county                INT,
    region_id_zip                   INT,
    room_count                      INT,
    story_type_id                   INT,
    type_construction_type_id       INT,
    year_built                      INT,
    tax_value_dollar_count          BIGINT
)
DISTSTYLE EVEN;

 * postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone
Done.
Done.
Done.
Done.


[]

In [90]:
qry = """
    copy property_details 
    from 's3://zillowanalytics/propdetails/'
    iam_role '{}'
    format as parquet;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone
Done.


[]

# ANALYTICS TABLE

## Read data from Redshift

In [19]:
# Read the transactions table back from Redshift over JDBC.
# The table is schema-qualified because the `SET search_path` issued in the
# %%sql cells does not apply to this separate JDBC session; the URL is built
# from the config constants instead of being hard-coded.
trans = (
    spark.read.format("jdbc")
    .option("url", "jdbc:redshift://{}:{}/{}".format(DWH_ENDPOINT, DWH_PORT, DWH_DB))
    .option("driver", "com.amazon.redshift.jdbc42.Driver")
    .option("dbtable", "zillow.property_transactions")
    .option("user", DWH_DB_USER)
    .option("password", DWH_DB_PASSWORD)
    .load()
)
trans.head()

Row(parcel_id=10711738, log_error=0.0276, transaction_date=datetime.date(2016, 8, 2))

In [20]:
# Read the property details table back from Redshift over JDBC.
# The table is schema-qualified because the `SET search_path` issued in the
# %%sql cells does not apply to this separate JDBC session; the URL is built
# from the config constants instead of being hard-coded.
details = (
    spark.read.format("jdbc")
    .option("url", "jdbc:redshift://{}:{}/{}".format(DWH_ENDPOINT, DWH_PORT, DWH_DB))
    .option("driver", "com.amazon.redshift.jdbc42.Driver")
    .option("dbtable", "zillow.property_details")
    .option("user", DWH_DB_USER)
    .option("password", DWH_DB_PASSWORD)
    .load()
)
details.head()

Row(parcel_id=10879947, air_conditioning_type_id=None, architectural_style_type_id=None, bathroom_count=0, bedroom_count=0, building_class_type_id=4, calculated_finished_square_feet=1776, heating_or_system_type_id=None, property_land_use_type_id=31, region_id_city=12447, region_id_county=3101, region_id_zip=96450, room_count=0, story_type_id=None, type_construction_type_id=None, year_built=1947, tax_value_dollar_count=433491)

In [24]:
# Read property details from Redshift, taking all connection settings from
# the config file. The table is schema-qualified because the JDBC session
# does not inherit the search_path set in the %%sql cells.
details = (
    spark.read.format("jdbc")
    .option(
        "url",
        "jdbc:redshift://{}:{}/{}".format(
            DWH_ENDPOINT, config.get("DWH", "DB_PORT"), config.get("DWH", "DB_NAME")
        ),
    )
    .option("driver", "com.amazon.redshift.jdbc42.Driver")
    .option("dbtable", "zillow.property_details")
    .option("user", config.get("DWH", "DB_USER"))
    .option("password", config.get("DWH", "DB_PASSWORD"))
    .load()
)


In [25]:
# Expose both frames to Spark SQL.
trans.createOrReplaceTempView("transactions")
details.createOrReplaceTempView("details")

# Monthly aggregate per county: number of distinct parcels transacted and the
# summed value exp(ln(tax_value_dollar_count) + log_error).
# exp() replaces power(2.71828, ...), which only approximated e, and the cast
# is to double so large dollar amounts are not rounded by a 32-bit float.
analytics_table = spark.sql("""
        SELECT d.region_id_county,
        extract(year from t.transaction_date) as year,
        extract(month from t.transaction_date) as month,
        count(distinct t.parcel_id) as transaction_count,
        sum(exp(ln(cast(d.tax_value_dollar_count as double)) + t.log_error)) as transaction_value
        FROM transactions t
        LEFT JOIN details d
        ON t.parcel_id = d.parcel_id
        group by d.region_id_county,
        extract(year from t.transaction_date),
        extract(month from t.transaction_date)
    """)

# TODO: transactions with no matching row in details survive the LEFT JOIN and
# are grouped under a NULL county with a NULL value; check for them and
# exclude them.


In [26]:
analytics_table.head(5)

[Row(region_id_county=None, year=2017, month=3, transaction_count=9327, transaction_value=None),
 Row(region_id_county=None, year=2017, month=8, transaction_count=9934, transaction_value=None),
 Row(region_id_county=None, year=2016, month=7, transaction_count=9947, transaction_value=None),
 Row(region_id_county=None, year=2016, month=11, transaction_count=1826, transaction_value=None),
 Row(region_id_county=None, year=2016, month=5, transaction_count=9961, transaction_value=None)]

In [31]:
def count_column_types(spark_df):
    """Count the number of columns per data type and list their names.

    Parameters
    ----------
    spark_df : object exposing ``dtypes`` as a list of ``(name, type)`` pairs
        (a Spark DataFrame in this notebook).

    Returns
    -------
    pandas.DataFrame
        One row per distinct type, sorted by type, with the columns ``type``,
        ``count`` and ``names`` (the column names of that type joined by
        ``" | "`` in alphabetical order).
    """
    columns = pd.DataFrame(spark_df.dtypes, columns=["name", "type"])
    names_by_type = columns.groupby("type")["name"]
    # Built from explicit count()/apply() calls: the dict-renaming form of
    # SeriesGroupBy.agg is rejected by recent pandas versions. Names are
    # sorted so the output does not depend on set iteration order.
    summary = pd.DataFrame({
        "count": names_by_type.count(),
        "names": names_by_type.apply(lambda names: " | ".join(sorted(set(names)))),
    })
    return summary.rename_axis("type").reset_index()

count_column_types(analytics_table)

Unnamed: 0,type,count,names
0,bigint,1,transaction_count
1,double,1,transaction_value
2,int,3,year | month | region_id_county


In [29]:
# Persist the monthly aggregate to S3 as parquet, one directory per county.
# NOTE(review): with partitionBy the county value presumably lives in the
# directory names rather than in the data files — confirm before loading
# these files into a table that expects a region_id_county column.
monthly_output_path = os.path.join(output_data, 'transactions_by_month')
monthly_writer = analytics_table.write.partitionBy("region_id_county").mode('overwrite')
monthly_writer.parquet(monthly_output_path)

KeyboardInterrupt: 

In [11]:

%%sql

CREATE SCHEMA IF NOT EXISTS zillow;

SET search_path TO zillow;

-- Recreate the monthly aggregate table from scratch.
DROP TABLE IF EXISTS transactions_by_month;

-- transaction_count is BIGINT to match the bigint that Spark's count()
-- produces for this column in the analytics frame.
CREATE TABLE transactions_by_month (
    region_id_county  INT,
    year              INT    NOT NULL,
    month             INT    NOT NULL,
    transaction_count BIGINT NOT NULL,
    transaction_value FLOAT
)
DISTSTYLE EVEN;

 * postgresql://dwhuser:***@dwhcluster.cug0uxavqpyn.us-west-2.redshift.amazonaws.com:5439/capstone
Done.
Done.
Done.
Done.


[]