In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas

# Spark session & context
# Start (or reuse) a Spark session on the local master with 16 worker threads.
spark = SparkSession.builder.master('local[16]').getOrCreate()
sc = spark.sparkContext
# SQLContext has been deprecated since Spark 3.0 in favour of SparkSession,
# which offers the same `.read` and `.sql` entry points the cells below use.
# `s` is kept as an alias so every later cell keeps working unchanged.
s = spark

In [3]:
def load_data(path):
    """Read the CSV file at ``path`` through the notebook-level reader ``s``.

    The first row of the file is used as the column header. No schema or
    type-inference option is set here.

    Parameters
    ----------
    path : str
        Location of the CSV file to read.

    Returns
    -------
    The DataFrame produced by ``s.read ... .csv(path)``.
    """
    header_aware_reader = s.read.option("header", True)
    return header_aware_reader.csv(path)

## Select required columns from individual ACS tables

In [4]:
# Table b01: keep the male / female counts, a female-to-male ratio
# (0 when the male count is 0), median age and population per block group.
b01 = load_data("cbg_b01.csv")
b01.createOrReplaceTempView("tb01")

# Adjacent string literals are joined into one single-line SQL statement.
b01data = s.sql(
    "select census_block_group as geoid, "
    "B01001e2 as male, B01001e26 as female, "
    "(case when B01001e2 = 0 then 0 else round(B01001e26/B01001e2,2) end) as sex_ratio, "
    "B01002e1 as median_age, B01003e1 as pop "
    "from tb01;"
)

# Drop the Python reference to the full table; only the projection is used later.
del b01
b01data.createOrReplaceTempView("b01t")
# Optional export of the intermediate table:
# b01data.toPandas().to_csv('b01data.csv')

In [5]:
# Table b02: keep the columns aliased as white, black, asian and
# asian_with_others per block group.
b02 = load_data("cbg_b02.csv")
b02.createOrReplaceTempView('tb02')

# Adjacent string literals are joined into one single-line SQL statement.
b02data = s.sql(
    "select census_block_group as geoid, "
    "B02001e2 as white, "
    "B02001e3 as black, "
    "B02001e5 as asian, "
    "B02011e1 as asian_with_others from tb02;"
)

# Drop the Python reference to the full table; only the projection is used later.
del b02
b02data.createOrReplaceTempView("b02t")
# Optional export of the intermediate table:
# b02data.toPandas().to_csv('b02data.csv')

In [6]:
# Table b03: keep the column aliased as hispanic per block group.
b03 = load_data("cbg_b03.csv")
b03.createOrReplaceTempView("tb03")

# Adjacent string literals are joined into one single-line SQL statement.
b03data = s.sql(
    "select census_block_group as geoid, "
    "B03003e3 as hispanic "
    "from tb03"
)

# Drop the Python reference to the full table; only the projection is used later.
del b03
b03data.createOrReplaceTempView("b03t")
# Optional export of the intermediate table:
# b03data.toPandas().to_csv('b03data.csv')

In [7]:
# Table b19: keep the column aliased as income_per_capita per block group.
b19 = load_data("cbg_b19.csv")
b19.createOrReplaceTempView("tb19")

# Adjacent string literals are joined into one single-line SQL statement.
b19data = s.sql(
    "select census_block_group as geoid, "
    "B19301e1 as income_per_capita "
    "from tb19"
)

# Drop the Python reference to the full table; only the projection is used later.
del b19
b19data.createOrReplaceTempView("b19t")
# Optional export of the intermediate table:
# b19data.toPandas().to_csv('b19data.csv')

In [8]:
# Table b25: keep the column aliased as
# median_value_owner_occupied_housing_unit per block group.
b25 = load_data("cbg_b25.csv")
b25.createOrReplaceTempView("tb25")

# Adjacent string literals are joined into one single-line SQL statement.
b25data = s.sql(
    "select census_block_group as geoid, "
    "B25077e1 as median_value_owner_occupied_housing_unit "
    "from tb25"
)

# Drop the Python reference to the full table; only the projection is used later.
del b25
b25data.createOrReplaceTempView("b25t")
# Optional export of the intermediate table:
# b25data.toPandas().to_csv('b25data.csv')

## Grocery dataset (cleaned in OpenRefine)

In [9]:
# NOTE(review): absolute, machine-specific path -- the notebook only runs
# as-is where this exact directory layout exists.
path = (
    r"C:\Users\Pankaj\Documents\GitHub\CSE6242\Group_project"
    r"\Individual_datasets\2017-Grocery-2017-Grocery-cleaned OpenRefine.csv"
)
grocery_data = load_data(path)
grocery_data.createOrReplaceTempView("grocery_data")

# Keep only the tract key and the two columns the final join selects.
grocery_filtered = s.sql(
    "select geoid_tract, popden_grocery, popden_specialtyFood "
    "from grocery_data"
)
grocery_filtered.createOrReplaceTempView("grocery")

## Crime dataset (cleaned in OpenRefine)

In [10]:
# NOTE(review): absolute, machine-specific path -- the notebook only runs
# as-is where this exact directory layout exists.
path = (
    r"C:\Users\Pankaj\Documents\GitHub\CSE6242\Group_project"
    r"\Individual_datasets\crime-data-w-population-cleaned OpenRefine.csv"
)
crime_data = load_data(path)
crime_data.createOrReplaceTempView("crime_data")

# Keep only the county key and the crime-rate column the final join selects.
crime_filtered = s.sql(
    "select geoid_county, crime_rate_per_100000 "
    "from crime_data"
)
crime_filtered.createOrReplaceTempView("crime")

## Parks dataset (cleaned in OpenRefine)

In [11]:
# NOTE(review): absolute, machine-specific path -- the notebook only runs
# as-is where this exact directory layout exists.
path = (
    r"C:\Users\Pankaj\Documents\GitHub\CSE6242\Group_project"
    r"\Individual_datasets\nanda-parks-tract-2018-01P-cleaned OpenRefine.csv"
)
parks_data = load_data(path)
parks_data.createOrReplaceTempView("parks_data")

# Keep only the tract key and the park-count column the final join selects.
parks_filtered = s.sql(
    "select geoid_tract, count_open_parks "
    "from parks_data"
)
parks_filtered.createOrReplaceTempView("parks")

## State and County names

In [45]:
# NOTE(review): absolute, machine-specific path -- the notebook only runs
# as-is where this exact directory layout exists.
path = (
    r"C:\Users\Pankaj\Documents\GitHub\CSE6242\Group_project"
    r"\census_data_2019\data\cbg_fips_codes.csv"
)
names_data = load_data(path)
names_data.createOrReplaceTempView("names_data")

# Distinct (state, state_fips) pairs; registered as a view and written to CSV.
state_names = s.sql(
    "select distinct state, state_fips "
    "from names_data"
)
state_names.createOrReplaceTempView("state_names")
state_names.toPandas().to_csv(path_or_buf="state_names.csv")

# County rows plus geoid_till_county, the state and county FIPS codes
# concatenated, which the final join matches against a geoid prefix.
county_names = s.sql(
    "select state_fips, county_fips, "
    "state_fips||county_fips as geoid_till_county, county "
    "from names_data"
)
county_names.createOrReplaceTempView("county_names")
county_names.toPandas().to_csv(path_or_buf="county_names.csv")

## Inner join the tables

In [46]:
# Build the final table: one row per block-group geoid that has a match in
# every joined view (all joins are inner joins, so unmatched geoids are dropped).
#
# geoid is sliced by position: chars 1-2 state, 3-5 county, 6-11 tract,
# 12 block group. Tract-level views join on the first 11 characters,
# county-level views on the first 5, and state names on the first 2.
#
# racial_diversity = (mean squared deviation of the white, black, asian and
# hispanic counts from their common mean) ** -0.5, rounded to 5 decimals,
# i.e. the reciprocal of the spread between the four groups.
# Fix: the original squared (white - mean) four times; each group now
# contributes its own deviation term.
# NOTE(review): when all four counts are equal the mean squared deviation is 0
# and the -0.5 power is taken of zero -- confirm how that should be reported.
main = s.sql("select b01t.geoid, \
substr(b01t.geoid, 1, 2) as state_id, \
state_names.state as state, \
substr(b01t.geoid, 3, 3) as county_id, \
county_names.county as county, \
substr(b01t.geoid, 6, 6) as tract_id, \
substr(b01t.geoid, 12, 1) as block_group_id, \
sex_ratio, median_age, pop, \
round(power(((power((white - ((white+black+asian+hispanic)/4)),2) \
+ power((black - ((white+black+asian+hispanic)/4)),2) \
+ power((asian - ((white+black+asian+hispanic)/4)),2) \
+ power((hispanic - ((white+black+asian+hispanic)/4)),2) )/4),-0.5),5) as racial_diversity, \
income_per_capita, \
median_value_owner_occupied_housing_unit, \
popden_grocery, popden_specialtyFood, \
round(crime_rate_per_100000,2) as crime_rate_per_100000, \
count_open_parks \
from b01t \
inner join b02t on b01t.geoid = b02t.geoid \
inner join b03t on b02t.geoid = b03t.geoid \
inner join b19t on b03t.geoid = b19t.geoid \
inner join b25t on b19t.geoid = b25t.geoid \
inner join grocery on substr(b01t.geoid, 1, 11) = grocery.geoid_tract \
inner join crime on substr(b01t.geoid, 1, 5) = crime.geoid_county \
inner join parks on substr(b01t.geoid, 1, 11) = parks.geoid_tract \
inner join state_names on substr(b01t.geoid, 1, 2) = state_names.state_fips \
inner join county_names on substr(b01t.geoid, 1, 5) = county_names.geoid_till_county \
")
# Expose the joined result to later SQL under the view name 'main'.
main.createOrReplaceTempView('main')

## Visualize the final table

In [47]:
# Print the column names and types of the joined table, then its first 5 rows.
main.printSchema()
main.show(n=5)

root
 |-- geoid: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county_id: string (nullable = true)
 |-- county: string (nullable = true)
 |-- tract_id: string (nullable = true)
 |-- block_group_id: string (nullable = true)
 |-- sex_ratio: double (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pop: string (nullable = true)
 |-- racial_diversity: double (nullable = true)
 |-- income_per_capita: string (nullable = true)
 |-- median_value_owner_occupied_housing_unit: string (nullable = true)
 |-- popden_grocery: string (nullable = true)
 |-- popden_specialtyFood: string (nullable = true)
 |-- crime_rate_per_100000: double (nullable = true)
 |-- count_open_parks: string (nullable = true)

+------------+--------+-----+---------+--------------+--------+--------------+---------+----------+----+----------------+-----------------+----------------------------------------+--------------+--------------------+-------------

## Write final table to CSV

In [54]:
# Collect the joined table into a pandas DataFrame and write it to a CSV file
# in the current working directory.
main.toPandas().to_csv(path_or_buf="main_table_geoid.csv")