# Data Pre Processing

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic") \
    .getOrCreate()

In [3]:
df = spark.read.json('C:\opt\FP-Big-Data\Dataset/review.json')
df.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|ujmEBvifdJM6h6RLv...|   0|2013-05-07 04:34:36|    1|Q1sbwvVQXV2734tPg...|  1.0|Total bill for th...|     6|hG7b0MtEbXx5QzbzE...|
|NZnhc2sEQy3RmzKTZ...|   0|2017-01-14 21:30:33|    0|GJXCdrto3ASJOqKeV...|  5.0|I *adore* Travis ...|     0|yXQM5uF2jS6es16SJ...|
|WTqjgwHlXbSFevF32...|   0|2016-11-09 20:09:03|    0|2TzJjDVDEuAW6MR5V...|  5.0|I have to say tha...|     3|n6-Gk65cPZL6Uz8qR...|
|ikCg8xy5JIg_NGPx-...|   0|2018-01-09 20:56:38|    0|yi0R0Ugj_xUx_Nek0...|  5.0|Went in for a lun...|     0|dacAIZ6fTM6mqwW5u...|
|b1b1eb3uo-w561D0Z...|   0|2018-01-30 23:07:38|    0|11a8sVPMUFtaC7_AB...|  1.0|Today was 

In [4]:
df.createOrReplaceTempView("business")

In [5]:
data = spark.sql("SELECT DISTINCT user_id FROM business")
data.show()

+--------------------+
|             user_id|
+--------------------+
|rs3pq6wRmaSIADCIn...|
|xS6kmkMXp0PRrFwkS...|
|aNOSjqQFsrfcgmFtO...|
|-9da1xk7zgnnfO1uT...|
|PLjruA-EMskWfirBU...|
|O-frog8VhICKAT0gr...|
|7o473jeLWW-zgKN-Q...|
|L1XxGWFJ3S7xBQCT8...|
|D2ljL5ejuqpa4f8fn...|
|CzkWUMIYDxUSetfCR...|
|5avk-VCo_6Bx65ct1...|
|oKWVVqPWVzq5s6nS4...|
|e5kxYMksMVWApEJdO...|
|f-6oae7TltlfJicUi...|
|NL9jmu5jSkCdMM-i9...|
|z6gjzFENiQf-K3lPy...|
|Al2g2P9gt057Julh1...|
|midS4e50ZmuOeGyNm...|
|yTr8nlIjQCJWc0ZIC...|
|yb0AdKzhYwQIlt47r...|
+--------------------+
only showing top 20 rows



# Converting Encrypted Data 

The data column user_id and business_id is encrypted, so we must convert it into integer.

In [6]:
data.createOrReplaceTempView("newId")
new_user_id = spark.sql("SELECT user_id, ROW_NUMBER() OVER (ORDER BY user_id) AS userId FROM newId")
new_user_id.show()

+--------------------+------+
|             user_id|userId|
+--------------------+------+
|---1lKK3aKOuomHnw...|     1|
|---89pEy_h9PvHwcH...|     2|
|---94vtJ_5o_nikEs...|     3|
|---PLwSf5gKdIoVny...|     4|
|---cu1hq55BP9DWVX...|     5|
|---fhiwiwBYrvqhpX...|     6|
|---udAKDsn0yQXmzb...|     7|
|--0LlX_UcypHnxW-4...|     8|
|--0RtXvcOIE4XbErY...|     9|
|--0VhLFv2XfEKHL0Y...|    10|
|--0WZ5gklOfbUIodJ...|    11|
|--0kuuLmuYBe3Rmu0...|    12|
|--0sXNBv6IizZXuV-...|    13|
|--0zxhZTSLZ7w1hUD...|    14|
|--104qdWvE99vaoIs...|    15|
|--1UpCuUDJQbqiuFX...|    16|
|--1av6NdbEbMiuBr7...|    17|
|--1mPJZdSY9KluaBY...|    18|
|--23ebARX_Thb82aj...|    19|
|--26jc8nCJBy4-7r3...|    20|
+--------------------+------+
only showing top 20 rows



In [7]:
df = df.join(new_user_id, df.user_id == new_user_id.user_id)
df.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|             user_id|userId|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+------+
|CHbbzfGr69CQV1fh6...|   0|2015-08-18 19:11:04|    3|bklQqMXHT-JYiVQdB...|  1.0|I took my motorcy...|     3|--CJT4d-S8UhwqHe0...|--CJT4d-S8UhwqHe0...|    89|
|KG4dklrK6sGerhNri...|   0|2015-08-18 19:22:17|    0|5aq0aGO3LNcAIV2Rl...|  4.0|Jason was very he...|     0|--CJT4d-S8UhwqHe0...|--CJT4d-S8UhwqHe0...|    89|
|pmrHuQiy25xKB86tb...|   0|2014-03-21 17:09:59|    1|a0r23pA6D4HDLb9P5...|  4.0|They ran out of c...|     0|-0Ji0nOyFe-4yo8BK...|-0Ji0nOyFe-4yo8BK...|   514|
|QhwaGi1niDeVEHMRt...|   0|2012-07-06 03:43:34|    3

In [8]:
data2 = spark.sql("SELECT DISTINCT business_id FROM business")
data2.show()

+--------------------+
|         business_id|
+--------------------+
|f4mh1Y0rnvbJRfQ3j...|
|cKwg6HFaLYXl7Ar0r...|
|jcpgiXF0PCyS9hrvq...|
|R_M4P9XetEM-aLE7e...|
|DEBqmgxv2yhJ93LqG...|
|Cml4Yt5cTx64cOMan...|
|bo3SQVtErnMOqO6lk...|
|Cl-xl1vTUwHeaGgBx...|
|oIEmXWLtoh5blz-iw...|
|Op2IR4FffXZ5KXYFn...|
|yB5FMuc9Y3oyhsOmu...|
|cEqOh78v1g1RCWHyu...|
|lt8IW9Bpy9GMeKGxy...|
|uC3qwaxsOkdJzpOc0...|
|686oeWNsbc-aczplC...|
|gPuxh3HNvoVt8aWVW...|
|mA27CG2U3ytmkxIGV...|
|x6qH9HXhzuKM03jcZ...|
|74LU6K2ro5AQXKT0J...|
|TdefcbsFAj6WXHwlG...|
+--------------------+
only showing top 20 rows



In [10]:
data2.createOrReplaceTempView("newId2")
new_business_id = spark.sql("SELECT business_id, ROW_NUMBER() OVER (ORDER BY business_id) AS businessId FROM newId2")
new_business_id.show()


+--------------------+----------+
|         business_id|businessId|
+--------------------+----------+
|--1UhMGODdWsrMast...|         1|
|--6MefnULPED_I942...|         2|
|--7zmmkVg-IMGaXbu...|         3|
|--8LPVSo5i0Oo61X0...|         4|
|--9QQLMTbFzLJ_oT-...|         5|
|--9e1ONYQuAa-CB_R...|         6|
|--DaPTJW3-tB1vP-P...|         7|
|--DdmeR16TRb3LsjG...|         8|
|--EF5N7P70J_UYBTP...|         9|
|--EX4rRznJrltyn-3...|        10|
|--FBCX-N37CMYDfs7...|        11|
|--FLdgM0GNpXVMn74...|        12|
|--GM_ORV2cYS-h38D...|        13|
|--Gc998IMjLn8yr-H...|        14|
|--I7YYLada0tSLkOR...|        15|
|--KCl2FvVQpvjzmZS...|        16|
|--KQsXc-clkO7oHRq...|        17|
|--Ni3oJ4VOqfOEu7S...|        18|
|--Rsj71PBe31h5Ylj...|        19|
|--S62v0QgkqQaVUhF...|        20|
+--------------------+----------+
only showing top 20 rows



In [11]:
df = df.join(new_business_id, df.business_id == new_business_id.business_id)
df.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+-------+--------------------+----------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|             user_id| userId|         business_id|businessId|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+-------+--------------------+----------+
|--9e1ONYQuAa-CB_R...|   4|2014-03-24 19:52:40|    0|fqyv4RdeKmHd6kW4M...|  5.0|WOW.

I came to V...|     4|0y8ORuC2X1i1UF6SG...|0y8ORuC2X1i1UF6SG...|  50375|--9e1ONYQuAa-CB_R...|         6|
|--9e1ONYQuAa-CB_R...|   0|2018-04-26 03:08:43|    0|iBzBKf0EnBxNNfCe9...|  5.0|We went here with...|     0|3qz_dfwbFwTQeDRzy...|3qz_dfwbFwTQeDRzy...| 123873|--9e1ONYQuAa-CB_R...|         6|
|--9e1ONYQuAa-CB_R...|   0|2011-09-27 16:24:0

In [12]:
query1 = df.select('userId', 'businessId', 'stars')

In [13]:
query1.show()

+-------+----------+-----+
| userId|businessId|stars|
+-------+----------+-----+
|  50375|         6|  5.0|
| 123873|         6|  5.0|
| 278174|         6|  2.0|
| 283084|         6|  5.0|
| 431625|         6|  4.0|
| 460724|         6|  5.0|
| 716480|         6|  4.0|
| 884420|         6|  4.0|
|1060886|         6|  5.0|
|1309129|         6|  3.0|
|1500052|         6|  5.0|
|   3432|         6|  5.0|
|  73292|         6|  5.0|
| 543660|         6|  4.0|
| 594236|         6|  5.0|
|1210032|         6|  4.0|
|1352147|         6|  3.0|
| 170942|         6|  2.0|
| 260633|         6|  2.0|
| 776993|         6|  5.0|
+-------+----------+-----+
only showing top 20 rows



In [14]:
# Convert to Pandas
import pandas as pd
query1Pandas = query1.toPandas()

In [15]:
# Save to single CSV
query1Pandas.to_csv("C:/opt/FP-Big-Data/Dataset/business_ratings.csv", index=False)