# **Create grounding** - We begin by reading our data set from a CSV file and loading it into a Spark DataFrame.
**Get Data** - We read the data from a CSV file in which each tuple is identified by its index, together with a list of denial constraints.
**Prepare Data** - We parse each denial constraint and turn it into a SQL query.
**Analyze Data** - For each constraint, we build a DataFrame that contains the pairs of tuples that satisfy that denial constraint.
    

> The ***pyspark*** library is used for all the data analysis, excluding a small piece of the data presentation section. The ***numpy*** library is only needed for set functionality. Importing the libraries is the first step we will take.

In [1]:
# Import all libraries needed for the tutorial

# General syntax to import specific functions in a library: 
##from (library) import (specific library function)
import pyspark as ps #this is how I usually import spark
import numpy as np #this is how I usually import numpy
import sys #For some system calls


In [2]:
print('Python version ' + sys.version)
print('Spark version ' + ps.__version__)
print('numpy version ' + np.__version__)

Python version 2.7.13 |Anaconda 4.4.0 (64-bit)| (default, Dec 20 2016, 23:09:15) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
Spark version 2.2.0
numpy version 1.12.1


# Initialize Spark Session  

The Spark session for this tutorial runs locally and is named "Grounding".

In [3]:
# Create a local Spark session named "Grounding"
spark = ps.sql.SparkSession.builder.master("local").appName("Grounding").getOrCreate()

# Reading data 

In this section we use the Spark session to read the CSV file 10.csv, which contains 10 weather records with the header index, city, date, tempType (the measurement type, such as TMIN or TMAX) and temp (the temperature).

In [4]:
df=spark.read.csv("10.csv",header=True)
df.show()

+-----+-----------+--------+--------+----+
|index|       city|    date|tempType|temp|
+-----+-----------+--------+--------+----+
|    1|ITE00100554|18000101|    TMAX| -75|
|    2|ITE00100554|18000101|    TMIN|-148|
|    3|GM000010962|18000101|    PRCP|   0|
|    4|EZE00100082|18000101|    TMAX| -86|
|    5|EZE00100082|18000101|    TMAX|-135|
|    6|ITE00100554|18000102|    TMAX| -60|
|    7|ITE00100554|18000102|    TMIN|-125|
|    8|GM000010962|18000102|    PRCP|   0|
|    9|EZE00100082|18000102|    TMAX| -44|
|   10|EZE00100082|18000102|    TMIN|-135|
+-----+-----------+--------+--------+----+
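
One detail worth keeping in mind: when `spark.read.csv` is called without `inferSchema=True`, every column is loaded as a string. The sketch below does not use Spark; it parses an inline copy of rows 1 and 4 with the standard `csv` module (the inline text is an illustration, not the real file) to show how a text comparison and a numeric comparison of the same temperatures can disagree. This would only matter if an ordering operator were applied to `temp`.

```python
import csv

# Inline copy of two rows of 10.csv as displayed above (illustration only).
raw = """index,city,date,tempType,temp
1,ITE00100554,18000101,TMAX,-75
4,EZE00100082,18000101,TMAX,-86
"""

# csv.DictReader accepts any iterable of lines; every field comes back as text.
rows = list(csv.DictReader(raw.splitlines()))
first, second = rows[0]["temp"], rows[1]["temp"]

# Compared as text, "-75" sorts before "-86" because '7' < '8'.
text_less = first < second
# Compared as numbers, -75 is greater than -86.
number_less = int(first) < int(second)

print(text_less)    # True
print(number_less)  # False
```

Equality tests such as the ones used in this tutorial are not affected, since equal strings here correspond to equal numbers.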



We are basically done reading the data set.

***df*** is a ***DataFrame*** object. You can think of this object as holding the contents of 10.csv in a format similar to a SQL table or an Excel spreadsheet. In the next step we get the list of denial constraints. A denial constraint normally states a logical comparison between two tuples:

$$\begin{array}{c}
\forall t_1,t_2 : \neg \left( P_1 \land P_2 \land \dots \land P_m \right)
\end{array}
$$

We use a similar format to express the denial constraints used in the Holistic Error Detection algorithm. We also have a list of noisy cells, which is not needed for this part. For simplicity we assume that we have just one denial constraint. It says that, within one city, a temperature recorded as the maximum cannot also be recorded as the minimum: two records for the same city with the same temp must not have different tempType values.
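
In logical notation, with $t_1$ and $t_2$ ranging over all records, the single constraint encoded in ***dcCode*** can be read as follows (a restatement of the three predicates EQ, EQ and IQ, not an additional rule):

$$
\forall t_1,t_2 : \neg \left( t_1.\mathrm{city} = t_2.\mathrm{city} \;\land\; t_1.\mathrm{temp} = t_2.\mathrm{temp} \;\land\; t_1.\mathrm{tempType} \neq t_2.\mathrm{tempType} \right)
$$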


In [5]:
dcCode=['t1&t2&EQ(t1.city,t2.city)&EQ(t1.temp,t2.temp)&IQ(t1.tempType,t2.tempType)']

Grounding is used to build the feature vectors for the learning part of HoloClean. To this end we have a class that parses the input denial constraints and produces SQL predicates for querying our DataFrame.

In [28]:
class DenialConstraint:

    operationsArr=['=' , '<' , '>' , '<>' , '<=' ,'>=']
    operationSign=['EQ','LT', 'GT','IQ','LTE', 'GTE']
    
    
    
    def __init__(self,denial_constraints):
        self.denial_constraints=denial_constraints

    def dc2SqlCondition(self):
        
        """
        Creates list of list of sql predicates by parsing the input denial constraints
        :return: (list[list[string]], list[list[string]]) sql predicates and the operations used
        """

        
        dcSql=[]
        usedOperations=[]
        numOfContraints=len(self.denial_constraints)
        for i in range(0,numOfContraints):
            ruleParts=self.denial_constraints[i].split('&')
            firstTuple=ruleParts[0]
            secondTuple=ruleParts[1]
            numOfpredicate=len(ruleParts)-2
            dcOperations=[]
            dc2sqlpred=[]
            for c in range(2,len(ruleParts)):
                dc2sql=''
                predParts=ruleParts[c].split('(')
                op=predParts[0]
                dcOperations.append(op)
                predBody=predParts[1][:-1]
                tmp=predBody.split(',')
                predLeft=tmp[0]
                predRight=tmp[1]
                #predicate type detection
                if firstTuple in predBody and secondTuple in predBody:
                    if firstTuple in predLeft:
                        dc2sql= dc2sql+'table1.'+ predLeft.split('.')[1]+ self.operationsArr[self.operationSign.index(op)]+'table2.'+predRight.split('.')[1]
                    else:
                        dc2sql= dc2sql+'table2.'+ predLeft.split('.')[1]+ self.operationsArr[self.operationSign.index(op)]+'table1.'+predRight.split('.')[1]
                elif firstTuple in predBody:
                    if firstTuple in predLeft:
                        dc2sql= dc2sql+'table1.'+ predLeft.split('.')[1]+ self.operationsArr[self.operationSign.index(op)]+predRight
                    else:
                        dc2sql= dc2sql+ predLeft+ self.operationsArr[self.operationSign.index(op)]+'table1.'+ predRight.split('.')[1]
                else:
                    if secondTuple in predLeft:
                        dc2sql= dc2sql+'table2.'+ predLeft.split('.')[1]+ self.operationsArr[self.operationSign.index(op)]+predRight
                    else:
                        dc2sql= dc2sql+ predLeft+ self.operationsArr[self.operationSign.index(op)]+'table2.'+ predRight.split('.')[1]
                dc2sqlpred.append(dc2sql)
            usedOperations.append(dcOperations)
            dcSql.append(dc2sqlpred) 
        return dcSql,usedOperations   
    
    def make_and_condition(self,conditionInd):
        """
        Joins the sql predicates of the indexed constraint with AND
        :param conditionInd: int
        :return: string
        """
        result,dc=self.dc2SqlCondition()
        parts=result[conditionInd]
        return " AND ".join(str(part) for part in parts)
    
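    def noViolation_tuple(self,dataset,condition,spak_session):
        """
        Return the index pairs that do not violate the indexed constraint
        :param dataset: Dataframe
        :param condition: int
        :param spak_session: SparkSession
        :return: Dataframe
        """
        dataset.createOrReplaceTempView("df")
        q="SELECT table1.index as indexT1,table2.index as indexT2 FROM df table1,df table2 WHERE NOT("+ self.make_and_condition(condition)+")"
        satisfied_tuples_index=spak_session.sql(q)
        return satisfied_tuples_index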
    
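    def violation_tuple(self,dataset,condition,spak_session):
        """
        Return the index pairs that violate the indexed constraint
        :param dataset: Dataframe
        :param condition: int
        :param spak_session: SparkSession
        :return: Dataframe
        """
        dataset.createOrReplaceTempView("df")
        q="SELECT table1.index as indexT1,table2.index as indexT2 FROM df table1,df table2 WHERE ("+ self.make_and_condition(condition)+")"
        not_satisfied_tuples_index=spak_session.sql(q)
        return not_satisfied_tuples_index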
    
    def all_dc_violation(self,dataset,spak_session):
        
        """
        Return list of violation tuples dataframe
        :param dataset: Dataframe
        :param spak_session: SparkSession        
        :return: list[Dataframe]
        """
        
        return [self.violation_tuple(dataset, i, spak_session) for i in range(0,len(self.denial_constraints))]
    
    def all_dc_nonViolation(self,dataset,spak_session):
        
        """
        Return list of non violation tuples dataframe
        :param dataset: Dataframe
        :param spak_session: SparkSession        
        :return: list[Dataframe]
        """
        
        return [self.noViolation_tuple(dataset, i, spak_session) for i in range(0,len(self.denial_constraints))]
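
The translation of one predicate can also be sketched more compactly. The version below is only an illustration under stated assumptions: `OPERATORS`, `PREDICATE` and `predicate_to_sql` are hypothetical names that are not part of the class, and the sketch handles only predicates in which both sides refer to a tuple attribute (no constants).

```python
import re

# Hypothetical lookup table pairing each operation sign with its SQL operator;
# it mirrors operationSign/operationsArr from the class.
OPERATORS = {"EQ": "=", "LT": "<", "GT": ">", "IQ": "<>", "LTE": "<=", "GTE": ">="}

# Matches predicates of the form OP(tuple.attr,tuple.attr), e.g. EQ(t1.city,t2.city).
PREDICATE = re.compile(r"^(\w+)\((\w+)\.(\w+),(\w+)\.(\w+)\)$")

def predicate_to_sql(predicate, aliases):
    """Translate one two-tuple predicate into a SQL condition (sketch only)."""
    match = PREDICATE.match(predicate)
    if match is None:
        raise ValueError("unsupported predicate: " + predicate)
    op, left_tuple, left_attr, right_tuple, right_attr = match.groups()
    return "{}.{}{}{}.{}".format(
        aliases[left_tuple], left_attr, OPERATORS[op], aliases[right_tuple], right_attr
    )

# Map the tuple names used in the constraint to the table aliases used in the query.
aliases = {"t1": "table1", "t2": "table2"}
print(predicate_to_sql("IQ(t1.tempType,t2.tempType)", aliases))
# table1.tempType<>table2.tempType
```

A dictionary lookup avoids the repeated `operationsArr[operationSign.index(op)]` expression, at the cost of not covering predicates that compare an attribute with a constant.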

Here we can test the method ***dc2SqlCondition*** by creating a ***DenialConstraint*** object and passing it ***dcCode***.

In [29]:
print(dcCode)
dc=DenialConstraint(dcCode)
predicatePart,l=dc.dc2SqlCondition()
print(predicatePart)
print(dc. make_and_condition(0))
print (df.show(df.count()))
print("%%%%%%%%%%%%%%%%%%%%%%")
df.createOrReplaceTempView("df") 
multiCond="SELECT table1.index as indexT1,table2.index as indexT2 FROM df table1,df table2 WHERE NOT("+ dc.make_and_condition(0) +" AND "+ str(predicatePart[0][1])+" AND "+ str(predicatePart[0][2])+")"
print(multiCond)
singleCond="SELECT table1.index as indexT1,table2.index as indexT2 FROM df table1,df table2 WHERE ("+ dc. make_and_condition(0) +")"
mcDF=spark.sql(multiCond)
scDF=spark.sql(singleCond)
print(mcDF.count())
print (mcDF.show(mcDF.count()))
print("%%%%%%%%%%%%%%%%%%%%%%")
print(scDF.count())
print (scDF.show(scDF.count()))
print("%%%%%%%%%%%%%%%%%%%%%%")

['t1&t2&EQ(t1.city,t2.city)&EQ(t1.temp,t2.temp)&IQ(t1.tempType,t2.tempType)']
[['table1.city=table2.city', 'table1.temp=table2.temp', 'table1.tempType<>table2.tempType']]
table1.city=table2.city AND table1.temp=table2.temp AND table1.tempType<>table2.tempType
+-----+-----------+--------+--------+----+
|index|       city|    date|tempType|temp|
+-----+-----------+--------+--------+----+
|    1|ITE00100554|18000101|    TMAX| -75|
|    2|ITE00100554|18000101|    TMIN|-148|
|    3|GM000010962|18000101|    PRCP|   0|
|    4|EZE00100082|18000101|    TMAX| -86|
|    5|EZE00100082|18000101|    TMAX|-135|
|    6|ITE00100554|18000102|    TMAX| -60|
|    7|ITE00100554|18000102|    TMIN|-125|
|    8|GM000010962|18000102|    PRCP|   0|
|    9|EZE00100082|18000102|    TMAX| -44|
|   10|EZE00100082|18000102|    TMIN|-135|
+-----+-----------+--------+--------+----+

None
%%%%%%%%%%%%%%%%%%%%%%
SELECT table1.index as indexT1,table2.index as indexT2 FROM df table1,df table2 WHERE NOT(table1.city=table2.

As you can see, if a constraint has n predicates, the method produces a list of length n with the predicates in order. The method ***make_and_condition*** then joins the predicates of the constraint at position conditionInd into a single string with AND.
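
The step from a predicate list to the two kinds of query can be sketched without Spark. In the snippet below, `build_query` is a hypothetical helper (it does not exist in the class), and the predicate list is copied from the printed output above.

```python
# Predicates of the single constraint in dcCode, copied from the output above.
predicates = [
    "table1.city=table2.city",
    "table1.temp=table2.temp",
    "table1.tempType<>table2.tempType",
]

def build_query(predicates, negate):
    """Hypothetical helper: self-join query for violating pairs (negate=False)
    or non-violating pairs (negate=True)."""
    condition = " AND ".join(predicates)
    prefix = "NOT(" if negate else "("
    return (
        "SELECT table1.index as indexT1,table2.index as indexT2 "
        "FROM df table1,df table2 WHERE " + prefix + condition + ")"
    )

print(build_query(predicates, negate=False))
print(build_query(predicates, negate=True))
```

The only difference between the two queries is the `NOT` in front of the joined condition, which is also the only difference between ***violation_tuple*** and ***noViolation_tuple***.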


## Finding satisfied tuples using Spark

In this section we find the pairs of record indices that satisfy the specified constraint. For this we have ***noViolation_tuple***, which builds a DataFrame in which each row gives two indices that together satisfy the denial constraint.

In [30]:
dp=DenialConstraint(dcCode)
gh=dp.noViolation_tuple(df, 0, spark)
cnt=gh.count()
print(gh.show(cnt)) 

+-------+-------+
|indexT1|indexT2|
+-------+-------+
|      1|      1|
|      1|      2|
|      1|      3|
|      1|      4|
|      1|      5|
|      1|      6|
|      1|      7|
|      1|      8|
|      1|      9|
|      1|     10|
|      2|      1|
|      2|      2|
|      2|      3|
|      2|      4|
|      2|      5|
|      2|      6|
|      2|      7|
|      2|      8|
|      2|      9|
|      2|     10|
|      3|      1|
|      3|      2|
|      3|      3|
|      3|      4|
|      3|      5|
|      3|      6|
|      3|      7|
|      3|      8|
|      3|      9|
|      3|     10|
|      4|      1|
|      4|      2|
|      4|      3|
|      4|      4|
|      4|      5|
|      4|      6|
|      4|      7|
|      4|      8|
|      4|      9|
|      4|     10|
|      5|      1|
|      5|      2|
|      5|      3|
|      5|      4|
|      5|      5|
|      5|      6|
|      5|      7|
|      5|      8|
|      5|      9|
|      6|      1|
|      6|      2|
|      6|      3|
|      6| 

As you can see in the DataFrame, the records with index 5 and 10 do not satisfy the denial constraint: both belong to city EZE00100082, and the minimum temperature on one date equals the maximum temperature on another date, so this is an error. Accordingly, neither (5,10) nor (10,5) appears in the DataFrame of satisfying pairs.
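
The same check can be reproduced in plain Python as a sanity test. This is a minimal sketch that does not use Spark: the rows are typed in by hand from the table, and `violates` is a hypothetical helper that hard-codes the three predicates of the constraint.

```python
from itertools import product

# (index, city, tempType, temp) for the ten rows of 10.csv.
rows = [
    (1, "ITE00100554", "TMAX", -75),
    (2, "ITE00100554", "TMIN", -148),
    (3, "GM000010962", "PRCP", 0),
    (4, "EZE00100082", "TMAX", -86),
    (5, "EZE00100082", "TMAX", -135),
    (6, "ITE00100554", "TMAX", -60),
    (7, "ITE00100554", "TMIN", -125),
    (8, "GM000010962", "PRCP", 0),
    (9, "EZE00100082", "TMAX", -44),
    (10, "EZE00100082", "TMIN", -135),
]

def violates(t1, t2):
    """True when the pair matches every predicate: same city, same temp, different tempType."""
    return t1[1] == t2[1] and t1[3] == t2[3] and t1[2] != t2[2]

# All ordered pairs, including a record paired with itself, as in the self-join.
all_pairs = [(a[0], b[0]) for a, b in product(rows, repeat=2)]
violating = [(a[0], b[0]) for a, b in product(rows, repeat=2) if violates(a, b)]
satisfying = [pair for pair in all_pairs if pair not in violating]

print(violating)        # [(5, 10), (10, 5)]
print(len(satisfying))  # 98
```

Out of the 100 ordered pairs, 98 satisfy the constraint, and the two missing ones are exactly (5,10) and (10,5).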

In [9]:
df.show()

+-----+-----------+--------+--------+----+
|index|       city|    date|tempType|temp|
+-----+-----------+--------+--------+----+
|    1|ITE00100554|18000101|    TMAX| -75|
|    2|ITE00100554|18000101|    TMIN|-148|
|    3|GM000010962|18000101|    PRCP|   0|
|    4|EZE00100082|18000101|    TMAX| -86|
|    5|EZE00100082|18000101|    TMAX|-135|
|    6|ITE00100554|18000102|    TMAX| -60|
|    7|ITE00100554|18000102|    TMIN|-125|
|    8|GM000010962|18000102|    PRCP|   0|
|    9|EZE00100082|18000102|    TMAX| -44|
|   10|EZE00100082|18000102|    TMIN|-135|
+-----+-----------+--------+--------+----+



## Finding not satisfied tuples using Spark

In this section we find the pairs of record indices that do not satisfy the specified constraint. For this we have ***violation_tuple***, which builds a DataFrame in which each row gives two indices that violate the specified denial constraint.

In [10]:
dp=DenialConstraint(dcCode)
gh=dp.violation_tuple(df, 0, spark)
cnt=gh.count()
print(gh.show(cnt)) 

+-------+-------+
|indexT1|indexT2|
+-------+-------+
|      5|     10|
|     10|      5|
+-------+-------+

None
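
For a quick cross-check without a Spark installation, the same self-join can be run against an in-memory SQLite database using the standard `sqlite3` module. This swaps Spark SQL for SQLite, so it is only a sketch: the rows are typed in by hand, the `index` column is quoted because INDEX is a SQLite keyword, and an `ORDER BY` is added to make the row order deterministic.

```python
import sqlite3

# (index, city, date, tempType, temp) for the ten rows of 10.csv.
rows = [
    (1, "ITE00100554", "18000101", "TMAX", -75),
    (2, "ITE00100554", "18000101", "TMIN", -148),
    (3, "GM000010962", "18000101", "PRCP", 0),
    (4, "EZE00100082", "18000101", "TMAX", -86),
    (5, "EZE00100082", "18000101", "TMAX", -135),
    (6, "ITE00100554", "18000102", "TMAX", -60),
    (7, "ITE00100554", "18000102", "TMIN", -125),
    (8, "GM000010962", "18000102", "PRCP", 0),
    (9, "EZE00100082", "18000102", "TMAX", -44),
    (10, "EZE00100082", "18000102", "TMIN", -135),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE df ("index" INTEGER, city TEXT, date TEXT, tempType TEXT, temp INTEGER)'
)
conn.executemany("INSERT INTO df VALUES (?, ?, ?, ?, ?)", rows)

# Same WHERE clause that make_and_condition(0) produces for dcCode.
query = (
    'SELECT table1."index" AS indexT1, table2."index" AS indexT2 '
    "FROM df table1, df table2 "
    "WHERE (table1.city=table2.city AND table1.temp=table2.temp "
    "AND table1.tempType<>table2.tempType) "
    "ORDER BY indexT1"
)
violations = conn.execute(query).fetchall()
conn.close()

print(violations)  # [(5, 10), (10, 5)]
```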
