In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, split
from configparser import ConfigParser
import psycopg2
from IPython.display import display
import pprint as pp

In [24]:
# create a spark session
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Wake County Restaurants") \
    .getOrCreate()

In [53]:
def buildWakeCountyDataframe():
    """
    add, remap and drop columns
    loads the Wake County Restaurant data into a dataframe
    and adds, remaps and drop columns
    """
    
    df = spark \
        .read \
        .format("csv") \
        .option("header", "true") \
        .load("../data/Restaurants_in_Wake_County_NC.csv")
        
    df = df.withColumn("county", lit("Wake")) \
        .withColumnRenamed("HSISID", "datasetId") \
        .withColumnRenamed("NAME", "name") \
        .withColumnRenamed("ADDRESS1", "address1") \
        .withColumnRenamed("ADDRESS2", "address2") \
        .withColumnRenamed("CITY", "city") \
        .withColumnRenamed("STATE", "state") \
        .withColumnRenamed("POSTALCODE", "zip") \
        .withColumnRenamed("PHONENUMBER", "tel") \
        .withColumnRenamed("RESTAURANTOPENDATE", "dateStart") \
        .withColumn("dateEnd", lit(None)) \
        .withColumnRenamed("FACILITYTYPE", "type") \
        .withColumnRenamed("X", "geoX") \
        .withColumnRenamed("Y", "geoY") \
        .drop("OBJECTID") \
        .drop("PERMITID") \
        .drop("GEOCODESTATUS")

    # create a unique id for each record
    delim = lit("_")
    df = df.withColumn(
        "id",
        concat(
            df["state"],
            delim,
            df["county"],
            delim,
            df["datasetId"],
        ),
    )
    
    return df

In [48]:
def buildDurhamCountyDataframe():
    """
    add, remap and drop columns
    loads the Durham County Restaurant data into a dataframe
    and adds, remaps and drop columns
    """
    
    df = spark \
        .read \
        .format("json") \
        .load("../data/Restaurants_in_Durham_County_NC.json")
    
    df = df.withColumn("county", lit("Durham")) \
        .withColumn("datasetId", df["fields.id"]) \
        .withColumn("name", df["fields.premise_name"]) \
        .withColumn("address1", df["fields.premise_address1"]) \
        .withColumn("address2", df["fields.premise_address2"]) \
        .withColumn("city", df["fields.premise_city"]) \
        .withColumn("state", df["fields.premise_state"]) \
        .withColumn("zip", df["fields.premise_zip"]) \
        .withColumn("tel", df["fields.premise_phone"]) \
        .withColumn("dateStart", df["fields.opening_date"]) \
        .withColumn("dateEnd", df["fields.closing_date"]) \
        .withColumn("type", split(df["fields.type_description"], " - ").getItem(1)) \
        .withColumn("geoX", df["fields.geolocation"].getItem(0)) \
        .withColumn("geoY", df["fields.geolocation"].getItem(1)) \
        .drop("geometry") \
        .drop("record_timestamp") \
        .drop("recordid") \
        .drop("fields")

    # create a unique id for each record
    delim = lit("_")
    df = df.withColumn("id", concat(df["state"], delim, df["county"], delim, df["datasetId"]))
    
    return df

In [49]:
wakeRestaurantsDF = buildWakeCountyDataframe()
durhamRestaurantsDF = buildDurhamCountyDataframe()

In [50]:
display(wakeRestaurantsDF.printSchema())
display(wakeRestaurantsDF.count())
display(wakeRestaurantsDF.show(5))
display(wakeRestaurantsDF.head())

root
 |-- datasetId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- type: string (nullable = true)
 |-- geoX: string (nullable = true)
 |-- geoY: string (nullable = true)
 |-- county: string (nullable = false)
 |-- dateEnd: null (nullable = true)
 |-- id: string (nullable = true)



None

3440

+-----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|  datasetId|                name|            address1|address2|       city|state|       zip|           tel|           dateStart|             type|        geoX|       geoY|county|dateEnd|                 id|
+-----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|04092016024|                WABA|2502 1/2 HILLSBOR...|    null|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18T00:00:...|       Restaurant|-78.66818477|35.78783803|  Wake|   null|NC_Wake_04092016024|
|04092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|    null|       CARY|   NC|     27518|(919) 852-6651|2011-11-08T00:00:...|       Food Stand|-78.78211173|35.73717

None

Row(datasetId='04092016024', name='WABA', address1='2502 1/2 HILLSBOROUGH  ST ', address2=None, city='RALEIGH', state='NC', zip='27607', tel='(919) 833-1710', dateStart='2011-10-18T00:00:00.000Z', type='Restaurant', geoX='-78.66818477', geoY='35.78783803', county='Wake', dateEnd=None, id='NC_Wake_04092016024')

In [51]:
display(durhamRestaurantsDF.printSchema())
display(durhamRestaurantsDF.count())
display(durhamRestaurantsDF.show(5))
display(durhamRestaurantsDF.head())

root
 |-- datasetId: string (nullable = true)
 |-- county: string (nullable = false)
 |-- name: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- dateEnd: string (nullable = true)
 |-- type: string (nullable = true)
 |-- geoX: double (nullable = true)
 |-- geoY: double (nullable = true)
 |-- id: string (nullable = true)



None

2463

+---------+------+--------------------+--------------------+--------+------+-----+-----+--------------+----------+-------+--------------------+----------+-----------+---------------+
|datasetId|county|                name|            address1|address2|  city|state|  zip|           tel| dateStart|dateEnd|                type|      geoX|       geoY|             id|
+---------+------+--------------------+--------------------+--------+------+-----+-----+--------------+----------+-------+--------------------+----------+-----------+---------------+
|    56060|Durham|    WEST 94TH ST PUB| 4711 HOPE VALLEY RD|SUITE 6C|DURHAM|   NC|27707|(919) 403-0025|1994-09-01|   null|          Restaurant|35.9207272|-78.9573299|NC_Durham_56060|
|    58123|Durham|BROOKDALE DURHAM IFS|4434 BEN FRANKLIN...|    null|DURHAM|   NC|27704|(919) 479-9966|2003-10-15|   null|Institutional Foo...|36.0467802|-78.8895483|NC_Durham_58123|
|    70266|Durham|       SMOOTHIE KING|1125 W. NC HWY 54...|    null|DURHAM|   NC|277

None

Row(datasetId='56060', county='Durham', name='WEST 94TH ST PUB', address1='4711 HOPE VALLEY RD', address2='SUITE 6C', city='DURHAM', state='NC', zip='27707', tel='(919) 403-0025', dateStart='1994-09-01', dateEnd=None, type='Restaurant', geoX=35.9207272, geoY=-78.9573299, id='NC_Durham_56060')

In [4]:
# # create a new column
# df.withColumn()

# # rename an existing column
# df.withColumnRenamed()

# # get a column by name
# df.col()
# df.drop()
# df.lit()
# df.concat()

# Displays the top 5 rows. Accepts an optional int parameter - num. of rows to show
# df.head()

# Similar to head, but displays the last rows
# df.tail()

# The dimensions of the dataframe as a (rows, cols) tuple
# df.shape

# The number of columns. Equal to df.shape[0]
# len(df) 

# An array of the column names
# df.columns 

# Columns and their types
# df.dtypes

# Converts the frame to a two-dimensional table
# df.values 

# Displays descriptive stats for all columns
# df.describe()

In [63]:
def combineDataframes(df1, df2):
    df = df1.unionByName(df2)

    return df

In [65]:
df = combineDataframes(df1 = wakeRestaurantsDF, df2 = durhamRestaurantsDF)

df.show(5)
df.printSchema()
df.count()
df.rdd.getNumPartitions()

+-----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|  datasetId|                name|            address1|address2|       city|state|       zip|           tel|           dateStart|             type|        geoX|       geoY|county|dateEnd|                 id|
+-----------+--------------------+--------------------+--------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|04092016024|                WABA|2502 1/2 HILLSBOR...|    null|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18T00:00:...|       Restaurant|-78.66818477|35.78783803|  Wake|   null|NC_Wake_04092016024|
|04092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|    null|       CARY|   NC|     27518|(919) 852-6651|2011-11-08T00:00:...|       Food Stand|-78.78211173|35.73717

2

In [60]:
def repartitionDataframes(dataframes = [], numPartitions = 1):
    for df in dataframes:
        # Returns the number of partitions in RDD
        df.rdd.getNumPartitions()
        # increase the level of parallelism in this RDD
        df = df.repartition(numPartitions)
        print(f"Partition Count: {df.rdd.getNumPartitions()}")

Partition Count: 4
Partition Count: 4
