<a href="https://colab.research.google.com/github/DimaFrank/Association_Rule_Learning/blob/logs/User_ct_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [85]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [218]:
import pyspark as spark
from google.colab import drive

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import array, col, concat_ws, udf, array_remove, size, window, to_timestamp, date_format, concat, lit

In [None]:
drive.mount('/content/drive')
!wget --quiet --show-progress https://drive.google.com/drive/u/0/folders/1zKTXVP8ncsRRXgtFxA6MvL-lWirAoc6a

Mounted at /content/drive


### Get Data:

In [219]:
spark = SparkSession.builder.master("local[*]").appName("user_ct_test").getOrCreate()
sc = spark.sparkContext

In [103]:
!ls /content/drive/MyDrive/ColabDatasets/

Italy_Earthquakes		 Retail_Store_Sales_Transactions.zip
ItalyEarthquakes.zip		 rts22_test.csv
Retail_Store_Sales_Transactions  user-ct-test-collection-01.txt.gz


In [104]:
FILE_PATH = "/content/drive/MyDrive/ColabDatasets/user-ct-test-collection-01.txt.gz"

In [105]:
data = spark.sparkContext.textFile(FILE_PATH)
type(data)

pyspark.rdd.RDD

In [106]:
print("initial partition count:"+str(data.getNumPartitions()))

initial partition count:1


In [107]:
data.take(5)

['AnonID\tQuery\tQueryTime\tItemRank\tClickURL',
 '142\trentdirect.com\t2006-03-01 07:17:12\t\t',
 '142\twww.prescriptionfortime.com\t2006-03-12 12:31:06\t\t',
 '142\tstaple.com\t2006-03-17 21:19:29\t\t',
 '142\tstaple.com\t2006-03-17 21:19:45\t\t']

In [108]:
data.count()

3558412

In [109]:
header = data.first()
data = data.filter(lambda row: row != header) # filter out the header

In [None]:
data.take(5)

['142\trentdirect.com\t2006-03-01 07:17:12\t\t',
 '142\twww.prescriptionfortime.com\t2006-03-12 12:31:06\t\t',
 '142\tstaple.com\t2006-03-17 21:19:29\t\t',
 '142\tstaple.com\t2006-03-17 21:19:45\t\t',
 '142\twww.newyorklawyersite.com\t2006-03-18 08:02:58\t\t']

In [110]:
header.split('\t')

['AnonID', 'Query', 'QueryTime', 'ItemRank', 'ClickURL']

### Generic RDD parser implementation

In [131]:
from ast import Raise
import os.path
from os import path

class GenericParser:
  # dataset: <str> Full path to the dataset in Drive.

  global col_list, header

  def __init__(self, dataset, dlm='\t'):
    if isinstance(dataset, str):
       if path.exists(dataset)==True:
         self.dataset=dataset
         self.dlm=dlm
       else:
         raise Exception("The specified path does not exist!")       
    else: 
      raise Exception("<str> variable should be provided as dataset argument!")

  def GetColNames(self):
     rdd_temp = spark.sparkContext.textFile(self.dataset)
     header=rdd_temp.first()
     col_list=header.split(self.dlm)
     col_list=[el.upper() for el in col_list]
     return col_list

  # cols: <list>/<str> Column name/s to select from the data.
  def GetData(self, cols):
     rdd = spark.sparkContext.textFile(self.dataset) 
     if isinstance(cols,list)==True:          
        col_list=self.GetColNames()
        rdd = rdd.filter(lambda row: row != header)
        for i in range(len(col_list)):
            if col_list[i] not in [el.upper() for el in cols]:
                break
            else:
                indexes=[]
                for i in range(len(cols)):
                    element = col_list.index(cols[i].upper())
                    indexes.append(element)

                return rdd.map(lambda r: r.split(self.dlm)).map(lambda r: [r[i] for i in indexes])

     elif isinstance(cols,str):
         col_list=self.GetColNames()
         index = col_list.index(cols.upper())       
         return rdd.map(lambda r: r.split(self.dlm)).map(lambda r: r[index]) 


### Extracting and cleaning user sessions

In [112]:
RDD_Column = GenericParser(FILE_PATH).GetColNames()
RDD_Column

['ANONID', 'QUERY', 'QUERYTIME', 'ITEMRANK', 'CLICKURL']

In [130]:
df = GenericParser(FILE_PATH).GetData(['ANONID', 'QUERY', 'QUERYTIME', 'ITEMRANK', 'CLICKURL']).toDF(RDD_Column)
type(df)

pyspark.sql.dataframe.DataFrame

In [116]:
df.show(15)

+------+--------------------+-------------------+--------+--------------------+
|ANONID|               QUERY|          QUERYTIME|ITEMRANK|            CLICKURL|
+------+--------------------+-------------------+--------+--------------------+
|   142|      rentdirect.com|2006-03-01 07:17:12|        |                    |
|   142|www.prescriptionf...|2006-03-12 12:31:06|        |                    |
|   142|          staple.com|2006-03-17 21:19:29|        |                    |
|   142|          staple.com|2006-03-17 21:19:45|        |                    |
|   142|www.newyorklawyer...|2006-03-18 08:02:58|        |                    |
|   142|www.newyorklawyer...|2006-03-18 08:03:09|        |                    |
|   142|     westchester.gov|2006-03-20 03:55:57|       1|http://www.westch...|
|   142|       space.comhttp|2006-03-24 20:51:24|        |                    |
|   142|                dfdf|2006-03-24 22:23:07|        |                    |
|   142|                dfdf|2006-03-24 

In [117]:
df_clean = df.filter("CLICKURL != ''")
df_clean.show(10)

+------+--------------------+-------------------+--------+--------------------+
|ANONID|               QUERY|          QUERYTIME|ITEMRANK|            CLICKURL|
+------+--------------------+-------------------+--------+--------------------+
|   142|     westchester.gov|2006-03-20 03:55:57|       1|http://www.westch...|
|   142|        207 ad2d 530|2006-04-08 01:31:14|       1|http://www.courts...|
|   142|            vera.org|2006-04-08 08:38:42|       1| http://www.vera.org|
|   217|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|
|   217|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|
|   217|      ameriprise.com|2006-03-01 14:06:23|       1|http://www.amerip...|
|   217|          mizuno.com|2006-03-07 22:41:17|       1|http://www.mizuno...|
|   217|asiansexygoddess.com|2006-03-16 14:31:36|       1|http://www.asians...|
|   217|bestasiancompany.com|2006-03-20 15:15:43|       1|http://www.bestas...|
|   217|             lottery|2006-03-27 

In [128]:
df_clean = df_clean.withColumn("_QUERY_TIME_",to_timestamp("QUERYTIME"))
df_clean.show(10)

+------+--------------------+-------------------+--------+--------------------+-------------------+
|ANONID|               QUERY|          QUERYTIME|ITEMRANK|            CLICKURL|       _QUERY_TIME_|
+------+--------------------+-------------------+--------+--------------------+-------------------+
|   142|     westchester.gov|2006-03-20 03:55:57|       1|http://www.westch...|2006-03-20 03:55:57|
|   142|        207 ad2d 530|2006-04-08 01:31:14|       1|http://www.courts...|2006-04-08 01:31:14|
|   142|            vera.org|2006-04-08 08:38:42|       1| http://www.vera.org|2006-04-08 08:38:42|
|   217|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|2006-03-01 11:58:51|
|   217|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|2006-03-01 11:58:51|
|   217|      ameriprise.com|2006-03-01 14:06:23|       1|http://www.amerip...|2006-03-01 14:06:23|
|   217|          mizuno.com|2006-03-07 22:41:17|       1|http://www.mizuno...|2006-03-07 22:41:17|


In [129]:
df_clean.printSchema()

root
 |-- ANONID: string (nullable = true)
 |-- QUERY: string (nullable = true)
 |-- QUERYTIME: string (nullable = true)
 |-- ITEMRANK: string (nullable = true)
 |-- CLICKURL: string (nullable = true)
 |-- _QUERY_TIME_: timestamp (nullable = true)



In [118]:
df_clean.count()

1890568

In [119]:
df_grouped = df_clean.groupBy("ANONID", window("QUERYTIME", "10 minutes")).count()
df_grouped.show(5)

+------+--------------------+-----+
|ANONID|              window|count|
+------+--------------------+-----+
|  2722|{2006-03-23 11:50...|    1|
|  2722|{2006-04-06 07:10...|    1|
|  2722|{2006-05-14 22:50...|    2|
|  2722|{2006-05-25 00:10...|    1|
|  4781|{2006-03-10 12:00...|    1|
+------+--------------------+-----+
only showing top 5 rows



In [120]:
df_grouped.printSchema()

root
 |-- ANONID: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [122]:
df_grouped.count()

976288

In [123]:
df_clean.printSchema()

root
 |-- ANONID: string (nullable = true)
 |-- QUERY: string (nullable = true)
 |-- QUERYTIME: string (nullable = true)
 |-- ITEMRANK: string (nullable = true)
 |-- CLICKURL: string (nullable = true)



In [132]:
df_clean.createOrReplaceTempView("df_clean")
df_grouped.createOrReplaceTempView("df_grouped")

In [135]:
spark.sql('select * from df_clean').show(10)
                                      

+------+--------------------+-------------------+--------+--------------------+-------------------+
|ANONID|               QUERY|          QUERYTIME|ITEMRANK|            CLICKURL|       _QUERY_TIME_|
+------+--------------------+-------------------+--------+--------------------+-------------------+
|   142|     westchester.gov|2006-03-20 03:55:57|       1|http://www.westch...|2006-03-20 03:55:57|
|   142|        207 ad2d 530|2006-04-08 01:31:14|       1|http://www.courts...|2006-04-08 01:31:14|
|   142|            vera.org|2006-04-08 08:38:42|       1| http://www.vera.org|2006-04-08 08:38:42|
|   217|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|2006-03-01 11:58:51|
|   217|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|2006-03-01 11:58:51|
|   217|      ameriprise.com|2006-03-01 14:06:23|       1|http://www.amerip...|2006-03-01 14:06:23|
|   217|          mizuno.com|2006-03-07 22:41:17|       1|http://www.mizuno...|2006-03-07 22:41:17|


In [136]:
spark.sql('select * from df_grouped').show(10)

+------+--------------------+-----+
|ANONID|              window|count|
+------+--------------------+-----+
|  2722|{2006-03-23 11:50...|    1|
|  2722|{2006-04-06 07:10...|    1|
|  2722|{2006-05-14 22:50...|    2|
|  2722|{2006-05-25 00:10...|    1|
|  4781|{2006-03-10 12:00...|    1|
|  4781|{2006-04-15 09:50...|    2|
|  4781|{2006-05-09 16:30...|    3|
|  6497|{2006-04-06 01:50...|    1|
|  7005|{2006-04-19 11:10...|    1|
|  8400|{2006-03-11 14:40...|    2|
+------+--------------------+-----+
only showing top 10 rows



In [163]:
df_joined = df_clean.join(df_grouped,((df_clean.ANONID == df_grouped.ANONID) & (df_clean._QUERY_TIME_.between(df_grouped.window.start,df_grouped.window.end))),'left_outer').drop(df_grouped.ANONID)
df_joined.show(10)

+--------------------+-------------------+--------+--------------------+-------------------+------+--------------------+-----+
|               QUERY|          QUERYTIME|ITEMRANK|            CLICKURL|       _QUERY_TIME_|ANONID|              window|count|
+--------------------+-------------------+--------+--------------------+-------------------+------+--------------------+-----+
|     westchester.gov|2006-03-20 03:55:57|       1|http://www.westch...|2006-03-20 03:55:57|   142|{2006-03-20 03:50...|    1|
|        207 ad2d 530|2006-04-08 01:31:14|       1|http://www.courts...|2006-04-08 01:31:14|   142|{2006-04-08 01:30...|    1|
|            vera.org|2006-04-08 08:38:42|       1| http://www.vera.org|2006-04-08 08:38:42|   142|{2006-04-08 08:30...|    1|
|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|2006-03-01 11:58:51|   217|{2006-03-01 11:50...|    2|
|             lottery|2006-03-01 11:58:51|       1|http://www.calott...|2006-03-01 11:58:51|   217|{2006-03-01 

In [164]:
df_joined.count()

1891306

In [173]:
min_support = 20
df_joined.createOrReplaceTempView('df_joined')
query1 = spark.sql('''
                    SELECT ANONID, window, count(*) as cnt
                    FROM df_joined 
                    GROUP BY ANONID, window
                    HAVING cnt > {min_supp}
                    ORDER BY cnt DESC
                   '''.format(min_supp=min_support))
query1.show(15)

+--------+--------------------+---+
|  ANONID|              window|cnt|
+--------+--------------------+---+
| 3010310|{2006-03-28 18:40...|141|
|15164617|{2006-04-27 11:50...| 88|
|  607863|{2006-04-18 08:10...| 84|
| 3362728|{2006-03-23 18:00...| 56|
|  963132|{2006-05-21 12:50...| 56|
| 3241737|{2006-05-06 20:10...| 51|
| 8695017|{2006-04-02 19:50...| 51|
|12025009|{2006-03-20 20:20...| 49|
| 2023782|{2006-03-17 17:30...| 45|
| 3241737|{2006-04-26 18:30...| 43|
| 3639132|{2006-03-04 18:10...| 42|
| 5808557|{2006-03-28 17:10...| 41|
| 5808557|{2006-05-03 17:10...| 40|
| 3241737|{2006-04-27 20:40...| 39|
| 3241737|{2006-04-25 21:20...| 37|
+--------+--------------------+---+
only showing top 15 rows



In [174]:
query1.count()

176

In [192]:
W = df_joined.select("ANONID", df_joined.window.start.cast("string").alias("start"),
                               df_joined.window.end.cast("string").alias("end"),
                               "CLICKURL" )

W = W.select("ANONID",concat(W.start, lit("  "), W.end).alias("Interval"),"CLICKURL")
W.show(truncate=False)

+------+----------------------------------------+--------------------------------+
|ANONID|Interval                                |CLICKURL                        |
+------+----------------------------------------+--------------------------------+
|1268  |2006-03-01 17:30:00  2006-03-01 17:40:00|http://www.blanketsnmore.com    |
|1268  |2006-03-21 17:50:00  2006-03-21 18:00:00|http://www.osteen-schatzberg.com|
|1268  |2006-03-21 17:50:00  2006-03-21 18:00:00|http://www.osteen-schatzberg.com|
|1268  |2006-05-11 02:10:00  2006-05-11 02:20:00|http://www.niddk.nih.gov        |
|1326  |2006-03-21 11:50:00  2006-03-21 12:00:00|http://www.wonderlandtheatre.com|
|1326  |2006-03-29 17:10:00  2006-03-29 17:20:00|http://www.everyboat.com        |
|1326  |2006-04-01 17:50:00  2006-04-01 18:00:00|http://www.imdb.com             |
|142   |2006-03-20 03:50:00  2006-03-20 04:00:00|http://www.westchestergov.com   |
|142   |2006-04-08 01:30:00  2006-04-08 01:40:00|http://www.courts.state.ny.us   |
|142

In [214]:
W.printSchema()

root
 |-- ANONID: string (nullable = true)
 |-- Interval: string (nullable = true)
 |-- CLICKURL: string (nullable = true)



In [234]:
W.createOrReplaceTempView('W')
query = '''
        WITH SOURCE as (
                        SELECT d.*, REPLACE({ITEMS}, ' ', '_') as ITEMS                                                  
                        FROM {data} as d
                        ),

            TOTAL as ( select count(distinct ITEMS) from SOURCE ),          

            SUPPORT_ITEM_SET1 (
                  SELECT *, CNT/ALL_CNT as SUPPORT 
                  FROM(
                      SELECT ITEMS, count(*) as CNT, (select * from TOTAL) as ALL_CNT 
                      FROM SOURCE
                      GROUP BY ITEMS
                      ) as tab1
                  WHERE (CNT/ALL_CNT) > {min_support}
                  ORDER BY SUPPORT DESC
                              )


            select * from SUPPORT_ITEM_SET1
            '''.format(data='W', ITEMS="CLICKURL", min_support=0.01)

      #       SELECT *
      #       FROM SOURCE
      #       WHERE ITEMS in (SELECT ITEMS FROM SUPPORT_ITEM_SET1)

      #  '''.format(data='W', ITEMS="CLICKURL", min_support=0.01)


spark.sql(query).show(10)

+--------------------+-----+-------+--------------------+
|               ITEMS|  CNT|ALL_CNT|             SUPPORT|
+--------------------+-----+-------+--------------------+
|http://www.google...|35991| 377870|    0.09524704263371|
|http://www.myspac...|17220| 377870| 0.04557122820017466|
|http://www.yahoo.com|15212| 377870|0.040257231322941754|
|http://en.wikiped...|12112| 377870| 0.03205335168179533|
|http://www.amazon...|10746| 377870|0.028438351814115966|
| http://www.imdb.com| 9720| 377870|0.025723132294175246|
|http://www.mapque...| 9478| 377870|0.025082700399608332|
| http://www.ebay.com| 6956| 377870|0.018408447349617593|
|http://mail.yahoo...| 6197| 377870|0.016399820043930452|
|http://www.bankof...| 4725| 377870|0.012504300420779633|
+--------------------+-----+-------+--------------------+
only showing top 10 rows



In [159]:
# data_final = df_joined.groupBy(['ANONID','window']).pivot('CLICKURL').count().fillna(value=0)
# data_final.show(10)