<a href="https://colab.research.google.com/github/MarinaEstefania/take_home_exercise/blob/main/Take_Home_Excercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark

import os

import findspark

# Tell Spark tooling where the JDK and the unpacked Spark distribution live
# on the Colab VM.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.2-bin-hadoop3.2",
})

# Initialise findspark for this kernel, then look up the Spark home it uses
# (the returned value is not displayed because this is not the cell's last line).
findspark.init()
findspark.find()

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("take_home_excercise")
    .getOrCreate()
)

# Display the session summary as the cell output.
spark

In [None]:
# Load the raw sessions export (JSON) from the Colab files area.
# Spark infers the schema, including the nested device / geoNetwork / hits fields.
df = spark.read.json("sample_data/ga_sessions_20160801.json")

# Preview the first 10 rows.
df.show(10)

+---------------+--------------------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------------+
|channelGrouping|    customDimensions|    date|              device|      fullVisitorId|          geoNetwork|                hits|socialEngagementType|              totals|       trafficSource|   visitId|visitNumber|visitStartTime|
+---------------+--------------------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------------+
|         Social|[{4, North America}]|20160801|{Chrome, not avai...|8159312408158297118|{San Jose, not av...|[{{shop.googlemer...|Not Socially Engaged|{null, 3, null, 3...|{null, {null, not...|1470083489|        151|    1470083489|
|         Social|[{4, North America}]|20160801|{Internet Explore...|7194

In [218]:
# Transform data in order to get the VISITS dataframe: one row per session with
# flattened device / geo attributes and snake_case column names.
visitsDf = df.select(
    col('fullVisitorId').alias('full_visitor_id'),
    # The source column is spelled `visitId`; referring to it as `visitID` only
    # resolves while spark.sql.caseSensitive is left at its default (false).
    col('visitId').alias('visit_id'),
    col('visitNumber').alias('visit_number'),
    # Epoch seconds -> 'yyyy-MM-dd HH:mm:ss' string in the session time zone.
    from_unixtime(col('visitStartTime')).alias('visit_start_time'),
    col('device.browser').alias('browser'),
    col('geoNetwork.country').alias('country'),
    # Join key shared with the HITS dataframe: visitor id followed by visit id.
    concat(col('fullVisitorId'), col('visitId')).alias('visits_primary_key'),
)
visitsDf.show()

+-------------------+----------+------------+-------------------+-----------------+--------------+--------------------+
|    full_visitor_id|  visit_id|visit_number|   visit_start_time|          browser|       country|  visits_primary_key|
+-------------------+----------+------------+-------------------+-----------------+--------------+--------------------+
|8159312408158297118|1470083489|         151|2016-08-01 20:31:29|           Chrome| United States|81593124081582971...|
|7194065619159478122|1470117657|           1|2016-08-02 06:00:57|Internet Explorer| United States|71940656191594781...|
|9236304747882138291|1470052694|           1|2016-08-01 11:58:14|           Chrome|   Philippines|92363047478821382...|
|5270776363703942229|1470084135|           1|2016-08-01 20:42:15|           Chrome| United States|52707763637039422...|
|1792676004815023069|1470061879|           1|2016-08-01 14:31:19|          Firefox|        Canada|17926760048150230...|
|7305625498291809599|1470090830|        

In [210]:
# Create a dataframe that functions as a helper for the hits dataframe:
# one row per (session, hit), still carrying the session's full `hits` array.
#
# The hit structs themselves are exploded and hit_number / time are read from
# the exploded struct. Exploding only hits.hitNumber and then looking the hit
# up again at position (hit_number - 1) silently returns null or the wrong hit
# whenever the numbers in a row are not exactly 1..n in array order.
hitsHelperDf = (
    df.select('fullVisitorId', 'visitId', 'hits', 'visitStartTime', 'visitNumber')
    .withColumn('visits_primary_key', concat(col('fullVisitorId'), col('visitId')))
    .withColumn('hit', explode(col('hits')))
    .withColumn('hit_number', col('hit.hitNumber'))
    # hits.time is scaled by 0.001 to get seconds
    # (presumably milliseconds since the start of the visit -- TODO confirm).
    .withColumn('hit_time_s', col('hit.time') * 0.001)
    # Absolute time of the hit as epoch seconds.
    .withColumn('full_time_s', col('visitStartTime') + col('hit_time_s'))
    # NOTE: despite its name this column holds the *hit* timestamp
    # (session start + hit offset); the name is kept because the HITS
    # transformation reads it under this name.
    .withColumn('visit_start_time', from_unixtime(col('full_time_s')))
    .drop('hit')
)

hitsHelperDf.show()


+-------------------+----------+--------------------+--------------+-----------+--------------------+----------+----------+----------------+-------------------+
|      fullVisitorId|   visitID|                hits|visitStartTime|visitNumber|  visits_primary_key|hit_number|hit_time_s|     full_time_s|   visit_start_time|
+-------------------+----------+--------------------+--------------+-----------+--------------------+----------+----------+----------------+-------------------+
|8159312408158297118|1470083489|[{{shop.googlemer...|    1470083489|        151|81593124081582971...|         1|       0.0|   1.470083489E9|2016-08-01 20:31:29|
|8159312408158297118|1470083489|[{{shop.googlemer...|    1470083489|        151|81593124081582971...|         2|   289.206|1.470083778206E9|2016-08-01 20:36:18|
|8159312408158297118|1470083489|[{{shop.googlemer...|    1470083489|        151|81593124081582971...|         3|    528.73| 1.47008401773E9|2016-08-01 20:40:17|
|7194065619159478122|1470117657|[{

In [217]:
# Transform data in order to get the HITS dataframe: one row per hit with its
# type, timestamp and page attributes.
#
# Every hitsHelperDf row carries the session's whole `hits` array plus the
# `hit_number` of the single hit the row stands for. The matching record is
# picked by comparing hitNumber instead of indexing the array at
# (hit_number - 1), which yields null or another hit's data whenever the hit
# numbers in a row are not exactly 1..n in array order.
current_hit = expr("filter(hits, h -> h.hitNumber = hit_number)[0]")

hitsDf = hitsHelperDf.withColumn('hit', current_hit).select(
    'visits_primary_key',
    'hit_number',
    col('hit.type').alias('hit_type'),
    # hitsHelperDf.visit_start_time already contains session start + hit offset.
    col('visit_start_time').alias('hit_timestamp'),
    col('hit.page.pagePath').alias('page_path'),
    col('hit.page.pageTitle').alias('page_title'),
    col('hit.page.hostname').alias('hostname'),
)

hitsDf.show()

+--------------------+----------+--------+-------------------+--------------------+--------------------+--------------------+
|  visits_primary_key|hit_number|hit_type|      hit_timestamp|           page_path|          page_title|            hostname|
+--------------------+----------+--------+-------------------+--------------------+--------------------+--------------------+
|81593124081582971...|         1|    PAGE|2016-08-01 20:31:29|               /home|                Home|shop.googlemercha...|
|81593124081582971...|         2|    PAGE|2016-08-01 20:36:18|/google+redesign/...|      Men's-T-Shirts|shop.googlemercha...|
|81593124081582971...|         3|    PAGE|2016-08-01 20:40:17|/google+redesign/...|              Office|shop.googlemercha...|
|71940656191594781...|         1|    PAGE|2016-08-02 06:00:57|               /home|                Home|shop.googlemercha...|
|71940656191594781...|         2|    PAGE|2016-08-02 06:01:27|/google+redesign/...|          Gift Cards|shop.googlemer

In [220]:
# Persist both result frames as JSON under the Colab files area,
# replacing any output left over from a previous run.
visitsDf.write.json("sample_data/visits.json", mode='Overwrite')
hitsDf.write.json("sample_data/hits.json", mode='Overwrite')