In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Start a Spark session

In [2]:
# Option 1: running on an EC2 instance that already has S3 permission.
# No explicit credentials are needed. `local[*]` uses all available threads;
# use e.g. `local[4]` to limit Spark to 4 threads.
# spark = SparkSession.builder.master("local[*]").getOrCreate()

# Option 2: no S3 permission on the machine, so temporary credentials must be
# supplied. They are read from the standard AWS environment variables rather
# than typed into the notebook, so secrets never end up in the saved file.
# A missing variable raises KeyError naming the variable that is not set.
YOUR_AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
YOUR_AWS_SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
YOUR_AWS_SESSION_TOKEN = os.environ["AWS_SESSION_TOKEN"]

spark = (
    SparkSession.builder.master("local[*]")
    # Temporary (session) credentials: access key + secret key + session token.
    .config(
        'spark.hadoop.fs.s3a.aws.credentials.provider',
        'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider',
    )
    .config('spark.hadoop.fs.s3a.access.key', YOUR_AWS_ACCESS_KEY_ID)
    .config('spark.hadoop.fs.s3a.secret.key', YOUR_AWS_SECRET_KEY)
    .config('spark.hadoop.fs.s3a.session.token', YOUR_AWS_SESSION_TOKEN)
    .getOrCreate()
)

21/09/22 03:27:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/22 03:27:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Read Data
Spark can read from and write to Amazon S3 directly.

In [3]:
# Location of the sample parquet dataset on S3 (accessed through the s3a connector).
json2_path = "s3a://shopback-staging-au-orca-data-backup/grouping/latest/spark-demo/json2_20210921_sample_10k.parquet"

# Load the parquet data into a Spark DataFrame.
json2_df = spark.read.format("parquet").load(json2_path)

21/09/22 03:27:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [4]:
# Count the number of rows in json2_df
json2_df.count()

                                                                                

9969

In [5]:
# Print the first record, one column per line (easier to read for wide rows).
json2_df.show(n=1, vertical=True)

21/09/22 03:29:51 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 3:>                                                          (0 + 1) / 1]

-RECORD 0--------------------------------------------
 _internalTags                | [PriceUpdated]       
 _isSameAsMongoDoc            | true                 
 accessoryScore               | 0.0                  
 bookScore                    | 0.0                  
 brand                        | null                 
 brandId                      | null                 
 calculatedCashback           | null                 
 cashback                     | null                 
 categoriesAnnotation         | null                 
 categoriesRank1              | null                 
 categoriesRank1Score         | null                 
 categoryFinalConfidenceScore | null                 
 categoryLv1Id                | null                 
 categoryLv2Id                | null                 
 categoryLv3Id                | null                 
 condition                    | 1                    
 crawledTs                    | 1632182400           
 currency                   

                                                                                

In [6]:
# Inspect the first row as a plain Python dict (column name -> value).
json2_df.head(1)[0].asDict()

                                                                                

{'_internalTags': ['PriceUpdated'],
 '_isSameAsMongoDoc': True,
 'accessoryScore': 0.0,
 'bookScore': 0.0,
 'brand': None,
 'brandId': None,
 'calculatedCashback': None,
 'cashback': None,
 'categoriesAnnotation': None,
 'categoriesRank1': None,
 'categoriesRank1Score': None,
 'categoryFinalConfidenceScore': None,
 'categoryLv1Id': None,
 'categoryLv2Id': None,
 'categoryLv3Id': None,
 'condition': 1,
 'crawledTs': 1632182400,
 'currency': 'AUD',
 'dataSource': None,
 'displayGroupTitle': 'AIICIOO Stainless Steel Check Valve 1/2” NPT Female High Pressure 70 PSI Cracking Pressure One Way Check Valve PTFE Seal',
 'flashSaleEndTime': None,
 'flashSaleStartTime': None,
 'groupId': 'amazon:B07X9LL4ST',
 'groupTitle': 'AIICIOO Stainless Steel Check Valve 1/2” NPT Female High Pressure 70 PSI Cracking Pressure One Way Check Valve PTFE Seal',
 'imageUrl': 'https://m.media-amazon.com/images/I/41Seus0++rL._SL160_.jpg',
 'imgproxyHashOriginalSize': '/VEsEACJt1cQ0quLSK6d8LrmzjECFt3WzZWyNyYaona8/aHR

## Demo some usage
I want to know how many offers each merchant has.

Use `groupBy` on the `merchant` column, then `count` the rows in each group.

In [9]:
# Offers per merchant: keep only the `merchant` column, group on it,
# and count the rows in each group (result columns: merchant, count).
stats_df = json2_df.select(F.col("merchant")).groupBy(F.col("merchant")).count()

In [10]:
# Preview up to 20 merchants; no sort has been applied to stats_df here.
stats_df.show(n=20)

                                                                                

+--------------------+-----+
|            merchant|count|
+--------------------+-----+
|       kitchen_style|    2|
|         rebel_sport|   10|
|               iherb|   10|
|              wiggle|   10|
|          ozgameshop|   32|
|   appliances_online|    2|
|      shoe_warehouse|    4|
|      platypus_shoes|    1|
|        under_armour|    1|
|              amazon| 3568|
|       strawberrynet|    6|
|         david-jones|   23|
|               apple|    2|
|automotive_supers...|   47|
|            anaconda|    2|
|                myer|   18|
|            petstock|    1|
|            bing-lee|    1|
|            boozebud|    2|
|          boohoo_man|   20|
+--------------------+-----+
only showing top 20 rows



In [11]:
# Top 20 merchants by number of offers, largest first.
stats_df.orderBy(F.col("count").desc()).show(n=20)



+--------------------+-----+
|            merchant|count|
+--------------------+-----+
|              amazon| 3568|
|                ebay| 2142|
|               kogan| 1106|
|          dick_smith| 1095|
|               catch|  629|
|     angus_robertson|  548|
|          matt_blatt|  166|
|              zookal|   87|
|           booktopia|   71|
|  temple_and_webster|   48|
|automotive_supers...|   47|
|              sanity|   43|
|              boohoo|   36|
|           qbd_books|   35|
|          the-iconic|   35|
|                asos|   34|
|        net_a_porter|   33|
|          ozgameshop|   32|
|         david-jones|   23|
|          boohoo_man|   20|
+--------------------+-----+
only showing top 20 rows



                                                                                