#### Business Data Analysis from Yelp and Google Local near CalStateLA, USC, UCLA: Los Angeles 2016

###### Ram Dharan, Jongwook Woo at HiPIC (http://web.calstatela.edu/centers/hipic/), California State University Los Angeles
 
**Date:** 07/21/2016

In [3]:
# Load input data set from Amazon AWS S3?
IS_USING_S3 = True

In [4]:
# Load input data set from Amazon S3
# NOTE: Set the access to this notebook appropriately to protect the security of your keys as IAM user not Root user.
# Or you can delete this cell after you run the mount command below once successfully.
if IS_USING_S3:
  ACCESS_KEY = "YOUR_IAM_USER_ACCESS_KEY"
  SECRET_KEY = "YOUR_IAM_USER_SECRET_KEY"
  ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
  AWS_BUCKET_NAME = "hipicdatasets"
  MOUNT_NAME = "yelp" # "yelp_raw_fall_2016.csv"

In [5]:
if IS_USING_S3:
  # unmount the existing mount
  dbutils.fs.unmount("/mnt/%s" % MOUNT_NAME)
  # mount it to dtabricks file systems
  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)


In [6]:
if IS_USING_S3:
  business_raw_yelpRDD = sc.textFile("/mnt/%s/bizDataLASummer2016.csv" % MOUNT_NAME)
  # Count the number of data
  business_raw_yelpRDD.count()

In [7]:

if IS_USING_S3:
  # Read input data directly from Amazon AWS S3
  business_df_yelp = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferSchema='true').load("/mnt/%s/bizDataLASummer2016.csv" % MOUNT_NAME)
else:
  # You can read data from the table created at community cloud server of Databricks
  # NOTE: You have to put your file path
  business_df_yelp = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferSchema='true').load('/FileStore/tables/d0sesps11468549046044/All__3_-f9cfc.csv')

In [8]:
# See the Schema
business_df_yelp.printSchema()

In [9]:
# see the distinct business listing
display(business_df_yelp.select('business_id').distinct().collect())

Function to calculate distance in miles using latitude and longitude

In [11]:
import math
from math import sin, cos, sqrt, atan2, radians,asin

# radius in miles
RADIUS = 5000 #3965

# distance between (lat1, lon1) and (lat2, lon2)
def distance(lat1, lon1, lat2, lon2, radius):
   try:
    # Geo of CalStateLA
    if lat2 is None: 
      lat2 = 34.0651 
    if lon2 is None: 
      lon2 = -118.1701 
    if radius is None: 
      radius = 0

    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c

    return d
   except:
    return 0.0

# distance in the radius specfied above from California State University Los Angeles
def distanceCalStateLA(lat1, lon1) :
  return distance(lat1, lon1, None, None, RADIUS)

# distance in the radius specfied above  from University of California Los Angeles
def distanceUCLA(lat1, lon1) :
  return distance(lat1, lon1, 34.0689, -118.4452, RADIUS)

# distance in the radius specfied above  from University of Southern California
def distanceUSC(lat1, lon1) :
  return distance(lat1, lon1, 34.0224, -118.2851, RADIUS)


Register UDFs

In [13]:
# Register UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
distanceCalStateLA_udf = udf(distanceCalStateLA, FloatType())
distanceUCLA_udf = udf(distanceUCLA, FloatType())
distanceUSC_udf = udf(distanceUSC, FloatType())

In [14]:
data = business_df_yelp.select().distinct()
display(data)

##Add a new column distance to data frame and calculate distance from CalStateLA, USC and UCLA

In [16]:
distance_df=business_df_yelp.withColumn('distance', distanceCalStateLA_udf('latitude' ,'longitude'))
display(distance_df)

In [17]:
distance_usc_df=business_df_yelp.withColumn('distance', distanceUSC_udf('latitude' ,'longitude'))
display(distance_usc_df)

In [18]:
distance_ucla_df=business_df_yelp.withColumn('distance', distanceUCLA_udf('latitude' ,'longitude'))
display(distance_ucla_df)

##Display all the businesses under 5 miles from CalStateLA

In [20]:
business_within_5=distance_df.where(distance_df['distance']<5)
display(business_within_5)

## list the top 10 business categories within 5 miles of CalStateLA

In [22]:
from pyspark.sql.functions import col
top_business_csula=business_within_5.groupby('categories__001').count().sort(col("count").desc())
display(top_business_csula.take(10))

## List businesses with the average star ratings, which is within 5 miles from CalStateLA and have more than 700 points star ratings

In [24]:
import pyspark.sql.functions as func

# SparkSQL
#sqlContext.sql("SELECT count(stars) as total,city from business_data_yelp where city IN('alhambra','Pasadena','Long beach','Santa monica','Beverly hills','burbank','West hollywood','arcadia','El monte','Monterey park','San gabriel','downey','baldwin park','Montebello','Los angeles') and stars=5 group by city order by total desc").show()

# DataFrame
display(business_within_5.groupBy('categories__001')
        .agg(func.count('stars').alias('total'), func.avg('stars').alias('star average')).where("total>700").orderBy(col('star average').desc()))

##Display all the businesses under 5 miles from USC

In [26]:
business_within_5_usc=distance_usc_df.where(distance_usc_df['distance']<5)
display(business_within_5_usc)

##Business within 5 miles of USC in the sorted distance

In [28]:
business_within_5_usc=distance_usc_df.where(distance_usc_df['distance']<5).orderBy(col('distance'))
display(business_within_5_usc)

## List businesses with the average star ratings, which is within 5 miles from USC and have more than 500 points star ratings

In [30]:
display(business_within_5_usc.groupBy('categories__001')
        .agg(func.count('stars').alias('total'), func.avg('stars').alias('star average'))
        .where("total>500").orderBy(col('star average').desc()))

##Display all the businesses under 5 miles from UCLA

In [32]:
business_within_5_ucla=distance_ucla_df.where(distance_ucla_df['distance']<5)
display(business_within_5_ucla)

##Business within 5 miles of UCLA in the sorted distance

In [34]:
business_within_5_ucla=distance_ucla_df.where(distance_ucla_df['distance']<5).orderBy(col('distance'))
display(business_within_5_ucla)

##Display business names for more than 250 instances  where the ratings are higher than the overall average

In [36]:
display(business_within_5_ucla.groupBy('categories__001')
        .agg(func.count('stars').alias('total'), func.avg('stars').alias('star average'))
        .where("total>250").orderBy(col('star average').desc()))

##Register data frame as Table

In [38]:
business_df_yelp.registerTempTable("yelp_data")

##Display which areas have more star rating count around Los Angeles

In [40]:
display(sqlContext.sql("SELECT count(stars) as total,city from yelp_data where city IN('alhambra','Pasadena','Long beach','Santa monica','Beverly hills','burbank','West hollywood','arcadia','El monte','Monterey park','San gabriel','downey','baldwin park','Montebello','Los angeles') and stars=5 group by city order by total desc"))

## List the star rating and its total counts

In [42]:
display(sqlContext.sql("select stars as score, count(*) as total from ( select case  when stars between 0 and 1 then ' 0-1' when stars between 1 and 2 then '1-2' when stars between 2 and 3 then '2-3' when stars between 3 and 4 then '3-4' when stars between 4 and 5 then '4-5'when stars is Null  then 'others' end as stars from yelp_data) yelp_data group by yelp_data.stars order by score"))

##Show how many busineeses are opened during different time intervals of the day

In [44]:
time_data=sqlContext.sql("select case when Monday__open between 0.0 and 1.99 then '00:00-2:00' when Monday__open between 2.0 and 3.59 then '02:00-04:00' when Monday__open between 4.0 and 5.59 then '04:00-06:00' when Monday__open between 6.0 and 7.59 then '06:00-08:00' when Monday__open between 8.0 and 9.59 then '08:00-10:00' when Monday__open between 10.0 and 11.59 then '10:00-12:00' when Monday__open between 12.0 and 13.59 then '12:00-14:00' when Monday__open between 14.0 and 15.39 then '14:00-16:00' when Monday__open between 16.0 and 17.59 then '16:00-18:00' when Monday__open between 18.0 and 19.59 then '18:00-20:00' when Monday__open between 20.0 and 21.59 then '20:00-22:00' when Monday__open between 22.0 and 23.59 then '22:00-00:00' end as Monday__open from yelp_data where Monday__open is not null")

In [45]:
from pyspark.sql.functions import col
display(time_data.groupby('Monday__open').count().sort(col('Monday__open')))

##Spatial visualization of all the businesses in different cities around Los Angeles

In [47]:
display(sqlContext.sql("SELECT latitude,longitude,city from yelp_data where city IN('alhambra','Pasadena','Long beach','Santa monica','Beverly hills','burbank','West hollywood','arcadia','El monte','Monterey park','San gabriel','downey','baldwin park','Montebello','Los angeles') "))

## union all businesses within 5 miles from CalStateLA, USC, UCLA

In [49]:
#display(sqlContext.sql("SELECT latitude,longitude,city from yelp_data where city IN('alhambra','Pasadena','Long beach','Santa monica','Beverly hills','burbank','West hollywood','arcadia','El monte','Monterey park','San gabriel','downey','baldwin park','Montebello','Los angeles') "))
biz_near_schools = business_within_5.unionAll(business_within_5_ucla.unionAll(business_within_5_usc))
#display(biz_near_schools)


In [50]:
biz_near_schools.cache()

In [51]:
display(biz_near_schools.select('latitude','longitude','categories__001', 'name'))

# Save to local file systems - NOTE: don't know where the file exists
#biz_near_schools.select('latitude','longitude','categories__001', 'name').repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("/FileStore/bizNearSchools.csv") #"file:///G//Downloads//bizNearSchools2.csv")

# Save to Amazon S3
#biz_near_schools.select('latitude','longitude','categories__001', 'name').repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, "bizNearSchoolsOne.csv")) 