#**Assignment9 Spark**
###6031010021 Yanika Dontong

### **Prepare Spark and Data**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
sc = spark.sparkContext

In [5]:
!wget -q https://www.dropbox.com/s/43ocwui5rgjzi4r/cat_id.csv
!wget -q https://www.dropbox.com/s/6y5turfkb99tbhn/wongnai.csv

In [6]:
import pandas as pd
cat_df = pd.read_csv('cat_id.csv', names=('id', 'category'), dtype=(str, str))
wongnai_df = spark.read.csv('wongnai.csv',header=True)

In [7]:
cat_df.id = cat_df.id + '.0'

In [8]:
cat_mapping = cat_df.set_index('id').to_dict()['category']
cat_mapping

{'10.0': 'Indian',
 '11.0': 'Buffet',
 '12.0': 'Street Food',
 '13.0': 'Fusion Food',
 '14.0': 'Coffee Shop',
 '15.0': 'Others',
 '2.0': 'Japanese',
 '3.0': 'Thai',
 '4.0': 'Chinese',
 '5.0': 'Italian',
 '6.0': 'Korean',
 '7.0': 'Vietnam',
 '8.0': 'American',
 '9.0': 'French'}

In [9]:
wongnai_df.show()

+---+---+--------------------+-----------+-------+------------------+------------------+-----------+---------+-------------------+----------------------------+---+---------------+------------------+------------------+
|_c0| id|                name|category_id|city_id|          latitude|         longitude|price_range|avg_price|number_of_bookmarks|number_of_favorite_bookmarks|top|best_of_wongnai|number_of_checkins|    all_time_score|
+---+---+--------------------+-----------+-------+------------------+------------------+-----------+---------+-------------------+----------------------------+---+---------------+------------------+------------------+
|  0|  2|         บ้านกลมกิ๊ก|        3.0|    1.0|13.738961999999999|        100.550248|        2.0|    175.0|                294|                          84|  1|           null|                32| 22487.87235772358|
|  1|  3|โรงเบียร์เยอรมันต...|       33.0|    1.0|          13.84232|        100.634959|        3.0|     null|                  

In [10]:
wongnai_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- city_id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- price_range: string (nullable = true)
 |-- avg_price: string (nullable = true)
 |-- number_of_bookmarks: string (nullable = true)
 |-- number_of_favorite_bookmarks: string (nullable = true)
 |-- top: string (nullable = true)
 |-- best_of_wongnai: string (nullable = true)
 |-- number_of_checkins: string (nullable = true)
 |-- all_time_score: string (nullable = true)



### **จงนับจำนวนร้านอาหารทั้งหมดที่มี**

In [11]:
wongnai_df.count()

283508

### **จงนับจำนวนร้านอาหารในกรุงเทพทั้งหมดที่มี**

after looked up for the latitude and longitude that provide, assumed that city_id of Bangkok is 1.0

In [12]:
wongnai_df.filter(wongnai_df['city_id']==1.0).count()

90611

### **จงนับจำนวนร้านอาหารญี่ปุ่นในต่างจังหวัด (ที่ไม่ใช่กรุงเทพฯ) ทั้งหมดที่มี**

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [14]:
def category_mapping(cat_id): 
    if cat_id in cat_mapping:
        return cat_mapping[cat_id]
    else:
        return 'Unknown'

to_category = udf(category_mapping, StringType())

In [18]:
res_outside_bkk = wongnai_df.filter(wongnai_df['city_id']!=1.0)
res_cat = res_outside_bkk.withColumn('category', to_category(res_outside_bkk.category_id))

In [23]:
jap_res_outside_bkk = res_cat.filter(res_cat['category']=='Japanese')
jap_res_outside_bkk.count()

3053

### **จงแสดงรายชื่อร้านอาหารที่มีจำนวนการ check-in มากกว่า 300 ครั้ง**

In [25]:
wongnai_df.filter(wongnai_df['number_of_checkins'] > 300).select('name').show()

+--------------------+
|                name|
+--------------------+
|  เจ๊โอว ข้าวต้มเป็ด|
|            มุมอร่อย|
|            มนต์นมสด|
|             อบอร่อย|
|      ต๋อง เต็ม โต๊ะ|
|     The Glass House|
| Annyeong Korean BBQ|
|         กล้วยน้ำว้า|
|สวนผัก โอ้กะจู๋ อ...|
|              ไม้เอก|
+--------------------+



### **จงหาค่าเฉลี่ย (mean) ของราคาเฉลี่ย (avg_price) ของร้านอาหารทั้งหมด**

In [69]:
from pyspark.sql.functions import mean
wongnai_df.select(mean('avg_price')).show()

+-----------------+
|   avg(avg_price)|
+-----------------+
|326.0524635946539|
+-----------------+

