# CS-GY 6513 Homework 2

## Preliminary

### Dependency Imports

In [52]:
import pyspark.sql.functions as F

from pyspark.sql import SparkSession, Window
from pyspark.ml.feature import RegexTokenizer, NGram
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, ArrayType, FloatType, TimestampType

### Constants

In [53]:
BAKERY_CSV_FILE_PATH = "./Bakery.csv"
BAKERY_CSV_SCHEMA = StructType(
    [
        StructField("Date", DateType(), False),
        StructField("Time", StringType(), False),
        StructField("Transaction", IntegerType(), False),
        StructField("Item", StringType(), False),
    ]
)

RESTAURANT_IN_NC_JSON_FILE_PATH = "./Restaurants_in_Durham_County_NC (2).json"
RESTAURANT_IN_NC_SCHEMA = StructType(
    [
        StructField("datasetid", StringType(), False),
        StructField("recordid", StringType(), False),
        StructField(
            "fields",
            StructType(
                [
                    StructField("status", StringType(), False),
                    StructField("geolocation", ArrayType(FloatType()), False),
                    StructField("premise_zip", StringType(), False),
                    StructField("rpt_area_desc", StringType(), False),
                    StructField("risk", IntegerType(), False),
                    StructField("est_group_desc", StringType(), False),
                    StructField("seats", IntegerType(), False),
                    StructField("water", StringType(), False),
                    StructField("premise_phone", StringType(), False),
                    StructField("premise_state", StringType(), False),
                    StructField("insp_freq", IntegerType(), False),
                    StructField("type_description", StringType(), False),
                    StructField("premise_city", StringType(), False),
                    StructField("premise_address2", StringType(), False),
                    StructField("opening_date", TimestampType(), False),
                    StructField("premise_name", StringType(), False),
                    StructField("transitional_type_desc", StringType(), False),
                    StructField("smoking_allowed", StringType(), False),
                    StructField("id", StringType(), False),
                    StructField("sewage", StringType(), False),
                    StructField("premise_address1", StringType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "geometry",
            StructType(
                [
                    StructField("type", StringType(), False),
                    StructField("coordinates", ArrayType(FloatType()), False),
                ]
            ),
            False,
        ),
        StructField("record_timestamp", TimestampType(), False),
    ]
)

POPULATION_BY_COUNTRY_CSV_FILE_PATH = "./populationbycountry19802010millions (1).csv"
POPULATION_BY_COUNTRY_SCHEMA = StructType(
    [StructField("Country", StringType(), False)]
    + [StructField(f"{year}", FloatType(), False) for year in range(1980, 2011)]
)

WORD_TEXT_FILE_PATH = "./hw1text/*.txt"

### Configs

In [54]:
spark = SparkSession.builder.appName("BigDataHomework2").master("local[*]").getOrCreate()

### Read Data

In [55]:
df_bakery = spark.read.csv(BAKERY_CSV_FILE_PATH, header=False, schema=BAKERY_CSV_SCHEMA)
df_bakery.show(5)

+----------+--------+-----------+-------------+
|      Date|    Time|Transaction|         Item|
+----------+--------+-----------+-------------+
|      NULL|    Time|       NULL|         Item|
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
+----------+--------+-----------+-------------+
only showing top 5 rows



In [56]:
df_restaurant_in_nc = spark.read.json(RESTAURANT_IN_NC_JSON_FILE_PATH, schema=RESTAURANT_IN_NC_SCHEMA)
df_restaurant_in_nc.show(5)

+----------------+--------------------+--------------------+--------------------+-------------------+
|       datasetid|            recordid|              fields|            geometry|   record_timestamp|
+----------------+--------------------+--------------------+--------------------+-------------------+
|restaurants-data|1644654b953d1802c...|{ACTIVE, [35.9207...|{Point, [-78.9573...|2017-07-13 09:15:31|
|restaurants-data|93573dbf8c9e799d8...|{ACTIVE, [36.0467...|{Point, [-78.8895...|2017-07-13 09:15:31|
|restaurants-data|0d274200c7cef50d0...|{ACTIVE, [35.9182...|{Point, [-78.9593...|2017-07-13 09:15:31|
|restaurants-data|cf3e0b175a6ebad2a...|{ACTIVE, [36.0183...|{Point, [-78.9060...|2017-07-13 09:15:31|
|restaurants-data|e796570677f7c39cc...|{ACTIVE, [36.0556...|{Point, [-78.9135...|2017-07-13 09:15:31|
+----------------+--------------------+--------------------+--------------------+-------------------+
only showing top 5 rows



In [57]:
df_population_by_country = spark.read.csv(POPULATION_BY_COUNTRY_CSV_FILE_PATH)
# * remove the first row
row_count = df_population_by_country.count()
df_population_by_country = df_population_by_country.limit(row_count - 1).subtract(
    df_population_by_country.limit(1)
)

# * manually add column names
# * since the first column name is missing
idx = 0
for field in POPULATION_BY_COUNTRY_SCHEMA:
    df_population_by_country = df_population_by_country.withColumnRenamed(
        f"_c{idx}", field.name
    ).withColumn(field.name, F.col(field.name).cast(field.dataType))
    idx += 1
df_population_by_country.show(5)

+-------------+---------+---------+---------+---------+---------+--------+---------+---------+--------+---------+--------+---------+---------+---------+--------+---------+--------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|      Country|     1980|     1981|     1982|     1983|     1984|    1985|     1986|     1987|    1988|     1989|    1990|     1991|     1992|     1993|    1994|     1995|    1996|     1997|     1998|     1999|    2000|     2001|     2002|     2003|     2004|     2005|     2006|     2007|     2008|     2009|     2010|
+-------------+---------+---------+---------+---------+---------+--------+---------+---------+--------+---------+--------+---------+---------+---------+--------+---------+--------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|North America|320.27637|324.44693|328.6

In [58]:
df_text = spark.read.text(WORD_TEXT_FILE_PATH)

# * pre-process data
# * lower case the words and replace all characters not [0-9a-z] with spaces
df_text_cleaned = df_text.withColumn(
    "cleaned text", F.lower(F.regexp_replace(F.col("value"), "[^0-9a-z]+", " "))
)
# * split the text into tokens separated by "s+"
regex_tokenizer = RegexTokenizer(inputCol="cleaned text", outputCol="words", pattern="\\s+")
df_text_tokenized = regex_tokenizer.transform(df_text_cleaned)
df_text_tokenized = df_text_tokenized.filter(
    F.col("value").isNotNull() & F.col("cleaned text").isNotNull()
)
df_text_tokenized = df_text_tokenized.filter(
    (df_text_tokenized["value"] != "") & (df_text_tokenized["cleaned text"] != "")
)

df_text_tokenized.show(5)

+--------------------+--------------------+--------------------+
|               value|        cleaned text|               words|
+--------------------+--------------------+--------------------+
|@@31678741 <p> If...| 31678741 p f pro...|[31678741, p, f, ...|
|@@31680641 <p> In...| 31680641 p n the...|[31680641, p, n, ...|
|@@31680841 <p> Th...| 31680841 p housa...|[31680841, p, hou...|
|@@31682241 <h> Le...| 31682241 h esson...|[31682241, h, ess...|
|@@31683241 <h> OE...| 31683241 h warns...|[31683241, h, war...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



## Question 1

**Top 1 Selling Item on Monday from 7AM - 11AM**

In [59]:
target_weekday = "Monday"
target_time_range = [7, 11]
df_bakery_q1 = (
    (
        df_bakery.withColumn("Weekday", F.date_format(F.col("Date"), "EEEE"))
        .withColumn("Hour", F.hour(F.col("Time")))
        .filter(
            (F.col("Weekday") == target_weekday)
            & (F.col("Hour").between(target_time_range[0], target_time_range[1]))
        )
    )
    .groupBy("Hour", "Item")
    .count()
    .withColumnRenamed("count", "qty")
)
df_bakery_q1.show(1) # * only show the item with highest selling count

+----+------+---+
|Hour|  Item|qty|
+----+------+---+
|  10|Coffee|114|
+----+------+---+
only showing top 1 row



## Question 2

**Top 2 (by qty) Items Bought by Daypart and DayType**

In [60]:
time_col = F.col("Time")
hour_col = F.col("Hour")
date_col = F.col("Date")
day_of_week_col = F.col("DayofWeek")

df_bakery_q2 = (
    df_bakery.withColumn("Hour", F.hour(time_col))
    .withColumn(
        "Daypart",
        F.when((hour_col >= 6) & (hour_col <= 10), "Breakfast")
        .when((hour_col >= 11) & (hour_col <= 15), "Lunch")
        .otherwise("Dinner"),
    )
    .withColumn("DayofWeek", F.date_format(date_col, "EEEE"))
    .withColumn(
        "DayType",
        F.when((day_of_week_col == "Saturday") | (day_of_week_col == "Sunday"), "Weekend").otherwise(
            "Weekday"
        ),
    )
)  # * compute all necessary columns

df_bakery_q2 = df_bakery_q2.groupBy("Daypart", "DayType", "Item").count()

window = Window.partitionBy("Daypart", "DayType").orderBy(F.desc("count"))
df_bakery_q2 = (
    (
        df_bakery_q2.withColumn("ranking", F.row_number().over(window))
        .filter(F.col("ranking") <= 2)
        .groupBy("Daypart", "DayType")
        .agg(F.collect_list("Item").alias("Top_2_Items"))
    )
    .select("Daypart", "DayType", "Top_2_Items")
    .show(truncate=False)
)

+---------+-------+---------------+
|Daypart  |DayType|Top_2_Items    |
+---------+-------+---------------+
|Breakfast|Weekday|[Coffee, Bread]|
|Breakfast|Weekend|[Coffee, Bread]|
|Dinner   |Weekday|[Coffee, Bread]|
|Dinner   |Weekend|[Coffee, Bread]|
|Lunch    |Weekday|[Coffee, Bread]|
|Lunch    |Weekend|[Coffee, Bread]|
+---------+-------+---------------+



## Question 3

**Show the Number of Entities by “fields.rpt_area_desc”**

In [61]:
df_restaurant_in_nc_q3 = df_restaurant_in_nc.groupBy(F.col("fields.rpt_area_desc")).count()
df_restaurant_in_nc_q3.show(truncate=False)

+---------------------+-----+
|rpt_area_desc        |count|
+---------------------+-----+
|Bed&Breakfast Home   |4    |
|Summer Camps         |4    |
|Institutions         |30   |
|Local Confinement    |2    |
|Mobile Food          |147  |
|School Buildings     |89   |
|Summer Food          |242  |
|Swimming Pools       |420  |
|Day Care             |173  |
|Tattoo Establishments|36   |
|Residential Care     |154  |
|Bed&Breakfast Inn    |2    |
|Adult Day Care       |5    |
|Lodging              |62   |
|Food Service         |1093 |
+---------------------+-----+



## Question 4

**Biggest Percentage Increase from 1990 to 2000**

In [62]:
from pyspark.sql import functions as F

# * Select only the necessary columns for the calculation
df_population = df_population_by_country.filter(
    df_population_by_country["Country"] != "World"
).select("Country", "1990", "2000")

# * Remove rows where either 1990 or 2000 is null
df_population = df_population.filter(F.col("1990").isNotNull() & F.col("2000").isNotNull())


df_population = df_population.withColumn(
    "Percentage Change", ((F.col("2000") - F.col("1990")) / F.col("1990")) * 100
)
max_increase = df_population.orderBy(F.col("Percentage Change").desc()).limit(1)
max_decrease = df_population.orderBy(F.col("Percentage Change").asc()).limit(1)

print("Country with the biggest percentage increase:")
max_increase.select("Country", "Percentage Change").show()
print("Country with the biggest percentage decrease:")
max_decrease.select("Country", "Percentage Change").show()

Country with the biggest percentage increase:
+--------------------+-----------------+
|             Country|Percentage Change|
+--------------------+-----------------+
|United Arab Emirates|76.27926665641841|
+--------------------+-----------------+

Country with the biggest percentage decrease:
+----------+-----------------+
|   Country|Percentage Change|
+----------+-----------------+
|Montserrat|-63.1873277639145|
+----------+-----------------+



## Question 5

**Word Count**

In [63]:

# * explode the array of words into individual rows and select only the word column
df_words = df_text_tokenized.select(F.explode(F.col("words")).alias("word"))

# * show the result
df_words.show(5)
print(f"Total tokens: {df_words.count()}")

+--------+
|    word|
+--------+
|31678741|
|       p|
|       f|
|promised|
|      to|
+--------+
only showing top 5 rows





Total tokens: 2925445


                                                                                

In [64]:

# * group by words and count the occurrences
df_word_count = df_words.groupBy("word").count().orderBy(F.col("count").desc())

df_word_count.show(truncate=False)



+----+------+
|word|count |
+----+------+
|the |142965|
|to  |87873 |
|p   |78583 |
|of  |75074 |
|and |70933 |
|in  |52844 |
|a   |50187 |
|for |28369 |
|he  |27781 |
|is  |27646 |
|that|27443 |
|s   |25354 |
|on  |23636 |
|are |19529 |
|with|18699 |
|be  |17764 |
|as  |16110 |
|have|16083 |
|at  |15209 |
|said|14893 |
+----+------+
only showing top 20 rows



                                                                                

## Question 6

**10 Most Common Bigrams**

In [72]:

# * use ngram to get the bigrams
ngram = NGram(n=2, inputCol="words", outputCol="bigrams")
df_bigrams = ngram.transform(df_text_tokenized)

# * explode bigram token arrays into rows
df_bigrams_exploded = df_bigrams.select(F.explode(F.col("bigrams")).alias("bigram"))

# * group by bigram and count the occurrences 
df_bigrams_count = df_bigrams_exploded.groupBy("bigram").count().orderBy(F.desc("count"))

# * show the top-10 bigram
df_bigrams_count.show(10, truncate=False)



+-------+-----+
|bigram |count|
+-------+-----+
|of the |17217|
|in the |12045|
|p he   |10876|
|to the |8227 |
|n t    |5368 |
|for the|5328 |
|on the |4806 |
|to be  |4522 |
|will be|4171 |
|and the|3881 |
+-------+-----+
only showing top 10 rows



                                                                                