# SparkSQL R Sample - USA Zip Codes (JSON)

In [1]:
# Show the SPARK_HOME value inherited from the environment
# (an empty string means the variable is not set).
Sys.getenv("SPARK_HOME")

In [2]:
# Fall back to a default Spark install location when SPARK_HOME is
# unset or empty; an existing value is left untouched.
if (!nzchar(Sys.getenv("SPARK_HOME"))) {
  Sys.setenv(SPARK_HOME = "/Users/skalathur/MyApps/spark")
}

In [3]:
# Export SPARK_LOCAL_IP so that processes launched later from this R
# session inherit it.
Sys.setenv(SPARK_LOCAL_IP = "localhost")

In [4]:
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union



In [5]:
# Start the SparkR session on a local[*] master with 2g of driver memory.
sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.driver.memory = "2g")
)

Spark package found in SPARK_HOME: /Users/skalathur/MyApps/spark


Launching java with spark-submit command /Users/skalathur/MyApps/spark/bin/spark-submit   --driver-memory "2g" sparkr-shell /var/folders/s3/hy6_p79n3w1fw802t6ps40qr0000gp/T//RtmpTrgpVP/backend_portdf5a58a4b339 


Java ref type org.apache.spark.sql.SparkSession id 1 

In [6]:
# Path of the JSON dataset that the next cell loads with read.df().
inputFile <- "/temp/datasets/usa_zipcodes.json"

In [7]:
# Load the zip-code records from the JSON file into a SparkDataFrame.
# No inferSchema option is passed: it is a CSV reader option, and the JSON
# source infers the schema from the data whenever no schema is supplied.
usaZipCodes <- read.df(inputFile, source = "json")

# Display the SparkDataFrame handle together with its inferred column types.
usaZipCodes

SparkDataFrame[_id:string, city:string, loc:array<double>, pop:bigint, state:string]

In [8]:
# Print the inferred schema of usaZipCodes as a tree.
printSchema(usaZipCodes)

root
 |-- _id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- loc: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- pop: long (nullable = true)
 |-- state: string (nullable = true)


In [9]:
# Number of rows loaded from the JSON file.
count(usaZipCodes)

In [10]:
# Preview the first few rows of the dataset.
head(usaZipCodes)

_id,city,loc,pop,state
1001,AGAWAM,"-72.62274, 42.07021",15338,MA
1002,CUSHMAN,"-72.51565, 42.37702",36963,MA
1005,BARRE,"-72.10835, 42.40970",4546,MA
1007,BELCHERTOWN,"-72.41095, 42.27510",10579,MA
1008,BLANDFORD,"-72.93611, 42.18295",1240,MA
1010,BRIMFIELD,"-72.18846, 42.11654",3706,MA


In [11]:
# Persist usaZipCodes with the MEMORY_AND_DISK storage level; the
# following cells build several queries on top of it.
persist(usaZipCodes, "MEMORY_AND_DISK")

SparkDataFrame[_id:string, city:string, loc:array<double>, pop:bigint, state:string]

In [12]:
# Register usaZipCodes as the temporary view "usaZipCodesTable" so that
# it can be referenced from SQL text.
createOrReplaceTempView(usaZipCodes, "usaZipCodesTable")

In [13]:
# Keep only the zip codes with population > 100.
# The query text is only built and displayed here; the next cell runs it.

query <- "SELECT * FROM usaZipCodesTable WHERE pop > 100"
query

In [14]:
# Run the filter query and rebind usaZipCodes to the filtered result,
# replacing the reference to the unfiltered DataFrame.
usaZipCodes <- sql(query)
usaZipCodes

SparkDataFrame[_id:string, city:string, loc:array<double>, pop:bigint, state:string]

In [15]:
# Re-register the view so that "usaZipCodesTable" now refers to the
# filtered DataFrame (pop > 100) instead of the full dataset.
createOrReplaceTempView(usaZipCodes, "usaZipCodesTable")

In [16]:
# Largest and smallest zip-code population in the (filtered) view.
query <- "SELECT max(pop) as MaxPop, min(pop) as MinPop from usaZipCodesTable"
query

In [17]:
# Run the aggregate query; the result is still a SparkDataFrame, so
# displaying it shows only the column names and types.
maxAndMin <- sql(query)
maxAndMin

SparkDataFrame[MaxPop:bigint, MinPop:bigint]

In [18]:
# collect() brings the single result row into the local R session so the
# actual values can be displayed.
localDf <- collect(maxAndMin)
localDf

MaxPop,MinPop
112047,101


## Number of zip codes in each state

In [19]:
# Count the zip codes per state (no ordering is requested).
query <- "SELECT state, count(*) as Count FROM usaZipCodesTable GROUP BY state"
query

In [20]:
# Run the per-state count query and keep the result as a SparkDataFrame.
zipCodesByState <- sql(query)
zipCodesByState

SparkDataFrame[state:string, Count:bigint]

In [21]:
# Number of result rows, i.e. one per distinct state value.
count(zipCodesByState)

In [22]:
# Fetch the per-state counts into the local R session. The query has no
# ORDER BY, so the rows come back in no particular order.
collect(zipCodesByState)

state,Count
SC,347
AZ,260
LA,457
MN,877
NJ,535
DC,22
OR,363
VA,802
RI,69
KY,791


In [23]:
# Same per-state count as before, now sorted alphabetically by state.
# The string literal spans two lines, so the query text contains the
# newline and the indentation of the second line.
query <- "SELECT state, count(*) as Count FROM usaZipCodesTable 
            GROUP BY state ORDER BY state"
query

In [24]:
# Run the sorted per-state count and fetch the rows locally in one step.
collect(sql(query))

state,Count
AK,169
AL,564
AR,569
AZ,260
CA,1475
CO,397
CT,260
DC,22
DE,53
FL,820


## 10 Most populous zip codes

In [25]:
# Ten zip codes with the highest population, largest first, fetched
# into the local R session.
collect(
  sql("SELECT * FROM usaZipCodesTable ORDER BY pop DESC LIMIT 10")
)

_id,city,loc,pop,state
60623,CHICAGO,"-87.71570, 41.84902",112047,IL
11226,BROOKLYN,"-73.95699, 40.64669",111396,NY
10021,NEW YORK,"-73.95880, 40.76848",106564,NY
10025,NEW YORK,"-73.96831, 40.79747",100027,NY
90201,BELL GARDENS,"-118.17205, 33.96918",99568,CA
60617,CHICAGO,"-87.55601, 41.72574",98612,IL
90011,LOS ANGELES,"-118.25819, 34.00786",96074,CA
60647,CHICAGO,"-87.70432, 41.92090",95971,IL
60628,CHICAGO,"-87.62428, 41.69344",94317,IL
90650,NORWALK,"-118.08177, 33.90564",94188,CA


## Most populous states

In [26]:
# Total population per state, largest total first.
# The string literal spans two lines, so the query text contains the
# newline and the indentation of the second line.
query <- "SELECT state, sum(pop) as TotalPop FROM usaZipCodesTable 
                GROUP BY state ORDER BY TotalPop DESC"
query

In [27]:
# Execute the population-by-state query and display the resulting
# SparkDataFrame handle.
popByState <- sql(query)
popByState

SparkDataFrame[state:string, TotalPop:bigint]

In [28]:
# Number of result rows, i.e. one per distinct state value.
count(popByState)

In [29]:
# Fetch the per-state totals locally; the query orders them by TotalPop
# in descending order.
collect(popByState)

state,TotalPop
CA,29758155
NY,17988283
TX,16984340
FL,12937753
PA,11880512
IL,11430349
OH,10847077
MI,9295060
NJ,7729991
NC,6628251


In [30]:
# Shut down the SparkSession that was started with sparkR.session()
# earlier in this notebook.
sparkR.session.stop()