<a href="https://colab.research.google.com/github/anantguptadbl/python/blob/master/pytorch/explanability/ReviewExp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from google.colab import drive
import nltk
import numpy as np
from nltk.corpus import wordnet
import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
import torch.nn.functional as F
import pandas as pd

nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
drive.mount('/content/gdrive')

In [3]:
use_cuda=True
torch.cuda.is_available()

True

In [4]:
with open("/content/gdrive/My Drive/Colab Notebooks/b.csv","rb") as file:
  allData=file.readlines()

allData=[str(x).replace('\r','').replace('\n','').replace('\\n','').replace('\\r','').replace('\\','') for x in allData]
allData=allData[1:]
allData=[x[x.find(',')+1:] for x in allData]
allData=[x.replace('\'','') for x in allData]
ratings=[int(x[-1:]) for x in allData]
allData=[x[:-1] for x in allData]
allData=[x.replace('"','').replace(",",'') for x in allData]

# We will only take Adjective and Adverbs for our prediction
allData=[[y[0] for y in nltk.pos_tag([x for x in z.split(' ') if len(x) > 2]) if y[1] in ['JJ','JJR','JJS','RB','RBR','RBS','NN','VB']] for z in allData]

# Filtering the unique Words
uniqueWords=np.unique(np.array([y for x in allData for y in x]),return_counts=True)
uniqueWords=pd.DataFrame(zip(uniqueWords[0],uniqueWords[1]),columns=['word','wordCount'])
uniqueWords=uniqueWords[uniqueWords['wordCount']>50]
uniqueWords=uniqueWords['word'].values

#uniqueWords=np.unique(np.array([y for x in allData for y in x]))
uniqueWords=[x for x in uniqueWords if len(wordnet.synsets(x))>0]
uniqueWords=[x for x in uniqueWords if x[0].isdigit()==False]

from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()
p = nltk.PorterStemmer()
uniqueWordDict=dict((x,lemmatizer.lemmatize(x, pos="a"))  for x in uniqueWords)

def returnValidWords(curSentence,uniqueWordDict):
  alteredSentence=[]
  for curWord in curSentence:
    if(curWord in uniqueWordDict):
      alteredSentence.append(uniqueWordDict[curWord])
  return(alteredSentence)

allData=[returnValidWords(x,uniqueWordDict) for x in allData]
uniqueWords=np.unique(list(uniqueWordDict.values()))
uniqueWords=dict((x,i) for i,x in enumerate(uniqueWords))

def getVector(curSentence,uniqueWords):
  curArray=np.zeros(len(uniqueWords))
  for curWord in curSentence:
    curArray[uniqueWords[curWord]]=1
  return(curArray)

allData=[getVector(x,uniqueWords) for x in allData]
allData=np.array(allData)
ratings=np.array(ratings)

print("Cell Execution Completed")

Cell Execution Completed


In [0]:
# We will now be trying to fit a model and in the process derive explanability as well

torch.manual_seed(42)

class straightModel(nn.Module):
  def __init__(self,inputDim):
    super(straightModel,self).__init__()
    self.inputDim=inputDim
    self.l1=nn.Linear(self.inputDim,1024).cuda()
    self.b1=nn.BatchNorm1d(1024).cuda()
    self.l2=nn.Linear(1024,256).cuda()
    self.b2=nn.BatchNorm1d(256).cuda()
    self.l3=nn.Linear(256,64).cuda()
    self.b3=nn.BatchNorm1d(64).cuda()
    self.l4=nn.Linear(64,1).cuda()

  def forward(self,x):
    out=F.relu(self.b1(self.l1(x))).cuda()
    out=F.relu(self.b2(self.l2(out))).cuda()
    out=F.relu(self.b3(self.l3(out))).cuda()
    out=self.l4(out).cuda()
    return(out)

# Model Configuration
learning_rate=0.0001
batchSize=256
numBatches=int(len(allData)/batchSize)
numEpochs=10000

# Model
model=straightModel(allData.shape[1]).cuda()
criterion=nn.MSELoss()
optimizer=torch.optim.SGD(model.parameters(),lr=learning_rate)

for curEpoch in range(numEpochs):
  totalLoss=0
  for curBatch in range(numBatches):
    optimizer.zero_grad()
    dataOutput = model(Variable(torch.from_numpy(allData[curBatch*batchSize:(curBatch+1)*batchSize].astype(np.float32))).cuda())
    loss = criterion(dataOutput, Variable(torch.from_numpy(ratings[curBatch*batchSize:(curBatch+1)*batchSize].astype(np.float32))).cuda())
    # We will perform the backward propagation
    loss.backward()
    totalLoss=totalLoss + loss.item()
    optimizer.step()
  if(curEpoch%50==0):
      print('epoch {0} totalLoss {1}'.format(curEpoch + 1,totalLoss))   

layerNames=['l1','l2','l3','l4']
layersDict={}
for layerName in layerNames:
    for featureNumber,feature in enumerate(model._modules[layerName].weight.detach().numpy()):
        curResult=Variable(torch.from_numpy(allData[0:1].astype(np.float32)))
        for curLayerName in ['l1','l2','l3','l4']:
            curLayer=model._modules[curLayerName]
            if(curLayerName==layerName):
                break
            else:
                print("Before {0} the result is {1}".format(curLayer,curResult.size()))
                print(model._modules[curLayerName])
                curResult=curLayer(curResult)
                print("After {0} the result is {1}".format(curLayer,curResult.size()))
        curResult=feature*curResult.detach().numpy()
        curResult=np.absolute(curResult)
        curResult=curResult/np.sum(curResult)
        if(layerName not in layersDict):
            layersDict[layerName]=[]
        layersDict[layerName].append(curResult)
        
for curLayerIndex in range(len(layerNames)-1):
    for curSubLayer in layersDict[layerNames[len(layerNames)-curLayerIndex-1]]:   
        for i,x in enumerate(curSubLayer):
            layersDict[layerNames[len(layerNames)-curLayerIndex-2]][i]=layersDict[layerNames[len(layerNames)-curLayerIndex-2]][i]*x[0]
featureImportance=np.sum(np.array(layersDict[layerNames[0]]),axis=0)
featureImportance=pd.DataFrame(zip(uniqueWords.keys(),featureImportance[0]),columns=['word','importance'])
featureImportance.sort_values(by='importance',ascending=False)  