# Data Retrieval

The purpose of this notebook is to provide quick access to the Yelp data. It defines the schemas and reads the data files into Spark DataFrames, so that they can be reused for analysis in other notebooks.

For example, run the following line in a new notebook to load the data into it:

```python
%run Data-Retrieval.ipynb
```

In [1]:
#Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, MapType, IntegerType, FloatType, DateType, TimestampType, ArrayType
from pyspark.sql.functions import explode, col, map_keys, split, expr, lit, when, array, udf, coalesce, create_map

import pandas as pd
from IPython.display import Image
import os

In [2]:
# Start (or attach to) the project's SparkSession under the application name
# below. getOrCreate() hands back an already-running session when one exists,
# which is why a "Using an existing Spark session" warning may be printed.
app_name = "FinalProject"
spark = (
    SparkSession.builder
    .appName(app_name)
    .getOrCreate()
)

# Expose the underlying SparkContext for any RDD-level work.
sc = spark.sparkContext

24/03/26 04:10:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Set up the data directory.
# Every data file read in this notebook (the yelp_academic_dataset_*.json files
# and edges.parquet) must be placed directly in this GCS directory.
bucket_name = 'quagliam-bucket-milestone1'  ### Change this to your bucket name
# Keep the trailing '/': file names are appended directly to this prefix.
proj_directory = f'gs://{bucket_name}/notebooks/jupyter/'

## Business

In [4]:
# Schema for yelp_academic_dataset_business.json.
# Every field is nullable. "attributes" and "hours" are string->string maps
# (dictionaries in the raw JSON). "categories" is read as a single string and
# split into an array when the DataFrame is loaded.
businessSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("business_id", StringType()),
        ("name", StringType()),
        ("address", StringType()),
        ("city", StringType()),
        ("state", StringType()),
        ("postal_code", StringType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("stars", FloatType()),
        ("review_count", IntegerType()),
        ("is_open", IntegerType()),
        ("attributes", MapType(StringType(), StringType())),
        ("categories", StringType()),
        ("hours", MapType(StringType(), StringType())),
    )
])

In [5]:
# Load the business records with the explicit schema, then turn the
# "categories" string into an array<string> column. The raw value is
# presumably ", "-separated, as this split assumes.
business = (
    spark.read
    .schema(businessSchema)
    .json(f'{proj_directory}yelp_academic_dataset_business.json')
    .withColumn("categories", split(col("categories"), ", "))
)

## Review

In [8]:
# Define the review schema for yelp_academic_dataset_review.json.
# Every field is nullable, matching the business and user schemas.
# NOTE(review): "date" is parsed directly as a timestamp. This assumes the raw
# strings are in a format Spark's JSON reader parses by default (i.e. no
# timestampFormat option is needed) -- TODO confirm.
reviewSchema = StructType([
    StructField("review_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("business_id", StringType(), True),
    StructField("stars", FloatType(), True),
    StructField("useful", IntegerType(), True),
    StructField("funny", IntegerType(), True),
    StructField("cool", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("date", TimestampType(), True)
])

In [9]:
# Load all reviews and mark the DataFrame for caching, so repeated actions
# reuse the cached data instead of re-reading the JSON from storage.
review = (
    spark.read
    .schema(reviewSchema)
    .json(f'{proj_directory}yelp_academic_dataset_review.json')
    .cache()
)

## User

In [12]:
# Schema for yelp_academic_dataset_user.json. Every field is nullable.
# "friends" and "elite" are read as plain strings here and split into arrays
# when the DataFrame is loaded.
userSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("user_id", StringType()),
        ("name", StringType()),
        ("review_count", IntegerType()),
        ("yelping_since", TimestampType()),
        ("friends", StringType()),
        ("useful", IntegerType()),
        ("funny", IntegerType()),
        ("cool", IntegerType()),
        ("fans", IntegerType()),
        ("elite", StringType()),
        ("average_stars", FloatType()),
        ("compliment_hot", IntegerType()),
        ("compliment_more", IntegerType()),
        ("compliment_profile", IntegerType()),
        ("compliment_cute", IntegerType()),
        ("compliment_list", IntegerType()),
        ("compliment_note", IntegerType()),
        ("compliment_plain", IntegerType()),
        ("compliment_cool", IntegerType()),
        ("compliment_funny", IntegerType()),
        ("compliment_writer", IntegerType()),
        ("compliment_photos", IntegerType()),
    )
])

In [13]:
# Load users, convert the "friends" and "elite" strings into arrays, and cache
# the result.
# NOTE(review): the two columns use different separators (", " vs ","),
# presumably matching how each field is encoded in the raw JSON -- confirm.
# Also, an empty string is split into a one-element array [""] rather than [];
# verify that downstream code expects this.
user = (
    spark.read
    .schema(userSchema)
    .json(f'{proj_directory}yelp_academic_dataset_user.json')
    .withColumn("friends", split(col("friends"), ", "))
    .withColumn("elite", split(col("elite"), ","))
    .cache()
)

## Edgelist

In [None]:
# Load the edge list. This notebook only reads edges.parquet, so the file must
# already exist under proj_directory.
# proj_directory already ends with '/', so the file name is appended directly
# (no extra separator), consistent with the other reads in this notebook.
edges = spark.read.parquet(f"{proj_directory}edges.parquet")