# Generating Useful Insights on Amazon Reviews

**Analytical Context:** This notebook covers the preprocessing stage that precedes EDA and model building, using a large dataset of Amazon customer reviews that arrives both in batch and as a stream. We look for features that indicate which brands and products tend to receive good or bad reviews, and we develop methods to determine when a product does or does not meet customer expectations.

**Business Problem:** The main task in this exercise is to answer the following questions. What has the most impact on customer satisfaction? Can we infer customer segments from their review patterns?

## Import Libraries and Create Spark Session

In [1]:
!pip install -r requirements.txt

Collecting glob2 (from -r requirements.txt (line 1))
  Downloading glob2-0.7.tar.gz (10 kB)
  Preparing metadata (setup.py) ... done
Collecting azure-eventhub (from -r requirements.txt (line 2))
  Obtaining dependency information for azure-eventhub from https://files.pythonhosted.org/packages/66/b2/89671d93ba3c5af4a776350378765582490c3748eef3aa3a4582c3e87488/azure_eventhub-5.11.3-py3-none-any.whl.metadata
  Downloading azure_eventhub-5.11.3-py3-none-any.whl.metadata (65 kB)
     65.3/65.3 kB 5.3 MB/s eta 0:00:00
Collecting findspark (from -r requirements.txt (line 3))
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting azure-core<2.0.0,>=1.14.0 (from azure-eventhub->-r requirements.txt (line 2))
  Obtaining dependency information for azure-core<2.0.0,>=1.14.0 from https://files.pythonhosted.org/packages/c3/0a/32b17d776a6bf5ddaa9dbad0e88de9d28a55bec1d37b8d408cc7d2e5e28d/

In [2]:
import findspark
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import pandas as pd

In [3]:
# Create Session
findspark.init()

spark = SparkSession.builder.appName("AmazonReviewsDatathon").getOrCreate()

In [4]:
spark

## Loading Amazon Reviews and Metadata (Batch)

#### *General Note*: 
*We download the batch data stored in Azure Data Lake Storage (ADLS) and save every gzipped JSON file to a local path. We then read all the data with Spark inside a container, using the Docker image https://hub.docker.com/r/jupyter/pyspark-notebook. The Datacticos team chose this additional step to reduce costs in this initial phase.*

In [5]:
from glob2 import glob
#Read all the json files from both sources of data
jsonFiles_reviews = glob('amazon_reviews/source-files/amazon_reviews/**/**/*.json.gz')
jsonFiles_meta = glob('amazon_reviews/source-files/amazon_metadata/**/*.json.gz')

In [6]:
#Check the lists of partitioned json files 
print("jsonFiles_reviews: ")
print(jsonFiles_reviews[0:5])
print("---------------")
print("jsonFiles_meta: ")
print(jsonFiles_meta[0:5])

jsonFiles_reviews: 
['amazon_reviews/source-files/amazon_reviews/partition_1/part-00000-tid-9136122565017344171-3f98196e-e0c5-4bb5-90cc-d523170ef713-86080-1-c000.json.gz', 'amazon_reviews/source-files/amazon_reviews/partition_10/part-00000-tid-698064602200227711-29b88890-b701-4ddb-82cf-535e4b44c9cf-89301-1-c000.json.gz', 'amazon_reviews/source-files/amazon_reviews/partition_10/part-00001-tid-698064602200227711-29b88890-b701-4ddb-82cf-535e4b44c9cf-89302-1-c000.json.gz', 'amazon_reviews/source-files/amazon_reviews/partition_100/part-00000-tid-6076830777214137320-043be143-2d8f-4c2a-a619-bb4ab95fa4bb-121464-1-c000.json.gz', 'amazon_reviews/source-files/amazon_reviews/partition_100/part-00011-tid-6076830777214137320-043be143-2d8f-4c2a-a619-bb4ab95fa4bb-121475-1-c000.json.gz']
---------------
jsonFiles_meta: 
['amazon_reviews/source-files/amazon_metadata/json_files/part-00000-tid-1001410877349735942-21bdbaae-698f-415f-9a21-69d6c2866481-1675-1-c000.json.gz', 'amazon_reviews/source-files/amazo

In [6]:
#Load the data from Amazon reviews using spark
df_reviews = spark.read.option("multiline","false").option("compression", "gzip").json(jsonFiles_reviews)

In [7]:
#Load the metadata using spark
df_meta = spark.read.option("multiline","false").option("compression", "gzip").json(jsonFiles_meta)

## Examining the Data 

In the following steps we take a closer look at the data. This helps us understand which fields we can use, inspect some of their properties, and discard what is not needed for our main business goal.

### Amazon Reviews (Batch)

#### **Get Familiar with the Data**

In [9]:
# Print the df_reviews schema to see which attributes we have and their data types
df_reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: string (nullable = true)
 |-- verified: string (nullable = true)
 |-- vote: string (nullable = true)



In [10]:
# Check raw data
df_reviews.show(5)

+----------+-----+-------+--------------------+--------------+---------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText|    reviewerID|   reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+--------------+---------------+--------------------+--------------------+--------------+--------+----+
|B00005QDPX| null|    5.0|Crazy Taxi is by ...|A36TDX8DY2XK5Q|       Some Kid|                null|It's Party Time! ...|    1054252800|   false|null|
|B000MXIMKK| null|    4.0|I love these pant...| AZZ1YPRM2FAUH| David Colgrove|{"Size:":" 33W x ...|Durable, good loo...|    1459900800|    true|   6|
|B00005QDPX| null|    2.0|Anyone who hasn't...|A3PASG15BRR40D|       SleepyJD|                null|A fun game that g...|    1049414400|   false|null|
|B000MXIMKK| null|    5.0|Great pants and n...|A388UHKJN07JJT|            TJK|{"Size:":" 42W x ...| 

In [10]:
# Count records number 
df_reviews.count()

279665000

`df_reviews` is a huge dataset with almost 280 million records. Some features are categorical (*overall* and *verified*), so we briefly explore their distributions. After that, we drop some columns to reduce the size of the data and make the analysis more efficient.

In [16]:
# Register df as Temp table
temp_view_name = "tempvw_reviews"
df_reviews.createOrReplaceTempView(temp_view_name)

In [12]:
# Check the total rating levels ordered 
spark.sql("""
SELECT overall, COUNT(*) as count 
FROM tempvw_reviews 
GROUP BY overall 
ORDER BY overall DESC
""").show()

+-------+---------+
|overall|    count|
+-------+---------+
|    5.0|179369728|
|    4.0| 46643396|
|    3.0| 21573678|
|    2.0| 12577806|
|    1.0| 19500378|
|    0.0|       14|
+-------+---------+



About 64% of the reviews have 5 stars and roughly 81% have 4 stars or more, so the large majority of reviews in the dataset are positive.
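The proportions behind this observation can be checked from the counts printed above. The snippet below is a small plain-Python sketch; the `rating_counts` dictionary is a hand-copied version of the query output, not an object the notebook defines.

```python
# Counts copied by hand from the `GROUP BY overall` output above.
rating_counts = {
    5: 179_369_728,
    4: 46_643_396,
    3: 21_573_678,
    2: 12_577_806,
    1: 19_500_378,
    0: 14,
}

total = sum(rating_counts.values())

# Share of each rating level, as a percentage of all reviews.
rating_share = {rating: 100 * n / total for rating, n in rating_counts.items()}

# Share of reviews with 4 or 5 stars.
positive_share = rating_share[5] + rating_share[4]

for rating, pct in rating_share.items():
    print(f"{rating} stars: {pct:.2f}%")
print(f"4 or 5 stars: {positive_share:.2f}%")
```

The total matches the 279,665,000 records returned by `df_reviews.count()`, so no rating value is missing from the grouping.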

In [13]:
# Check how many reviewers the dataset has
result_query_total_reviewers = spark.sql("""
SELECT COUNT(DISTINCT reviewerID) as total_reviewers
FROM tempvw_reviews 
""")

In [14]:
result_query_total_reviewers.show()

+---------------+
|total_reviewers|
+---------------+
|       34105431|
+---------------+



In [15]:
# Check how many verified and not verified reviewers the dataset has
spark.sql("""
SELECT COUNT(DISTINCT reviewerID) as total_reviewers, verified
FROM tempvw_reviews 
GROUP BY verified
""").show()

+---------------+--------+
|total_reviewers|verified|
+---------------+--------+
|       10954035|   false|
|       28705780|    true|
+---------------+--------+



Most reviewers have at least one verified review, so we restrict the analysis to verified reviews: they correspond to actual orders and therefore to a better-informed opinion (e.g., the customer used the product).
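Note that the two groups in the query above add up to more than the total number of distinct reviewers, because a reviewer who wrote both verified and unverified reviews is counted once in each group. A short sketch of the arithmetic, assuming every `reviewerID` falls into one of the two groups shown (the variable names are ours, the numbers are copied from the outputs above):

```python
total_reviewers = 34_105_431       # COUNT(DISTINCT reviewerID) over all reviews
unverified_reviewers = 10_954_035  # distinct reviewers with verified = false
verified_reviewers = 28_705_780    # distinct reviewers with verified = true

# Inclusion-exclusion: |A or B| = |A| + |B| - |A and B|
both = verified_reviewers + unverified_reviewers - total_reviewers
only_verified = verified_reviewers - both
only_unverified = unverified_reviewers - both

print(f"verified and unverified: {both:,}")
print(f"only verified:           {only_verified:,}")
print(f"only unverified:         {only_unverified:,}")
print(f"share with a verified review: {100 * verified_reviewers / total_reviewers:.1f}%")
```

Under that assumption, filtering on `verified` removes only the reviewers who never left a verified review.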

#### **Data Cleaning**

#### Selecting, Filtering and Casting

In [9]:
# Get only verified reviews
new_df_reviews_filtered = df_reviews.filter(df_reviews["verified"] == "true")

In [10]:
# Get the reviewTime converting from unixtime to datetime 
new_df_reviews = new_df_reviews_filtered.withColumn("reviewTime", from_unixtime("unixReviewTime", "yyyy-MM-dd"))
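As a sanity check on the conversion, the same transformation can be reproduced in plain Python for a couple of `unixReviewTime` values taken from the sample rows above. This is only a sketch: `unix_to_date` is a hypothetical helper, and it formats in UTC, whereas Spark's `from_unixtime` uses the session time zone, so dates could shift by a day in other zones.

```python
from datetime import datetime, timezone


def unix_to_date(unix_seconds: str) -> str:
    """Convert epoch seconds (a string, as in the raw schema) to yyyy-MM-dd in UTC."""
    moment = datetime.fromtimestamp(int(unix_seconds), tz=timezone.utc)
    return moment.strftime("%Y-%m-%d")


print(unix_to_date("1459900800"))  # 2016-04-06
print(unix_to_date("1054252800"))  # 2003-05-30
```

The first value agrees with the `reviewTime` shown for the same row in the notebook output.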

In [20]:
new_df_reviews.show(5)

+----------+-----+-------+--------------------+--------------+--------------+--------------------+--------------------+--------------+--------+----+----------+
|      asin|image|overall|          reviewText|    reviewerID|  reviewerName|               style|             summary|unixReviewTime|verified|vote|reviewTime|
+----------+-----+-------+--------------------+--------------+--------------+--------------------+--------------------+--------------+--------+----+----------+
|B000MXIMKK| null|    4.0|I love these pant...| AZZ1YPRM2FAUH|David Colgrove|{"Size:":" 33W x ...|Durable, good loo...|    1459900800|    true|   6|2016-04-06|
|B000MXIMKK| null|    5.0|Great pants and n...|A388UHKJN07JJT|           TJK|{"Size:":" 42W x ...|       Great Product|    1459900800|    true|null|2016-04-06|
|B000MXIMKK| null|    4.0|These are pretty ...|A1361PH7D34W8R|          Kyle|{"Size:":" 36W x ...|  Good Overall Value|    1459900800|    true|null|2016-04-06|
|B000MXIMKK| null|    1.0|I purchased tw

In [11]:
# Drop the attributes that we consider least relevant
new_df_reviews_2 = new_df_reviews.drop("image", "style", "unixReviewTime", "reviewerName", "verified")

In [22]:
new_df_reviews_2.show(5)

+----------+-------+--------------------+--------------+--------------------+----+----------+
|      asin|overall|          reviewText|    reviewerID|             summary|vote|reviewTime|
+----------+-------+--------------------+--------------+--------------------+----+----------+
|B000MXIMKK|    4.0|I love these pant...| AZZ1YPRM2FAUH|Durable, good loo...|   6|2016-04-06|
|B000MXIMKK|    5.0|Great pants and n...|A388UHKJN07JJT|       Great Product|null|2016-04-06|
|B000MXIMKK|    4.0|These are pretty ...|A1361PH7D34W8R|  Good Overall Value|null|2016-04-06|
|B000MXIMKK|    1.0|I purchased two p...| ALBKZQ7DWPMC6|but I'm definitel...|null|2016-04-06|
|B000MXIMKK|    4.0|       great product|A3GBAA7OEESH78|          Four Stars|null|2016-04-05|
+----------+-------+--------------------+--------------+--------------------+----+----------+
only showing top 5 rows



In [12]:
# Convert overall and vote fields to integer and reviewTime to date format
new_df_reviews_cast = new_df_reviews_2.withColumn("overall", col("overall").cast("integer"))\
.withColumn("vote", col("vote").cast("integer")).withColumn("reviewTime", to_date(col("reviewTime"), "yyyy-MM-dd"))

In [24]:
new_df_reviews_cast.show(5)

+----------+-------+--------------------+--------------+--------------------+----+----------+
|      asin|overall|          reviewText|    reviewerID|             summary|vote|reviewTime|
+----------+-------+--------------------+--------------+--------------------+----+----------+
|B000MXIMKK|      4|I love these pant...| AZZ1YPRM2FAUH|Durable, good loo...|   6|2016-04-06|
|B000MXIMKK|      5|Great pants and n...|A388UHKJN07JJT|       Great Product|null|2016-04-06|
|B000MXIMKK|      4|These are pretty ...|A1361PH7D34W8R|  Good Overall Value|null|2016-04-06|
|B000MXIMKK|      1|I purchased two p...| ALBKZQ7DWPMC6|but I'm definitel...|null|2016-04-06|
|B000MXIMKK|      4|       great product|A3GBAA7OEESH78|          Four Stars|null|2016-04-05|
+----------+-------+--------------------+--------------+--------------------+----+----------+
only showing top 5 rows



In [19]:
new_df_reviews_cast.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- vote: integer (nullable = true)
 |-- reviewTime: date (nullable = true)



In [13]:
# Drop duplicates from our df_reviews 
new_df_reviews_cast_unique = new_df_reviews_cast.dropDuplicates()

In [18]:
# Register df as Temp table
temp_view_name = "tempvw_df_reviews"
new_df_reviews_cast_unique.createOrReplaceTempView(temp_view_name)

Given the huge number of reviews available, we want to reduce the size of the data without losing its relevant aspects. We pick one product at random and look at its reviews, ratings, and votes to decide whether it makes sense to summarize each product by its mean overall rating.

### One random Case

In [59]:
condition_1 = col("asin") == "0007232535"
new_df_reviews_filtered = new_df_reviews_cast_unique.filter(condition_1)

In [60]:
new_df_reviews_filtered.show()

+----------+-------+--------------------+--------------+--------------------+----+----------+
|      asin|overall|          reviewText|    reviewerID|             summary|vote|reviewTime|
+----------+-------+--------------------+--------------+--------------------+----+----------+
|0007232535|      1|Just want to issu...|A12JDACBRFSWJ6|Buyer Beware -- T...|  10|2007-04-17|
+----------+-------+--------------------+--------------+--------------------+----+----------+



As we dig deeper into the data, we note that some products have many reviews, yet many of those reviews received few or no votes. To keep the most relevant reviews without losing the rating information from the rest, we first compute the average rating and the review count per asin over all reviews, and then keep only the reviews with at least three votes. We also consider that reviews with few votes might add noise to our analysis.

In [14]:
# We get the average overall score and reviews per product before filtering only the most relevant reviews in order to reduce the possible noise in data
average_scores_df = new_df_reviews_cast_unique.groupBy("asin").agg(round(avg(col("overall")),3).alias("avg_overall"), count('asin').alias('count_asin'))

In [19]:
# Filtered only reviews with more than two votes
df_filtered_relevant_reviews = spark.sql("""
SELECT asin, reviewText, reviewerID, summary, vote, reviewTime, overall
FROM tempvw_df_reviews 
WHERE vote >= 3
""")

In [20]:
# Inner Join to combine average overall scores and relevant reviews 
df_reviews_joined = df_filtered_relevant_reviews.join(average_scores_df, on=["asin"], how="inner")

In [21]:
# Register df as Temp table
temp_view_name = "tempvw_df_reviews_joined"
df_reviews_joined.createOrReplaceTempView(temp_view_name)

We consider only products with at least 5 reviews so that each one has enough data for a general overview; with fewer reviews we consider that there is not enough data to give a proper outcome.

In [22]:
# Keep only products with at least 5 reviews
df_reviews_final = spark.sql("""
SELECT *
FROM tempvw_df_reviews_joined 
WHERE count_asin >= 5
""")

In [27]:
df_reviews_final.show(10)

+----------+--------------------+--------------+--------------------+----+----------+-------+-----------+----------+
|      asin|          reviewText|    reviewerID|             summary|vote|reviewTime|overall|avg_overall|count_asin|
+----------+--------------------+--------------+--------------------+----+----------+-------+-----------+----------+
|0005111587|My Catholic Faith...|A3GWD4C71EXKU0|A Revival of One ...|  41|2002-02-08|      5|        5.0|        17|
|0005111587|VERY INFORMATIVE ...|A3AS7SWOD0PCKK|the CATECHISM OF ...|  25|2003-03-26|      5|        5.0|        17|
|0005111587|MY CATHOLIC FAITH...|A1ODC1A64EVC8J|Superb Catechism ...|  15|2006-09-14|      5|        5.0|        17|
|0005111587|If you think you ...|A34F2I70FAWOVA|    My Catholic Life|   3|2014-04-26|      5|        5.0|        17|
|0005111587|I learned of this...|A1OJ4DADMZNIIM|   My Catholic Faith|  17|2007-11-05|      5|        5.0|        17|
|0007148941|This book is goin...|A21YH1OMDSPASA|A pleasing end t

### Amazon Meta (Batch)

#### **Get Familiar with the Data**

In [25]:
# Print the df_meta schema to see which attributes we have and their data types
df_meta.printSchema()

root
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- also_view: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- \n    Item Weight: \n    : string (nullable = true)
 |    |-- \n    Package Dimensions: \n    : string (nullable = true)
 |    |-- \n    Product Dimensions: \n    : string (nullable = true)
 |    |--  Date first listed on Amazon:: string (nullable = true)
 |    |--  UNSPSC Code:: string (nullable = true)
 |    |-- 3.5" and 5.25" disks:: string (nullable = true)
 |    |-- 3.5" disk:: string (nullable = true)
 |    |-- 5.25" disk:: string (nullable = true)
 |    |-- ASIN:: string (n

`df_meta` provides descriptive fields about products and categories that let us summarize the data in a structured way, so we drop the fields that are unnecessary for our analysis.

In [62]:
# Check the content within df_meta
df_meta.show(5)

+--------------------+---------+----------+------------+--------------------+------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-----+--------------------+
|            also_buy|also_view|      asin|       brand|            category|        date|         description|             details|             feature|fit|               image|            main_cat|  price|                rank|        similar_item|               tech1|tech2|               title|
+--------------------+---------+----------+------------+--------------------+------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-----+--------------------+
|                  []|       []|B01FVYVI00|NAUTICALMART|[Tools & Home Imp...|May 19, 2016|[NAUTICALMART CH

#### **Data Cleaning**

#### Selecting, Filtering and Casting

In [23]:
# Create temp view to query data
temp_view_name = "tempvw_meta"
df_meta.createOrReplaceTempView(temp_view_name)

In [24]:
# Filter data to avoid urls and empty records (without category)
filtered_df_meta = spark.sql("""
SELECT DISTINCT asin, brand, main_cat, price, rank, description, category, title
FROM tempvw_meta 
WHERE main_cat NOT LIKE '%https%' AND main_cat NOT LIKE ''
""")

In [27]:
# Strip the dollar sign so price can be cast to float, and decode the HTML entity &amp; in main_cat
df_meta_clean = filtered_df_meta.withColumn("price", regexp_replace(col("price"), "\\$", "").cast("float")).withColumn("main_cat", regexp_replace(col("main_cat"), "&amp;", "&"))
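To make the effect of this step concrete, here is a plain-Python sketch of the same two replacements. The helper names and the sample values are hypothetical and not taken from the dataset. The point it illustrates is that a price which is still not a plain number after the dollar sign is removed (for example, one with a thousands separator or a range) ends up empty, much as a non-ANSI Spark cast yields `null` for such strings.

```python
import re
from typing import Optional


def clean_price(raw: Optional[str]) -> Optional[float]:
    """Remove dollar signs, then parse; return None if the rest is not a number."""
    if raw is None:
        return None
    stripped = re.sub(r"\$", "", raw)
    try:
        return float(stripped)
    except ValueError:
        return None


def clean_main_cat(raw: str) -> str:
    """Decode the HTML entity for the ampersand."""
    return re.sub(r"&amp;", "&", raw)


# Hypothetical sample values.
for value in ["$6.29", "$1,299.00", "$5.00 - $10.00", ""]:
    print(repr(value), "->", clean_price(value))
print(clean_main_cat("Tools &amp; Home Improvement"))
```

If many prices turn out to be empty after the cast, counting the nulls in `price` before and after cleaning would show how much information this step discards.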

In [74]:
df_meta_clean.show(5)

+----------+--------------------+----------+-----+--------------------+--------------------+--------------------+--------------------+
|      asin|               brand|  main_cat|price|                rank|         description|            category|               title|
+----------+--------------------+----------+-----+--------------------+--------------------+--------------------+--------------------+
|B000C2E6HE|             Fel-Pro|Automotive| 6.29|[">#804,748 in Au...|[Fel-Pro gaskets ...|[Automotive, Repl...|Fel-Pro 61093 Thr...|
|B000C2EAZW|             Fel-Pro|Automotive|58.99|[">#743,066 in Au...|[This Fel-Pro hea...|[Automotive, Repl...|Fel-Pro HS9170PT1...|
|B000C2GI52|             Fel-Pro|Automotive|14.73|[">#150,455 in Au...|[Fel-Pro gaskets ...|[Automotive, Repl...|Fel-Pro VS50500R ...|
|B000C2M8LA|        Four Seasons|Automotive| 9.06|[">#397,901 in Au...|[Air Conditioning...|[Automotive, Repl...|Four Seasons 2670...|
|B000C2M8VA|Four Seasons/Trumark|Automotive|35.88|[">#4

In [25]:
# Print the df_meta_clean schema to see which attributes we have and their data types
df_meta_clean.printSchema()

root
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- main_cat: string (nullable = true)
 |-- price: float (nullable = true)
 |-- rank: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)



## Combine both Amazon Sources (Batch)

In [28]:
# Inner Join to combine Reviews with Meta (Get our master table)
df_reviews_master_table = df_reviews_final.join(df_meta_clean, on=["asin"], how="inner")

In [33]:
df_reviews_master_table.printSchema()

root
 |-- asin: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- vote: integer (nullable = true)
 |-- reviewTime: date (nullable = true)
 |-- overall: integer (nullable = true)
 |-- avg_overall: double (nullable = true)
 |-- count_asin: long (nullable = false)
 |-- brand: string (nullable = true)
 |-- main_cat: string (nullable = true)
 |-- price: float (nullable = true)
 |-- rank: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)



In [34]:
# Check the results 
df_reviews_master_table.show(5)

+----------+--------------------+--------------+--------------------+----+----------+-------+-----------+----------+--------------------+--------+-----+--------------------+--------------------+--------------------+--------------------+
|      asin|          reviewText|    reviewerID|             summary|vote|reviewTime|overall|avg_overall|count_asin|               brand|main_cat|price|                rank|         description|            category|               title|
+----------+--------------------+--------------+--------------------+----+----------+-------+-----------+----------+--------------------+--------+-----+--------------------+--------------------+--------------------+--------------------+
|0041300033|At the risk of so...| AKEE4Z5O1O1NM|       Life altering|   6|2016-03-29|      5|       4.75|         8|         C.H. Brooks|   Books| 6.03|13,662,400 in Boo...|[Lang:- eng, Page...|[Books, Literatur...|Practice of Autos...|
|0091493714|There are two typ...|A22KH6LRBNXCLP|  No

## Export master table (Batch) to Parquet 

In [29]:
path_parquet = 'reviews_master_parquet'
partition_column = 'main_cat'
df_reviews_master_table.write.partitionBy(partition_column).option("compression", "gzip").parquet(path_parquet)

## Loading Amazon Reviews (Streaming)

We implement a consumer that receives the streaming data from Azure Event Hubs and saves it locally as JSON.

In [6]:
#Saving data as json

import datetime

output_file = f"streaming_data/streaming_data_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"


import json
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

import nest_asyncio # to allow for event loop in jupyter notebook
nest_asyncio.apply()

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

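#Credentials: read the connection string from an environment variable instead of hard-coding the shared access key
#(the variable name EVENT_HUB_CONN_STR is our own choice). Expected format:
#Endpoint=sb://factored-datathon.servicebus.windows.net/;SharedAccessKeyName=datathon_group_1;SharedAccessKey=<key>;EntityPath=factored_datathon_amazon_reviews_1
import os
EVENT_HUB_CONN_STR = os.environ["EVENT_HUB_CONN_STR"]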
EVENTHUB_NAME="factored_datathon_amazon_reviews_1"
CONSUMER_GROUP="datacticos"

#Connection to Event Hub
async def on_event(partition_context, event):
    #logger.info(f"Received event from partition {partition_context.partition_id} with sequence number {event.sequence_number} and data:\n {list(event.body)}")
    #print(f"Received event from partition {partition_context.partition_id} with sequence number {event.sequence_number} and data:\n {list(event.body)}")
    
    with open(output_file, "a+") as file:
        data_str=list(event.body)[0].decode('utf-8')
        #convert string to json
        data=json.loads(data_str)
        json.dump(data, file)
        file.write("\n")

    await partition_context.update_checkpoint(event)

async def receive():
    client =  EventHubConsumerClient.from_connection_string(conn_str=EVENT_HUB_CONN_STR,consumer_group=CONSUMER_GROUP,eventhub_name=EVENTHUB_NAME,)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
            #max_wait_time=5,  # seconds
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    
    with open(output_file, "w"):
        pass  # Create or clear the output file before starting the process

    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

INFO:azure.eventhub.aio._eventprocessor.event_processor:EventProcessor '1e1d6300-748c-4a94-9026-c5076e687dbc' is being started
INFO:azure.eventhub._pyamqp.aio._connection_async:Connection state changed: None -> <ConnectionState.START: 0>
INFO:azure.eventhub._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.START: 0> -> <ConnectionState.HDR_SENT: 2>
INFO:azure.eventhub._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.HDR_SENT: 2>
INFO:azure.eventhub._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.OPEN_PIPE: 4>
INFO:azure.eventhub._pyamqp.aio._session_async:Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.BEGIN_SENT: 1>
INFO:azure.eventhub._pyamqp.aio._link_async:Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.eventhub._pyamqp.aio._management_link_async:Management link receiver state changed: <Link

KeyboardInterrupt: 
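The `on_event` handler above writes one JSON object per line (the JSON Lines layout), which is what `spark.read.json` expects by default. The following standalone sketch shows that write pattern without any Event Hub connection; `append_event` and the sample payloads are hypothetical stand-ins for the handler and the event bodies.

```python
import json
import os
import tempfile


def append_event(path: str, payload: bytes) -> None:
    """Decode one UTF-8 JSON payload and append it to `path` as a single line."""
    record = json.loads(payload.decode("utf-8"))
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")


# Hypothetical payloads standing in for event bodies.
payloads = [
    b'{"asin": "B0002F7524", "overall": "5.0", "verified": "true"}',
    b'{"asin": "B015W1VJEW", "overall": "5.0", "vote": "5"}',
]

out_path = os.path.join(tempfile.mkdtemp(), "streaming_data_sample.json")
for payload in payloads:
    append_event(out_path, payload)

# Reading the file back line by line recovers one record per event.
with open(out_path, encoding="utf-8") as handle:
    records = [json.loads(line) for line in handle]
print(len(records), records[0]["asin"])
```

Parsing the payload before writing it, as the notebook does, also means a malformed event raises an error at receive time rather than producing a corrupt line in the file.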

In [40]:
#Load the data from Amazon reviews streaming using spark
df_streaming_reviews = spark.read.json("streaming_data/")

In [41]:
df_streaming_reviews.show()

+----------+--------------------+------------------+-------+----------------+--------------------+--------------+--------------------+--------------------+--------------------+--------+----+
|      asin|               image|internal_partition|overall|partition_number|          reviewText|    reviewerID|        reviewerName|               style|             summary|verified|vote|
+----------+--------------------+------------------+-------+----------------+--------------------+--------------+--------------------+--------------------+--------------------+--------+----+
|B0002F7524|                null|                 0|    5.0|       140010213|this thing is the...| ART43EESRGMNC|             Frauwst|                null|     Huge difference|    true|null|
|B015W1VJEW|                null|                 0|    5.0|       140010214|Great quality. Ve...| AL1B1TMCILE5K|                  LM|{"Size:":" 10 by ...|          Five Stars|    true|   5|
|B0002F7524|                null|            

In [43]:
df_streaming_reviews.count()

60000

In [51]:
# Drop the attributes that we consider least relevant
df_streaming_reviews_2 = df_streaming_reviews.drop("image", "style", "reviewerName", "verified", "internal_partition")

In [52]:
df_streaming_reviews_2.show()

+----------+-------+----------------+--------------------+--------------+--------------------+----+
|      asin|overall|partition_number|          reviewText|    reviewerID|             summary|vote|
+----------+-------+----------------+--------------------+--------------+--------------------+----+
|B0002F7524|    5.0|       140010213|this thing is the...| ART43EESRGMNC|     Huge difference|null|
|B015W1VJEW|    5.0|       140010214|Great quality. Ve...| AL1B1TMCILE5K|          Five Stars|   5|
|B0002F7524|    4.0|       140010215|I bought this to ...| AA1F8W6QUHBOJ|Great clutch for ...|null|
|B015W1VJEW|    5.0|       140010216|This is top-notch...| ACSDL76U6MB41|Top quality, beau...|   9|
|B0002F7524|    5.0|       140010217|This thing grips ...|A14UOWJ7SMSC6O|So Much Better th...|null|
|B015W1VJEW|    4.0|       140010218|It's a nice looki...|A27VBEK94S3ZI6|It's a nice looki...|  18|
|B0002F7524|    5.0|       140010219|I was using a ver...| ABPP6J9PWJ25O|Pearl Super Grip ...|null|


In [53]:
# Convert overall and vote fields to integer
df_streaming_reviews_2 = df_streaming_reviews_2.withColumn("overall", col("overall").cast("integer"))\
.withColumn("vote", col("vote").cast("integer"))

In [72]:
# Left join to combine Streaming Reviews with Meta (keeps streaming reviews that have no metadata match)
df_reviews_streaming_master_table = df_streaming_reviews_2.join(df_meta_clean, on=["asin"], how="left")

## Export Reviews (Streaming) to Parquet 

In [67]:
# Export master table (Streaming) to Parquet
path_parquet = 'reviews_streaming_data'
df_reviews_streaming_master_table.write.mode("overwrite").option("compression", "gzip").parquet(path_parquet)