# API Python de visualisation de données conversationnelles

In [1]:
from os import listdir
from numpy import *
from scipy import signal, interpolate
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
from bokeh.io import output_notebook
from abc import ABC, abstractmethod
from re import match
from pickle import load
output_notebook()

In [2]:
class VisualizationData:
    """Load conversational data files and their metadata from a directory.

    Data files are the ``.csv`` files of ``directory`` whose name looks like
    ``<corpus>_<conversation>_<data type>_..._<speaker>.csv`` (letters,
    digits, alphanumerics, anything).  Each accepted file becomes one dict in
    ``self.data`` with the keys ``"corpus"``, ``"conversation"``,
    ``"speaker"`` (all strings taken from the file name) plus one key named
    after the data type that holds the array read from the file.

    ``self.metadata`` holds the table returned by ``read_metadata`` (or
    ``None`` when the directory has no metadata file).
    """

    def __init__(self, directory, corpus=None, conversations=None, speakers=None,
                 data_type=None, format=None):
        """Read every data file of ``directory`` that passes the filters.

        Each filter is a collection of accepted values; ``None`` or an empty
        collection means "accept everything".

        Args:
            directory: path of the folder holding data and metadata files.
            corpus: accepted corpus names (first field of the file name).
            conversations: accepted conversation numbers, as integers.
            speakers: accepted values of the 4th metadata column of the row
                describing the (conversation, speaker) pair.
            data_type: accepted data type names (third field of the name).
            format: accepted for backward compatibility; not used.
        """
        self.metadata = self.read_metadata(directory)
        self.data = []
        cpt = 0
        for file_name in listdir(directory):
            if not match(r"[a-zA-Z]+_[0-9]+_[a-zA-Z0-9]+_.+\.csv$", file_name):
                continue
            # The pattern guarantees the name ends with ".csv": drop only
            # that suffix rather than every ".csv" occurrence.
            file_info = file_name[:-len(".csv")].split("_")
            corpus_name = file_info[0]
            conv_id = file_info[1]
            kind = file_info[2]
            speaker = file_info[-1]

            # The corpus field is alphabetic by construction, so it is
            # compared as a string (converting it to int can never succeed).
            if not self._accepts(corpus, corpus_name):
                continue
            if not self._accepts(conversations, int(conv_id)):
                continue
            if not self._accepts(data_type, kind):
                continue
            # The metadata scan is only needed when filtering on speakers.
            if speakers is not None and len(speakers) > 0:
                info = self.find_info(conv_id, speaker)
                # A file without a metadata row cannot match a speaker filter.
                if info is None or info[3] not in speakers:
                    continue

            to_add = {"corpus": corpus_name, "conversation": conv_id, "speaker": speaker}
            with open(directory+"/"+file_name, "rb") as data_file:
                to_add[kind] = loadtxt(data_file, delimiter="\t", skiprows=1)
            self.data.append(to_add)
            cpt += 1
        print(str(cpt)+" fichiers de données ont été lus")

    @staticmethod
    def _accepts(allowed, value):
        """Return True when ``allowed`` is unset/empty or contains ``value``."""
        return allowed is None or len(allowed) == 0 or value in allowed

    def read_metadata(self, directory):
        """Read the metadata table stored in ``directory``.

        The metadata file is the first listed file whose second
        ``_``-separated name field is ``metadata`` (e.g. ``SW_metadata.csv``).
        Its first line gives the tab-separated column names; the first column
        is skipped.

        Returns:
            A 1-D structured array with one record per metadata line, or
            ``None`` when no metadata file is found.
        """
        for file_name in listdir(directory):
            file_info = file_name.replace(".csv", "").split("_")
            # Names without any "_" have a single field: nothing to test.
            if len(file_info) > 1 and file_info[1] == "metadata":
                with open(directory+"/"+file_name, "r") as metadata_file:
                    columns = metadata_file.readline().split("\t")
                    table = genfromtxt(metadata_file, delimiter="\t",
                                       usecols=range(1, len(columns)),
                                       dtype=None, names=columns)
                # A single-line file yields a 0-d array, which is not iterable.
                return atleast_1d(table)
        return None

    def find_info(self, conv, caller):
        """Return the metadata record of a (conversation, caller) pair.

        Args:
            conv: conversation number, as a string or an integer.
            caller: caller identifier, as a string.

        Returns:
            The first record whose ``id_conv`` equals ``int(conv)`` and whose
            ``id_caller`` equals ``caller``; ``None`` when there is no such
            record or no metadata at all.
        """
        if self.metadata is None:
            return None
        for line in self.metadata:
            id_caller = line["id_caller"]
            # Depending on the NumPy version, text columns read with
            # dtype=None come back as bytes or as str.
            if isinstance(id_caller, bytes):
                id_caller = id_caller.decode('UTF-8')
            else:
                id_caller = str(id_caller)
            if line["id_conv"] == int(conv) and id_caller == caller:
                return line
        return None

In [3]:
class Display():
    """Entry points that build a plot from a ``VisualizationData`` and show it.

    Both functions currently handle speech-rate data only: loaded entries
    that have no ``'speech'`` key are skipped.
    """

    @staticmethod
    def average(vdata, smoothing_window=None, points_number=None):
        """Show every smoothed speech curve together with their average.

        Args:
            vdata: object whose ``data`` attribute is the list of loaded
                entries (see ``VisualizationData``).
            smoothing_window: smoothing window length forwarded to the plot
                (``None`` lets the plot choose).
            points_number: number of points of each smoothed curve
                (``None`` lets the plot choose).
        """
        plot = AveragePlot(smoothing_window, points_number)
        for d in vdata.data:
            # Entries of another data type have no 'speech' array to plot.
            if 'speech' in d:
                plot.add_data(d)
        show(plot.get_plot())

    @staticmethod
    def conversation(vdata, smoothing_window=None, points_number=None, linked = False,
                     color_palette = None):
        """Show one figure per conversation, with one curve set per speaker.

        Args:
            vdata: object whose ``data`` attribute is the list of loaded
                entries (see ``VisualizationData``).
            smoothing_window: smoothing window length forwarded to each plot.
            points_number: number of points of each smoothed curve.
            linked: when true, every figure shares the x and y ranges of the
                first one.
            color_palette: colors given to the successive speakers of a
                conversation; defaults to red, blue, green, purple, yellow.
        """
        if color_palette is None:
            color_palette = ["red", "blue", "green", "purple", "yellow"]

        # Group the speech arrays by conversation, then by speaker.
        conversations = {}
        for d in vdata.data:
            # NOTE: for now this function only handles speech rate data.
            if 'speech' not in d:
                continue
            conversations.setdefault(d['conversation'], {})[d['speaker']] = d['speech']

        grid = []
        for conv_id, conv_data in conversations.items():
            conv = ConversationPlot(conv_id, smoothing_window, points_number, color_palette)
            for speaker, data in conv_data.items():
                conv.add_data(speaker, data)
            grid.append(conv.get_plot())

        if linked:
            for other in grid[1:]:
                other.x_range = grid[0].x_range
                other.y_range = grid[0].y_range
        show(gridplot(grid, ncols=1))

In [4]:
class Plot:
    """Base class of the plots: stores the smoothing settings and provides
    the smoothing function shared by the subclasses."""

    def __init__(self, smoothing_window, points_number):
        """Store the smoothing settings.

        Args:
            smoothing_window: length of the smoothing window, or ``None``.
            points_number: number of points of a smoothed curve, or ``None``.
        """
        self.smoothing_window = smoothing_window
        self.points_number = points_number

    # Smoothing function
    @staticmethod
    def smooth(x, y, window_len, points_number):
        """Smooth the curve ``y`` and resample it between 0 and ``x[-1]``.

        The values are first averaged with a normalized Hann window (the
        data is mirrored at both ends to limit edge effects), then a cubic
        spline through the averaged values is evaluated on a regular grid.
        Only the last abscissa ``x[-1]`` is used: the averaged values are
        spread evenly between 0 and ``x[-1]``.

        Args:
            x: sequence of abscissas; only ``x[-1]`` is read.
            y: 1-D sequence of values to smooth.
            window_len: length of the Hann window; ``None`` means
                ``int(x[-1] / 15)``.  Windows shorter than 3 samples leave
                the values unchanged, and the window is never longer than
                ``y``.
            points_number: number of points of the result; ``None`` means
                200.

        Returns:
            A dict ``{'x': ..., 'y': ...}`` of two arrays of
            ``points_number`` values.
        """
        y = asarray(y)
        last_x = x[-1]
        if window_len is None:
            window_len = int(last_x/15)
        if points_number is None:
            points_number = 200
        # The mirrored padding below needs window_len - 1 samples per side.
        if window_len > y.size:
            window_len = y.size

        # first step, get the values closer together by averaging each value
        # with a Hann window.  hanning(0) is empty and hanning(2) is all
        # zeros, so such windows cannot be normalized: skip the averaging.
        if window_len < 3:
            new_y = y
        else:
            window = hanning(window_len)
            wider_data = r_[y[window_len-1:0:-1], y, y[-2:-window_len-1:-1]]
            new_y = convolve(window/window.sum(), wider_data, mode='valid')

        # second step, create new points to smooth the curve.  The spline is
        # built and evaluated on the same [0, x[-1]] interval, so nothing is
        # extrapolated and a last abscissa below 1 stays usable.
        smooth_function = interpolate.CubicSpline(linspace(0, last_x, new_y.size), new_y)
        smooth_x = linspace(0, last_x, points_number)
        smooth_y = smooth_function(smooth_x)

        return {'x':smooth_x, 'y':smooth_y}

In [5]:
class AveragePlot (Plot):
    """Figure showing every smoothed speech curve and their point-wise mean,
    with the x axis expressed as a percentage of each recording."""

    def __init__(self, smoothing_window, points_number):
        """Create the empty figure and the storage of the smoothed curves."""
        Plot.__init__(self, smoothing_window, points_number)
        self.plot = figure(width=800,height=250)
        # grouped_data[0]: x arrays of the curves, grouped_data[1]: y arrays.
        self.grouped_data = [[],[]]

    def add_data(self, data):
        """Smooth the ``'speech'`` array of ``data`` and store the curve.

        Column 2 of the array is rescaled so that its last value becomes 100
        and is used as abscissa; column 1 holds the values to smooth.
        """
        speech = data['speech']
        # time to percent conversion
        percent = [value * 100 / speech[-1,2] for value in speech[:,2]]
        curve = Plot.smooth(percent, speech[:,1], self.smoothing_window, self.points_number)
        all_x, all_y = self.grouped_data
        all_x.append(curve['x'])
        all_y.append(curve['y'])

    def get_plot (self):
        """Draw the stored curves and their average, then return the figure."""
        all_x, all_y = self.grouped_data
        self.plot.multi_line(all_x, all_y, line_width=2,
                             color="grey", alpha = 0.6, legend = "all data")
        mean_curve = mean(all_y, axis=0)
        mean_x = linspace(0, 100, mean_curve.size)
        self.plot.line(mean_x, mean_curve, legend="Average", line_width=4, color="blue")
        self.plot.legend.click_policy="hide"
        return self.plot

In [6]:
class ConversationPlot (Plot):
    """Figure of one conversation: raw points, smoothed curve and average
    line of each speaker, one color per speaker."""

    def __init__(self, id_conv, smoothing_window, points_number, color_palette):
        """Create the empty figure of conversation ``id_conv``.

        Args:
            id_conv: conversation identifier, shown in the title.
            smoothing_window: length of the smoothing window, or ``None``.
            points_number: number of points of a smoothed curve, or ``None``.
            color_palette: sequence of colors given to successive speakers.
        """
        Plot.__init__(self, smoothing_window, points_number)
        # str() keeps the title working when the identifier is a number.
        self.plot = figure(width=800, height=250,
                           title="Speech rate evolution conversation number "+str(id_conv))
        self.color_palette = color_palette
        self.color_number = 0

    def add_data (self, speaker, data):
        """Draw the curves of one speaker with the next color of the palette.

        Args:
            speaker: speaker identifier, shown in the legend.
            data: 2-D array; column 2 is used as abscissa and column 1 as
                ordinate (presumably time and speech rate).
        """
        # Cycle over the palette actually given, whatever its length, so a
        # conversation with more speakers than colors reuses colors instead
        # of indexing past the end of the palette.
        color=self.color_palette[self.color_number % len(self.color_palette)]
        average = mean(data[:,1])
        smoothed_data = Plot.smooth(data[:,2], data[:,1], self.smoothing_window, self.points_number)
        self.plot.line(data[:,2], data[:,1], legend="Raw data",
                       alpha=0.3, line_dash="10 4", line_width=1, color=color)
        self.plot.cross(data[:,2], data[:,1], legend="Raw data",
                       line_width=1, alpha=0.4, size=10, color=color)
        self.plot.line(smoothed_data['x'], smoothed_data['y'], legend="Smooth Speaker "+speaker,
                       line_width=3, color=color)
        # Horizontal line at the mean value, spanning the whole abscissa range.
        self.plot.line([0,data[-1,2]], [average, average], legend="Averages",
                       line_width=1,  line_dash="20 3", color=color)
        self.color_number += 1

    def get_plot (self):
        """Return the figure, with legend entries that hide their curves on click."""
        self.plot.legend.click_policy="hide"
        return self.plot

## Utilisation de l'API

In [7]:
# Load the data files of ../Desktop/X11, keeping only conversations 3500 and 4000.
vdata = VisualizationData("../Desktop/X11", conversations=[3500,4000])



4 fichiers de données ont été lus


In [8]:
# One figure per loaded conversation, with a custom palette and a smoothing window of 15.
Display.conversation(vdata, color_palette=["green", "#220099", "yellow"], smoothing_window=15)

In [9]:
# All smoothed curves and their average on a single figure, with a smoothing window of 25.
Display.average(vdata, smoothing_window=25)