### 1. Build Connection 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import seaborn as sns

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
spark = SparkSession.builder.appName('HouseSales').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.jars',
  'local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/*'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.sql.queryExecutionListeners',
  'com.cloudera.spark.lineage.NavigatorQueryListener'),
 ('spark.lineage.log.dir', '/var/log/spark/lineage'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'md01.rcc.local,md02.rcc.local'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.executorEnv.PYTHONPATH',
  '/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/py4j-0.10.7-src.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/pyspark.zip'),
 ('spark.yarn.historyServer.addre

### 2 Preprocess Data

In [248]:
import pandas as pd

In [249]:
dat = spark.read.csv('/user/efischbein/data/group_project/us_housing_prices', header = True)

In [250]:
hospital_ratings= spark.read.csv('/user/efischbein/data/group_project/hospital/Hospital_General_Information.csv', header = True)

In [251]:
hospital = spark.read.csv('/user/efischbein/data/group_project/hospital/hospitals.csv', header = True)

In [252]:
crime = spark.read.csv('/user/efischbein/data/group_project/crime_data_w_population_and_crime_rate.csv',header = True)

In [253]:
CountyCrossWalk_Zillow = spark.read.csv('/user/efischbein/data/group_project/zillow_econ/CountyCrossWalk_Zillow.csv',header = True)

#### Hospital Preprocessing

In [254]:
hospital_ratings = hospital_ratings.select('Hospital Name', 'Hospital overall rating')

In [255]:

hospital_ratings.dropna()


DataFrame[Hospital Name: string, Hospital overall rating: string]

In [256]:
hospital_ratings = hospital_ratings.where(hospital_ratings['Hospital overall rating'] != 'Not Available')

In [None]:
#fillna need the same data type, if it is string and fill integer 0 will be wrong.  have to fill '0'

In [263]:
#RENAME THE COLUMN FOR JOIN 
hospital = hospital.withColumn('Hospital Name',hospital.NAME)
       

In [264]:
hospitals_with_ratings =hospital.join(hospital_ratings,['Hospital Name'],'left').na.fill('0',subset = ['Hospital overall rating'])


In [265]:
hospital_final = hospitals_with_ratings.groupBy(['ZIP', 'CITY','STATE','COUNTY','TYPE','Hospital overall rating','OWNER','STATUS']).count().orderBy('count',ascending = [0])

In [268]:
hos1 = hospital_final.groupBy('ZIP').agg(F.sum('count'),F.avg('Hospital overall rating')).orderBy('sum(count)', ascending = [0])

In [266]:
hos2 = hospital_final.groupBy('ZIP').pivot('TYPE').agg(F.sum('count').alias("CNT")
                                                    ,F.avg('Hospital overall rating').alias('RATE'))

hos2 = hos2.na.fill(0)
hos3 = hospital_final.groupBy('ZIP').pivot('OWNER').agg(F.sum('count').alias("CNT")
                                                    ,F.avg('Hospital overall rating').alias('RATE'))


hos3 = hos3.na.fill(0)

In [267]:
hos3 = hos3.drop('REHABILITATION_CNT','REHABILITATION_RATE')

In [269]:
#hospitals_with_ratings =hospital.join(hospital_ratings,['Hospital Name'],'left').na.fill('0',subset = ['Hospital overall rating'])
hos1 = hos1.join(hos2,['ZIP'], 'left')
hos1 = hos1.join(hos3,['ZIP'], 'left')


In [270]:
hos1  = hos1.withColumn('ZIP5', hos1.ZIP).drop('ZIP')

In [271]:
hos1 = hos1.drop('null','NOT AVAILABLE')

In [272]:
### Join dat

In [273]:
dat = dat.join(hos1, 'ZIP5','left').na.fill(0)

In [274]:
#change the data type 
hospitals_with_ratings = hospitals_with_ratings.withColumn('Hospital overall rating', hospitals_with_ratings['Hospital overall rating'].cast('int'))

Crime

In [275]:
state = crime.withColumn('ColCommasRemoved',F.split(crime.county_name,',')).select('county_name',F.rtrim(F.col('ColCommasRemoved')[1]))


In [276]:
state = state.withColumn('state',state['rtrim(ColCommasRemoved[1])']).drop('rtrim(ColCommasRemoved[1])')

In [277]:

crime = crime.join(state, 'county_name','left')

In [168]:
#Crime_rates["FIPS"] = Crime_rates["FIPS"].astype("int64")

In [278]:
CountyCrossWalk_Zillow = CountyCrossWalk_Zillow.withColumn('FIPS_ST',CountyCrossWalk_Zillow['StateFIPS'])\
                                                .withColumn('FIPS_CTY',CountyCrossWalk_Zillow['CountyFIPS'])

In [279]:
crime = crime.join(CountyCrossWalk_Zillow,['FIPS_ST','FIPS_CTY'],'left')


In [280]:

crime = crime.select(['CountyName','state','crime_rate_per_100000'
        ,'MURDER','RAPE','ROBBERY','AGASSLT','BURGLRY','LARCENY','MVTHEFT','ARSON','population'])
crime = crime.dropna()

In [281]:
crime = crime.groupBy('state').agg(F.count('MURDER'),F.count('RAPE'),F.count('ROBBERY')
                            ,F.count('AGASSLT'),F.count('BURGLRY'),F.count('MVTHEFT'),F.count('ARSON')
                            ,F.sum('population')
                            ,F.avg('crime_rate_per_100000'))

In [282]:
print(crime.count(), len(crime.columns))

28 10


In [284]:
dat = dat.join(crime,'state','left').na.fill(0)