In [0]:
import copy
import time

In [0]:
from pyspark.sql import SparkSession
spark = (
  SparkSession.builder
  .getOrCreate()
)
# Turn on REPL eager evaluation so a DataFrame returned as a cell's output shows its top rows.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
sc = spark.sparkContext

# Get Familiar with Key-Value Pairs in PySpark

In [0]:
#Play with this if you are not familiar with mapReduce in Pyspark :)
# Build (attribute-name group, next attribute, values, value) records for two attribute
# windows of a 5-row sample, then tag each record with a count of 1.

dff = spark.sql("SELECT * FROM _2018_01_bme280sof_1_csv limit 5")
dff = dff.drop('_c0')
schema = dff.columns

# frozenset (not set) for the attribute-name group: sets are unhashable, so a record
# containing one cannot be used as a key by reduceByKey/distinct.
rdd1 = dff.rdd.map(lambda x: (frozenset([schema[0], schema[1]]), schema[2], (x[0], x[1]), x[2]))
rdd2 = dff.rdd.map(lambda x: (frozenset([schema[1], schema[2]]), schema[3], (x[1], x[2]), x[3]))

rdd3 = rdd1.union(rdd2)
rdd3 = rdd3.map(lambda x: ((x[0], x[1], x[2], x[3]), 1))
# The keys are hashable now, so this can be uncommented to keep one record per key:
# rdd3 = rdd3.reduceByKey(lambda x, y: x)
for w in rdd3.toLocalIterator():
  print(w)

In [0]:
#Play with this if you are not familiar with mapReduce in Pyspark :)
# Deduplicate (word, tag, count) records: map each one to (record, 1), then keep one per key.

words = sc.parallelize([("Hadoop",'1',2),("is",'1',1),("good",'1',1), ("Spark",'1',1),("is",'2',1),("fast",'1',1),("Spark",'1',1),("is",'1',1),("better",'1',1)])

words1 = words.map(lambda x: ((x[0], x[1], x[2]), 1))

# reduceByKey with (x, y) -> x keeps a single value per distinct key, so repeated
# records such as ("Spark", '1', 1) collapse to one.
words2 = words1.reduceByKey(lambda x, y: x)

# Alternative to try: count the duplicates instead of collapsing them.
# words2 = words1.reduceByKey(lambda a, b: a + b)

print("before")
for w in words1.toLocalIterator():
  print(w)

print("after")
for w in words2.toLocalIterator():
  print(w)

# Define Controller

In [0]:
def generate_computational_graph(RHS, schema, max_level=3):
  """
  Build the lattice of candidate left-hand sides (LHS) for one RHS attribute.

  Parameters
  ----------
  RHS: attribute placed on the right-hand side; it never appears in an LHS
  schema: iterable of attribute names
  max_level: number of levels to build (default 3, i.e. LHS sizes 1..3)

  Output
  ----------
  A dictionary where
  key: level (0-based; a candidate at level k contains k+1 attributes)
  value: list of current level's candidates, candidates are in the format of set
  -----
  """
  computational_graph = dict()
  for level in range(max_level):
    candidates = []
    if level == 0:
      # Level 0: every single attribute except the RHS itself.
      for attribute in schema:
        if attribute != RHS:
          candidates.append(set([attribute]))
    else:
      # Brute force: extend each previous-level candidate by one singleton.
      # A hash set of frozensets makes the duplicate check O(1) instead of a list scan.
      seen = set()
      for element1 in computational_graph[level-1]:
        for element2 in computational_graph[0]:
          newelement = element1.union(element2)
          key = frozenset(newelement)
          # Keep only unions that actually grew by exactly one attribute.
          if len(newelement) == level + 1 and key not in seen:
            seen.add(key)
            candidates.append(newelement)
    computational_graph[level] = candidates

  return computational_graph

In [0]:
def get_candidates(level, computational_graph):
  """Return the list of LHS candidate sets stored at `level` (KeyError if the level is absent)."""
  level_candidates = computational_graph[level]
  return level_candidates

In [0]:
def prune_graph(level,current_level_result,computational_graph):
  """
  Remove candidates made non-minimal by the FDs discovered at `level`.

  Input
  -------
  level: level at which current_level_result was discovered
  current_level_result: (soft/delta) functional dependencies discovered by algorithm, data structure: a list of candidates where candidates are in the format of sets
  computational_graph: A dict where key:level value: list of current level's candidates, candidates are in the format of set

  Output
  -------
  A pruned deep copy of computational_graph (the input is not modified). At every
  level deeper than `level`, any candidate that contains one of the discovered LHS
  sets is removed, because a smaller FD already covers it. Levels up to and
  including `level` are copied unchanged.
  """
  new_computational_graph = copy.deepcopy(computational_graph)
  # Prune every deeper level present in the graph instead of a hard-coded maximum level.
  for deeper_level in new_computational_graph:
    if deeper_level > level:
      new_computational_graph[deeper_level] = [
        candidate for candidate in new_computational_graph[deeper_level]
        if not any(LHS.issubset(candidate) for LHS in current_level_result)
      ]

  return new_computational_graph

In [0]:
def transform_res(FDs):
  """
  Group discovered dependencies by their right-hand side.

  Parameters
  --------------
  FDs: iterable of (LHS, RHS) tuples, where LHS is a set

  Output
  ---------
  dict mapping each RHS to the list of its LHS sets, keys in first-seen order
  """
  grouped = {}
  for lhs, rhs in FDs:
    grouped.setdefault(rhs, []).append(lhs)
  return grouped

In [0]:
def controller(df, func):
  """
  Run level-wise (soft/delta) FD discovery and collect every dependency found.

  Parameters
  -----------
  func: (soft/delta) Functional Discovery function, called as func(level, df, candidates)
        where candidates maps each RHS to its list of LHS sets; it returns (LHS, RHS) tuples
  df: dataframe

  Output
  ------
  (soft/delta) Functional Dependencies, a list of (LHS, RHS) tuples in discovery order
  """
  schema = df.columns
  # One candidate lattice per attribute that can appear on the RHS.
  graphs = {rhs: generate_computational_graph(rhs, schema) for rhs in schema}
  discovered = []

  for level in range(3):
    level_candidates = {rhs: get_candidates(level, graph) for rhs, graph in graphs.items()}
    level_fds = func(level, df, level_candidates)
    discovered.extend(level_fds)

    # Feed this level's results back: prune the deeper levels of each affected lattice.
    found_by_rhs = transform_res(level_fds)
    for rhs in graphs:
      if rhs in found_by_rhs:
        graphs[rhs] = prune_graph(level, found_by_rhs[rhs], graphs[rhs])

  return discovered

# Define FD Functions

In [0]:
# PAST FUNCTION, ABANDONED BY GROUP DECISION: the SQL-based find_FDs_sql was replaced by find_FDs_pairs below and is kept here for reference only.

# def find_FDs_sql(df,current_level_candidates):
#   """
#   Parameters
#   -------------
#   df: dataframe
#   current_level_candidates: A dictionary where key:RHS value: a list of LHS, LHS are in a set format
  
#   Output
#   ---------
#   A list of discovered functional dependencies where elements are tuples(LHS,RHS)
#   """
  
#   schema = df.columns
#   FDs=[]
#   i=0
#   for RHS in current_level_candidates.keys():
#     for LHS in current_level_candidates[RHS]:
#       i+=1
# #       sqlstring='SELECT '+f'{", ".join(f"{attribute}" for attribute in LHS)}'+f', COUNT(DISTINCT {RHS}) c'+' FROM _2018_01_bme280sof_1_csv GROUP BY '+f'{", ".join(f"{attribute}" for attribute in LHS)}'+ ' HAVING c>1'
#       sqlstring='SELECT '+f'{", ".join(f"{attribute}" for attribute in LHS)}'+f', COUNT(DISTINCT {RHS}) c'+' FROM toytable_csv GROUP BY '+f'{", ".join(f"{attribute}" for attribute in LHS)}'+ ' HAVING c>1'

#       res = spark.sql(sqlstring).count()
#       if(res==0):
#         FDs.append((LHS,RHS))
    
#   print(i)
#   return FDs

In [0]:
def find_FDs_pairs(level,df,current_level_candidates):
  """
  Check which candidate functional dependencies LHS -> RHS hold exactly in df.

  A candidate holds when every distinct combination of LHS values occurs with
  exactly one distinct RHS value.

  Parameters
  -------------
  level: lattice level being checked (0-based, as passed by controller). Kept for
         the controller's call signature; the LHS size is read from each candidate,
         so the computation does not depend on it.
  df: dataframe
  current_level_candidates: A dictionary where key:RHS value: a list of LHS, LHS are in a set format

  Output
  ---------
  A list of discovered functional dependencies where elements are tuples(LHS,RHS), LHS is a set
  """
  schema = df.columns
  rows = df.rdd

  def make_mapper(dependency, lhs_idx, rhs_idx):
    # The candidate's values are bound as arguments, so each Spark closure keeps its own
    # candidate no matter when PySpark serializes it (no loop-variable late binding).
    return lambda row: (dependency, tuple(row[i] for i in lhs_idx), row[rhs_idx])

  per_candidate = []
  for RHS, LHS_list in current_level_candidates.items():
    rhs_idx = schema.index(RHS)
    for LHS in LHS_list:
      # Fix the attribute order once on the driver so names and values line up, and
      # resolve column positions here instead of once per row.
      lhs_attrs = tuple(LHS)
      lhs_idx = tuple(schema.index(attribute) for attribute in lhs_attrs)
      per_candidate.append(rows.map(make_mapper((lhs_attrs, RHS), lhs_idx, rhs_idx)))

  # Nothing to check: skip Spark entirely (SparkContext.union needs a non-empty list).
  if not per_candidate:
    return []

  # Records are (dependency, LHS values, RHS value) with dependency = (LHS names, RHS).
  # The dependency stays one nested key. Slicing a flat tuple by `level` was off by one
  # for 0-based levels: it dropped the RHS name from the key and reported the last LHS
  # attribute as the RHS.
  holding = (
    rows.context.union(per_candidate)
    .distinct()
    .map(lambda rec: ((rec[0], rec[1]), 1))
    .reduceByKey(lambda a, b: a + b)   # distinct RHS values per LHS-value group
    .map(lambda kv: (kv[0][0], kv[1]))
    .reduceByKey(max)                  # largest group per dependency
    .filter(lambda kv: kv[1] == 1)     # holds iff no LHS value maps to 2+ RHS values
    .keys()
  )

  #RDD to FDs: tuple(LHS,RHS)
  FDs = []
  for lhs_attrs, RHS in holding.toLocalIterator():
    FDs.append((set(lhs_attrs), RHS))

  return FDs

In [0]:
# Quick check of how dict items look: print the first (key, value) pair.
a = {'a': 1, 'b': {1, 2}}
print(next(iter(a.items())))

# Function Sanity Checks (optional: run and modify these if you want to explore)

In [0]:
#Import toyTable

dfff = spark.sql("SELECT * FROM toytable_csv")

schema = dfff.columns

#Generate computational graph
computational_graph=dict()
for RHS in schema:
  computational_graph[RHS]=generate_computational_graph(RHS,schema)

# Lattice level to check directly; level k holds LHS candidates with k+1 attributes.
TOY_LEVEL = 2

#Define current_level_candidates
current_level_candidates=dict()
for RHS in schema:
  current_level_candidates[RHS] = get_candidates(TOY_LEVEL,computational_graph[RHS])

start_time = time.time()

# find_FDs_pairs takes the level as its first argument; the old 2-argument call raised TypeError.
FDss=find_FDs_pairs(TOY_LEVEL,dfff,current_level_candidates)

print("--- %s seconds ---" % (time.time() - start_time))
print(FDss)

In [0]:
# Keep only the minimal FDs in FDss: drop any FD whose LHS contains the LHS of an FD
# with the same RHS found at a lower level (levels 0 and 1) of the toy table.
# The reference FDs are computed here from dfff / computational_graph above, instead of
# reading `FDs`, which is only assigned in later cells and raises NameError on a fresh run.
FDs_lower = []
for lower_level in (0, 1):
  lower_candidates = {RHS: get_candidates(lower_level, computational_graph[RHS]) for RHS in schema}
  FDs_lower.extend(find_FDs_pairs(lower_level, dfff, lower_candidates))

FDt = [
  fd for fd in FDss
  if not any(ref_RHS == fd[1] and ref_LHS.issubset(fd[0]) for ref_LHS, ref_RHS in FDs_lower)
]

print(FDt)

In [0]:
# Number of rows of _2018_01_bme280sof_1_csv used by the sanity check below.
# NOTE(review): LIMIT without ORDER BY does not guarantee which rows are returned.
ROW_LIMIT = 2000
df = spark.sql(f"SELECT * FROM _2018_01_bme280sof_1_csv limit {ROW_LIMIT}")

In [0]:
#Function Sanity Check
df=df.drop('_c0')
schema = df.columns
computational_graph=dict()
for RHS in schema:
  computational_graph[RHS]=generate_computational_graph(RHS,schema)

# Simulate level-1 feedback: pretend {'location', 'temperature'} was discovered as an LHS
# for every RHS, in the dict format transform_res produces (key: RHS, value: list of LHS).
current_level_result = dict()
for RHS in schema:
  current_level_result[RHS] = [{'location', 'temperature'}]

# Prune graphs according to feedback of FD-functions
for RHS in schema:
  computational_graph[RHS]=prune_graph(1, current_level_result[RHS],computational_graph[RHS])

# Check only the level-0 (single-attribute) candidates for RHS 'sensor_id'.
current_level_candidates=dict()
current_level_candidates['sensor_id'] = get_candidates(0,computational_graph['sensor_id'])

start_time = time.time()

# find_FDs_pairs takes the level as its first argument; the old 2-argument call raised TypeError.
FDs=find_FDs_pairs(0,df,current_level_candidates)

print("--- %s seconds ---" % (time.time() - start_time))

In [0]:
# Show FDs as last assigned; in top-to-bottom order, that is the 'sensor_id' check in the previous cell.
print(FDs)

# Test on the Toy Table and the Sensor Dataset

In [0]:
# Toy table: run the full level-by-level discovery through the controller and time it.
dff = spark.sql("SELECT * FROM toytable_csv")

start_time = time.time()
FDs = controller(dff, find_FDs_pairs)
print("--- %s seconds ---" % (time.time() - start_time))

print(FDs)

In [0]:
# Run the full discovery on a 5-row sample of the _2018_01_bme280sof_1_csv table.
dfff = spark.sql("SELECT * FROM _2018_01_bme280sof_1_csv limit 5")
# DataFrame.drop returns a new DataFrame. The original discarded the result, so `_c0`
# stayed in the schema and was treated as an attribute by the controller.
dfff = dfff.drop('_c0')

# Use controller
start_time = time.time()

FDs = controller(dfff, find_FDs_pairs)

print("--- %s seconds ---" % (time.time() - start_time))
print(FDs)