# Spark initialization - spark template

In [60]:
import os
import pyspark

conf = pyspark.SparkConf()
conf = conf.setAppName("<my-app-name>")
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040') ## to setup SPARK UI
conf = conf.set('spark.jars', os.environ['GRAPHFRAMES_PATH']) ## graphframes in spark configuration
sc = pyspark.SparkContext(conf=conf)
sc

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=<my-app-name>, master=local[*]) created by __init__ at /tmp/ipykernel_294/2910356201.py:8 

In [None]:
spark = pyspark.SQLContext(sc)
spark

### Open Spark UI
``` https://csgy1-6513-fall.rcnyu.org/user/<USER_NETID>/proxy/4040/jobs/  ```


## Bakery data

In [None]:
df_bakery = spark.read.option("inferSchema",True).option("header",True).csv("shared/data/Bakery.csv").repartition(10)



In [None]:
df_bakery.printSchema()

In [None]:
df_bakery.select("*").show()

# Question 1 Answer below 

In [None]:
df_bakery.createOrReplaceTempView("df_bakery_view")

In [None]:
from pyspark.sql.functions import dayofweek, hour
df_bakery_monday_7to11 = df_bakery.filter(
    (dayofweek('Date') == 2) & 
    (hour('Time').between(7, 11))
)


In [None]:
df_bakery_monday_7to11.createOrReplaceTempView("df_bakery_monday_7to11_view")

In [None]:
spark.sql("""
    WITH sales_rank AS (
    SELECT 
        Item,
        COUNT(*) as qty,
        'Monday' as weekday,
        DATE(Date) as sale_date,
        CONCAT(CAST(HOUR(Time) AS STRING), 'AM') as hour_period,
        ROW_NUMBER() OVER (
            PARTITION BY DATE(Date), HOUR(Time)  -- Partitioning by both date and hour
            ORDER BY COUNT(*) DESC
        ) as rn
    FROM df_bakery_view
    WHERE DAYOFWEEK(Date) = 2  -- Monday
    AND HOUR(Time) BETWEEN 7 AND 11
    GROUP BY Item, DATE(Date), HOUR(Time)
    )
    SELECT 
        Item,
        qty,
        weekday,
        sale_date as Date,
        hour_period as Hour_period
    FROM sales_rank
    WHERE rn = 1
    ORDER BY sale_date ASC, hour_period DESC;
""").show()

# Question 2 answer below 

In [None]:
spark.sql(
"""
WITH ranked_items AS (
    SELECT 
        Item,
        COUNT(*) as qty,
        CASE 
            WHEN HOUR(Time) BETWEEN 6 AND 10 THEN 'Breakfast'
            WHEN HOUR(Time) BETWEEN 11 AND 15 THEN 'Lunch'
            ELSE 'Dinner'
        END as Daypart,
        CASE 
            WHEN DAYOFWEEK(Date) IN (1, 7) THEN 'Weekend'
            ELSE 'Weekday'
        END as DayType,
        ROW_NUMBER() OVER (
            PARTITION BY 
                CASE 
                    WHEN HOUR(Time) BETWEEN 6 AND 10 THEN 'Breakfast'
                    WHEN HOUR(Time) BETWEEN 11 AND 15 THEN 'Lunch'
                    ELSE 'Dinner'
                END,
                CASE 
                    WHEN DAYOFWEEK(Date) IN (1, 7) THEN 'Weekend'
                    ELSE 'Weekday'
                END
            ORDER BY COUNT(*) DESC
        ) as rn
    FROM df_bakery_view
    GROUP BY 
        Item,
        CASE 
            WHEN HOUR(Time) BETWEEN 6 AND 10 THEN 'Breakfast'
            WHEN HOUR(Time) BETWEEN 11 AND 15 THEN 'Lunch'
            ELSE 'Dinner'
        END,
        CASE 
            WHEN DAYOFWEEK(Date) IN (1, 7) THEN 'Weekend'
            ELSE 'Weekday'
        END
),
top_2_items AS (
    SELECT 
        DayType,
        Daypart,
        CONCAT(
            MAX(CASE WHEN rn = 1 THEN CONCAT('( ',Item, ' ') END),
            ', ',
            MAX(CASE WHEN rn = 2 THEN CONCAT(Item, ' )') END)
        ) as top_items
    FROM ranked_items
    WHERE rn <= 2
    GROUP BY DayType, Daypart
)
SELECT 
    DayType,
    Daypart,
    top_items
FROM top_2_items
ORDER BY 
    CASE WHEN DayType = 'Weekend' THEN 1 ELSE 2 END,
    CASE 
        WHEN Daypart = 'Breakfast' THEN 1
        WHEN Daypart = 'Lunch' THEN 2
        ELSE 3
    END;
"""
).show()

In [None]:
df_bakery.printSchema()

# Question 3 answer below


In [None]:
Restaurants_in_Durham_County_NC = spark.read\
  .option("header", True)\
  .option("inferSchema", True)\
  .json("shared/data/Restaurants_in_Durham_County_NC.json")

In [None]:
Restaurants_in_Durham_County_NC.select('*').show()

In [None]:
Restaurants_in_Durham_County_NC.createOrReplaceTempView("Restaurants_in_Durham_County_NC_view")

In [None]:
spark.sql(
"""
SELECT 
    fields.rpt_area_desc as area_desc,
    COUNT(*) as count
FROM Restaurants_in_Durham_County_NC_view
WHERE fields.rpt_area_desc IS NOT NULL
GROUP BY fields.rpt_area_desc
ORDER BY count DESC
"""
).show()

# Question 4 answer below 

In [None]:
# Read the CSV first
from pyspark.sql.functions import col, round, max, min, lit

# Read CSV with proper schema inference
df_population = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("shared/data/populationbycountry19802010millions.csv") \
    .withColumnRenamed("_c0", "Country")


In [None]:
df_population.select('1990').show()

In [None]:
from pyspark.sql.functions import col, when, isnull, round

# Calculate percentage change between 1990 and 2000
df_changes = df_population \
    .where(col("Country") != "World") \
    .select(
        "Country",
        when(
            (col("1990").isNull()) | (col("2000").isNull()) | (col("1990") == 0),
            None
        ).otherwise(
            round(((col("2000") - col("1990")) / col("1990") * 100), 2)
        ).alias("pct_change")
    )
            
# Find max and min changes
max_change = df_changes.agg(max("pct_change").alias("max_change")).collect()[0]["max_change"]
min_change = df_changes.agg(min("pct_change").alias("min_change")).collect()[0]["min_change"]

# Get countries with max and min changes
result = df_changes \
    .where((col("pct_change") == max_change) | (col("pct_change") == min_change)) \
    .orderBy(col("pct_change").desc())

# Display results
result.show()

# Question 5 & 6 answer below

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, explode, lower, concat_ws, count
from pyspark.ml.feature import NGram

In [None]:
def read_text_files(file_paths):

    text_data = []
    for file_path in file_paths:
        text_df = spark.read.text(file_path)
        text_data.append(text_df)
    
    return reduce(DataFrame.unionAll, text_data) if len(text_data) > 1 else text_data[0]


def word_count_analysis(df):
    # Convert to lowercase and clean text
    df_cleaned = df.select(
        lower(col("value")).alias("text")
    )
    
    # Create RegexTokenizer to remove punctuation and tokenize
    regexTokenizer = RegexTokenizer(
        inputCol="text", 
        outputCol="words", 
        pattern="[^0-9a-z]"  # Keep only alphanumeric characters
    )
    
    # Apply tokenization
    words_df = regexTokenizer.transform(df_cleaned)
    
    # Explode the words array and count occurrences
    word_counts = words_df.select(
        explode(col("words")).alias("word")
    ).filter(
        col("word") != ""  # Remove empty strings
    ).groupBy(
        "word"
    ).count().orderBy(
        col("count").desc()
    )
    
    return word_counts


def bigram_analysis(df):
    # Clean and tokenize text as before
    df_cleaned = df.select(lower(col("value")).alias("text"))
    
    regexTokenizer = RegexTokenizer(
        inputCol="text", 
        outputCol="words", 
        pattern="[^0-9a-z]"
    )
    
    words_df = regexTokenizer.transform(df_cleaned)
    
    # Create bigrams
    ngram = NGram(n=2, inputCol="words", outputCol="bigrams")
    bigrams_df = ngram.transform(words_df)
    
    # Count bigram occurrences
    bigram_counts = bigrams_df.select(
        explode(col("bigrams")).alias("bigram")
    ).groupBy(
        "bigram"
    ).count().orderBy(
        col("count").desc()
    )
    
    return bigram_counts

In [None]:
 file_paths = ["./*.txt"]
    
# Read and process text files
text_df = read_text_files(file_paths)

In [None]:
# Perform word count analysis
print("Word Count Analysis:")
word_counts = word_count_analysis(text_df)
word_counts.show(20) 
    
# Perform bigram analysis
print("\nTop 10 Bigrams:")
bigram_counts = bigram_analysis(text_df)
bigram_counts.show(10) 

# Question 7 answer below

In [None]:
Restaurants_in_Durham_County_NC = spark.read\
  .option("header", True)\
  .option("inferSchema", True)\
  .json("shared/data/Restaurants_in_Durham_County_NC.json")

foreclosures_df = spark.read.json("./shared/data/durham-nc-foreclosure-2006-2016.json")

In [None]:
from pyspark.sql.functions import element_at
Restaurants_in_Durham_County_NC.select((col('fields'))).show()

In [None]:
from haversine import haversine, Unit
from pyspark.sql.functions import udf, col, element_at, isnan
from pyspark.sql.types import DoubleType

# Part A: Find closest restaurant
INITIAL_LAT = 35.994914
INITIAL_LON = -78.897133

def calculate_distance(lat, lon):
    try:
        if lat is None or lon is None:
            return None
        return haversine(
            (float(INITIAL_LAT), float(INITIAL_LON)), 
            (float(lat), float(lon)),
            unit='mi'
        )
    except (ValueError, TypeError):
        return None

distance_calculator_udf = udf(calculate_distance, DoubleType())

# Find closest restaurant
restaurant_distances = Restaurants_in_Durham_County_NC \
    .filter(
        (col('fields.status') == 'ACTIVE') & 
        (col('fields.rpt_area_desc') == 'Food Service')
    ) \
    .select(
        col('fields.premise_name').alias('rest_name'),
        element_at(col('fields.geolocation'), 1).alias('latitude'),
        element_at(col('fields.geolocation'), 2).alias('longitude'),
        distance_calculator_udf(
            element_at(col('fields.geolocation'), 1),
            element_at(col('fields.geolocation'), 2)
        ).alias('distance')
    ) \
    .where(col('distance').isNotNull()) \
    .orderBy('distance') \
    .limit(1)

In [None]:
# Part B: Find foreclosures near the closest restaurant
closest_restaurant = restaurant_distances.first()
if closest_restaurant:
    # Create new distance calculator using restaurant coordinates
    def calculate_distance_from_restaurant(lat, lon):
        try:
            if lat is None or lon is None:
                return None
            return haversine(
                (float(closest_restaurant['latitude']), float(closest_restaurant['longitude'])), 
                (float(lat), float(lon)),
                unit='mi'
            )
        except (ValueError, TypeError):
            return None

    restaurant_distance_calculator_udf = udf(calculate_distance_from_restaurant, DoubleType())

    # Find foreclosures within 1 mile of closest restaurant
    foreclosed_properties_within_one_mile = durham_nc_foreclosure_2006_2016 \
        .filter(col('fields.geocode').isNotNull()) \
        .select(
            element_at(col('fields.geocode'), 1).alias('latitude'),
            element_at(col('fields.geocode'), 2).alias('longitude'),
            restaurant_distance_calculator_udf(
                element_at(col('fields.geocode'), 1),
                element_at(col('fields.geocode'), 2)
            ).alias('distance')
        ) \
        .where(
            col('latitude').isNotNull() & 
            col('longitude').isNotNull() & 
            col('distance').isNotNull() & 
            (col('distance') <= 1.0)
        )

    print("\nClosest restaurant found:")
    restaurant_distances.show(truncate=False)
    print(f"\nNumber of foreclosures within 1 mile of closest restaurant: {foreclosed_properties_within_one_mile.count()}")


In [None]:
foreclosed_properties_within_one_mile.select('*').show()

In [None]:
foreclosed_properties_within_one_mile.count()