# Deadline + Late Penalty

**Note:** This project will take a considerable amount of time to complete, so we strongly recommend that you start working on it as early as possible.


* Submission deadline for the Project is **20:59:59 on 18th Jul, 2020** (Sydney Time).
* **LATE PENALTY: 10% on day-1 and 30% on each subsequent day.**

# Instructions
* This notebook contains the instructions for **COMP9313 Project 1**.

* You are required to complete your implementation in the file `submission.py` provided along with this notebook.

* Do not print any unnecessary output. Anything printed to the screen will be ignored; all results must be returned in the appropriate data structures by the corresponding functions.

* You are required to submit the following files via CSE `give`:
    - (i) `submission.py` (your code)
    - (ii) `report.pdf` (describing your implementation details)
    - **Note:** detailed submission instructions will be announced later.

* This notebook provides detailed instructions for the project. If you run into any problem, you can post your query on Piazza. Please do not post questions about implementation details.

* You are allowed to add other functions and/or import modules (you may have to for this project), but you are not allowed to define global variables. **All the functions should be implemented in `submission.py`**. 

* In this project, you may need to **CREATE YOUR OWN TEST CASES** in order to evaluate the correctness, while at the same time improving the efficiency of your implementation. **DO NOT COMPLETELY RELY ON THE TOY EXAMPLE IN THE SPEC!**
  * In order to create your own test cases, you are expected to use real datasets or randomly generated data, and generate hash functions by yourself.

* The testing environment is the same as that of `Lab1`. **Note:** Importing other modules (not a part of the Lab1 test environment) may lead to errors, which will result in **ZERO score for the ENTIRE Project**.


* After completing the project, students are **ENCOURAGED** to attempt the **BONUS** part. Detailed instructions for the **BONUS** part are given later in this notebook.
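
Building your own test cases requires hashing raw vectors yourself. The sketch below is one way to do that, assuming the p-stable (Gaussian) hash family from E2LSH, $h(v) = \lfloor (a \cdot v + b) / w \rfloor$ with $a$ drawn from a standard normal distribution and $b$ uniform in $[0, w)$. The helper names `make_hash_functions` and `hash_dataset`, the bucket width `w`, and the choice of hash family are all illustrative assumptions; the hash functions described in the lecture notes may differ.

```python
import math
import random


def make_hash_functions(dim, m, w=4.0, seed=0):
    """Return a function mapping a vector to m hash values (assumed E2LSH-style family)."""
    rng = random.Random(seed)
    funcs = []
    for _ in range(m):
        a = [rng.gauss(0.0, 1.0) for _ in range(dim)]  # random projection vector
        b = rng.uniform(0.0, w)                        # random shift in [0, w)
        funcs.append((a, b))

    def hash_point(v):
        # h(v) = floor((a . v + b) / w) for each of the m (a, b) pairs
        return [math.floor((sum(ai * vi for ai, vi in zip(a, v)) + b) / w)
                for a, b in funcs]

    return hash_point


def hash_dataset(data, query, m, w=4.0, seed=0):
    """Hash raw vectors into (id, data_hash) pairs plus the query_hashes list."""
    hash_point = make_hash_functions(len(query), m, w, seed)
    data_hashes = [(i, hash_point(v)) for i, v in enumerate(data)]
    return data_hashes, hash_point(query)
```

The list of pairs returned by `hash_dataset` can be passed to `sc.parallelize` to build a `data_hashes` RDD, in the same way the toy data is loaded in the example further down. Fixing `seed` keeps a test case reproducible across runs.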

# Task 1: C2LSH (90 points)

In this task, you will implement the C2LSH algorithm in PySpark. Specifically, you are required to write a method `c2lsh()` in the file `submission.py` that takes the following four arguments as input:

1. **data_hashes** is an RDD in which each element is a (key, value) pair of the form (id, data_hash). `id` is an integer and `data_hash` is a Python list of $m$ integers (i.e., the hash values of the data point).
2. **query_hashes** is a Python list of $m$ integers (i.e., the hash values of the query).
3. **alpha_m** is an integer giving the minimum number of colliding hash values between a data point and the query (i.e., $\alpha m$).
4. **beta_n** is an integer giving the minimum number of candidates to be returned (i.e., $\beta n$).
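
The arguments are linked through the collision count. The following is a minimal sketch that follows the toy implementation in the example later in this notebook, which may simplify the lecture-note version. With a tolerance `offset`, hash position $i$ collides when $|data\_hash_i - query\_hashes_i| \le offset$, and a point becomes a candidate once at least `alpha_m` positions collide. The function names are hypothetical.

```python
def collision_count(data_hash, query_hashes, offset):
    """Number of positions i with |data_hash[i] - query_hashes[i]| <= offset."""
    return sum(1 for d, q in zip(data_hash, query_hashes) if abs(d - q) <= offset)


def is_candidate(data_hash, query_hashes, alpha_m, offset):
    """True once at least alpha_m of the m hash values collide with the query."""
    return collision_count(data_hash, query_hashes, offset) >= alpha_m


# m = 4 hash values; the absolute differences are 0, 1, 3 and 5.
data_hash = [10, 21, 33, 45]
query_hashes = [10, 20, 30, 40]
print(collision_count(data_hash, query_hashes, 0))  # 1
print(collision_count(data_hash, query_hashes, 1))  # 2
print(is_candidate(data_hash, query_hashes, 3, 3))  # True (differences 0, 1, 3 are <= 3)
```

Increasing `offset` can only add collisions, so the set of candidates grows monotonically as the tolerance is relaxed.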

**Note:**
1. You don't need to implement hash functions or generate hashed data; we will provide the data hashes for you.
2. Please follow **the description of the algorithm provided in the lecture notes**, which is slightly different from the original C2LSH paper.
3. One of the main purposes of this project is to use Spark to solve the problem. It is therefore meaningless to circumvent PySpark and solve it in other ways (e.g., collecting the data and implementing the algorithm without transformations). Any such attempt will be considered an invalid implementation and will be assigned a **ZERO** score. Specifically, you are not allowed to use the following PySpark functions:
  * `aggregate`, `treeAggregate`, `aggregateByKey`
  * `collect`, `collectAsMap`
  * `countByKey`, `countByValue`
  * `foreach`
  * `reduce`, `treeReduce`
  * `saveAs*` (e.g. `saveAsTextFile`)
  * `take*` (e.g. `take`, `takeOrdered`)
  * `top`
  * `fold`

## Return Format

The `c2lsh()` method returns an RDD containing the candidate IDs.

**Notice: The order of the elements in the returned RDD does not matter (we will collect the elements and evaluate them as a set).**
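
Because the result is compared as a set, ordering is irrelevant, but duplicate IDs would be silently collapsed. The sketch below checks your own output against an expected candidate set. `compare_candidates` is a hypothetical helper, and `collected` stands for the list you obtain by calling `.collect()` on the returned RDD while testing locally (outside `c2lsh()`, where `collect` is forbidden).

```python
from collections import Counter


def compare_candidates(collected, expected):
    """Compare candidate IDs as a set and report what the set view would hide."""
    got = set(collected)
    duplicates = sorted(i for i, n in Counter(collected).items() if n > 1)
    return {
        "match": got == set(expected),
        "missing": sorted(set(expected) - got),
        "extra": sorted(got - set(expected)),
        "duplicates": duplicates,
    }


print(compare_candidates([30, 10, 20, 10], [10, 20, 30]))
# {'match': True, 'missing': [], 'extra': [], 'duplicates': [10]}
```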

## Evaluation

Your implementation will be tested using 3 different test cases. We will evaluate it based on the following factors:
* the correctness of your `c2lsh()`. The output will be compared with the result of the correct implementation; any difference will be considered incorrect.
* the efficiency of your implementation. We will measure the running time of `c2lsh()` in each test case (denoted as $T$).

For each test case (worth 30 points), the following marking criteria will be used:
* **Case 1, 0 points**: the returned `rdd` is incorrect, or $T > T_1$
* **Case 2, 10 points**: the returned `rdd` is correct, and $T_1 \geq T > T_2$,
* **Case 3, 20 points**: the returned `rdd` is correct, and $T_2 \geq T > T_3$,
* **Case 4, 30 points**: the returned `rdd` is correct, and $T_3 \geq T$.

Here, $T_1 > T_2 > T_3$ are thresholds that depend on the testing environment and the test cases.

# Task 2: Report (10 points)
You are also required to submit a project report named `report.pdf`. In the report, you are expected to answer at least the following questions:
1. Implementation details of your `c2lsh()`: explain how your major transformation functions work.
2. Show the evaluation results of your implementation using **your own test cases**.
3. What did you do to improve the efficiency of your implementation?

# Bonus

To encourage students to come up with efficient implementations, the project includes a bonus part worth a maximum of 20 points. The rules for the **BONUS** part are as follows:
 * Prerequisites:
   1. You must have obtained **90 points** for the implementation part.
   2. The **total running time** of your implementation over the three test cases is among the 50 smallest running times in the class.
 * All submissions satisfying the above conditions will be tested against a more challenging dataset. The 20 most efficient correct implementations will be awarded bonus points.
 * We will rank these top-20 implementations in increasing order of running time. The most efficient implementation (i.e., the one with the smallest running time) will receive 20 points, the 2nd most efficient one 19 points, and so on; the implementation ranked 20th will get 1 bonus point.
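
The ranking rule reduces to `21 - rank` for ranks 1 to 20. A minimal sketch, with the hypothetical name `bonus_points`; it assumes the prerequisites above have already been met and does not model them.

```python
def bonus_points(rank):
    """Bonus for a 1-based rank among the top-20 (rank 1 = smallest running time)."""
    if 1 <= rank <= 20:
        return 21 - rank
    return 0  # outside the top 20


print(bonus_points(1), bonus_points(2), bonus_points(20), bonus_points(21))  # 20 19 1 0
```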

# How to execute your implementation (EXAMPLE)

In [None]:
from pyspark import SparkContext, SparkConf
from time import time
import pickle
import test

def createSC():
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("C2LSH")
    sc = SparkContext(conf = conf)
    return sc

with open("toy/toy_hashed_data", "rb") as file:
    data = pickle.load(file)
with open("toy/toy_hashed_query", "rb") as file:
    query_hashes = pickle.load(file)

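alpha_m = 10
beta_n = 9

def collision_count(a, b, offset):
    # Number of positions where the two hash lists differ by at most `offset`.
    counter = 0
    for i in range(len(a)):
        if abs(a[i] - b[i]) <= offset:
            counter += 1
    return counter

def c2lsh(data_hashes, query_hashes, alpha_m, beta_n):
    offset = 0
    while True:
        # Bind the current offset as a default argument: RDDs are lazy, and a plain
        # closure would pick up a later value of `offset` when the returned RDD
        # is evaluated again by the caller's collect().
        candidates = data_hashes.flatMap(
            lambda x, off=offset: [x[0]] if collision_count(x[1], query_hashes, off) >= alpha_m else [])
        if candidates.count() >= beta_n:
            return candidates
        offset += 1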



sc = createSC()
data_hashes = sc.parallelize([(index, x) for index, x in enumerate(data)])
start_time = time()
res = c2lsh(data_hashes, query_hashes, alpha_m, beta_n).collect()
end_time = time()
sc.stop()

print('running time:', end_time - start_time)
print('Number of candidate: ', len(res))
print('set of candidate: ', set(res))

running time: 100.17657828330994
Number of candidate:  10
set of candidate:  {0, 70, 40, 10, 80, 50, 20, 90, 60, 30}
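
The example above rescans the whole RDD once per offset value. One possible optimisation is sketched below in plain Python, assuming integer hash values and an offset that grows by 1 from 0, as in the example. A point first becomes a candidate at the offset equal to its `alpha_m`-th smallest absolute difference, so the loop stops at the `beta_n`-th smallest of these values, and the candidates are all points whose value is at most that threshold. The names `min_offset` and `candidates_one_pass` are hypothetical. The sketch only checks the reasoning; computing the threshold in Spark without the forbidden functions is left to your own design.

```python
import math


def min_offset(data_hash, query_hashes, alpha_m):
    """Smallest integer offset at which this point has at least alpha_m collisions."""
    if alpha_m <= 0:
        return 0
    diffs = sorted(abs(d - q) for d, q in zip(data_hash, query_hashes))
    if alpha_m > len(diffs):
        return math.inf  # alpha_m collisions can never be reached
    return diffs[alpha_m - 1]  # the alpha_m-th smallest absolute difference


def candidates_one_pass(pairs, query_hashes, alpha_m, beta_n):
    """Candidate IDs matching the offset loop in the example (assumes 1 <= beta_n <= len(pairs))."""
    keys = [(pid, min_offset(h, query_hashes, alpha_m)) for pid, h in pairs]
    # The offset at which the candidate count first reaches beta_n.
    threshold = sorted(k for _, k in keys)[beta_n - 1]
    if threshold == math.inf:
        raise ValueError("fewer than beta_n points can ever become candidates")
    return {pid for pid, k in keys if k <= threshold}
```

All points tied at the threshold are included, so the result can contain more than `beta_n` IDs, as in the example output above (10 candidates for `beta_n = 9`). The per-point `min_offset` step fits naturally into a single `map` over `data_hashes`.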


# Project Submission and Feedback


For the project submission, you are required to submit the following files:

1. Your implementation in the python file `submission.py`.
2. The report `report.pdf`.

Detailed instructions on using `give` to submit the project files will be announced later via Piazza.

In [None]:
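from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from time import time
import pickle
import submission

def createSC():
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("C2LSH")
    # getOrCreate() reuses a running SparkContext instead of raising an error
    sc = SparkContext.getOrCreate(conf=conf)
    return sc
sc = createSC()
x = [(1, 2), (2, 5), (3, 4)]
y = sc.parallelize(x)
# RDD.toDF() is only attached to RDDs once a SparkSession has been created
spark = SparkSession.builder.getOrCreate()
df = y.toDF()
print(df)   # prints the DataFrame's schema, not its rows
df.show()   # prints the rows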




In [None]:
import random

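def generate(dimension, count, seed, start=-1000, end=1000):
    # `count` random integer vectors of length `dimension` in [start, end],
    # plus one random query vector; `seed` makes the output reproducible.
    random.seed(seed)

    data = [
        [
            random.randint(start, end)
            for _ in range(dimension)
        ]
        for _ in range(count)
    ]

    query = [random.randint(start, end) for _ in range(dimension)]

    return data, query

def generate2(dimension, count, seed, start=0, end=100):
    # For every n in [start, end), `count` copies of the constant vector [n] * dimension.
    # Here `seed` is not a random seed: it is the constant value of the query vector.
    data = [
        [n] * dimension
        for n in range(start, end)
        for _ in range(count)
    ]

    query = [seed] * dimension

    return data, query

def generate3(dimension, count, seed, start=0, end=100):
    # For every k in [start, end), `count` copies of [k, k+1, ..., k+dimension-1].
    # As in generate2, `seed` is the constant value of the query vector.
    data = [
        [k + j for j in range(dimension)]
        for k in range(start, end)
        for _ in range(count)
    ]

    query = [seed] * dimension

    return data, query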

In [None]:
data2, query2 = generate(10, 20000, 0, 0, 1000)


In [None]:
print(data2)

In [None]:
print(query2)

In [None]:
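def count(j, hashes, offset):
    # Count the positions where the two lists differ by at most `offset`.
    d = [a_i - b_i for a_i, b_i in zip(j, hashes)]
    return sum(1 for diff in d if abs(diff) <= offset)

c = []
for i in data2:
    c.append(count(i, query2, 0))
print(c)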
    

In [None]:
# Element-wise differences between the first generated point and the query.
[a_i - b_i for a_i, b_i in zip(data2[0], query2)]

In [None]:
def count1(j,hashes,offset):
    result = map(lambda x, y: abs(x - y)<=offset, j, hashes)
    return sum(result)

In [None]:
from time import perf_counter

def count2(j, hashes, offset):
    # itertools.imap exists only in Python 2; a generator expression over zip
    # is the lazy Python 3 equivalent.
    return sum(abs(x - y) <= offset for x, y in zip(j, hashes))

In [None]:
def count3(j, hashes, offset):
    # Explicit index loop, the same logic as collision_count in the example above.
    counter = 0
    for i in range(len(j)):
        if abs(j[i] - hashes[i]) <= offset:
            counter += 1
    return counter

In [None]:
# Compare the three implementations; perf_counter has a finer resolution than time().
a = [864, 394, 776, 911, 430, 41, 265, 988, 523, 497]
b = [293, 94, 674, 113, 852, 220, 503, 12, 458, 613]
for name, fn in (("count1", count1), ("count2", count2), ("count3", count3)):
    t0 = perf_counter()
    result = fn(a, b, 0)
    print(name, result, perf_counter() - t0)

In [None]:
#alpha_m, beta_n = 10, 500
data6, query6 = generate( 13, 2_000, 100, -230_000, 50_000)

In [None]:
data7, query7 = generate( 15, 70_000, 140, -500_000, 500_000)

In [None]:
data8, query8 = generate2( 13, 9, 100, 0, 120)

In [None]:
from pyspark import SparkContext, SparkConf
from time import time
import pickle
import submission

 

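def createSC():
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("C2LSH")
    # Reuse the SparkContext from an earlier cell if one is still running;
    # creating a second one would raise an error.
    sc = SparkContext.getOrCreate(conf=conf)
    return sc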

sc = createSC()
raw_data = [("Joseph", "Maths", 83), ("Joseph", "Physics", 74),
("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82),
("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80),
("Tina", "Maths", 78), ("Tina", "Physics", 73),
("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
("Thomas", "Maths", 87), ("Thomas", "Physics", 93),
("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74)]

rdd_1 = sc.parallelize(raw_data)
# Per student: highest score + lowest score.
rdd_2 = rdd_1.map(lambda x: (x[0], x[2]))           # (name, score)
rdd_3 = rdd_2.reduceByKey(lambda x, y: max(x, y))   # (name, max score)
rdd_4 = rdd_2.reduceByKey(lambda x, y: min(x, y))   # (name, min score)
rdd_5 = rdd_3.join(rdd_4)                           # (name, (max, min))
rdd_6 = rdd_5.map(lambda x: (x[0], x[1][0] + x[1][1]))
# Alternative with a single combineByKey, giving the same (name, max + min):
# rdd_alt = rdd_1.map(lambda t: (t[0], t[2])) \
#     .combineByKey(lambda s: (s, s),
#                   lambda acc, s: (max(acc[0], s), min(acc[1], s)),
#                   lambda p, q: (max(p[0], q[0]), min(p[1], q[1]))) \
#     .mapValues(sum)
print(rdd_6.collect())





Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 10, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\HP\Anaconda3\envs\comp9313\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 362, in main
  File "C:\Users\HP\Anaconda3\envs\comp9313\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 722, in read_int
    length = stream.read(4)
  File "C:\Users\HP\Anaconda3\envs\comp9313\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
