# Setting Up PySpark

In [1]:
import findspark
findspark.init()

import pyspark
import pandas

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("HW2")\
        .getOrCreate()

spark


In [3]:
sc = spark.sparkContext
sc

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from geopy.distance import geodesic
import mpu
import string

# Problem 1

In [5]:
bakery_schema = StructType([
    StructField('date', DateType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

df1 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load("D:\PBDA\BreadBasket_DMS.csv", schema=bakery_schema)

df1.printSchema()
#df1.show()

root
 |-- date: date (nullable = true)
 |-- time: string (nullable = true)
 |-- transaction: integer (nullable = true)
 |-- item: string (nullable = true)



In [6]:
bakery= df1.select("date",(substring(df1['time'], 0, 2)).alias("hour"),"item")
#bakery.show()

In [7]:
baked = bakery.filter(df1.item != 'NONE')

In [8]:
baked.registerTempTable("bake")

In [9]:
q1=spark.sql("SELECT item, date, hour, count(*) as count "+
               "FROM bake "+
               "GROUP BY item, date, hour "+
               "ORDER BY item, date, hour")
q1.show()

+--------------------+----------+----+-----+
|                item|      date|hour|count|
+--------------------+----------+----+-----+
|          Adjustment|2016-11-09|  19|    1|
|Afternoon with th...|2017-01-06|  14|    1|
|Afternoon with th...|2017-01-07|  13|    1|
|Afternoon with th...|2017-01-08|  15|    1|
|Afternoon with th...|2017-01-11|  14|    2|
|Afternoon with th...|2017-01-14|  18|    3|
|Afternoon with th...|2017-01-16|  09|    1|
|Afternoon with th...|2017-01-20|  08|    1|
|Afternoon with th...|2017-01-21|  11|    1|
|Afternoon with th...|2017-01-21|  12|    1|
|Afternoon with th...|2017-01-21|  14|    1|
|Afternoon with th...|2017-01-22|  09|    1|
|Afternoon with th...|2017-01-22|  12|    1|
|Afternoon with th...|2017-02-03|  14|    1|
|Afternoon with th...|2017-02-10|  13|    1|
|Afternoon with th...|2017-02-11|  17|    1|
|Afternoon with th...|2017-02-12|  18|    2|
|Afternoon with th...|2017-02-14|  08|    1|
|Afternoon with th...|2017-02-14|  15|    1|
|Afternoon

# Problem 2

In [10]:
json_schema = StructType([
    StructField('datasetid', StringType(), True),
    StructField('recordid', StringType(), True),
    StructField('fields', MapType(StringType(), StringType()), True),
    StructField('geometry', StringType(), True),
    StructField('record_timestamp', StringType(), True),
    
])

jsonrec= spark.read.json('D:\PBDA\Restaurants_in_Durham_County_NC.json',schema=json_schema)

In [11]:
#jsonrec.show()

In [12]:
jsonrec.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- recordid: string (nullable = true)
 |-- fields: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- geometry: string (nullable = true)
 |-- record_timestamp: string (nullable = true)



In [13]:
q2=jsonrec.select((jsonrec.fields['rpt_area_desc']).alias('rpt_area'))
#q2.show()

In [14]:
q2.registerTempTable("newtab")

df2= spark.sql("SELECT rpt_area, count(rpt_area) as count "+"FROM newtab "+"GROUP BY rpt_area")
df2.show()

+--------------------+-----+
|            rpt_area|count|
+--------------------+-----+
|  Bed&Breakfast Home|    4|
|        Summer Camps|    4|
|        Institutions|   30|
|   Local Confinement|    2|
|         Mobile Food|  147|
|    School Buildings|   89|
|         Summer Food|  242|
|      Swimming Pools|  420|
|            Day Care|  173|
|Tattoo Establishm...|   36|
|    Residential Care|  154|
|   Bed&Breakfast Inn|    2|
|      Adult Day Care|    5|
|             Lodging|   62|
|        Food Service| 1093|
+--------------------+-----+



# Problem 3

In [15]:
popdata = spark.read \
    .format("csv") \
    .option("header", "true").option("nullValue","0").option("nanValue","0")  \
    .option("inferSchema","true") \
    .load("D:\PBDA\populationbycountry19802010millions.csv")


In [16]:
countries= popdata.select('_c0').rdd.map(lambda row: row._c0 )

#indexing the countries
countries = countries.zipWithIndex()
country= countries.map(lambda x: (x[1],x[0]))

#cols = popdata.columns

In [17]:
#Looping through the columns

for i in range(2,32):
    
    #Retrieving year over year percent for each year
    rd1= popdata.rdd\
    .map(lambda x: ((float(x[i])-float(x[i-1]))/float(x[i-1])*100)\
         if (x[i]!='NA'and x[i-1]!='NA'and x[i]!='--'and x[i-1]!='--') \
         else 0)
    
    #Obtaining max value of that year
    maxval =rd1.max()
    year = 1979+i
    
    #Indexing the previous rdd to get the position of max value and finding respective country from the countries list
    #Printing the countries
    print ("%i %s %f%%"% (year,country.lookup(rd1.zipWithIndex().lookup(maxval)[0])[0],maxval))
      
    

1981 Western Sahara 12.133183%
1982 Western Sahara 11.115105%
1983 French Guiana 14.285714%
1984 Qatar 10.964057%
1985 French Guiana 12.500000%
1986 Qatar 8.771733%
1987 French Guiana 11.111111%
1988 Cayman Islands 11.010421%
1989 United Arab Emirates 6.119858%
1990 Djibouti 12.824048%
1991 Jordan 11.273940%
1992 Kuwait 48.633439%
1993 Afghanistan 13.224595%
1994 Afghanistan 8.727662%
1995 Burundi 7.222489%
1996 Rwanda 19.614177%
1997 Falkland Islands (Islas Malvinas) 21.500000%
1998 Liberia 12.017450%
1999 Falkland Islands (Islas Malvinas) 7.692308%
2000 Montserrat 16.863905%
2001 Montserrat 7.341772%
2002 Montserrat 13.443396%
2003 Afghanistan 5.803892%
2004 Montserrat 10.467706%
2005 Liberia 4.797671%
2006 Jordan 7.088497%
2007 Jordan 6.764378%
2008 Montserrat 12.638581%
2009 Liberia 4.157111%
2010 Niger 3.737166%


# Problem 4

In [74]:
words =  spark.read.text('D:/PBDA/romeo-juliet-pg1777.txt')

x = words.rdd.map(lambda r: r[0])


In [76]:
y = sorted(x.map(lambda x: x.lower())\
       .map(lambda x: x.translate(str.maketrans('','',string.punctuation)))\
       .flatMap(lambda x: x.split(' ')).map(lambda a: (a,1))\
       .reduceByKey(lambda v1, v2: v1+v2).collect(),reverse=True)


In [77]:
for x in y:
    print("%s: %i"% (x[0],x[1])) 


zounds: 2
youthful: 3
youth: 6
yourself: 5
yours: 3
your: 117
youngest: 1
younger: 2
young: 24
youll: 4
you: 323
yonder: 6
yond: 5
yon: 1
yoke: 1
yielding: 1
yew: 2
yet: 47
yesternight: 1
yes: 2
yellow: 1
yeas: 1
years: 11
year: 5
yea: 4
ye: 11
yard: 1
xxxxx10xxxx: 1
x: 1
wrought: 2
wrongst: 1
wronged: 1
wrong: 2
written: 6
writing: 1
writes: 1
write: 3
writ: 7
wring: 1
wretchedness: 1
wretched: 3
wretch: 3
wrenching: 1
wreak: 1
wounds: 1
wounded: 2
wound: 3
wouldst: 4
would: 56
wot: 1
worthy: 1
worth: 1
worst: 1
worshippd: 1
worship: 2
worser: 2
worse: 3
worn: 4
wormwood: 2
worms: 2
worm: 2
worldwearied: 1
worlds: 2
world: 29
works: 14
work: 5
words: 8
word: 29
wooes: 1
wood: 2
woo: 4
wondrous: 1
wondring: 1
wonder: 2
won: 1
women: 4
womb: 4
womanish: 2
woman: 3
wolvishravening: 1
woful: 7
woes: 8
woeful: 1
woe: 14
wives: 1
wits: 5
withoutbook: 1
without: 13
within: 21
wither: 1
withdraw: 3
withal: 5
with: 268
wit: 12
wish: 7
wisely: 5
wise: 5
wisdom: 4
winter: 1
winning: 1
winking: 1

# Bonus Question

In [20]:
q3=jsonrec.select((jsonrec.fields['premise_name']).alias('name'),(jsonrec.fields['status']).alias('status'),(jsonrec.fields['rpt_area_desc']).alias('rpt_area'),(jsonrec.fields['geolocation']).alias('geo'))
q3 = q3.filter(q3.geo != 'NONE')
q3.show(5)
q3.count()

+--------------------+------+----------------+--------------------+
|                name|status|        rpt_area|                 geo|
+--------------------+------+----------------+--------------------+
|    WEST 94TH ST PUB|ACTIVE|    Food Service|[35.9207272,-78.9...|
|BROOKDALE DURHAM IFS|ACTIVE|    Food Service|[36.0467802,-78.8...|
|       SMOOTHIE KING|ACTIVE|    Food Service|[35.9182655,-78.9...|
|HAMPTON INN & SUITES|ACTIVE|    Food Service|[36.0183378,-78.9...|
|BETTER LIVING CON...|ACTIVE|Residential Care|[36.0556347,-78.9...|
+--------------------+------+----------------+--------------------+
only showing top 5 rows



2441

In [21]:
#Filter the data and convert latitude and longitude to float type

q3=(q3.filter(col('status')=='ACTIVE')).filter(col('rpt_area')=='Food Service')



split_col = pyspark.sql.functions.split(q3['geo'], ',')
q3 = q3.withColumn('lat', split_col.getItem(0))
q3 = q3.withColumn('long', split_col.getItem(1))
q3 = q3.withColumn('lat',expr("substring(lat, 2, length(lat))"))
q3 = q3.withColumn('long',expr("substring(long, 2, length(long)-2)"))

q4=q3.select('name',q3.lat.cast('float').alias('lat'),q3.long.cast('float').alias('long'))


q4.show(3)

#q3.count()

+--------------------+---------+--------+
|                name|      lat|    long|
+--------------------+---------+--------+
|    WEST 94TH ST PUB|35.920727|78.95733|
|BROOKDALE DURHAM IFS| 36.04678|78.88955|
|       SMOOTHIE KING|35.918266|78.95933|
+--------------------+---------+--------+
only showing top 3 rows



In [22]:
jsonrec2= spark.read.json('D:\PBDA\durham-nc-foreclosure-2006-2016.json',schema=json_schema)

#Filter the data and convert latitude and longitude to float type

df6=jsonrec2.select((jsonrec2.fields['parcel_number']).alias('pno'),(jsonrec2.fields['geocode']).alias('geo2'))
df6 = df6.filter(df6.geo2 != 'None')
split_col = pyspark.sql.functions.split(df6['geo2'], ',')
df6 = df6.withColumn('lat', split_col.getItem(0))
df6 = df6.withColumn('long', split_col.getItem(1))
df6 = df6.withColumn('lat',expr("substring(lat, 2, length(lat))"))
df6 = df6.withColumn('long',expr("substring(long, 2, length(long)-2)"))

df6=df6.select('pno',df6.lat.cast('float').alias('dlat'),df6.long.cast('float').alias('dlong'))




df6.show(3)

df6.count()

+------+---------+---------+
|   pno|     dlat|    dlong|
+------+---------+---------+
|110138|36.001377| 78.89226|
|110535|35.995796| 78.89539|
|110536|35.995415|78.895035|
+------+---------+---------+
only showing top 3 rows



1942

In [23]:
q4.registerTempTable('restaurants')
df6.registerTempTable('locations')

In [24]:
#Merging both the tables

df7=spark.sql("SELECT * "+
              "FROM restaurants cross join locations L ")
            
df7.show(3)
df7.count()

+----------------+---------+--------+------+---------+---------+
|            name|      lat|    long|   pno|     dlat|    dlong|
+----------------+---------+--------+------+---------+---------+
|WEST 94TH ST PUB|35.920727|78.95733|110138|36.001377| 78.89226|
|WEST 94TH ST PUB|35.920727|78.95733|110535|35.995796| 78.89539|
|WEST 94TH ST PUB|35.920727|78.95733|110536|35.995415|78.895035|
+----------------+---------+--------+------+---------+---------+
only showing top 3 rows



2114838

In [25]:
#Using mpu package to calculate Haversine distance that returns the distance between two points on the earth's surface in kms
rd4 = df7.rdd.map(lambda x: mpu.haversine_distance((x[1], x[2]), (x[4], x[5])))

df = spark.createDataFrame(rd4,DoubleType())
df.show(3)
#df.count()


+------------------+
|             value|
+------------------+
|10.710884459766776|
|10.037624285919843|
|10.020408226154624|
+------------------+
only showing top 3 rows



In [26]:
#Merging the calculated distance dataframe with the previous dataframe

df = df.withColumn("id", monotonically_increasing_id())

df7 = df7.withColumn("id", monotonically_increasing_id())

newdf = df7.join(df, "id", "outer").drop("id")

In [27]:
newdf.registerTempTable('distance')

In [28]:
# 1 minute radius at Durham is approximately 1.85km 
# Therefore any foreclosure within 1.85 kms is assumed to be within 1 minute radius of the restaurant

newtab = spark.sql("SELECT name as Restaurant_Name, count(*) as No_Of_Foreclosures "+
                   "FROM distance "+
                   "WHERE value < 1.85 "+
                   "GROUP BY name")

In [29]:
newtab.count()

975

In [30]:
newtab.show(10)

+--------------------+------------------+
|     Restaurant_Name|No_Of_Foreclosures|
+--------------------+------------------+
|  COMPARE FOODS DELI|               128|
|W G PEARSON SCHOO...|               241|
|     DPAC  3RD FLOOR|               358|
|HARRIS TEETER 172...|                11|
|GSK COMMERCIAL OP...|                 1|
| BLU SEAFOOD AND BAR|                37|
|            GRILL 46|                24|
|    MCDONALD'S 35265|                29|
|DUKE UNIVERSITY W...|                12|
|         JADE BUFFET|                41|
+--------------------+------------------+
only showing top 10 rows

