# Mining distinct and frequent Items

In this notebook, we consider two basic problems in data mining: mining distinct items and mining frequent items from data sets or data streams. They may seem trivial, but when we want to solve them on large data sets or, even worse, on data streams, the traditional exact algorithms turn out to be poor options. Instead, we must focus on faster algorithms that provide **approximate** solutions to these problems.



Preliminary start-up code:

In [1]:
import pyspark
import os
import math
import random

# make sure pyspark tells workers to use python2 not 3 if both are installed
# comment the following line if you want to use the default python interpreter of
# your spark installation, that in new versions is python 3
#os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'

In [2]:
spark_home = os.environ.get('SPARK_HOME', None)

In [3]:
print ( spark_home )

/usr/share/spark-2.2.1-bin-hadoop2.7/


In [4]:
sc = pyspark.SparkContext('local[*]')

In [5]:
print ( sc)

<SparkContext master=local[*] appName=pyspark-shell>


## 1- Counting the distinct elements from a data set or a data stream 

Given a data set of elements of the same type, what is the complexity of the best algorithm to find the cardinality of the set of distinct elements?

Observe that to exactly find all the distinct elements in a data set with N elements, a basic approach is based on maintaining a growing sequence that in the worst case can have the same size as the original data set, and we must compare every element of the data set with the elements of the growing sequence to check if the element is already in the growing sequence. This gives an algorithm with a worst-case time complexity of O(N²) and space complexity of O(N). We can instead think of an approach based on modifying the original sequence (eliminating duplicates of items), but this also gives an O(N²) time complexity algorithm.
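As a minimal sketch of the basic approach just described (the function name `count_distinct_naive` is ours, not part of the notebook), the growing sequence can be written in plain Python as follows. Python's built-in `set` avoids the quadratic number of comparisons by hashing, but it still needs memory proportional to the number of distinct elements.

```python
def count_distinct_naive(sequence):
    """Exact distinct count with a growing list: O(N^2) comparisons, O(N) space."""
    seen = []
    for x in sequence:
        # 'x not in seen' scans the whole list, so each check is linear
        if x not in seen:
            seen.append(x)
    return len(seen)

sample = [3, 1, 4, 1, 5, 9, 2, 2]
print(count_distinct_naive(sample))   # 6
print(len(set(sample)))               # 6, using hashing instead of pairwise comparison
```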

When we consider large data sets, such as the ones we will typically store in a Spark RDD, if we are only interested in counting the number of distinct elements, and we are happy to get an **approximate value (close to the real one)**, there are approximate counting algorithms that use much less memory and much less time, because they do not need to compare each element with every other one. These algorithms are based on the idea of computing *hash signatures* from elements and checking for *unusual* hash signatures. The basic principle is that the more distinct elements we have, the more likely it is that we get one unusual hash signature from them. This is the basic idea behind the approximate probabilistic counting algorithm of Flajolet and Martin. We present here the most basic version, which uses a single hash function. The more accurate variations of this basic algorithm combine the estimates obtained from many different hash functions.

Observe that in general, for working with data streams, we must in principle consider only **online algorithms**, that is, algorithms that:
1. Process every item only once (when the item is drawn from a data stream this is sometimes a mandatory constraint, as one usually does not have time to store items in memory to process them several times later).
2. Spend a constant amount of time per item in the input stream.

The algorithm we present here for approximate distinct counting satisfies the requirements of an online algorithm: it needs to process each element of the input sequence only once, and the time spent on each element can be considered constant if the maximum possible number of distinct elements does not grow when we consider larger input sequences (that is, the domain of possible elements is fixed in advance). The particular implementation we present here works on a data set stored in an RDD, but it can be easily adapted to the case of a data stream.

### The approximate counting algorithm of Flajolet and Martin


We next present the approximate counting algorithm of Flajolet and Martin for an input sequence (or a data set stored in any other data structure). We assume we have available a hash function that maps an input element x from the sequence to a bit string with L bits, where L should be big enough that the range $[0, 2^L - 1]$ of values of the hash function can encode all the possible distinct elements of the input sequence. This is the pseudo-code of the algorithm:

```python
bitmap = [0] * (L + 1)     # one flag per possible tail length 0..L
for x in inputsequence:
    # Check how unusual the element x is
    index = gettaillength( hashbitstring(x) )
    bitmap[index] = 1

R = max( index for index in range(L + 1) if bitmap[index] == 1 )
# Estimate the number of distinct elements as 2^R
estimate = 2**R
``` 

This algorithm uses the above-mentioned hash function and a function, gettaillength( hashbitstring(x) ), that, given the bit string obtained with the hash function, computes the position of the least significant '1'-bit in the bit string, that is, the number of trailing zeroes. The idea is that the larger this position is, the more unusual we consider the element x to be. This is because, if we assume that the values of our hash function are uniformly distributed, then the probability of observing a bit string whose 'tail' is a 1 followed by k zeroes is $ 2^{-(k+1)} $ if the input element x is randomly drawn from all the possible $2^L$ elements (observe that we could consider other 'unusual' tail strings). Taking this information into account, we can compute that the expected number of **distinct elements in the sequence** that will be mapped to such a tail is:

$$ \sum_{e \ \in \ distinct \ elements}  1 \cdot 2^{-k-1} $$

So, we need to draw sufficiently many distinct elements to observe such a long tail *at least once*. That is, we need to have at least $ 2^{k+1}$ distinct elements to have an expected value of 1 for the number of elements that will be mapped to such a tail. So, as an **approximate inference**, if we get such a tail from an element in the sequence, we deduce that the number of distinct elements is at least $2^{k+1}$ (the pseudo-code above uses the simpler estimate $2^R$, which has the same order of magnitude).
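The probability $2^{-(k+1)}$ used in this argument can be checked by exhaustive enumeration. The following sketch assumes 8-bit hash values that are perfectly uniform; the helper `trailing_zeros` is ours and works on integers rather than on bit strings.

```python
from fractions import Fraction

def trailing_zeros(value, numbits):
    """Number of trailing 0 bits of value; an all-zero value counts as numbits."""
    if value == 0:
        return numbits
    count = 0
    while value % 2 == 0:
        value //= 2
        count += 1
    return count

numbits = 8
total = 2 ** numbits
for k in range(4):
    # count the bit strings that end in a 1 followed by exactly k zeroes
    matches = sum(1 for v in range(total) if trailing_zeros(v, numbits) == k)
    print(k, Fraction(matches, total), Fraction(1, 2 ** (k + 1)))
```

For every k the two fractions printed on each line coincide, so a tail with k zeroes is expected once among roughly $2^{k+1}$ distinct, uniformly hashed elements.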

To get a better understanding about why this algorithm gives a good approximation for the distinct counting algorithm, you can check the original paper: http://algo.inria.fr/flajolet/Publications/FlMa85.pdf

Or read more about the algorithm here: https://en.wikipedia.org/wiki/Flajolet%E2%80%93Martin_algorithm

We present here, step by step, the building blocks of a basic version of this algorithm for the case where the data set is stored in an RDD (so we can compute in parallel the index associated with every element of the RDD). We use very basic hash functions, which nevertheless allow us to easily explain one of the further improvements of this algorithm: using many hash functions instead of only one.

Let's start by presenting a possible family of hash functions. Not every member of this family satisfies the requirements for good hash functions, but they are good enough for understanding how this algorithm works.

In [6]:
# hash integer number x 
def hashbitstring( a, b, numbits, x ):
    size = 2**numbits
    val = ((a*x)+b) % size
    return bin(val)

Let's check the binary hash codes obtained with three hash functions from this family for some values:

In [7]:
for x in range(10):
    print ( x, " -> ", hashbitstring( 3, 7, 5, x ),  hashbitstring( 2, 1, 5, x ), hashbitstring( 7, 1, 5, x ))

(0, ' -> ', '0b111', '0b1', '0b1')
(1, ' -> ', '0b1010', '0b11', '0b1000')
(2, ' -> ', '0b1101', '0b101', '0b1111')
(3, ' -> ', '0b10000', '0b111', '0b10110')
(4, ' -> ', '0b10011', '0b1001', '0b11101')
(5, ' -> ', '0b10110', '0b1011', '0b100')
(6, ' -> ', '0b11001', '0b1101', '0b1011')
(7, ' -> ', '0b11100', '0b1111', '0b10010')
(8, ' -> ', '0b11111', '0b10001', '0b11001')
(9, ' -> ', '0b10', '0b10011', '0b0')


One basic observation is that the size of the resulting bit string hash code should be large enough to be able to count the maximum number of distinct elements in the input data set, but if it is much larger than needed, then we can easily get big over-estimations if the estimate is based on a *single* hash function.

Remember that the *unusual* feature we want to check in the resulting binary hash code is the length of its tail of 0s: the longer that tail, the more unusual the code is. We can compute that length from our binary strings with the following function:

In [8]:
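def gettaillength( bitstring ):
    # Count the '0' characters at the end of a string produced by bin().
    # Note: for bin(0) == '0b0' the scan stops at the 'b', so the result is 1.
    p = -1
    while (bitstring[p] == '0'):
        p -= 1
    return -(p+1)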

In [9]:
# Testing it:
print ( gettaillength( bin(0) ), gettaillength( bin(2) ), gettaillength( bin(4) ), gettaillength( bin(8) ), gettaillength( bin(16) )  )

(1, 1, 2, 3, 4)


With our previous test sequence, we can now compute the tail length of each hash code:

In [10]:
hash1seq = [ gettaillength(hashbitstring( 3, 7, 5, x )) for x in range(10)]
hash2seq = [ gettaillength(hashbitstring( 2, 1, 5, x )) for x in range(10)]
hash3seq = [ gettaillength(hashbitstring( 7, 1, 5, x )) for x in range(10)]
print (hash1seq)
print (hash2seq)
print (hash3seq)

[0, 1, 0, 4, 0, 1, 0, 2, 0, 1]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 3, 0, 1, 0, 2, 0, 1, 0, 1]


If we take the maximum from each of these sequences, compute the estimate 2^maximum, and order them we get:

In [11]:
maxvalues = sorted([2**max(hash1seq),2**max(hash2seq),2**max(hash3seq)])
print (maxvalues)

[1, 8, 16]


Observe that, among the estimates obtained, one is a large underestimate (1) and another is a large overestimate (16), but the *median* value (8) is close to the real value (10). So, a basic improvement over the algorithm based on a single hash function is to use many different hash functions and take the median of all the estimates obtained.
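The median-of-estimates idea can be sketched as a single function. The name `fm_median_estimate` and its parameter `hashparams` (a list of `(a, b)` pairs, one per hash function) are ours; the two helper functions are copied from the cells above so that the block runs on its own.

```python
def hashbitstring(a, b, numbits, x):
    return bin(((a * x) + b) % (2 ** numbits))

def gettaillength(bitstring):
    p = -1
    while bitstring[p] == '0':
        p -= 1
    return -(p + 1)

def fm_median_estimate(sequence, hashparams, numbits):
    """One estimate 2^(max tail length) per hash function, then the median."""
    estimates = []
    for a, b in hashparams:
        maxtail = max(gettaillength(hashbitstring(a, b, numbits, x)) for x in sequence)
        estimates.append(2 ** maxtail)
    estimates.sort()
    return estimates[len(estimates) // 2]   # middle element (upper median if even)

print(fm_median_estimate(list(range(10)), [(3, 7), (2, 1), (7, 1)], 5))   # 8
```

Note that the sequence is traversed once per hash function here, so it must be a list (or another re-iterable object), not a one-shot generator.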

Let's try a different sequence to see if we are as lucky this time:

In [12]:
seq1 = [3,1,4,1,5,9,2,2,2,2,2,9,9,9,9,99,99,99,99,3,3,3,3,3,3,3,3,5,5,5,5,6,6,6,6,1,1,1,1,4,4,4,4,123,123,123,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,23,23,23]

In [13]:
maxvalues = sorted( [ 2**max( [ gettaillength(hashbitstring( 3, 7, 5, x )) for x in seq1 ]),
                      2**max( [ gettaillength(hashbitstring( 2, 1, 5, x )) for x in seq1 ]),
                      2**max( [ gettaillength(hashbitstring( 7, 1, 5, x )) for x in seq1 ]) ] )
print (maxvalues)

[1, 8, 16]


We can implement this algorithm in spark as a sequence of transformations:

1. For each hash function, we first map each number to its binary string hash code, so we get an RDD for each hash function.
2. Then, from each of these RDDs we compute the tail lengths, getting new RDDs.
3. Finally, we get the maximum tail length from the RDD of each hash function (a reduce action). The estimate obtained from each hash function will be $2^{max\_tail\_length}$.
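These steps parallelize well because the maximum can be computed independently on each partition and the partial results merged afterwards. The following plain-Python sketch emulates that with three hand-made "partitions" (an assumption for illustration only; Spark decides the real partitioning) and a single hash function with parameters (7, 1, 5):

```python
from functools import reduce

def tail_length(a, b, numbits, x):
    # same hash family and tail computation as in the cells above
    bits = bin(((a * x) + b) % (2 ** numbits))
    p = -1
    while bits[p] == '0':
        p -= 1
    return -(p + 1)

data = [3, 1, 4, 1, 5, 9, 2, 6, 99, 123, 23]
partitions = [data[0:4], data[4:8], data[8:11]]

# steps 1 and 2: map every element to its tail length, partition by partition;
# step 3: take the maximum inside each partition, then merge the partial maxima
partial = [max(tail_length(7, 1, 5, x) for x in part) for part in partitions]
merged = reduce(max, partial)
print(partial, 2 ** merged)
```

Because `max` is associative and commutative, the merged value is the same as the maximum over the whole data set, whatever the partitioning.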

Let's do it with our example sequence seq1:

In [14]:
rdd1 = sc.parallelize(seq1)

In [15]:
#
# Check the exact distinct count of the sequence
#
rdd1.distinct().count()

10

Then, for the distributed version of our basic approximate counting algorithm, we map each number to its tail length (for a particular hash function), and then compute the value $2^{maximum\_tail\_length}$. We do this once with every hash function.

As we get three different estimates, we can pick the median of the three values as a more accurate estimate. Let's get the tail lengths of the hash codes of our example sequence in three different RDDs, then compute their maximum tail lengths with a reduce action, and finally compute the quantity $2^{maximum\_tail\_length}$ for each hash function:

In [16]:
#
# Map the RDD with the function that computes the tail length of the hash code of each element
#
rdd1hashed = rdd1.map( lambda x: gettaillength(hashbitstring(3,7,5,x)) )
rdd2hashed = rdd1.map( lambda x: gettaillength(hashbitstring(2,1,5,x)) )
rdd3hashed = rdd1.map( lambda x: gettaillength(hashbitstring(7,1,5,x)) )

#
# Collect back their maximum values to the driver and compute the estimates:
#
maxvalues = [ 2**rdd1hashed.max(), 2**rdd2hashed.max(), 2**rdd3hashed.max()  ]
print ( sorted(maxvalues) )

[1, 8, 16]


The most accurate current variation of this probabilistic counting algorithm is the HyperLogLog counting algorithm (check https://en.wikipedia.org/wiki/HyperLogLog). Spark includes an implementation of that last algorithm, and it is available as a member function of the RDD class. Let's try it with our previous test sequence:

In [17]:
# Execute HyperLogLog probabilistic counting algorithm to approximate distinct number of elements
rdd1.countApproxDistinct()

10L

This function has an optional parameter (relativeSD) that controls the desired accuracy of the probabilistic counting, expressed as a relative standard deviation. The default value is 0.05, and we observe that with it we get the exact count of our test data set. Let's try this algorithm with 10 random sequences of 100000 integers each, drawn from the range [0,1000000]:

In [18]:
for test in range(10):
    rddtest = sc.parallelize( [ random.randint(0,1000000) for x in range(100000) ] )
    print ( " Exact : ", rddtest.distinct().count(), "approx: ", rddtest.countApproxDistinct() )
    
# MINI-EXERCISE:
#  Use the notebook macro %timeit -n1 -r 1 to compare the running time of the exact counting function
#  with the one of the approximate counting function in the previous test

(' Exact : ', 95055, 'approx: ', 97134L)
(' Exact : ', 95120, 'approx: ', 97767L)
(' Exact : ', 95192, 'approx: ', 96879L)
(' Exact : ', 95242, 'approx: ', 99828L)
(' Exact : ', 95111, 'approx: ', 88874L)
(' Exact : ', 95176, 'approx: ', 93131L)
(' Exact : ', 95146, 'approx: ', 96783L)
(' Exact : ', 95125, 'approx: ', 93585L)
(' Exact : ', 95204, 'approx: ', 101357L)
(' Exact : ', 95203, 'approx: ', 96229L)


As we can see, approximate counting can fail to provide an exact result, but the relative error is not too high: in the ten runs above it ranges from about 1.1% to about 6.6%. Also, if we consider the case of data streams, which cannot be stored entirely in an RDD before processing all their elements, the approach followed by the approximate counting algorithm, where each element is processed only once and does not need to be stored to be compared with the others, is the only feasible way to approximate the number of distinct elements of the data stream.
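A minimal sketch of such a single-pass version, for a single hash function, could look as follows. The names `fm_stream_estimate` and `number_stream` are ours, and the generator is only a stand-in for a real data stream. Unlike `gettaillength`, the integer helper used here treats an all-zero hash value as having `numbits` trailing zeroes.

```python
def trailing_zeros(value, numbits):
    """Trailing 0 bits of value; an all-zero value counts as numbits zeroes."""
    if value == 0:
        return numbits
    count = 0
    while value % 2 == 0:
        value //= 2
        count += 1
    return count

def fm_stream_estimate(stream, a, b, numbits):
    """Single-pass estimate: only the largest tail length seen so far is kept."""
    maxtail = 0
    for x in stream:                      # each item is read exactly once
        h = (a * x + b) % (2 ** numbits)
        maxtail = max(maxtail, trailing_zeros(h, numbits))
    return 2 ** maxtail

def number_stream():
    # hypothetical stand-in for a data stream: items are produced one at a time
    for x in [3, 1, 4, 1, 5, 9, 2, 6, 5, 3]:
        yield x

print(fm_stream_estimate(number_stream(), 3, 7, 5))   # 16
```

The state kept between items is a single integer, so the memory used does not depend on the length of the stream.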

## 2- Mining frequent items

A related problem is to determine the set of items from a data set that are most frequent, where by 'most frequent' we mean those whose relative frequency is bigger than a threshold value $\theta$. Observe that we cannot have more than $1/\theta$ different items with frequency $> \theta$ in the same sequence, so this gives us a general upper bound on how many different elements we should keep in an optimal exact algorithm for mining $\theta$-frequent items. It is worth noticing that, depending on the particular value of $\theta$, this upper bound can be tuned a little bit, as we will discuss in some examples. The problem is that, if we think about an **exact on-line algorithm** for this problem (single pass and constant time per item), it is known that we need memory $ \Omega( n \log (N/n) ) $, where $n$ is the number of different symbols in the input sequence and $N$ is its length.
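The bound of $1/\theta$ follows from a short counting argument, sketched here with $m$ denoting the (hypothetical) number of $\theta$-frequent items and $c_i$ the number of occurrences of the $i$-th one in a sequence of length $N$:

$$ N \ \ge \ \sum_{i=1}^{m} c_i \ > \ m \cdot \theta N \quad \Longrightarrow \quad m < \frac{1}{\theta} $$

For example, with $\theta = 1/3$ at most 2 distinct items can each occur in more than a third of the positions.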

So, as with the previous problem, we are going to explain an algorithm, implemented in the Spark dataframe library, that **approximately finds** the $\theta$-frequent items. This approximate algorithm can give *false positives*, that is, elements that are not $\theta$-frequent, but never a false negative (any element that is $\theta$-frequent will be found). This approximate algorithm is single pass, and only uses $O(1/\theta) $ memory (of the order of the maximum number of different elements that can all be $\theta$-frequent). So, its memory consumption is optimal.
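When the data can be read a second time (which is not the case for a pure data stream), false positives can be discarded with an exact count restricted to the candidates. The following is a sketch under that assumption; the function name `filter_false_positives` is ours, and `Fraction` is used only to keep the threshold comparison exact.

```python
from collections import Counter
from fractions import Fraction

def filter_false_positives(sequence, candidates, theta):
    """Second pass: keep only the candidates whose exact count exceeds theta * N."""
    exact = Counter(x for x in sequence if x in candidates)
    threshold = theta * len(sequence)
    return {x: c for x, c in exact.items() if c > threshold}

seq = [1, 2, 3, 1, 2, 3, 2, 3, 2, 3]
# candidates 2 and 3 occur 4 times each in a sequence of length 10
print(filter_false_positives(seq, {2, 3}, Fraction(2, 5)))   # {} since 4 is not > 4
print(filter_false_positives(seq, {2, 3}, Fraction(1, 3)))   # both kept, 4 > 10/3
```

The second pass only needs counters for the candidates, so its memory use stays of the order of $1/\theta$.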

This is the pseudo-code of the algorithm of Karp, Shenker and Papadimitriou for approximately mining items with frequency $ > \theta $. You can find the full paper with the analysis of the algorithm in this link: https://www.cs.bgu.ac.il/~dinitz/Course/SS-12/Karp-frequent-el.pdf.

```python
def MineFrequentItems(Sequence, theta):
    # countDict will be a dictionary that will contain at most (1/theta)
    # different elements from the sequence
    countDict = {}
    for a in Sequence:
        if a in countDict:
            countDict[a] = countDict[a] + 1
        else:
            countDict[a] = 1
            if len(countDict) > 1/theta:
                # Eliminate one occurrence of every element, as not all of them
                # can be theta-frequent. Iterate over a copy of the keys because
                # entries are deleted inside the loop.
                for ap in list(countDict.keys()):
                    countDict[ap] = countDict[ap] - 1
                    # eliminate elements with 0 occurrences
                    # (at least a will be eliminated)
                    if countDict[ap] == 0:
                        del countDict[ap]
    # Return the remaining elements in countDict as the theta-frequent ones,
    # possibly with some false positives
    return list(countDict.keys())
```

The algorithm, as presented here, is sequential and suits a single data stream. For the distributed data frames of Spark, the version used by Spark works with many counting dictionaries (one per partition of the RDD), and then combines them with a merge operation.
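Spark's actual merge code is not reproduced here. As an illustration only, one known way to merge two counting dictionaries of this kind is to add the counts and, if the result has too many entries, subtract from every counter the count of the largest entry that does not fit. The function name `merge_summaries` and the parameter `maxsize` are ours.

```python
def merge_summaries(counts1, counts2, maxsize):
    """Merge two counting dictionaries, keeping at most maxsize entries."""
    merged = dict(counts1)
    for key, count in counts2.items():
        merged[key] = merged.get(key, 0) + count
    if len(merged) > maxsize:
        # count of the (maxsize+1)-th largest counter: subtracting it from
        # every counter leaves at most maxsize strictly positive entries
        cut = sorted(merged.values(), reverse=True)[maxsize]
        merged = {k: c - cut for k, c in merged.items() if c - cut > 0}
    return merged

part1 = {1: 3, 2: 1}      # hypothetical summary of one partition
part2 = {2: 2, 3: 2}      # hypothetical summary of another partition
print(merge_summaries(part1, part2, 2))   # {1: 1, 2: 1}
```

As in the sequential algorithm, the merged counters are lower bounds on the real counts, so the result may still contain false positives.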

Let's check how the algorithm works with some small examples:

- Assume the following sequence with 9 elements, and the value $\theta=1/2$:

```python 
   [ 1, 2, 3, 4, 5, 1, 1, 1, 1]
```
For this particular value of $\theta$, instead of the general upper bound (the set of $1/2$-frequent elements cannot contain more than 2 elements), we can use a better one: the set cannot contain more than 1 element. That is, every time we have two elements in the set, we eliminate one occurrence of every symbol in the set. Working that way, we end the algorithm having only the element 1, with a number of occurrences equal to 3. Observe that the element "1" appears 5 times (> 9 * 1/2), so the final set contains the right answer.

- In the second example, we consider a different value for $\theta$ (1/2.5), and the following sequence with 10 elements:

```python 
   [ 1, 2, 3, 1, 2, 3, 2, 3, 2, 3]
```

In this case, the general upper bound is the tightest one, as a sequence of 10 elements can have 2 elements with frequency 5 (> 1/2.5 * 10 = 4) but not 3 such elements. With this upper bound, we end the algorithm having only the elements 2 and 3, each with a number of occurrences equal to 2. Observe that these elements have a real frequency of 4, so they do not reach the required frequency of 5 (although they come close). So, in this case, the final set contains elements that are not really part of the correct answer (actually, there are no elements with frequency $> 4$).

- In the third example, we consider a value of $\theta$ equal to $1/3$, and the same sequence as in the previous example:

```python 
   [ 1, 2, 3, 1, 2, 3, 2, 3, 2, 3]
```

In this case, the general upper bound can be refined as in the first example: the set of $1/3$-frequent elements cannot contain more than 2 elements. If we consider this more refined upper bound, we end the algorithm having only elements 2 and 3 with a number of occurrences equal to 2. In this case, the answer is correct, as elements 2 and 3 have a real frequency of 4 (that is bigger than  1/3 * 10). 

However, observe that it is always safe to use the general upper bound (in any case, the set cannot contain more than $1/\theta$ elements), for a general value of $\theta$.
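The examples above can be replayed with a small runnable version of the algorithm. In this sketch the function `mine_frequent_items` (our name) takes the maximum dictionary size directly as the parameter `maxsize`, instead of deriving it from $\theta$, so that the refined upper bounds can be tried as well.

```python
def mine_frequent_items(sequence, maxsize):
    """Single-pass frequent items: returns the surviving counters."""
    counts = {}
    for a in sequence:
        if a in counts:
            counts[a] += 1
        else:
            counts[a] = 1
            if len(counts) > maxsize:
                # remove one occurrence of every stored element
                for key in list(counts):
                    counts[key] -= 1
                    if counts[key] == 0:
                        del counts[key]
    return counts

# first example: theta = 1/2 with the refined bound of 1 element
print(mine_frequent_items([1, 2, 3, 4, 5, 1, 1, 1, 1], 1))      # {1: 3}
# second and third examples: at most 2 elements are kept
print(mine_frequent_items([1, 2, 3, 1, 2, 3, 2, 3, 2, 3], 2))   # {2: 2, 3: 2}
```

The counters returned are lower bounds on the real frequencies (3 instead of 5, and 2 instead of 4), which is why the surviving elements are candidates rather than confirmed answers.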

Let's test this algorithm by working a little bit with a data set from the city of Chicago that contains crime information from 2006 to 2016 (except murders). The original format is a CSV file, and we will have to convert it to a dataframe if we want to execute the frequent items algorithm of the dataframe library of Spark.

In [19]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

In [20]:
rddchicagocrimes = sc.textFile("./ChicagoCrimes_2006to2016.csv")

In [21]:
rddchicagocrimes.count()

3773539

In [22]:
rddchicagocrimes.take(5)

[u'ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location',
 u'10479397,HZ218150,04/08/2016 01:30:00 PM,0000X W TERMINAL ST,0484,BATTERY,PRO EMP HANDS NO/MIN INJURY,AIRPORT TERMINAL UPPER LEVEL - SECURE AREA,true,false,1653,016,41,76,08B,1101811,1934419,2016,05/06/2016 03:48:54 PM,41.976762981,-87.900983721,"(41.976762981, -87.900983721)"',
 u'10480217,HZ219687,04/09/2016 11:00:00 AM,011XX S DELANO CT W,0810,THEFT,OVER $500,RESIDENCE,false,false,0123,001,2,32,06,1175106,1895291,2016,05/06/2016 03:48:54 PM,41.868062668,-87.632621013,"(41.868062668, -87.632621013)"',
 u'10484708,HZ222956,04/11/2016 02:49:00 PM,012XX S WABASH AVE,1150,DECEPTIVE PRACTICE,CREDIT CARD FRAUD,OTHER,false,false,0131,001,2,33,11,,,2016,05/06/2016 03:48:54 PM,,,',
 u'3732,HM520797,08/05/2006 03:10:00 AM,014XX N MAYFIELD AVE,0110,HOMICIDE,FIRST DEGREE MURDER,STREET,

In [23]:
headerChicagoCrimes = rddchicagocrimes.first()
headerChicagoCrimes

u'ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location'

In [24]:
fieldsChicagoCrimes = [StructField(field_name, StringType(), True) for field_name in headerChicagoCrimes.split(',')]
fieldsChicagoCrimes

[StructField(ID,StringType,true),
 StructField(Case Number,StringType,true),
 StructField(Date,StringType,true),
 StructField(Block,StringType,true),
 StructField(IUCR,StringType,true),
 StructField(Primary Type,StringType,true),
 StructField(Description,StringType,true),
 StructField(Location Description,StringType,true),
 StructField(Arrest,StringType,true),
 StructField(Domestic,StringType,true),
 StructField(Beat,StringType,true),
 StructField(District,StringType,true),
 StructField(Ward,StringType,true),
 StructField(Community Area,StringType,true),
 StructField(FBI Code,StringType,true),
 StructField(X Coordinate,StringType,true),
 StructField(Y Coordinate,StringType,true),
 StructField(Year,StringType,true),
 StructField(Updated On,StringType,true),
 StructField(Latitude,StringType,true),
 StructField(Longitude,StringType,true),
 StructField(Location,StringType,true)]

Many of these fields are clearly not strings, but for the *query* we want to execute now we do not need to assign the right data type to every column. Check for example this tutorial page:

  http://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/
  
if you want to know more about converting CSV to Spark data frames with correct data types for each column.

NOTE: Since Spark 2.0 there is a built-in CSV reader that, in principle, should make things easier. But converting CSV to dataframes is a good data cleaning/processing exercise to try with Spark!

In [25]:
schemaChicagoCrimes = StructType(fieldsChicagoCrimes)

We are going to use the csv python module to parse the lines of the CSV file loaded into the RDD. We are going to map each partition with a single function call, using the mapPartitions function instead of the usual map function. In this way we gain some efficiency, because only one csv.reader object is created and used to iterate over all the CSV lines of each partition, instead of creating one csv.reader object for every single CSV line. The following function is the one we pass to mapPartitions:

In [26]:
def parseCSVPartition( csvseq ):
    import csv
    reader = csv.reader(csvseq)  # creates the reader object for the iterable of CSV records
    for row in reader:   # iterates over the rows of the partition in order
        yield row
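A quick local check, outside Spark, shows why the csv module is used instead of a plain `split(',')`: the last field of each record contains a comma inside quotes. The record below is a shortened, made-up variant of the lines shown earlier, used only for illustration.

```python
import csv

def parseCSVPartition(csvseq):
    reader = csv.reader(csvseq)   # one reader for the whole iterable of lines
    for row in reader:
        yield row

# abbreviated sample record with a quoted field that contains a comma
line = '10480217,HZ219687,THEFT,"(41.868062668, -87.632621013)"'

naive = line.split(',')
parsed = list(parseCSVPartition([line]))[0]
print(len(naive), len(parsed))   # 5 4
print(parsed[-1])                # (41.868062668, -87.632621013)
```

The plain split breaks the quoted location into two pieces, while the csv reader returns it as a single field with the quotes removed.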

In [27]:
chicagocrimesonlydataRDD = rddchicagocrimes.filter( lambda l: not l.startswith('ID,Case Number' ) ).mapPartitions( parseCSVPartition )

In [28]:
chicagocrimesonlydataRDD.take(2)

[['10479397',
  'HZ218150',
  '04/08/2016 01:30:00 PM',
  '0000X W TERMINAL ST',
  '0484',
  'BATTERY',
  'PRO EMP HANDS NO/MIN INJURY',
  'AIRPORT TERMINAL UPPER LEVEL - SECURE AREA',
  'true',
  'false',
  '1653',
  '016',
  '41',
  '76',
  '08B',
  '1101811',
  '1934419',
  '2016',
  '05/06/2016 03:48:54 PM',
  '41.976762981',
  '-87.900983721',
  '(41.976762981, -87.900983721)'],
 ['10480217',
  'HZ219687',
  '04/09/2016 11:00:00 AM',
  '011XX S DELANO CT W',
  '0810',
  'THEFT',
  'OVER $500',
  'RESIDENCE',
  'false',
  'false',
  '0123',
  '001',
  '2',
  '32',
  '06',
  '1175106',
  '1895291',
  '2016',
  '05/06/2016 03:48:54 PM',
  '41.868062668',
  '-87.632621013',
  '(41.868062668, -87.632621013)']]

Once we have the data set without the header line, we can finally convert it to data frames:

In [29]:
chicagocrimesDF = sqlContext.createDataFrame( chicagocrimesonlydataRDD, schemaChicagoCrimes )
chicagocrimesDF.head(2)

[Row(ID=u'10479397', Case Number=u'HZ218150', Date=u'04/08/2016 01:30:00 PM', Block=u'0000X W TERMINAL ST', IUCR=u'0484', Primary Type=u'BATTERY', Description=u'PRO EMP HANDS NO/MIN INJURY', Location Description=u'AIRPORT TERMINAL UPPER LEVEL - SECURE AREA', Arrest=u'true', Domestic=u'false', Beat=u'1653', District=u'016', Ward=u'41', Community Area=u'76', FBI Code=u'08B', X Coordinate=u'1101811', Y Coordinate=u'1934419', Year=u'2016', Updated On=u'05/06/2016 03:48:54 PM', Latitude=u'41.976762981', Longitude=u'-87.900983721', Location=u'(41.976762981, -87.900983721)'),
 Row(ID=u'10480217', Case Number=u'HZ219687', Date=u'04/09/2016 11:00:00 AM', Block=u'011XX S DELANO CT W', IUCR=u'0810', Primary Type=u'THEFT', Description=u'OVER $500', Location Description=u'RESIDENCE', Arrest=u'false', Domestic=u'false', Beat=u'0123', District=u'001', Ward=u'2', Community Area=u'32', FBI Code=u'06', X Coordinate=u'1175106', Y Coordinate=u'1895291', Year=u'2016', Updated On=u'05/06/2016 03:48:54 PM', 

And finally, we can find frequent items for the columns we want. For example, for columns 'Primary Type' and 'Community Area':

In [30]:
freq01rdd = chicagocrimesDF.freqItems(["Primary Type", "Community Area"], 0.1)
freq03rdd = chicagocrimesDF.freqItems(["Primary Type", "Community Area"], 0.3)
freq05rdd = chicagocrimesDF.freqItems(["Primary Type", "Community Area"], 0.5)

In [31]:
print (freq01rdd.collect()[0])
print (freq03rdd.collect()[0])
print (freq05rdd.collect()[0])

Row(Primary Type_freqItems=[u'DECEPTIVE PRACTICE', u'MOTOR VEHICLE THEFT', u'BURGLARY', u'THEFT', u'OTHER OFFENSE', u'NARCOTICS', u'BATTERY', u'CRIMINAL DAMAGE', u'HOMICIDE', u'ASSAULT'], Community Area_freqItems=[u'8', u'14', u'49', u'43', u'16', u'25'])
Row(Primary Type_freqItems=[u'THEFT', u'BATTERY', u'HOMICIDE'], Community Area_freqItems=[u'49', u'31'])
Row(Primary Type_freqItems=[u'BATTERY', u'THEFT'], Community Area_freqItems=[u'27', u'49'])


Observe that the results obtained for different threshold frequencies are not necessarily consistent. Why? Check the actual code of the estimation algorithm, and remember that it is an *approximation* algorithm.

For more information about transformations and actions with spark dataframes, check for example this blog page at databricks:
  
     https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html
     

### Exercises

Find the distinct elements and most frequent elements (with frequencies $> 0.15$, $> 0.30$ and $> 0.50$) for the attribute column 'EventCode' from the GDELT data set GDELT10/1979.csv

> Check the web page http://www.gdeltproject.org/ if you want to know more about the GDELT project, or
> here: https://en.wikipedia.org/wiki/Conflict_and_Mediation_Event_Observations if you want to know more 
> about the encoding of information in the event data sets. The concrete data set (1979.csv), can be downloaded from the URL: http://data.gdeltproject.org/events/1979.zip

In [32]:
# GDELT 1.0 files separate columns with tabs
def parseGDELT10line( line ):
    return line.split("\t")

hlinefile = open( "CSV.header.historical.txt")
GDELT10headerline = hlinefile.readline().rstrip().split("\t")
hlinefile.close()
print ( GDELT10headerline, "\n")
# Get the column index for the 'EventCode' column
eventcodecol = GDELT10headerline.index( 'EventCode' )


(['GLOBALEVENTID', 'SQLDATE', 'MonthYear', 'Year', 'FractionDate', 'Actor1Code', 'Actor1Name', 'Actor1CountryCode', 'Actor1KnownGroupCode', 'Actor1EthnicCode', 'Actor1Religion1Code', 'Actor1Religion2Code', 'Actor1Type1Code', 'Actor1Type2Code', 'Actor1Type3Code', 'Actor2Code', 'Actor2Name', 'Actor2CountryCode', 'Actor2KnownGroupCode', 'Actor2EthnicCode', 'Actor2Religion1Code', 'Actor2Religion2Code', 'Actor2Type1Code', 'Actor2Type2Code', 'Actor2Type3Code', 'IsRootEvent', 'EventCode', 'EventBaseCode', 'EventRootCode', 'QuadClass', 'GoldsteinScale', 'NumMentions', 'NumSources', 'NumArticles', 'AvgTone', 'Actor1Geo_Type', 'Actor1Geo_FullName', 'Actor1Geo_CountryCode', 'Actor1Geo_ADM1Code', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor1Geo_FeatureID', 'Actor2Geo_Type', 'Actor2Geo_FullName', 'Actor2Geo_CountryCode', 'Actor2Geo_ADM1Code', 'Actor2Geo_Lat', 'Actor2Geo_Long', 'Actor2Geo_FeatureID', 'ActionGeo_Type', 'ActionGeo_FullName', 'ActionGeo_CountryCode', 'ActionGeo_ADM1Code', 'ActionGeo_Lat',

In [None]:
#
#  PUT YOUR SOLUTION CODE IN THIS CELL AND IN THE FOLLOWING ONES
#    Use the function parseGDELT10line to transform each line (GDELT record) to a list of strings

# Setup a RDD with the CSV file GDELT10/1979.csv loading each line as a python list with the fields of the line
rddGDELT1979 = 

# filter the eventcode column to a new RDD with only that column
rddEventCodes = 


In [None]:
# Show total number of records and number of different event codes first with the exact algorithm
# Compare their running times with the
# notebook macro %timeit -n1 -r 1  pythonsentence
print " Total : "
%timeit -n 1 -r 1 print " Different event codes : "


In [None]:
# Next, show the approximate number of distinct event codes obtained with the
# approximation algorithm:

%timeit -n 1 -r 1 


In [None]:
# Next create the dataframes version of the RDD (all with StringType) for using the freqItems() function
fieldsGDELT10 = 
schemaGDELT10 = 
# Create the DF version of  rddGDELT1979  with the schema information:
GDELT10_1979_DF = 


In [None]:
# Call the freqItems function over the GDELT10_1979_DF dataframes RDD with freq > 0.15, > 0.3 and > 0.5
gdeltfreq015rdd = 
gdeltfreq030rdd = 
gdeltfreq050rdd = 


In [None]:
# Show the resulting RDDs with frequent items:
print 
print 
print 
