## Load the business users file to a dataframe

In [1]:
df_bus = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").load("yelp_business.csv")

In [2]:
df_bus = df_bus.withColumnRenamed("name", "business_name").withColumnRenamed("business_id", "bus_business_id")

In [3]:
df_bus.printSchema()

root
 |-- bus_business_id: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- categories: string (nullable = true)



### Convert the yelp review files by removing new lines in the comments

In [5]:
%%bash

awk '/^"/ {if (f) print f; f=$0; next} {f=f FS $0} END {print f}' yelp_review.csv > yelp_review_fi.csv


In [6]:
%%bash
hdfs dfs -copyFromLocal /home/kkanagalananthapadm1/Scalable_Data/yelp_review_fi.csv

copyFromLocal: `yelp_review_fi.csv': File exists


## Load the cleaned review file to a dataframe

In [4]:
df_rev = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").load("yelp_review_fi.csv")

In [5]:
df_rev = df_rev.withColumnRenamed("stars", "review_stars").withColumnRenamed("useful", "review_useful").withColumnRenamed("date", "review_date").withColumnRenamed("cool", "review_cool").withColumnRenamed("funny", "review_funny")

In [6]:
df_rev.columns

['review_id',
 'user_id',
 'business_id',
 'review_stars',
 'review_date',
 'text',
 'review_useful',
 'review_funny',
 'review_cool']

## Load the users file to a dataframe

In [7]:
df_user = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").load("yelp_user.csv")

In [8]:
df_user = df_user.withColumnRenamed("name", "user_name").withColumnRenamed("cool", "user_cool").withColumnRenamed("funny", "user_funny").withColumnRenamed("useful", "user_useful").withColumnRenamed("user_id", "user_user_id")

In [9]:
df_user.columns

['user_user_id',
 'user_name',
 'review_count',
 'yelping_since',
 'friends',
 'user_useful',
 'user_funny',
 'user_cool',
 'fans',
 'elite',
 'average_stars',
 'compliment_hot',
 'compliment_more',
 'compliment_profile',
 'compliment_cute',
 'compliment_list',
 'compliment_note',
 'compliment_plain',
 'compliment_cool',
 'compliment_funny',
 'compliment_writer',
 'compliment_photos']

## Check for null values in the 3 dataframes and remove null values if any

In [10]:
from pyspark.sql.functions import isnan, when, count, col
#df_user.select([count(when(isnan(c), c)).alias(c) for c in df_user.columns]).show()
df_user.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_user.columns]).show()

+------------+---------+------------+-------------+-------+-----------+----------+---------+----+-----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|user_user_id|user_name|review_count|yelping_since|friends|user_useful|user_funny|user_cool|fans|elite|average_stars|compliment_hot|compliment_more|compliment_profile|compliment_cute|compliment_list|compliment_note|compliment_plain|compliment_cool|compliment_funny|compliment_writer|compliment_photos|
+------------+---------+------------+-------------+-------+-----------+----------+---------+----+-----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+
|           0|        0|           0|            0|      0|          0|         0|        0|  

In [11]:
from pyspark.sql.functions import isnan, when, count, col
#df_user.select([count(when(isnan(c), c)).alias(c) for c in df_user.columns]).show()
df_rev.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_rev.columns]).show()

+---------+-------+-----------+------------+-----------+----+-------------+------------+-----------+
|review_id|user_id|business_id|review_stars|review_date|text|review_useful|review_funny|review_cool|
+---------+-------+-----------+------------+-----------+----+-------------+------------+-----------+
|        0|   1674|       1890|        2057|       2365|2569|         2712|        2808|       2856|
+---------+-------+-----------+------------+-----------+----+-------------+------------+-----------+



In [12]:
df_rev = df_rev.na.drop()

In [13]:
df_rev.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_rev.columns]).show()

+---------+-------+-----------+------------+-----------+----+-------------+------------+-----------+
|review_id|user_id|business_id|review_stars|review_date|text|review_useful|review_funny|review_cool|
+---------+-------+-----------+------------+-----------+----+-------------+------------+-----------+
|        0|      0|          0|           0|          0|   0|            0|           0|          0|
+---------+-------+-----------+------------+-----------+----+-------------+------------+-----------+



In [14]:
df_bus.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_bus.columns]).show()

+---------------+-------------+------------+-------+----+-----+-----------+--------+---------+-----+------------+-------+----------+
|bus_business_id|business_name|neighborhood|address|city|state|postal_code|latitude|longitude|stars|review_count|is_open|categories|
+---------------+-------------+------------+-------+----+-----+-----------+--------+---------+-----+------------+-------+----------+
|              0|            0|           0|      0|   0|    0|          0|       1|        1|    0|           0|      0|         0|
+---------------+-------------+------------+-------+----+-----+-----------+--------+---------+-----+------------+-------+----------+



In [15]:
df_bus = df_bus.na.drop()

In [16]:
df_bus.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_bus.columns]).show()

+---------------+-------------+------------+-------+----+-----+-----------+--------+---------+-----+------------+-------+----------+
|bus_business_id|business_name|neighborhood|address|city|state|postal_code|latitude|longitude|stars|review_count|is_open|categories|
+---------------+-------------+------------+-------+----+-----+-----------+--------+---------+-----+------------+-------+----------+
|              0|            0|           0|      0|   0|    0|          0|       0|        0|    0|           0|      0|         0|
+---------------+-------------+------------+-------+----+-----+-----------+--------+---------+-----+------------+-------+----------+



## Join the 3 files to create meta table

In [17]:
df_bus_rev = df_bus.join(df_rev,df_rev.business_id==df_bus.bus_business_id)

In [18]:
df_bus_rev_user = df_bus_rev.join(df_user,df_bus_rev.user_id==df_user.user_user_id)

In [19]:
df_bus_rev_user.printSchema()

root
 |-- bus_business_id: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- review_stars: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- review_useful: string (nullable = true)
 |-- review_funny: string (nullable = true)
 |-- review_cool: string (nullable = true)
 |-- user_user_id: string (nullable = true)
 |-- user_name: string (nullable = tr

In [24]:
df_bus_rev_user.columns

['bus_business_id',
 'business_name',
 'neighborhood',
 'address',
 'city',
 'state',
 'postal_code',
 'latitude',
 'longitude',
 'stars',
 'review_count',
 'is_open',
 'categories',
 'review_id',
 'user_id',
 'business_id',
 'review_stars',
 'review_date',
 'text',
 'review_useful',
 'review_funny',
 'review_cool',
 'user_user_id',
 'user_name',
 'review_count',
 'yelping_since',
 'friends',
 'user_useful',
 'user_funny',
 'user_cool',
 'fans',
 'elite',
 'average_stars',
 'compliment_hot',
 'compliment_more',
 'compliment_profile',
 'compliment_cute',
 'compliment_list',
 'compliment_note',
 'compliment_plain',
 'compliment_cool',
 'compliment_funny',
 'compliment_writer',
 'compliment_photos']

## Save the metatable as a csv with only one partition

In [26]:
df_bus_rev_user.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("final_merged_data.csv")