## Big Data - Ex2 

Kfir Avlas 
060519071

## Association rule learning

Association rule learning is a rule-based machine learning method for discovering interesting relations between variables in large databases.
It is intended to identify strong rules discovered in databases using some measures of interestingness.


My algorithm:

Step1: - Read the data file
       - Clean and arrange the data

Step2: - Arrange the data as key-value pairs with key = user id, value = query
       - Remove duplicates (the same query issued by the same user)

       [(user_id_1, query_1), (user_id_1, query_2), ..., (user_id_n, query_m)]

Step3: - Save the total number of transactions (distinct users)

Step4: - Invert the data so that the query becomes the key; this lets us count how many users searched for each query,
         which is needed for calculating the support value
       - Arrange it as key-value pairs with key = query, value = user id

       [(query_1, user_id_1), (query_2, user_id_1), ..., (query_m, user_id_n)]

Step5: - Save the item (query) counts in a hash map, to be used later for calculating support and confidence

Step6: - Set the minimum support parameter and calculate the minimum query count
       - Remove rare queries according to the minimum support value

Step7: - Reduce the data to key-value pairs with key = user id, value = list of the user's queries

       [(user_id_1, [query_1, query_2, ...]), (user_id_2, [query_3, query_4, ...]), ...]

Step8: - Map each user's query list to all of its possible query pairs
       - Use flatMap in order to return more than one value for each data line
       - For each pair, save the number of users for whom it occurs

       [((query_1, query_2), count_1), ((query_1, query_3), count_2), ...]

Step9: - For a given confidence range, filter the query pairs in order to discover interesting relations

Step10: - Save the results in a file
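
The steps above can be sketched on a tiny in-memory data set before moving to Spark. This is only an illustration in plain Python: the rows, the integer `min_count = 2` and the `>=` comparison are assumptions made for the toy example and are not the values used in the notebook.

```python
from collections import Counter
from itertools import combinations

# Hypothetical toy log of (user_id, query) rows, including one duplicate row.
rows = [
    (1, "waether"), (1, "weather"), (1, "weather"),
    (2, "waether"), (2, "weather"),
    (3, "weather"), (3, "maps"),
]

# Step 2: drop duplicate (user, query) rows.
trx_item = set(rows)

# Step 3: number of transactions = number of distinct users.
total_trx = len({user for user, _ in trx_item})

# Steps 4-5: count how many users searched for each query.
item_counts = Counter(query for _, query in trx_item)

# Step 6: keep only queries searched by at least min_count users (toy threshold).
min_count = 2
frequent = {(u, q) for u, q in trx_item if item_counts[q] >= min_count}

# Step 7: build one list of queries per user (sorted for a stable order).
baskets = {}
for user, query in sorted(frequent):
    baskets.setdefault(user, []).append(query)

# Step 8: count for how many users each query pair occurs.
pair_counts = Counter(
    pair for queries in baskets.values() for pair in combinations(queries, 2)
)

# Step 9: confidence(q1 => q2) = count(q1 and q2) / count(q1).
rules = {
    (q1, q2): n / item_counts[q1] for (q1, q2), n in pair_counts.items()
}
print(rules)
```

In this toy log both users who typed the misspelled query also typed the correct one, so the single surviving rule has confidence 1.0.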
        

In [1]:
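# Read a tab-separated data file into a Spark DataFrame
# (assumes a SparkSession is already available as `spark`, as in a PySpark notebook kernel)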

def read_file(p_path: str):
    return spark.read.csv(p_path, sep = "\t", inferSchema=True, header=True)
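
To make the expected file layout concrete, here is a small sketch that parses a tab-separated sample with Python's standard `csv` module instead of Spark. The sample text is hypothetical: the column names and the two queries come from the schema and rows shown in this notebook, but the textual timestamp format is an assumption.

```python
import csv
import io

# Hypothetical sample in the layout suggested by the DataFrame schema:
# a header line, tab-separated fields, empty ItemRank and ClickURL.
sample = (
    "AnonID\tQuery\tQueryTime\tItemRank\tClickURL\n"
    "142\trentdirect.com\t2006-03-01 07:17:12\t\t\n"
    "142\tstaple.com\t2006-03-17 21:19:29\t\t\n"
)

# delimiter="\t" plays the role of sep="\t"; DictReader uses the first
# line as the header, like header=True.
reader = csv.DictReader(io.StringIO(sample), delimiter="\t")
records = list(reader)
first = records[0]
print(first["AnonID"], first["Query"])
```

Unlike `inferSchema=True`, the `csv` module keeps every field as a string and returns an empty string for a missing value.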

In [2]:
# Read data

path = "/home/spark-vm/PycharmProjects/BigDataCource/user-ct-test-collection-01.txt"
main_data = read_file(path)

In [3]:
main_data

DataFrame[AnonID: int, Query: string, QueryTime: timestamp, ItemRank: int, ClickURL: string]

In [4]:
main_data.take(5)

[Row(AnonID=142, Query='rentdirect.com', QueryTime=datetime.datetime(2006, 3, 1, 7, 17, 12), ItemRank=None, ClickURL=None),
 Row(AnonID=142, Query='www.prescriptionfortime.com', QueryTime=datetime.datetime(2006, 3, 12, 12, 31, 6), ItemRank=None, ClickURL=None),
 Row(AnonID=142, Query='staple.com', QueryTime=datetime.datetime(2006, 3, 17, 21, 19, 29), ItemRank=None, ClickURL=None),
 Row(AnonID=142, Query='staple.com', QueryTime=datetime.datetime(2006, 3, 17, 21, 19, 45), ItemRank=None, ClickURL=None),
 Row(AnonID=142, Query='www.newyorklawyersite.com', QueryTime=datetime.datetime(2006, 3, 18, 8, 2, 58), ItemRank=None, ClickURL=None)]

In [5]:
# Check if query is meaningful

def is_valid_query(t):
    query = t["Query"]

    # A missing query arrives as None; str(None) would be the non-empty string "None"
    if query is None:
        return False

    return str(query) not in ("-", "")

In [6]:
# Clean data

# We apply the filter transformation before map in order to improve efficiency:
# map passes over the data line by line, so removing invalid rows first leaves fewer rows to map

# Filter empty queries

main_data = main_data.rdd.filter(is_valid_query)

In [7]:
# Use userId and query columns, remove duplicate queries for the same userId

trx_item = main_data.map(lambda x: (int(x[0]), str(x[1]))).distinct()

In [8]:
trx_item

PythonRDD[23] at RDD at PythonRDD.scala:53

In [9]:
trx_item.take(20)

[(142, 'rentdirect.com'),
 (142, 'staple.com'),
 (142, 'www.newyorklawyersite.com'),
 (142, '207 ad2d 530'),
 (142, 'frankmellace.com'),
 (142, 'ucs.ljx.com'),
 (142, 'attornyleslie.com'),
 (142, 'merit release appearance'),
 (142, 'www.bonsai.wbff.org'),
 (142, 'loislaw.com'),
 (217, 'lottery'),
 (217, 'susheme'),
 (217, 'united.com'),
 (217, 'mizuno.com'),
 (217, 'buddylis'),
 (217, 'bestasiancompany.com'),
 (217, 'weather.com'),
 (217, 'vietnam'),
 (993, 'googl'),
 (1268, 'sstack.com')]

In [10]:
# Save the total number of transactions (distinct users)

trx_counts = trx_item.countByKey()

In [11]:
total_trx = len(trx_counts)

In [12]:
total_trx

64942
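
The reason `len(trx_counts)` gives the number of transactions can be seen in a plain-Python sketch: counting by key yields one entry per user, so the number of keys is the number of distinct users. The three pairs below are taken from the sample output above; `Counter` stands in for the dictionary that `countByKey()` returns.

```python
from collections import Counter

# A few de-duplicated (user_id, query) pairs from the sample output.
trx_item = [(142, "rentdirect.com"), (142, "staple.com"), (217, "lottery")]

# Same shape as the result of countByKey(): user_id -> number of pairs.
trx_counts = Counter(user for user, _ in trx_item)

# One key per user, so the number of keys is the number of transactions.
total_trx = len(trx_counts)
print(total_trx)
```

In Spark, `trx_item.keys().distinct().count()` should give the same number without bringing the per-user counts to the driver; I have not timed the two options on this data set.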

In [13]:
# Invert the key-value

# Save queries (items) as a key in order to check how many users (transactions) search for it

items = trx_item.map(lambda x: (x[1],x[0]))

In [14]:
items

PythonRDD[26] at RDD at PythonRDD.scala:53

In [15]:
items.take(20)

[('rentdirect.com', 142),
 ('staple.com', 142),
 ('www.newyorklawyersite.com', 142),
 ('207 ad2d 530', 142),
 ('frankmellace.com', 142),
 ('ucs.ljx.com', 142),
 ('attornyleslie.com', 142),
 ('merit release appearance', 142),
 ('www.bonsai.wbff.org', 142),
 ('loislaw.com', 142),
 ('lottery', 217),
 ('susheme', 217),
 ('united.com', 217),
 ('mizuno.com', 217),
 ('buddylis', 217),
 ('bestasiancompany.com', 217),
 ('weather.com', 217),
 ('vietnam', 217),
 ('googl', 993),
 ('sstack.com', 1268)]

In [16]:
# Save item counts as a Hashmap using countByKey action

item_counts = items.countByKey()

In [17]:
item_counts

defaultdict(int,
            {'rentdirect.com': 3,
             'staple.com': 6,
             'www.newyorklawyersite.com': 1,
             '207 ad2d 530': 1,
             'frankmellace.com': 1,
             'ucs.ljx.com': 1,
             'attornyleslie.com': 1,
             'merit release appearance': 1,
             'www.bonsai.wbff.org': 1,
             'loislaw.com': 2,
             'lottery': 127,
             'susheme': 1,
             'united.com': 44,
             'mizuno.com': 6,
             'buddylis': 1,
             'bestasiancompany.com': 2,
             'weather.com': 318,
             'vietnam': 14,
             'googl': 95,
             'sstack.com': 2,
             'www.victoriacostumiere.com': 1,
             'osteen-schatzberg.com': 1,
             'www.buckmountianestates.com': 1,
             'idx.techsolsc.com': 1,
             'www.bridleandbit.com': 1,
             'http www.flickr.com photos 88145967 n00 24368586 in pool-32148876 n00': 1,
             'href a h

In [18]:
item_counts["yahoo.com"]

2726

In [19]:
# Let's filter out rare items using a minimum support parameter

# With a minimum support of 0.0005 or 0.0001 I didn't get any rule with confidence greater than 0.4

# I tried several values and set the minimum support value accordingly

# I set the support value so that only queries searched by at least 3 different users are taken into consideration

# This prevents results of 100% confidence that rest on a single occurrence of a query pair, which actually say nothing

In [20]:
min_support = 0.000045

min_count = min_support * total_trx

In [21]:
min_count

2.92239

In [22]:
#  => In order to reach a minimum support of 0.000045, a query should be searched by at least 3 different users
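
The arithmetic behind this threshold can be checked in a few lines. The two input values are the ones used in this notebook; `smallest_passing_count` and `effective_support` are names introduced here for illustration, and the sketch relies on the frequency filter in this notebook using a strict `>` comparison.

```python
import math

total_trx = 64942        # value printed by the notebook
min_support = 0.000045   # value chosen in the notebook

min_count = min_support * total_trx

# A query is kept only if its user count is strictly greater than min_count,
# so the smallest integer count that passes is floor(min_count) + 1.
smallest_passing_count = math.floor(min_count) + 1

# Support actually reached by a query searched by that many users.
effective_support = smallest_passing_count / total_trx

print(min_count, smallest_passing_count, effective_support)
```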

In [23]:
# Check query frequency in the data set

def is_frequent_item(t):
    # t is a (user_id, query) pair; only the query is needed here
    item = t[1]

    # Keep the query only if more than min_count users searched for it
    return item_counts[item] > min_count

In [24]:
trx_freq_item = trx_item.filter(is_frequent_item)

In [25]:
trx_freq_item

PythonRDD[29] at RDD at PythonRDD.scala:53

In [26]:
trx_freq_item.take(20)

[(142, 'rentdirect.com'),
 (142, 'staple.com'),
 (217, 'lottery'),
 (217, 'united.com'),
 (217, 'mizuno.com'),
 (217, 'weather.com'),
 (217, 'vietnam'),
 (993, 'googl'),
 (1268, 'adbuyer3.lycos.com'),
 (1326, 'files'),
 (1337, 'auto locator'),
 (1337, 'ford'),
 (1410, 'google'),
 (2005, 'myspace.ocm'),
 (2005, 'glitter graphics.com'),
 (2005, 'profileedit.myspace.com'),
 (2178, 'weatherchannel'),
 (2178, 'zip codes'),
 (2178, 'dana reeves dies'),
 (2178, 'harry')]

In [27]:
trx_freq_item = trx_freq_item.map(lambda x: (x[0], [x[1]]))

In [28]:
trx_freq_item.take(20)

[(142, ['rentdirect.com']),
 (142, ['staple.com']),
 (217, ['lottery']),
 (217, ['united.com']),
 (217, ['mizuno.com']),
 (217, ['weather.com']),
 (217, ['vietnam']),
 (993, ['googl']),
 (1268, ['adbuyer3.lycos.com']),
 (1326, ['files']),
 (1337, ['auto locator']),
 (1337, ['ford']),
 (1410, ['google']),
 (2005, ['myspace.ocm']),
 (2005, ['glitter graphics.com']),
 (2005, ['profileedit.myspace.com']),
 (2178, ['weatherchannel']),
 (2178, ['zip codes']),
 (2178, ['dana reeves dies']),
 (2178, ['harry'])]

In [29]:
trx_freq_item = trx_freq_item.reduceByKey(lambda x,y: x+y)
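
The two transformations above (wrap each query in a one-element list, then concatenate the lists per key) can be mimicked in plain Python. This is a sketch of the idea only, using three pairs from the sample output; `grouped` is a name introduced here.

```python
from collections import defaultdict

pairs = [(142, "rentdirect.com"), (142, "staple.com"), (217, "lottery")]

# map step: wrap each query in a one-element list.
wrapped = [(user, [query]) for user, query in pairs]

# reduceByKey step: concatenate the lists that share a key.
grouped = defaultdict(list)
for user, queries in wrapped:
    grouped[user] = grouped[user] + queries

print(dict(grouped))
```

In Spark, `groupByKey().mapValues(list)` is another way to get one list per user. In both cases I would not rely on the order of the queries inside each list.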

In [30]:
# Now trx_freq_item holds a key-value structure where:

# Key = user Id (transaction)
# Value = a list of user queries 
    
# It is guaranteed that every query was searched by more than min_count users, so it has a support of at least min_support 


In [31]:
trx_freq_item.take(20)

[(142, ['rentdirect.com', 'staple.com', 'space.comhttp', 'whitepages.com']),
 (1268, ['adbuyer3.lycos.com', 'gall stones', 'gallstones']),
 (1326,
  ['files',
   'dellcomputers',
   'budget truck rental',
   'jet blue airlines',
   'verizon wireless']),
 (1410,
  ['google', 'mycl.cravelyrics.com', 'www.toledo11.com', 'www.mapquest']),
 (2178,
  ['weatherchannel',
   'zip codes',
   'dana reeves dies',
   'harry',
   'kbb',
   'remax',
   'google',
   'sprint.com',
   'msn.com',
   'braxton hicks contractions',
   'roth ira',
   'babycenter.com',
   'baby center',
   'yahoo.com',
   'baby names',
   'mortgage calculator',
   'harry and david',
   'army.mil',
   'fidelity.com',
   'people search',
   'pergola',
   'philadelphia art museum',
   'babiesrus',
   'ebay.com',
   'once upon a child',
   'walmart',
   'sears',
   'target',
   'us zip codes']),
 (2334,
  ['jessemccartney',
   'c',
   '.com',
   'jojo',
   'jesse mccartney',
   'disneychannel.com',
   'new mexico',
   'kol.com'])

In [32]:
# The second part searches all possible item pairs in order to find interesting rules

# We will use a minimum confidence parameter

In [33]:
# For each transaction line we build an array of all possible item pairs of the specific user

# Each pair is used as a key with the value 1, so that the values can later be summed to get the pair frequency 

def combinations(t):
    queries = t[1]
    arr = []
    i = 0
    j = 0
    ln = len(queries)

    while i < ln:
        q1 = queries[i]
        j = i + 1
        while j < ln:
            q2 = queries[j]
            pair = (q1,q2)
            arr.append((pair, 1))
            j += 1
        i += 1
    
    return arr
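
The nested loops above can also be written with `itertools`. The first function below is meant to produce the same pairs as `combinations(t)`; the second is a hypothetical variant, not used in this notebook, that emits each pair in both directions. The motivation for the variant is an assumption on my side: the key `(q1, q2)` depends on the order of the queries in the user's list, so the same two queries might end up under two different keys for different users.

```python
from itertools import combinations, permutations

def pairs_in_list_order(t):
    """Each unordered pair once, in list order (same output as the nested loops)."""
    return [(pair, 1) for pair in combinations(t[1], 2)]

def pairs_both_directions(t):
    """Hypothetical variant: emit (q1, q2) and (q2, q1), so both rule directions are counted."""
    return [(pair, 1) for pair in permutations(t[1], 2)]

line = (142, ["rentdirect.com", "staple.com", "whitepages.com"])
one_way = pairs_in_list_order(line)
two_way = pairs_both_directions(line)
print(len(one_way), len(two_way))
```

With the second variant the pair stream doubles in size, so it trades extra shuffle volume for rules in both directions.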

In [34]:
# Use flat map in order to return more than one value (possible pairs) for each data line (user and list of his queries)

item_pairs = trx_freq_item.flatMap(combinations)

In [35]:
item_pairs.take(20)

[(('rentdirect.com', 'staple.com'), 1),
 (('rentdirect.com', 'space.comhttp'), 1),
 (('rentdirect.com', 'whitepages.com'), 1),
 (('staple.com', 'space.comhttp'), 1),
 (('staple.com', 'whitepages.com'), 1),
 (('space.comhttp', 'whitepages.com'), 1),
 (('adbuyer3.lycos.com', 'gall stones'), 1),
 (('adbuyer3.lycos.com', 'gallstones'), 1),
 (('gall stones', 'gallstones'), 1),
 (('files', 'dellcomputers'), 1),
 (('files', 'budget truck rental'), 1),
 (('files', 'jet blue airlines'), 1),
 (('files', 'verizon wireless'), 1),
 (('dellcomputers', 'budget truck rental'), 1),
 (('dellcomputers', 'jet blue airlines'), 1),
 (('dellcomputers', 'verizon wireless'), 1),
 (('budget truck rental', 'jet blue airlines'), 1),
 (('budget truck rental', 'verizon wireless'), 1),
 (('jet blue airlines', 'verizon wireless'), 1),
 (('google', 'mycl.cravelyrics.com'), 1)]

In [36]:
# Find query pair frequency by reduceByKey transformation

item_pairs = item_pairs.reduceByKey(lambda x,y: x+y)

In [37]:
item_pairs.take(20)

[(('rentdirect.com', 'staple.com'), 1),
 (('space.comhttp', 'whitepages.com'), 1),
 (('gall stones', 'gallstones'), 2),
 (('dellcomputers', 'budget truck rental'), 1),
 (('dellcomputers', 'jet blue airlines'), 1),
 (('dellcomputers', 'verizon wireless'), 1),
 (('budget truck rental', 'jet blue airlines'), 1),
 (('budget truck rental', 'verizon wireless'), 1),
 (('jet blue airlines', 'verizon wireless'), 2),
 (('mycl.cravelyrics.com', 'www.toledo11.com'), 1),
 (('mycl.cravelyrics.com', 'www.mapquest'), 2),
 (('www.toledo11.com', 'www.mapquest'), 1),
 (('weatherchannel', 'zip codes'), 1),
 (('weatherchannel', 'dana reeves dies'), 1),
 (('weatherchannel', 'harry'), 1),
 (('weatherchannel', 'kbb'), 1),
 (('weatherchannel', 'remax'), 1),
 (('weatherchannel', 'google'), 6),
 (('weatherchannel', 'sprint.com'), 1),
 (('weatherchannel', 'msn.com'), 1)]

In [38]:
# Filter by confidence calculation 

# It is guaranteed that every query was searched by more than min_count users, so it has a support of at least min_support 

# The formula for confidence is:

#  Sup(X) = (number of transactions containing X) / total_transactions

#  Conf(X => Y) = Sup(X and Y) / Sup(X)

# ==>

# Conf (X => Y) = (transactions containing X and Y / total_transactions) / (transactions containing X / total_transactions)

# ==>

# Conf (X => Y) = (transactions containing X and Y) / (transactions containing X) 


def confidence(t):
    pair = t[0]
    q1 = pair[0]
    q2 = pair[1]
                
    support_q1_q2 = t[1] 
        
    support_q1 = item_counts[q1] 
    
    confidence_q1_q2 =  support_q1_q2 / support_q1
        
    key = pair
    val = confidence_q1_q2
        
    return (key,val)
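
As a worked example of the formula, the counts printed earlier in this notebook can be plugged in directly. The helper name `rule_confidence` is introduced here for illustration; the counts (3 users for `rentdirect.com`, 6 for `staple.com`, 1 user with both) are the ones shown in the outputs above.

```python
# Counts taken from the outputs shown earlier in this notebook.
item_counts = {"rentdirect.com": 3, "staple.com": 6}
pair_count = 1       # users who searched for both queries
total_trx = 64942    # total number of transactions

def rule_confidence(pair_count: int, antecedent_count: int) -> float:
    """Conf(X => Y) = (transactions with X and Y) / (transactions with X)."""
    return pair_count / antecedent_count

conf_forward = rule_confidence(pair_count, item_counts["rentdirect.com"])
conf_backward = rule_confidence(pair_count, item_counts["staple.com"])
support_pair = pair_count / total_trx

print(conf_forward, conf_backward, support_pair)
```

The two directions give different values (1/3 and 1/6), which shows that confidence is not symmetric: the direction of the rule matters.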

In [39]:
item_pairs_confidence = item_pairs.map(confidence)

In [40]:
item_pairs_confidence.take(20)

[(('rentdirect.com', 'staple.com'), 0.3333333333333333),
 (('space.comhttp', 'whitepages.com'), 0.04),
 (('gall stones', 'gallstones'), 0.2222222222222222),
 (('dellcomputers', 'budget truck rental'), 0.25),
 (('dellcomputers', 'jet blue airlines'), 0.25),
 (('dellcomputers', 'verizon wireless'), 0.25),
 (('budget truck rental', 'jet blue airlines'), 0.16666666666666666),
 (('budget truck rental', 'verizon wireless'), 0.16666666666666666),
 (('jet blue airlines', 'verizon wireless'), 0.05),
 (('mycl.cravelyrics.com', 'www.toledo11.com'), 0.0021645021645021645),
 (('mycl.cravelyrics.com', 'www.mapquest'), 0.004329004329004329),
 (('www.toledo11.com', 'www.mapquest'), 0.3333333333333333),
 (('weatherchannel', 'zip codes'), 0.034482758620689655),
 (('weatherchannel', 'dana reeves dies'), 0.034482758620689655),
 (('weatherchannel', 'harry'), 0.034482758620689655),
 (('weatherchannel', 'kbb'), 0.034482758620689655),
 (('weatherchannel', 'remax'), 0.034482758620689655),
 (('weatherchannel', 

In [41]:
def format_output(t):
    pair = t[0]
    conf = str(t[1])
    q1 = str(pair[0])
    q2 = str(pair[1])
    
    return q1 + " => " + q2 + " with confidence " + conf

In [46]:
def results(confidence_low: float, confidence_high: float, path: str):
    
    res = item_pairs_confidence.filter(lambda x: x[1] >= confidence_low and x[1] < confidence_high)
    
    cnt = res.count()
    
    res = res.map(format_output)
    
    res.saveAsTextFile(path)
    
    return cnt

In [43]:
def print_amounts(conf, cnt):
    print("With confidence = " + conf + ", amount related queries = " + str(cnt))

In [47]:
path = "/home/spark-vm/PycharmProjects/BigDataCource/Results/"

file_name = "rules_confidence"

cnt_09 = results(confidence_low = 0.9, confidence_high = 1.1, path = path + file_name + "_09")

print_amounts("0.9", cnt_09)

cnt_08 = results(confidence_low = 0.8, confidence_high = 0.9, path = path + file_name + "_08")

print_amounts("0.8", cnt_08)

cnt_06 = results(confidence_low = 0.6, confidence_high = 0.8, path = path + file_name + "_06")

print_amounts("0.6", cnt_06)

With confidence = 0.9, amount related queries = 128
With confidence = 0.8, amount related queries = 11
With confidence = 0.6, amount related queries = 3683
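
The three calls above use half-open ranges (`low <= confidence < high`), and the top range ends at 1.1 so that rules with confidence exactly 1.0 are included. A small plain-Python sketch of this bucketing, with `bucket_of` and `buckets` as names introduced here:

```python
# Bucket label -> (low, high), matching the three calls to results().
buckets = {"0.9": (0.9, 1.1), "0.8": (0.8, 0.9), "0.6": (0.6, 0.8)}

def bucket_of(conf: float):
    """Return the label of the half-open range [low, high) that contains conf."""
    for label, (low, high) in buckets.items():
        if low <= conf < high:
            return label
    return None  # below the lowest range, so the rule is not saved

for value in (1.0, 0.9, 0.8571428571428571, 0.6666666666666666, 0.5):
    print(value, bucket_of(value))
```

Because the ranges do not overlap, every rule lands in at most one output directory.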


# Results analysis:

I've set a minimum support value in order to prevent results of 100% confidence that rest on a single occurrence of a query pair, which actually say nothing.


With a support of 0.000045 and confidence between 0.9 and 1.0, 128 rules were returned.

With a support of 0.000045 and confidence between 0.8 and 0.9, 11 rules were returned.

With a support of 0.000045 and confidence between 0.6 and 0.8, 3683 rules were returned.

1. The algorithm turned out to be useful for detecting spelling mistakes:

 - american idal => american idol with confidence 1.0
 - circitcity => circuit city with confidence 1.0
 - handa => honda with confidence 1.0
 - hyundi => hyundai with confidence 1.0
 - waether => weather with confidence 1.0
 - schawns => schwans with confidence 1.0
 - diconary => dictionary with confidence 0.8571428571428571
 - resturant => restaurant with confidence 0.6666666666666666
 - southwest ailines => southwest airlines with confidence 0.6666666666666666
 - maos => maps with confidence 1.0
 - faceboo => facebook with confidence 1.0


2. Service offers similar to Google promotions:

 - star motorcycles => harley davidson motorcycles with confidence 0.6666666666666666
 - sony computers => ebay with confidence 1.0
 - windermere => mapquest with confidence 1.0 
   (windermere is a lake in England and mapquest is a driving maps website )


3. Related topics:

 - destiny child => usher with confidence 1.0
   (both of them are music artists)

 - bowel movements => colon cancer with confidence 0.6666666666666666

 - the bold and the beautiful => american idol with confidence 0.8
   (both of them are television shows)