### 1. Description of Data

The Yelp dataset was downloaded from the Kaggle website. In total, it contains 5,200,000 user reviews and information on 174,000 businesses. We focus on two tables, the business table and the review table. The attributes of the business table are as follows:

- business_id: ID of the business
- name: name of the business
- neighborhood: neighborhood of the business
- address: address of the business
- city: city of the business
- state: state of the business
- postal_code: postal code of the business
- latitude: latitude of the business
- longitude: longitude of the business
- stars: average rating of the business
- review_count: number of reviews received
- is_open: 1 if the business is open, 0 otherwise
- categories: categories of the business (multiple values, separated by semicolons)

### Attributes of the review table

- review_id: ID of the review
- user_id: ID of the user
- business_id: ID of the business
- stars: rating given to the business in this review
- date: review date
- text: review from the user
- useful: number of users who voted the review as useful
- funny: number of users who voted the review as funny
- cool: number of users who voted the review as cool

### 2. Tasks carried out on the datasets

###### Out of the 7 CSV data files, I chose to work on 2:
- yelp_business.csv (size: 30 MB)
- yelp_review.csv (size: 3.53 GB)

In [1]:
import sys
import os

# Make the cluster's Spark 2 Python bindings (and the bundled py4j) importable
sys.path.insert(0, '/usr/hdp/current/spark2-client/python')
sys.path.insert(0, '/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip')

# Point PySpark at the Spark installation, its configuration, and the Python used by executors
os.environ['SPARK_HOME'] = '/usr/hdp/current/spark2-client/'
os.environ['SPARK_CONF_DIR'] = '/etc/spark2/conf'
os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'

import pyspark

# Run on YARN with 8 executors, each with 8 GB of memory and 5 cores
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory", "1g")
conf.set("spark.executor.instances", "8")
conf.set("spark.executor.memory", "8g")
conf.set("spark.executor.cores", "5")

sc = pyspark.SparkContext(conf=conf)

In [2]:
sc

In [4]:
!hdfs dfs -ls Yelp_data

Found 8 items
drwxr-xr-x   - PK922097 hadoop          0 2020-01-02 14:11 Yelp_data/.ipynb_checkpoints
-rw-r--r--   3 PK922097 hadoop   31760674 2020-01-02 14:11 Yelp_data/yelp_business.csv
-rw-r--r--   3 PK922097 hadoop   41377121 2020-01-02 14:11 Yelp_data/yelp_business_attributes.csv
-rw-r--r--   3 PK922097 hadoop   13866351 2020-01-02 14:11 Yelp_data/yelp_business_hours.csv
-rw-r--r--   3 PK922097 hadoop    5053126 2020-01-02 14:10 Yelp_data/yelp_checkin.csv
-rw-r--r--   3 PK922097 hadoop 3791120545 2020-01-02 14:11 Yelp_data/yelp_review.csv
-rw-r--r--   3 PK922097 hadoop  148085910 2020-01-02 14:11 Yelp_data/yelp_tip.csv
-rw-r--r--   3 PK922097 hadoop 1363176944 2020-01-02 14:11 Yelp_data/yelp_user.csv
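The listing reports file sizes in bytes. The sizes quoted earlier for the two chosen files (30 MB and 3.53 GB) match a conversion with binary units, where 1 MB = 1024² bytes and 1 GB = 1024³ bytes. The helper below, whose name `human_size` is illustrative only, reproduces that arithmetic in plain Python:

```python
# Byte counts copied from the `hdfs dfs -ls Yelp_data` listing
sizes_bytes = {
    "yelp_business.csv": 31_760_674,
    "yelp_review.csv": 3_791_120_545,
}


def human_size(n_bytes):
    """Format a byte count with binary (1024-based) units, e.g. 1 MB = 1024**2 bytes."""
    units = ["B", "KB", "MB", "GB", "TB"]
    size = float(n_bytes)
    for unit in units:
        # Stop at the first unit where the value is below 1024 (or at the largest unit)
        if size < 1024 or unit == units[-1]:
            return f"{size:.2f} {unit}"
        size /= 1024


for name, n_bytes in sizes_bytes.items():
    print(f"{name}: {human_size(n_bytes)}")
```

This prints `30.29 MB` for `yelp_business.csv` and `3.53 GB` for `yelp_review.csv`.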


In [2]:
business = sc.textFile("Yelp_data/yelp_business.csv")

In [3]:
# Use sqlContext to explore the data

sqlContext = pyspark.SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x7f9cf047de50>

## Exploring business data

In [4]:
business = sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("Yelp_data/yelp_business.csv")\
    .cache()

In [5]:
business.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- review_count: string (nullable = true)
 |-- is_open: double (nullable = true)
 |-- categories: string (nullable = true)



In [6]:
business.show(5)

+--------------------+--------------------+------------+--------------------+--------------+---------+-----------+----------+-----------+------------+------------+-------+--------------------+
|         business_id|                name|neighborhood|             address|          city|    state|postal_code|  latitude|  longitude|       stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+--------------------+--------------+---------+-----------+----------+-----------+------------+------------+-------+--------------------+
|FYWN1wneV18bWNgQj...|"""Dental by Desi...|        null| """4855 E Warner Rd|     Ste B9"""|Ahwatukee|         AZ|     85044| 33.3306902|-111.9785992|         4.0|   22.0|                   1|
|He-G7vWjzVUysIKrf...|"""Stephen Szabo ...|        null|"""3101 Washingto...|      McMurray|       PA|      15317|40.2916853|-80.1048999|         3.0|          11|    1.0|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|"""Western Mo

In [39]:
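# Count null values in each column
# (column_name avoids shadowing pyspark.sql.functions.col, which is imported later)
for column_name in business.columns:
    print(column_name, "\n", "===with null values: ", business.filter(business[column_name].isNull()).count())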

business_id 
 ===with null values:  0
name 
 ===with null values:  0
neighborhood 
 ===with null values:  104758
address 
 ===with null values:  1699
city 
 ===with null values:  97
state 
 ===with null values:  8
postal_code 
 ===with null values:  590
latitude 
 ===with null values:  28
longitude 
 ===with null values:  6
stars 
 ===with null values:  2
review_count 
 ===with null values:  0
is_open 
 ===with null values:  0
categories 
 ===with null values:  0


## Exploring review data

In [18]:
review = sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("Yelp_data/yelp_review.csv")\
    .cache()

In [19]:
review.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)



In [20]:
review.show(5)

+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|           review_id|             user_id|         business_id|stars|      date|                text|useful|funny|cool|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|vkVSCC7xljjrAI4UG...|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|    5|2016-05-28|Super simple plac...|  null| null|null|
|Staff was very he...|                   0|                   0|    0|      null|                null|  null| null|null|
|n6QzIUObkYshz4dz2...|bv2nCi5Qv5vroFiqK...|VR6GpWIda3SfvPC-l...|    5|2016-05-28|Small unassuming ...|  null| null|null|
|We had their beef...|                null|                null| null|      null|                null|  null| null|null|
|A bit outside of ...|                   0|                   0|    0|      null|                null|  null| null|null|
+--------------------+----------

In [40]:
# Count null values in each column
for column_name in review.columns:
    print(column_name, "\n", "====with null values:", review.filter(review[column_name].isNull()).count())

review_id 
 ====with null values: 203
user_id 
 ====with null values: 2079178
business_id 
 ====with null values: 3165175
stars 
 ====with null values: 3871240
date 
 ====with null values: 5329698
text 
 ====with null values: 6135851
useful 
 ====with null values: 8641595
funny 
 ====with null values: 8918581
cool 
 ====with null values: 9077133


## TASKS



## 1. Find the top 10 most-reviewed businesses

##### The following result is from before dropping null values

In [41]:
# For this task, keep only reviews with a rating (stars) greater than 3
review3star = review.filter('stars > 3')
grouped_review = review3star.groupby('business_id','stars').count()
review_sort = grouped_review.sort('count',ascending=False)

In [42]:
review_sort.show()

+--------------------+-----+-----+
|         business_id|stars|count|
+--------------------+-----+-----+
|                   2|    4| 4620|
|                   3|    4| 4441|
|                   4|    4| 3791|
|                   1|    4| 3789|
|4JNXUYY8wbaaDmk3B...|    5| 3280|
|                   0|    4| 2883|
|RESDUcs7fIiihp38-...|    5| 2725|
|4JNXUYY8wbaaDmk3B...|    4| 2576|
|                   4|    5| 2508|
|                   3|    5| 2499|
|DkYS3arLOhA8si5uU...|    5| 2443|
|                   5|    4| 2435|
|hihud--QRriCYZw1z...|    5| 2280|
|cYwJA2A6I12KNkm2r...|    5| 2198|
|KskYqH1Bi7Z_61pH6...|    5| 2162|
|                   2|    5| 2126|
|                   5|    5| 2031|
|K7lWdNUhCbcnEvI0N...|    4| 1913|
|RESDUcs7fIiihp38-...|    4| 1899|
|f4x1YBxkLrZg652xt...|    5| 1888|
+--------------------+-----+-----+
only showing top 20 rows



In [43]:
review = review.na.drop()

In [46]:
business = business.na.drop()

In [44]:
# Repeat on the cleaned data: keep only reviews with a rating (stars) greater than 3
review_star_three = review.filter('stars > 3')
grouped_review = review_star_three.groupby('business_id','stars').count()
review_sort = grouped_review.sort('count',ascending=False)

In [47]:
review_sort.show()

# Somewhat cleaner results after dropping null values

+--------------------+-----+-----+
|         business_id|stars|count|
+--------------------+-----+-----+
|4JNXUYY8wbaaDmk3B...|    5| 1894|
|hihud--QRriCYZw1z...|    5| 1848|
|RESDUcs7fIiihp38-...|    5| 1397|
|cYwJA2A6I12KNkm2r...|    5| 1294|
|f4x1YBxkLrZg652xt...|    5| 1242|
|4JNXUYY8wbaaDmk3B...|    4| 1242|
|KskYqH1Bi7Z_61pH6...|    5| 1178|
|3kdSl5mo9dWC4clrQ...|    5| 1160|
|DkYS3arLOhA8si5uU...|    5| 1101|
|fL-b760btOaGa85OJ...|    5| 1026|
|faPVqws-x-5k2CQKD...|    5|  951|
|iCQpiavjjPzJ5_3gP...|    5|  942|
|K7lWdNUhCbcnEvI0N...|    5|  902|
|mDR12Hafvr84ctpsV...|    5|  896|
|VyVIneSU7XAWgMBll...|    5|  889|
|RwMLuOkImBIqqYj4S...|    5|  856|
|igHYkXZMLAc9UdV5V...|    5|  843|
|f4x1YBxkLrZg652xt...|    4|  814|
|RESDUcs7fIiihp38-...|    4|  813|
|Xg5qEQiB-7L6kGJ5F...|    5|  807|
+--------------------+-----+-----+
only showing top 20 rows



## 2. Top 10 categories with the highest business count

In [49]:
from pyspark.sql.functions import split,explode

In [52]:
category = business.select('categories')
individual_category = category.select(explode(split('categories', ',')).alias('category'))
grouped_category = individual_category.groupby('category').count()
top_category = grouped_category.sort('count',ascending=False)
top_category.show(10,truncate=False)

+-------------------------+-----+
|category                 |count|
+-------------------------+-----+
|1                        |12624|
|0                        |2895 |
|Food;Coffee & Tea        |452  |
|Coffee & Tea;Food        |398  |
|Restaurants;Pizza        |324  |
|Restaurants;Chinese      |303  |
|Pizza;Restaurants        |294  |
|Chinese;Restaurants      |279  |
|Beauty & Spas;Hair Salons|268  |
|Restaurants;Italian      |266  |
+-------------------------+-----+
only showing top 10 rows



In [57]:
business.select(['categories']).show(20)

+--------------------+
|          categories|
+--------------------+
|                   1|
|Bakeries;Bagels;Food|
|Restaurants;Ameri...|
|Italian;French;Re...|
|   Food;Coffee & Tea|
|Bars;Sports Bars;...|
|Tiki Bars;Nightli...|
|                   1|
|Coffee & Tea;Food...|
|Arts & Entertainm...|
|Hair Salons;Blow ...|
|                   1|
|                   1|
| Italian;Restaurants|
|Hotels & Travel;C...|
|Nightlife;Bars;Ba...|
|                   1|
|                   1|
|                   1|
|                   1|
+--------------------+
only showing top 20 rows



In [None]:
## 3. Calculate the average rating for each business

In [None]:
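# Register the review DataFrame as a temporary table so SQL can query it as y_reviews
review.createOrReplaceTempView("y_reviews")

business_avg = sqlContext.sql("""
SELECT business_id, ROUND(AVG(stars), 4) AS avg_rating
FROM y_reviews
GROUP BY business_id
""")
# Show businesses with the highest average rating first
business_avg.sort('avg_rating', ascending=False).show(truncate=True)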

In [None]:
## 4. Top ratings given by users to businesses

In [None]:
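# Distribution of the ratings (stars) that users gave in their reviews.
# business.stars is each business's average rating, so the review table is used here.
rating = review.select('stars')
User_rating = rating.groupby('stars').count()
User_rating_top = User_rating.sort('count', ascending=False)
User_rating_top.show(10)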

In [None]:
## 5. Locations (cities) with the most reviews

In [None]:
locations = business.select('business_id', 'city')
review_city = review.select('business_id')

# Join reviews to their business's city, then count reviews per city
merge_city = locations.join(review_city, 'business_id', 'inner')
most_reviewed_city = merge_city.groupby('city').count()
most_reviewed_city.sort('count', ascending=False).show(10)

In [None]:
from pyspark.sql.functions import lit, col

In [None]:
# Two-letter codes for the 50 US states plus DC, used to keep only US businesses
states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
          "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
          "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
          "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
          "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

In [None]:
US_data= sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("Yelp_data/yelp_business.csv")\
    .cache()

In [None]:
US_data = US_data.drop('neighborhood','latitude','longitude')

In [None]:
# Keep only businesses located in US states (show() returns None, so call it separately)
df = US_data.filter(col('state').isin(states))
df.show(5)

In [None]:
# Explicit imports avoid shadowing Python built-ins such as sum and round
from pyspark.sql.functions import split, explode

In [None]:
category = df.select('categories')
# categories holds semicolon-separated values, so split on ';'
individual_category = category.select(explode(split('categories', ';')).alias('category'))
grouped_category = individual_category.groupby('category').count()
top_category = grouped_category.sort('count',ascending=False)
top_category.show(10,truncate=False)

In [None]:
locations = df.select('business_id', 'city')
review_city = review.select('business_id')

# Join reviews to their business's city, then count reviews per city
merge_city = locations.join(review_city, 'business_id', 'inner')
most_reviewed_city = merge_city.groupby('city').count()
most_reviewed_city.sort('count', ascending=False).show(10)