In [1]:

# Import SparkConf class into program
from pyspark import SparkConf
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType,StructField

# Run Spark locally using all available cores, under the application name "hipages".
master = "local[*]"
app_name = "hipages"

# Spark configuration built fluently: master URL first, then the application name.
spark_conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(app_name)
)

# getOrCreate() returns the session already running in this kernel, if any,
# otherwise it starts a new one from the configuration above.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

# Only surface ERROR-level Spark logs so cell output stays readable.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

### Read data and produce first csv file

In [4]:

# Data-quality check run once during exploration (kept for reference):
#   raw = spark.read.json(SOURCE_EVENT_PATH)
#   raw.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in raw.columns]).show()
# It reported no null values in any top-level column.

# Location of the raw event data. A glob such as "./events/*.json" can be used
# instead when the events are split across several files with the same layout.
SOURCE_EVENT_PATH = "./source_event_data.json"

# Explicit schema: Spark does not need to infer one from the data, and every
# field is read as a string (the timestamp is parsed later, during aggregation).
schema = StructType([
    StructField("user", StructType([
        StructField("id", StringType(), True),
        StructField("ip", StringType(), True),
        StructField("session_id", StringType(), True),
    ]), True),
    StructField("action", StringType(), True),
    StructField("event_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("url", StringType(), True),
])

# Read the JSON events with the schema above. ("header" is a CSV-only reader
# option, so it is intentionally not passed here.)
df = spark.read.schema(schema).json(SOURCE_EVENT_PATH)

df.printSchema()


root
 |-- user: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- session_id: string (nullable = true)
 |-- action: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- url: string (nullable = true)



In [5]:

# Anchored, case-insensitive prefix pattern. It strips an optional "http://" or
# "https://" scheme followed by an optional "www." or "w3." host prefix.
# Dots are escaped so they only match a literal ".". The "^" anchor keeps any
# later occurrence (e.g. a path segment containing "www") untouched.
URL_PREFIX_PATTERN = r"(?i)^(https?://)?(www\.|w3\.)?"


def _url_level(parts, idx):
    """Return element `idx` (0-based) of the array column `parts` as a column.

    A missing element is null (getItem beyond the array end). An empty segment,
    e.g. one produced by a trailing "/", is turned into null as well, so both
    cases are reported as 'NULL' in the output table.
    """
    segment = parts.getItem(idx)
    return F.when(segment != "", segment)


# Pull the user id out of the nested `user` struct.
df = df.withColumn("user_id", F.col("user.id"))

# Remove the scheme/host prefix, then split the remainder on "/". Element 0 is
# the domain; elements 1 and 2 are the first two path segments.
df = df.withColumn(
    "cleaned_url",
    F.split(F.regexp_replace(F.col("url"), URL_PREFIX_PATTERN, ""), "/"),
)
for level in (1, 2, 3):
    df = df.withColumn(f"url_level{level}", _url_level(F.col("cleaned_url"), level - 1))

# Keep only the output columns; nulls become the literal string 'NULL' in the CSV.
# NOTE: this reassigns `df`, so re-running this cell without first re-running
# the read cell fails (the `user` / `url` columns no longer exist).
df = df.select(
    "user_id", "timestamp", "url_level1", "url_level2", "url_level3", "action"
).na.fill(value="NULL")

# Write table 1. "overwrite" keeps the cell re-runnable, because the default
# save mode errors when ./table_1 already exists. The header row names the columns.
df.write.mode("overwrite").option("header", True).csv("./table_1")
df.show()

+-------+-------------------+--------------+--------------------+------------+--------------+
|user_id|          timestamp|    url_level1|          url_level2|  url_level3|        action|
+-------+-------------------+--------------+--------------------+------------+--------------+
|  56456|02/02/2017 20:22:00|hipages.com.au|            articles|        NULL|     page_view|
|  56456|02/02/2017 20:23:00|hipages.com.au|             connect| sfelectrics|     page_view|
|  56456|02/02/2017 20:26:00|hipages.com.au|get_quotes_simple...|        NULL|     page_view|
|  56456|01/03/2017 20:21:00|hipages.com.au|           advertise|        NULL|  button_click|
|  56456|02/02/2017 20:12:34|hipages.com.au|              photos|   bathrooms|     page_view|
|  56456|01/01/2017 20:22:00|hipages.com.au|                find|electricians|list_directory|
|  56456|02/02/2017 20:26:07|hipages.com.au|            articles|        NULL|     page_view|
|  56456|02/02/2017 20:22:00|hipages.com.au|             con

### Producing the second csv file

In [6]:
# Hourly aggregation over the table-1 rows currently held in `df`.
# NOTE: this reassigns `df` to the aggregate, so the cell cannot be re-run on
# its own output; re-run the table-1 cell first.

# Parse the timestamp string (dd/MM/yyyy HH:mm:ss) into a real timestamp. The
# bucket key is then the event's hour, formatted as yyyyMMddHH.
df = df.withColumn("timestamp", F.to_timestamp("timestamp", format="dd/MM/yyyy HH:mm:ss"))
df = df.withColumn("time_bucket", F.date_format("timestamp", "yyyyMMddHH"))

# activity_count = number of events in the group; user_count = distinct users.
df = df.groupBy("time_bucket", "url_level1", "url_level2", "action").agg(
    F.count("action").alias("activity_count"),
    F.countDistinct("user_id").alias("user_count"),
)

# Write table 2. "overwrite" keeps the cell re-runnable, and the header row names the columns.
df.write.mode("overwrite").option("header", True).csv("./table_2")

df.show(50)

+-----------+--------------------+--------------------+--------------+--------------+----------+
|time_bucket|          url_level1|          url_level2|        action|activity_count|user_count|
+-----------+--------------------+--------------------+--------------+--------------+----------+
| 2017020220|      hipages.com.au|              photos|     page_view|            30|        30|
| 2017020220|      hipages.com.au|            articles|     page_view|            59|        30|
| 2017020420|      hipages.com.au|get_quotes_simple...|     page_view|            28|        28|
| 2017020220|      hipages.com.au|                find|  button_click|             1|         1|
| 2017020220|      hipages.com.au|           advertise|  button_click|            29|        29|
| 2017010120|      hipages.com.au|get_quotes_simple...|     page_view|            30|        30|
| 2017050220|  randompages.com.au|                find|  button_click|             1|         1|
| 2017030120|      hipages.com

### Note
    1. Spark is used for the data transformation/ETL process because it can run on a cluster and executes jobs efficiently in parallel.
    2. A scheduler such as Apache Airflow can run the above steps at regular intervals.
    3. Parquet is the preferred file format because it is column-oriented. Column pruning lets a job read only the columns it needs instead of entire rows, which is a big performance improvement. Parquet files also store per-column metadata (e.g. null counts), so that information is available without scanning the data.
    
    