In [1]:
# This will work only on Spark Docker cotainer and won't work on HDInsight-Spark Cluster

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark import SparkConf

#use ".master("spark://spark-master:7077") if you want to run this job on the cluster
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("spark-avro-json-sample") \
    .config('spark.hadoop.avro.mapred.ignore.inputs.without.extension', 'true') \
    .getOrCreate()

# Load avro files into DataFrame
# In production the file path should be either hdfs url or azure storage url
avroDf = spark.read.format("avro").load("/home/jovyan/examples/product-data-capture/*/*/*/*/*/*") 

# DataFrame -> Json
jsonRdd = avroDf.select(avroDf.Body.cast("string")).rdd.map(lambda x: x[0])
data = spark.read.json(jsonRdd)
data.show()
groupedData = data.groupby("shortSku").agg(f.count(data["shortSku"])).alias("sku count")
    

+--------------------+--------+-----------+
|          receivedAt|shortSku|     userId|
+--------------------+--------+-----------+
|2019-04-24T16:25:...| J132983|11193438098|
|2019-04-24T18:25:...| A132323|11193438098|
|2019-04-24T18:25:...| J132323|11193438098|
|2019-04-24T16:35:...|   K3827|    3938783|
|2019-04-19T16:35:...|   K3928|    3938783|
|2019-04-24T16:35:...|   K3827|    3938783|
|2019-04-24T16:35:...|  K43632|     111098|
|2019-04-24T16:35:...|   K3827|    3938783|
|2019-04-21T16:35:...|   Z3892|    3938783|
|2019-04-24T16:35:...|  q43876|     111098|
|2019-04-24T16:35:...|   K3827|    3938783|
|2019-04-21T16:35:...|   Z3892|    3938783|
|2019-04-24T16:35:...|   K3827|     111098|
|2019-04-24T16:35:...|  K43632|     111098|
+--------------------+--------+-----------+



In [2]:
# print Json Schema
data.printSchema()
groupedData.printSchema()

root
 |-- receivedAt: string (nullable = true)
 |-- shortSku: string (nullable = true)
 |-- userId: string (nullable = true)

root
 |-- shortSku: string (nullable = true)
 |-- count(shortSku): long (nullable = false)



In [3]:
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

# DataFrame -> Json with a defined schema
schema = StructType([StructField("shortSku", StringType(), True), StructField("userId", StringType(), True), StructField("receivedAt", TimestampType(), True)])
data = spark.read.json(jsonRdd, schema = schema, timestampFormat="yyyy-MM-dd HH:mm:ss")
data.printSchema()
data.show()

root
 |-- shortSku: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- receivedAt: timestamp (nullable = true)

+--------+-----------+--------------------+
|shortSku|     userId|          receivedAt|
+--------+-----------+--------------------+
| J132983|11193438098|2019-04-24 16:25:...|
| A132323|11193438098|2019-04-24 18:25:...|
| J132323|11193438098|2019-04-24 18:25:...|
|   K3827|    3938783|2019-04-24 16:35:...|
|   K3928|    3938783|2019-04-19 16:35:...|
|   K3827|    3938783|2019-04-24 16:35:...|
|  K43632|     111098|2019-04-24 16:35:...|
|   K3827|    3938783|2019-04-24 16:35:...|
|   Z3892|    3938783|2019-04-21 16:35:...|
|  q43876|     111098|2019-04-24 16:35:...|
|   K3827|    3938783|2019-04-24 16:35:...|
|   Z3892|    3938783|2019-04-21 16:35:...|
|   K3827|     111098|2019-04-24 16:35:...|
|  K43632|     111098|2019-04-24 16:35:...|
+--------+-----------+--------------------+



In [4]:
# Pandas example
import pandas as pd
from pandas import DataFrame
import dateutil.parser as parser

# DataFrame => Pandas
pd = data.toPandas()
pd = pd.groupby(by=['shortSku', 'receivedAt']).count()
print(pd)

                                  userId
shortSku receivedAt                     
A132323  2019-04-24 18:25:43.511       1
J132323  2019-04-24 18:25:43.511       1
J132983  2019-04-24 16:25:43.511       1
K3827    2019-04-24 16:35:43.511       5
K3928    2019-04-19 16:35:43.511       1
K43632   2019-04-24 16:35:43.511       2
Z3892    2019-04-21 16:35:43.511       2
q43876   2019-04-24 16:35:43.511       1


In [5]:
print(pd.dtypes)

userId    int64
dtype: object
