In [0]:
# pyspark functions
from pyspark.sql.functions import *
# URL processing
import urllib



In [0]:
%run /Workspace/Shared/authorisation.py

In [0]:

ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# COMMAND ----------

AWS_S3_BUCKET = 'bucket-amazonfoods'
MOUNT_NAME = '/mnt/mount_s3'

# COMMAND ----------

SOURCE_URL = 's3a://%s:%s@%s' %(ACCESS_KEY,ENCODED_SECRET_KEY,AWS_S3_BUCKET)
try:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)
except:
    dbutils.fs.unmount(MOUNT_NAME)
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)



/mnt/mount_s3 has been unmounted.


In [0]:
file_location_aws = "/mnt/mount_s3/foodsAWS.json"
file_location_azure = "abfss://containeramazonfoods@saamazonfoods.dfs.core.windows.net/foodsAzure.json"

df_aws =   spark.read.json(file_location_aws, multiLine=True)
df_azure = spark.read.json(file_location_azure, multiLine=True)



In [0]:
print("Display Schema from AWS json file")
df_aws.printSchema()
print("Display Schema from Azure json file")
df_azure.printSchema()


Display Schema from AWS json file
root
 |-- product/productId: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/profileName: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/userId: string (nullable = true)

Display Schema from Azure json file
root
 |-- product/productId: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/profileName: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/userId: string (nullable = true)



In [0]:
numberrecords = df_aws.count() + df_azure.count()
print(f"The number of records is: {numberrecords}")

The number of records is: 568454


In [0]:
df_union_dataframes = df_aws.union(df_azure)
print(f"The number of records is: {df_union_dataframes.count()}")


The number of records is: 568454


In [0]:
df_union_dataframes = df_union_dataframes.withColumnRenamed("product/productId","productId")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/helpfulness","helpfulness")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/profileName","profileName")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/score","score")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/summary","summary")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/text","text")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/time","time")
df_union_dataframes = df_union_dataframes.withColumnRenamed("review/userId","userId")

print("Display Schemna")
df_union_dataframes.printSchema()


Display Schemna
root
 |-- productId: string (nullable = true)
 |-- helpfulness: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- score: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- text: string (nullable = true)
 |-- time: string (nullable = true)
 |-- userId: string (nullable = true)



In [0]:
drop_columns = ['summary','text']
df_union_dataframes = df_union_dataframes.drop(*drop_columns)
df_union_dataframes.show(5)

+----------+-----------+--------------------+-----+----------+--------------+
| productId|helpfulness|         profileName|score|      time|        userId|
+----------+-----------+--------------------+-----+----------+--------------+
|B001E4KFG0|        1/1|          delmartian|  5.0|1303862400|A3SGXH7AUHU8GW|
|B00813GRG4|        0/0|              dll pa|  1.0|1346976000|A1D87F6ZCVE5NK|
|B000LQOCH0|        1/1|Natalia Corres "N...|  4.0|1219017600| ABXLMWJIXXAIN|
|B000UA0QIQ|        3/3|                Karl|  2.0|1307923200|A395BORC6FGVXV|
|B006K2ZZ7K|        0/0|Michael D. Bigham...|  5.0|1350777600|A1UQRSCLF8GW1T|
+----------+-----------+--------------------+-----+----------+--------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import from_unixtime,col

#Convert from String to integer
df_union_dataframes = df_union_dataframes.withColumn('time', col('time').cast('integer'))
df_union_dataframes.printSchema()

# Convert from unix_timestamp to datetime
df_union_dataframes = df_union_dataframes.withColumn('time', from_unixtime(col('time').cast('bigint')).cast('timestamp'))
df_union_dataframes.show(5)



root
 |-- productId: string (nullable = true)
 |-- helpfulness: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- score: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- userId: string (nullable = true)

+----------+-----------+--------------------+-----+-------------------+--------------+
| productId|helpfulness|         profileName|score|               time|        userId|
+----------+-----------+--------------------+-----+-------------------+--------------+
|B001E4KFG0|        1/1|          delmartian|  5.0|2011-04-27 00:00:00|A3SGXH7AUHU8GW|
|B00813GRG4|        0/0|              dll pa|  1.0|2012-09-07 00:00:00|A1D87F6ZCVE5NK|
|B000LQOCH0|        1/1|Natalia Corres "N...|  4.0|2008-08-18 00:00:00| ABXLMWJIXXAIN|
|B000UA0QIQ|        3/3|                Karl|  2.0|2011-06-13 00:00:00|A395BORC6FGVXV|
|B006K2ZZ7K|        0/0|Michael D. Bigham...|  5.0|2012-10-21 00:00:00|A1UQRSCLF8GW1T|
+----------+-----------+--------------------+-----+--------

In [0]:

df_union_dataframes = df_union_dataframes.withColumn('score', col('score').cast('double'))
df_union_dataframes.printSchema()
df_union_dataframes.show(5)

root
 |-- productId: string (nullable = true)
 |-- helpfulness: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- score: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- userId: string (nullable = true)

+----------+-----------+--------------------+-----+-------------------+--------------+
| productId|helpfulness|         profileName|score|               time|        userId|
+----------+-----------+--------------------+-----+-------------------+--------------+
|B001E4KFG0|        1/1|          delmartian|  5.0|2011-04-27 00:00:00|A3SGXH7AUHU8GW|
|B00813GRG4|        0/0|              dll pa|  1.0|2012-09-07 00:00:00|A1D87F6ZCVE5NK|
|B000LQOCH0|        1/1|Natalia Corres "N...|  4.0|2008-08-18 00:00:00| ABXLMWJIXXAIN|
|B000UA0QIQ|        3/3|                Karl|  2.0|2011-06-13 00:00:00|A395BORC6FGVXV|
|B006K2ZZ7K|        0/0|Michael D. Bigham...|  5.0|2012-10-21 00:00:00|A1UQRSCLF8GW1T|
+----------+-----------+--------------------+-----+------

In [0]:
from pyspark.sql.functions import max, min, col

# Assuming your DataFrame is called 'df' and the time column is named 'time'
max_date = df_union_dataframes.agg(max(col('time')).alias('max_date')).collect()[0]['max_date']
min_date = df_union_dataframes.agg(min(col('time')).alias('min_date')).collect()[0]['min_date']

print("Minimum date:", min_date)
print("Maximum date:", max_date)


Minimum date: 1999-10-08 00:00:00
Maximum date: 2012-10-26 00:00:00


In [0]:
df_union_dataframes.createOrReplaceTempView("amazon_food")

query = "SELECT productId, count(*) as reviews FROM amazon_food GROUP BY productId ORDER BY reviews DESC LIMIT 1"
result_query = spark.sql(query)

result_query.show()



+----------+-------+
| productId|reviews|
+----------+-------+
|B007JFMH8M|    913|
+----------+-------+



In [0]:
query = "SELECT count(DISTINCT(productId)) as  number_items FROM amazon_food"
result_query = spark.sql(query)

result_query.show()




+------------+
|number_items|
+------------+
|       74258|
+------------+



In [0]:
query = "SELECT userId,count(*)  as number_reviews FROM amazon_food group by userId ORDER BY number_reviews DESC LIMIT 5"
result_query = spark.sql(query)

result_query.show()

+--------------+--------------+
|        userId|number_reviews|
+--------------+--------------+
|A3OXHLG6DIBRW8|           448|
|A1YUL9PCJR3JTY|           421|
| AY12DBB0U420B|           389|
|A281NPSIMI1C2R|           365|
|A1Z54EM24Y40LL|           256|
+--------------+--------------+



In [0]:
%sql
SELECT
    EXTRACT(MONTH FROM time) AS month,
    CASE
        WHEN EXTRACT(MONTH FROM time) = 1 THEN 'January'
        WHEN EXTRACT(MONTH FROM time) = 2 THEN 'February'
        WHEN EXTRACT(MONTH FROM time) = 3 THEN 'Mars'
        WHEN EXTRACT(MONTH FROM time) = 4 THEN 'April'
        WHEN EXTRACT(MONTH FROM time) = 5 THEN 'May'
        WHEN EXTRACT(MONTH FROM time) = 6 THEN 'June'
        WHEN EXTRACT(MONTH FROM time) = 7 THEN 'July'
        WHEN EXTRACT(MONTH FROM time) = 8 THEN 'August'
        WHEN EXTRACT(MONTH FROM time) = 9 THEN 'September'
        WHEN EXTRACT(MONTH FROM time) = 10 THEN 'October'
        WHEN EXTRACT(MONTH FROM time) = 11 THEN 'November'
        WHEN EXTRACT(MONTH FROM time) = 12 THEN 'December'
        -- Add similar WHEN clauses for other months
        ELSE 'Unknown' -- Handle unexpected values
    END AS month_name,
    COUNT(*) AS record_count
FROM
    amazon_food
GROUP BY
    month,
    month_name
ORDER BY
    month DESC;

month,month_name,record_count
12,December,41753
11,November,36986
10,October,55762
9,September,55740
8,August,50526
7,July,48419
6,June,44479
5,May,46226
4,April,44473
3,Mars,48367


Databricks visualization. Run in Databricks to view.