# Spark
Please upload provided files: sessions.json, camapigns.json, and orders.json

Initialize spark enviroment:

In [44]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar xf spark-3.4.2-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"

import findspark
findspark.init()

^C


# Initialize data frames (1 point)

Please start Spark session and load three datasets (and set aliases to sessions, orders, campaigns).

In [45]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[*]") \
.appName("App Name") \
.getOrCreate()

In [118]:
sessions_df = spark.read.json("sessions.json")
campaigns_df = spark.read.json("campaigns.json")
orders_df = spark.read.json("orders.json")

In [111]:
sessions_df.head()
campaigns_df.head()
orders_df.head()

Row(number_of_items=4, session_id='7f93a680-0634-4e35-a3a7-08a29dcbe16c', transaction_value=99.28)

In [112]:
sessions_df.alias('sessions')
sessions_df.show(4)
campaigns_df.show(4)
orders_df.show(4)

+-------+-------+--------------------+-------------------+
|browser|     os|          session_id|      session_start|
+-------+-------+--------------------+-------------------+
|Firefox| Mac OS|99c24019-0d95-472...|2018-04-20 00:00:05|
|Firefox| Mac OS|3155cff6-4b22-4eb...|2018-04-20 00:00:25|
| Chrome|Windows|7f93a680-0634-4e3...|2018-04-20 00:00:27|
|Firefox|Windows|fedaf382-25e3-4dc...|2018-04-20 00:00:29|
+-------+-------+--------------------+-------------------+
only showing top 4 rows

+--------+--------------------+
|campaign|          session_id|
+--------+--------------------+
|       B|99c24019-0d95-472...|
|       C|3155cff6-4b22-4eb...|
|       B|fedaf382-25e3-4dc...|
|       B|5eeef4e0-fc6c-4ed...|
+--------+--------------------+
only showing top 4 rows

+---------------+--------------------+-----------------+
|number_of_items|          session_id|transaction_value|
+---------------+--------------------+-----------------+
|              4|7f93a680-0634-4e3...|            9

In [113]:
campaigns_df.where(campaigns_df.campaign == 'C').show()

+--------+--------------------+
|campaign|          session_id|
+--------+--------------------+
|       C|3155cff6-4b22-4eb...|
|       C|d3d27684-b7af-4fd...|
|       C|fcc5a689-8e78-484...|
|       C|f0f79d22-f045-43d...|
|       C|e3d7becb-b6cd-422...|
|       C|724a9dbb-8b55-43c...|
|       C|1077ec9e-99b1-481...|
|       C|229eb9a0-ec6b-4e9...|
|       C|35fd7bd2-e957-44d...|
|       C|a2b1e8df-0aa9-478...|
|       C|d8096738-13c7-490...|
|       C|f793e88e-1c67-45e...|
|       C|bd8e97d9-8767-4f0...|
|       C|7ffad26d-1879-499...|
|       C|07bc33c7-bd36-49a...|
|       C|9cbd29da-3850-438...|
|       C|9d35651a-b850-471...|
|       C|60a44e5d-35ce-48a...|
|       C|aa1014d2-37c0-4fc...|
|       C|a6086c80-de13-418...|
+--------+--------------------+
only showing top 20 rows



# Exploratory Analysis (4 points)

Print schema for each dataframe (1 point)

In [114]:
sessions_df.printSchema()
campaigns_df.printSchema()
orders_df.printSchema()

root
 |-- browser: string (nullable = true)
 |-- os: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- session_start: string (nullable = true)

root
 |-- campaign: string (nullable = true)
 |-- session_id: string (nullable = true)

root
 |-- number_of_items: long (nullable = true)
 |-- session_id: string (nullable = true)
 |-- transaction_value: double (nullable = true)



Show count of sessions by browser, by OS, and by both browser and OS (1 point)

In [115]:
sessions_df.groupBy(['browser']).count().show()
sessions_df.groupBy(['os']).count().show()
sessions_df.groupBy(['browser','os']).count().show()

+-------+-----+
|browser|count|
+-------+-----+
|Firefox| 6381|
| Safari|  883|
| Chrome| 9999|
|   Edge| 2737|
+-------+-----+

+-------+-----+
|     os|count|
+-------+-----+
| Mac OS| 5367|
|Windows|14633|
+-------+-----+

+-------+-------+-----+
|browser|     os|count|
+-------+-------+-----+
| Chrome| Mac OS| 2687|
| Chrome|Windows| 7312|
| Safari| Mac OS|  883|
|Firefox| Mac OS| 1797|
|Firefox|Windows| 4584|
|   Edge|Windows| 2737|
+-------+-------+-----+



Show count of sessions by day and hour of the day, e.g., 2018-04-20 00:00:05 should become 2018-04-20 00

(hint create new column hour of the day based on session_start, ad this column to sessions dataframe as it will be needed later) (2 points)

In [120]:
#Show count of sessions by day and hour of the day
sessions_df.show()
from pyspark.sql import functions as F
sessions_df = sessions_df.withColumn("session_hour", F.split(sessions_df["session_start"], ":", 0).getItem(0))

sessions_df.groupBy(['session_hour']).count().show()

+-------+-------+--------------------+-------------------+-------------+
|browser|     os|          session_id|      session_start| session_hour|
+-------+-------+--------------------+-------------------+-------------+
|Firefox| Mac OS|99c24019-0d95-472...|2018-04-20 00:00:05|2018-04-20 00|
|Firefox| Mac OS|3155cff6-4b22-4eb...|2018-04-20 00:00:25|2018-04-20 00|
| Chrome|Windows|7f93a680-0634-4e3...|2018-04-20 00:00:27|2018-04-20 00|
|Firefox|Windows|fedaf382-25e3-4dc...|2018-04-20 00:00:29|2018-04-20 00|
|Firefox|Windows|5eeef4e0-fc6c-4ed...|2018-04-20 00:00:49|2018-04-20 00|
| Chrome|Windows|da4cdcd6-4393-4fe...|2018-04-20 00:00:50|2018-04-20 00|
|Firefox|Windows|067cb75e-0051-4cb...|2018-04-20 00:01:11|2018-04-20 00|
|Firefox|Windows|158d5493-34e9-441...|2018-04-20 00:01:13|2018-04-20 00|
|Firefox| Mac OS|c025616a-f5d6-4a8...|2018-04-20 00:01:18|2018-04-20 00|
| Chrome|Windows|f1dc344d-aacd-4c6...|2018-04-20 00:01:39|2018-04-20 00|
|Firefox| Mac OS|63ef9d7e-1334-4cd...|2018-04-20 00

Show the same count as in previous step, but sorted by ascending hour_of_day, please show all 24 hours (1 point)

In [121]:
sessions_df.groupBy(['session_hour']).count().sort("session_hour").show()

+-------------+-----+
| session_hour|count|
+-------------+-----+
|2018-04-20 00|  316|
|2018-04-20 01|  234|
|2018-04-20 02|  142|
|2018-04-20 03|  122|
|2018-04-20 04|  163|
|2018-04-20 05|  387|
|2018-04-20 06|  759|
|2018-04-20 07| 1291|
|2018-04-20 08| 1600|
|2018-04-20 09| 1650|
|2018-04-20 10| 1373|
|2018-04-20 11| 1066|
|2018-04-20 12|  852|
|2018-04-20 13|  801|
|2018-04-20 14|  881|
|2018-04-20 15|  966|
|2018-04-20 16| 1107|
|2018-04-20 17| 1122|
|2018-04-20 18| 1147|
|2018-04-20 19| 1167|
+-------------+-----+
only showing top 20 rows



# Preparing aggregated report (5 points)
Join sessions with orders (creating new dataframe that will be used in later steps) and add is_transaction column (1 if transaction_value is present, 0 otherwise)

To validate step display first 20 rows(2 points)

In [161]:
orders_df.show(4)
sessions_df.show(4)
sessions_df = sessions_df.withColumnRenamed("session_id", "sessions_session_id")
orders_sessions_df = orders_df.join(sessions_df,orders_df.session_id == sessions_df.sessions_session_id,"inner")
orders_sessions_df = orders_sessions_df.drop('session_id')
orders_sessions_df.show()

+---------------+--------------------+-----------------+
|number_of_items|          session_id|transaction_value|
+---------------+--------------------+-----------------+
|              4|7f93a680-0634-4e3...|            99.28|
|              1|6100a44a-3d1f-4ca...|            20.82|
|              2|6100a44a-3d1f-4ca...|            34.09|
|              1|724a9dbb-8b55-43c...|            24.66|
+---------------+--------------------+-----------------+
only showing top 4 rows

+-------+-------+--------------------+-------------------+-------------+
|browser|     os| sessions_session_id|      session_start| session_hour|
+-------+-------+--------------------+-------------------+-------------+
|Firefox| Mac OS|99c24019-0d95-472...|2018-04-20 00:00:05|2018-04-20 00|
|Firefox| Mac OS|3155cff6-4b22-4eb...|2018-04-20 00:00:25|2018-04-20 00|
| Chrome|Windows|7f93a680-0634-4e3...|2018-04-20 00:00:27|2018-04-20 00|
|Firefox|Windows|fedaf382-25e3-4dc...|2018-04-20 00:00:29|2018-04-20 00|
+-------

Join the dataframe created in previous step with campaigns, and create a dataframe that will have count of sessions, total transaction value, total number of sold items, and number of transactions per each day and hour, campaign, browser, and os

To validate step display first 20 rows(3 points)

In [203]:
from pyspark.sql.functions import sum,avg,max,count

#Joining the Campaign dataFrame to the previous dataFrame
all_df = campaigns_df.join(
    orders_sessions_df, campaigns_df.session_id == orders_sessions_df.sessions_session_id, "inner")
all_df.show(5)

#Grouping ['campaign', 'session_hour', 'browser', 'os'] and sum the value and count the sessions for each
all_df.groupBy(
    ['campaign', 'session_hour', 'browser', 'os']
    ).agg(sum("transaction_value").alias("sum_transaction_value")
    ,count("sessions_session_id").alias("session_count")
    ).sort('session_hour', 'campaign'
    ).show(10)

+--------+--------------------+---------------+-----------------+-------+-------+--------------------+-------------------+-------------+
|campaign|          session_id|number_of_items|transaction_value|browser|     os| sessions_session_id|      session_start| session_hour|
+--------+--------------------+---------------+-----------------+-------+-------+--------------------+-------------------+-------------+
|       A|c828de15-3a31-496...|              1|            31.75| Chrome|Windows|c828de15-3a31-496...|2018-04-20 00:03:22|2018-04-20 00|
|       A|c828de15-3a31-496...|              1|            31.75| Chrome|Windows|c828de15-3a31-496...|2018-04-20 00:03:22|2018-04-20 00|
|       A|6100a44a-3d1f-4ca...|              2|            34.09| Chrome|Windows|6100a44a-3d1f-4ca...|2018-04-20 00:05:52|2018-04-20 00|
|       A|6100a44a-3d1f-4ca...|              1|            20.82| Chrome|Windows|6100a44a-3d1f-4ca...|2018-04-20 00:05:52|2018-04-20 00|
|       C|724a9dbb-8b55-43c...|          