# Assignment 2 - Spark Dataframes
***Note***: All the dataset files were stored in the same folder as this notebook.

In [2]:
import os
import pyspark
import sys

# Point both PySpark interpreter settings at the interpreter running this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

conf = pyspark.SparkConf()

# getOrCreate() reuses a context/session that is already running, so this cell
# can be re-executed in a live kernel without raising
# "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

# Last expression: render the session summary as the cell output.
spark

# 6. 15 Points
**Dataset**: romeo-juliet-pg1777.txt

**Solve**: Do **word count** in pyspark.
Ignore punctuation, and normalize to lower case. Accept only the characters in this set: **[0-9a-zA-Z]**

## Discussion

For the purpose of this assignment, all characters not in this set: [a-z, A-Z, 0-9] were replaced with spaces or `" "` using the following `regex_string`:
```python
regex_string = r"[^a-zA-Z0-9]"
```
However, there are issues with this approach; one such issue is discussed below:
### Hyphenated and Apostrophe Words:
The regex used would split words containing hyphens or apostrophes, such as `Don't` or `Mother-in-law`, into `["don", "t"]` and `["mother", "in", "law"]`. 
### Proposed Alternative Regex:
A more appropriate regex that keeps hyphenated and apostrophe words intact might be one of the two options below (source: https://stackoverflow.com/questions/27715581/):
```python
re.findall(r"(?!'.*')\b[\w'-]+\b", line.lower())
```
or
```python
re.findall(r"[A-Za-z0-9]+(?:[-'][A-Za-z0-9]+)*", line.lower())
```
However, this would also include possessive words such as `professor's` which might be undesired.

### Approach:
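1. Read the text file into a DataFrame with one row per line.
2. Replace every character outside `[a-zA-Z0-9]` with a space.
3. Convert each line to lower case.
4. Split each line on spaces into an array of words.
5. Explode the array so that each word gets its own row.
6. Group by word and count the occurrences.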

In [3]:
from pyspark.sql.functions import regexp_replace, lower, split, explode, desc, col

# Read the play: one row per line of text, in a single column named "value".
RomeoJuliet = spark.read.text("romeo-juliet-pg1777.txt")

# Anything outside [0-9a-zA-Z] is treated as a word separator.
regex_string = r"[^a-zA-Z0-9]"

WordCount = (
    RomeoJuliet
    # replace characters not in [0-9a-zA-Z] with " "
    .select(regexp_replace("value", regex_string, " ").alias("LINE"))
    # lower case
    .select(lower("LINE").alias("line"))
    # split words in line
    .select(split("line", " ").alias("words_in_line"))
    # explode words: one row per token
    .select(explode("words_in_line").alias("word"))
    # Consecutive separators (e.g. ", " -> "  "), leading/trailing spaces and
    # blank lines all produce empty-string tokens after the split; drop them
    # so that "" is not counted as a word.
    .filter(col("word") != "")
    # group by word and count
    .groupBy("word")
    .count()
)

# Display
WordCount.show()

+----------+-----+
|      word|count|
+----------+-----+
|     those|   17|
|  carnegie|   10|
|      some|   58|
|      chor|    2|
|       art|   55|
|     still|   15|
|   nourish|    1|
|     cures|    1|
| solemnity|    3|
|     feign|    1|
|    imagin|    1|
|consortest|    1|
|   pitcher|    1|
|      earl|    1|
|      hope|    4|
|    shroud|    3|
|    unfirm|    1|
|   embrace|    1|
|     often|    4|
|  received|    3|
+----------+-----+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import split, udf, col
from pyspark.sql.types import BooleanType
from haversine import haversine, Unit

# Restaurant inspections: semicolon-delimited CSV with a header row; records
# may span multiple lines, and column types are inferred by Spark.
Restaurants = (
    spark.read
    .option("inferSchema", True)
    .option("delimiter", ';')
    .option("header", True)
    .option("multiline", True)
    .csv("Restaurants_in_Durham_County_NC.csv")
)

# Split geolocation - cast lat and lng to double.
# The "geolocation" column is split on ", "; element 0 becomes "lat" and
# element 1 becomes "lng".
_geo_parts = split("geolocation", ", ")

# Keep only active food-service premises, then drop rows with any null value.
RestaurantsQ7 = (
    Restaurants
    .filter((col("status") == "ACTIVE") & (col("rpt_area_desc") == "Food Service"))
    .select(
        "Premise_Name",
        _geo_parts[0].cast("double").alias("lat"),
        _geo_parts[1].cast("double").alias("lng"),
    )
    .dropna()
)

# Foreclosures: only the nested "fields.geocode" value is needed; null rows are dropped.
Foreclosure = (
    spark.read
    .json("durham-nc-foreclosure-2006-2016.json")
    .select("fields.geocode")
    .dropna()
)

# Boolean UDF used as the join condition: True when the haversine distance
# between the restaurant and the foreclosure geocode is at most 1 (unit 'mi').
joinExpression = udf(
    lambda lat, lng, geocode: haversine((lat, lng), geocode, unit='mi') <= 1,
    BooleanType(),
)

# Pair every restaurant with every foreclosure that satisfies the condition.
q7 = RestaurantsQ7.join(
    Foreclosure,
    joinExpression(RestaurantsQ7.lat, RestaurantsQ7.lng, Foreclosure.geocode),
)

In [9]:
from pyspark.sql.functions import split, col, udf
from pyspark.sql.types import BooleanType
from haversine import haversine, Unit

# Restaurant inspections: semicolon-delimited CSV with a header row; records
# may span multiple lines, and column types are inferred by Spark.
Restaurants = spark.read \
    .option("inferSchema", True) \
    .option("delimiter", ';') \
    .option("header", True) \
    .option("multiline", True) \
    .csv("Restaurants_in_Durham_County_NC.csv")

# Keep only active food-service premises.
RestaurantsQ7 = Restaurants.filter((col("status") == "ACTIVE") & (col("rpt_area_desc") == "Food Service"))

# Split "geolocation" on ", " into numeric lat / lng columns. Rows with any
# null value (including coordinates that fail the cast to double) are dropped.
geolocation_parts = split("geolocation", ", ")
RestaurantsQ7 = RestaurantsQ7.select(
    "Premise_Name",
    geolocation_parts[0].cast("double").alias("lat"),
    geolocation_parts[1].cast("double").alias("lng"),
).dropna()

# Foreclosures: only the nested "fields.geocode" value is needed; null rows are dropped.
Foreclosure = spark.read.json("durham-nc-foreclosure-2006-2016.json").select("fields.geocode").dropna()


def distance(lat, lng, geocode):
    """Return True when a restaurant lies within one mile of a foreclosure.

    Parameters
    ----------
    lat, lng : float
        Restaurant coordinates taken from the "lat" and "lng" columns.
    geocode : sequence of float
        Foreclosure location passed straight to ``haversine`` as its second
        point (presumably ``[latitude, longitude]`` -- TODO confirm against
        the JSON file).

    Returns
    -------
    bool
        True if the haversine distance in miles is <= 1.
    """
    # bool() keeps the result a plain Python bool for the BooleanType UDF.
    return bool(haversine((lat, lng), geocode, unit=Unit.MILES) <= 1)


# `udf` is imported in this cell so it no longer depends on another cell
# having been executed first.
distance_udf = udf(distance, BooleanType())

# Pair every restaurant with every foreclosure that satisfies the distance condition.
q7 = RestaurantsQ7.join(Foreclosure, distance_udf(RestaurantsQ7["lat"], RestaurantsQ7["lng"], Foreclosure["geocode"]))

In [10]:
q7.count()

63593

In [11]:
q7.groupBy("Premise_Name").count().sort("Premise_Name").show()

+--------------------+-----+
|        Premise_Name|count|
+--------------------+-----+
|(G&J) NOT JUST A ...|   81|
|            80 FRESH|   81|
|     A & D BUFFALO'S|   40|
|  ACADEMY QUICK STOP|    9|
|AI FUJI JAPANESE ...|   16|
|              AKASHI|    2|
|AL-TAIBA HALAL MA...|   53|
|ALAKSHA'S CUSTOM ...|   81|
|            ALIVIA'S|  115|
|ALOFT DURHAM DOWN...|  247|
|              ALPACA|   73|
|        AMANTE PIZZA|   98|
|   AMC SOUTHPOINT 17|   10|
|       AMERICAN HERO|   75|
|AMERICAN LEGION P...|  212|
|   AMERICAN MELTDOWN|   10|
|AMERICAN TOBACCO ...|  229|
|    AMF DURHAM LANES|    5|
|ANOTHER BROKEN EG...|    9|
|AR- RAZAQ ISLAMIC...|   59|
+--------------------+-----+
only showing top 20 rows

