# Project 3: Understanding User Behavior
 

### Group project by Da Qi Ren, Frank Staszak and Cynthia Zhu 
### December 2021


## PART 1: Introduction and Platform Setup.


### 1.1 Project Goal: Understanding User Behavior

We are data scientists at a game development company. The latest mobile game has two events you're interested in tracking: buy a sword & join guild. Each has metadata characterstic of such events (i.e., sword type, guild name, etc)

Functions
- Instrument API server to log events to Kafka

- Assembled a data pipeline to catch these events: use Spark streaming to filter select event types from Kafka, land them into HDFS/parquet to make them available for analysis using Presto.

- Used Apache Bench to generate test data stream for pipeline.

- Produced an analytics report describing of the pipeline and some basic analysis of the events are provided. Explaining the pipeline is key for this project!

Events in this pipeline are generated events which make them hard to connect to actual business decisions. However, this project is to demonstrate an ability to plumb this pipeline end-to-end, which includes initially generating test data as well as submitting a notebook-based report of at least simple event analytics. That said the analytics will only be a small part of the notebook. 


### 1.2 Platform Setup

Environment:   

| Name         | Command | State       | Ports |
|---------------|-------|-------------|-------|
| project-3-cynyzhu_cloudera_1          | /usr/bin/docker-entrypoint ...    | Up     | 10000/tcp, 50070/tcp, 8020/tcp,   |
| project-3-cynyzhu_kafka_1        | /etc/confluent/docker/run    | Up       | 29092/tcp, 9092/tcp      |
| project-3-cynyzhu_mids_1      |  /bin/bash     | Up       | 8888/tcp   |
| project-3-cynyzhu_presto_1   | /usr/bin/docker-entrypoint ...    | Up      | 8080/tcp    |
| project-3-cynyzhu_spark_1   | docker-entrypoint.sh bash    | Up      | 0.0.0.0:7000->7000/tcp,:::7000->7000/tcp    |
| project-3-cynyzhu_zookeeper_1   | /etc/confluent/docker/run    | Up      | 2181/tcp, 2888/tcp, 32181/tcp, 3888/tcp     |


### 1.3. Docker Commands

##### Starting up the cluster
```
docker-compose up -d
docker-compose ps
docker ps -a
```

##### Creating a topic

Function:Kafka for event streaming

Create topic called "events"

```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --bootstrap-server kafka:29092
```

##### Flask 

Function: Run the flask server in one CLI window (create.sh)

```
docker-compose exec mids env FLASK_APP=/w205/project-3-CynYZhu/game_api.py flask run --host 0.0.0.0
```

##### Apache Bench 

Generate data with Apache Bench in a different CLI window (event.sh)
The scinario includes 5 players having random numbers of purchase events and states.


| Users         | 
|---------------|
| user1.comcast.com           | 
| user2.att.com        | 
| user3.google.com       | 
| user4.isvc.com    |
| user5.berkeley.com   | 


14 events for each of players in the game:

default, purchase_a_lightsaber_sword, return_a_lightsaber_sword, purchase_a_buster_sword, return_a_buster_sword,purchase_a_diamond_sword, return_a_diamond_sword, purchase_a_helmet，join_guild, purchase_a_spear, start_a_war, return_a_helmet，join_guild, return_a_spear

A sample of Apache Bench:

```
docker-compose exec mids ab -n 5 -H "Host: user5.berkeley.com" http://localhost:5000/purchase_a_lightsaber_sword
docker-compose exec mids ab -n 2 -H "Host: user5.berkeley.com" http://localhost:5000/purchase_a_buster_sword
docker-compose exec mids ab -n 3 -H "Host: user5.berkeley.com" http://localhost:5000/purchase_a_diamond_sword
docker-compose exec mids ab -n 1 -H "Host: user5.berkeley.com" http://localhost:5000/purchase_a_helmet
```

##### Shutting down the cluster (after completing all tasks)  
```
docker-compose down
docker-compose ps
docker ps -a 
```
 

### 1.4 using scripts for above operations: 

Shell script: Spin up kafka and flask (create.sh)
```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --bootstrap-server kafka:29092
docker-compose exec mids env FLASK_APP=/w205/project-3-CynYZhu/game_api.py flask run --host 0.0.0.0
```

Shell script: run Apache Bench (events.sh)
```
docker-compose exec mids ab -n 3 -H "Host: user5.berkeley.com" http://localhost:5000/purchase_a_diamond_sword
```

Shell script: Run Apache Spark in Jupyter Notebook (jupyter.sh)

```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 7000 --ip 0.0.0.0 --allow-root'  pyspark
```

 
### 1.5 A summary of the pipeline platform 

A cluster of containers for kafka, zookeeper, spark, mids, redis, presto, and cloudera images are spined up on the mids container:

- Flask works as game API server, accepting and processing game requests, handling for purchasing products or joining a guild and others.
- The game API server sends the event data to Kafka server.
- The spark scripts that initiates a read/write stream that reads the Kafka logs and writes them out to parquet files in HDFS. 
- A spark script works for each event type: purchase and return events, war events, and other events. 
- Spark scripts are made for defining the schema, filtering by event type, and writing the a specified HDFS directory. 
- A Hive was run to declare the schema of the datatables and access parquet files to make data queryable from Presto.
- Queries are performed from Presto that return the results.
- The entire pipeline was tested by using Apache Bench in spark streaming jobs.

 

### 1.6 Implementation of shopping cart

The Shopping Cart uses as a source the event data from Kafka/Spark, specifically the product, price, user, price_tot columns are used to create an in-memory SQLLite table that serves as the table that represents the data in the user's current shopping cart. The SQLite database is used to implement the shopping cart, it is a built-in feature of Python, which has all the functions required for the back end of a personal database or even a data-driven website. The code is written in a Jupyter notebook, but it should also run well as a standalone Python program.

- Implementation 1

Stage 1, preprocessing: In this stage we extract the data from the event in Spark to an Array, then transform the extracted data to the proper format for insertion into SQLLite.
Stage 2, building the cart: In this stage we perform DDL/DML operations to build and modify the contents of the shopping cart, where a user may add or remove items from their shopping cart.

- Implementation 2

Use Pandas and SQLite together to store data frames permanently in the table and read them directly into new data frames as needed. You can use simple SQL commands to select and filter data. Make purchases, refunds, and account processing.  

The functions include creating the CART table which supports the adding products to shopping cart, Using the preprocessed event data, insert the event data from Spark into the SQLLite CART table. We then select the event data from the SQLLite CART table. The event data includes purchasing items and return items, counting income, etc. And lastly, we can delete the inserted event from the CART table. 

## PART 2:  Flask/Kafka/Spark Streaming 

In this section, I use Spark streaming to filter select event types from Kafka and land them into HDFS/parquet to make them available for analysis using Presto. 

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import pandas as pd

#### 2.1 Read in raw events

This section uses Spark to read in raw events from Kafka. 

In [2]:
# load raw events
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [3]:
# view raw events
raw_events.show(5)

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 65 76 65 6...|events|        0|     0|2021-12-13 01:00:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     1|2021-12-13 01:00:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     2|2021-12-13 01:00:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     3|2021-12-13 01:00:...|            0|
|null|[7B 22 65 76 65 6...|events|        0|     4|2021-12-13 01:00:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 5 rows



#### 2.2 Filter to select "purchase_sword" events

In this section, we filter the Kafka events such that I remove all but the purchase_sword events. 

In [4]:
 @udf('boolean')
def is_purchase_sword(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [5]:
# Get all the purchase_sword events.
purchase_sword_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase_sword('raw'))

In [6]:
# Show the puchase_sword_events schema.
purchase_sword_events.printSchema()

root
 |-- raw: string (nullable = true)
 |-- timestamp: string (nullable = true)



#### 2.3 Filter to select "purchase_sword" events

Show all of the puchase_sword events.

In [7]:
# purchase_sword_events.show()
from pyspark.sql.functions import from_json, col

json_schema = spark.read.json(purchase_sword_events.rdd.map(lambda row: row.raw)).schema

df0 = purchase_sword_events.withColumn('json', from_json(col('raw'), json_schema))

# Select and view the specific JSON fields we're interested in. 
df1 = df0.select('json.event_type', 'json.description', 'json.price')

df1.show(5, truncate=False)

+--------------+----------------+-----+
|event_type    |description     |price|
+--------------+----------------+-----+
|purchase_sword|lightsaber_sword|150.0|
|purchase_sword|buster_sword    |200.0|
|purchase_sword|buster_sword    |200.0|
|purchase_sword|diamond_sword   |300.0|
|purchase_sword|diamond_sword   |300.0|
+--------------+----------------+-----+
only showing top 5 rows



In [8]:
extracted_purchase_sword_events = purchase_sword_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [9]:
extracted_purchase_sword_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [10]:
extracted_purchase_sword_events.show(5)

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
only showing top 5 rows



#### 2.4 Save purchase events

Save the purchase_sword events in a folder based on the purchase type, in this case, swords. 

In [11]:
# Load all purchase events. 
# Write the swords data to swords.parquet.
extracted_purchase_sword_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchase/swords.parquet')

In [12]:
 
sword_purchases = spark.read.parquet('/tmp/purchase/swords.parquet')

In [13]:
 
sword_purchases.show(5)

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
only showing top 5 rows



## PART 3: Event Analysis

Here we perform analysis on user events, viewing game activities such as starting wars and making purchases.

In [14]:
sword_purchases.registerTempTable('purchases')

In [15]:
purchases_by_example1 = spark.sql("select * from purchases where Host = 'user1.att.com'")

In [16]:
purchases_by_example2 = spark.sql("select * from purchases  ")

In [17]:
purchases_by_example2.show(5)

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
only showing top 5 rows



In [18]:
df_purchase = purchases_by_example2.toPandas()

In [19]:
# view pandas dataframe
df_purchase.head(5)

Unnamed: 0,Accept,Host,User-Agent,description,event_type,price,timestamp
0,*/*,user1.comcast.com,ApacheBench/2.3,lightsaber_sword,purchase_sword,150.0,2021-12-13 01:00:16.093
1,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.129
2,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.139
3,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.961
4,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.975


In [20]:
@udf('boolean')
def is_war(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'].endswith('war'):
        return True
    return False

@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'].startswith('purchase'):
        return True
    return False

@udf('boolean')
def is_return(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'].startswith('return'):
        return True
    return False

In [21]:
# can be used for any events: 
# war events refer to who fights with whom
# war_events processes: 

#1 view events 
war_events = raw_events.\
             select(raw_events.value.cast('string').alias('raw'), 
                    raw_events.timestamp.cast('string')). \
             filter(is_war('raw'))
war_events.show()

#2 printschema
extracted_war_events = war_events.rdd. \
                       map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))). \
                       toDF()
extracted_war_events.printSchema()

#3 write to parquet and show 
extracted_war_events.write.mode('overwrite').parquet('/tmp/war')
war = spark.read.parquet('/tmp/war')
war.show(5)

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2021-12-13 01:00:...|
|{"Host": "user1.c...|2021-12-13 01:00:...|
|{"Host": "user1.c...|2021-12-13 01:00:...|
|{"Host": "user1.c...|2021-12-13 01:00:...|
|{"Host": "user1.c...|2021-12-13 01:00:...|
|{"Host": "user2.a...|2021-12-13 01:00:...|
|{"Host": "user2.a...|2021-12-13 01:00:...|
|{"Host": "user2.a...|2021-12-13 01:00:...|
|{"Host": "user2.a...|2021-12-13 01:00:...|
|{"Host": "user2.a...|2021-12-13 01:00:...|
|{"Host": "user2.a...|2021-12-13 01:00:...|
|{"Host": "user3.g...|2021-12-13 01:00:...|
|{"Host": "user3.g...|2021-12-13 01:00:...|
|{"Host": "user3.g...|2021-12-13 01:00:...|
|{"Host": "user3.g...|2021-12-13 01:00:...|
|{"Host": "user4.i...|2021-12-13 01:00:...|
|{"Host": "user4.i...|2021-12-13 01:00:...|
|{"Host": "user4.i...|2021-12-13 01:00:...|
|{"Host": "user5.b...|2021-12-13 01:00:...|
|{"Host": "user5.b...|2021-12-13

In [22]:
# purchase events processes: (1) view events; (2) printschema; (2) write to parquet and show. : 

purchase_events = raw_events.\
                  select(raw_events.value.cast('string').alias('raw'), 
                         raw_events.timestamp.cast('string')).\
                  filter(is_purchase('raw'))
purchase_events.show()

extracted_purchase_events = war_events.\
                            rdd.map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))).\
                            toDF()
extracted_purchase_events.printSchema()

# Read and show  purchase events.
extracted_purchase_events.write.mode('overwrite').parquet('/tmp/purchase/')
purchase = spark.read.parquet('/tmp/purchase/*')
purchase.show(5) 

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13 01:00:...|
|{"event_type": "p...|2021-12-13

In [23]:
return_events = raw_events.\
                  select(raw_events.value.cast('string').alias('raw'), 
                         raw_events.timestamp.cast('string')).\
                  filter(is_return('raw'))
return_events.show()


extracted_return_events = return_events.rdd.map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))).toDF()
extracted_return_events.printSchema()

# Read and show  purchase events.
extracted_return_events.write.mode('overwrite').parquet('/tmp/return/')
return_evt = spark.read.parquet('/tmp/return/')
return_evt.show(5) 

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13 01:00:...|
|{"event_type": "r...|2021-12-13

## PART 4: User Behavior Analysis

In [24]:
# prepare tables, save to pandas for further analysis:

war.registerTempTable('war')
war = spark.sql("select * from war")
war_df = war.toPandas()
war_df.head()
war_df.tail()

Unnamed: 0,Accept,Host,User-Agent,description,event_type,timestamp
0,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.528
1,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.538
2,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.545
3,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.556
4,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.562


Unnamed: 0,Accept,Host,User-Agent,description,event_type,timestamp
17,*/*,user4.isvc.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:36.191
18,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.576
19,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.585
20,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.601
21,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.622


In [25]:
# prepare tables, save to pandas for further analysis:

purchase.registerTempTable('purchase')
purchase = spark.sql("select * from purchase")
purchase_df = purchase.toPandas()
purchase_df.head()
purchase_df.tail()

Unnamed: 0,Accept,Host,User-Agent,description,event_type,timestamp
0,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.528
1,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.538
2,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.545
3,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.556
4,*/*,user1.comcast.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:20.562


Unnamed: 0,Accept,Host,User-Agent,description,event_type,timestamp
17,*/*,user4.isvc.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:36.191
18,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.576
19,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.585
20,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.601
21,*/*,user5.berkeley.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:41.622


In [26]:
#dataframe and Sanity check 

war_df.describe()
purchase_df.describe()

Unnamed: 0,Accept,Host,User-Agent,description,event_type,timestamp
count,22,22,22,22,22,22
unique,1,5,1,1,1,22
top,*/*,user2.att.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:25.565
freq,22,6,22,22,22,1


Unnamed: 0,Accept,Host,User-Agent,description,event_type,timestamp
count,22,22,22,22,22,22
unique,1,5,1,1,1,22
top,*/*,user2.att.com,ApacheBench/2.3,hard_war,start_war,2021-12-13 01:00:25.565
freq,22,6,22,22,22,1


In [27]:
war_df.groupby(war_df.columns.tolist(),as_index=False).size()

Accept  Host                User-Agent       description  event_type  timestamp              
*/*     user1.comcast.com   ApacheBench/2.3  hard_war     start_war   2021-12-13 01:00:20.528    1
                                                                      2021-12-13 01:00:20.538    1
                                                                      2021-12-13 01:00:20.545    1
                                                                      2021-12-13 01:00:20.556    1
                                                                      2021-12-13 01:00:20.562    1
        user2.att.com       ApacheBench/2.3  hard_war     start_war   2021-12-13 01:00:25.531    1
                                                                      2021-12-13 01:00:25.545    1
                                                                      2021-12-13 01:00:25.556    1
                                                                      2021-12-13 01:00:25.565    1
               

#### Purchase events (both swords and gear)

#### Purchase Events

In [28]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    elif event["event_type"] == "purchase_gear":
        return True
    return False

In [29]:
purchase_events = raw_events.\
                  select(raw_events.value.cast('string').alias('raw'), 
                         raw_events.timestamp.cast('string')).\
                  filter(is_purchase('raw'))

extracted_purchase_events = purchase_events.\
                            rdd.map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))).\
                            toDF()
extracted_purchase_events.printSchema()
extracted_purchase_events.show(5)
# Read and show purchase events.
# extracted_purchase_events.write.mode('overwrite').parquet('/tmp/purchase/')
# purchase = spark.read.parquet('/tmp/purchase/*')
# purchase.show() 

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|

#### Preliminary Analysis

In [30]:
extracted_purchase_events.groupBy('description').count().show()
# ax = pd.DataFrame(df_num_swords).plot.bar(x='description', y='count', rot=0)

+----------------+-----+
|     description|count|
+----------------+-----+
|    buster_sword|   12|
|   diamond_sword|   12|
|          shield|   21|
|           spear|   15|
|lightsaber_sword|   14|
|          helmet|   14|
+----------------+-----+



In [31]:
extracted_purchase_events.groupBy('Host').sum("price").show()

+------------------+----------+
|              Host|sum(price)|
+------------------+----------+
|     user2.att.com|    2770.0|
|  user3.google.com|    2805.0|
| user1.comcast.com|    2165.0|
|    user4.isvc.com|    3665.0|
|user5.berkeley.com|    3625.0|
+------------------+----------+



In [32]:
extracted_purchase_events.groupBy('event_type', 'Host').count().show()

+--------------+------------------+-----+
|    event_type|              Host|count|
+--------------+------------------+-----+
| purchase_gear|  user3.google.com|   10|
| purchase_gear|    user4.isvc.com|   13|
| purchase_gear| user1.comcast.com|    5|
| purchase_gear|     user2.att.com|   12|
|purchase_sword|  user3.google.com|    7|
|purchase_sword|    user4.isvc.com|    9|
|purchase_sword| user1.comcast.com|    6|
| purchase_gear|user5.berkeley.com|   10|
|purchase_sword|     user2.att.com|    6|
|purchase_sword|user5.berkeley.com|   10|
+--------------+------------------+-----+



## PART 5:  User Interacts with Mobile App

#### 5.1 User interacts with mobile app

* mobile app makes API calls to web services
* API server handles requests:
* handles actual business requirements (e.g., process purchase)
* logs events to kafka
* spark then:
* pulls events from kafka
* filters/flattens/transforms events
* writes them to storage
* presto then queries those events 



In [33]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [34]:
raw_events = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

In [35]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [36]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [37]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [38]:
extracted_purchase_events.show(5)

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
only showing top 5 rows



In [39]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

In [40]:
purchases = spark.read.parquet('/tmp/purchases')
returns = spark.read.parquet('/tmp/return')

In [41]:
returns.show(5)

+------+-----------------+---------------+----------------+------------+------+--------------------+
|Accept|             Host|     User-Agent|     description|  event_type| price|           timestamp|
+------+-----------------+---------------+----------------+------------+------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|return_sword|-300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|return_sword|-300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|           spear| return_gear|-140.0|2021-12-13 01:00:...|
|   */*|    user2.att.com|ApacheBench/2.3|lightsaber_sword|return_sword|-150.0|2021-12-13 01:00:...|
|   */*|    user2.att.com|ApacheBench/2.3|lightsaber_sword|return_sword|-150.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+------------+------+--------------------+
only showing top 5 rows



In [42]:
purchases.show(5)

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
only showing top 5 rows



In [43]:
purchases.registerTempTable('purchases')
returns.registerTempTable('returns')

In [44]:
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

In [45]:
purchases_by_example2.show()

+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|price|           timestamp|
+------+-----------------+---------------+----------------+--------------+-----+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|lightsaber_sword|purchase_sword|150.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|    buster_sword|purchase_sword|200.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
|   */*|user1.comcast.com|ApacheBench/2.3|   diamond_sword|purchase_sword|300.0|2021-12-13 01:00:...|
+------+-----------------+---------------+----------------+--------------+-----+--

In [46]:
df = purchases_by_example2.toPandas()

In [47]:
df.head()

Unnamed: 0,Accept,Host,User-Agent,description,event_type,price,timestamp
0,*/*,user1.comcast.com,ApacheBench/2.3,lightsaber_sword,purchase_sword,150.0,2021-12-13 01:00:16.093
1,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.129
2,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.139
3,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.961
4,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.975


##### 5.1.1 A quick check how many events were connected from each Host.

In [48]:
Host_count = spark.sql("SELECT Host, COUNT(Host) as Host_count FROM purchases GROUP BY Host ORDER BY count() desc")
#att_count = spark.sql("SELECT COUNT(Host) as ATT FROM purchases WHERE Host = 'user2.att.com'")

In [49]:
Host_count.show()

+------------------+----------+
|              Host|Host_count|
+------------------+----------+
|     user2.att.com|         6|
|  user3.google.com|         7|
| user1.comcast.com|         6|
|    user4.isvc.com|         9|
|user5.berkeley.com|        10|
+------------------+----------+



##### 5.1.2 What are the start and stop time for return events ? 

In [50]:
time_stamp_table = spark.sql("SELECT MIN(timestamp), MAX(timestamp) FROM returns")

In [51]:
time_stamp_table.show()

+--------------------+--------------------+
|      min(timestamp)|      max(timestamp)|
+--------------------+--------------------+
|2021-12-13 01:00:...|2021-12-13 01:00:...|
+--------------------+--------------------+




####  5.2 Stream Apache Bench

Issue infinite loop to run Apache Bench: 
```
docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases
do docker-compose exec mids ab -n 5 -H "Host: user5.berkeley.com" http://localhost:5000/purchase_a_lightsaber_sword
```
Command to see if infinite loop wrote to HDFS  
```
docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases
```

In [52]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

In [53]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [54]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [55]:
#raw_events.show()

In [56]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [57]:
print(sword_purchases)

DataFrame[raw_event: string, timestamp: string, Accept: string, Host: string, User-Agent: string, event_type: string]


In [58]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

### 5.3 Hive  
 
###### 5.3.1 Run Hive 

Run Hive: 
```
docker-compose exec cloudera hive
```

To hold the output format, the output is pasted inside the cells below:

In [59]:
# ~/w205/project-3-cynyzhu$ docker-compose exec cloudera hive
# 2021-12-12 09:09:57,329 WARN  [main] mapreduce.TableMapReduceUtil: The hbase-prefix-tree module jar containing PrefixTreeCodec is not present.  Continuing without it.

# Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
# WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
# hive> create external table if not exists default.purchases2 (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/purchases'  tblproperties ("parquet.compress"="SNAPPY");
# OK
# Time taken: 1.134 seconds
# hive> 

###### 4.3.2  run spark job to start up pyspark

Read parquet from hdfs; register temp table; create external table purchase event; store as parquet
```
docker-compose exec spark pyspark
```

To hold the output format, the output is pasted inside the cells below:

In [60]:
# >>> df = spark.read.parquet('/tmp/purchases')
# >>> df.registerTempTable('purchases')
# >>> query = """
# ... create external table purchase_events
# ...   stored as parquet
# ...   location '/tmp/purchase_events'
# ...   as
# ...   select * from purchases
# ... """

### 5.4 Presto

We use Presto talking to the hive thrift server to get the table we added. After connected to hdfs we get the data
Querying with presto.

###### 5.4.1 start presto
```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

###### 5.4.2 Querying with presto

* Table; 
* select * from purchases limit 5;
* SELECT Host, SUM(price) total_spending FROM purchases GROUP BY HOST ORDER BY total_spending desc;

To hold the output format, the query results are pasted inside the cells below:

In [61]:
# '''
# presto:default>   Table   
#              -> -----------  
#              ->  purchases  
#              -> (1 row) 
#              ->  
#              -> Query 20180404_224746_00009_zsma3, FINISHED, 1 node 
#              -> Splits: 2 total, 1 done (50.00%) 
#              -> 0:00 [1 rows, 34B] [10 rows/s, 342B/s]
             
# '''

In [62]:
# ''''''

# presto:default> select * from purchases limit 5;
#  accept |       host        |   user-agent    |   description    |   event_type   | price |        timestamp        
# --------+-------------------+-----------------+------------------+----------------+-------+-------------------------
#  */*    | user1.comcast.com | ApacheBench/2.3 | lightsaber_sword | purchase_sword | 150.0 | 2021-12-12 07:51:04.626 
#  */*    | user1.comcast.com | ApacheBench/2.3 | buster_sword     | purchase_sword | 200.0 | 2021-12-12 07:51:05.177 
#  */*    | user1.comcast.com | ApacheBench/2.3 | buster_sword     | purchase_sword | 200.0 | 2021-12-12 07:51:05.183 
#  */*    | user1.comcast.com | ApacheBench/2.3 | diamond_sword    | purchase_sword | 300.0 | 2021-12-12 07:51:05.656 
#  */*    | user1.comcast.com | ApacheBench/2.3 | diamond_sword    | purchase_sword | 300.0 | 2021-12-12 07:51:05.662 
# (5 rows)

# Query 20211212_091838_00010_3p4py, FINISHED, 1 node
# Splits: 2 total, 0 done (0.00%)
# 0:04 [0 rows, 0B] [0 rows/s, 0B/s]

# presto:default> 

# ''''''

In [63]:
# ''''''
# presto:default> SELECT Host, SUM(price) total_spending FROM purchases GROUP BY HOST ORDER BY total_spending desc;
#         Host        | total_spending 
# --------------------+----------------
#  user4.isvc.com     |         2050.0 
#  user5.berkeley.com |         2050.0 
#  user1.comcast.com  |         1450.0 
#  user3.google.com   |         1400.0 
#  user2.att.com      |         1150.0 
# (5 rows)

# Query 20211212_092044_00011_3p4py, FINISHED, 1 node
# Splits: 3 total, 1 done (33.33%)
# 0:01 [38 rows, 2.36KB] [45 rows/s, 2.85KB/s]

# presto:default> 
    
# ''''''

In [64]:
#stops streaming
sink.stop()

## PART 6: A Shopping Cart Implementation

### 6.1 Method 1

The Shopping Cart uses as a source the event data from Kafka/Spark, specifically the product, price, user, price_tot columns are used to create an in-memory SQLLite table that serves as the table that represents the data in the user's current shopping cart.

###### 6.1.1.Stage 1, preprocessing

In this stage we extract the data from the event in Spark to an Array, then transform the extracted data to the proper format for insertion into SQLLite.

In [65]:
# Convert the first row of the Spark dataframe event to an Array by collecting from the RDD.
cart_data = extracted_purchase_events.select("description", 'Host', 'event_type', 'price', 'timestamp').limit(1).rdd.flatMap(lambda x: x).collect()

# Convert each element in the given Array to a string.
cd = []
for i in cart_data:
    cd.append(str(i))
    
str1 = ""
for e in cd:
   str1 += "'"+e+"', "

final_insert = str1[:-2] 
# sanity check the query string.
# print(final_insert)

##### 6.1.2 Stage 2, building the cart
In this stage we perform DDL/DML operations to build and modify the contents of the shopping cart, where a user may add or remove items from their shopping cart.

1. Create the CART table which supports the adding products to shopping cart.
2. Using the preprocessed event data, insert the event data from Spark into the SQLLite CART table.
3. We then select the event data from the SQLLite CART table.
4. And lastly, we delete the inserted event from the CART table.

In [66]:
import sqlite3

# Create the CART table.
conn = sqlite3.connect('shopping_db.db')
print ("Opened database successfully")
conn.execute('''DROP TABLE IF EXISTS CART''')
conn.execute('''CREATE TABLE CART
       (ID TEXT     NOT NULL,
       product           TEXT    NOT NULL,
       price            TEXT     NOT NULL,
       user        CHAR(50),
       price_tot         REAL);''')

print ("Table created successfully")
conn.commit()
conn.close()


# Insert the event into the CART table.
conn = sqlite3.connect('shopping_db.db')
print ("Opened database successfully")
conn.execute(f"INSERT INTO CART (ID, product, price, user, price_tot) VALUES ( {final_insert} )")
print("Total number of rows Inserted :" + str(conn.total_changes))
conn.commit()
conn.close()


# Select the inserted event from the CART table.
conn = sqlite3.connect('shopping_db.db')
print ("Opened database successfully")
cursor = conn.execute("SELECT ID, product, user, price_tot from CART")
for row in cursor:
   print ("ID = ", row[0])
   print ("product = ", row[1])
   print ("user = ", row[2])
   print ("price_tot = ", row[3])
print("Selected rows successfully.")
conn.commit()
conn.close()


# Delete the event from the CART table.
conn = sqlite3.connect('shopping_db.db')
print ("Opened database successfully")
conn.execute("DELETE from CART where ID IS NOT NULL")
print("Total number of rows Deleted from shopping cart:" + str(conn.total_changes))
conn.commit()
conn.close()


Opened database successfully


<sqlite3.Cursor at 0x7f248c583ab0>

<sqlite3.Cursor at 0x7f248c583c00>

Table created successfully
Opened database successfully


<sqlite3.Cursor at 0x7f248c583b90>

Total number of rows Inserted :1
Opened database successfully
ID =  lightsaber_sword
product =  user1.comcast.com
user =  150.0
price_tot =  2021-12-13 01:00:16.093
Selected rows successfully.
Opened database successfully


<sqlite3.Cursor at 0x7f248c583ce0>

Total number of rows Deleted from shopping cart:1


### 6.2 Method 2:  

In this stage we use Sqlite3 to perform DDL/DML operations to build and modify the contents of the shopping cart, where a user may add or remove items from their shopping cart.

1. Create the CART table which supports the adding products to shopping cart.
2. Using the preprocessed event data, insert the event data from Spark into the SQLLite CART table.
3. We then select the event data from the SQLLite CART table.
4. And lastly, we delete the inserted event from the CART table.

In [67]:
import sqlite3
# Create your connection.
cnx = sqlite3.connect(':memory:')

### 6.3 Shopping CART Operations: 

##### 6.3.1 Create Salite3 based CART

In [68]:
CART_purchase = extracted_purchase_events.toPandas()
CART_return = extracted_return_events.toPandas()

In [69]:
CART_purchase.to_sql(name='CART', con=cnx)

In [70]:
p2 = pd.read_sql('select * from CART', cnx)

In [71]:
pd.read_sql('select * from CART limit(5)', cnx)

Unnamed: 0,index,Accept,Host,User-Agent,description,event_type,price,timestamp
0,0,*/*,user1.comcast.com,ApacheBench/2.3,lightsaber_sword,purchase_sword,150.0,2021-12-13 01:00:16.093
1,1,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.129
2,2,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.139
3,3,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.961
4,4,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.975


In [72]:
pd.read_sql('select sum(price) from CART', cnx)

Unnamed: 0,sum(price)
0,8100.0


##### 6.3.2 Update purchase events in the CART

1. add purchase events
2. calcuilate total income 

In [73]:
CART_purchase.to_sql('CART', cnx, if_exists='append', index=False)

In [74]:
pd.read_sql('select * from CART limit(5) ', cnx)

Unnamed: 0,index,Accept,Host,User-Agent,description,event_type,price,timestamp
0,0,*/*,user1.comcast.com,ApacheBench/2.3,lightsaber_sword,purchase_sword,150.0,2021-12-13 01:00:16.093
1,1,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.129
2,2,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.139
3,3,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.961
4,4,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.975


In [75]:
pd.read_sql('select sum(price) from CART', cnx)

Unnamed: 0,sum(price)
0,16200.0


##### 6.3.3 Update return events in CART
 
1. add resturn events
2. calcuilate total income 

In [76]:
CART_return.to_sql('CART', cnx, if_exists='append', index=False)

In [77]:
pd.read_sql('select * from CART limit(5)', cnx)

Unnamed: 0,index,Accept,Host,User-Agent,description,event_type,price,timestamp
0,0,*/*,user1.comcast.com,ApacheBench/2.3,lightsaber_sword,purchase_sword,150.0,2021-12-13 01:00:16.093
1,1,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.129
2,2,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.139
3,3,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.961
4,4,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.975


In [78]:
pd.read_sql('select sum(price) from CART', cnx)

Unnamed: 0,sum(price)
0,11240.0


##### 6.3.4 Edit records and events in CART
 
1. Edit records and events in CART
2. calcuilate total income 

In [79]:
cnx.execute("DELETE from CART where Host IS 'user3.google.com'")

<sqlite3.Cursor at 0x7f248c583b90>

In [80]:
pd.read_sql('select sum(price) from CART', cnx)

Unnamed: 0,sum(price)
0,9120.0


##### 6.3.5 Statistice business transactions and profits 

1. Edit records and events in CART
2. calcuilate total income 

In [81]:
pd.read_sql('select * from CART limit(5)', cnx)

Unnamed: 0,index,Accept,Host,User-Agent,description,event_type,price,timestamp
0,0,*/*,user1.comcast.com,ApacheBench/2.3,lightsaber_sword,purchase_sword,150.0,2021-12-13 01:00:16.093
1,1,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.129
2,2,*/*,user1.comcast.com,ApacheBench/2.3,buster_sword,purchase_sword,200.0,2021-12-13 01:00:17.139
3,3,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.961
4,4,*/*,user1.comcast.com,ApacheBench/2.3,diamond_sword,purchase_sword,300.0,2021-12-13 01:00:17.975


In [82]:
pd.read_sql('select sum(price) from CART', cnx)

Unnamed: 0,sum(price)
0,9120.0


##### 6.3.6 Which user spent most Monday to purchase ?  


In [83]:
pd.read_sql( 'SELECT Host, SUM(price) total_spending FROM CART GROUP BY HOST ORDER BY total_spending desc', cnx)           

Unnamed: 0,Host,total_spending
0,user4.isvc.com,3225.0
1,user1.comcast.com,2160.0
2,user5.berkeley.com,2095.0
3,user2.att.com,1640.0


##### 6.3.7 How many transactions for each of the users are there in  this game ?  

In [84]:
pd.read_sql( 'SELECT Host, count() transactions FROM CART GROUP BY HOST ORDER BY count() desc', cnx) 

Unnamed: 0,Host,transactions
0,user5.berkeley.com,30
1,user4.isvc.com,22
2,user2.att.com,16
3,user1.comcast.com,15


##### 6.3.8 How many each typr of events are there in  this game ?  

In [85]:
pd.read_sql( 'SELECT event_type, count() event_count FROM CART GROUP BY event_type ORDER BY count() desc', cnx) 

Unnamed: 0,event_type,event_count
0,purchase_sword,62
1,return_sword,12
2,return_gear,9


##### 6.3.9 What is the total amount of refound for the returned products?  

In [86]:
pd.read_sql( 'SELECT sum(price) FROM CART where event_type like "return%"  ', cnx) 

Unnamed: 0,sum(price)
0,-4280.0
