# Duplicate Removal in Streaming Data
## Submitted By - Aniruddh Nathani

### Approach 1) Batch Processing

from collections import defaultdict
from random import randint

# Check whether a value has already been seen in the real-time data. The function can be connected to
# any real-time data generator API that delivers data as JSON objects.
def DuplicateChecker(key, value, mapping):
    # An empty mapping contains no values, so it needs no special case.
    # Note that scanning mapping.values() takes O(n) time per event.
    if value in mapping.values():
        print("Data {} at Time {}".format(value, key), "is already seen at a previous Time Stamp.", "Hence Ignore\n")
        return mapping
    # First occurrence of this value: record it under its timestamp.
    print("Print mapping at Time Stamp {}.".format(key))
    mapping[key] = value
    print(dict(mapping), "\n")
    return mapping
    

### Assume the real-time data arrives as JSON-like key/value pairs. The data is randomly generated for now,
### but the function defined above can be called in the same way on streaming data.


RealTimeData = {}
for i in range(1, 11):
    RealTimeData[str(i) + 'am'] = randint(1, 10)  
print("Original Randomly generated Real Time Data is: \n" ,RealTimeData)

print("\n############### Duplicate removal in Progress ###################\n")

# Create the hash map that keeps only the first occurrence of each value.
mapping = defaultdict(lambda: 0)

# Process the real-time data in the order it is seen, one timestamp at a time.
for key,value in RealTimeData.items():
    DuplicateChecker(key, value, mapping)
   
print("############### Duplicate removal Complete ###################\n")
print("\nData After Removing Duplicates: \n",dict(mapping))


Original Randomly generated Real Time Data is: 
 {'1am': 8, '2am': 8, '3am': 2, '4am': 8, '5am': 8, '6am': 9, '7am': 4, '8am': 1, '9am': 2, '10am': 9}

############### Duplicate removal in Progress ###################

Print mapping at Time Stamp 1am.
{'1am': 8} 

Data 8 at Time 2am is already seen at a previous Time Stamp. Hence Ignore

Print mapping at Time Stamp 3am.
{'1am': 8, '3am': 2} 

Data 8 at Time 4am is already seen at a previous Time Stamp. Hence Ignore

Data 8 at Time 5am is already seen at a previous Time Stamp. Hence Ignore

Print mapping at Time Stamp 6am.
{'1am': 8, '3am': 2, '6am': 9} 

Print mapping at Time Stamp 7am.
{'1am': 8, '3am': 2, '6am': 9, '7am': 4} 

Print mapping at Time Stamp 8am.
{'1am': 8, '3am': 2, '6am': 9, '7am': 4, '8am': 1} 

Data 2 at Time 9am is already seen at a previous Time Stamp. Hence Ignore

Data 9 at Time 10am is already seen at a previous Time Stamp. Hence Ignore

############### Duplicate removal Complete ###################


Data After Removing Duplicates: 
 {'1am': 8, '3am': 2, '6am': 9, '7am': 4, '8am': 1}
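
The duplicate check above scans `mapping.values()` for every incoming event, so the cost of each check grows with the number of values stored so far. A minimal sketch of an alternative, assuming the values are hashable, keeps a separate set of seen values so that each lookup takes constant time on average. The names `remove_duplicates` and `seen_values` are hypothetical and are not part of the original notebook.

```python
def remove_duplicates(events):
    """Return a dict keeping only the first timestamp at which each value appears.

    `events` is an iterable of (timestamp, value) pairs in arrival order.
    """
    seen_values = set()  # values that have already been kept
    unique = {}          # timestamp -> value, first occurrences only
    for timestamp, value in events:
        if value in seen_values:
            continue     # duplicate value: ignore this event
        seen_values.add(value)
        unique[timestamp] = value
    return unique


# Same data as the sample run shown above.
sample = {'1am': 8, '2am': 8, '3am': 2, '4am': 8, '5am': 8,
          '6am': 9, '7am': 4, '8am': 1, '9am': 2, '10am': 9}
result = remove_duplicates(sample.items())
print(result)
```

On the sample data this keeps the same five timestamps as the run above. The trade-off is extra memory for the set in exchange for faster lookups.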

### Approach 2) In-Stream Deduplication Using Spark (When the Data Is Huge)
#### 1) A Spark flow that filters events based on their presence in a lookup database (cache storage)
#### 2) Amazon S3 for storing the deduplication caches

### Algorithm Steps:
1) Partition the data into a Spark stream. Each event has a unique ID based on its creation timestamp. Place each event in a bucket whose number is calculated from the hash code of the event ID.

2) For better performance, run the deduplication processing in parallel.

3) Since the data is already processed in parallel under the partitioning scheme described in 1), reuse the same parallelism for deduplication. To do so, partition the cache stored in the S3 filesystem the same way the data in the stream is partitioned.

4) The cache is a simple data structure that consists of two sets. The first is an immutable set that contains the event IDs loaded from S3. The second is a mutable set that is used for adding new event IDs.

5) Since a set stores only unique elements, deduplication processing keeps every event whose ID has not been seen before and ignores the ones that are repeated.
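
The steps above can be sketched in plain Python, without Spark or S3, to show how the pieces fit together. This is an illustrative sketch under stated assumptions, not the Spark job itself: `NUM_BUCKETS`, `bucket_for`, `BucketCache`, and `deduplicate` are hypothetical names, the persisted cache is modelled as an in-memory `frozenset` standing in for data loaded from S3, and `zlib.crc32` is an assumed choice of hash function that stays stable across processes.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # hypothetical number of partitions


def bucket_for(event_id):
    """Map an event ID to a bucket number using a process-independent hash."""
    return zlib.crc32(event_id.encode("utf-8")) % NUM_BUCKETS


class BucketCache:
    """Two-set cache for one bucket: persisted IDs plus IDs seen in this run."""

    def __init__(self, persisted_ids=()):
        # Immutable set; stands in for the IDs loaded from S3 (assumption).
        self.persisted = frozenset(persisted_ids)
        # Mutable set that collects event IDs seen for the first time.
        self.new_ids = set()

    def check_and_add(self, event_id):
        """Return True and record the ID if it is new, False if it is a duplicate."""
        if event_id in self.persisted or event_id in self.new_ids:
            return False
        self.new_ids.add(event_id)
        return True


def deduplicate(events, caches):
    """Route each event to its bucket's cache and keep only first occurrences."""
    kept = []
    for event_id, payload in events:
        cache = caches[bucket_for(event_id)]
        if cache.check_and_add(event_id):
            kept.append((event_id, payload))
    return kept


# One cache per bucket, created on demand. "evt-1" is assumed to have been
# persisted by an earlier run.
caches = defaultdict(BucketCache)
caches[bucket_for("evt-1")] = BucketCache(["evt-1"])

events = [("evt-1", "a"), ("evt-2", "b"), ("evt-2", "b"), ("evt-3", "c")]
kept = deduplicate(events, caches)
print(kept)
```

Because an event ID always hashes to the same bucket, each bucket's cache can be checked independently of the others, which is what would allow the buckets to be processed in parallel. This sketch runs them sequentially for simplicity.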