In [None]:
import os
# Environment bootstrap cell: downloads a Spark release, installs Java 8 and
# findspark, then points JAVA_HOME / SPARK_HOME at the installed locations.
#
# Find the latest 3.0.x release of Spark at http://www.apache.org/dist/spark/
# and enter it as the Spark version below.
# For example:
# spark_version = 'spark-3.0.3'
# NOTE: the value below is a placeholder and must be replaced before running;
# it is interpolated into the download URL and the SPARK_HOME path.
spark_version = 'spark-3.<enter version>'
# Export the version so the shell commands below can read it as $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack the Hadoop 2.7 build of the chosen Spark release into
# the current working directory.
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Assumes the tarball above was extracted under /content (the Colab working
# directory) -- TODO confirm when running elsewhere.
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark; presumably this uses SPARK_HOME to make pyspark
# importable in later cells -- must run after the environment variables are set.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Configure the application name first, then obtain the session
# (getOrCreate hands back an existing session if one is already running).
session_builder = SparkSession.builder.appName("CloudETLProjectAnalysis")
spark = session_builder.getOrCreate()

In [None]:
from pyspark import SparkFiles
# Load ratings_and_sentiments.csv from S3 into a DataFrame.
url = "https://s3.amazonaws.com//dataviz-curriculum/day_3/ratings_and_sentiments.csv"
# Register the remote file with the Spark context so that SparkFiles.get()
# can resolve it by file name below.
spark.sparkContext.addFile(url)

# Read the CSV with a header row and inferred column types.
# The timestamp pattern uses "MM" (month-of-year); lowercase "mm" would mean
# minute-of-hour in Spark's datetime patterns and mis-parse month values.
df = spark.read.option('header', 'true').csv(SparkFiles.get("ratings_and_sentiments.csv"), inferSchema=True, sep=',', timestampFormat="MM/dd/yy")
# Preview the first 10 rows.
df.show(10)

+--------------------+--------------------+-----------------+----------+----------+---------+------------+---------+--------+------------+------------+----------+------------+-------------+------------+-----------+---------+----------+-------------+----------+
|    coffee_shop_name|         review_text|           rating|num_rating|cat_rating|bool_HIGH|overall_sent|vibe_sent|tea_sent|service_sent|seating_sent|price_sent|parking_sent|location_sent|alcohol_sent|coffee_sent|food_sent|hours_sent|internet_sent|local_sent|
+--------------------+--------------------+-----------------+----------+----------+---------+------------+---------+--------+------------+------------+----------+------------+-------------+------------+-----------+---------+----------+-------------+----------+
|The Factory - Caf...|11/25/2016 1 chec...| 5.0 star rating |         5|      HIGH|        1|           4|        3|       0|           0|           0|         0|           0|            0|           1|          3|   

## Transform DataFrame to fit coffee_rating table

In [None]:
# Keep only the shop name and its numeric rating for the ratings table.
shop_df = df.select("coffee_shop_name", "num_rating")
shop_df.show()

+--------------------+----------+
|    coffee_shop_name|num_rating|
+--------------------+----------+
|The Factory - Caf...|         5|
|The Factory - Caf...|         4|
|The Factory - Caf...|         4|
|The Factory - Caf...|         2|
|The Factory - Caf...|         4|
|The Factory - Caf...|         4|
|The Factory - Caf...|         4|
|The Factory - Caf...|         5|
|The Factory - Caf...|         3|
|The Factory - Caf...|         5|
|The Factory - Caf...|         4|
|The Factory - Caf...|         3|
|The Factory - Caf...|         3|
|The Factory - Caf...|         5|
|The Factory - Caf...|         5|
|The Factory - Caf...|         5|
|The Factory - Caf...|         5|
|The Factory - Caf...|         4|
|The Factory - Caf...|         4|
|The Factory - Caf...|         4|
+--------------------+----------+
only showing top 20 rows



In [None]:
# Per-shop aggregates: mean of num_rating and number of rows for each shop.
# The resulting columns are auto-named "avg(num_rating)" and
# "count(coffee_shop_name)".
rating_aggregations = {"num_rating": "avg", "coffee_shop_name": "count"}
coffee_ratings_df = shop_df.groupBy("coffee_shop_name").agg(rating_aggregations)
coffee_ratings_df.show()

+--------------------+-----------------------+------------------+
|    coffee_shop_name|count(coffee_shop_name)|   avg(num_rating)|
+--------------------+-----------------------+------------------+
|      Flitch Coffee |                     28| 4.821428571428571|
|Apanas Coffee & B...|                    136| 4.580882352941177|
|Arturo's Undergro...|                    100|               4.3|
|Lola Savannah Cof...|                      4|               5.0|
|Lola Savannah Cof...|                    100|              4.11|
|       Epoch Coffee |                    400|            3.8125|
|       Caffe Medici |                    243|4.1193415637860085|
|Figure 8 Coffee P...|                    100|               4.5|
|    Hot Mama's Cafe |                    100|              4.27|
|  Sorrento's Coffee |                    100|              4.26|
|  The Steeping Room |                    100|              3.96|
|Irie Bean Coffee ...|                    100|               4.3|
| Thunderb

In [None]:
from pyspark.sql.functions import desc

# Replace the auto-generated aggregate column names with readable ones.
coffee_ratings_df = (
    coffee_ratings_df
    .withColumnRenamed("count(coffee_shop_name)", "total_ratings")
    .withColumnRenamed("avg(num_rating)", "avg_rating")
)

# Display the shops from highest to lowest average rating.
ranked_by_rating = coffee_ratings_df.orderBy(desc("avg_rating"))
ranked_by_rating.show()

+--------------------+-------------+-----------------+
|    coffee_shop_name|total_ratings|       avg_rating|
+--------------------+-------------+-----------------+
|Lola Savannah Cof...|            4|              5.0|
|The Marvelous Vin...|           10|              5.0|
|Mañana Coffee & J...|           33|4.848484848484849|
|       Brian's Brew |           45|4.844444444444444|
|Third Coast Coffe...|           56|4.821428571428571|
|      Flitch Coffee |           28|4.821428571428571|
|   Kowabunga Coffee |           16|           4.8125|
|Venezia Italian G...|          200|             4.81|
|      Legend Coffee |           28|4.714285714285714|
|       Fleet Coffee |           57|4.701754385964913|
|    My Sweet Austin |           31| 4.67741935483871|
|         Dolce Neve |          100|             4.64|
|       Holy Grounds |           30|4.633333333333334|
|Anderson's Coffee...|          100|             4.62|
|Apanas Coffee & B...|          136|4.580882352941177|
|  Flat Tr

## Transform DataFrame to fit date_table table

In [None]:
# Start the date table from the raw review text column only.
review_df = df.select("review_text")

In [None]:
from pyspark.sql.functions import regexp_extract

# Split each review into its leading date and the remaining text.
# The patterns are raw strings so that \d and \s reach the regex engine
# verbatim instead of being treated as (invalid) Python escape sequences.
#
# This cell overwrites review_text with the date-stripped text, so re-run the
# preceding `review_df = df.select(...)` cell before re-running this one.
review_df = (
    review_df
    # Group 0: the first d/d/d-shaped token found in the review text.
    .withColumn("date", regexp_extract("review_text", r"\d+/\d+/\d+", 0))
    # Group 1: everything after the date and the whitespace character following it.
    .withColumn("review_text", regexp_extract("review_text", r"\d+/\d+/\d+(?:\s)(.*)", 1))
    .select(["date", "review_text"])
    # NOTE(review): dropna() only removes rows containing nulls; confirm how
    # reviews whose text has no date match should be handled.
    .dropna()
)
review_df.show()

+----------+--------------------+
|      date|         review_text|
+----------+--------------------+
|11/25/2016|1 check-in Love l...|
| 12/2/2016|Listed in Date Ni...|
|11/30/2016|1 check-in Listed...|
|11/25/2016|Very cool vibe! G...|
| 12/3/2016|1 check-in They a...|
|11/20/2016|1 check-in Very c...|
|10/27/2016|2 check-ins Liste...|
| 11/2/2016|2 check-ins Love ...|
|10/25/2016|1 check-in Ok let...|
|11/10/2016|3 check-ins This ...|
|10/22/2016|1 check-in Listed...|
|11/20/2016|The store has A+ ...|
|11/17/2016|1 check-in Listed...|
| 12/5/2016|This is such a cu...|
|11/13/2016|Beautiful eccentr...|
| 11/9/2016|1 check-in Listed...|
| 11/6/2016|Really love the v...|
|10/25/2016|1 check-in Check ...|
|10/15/2016|1 check-in Note: ...|
| 12/1/2016|So much aesthetic...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Count how many reviews were posted on each date and give the
# auto-generated "count(date)" column a readable name.
date_df = (
    review_df
    .groupBy("date")
    .agg({"date": "count"})
    .withColumnRenamed("count(date)", "review_count")
)
date_df.show()

+----------+------------+
|      date|review_count|
+----------+------------+
| 8/21/2016|          16|
| 6/29/2016|          10|
| 8/19/2013|           2|
| 2/27/2015|           5|
| 7/31/2016|          13|
| 3/17/2014|           7|
|11/14/2015|          11|
| 6/10/2011|           1|
|10/10/2009|           1|
| 4/27/2014|           1|
| 3/27/2009|           1|
| 12/8/2011|           1|
| 2/21/2014|           2|
| 8/31/2015|          10|
| 1/15/2015|           3|
| 3/16/2012|           1|
|  8/9/2016|           4|
|11/24/2016|           1|
|  8/2/2014|           5|
| 3/23/2011|           1|
+----------+------------+
only showing top 20 rows



In [None]:
# Show the dates with the most reviews first.
date_df.sort(desc("review_count")).show()

+----------+------------+
|      date|review_count|
+----------+------------+
| 10/9/2016|          31|
| 9/18/2016|          30|
|11/20/2016|          27|
| 11/2/2016|          27|
| 12/2/2016|          26|
| 12/4/2016|          26|
| 9/15/2016|          25|
| 10/7/2016|          24|
| 11/6/2016|          24|
| 7/24/2016|          24|
| 4/17/2016|          23|
|10/25/2016|          23|
| 12/3/2016|          23|
| 12/1/2016|          23|
|  8/7/2016|          22|
| 6/27/2016|          22|
|  1/4/2016|          21|
| 1/17/2016|          21|
|11/21/2016|          21|
| 8/13/2016|          20|
+----------+------------+
only showing top 20 rows

