## Mounting Google Drive locally

In [1]:
# Colab-only: mount the user's Google Drive under /content/drive.
# The `drive` module imported here is used again by the unmount cell further down.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


## Install [graphriccicurvature](https://graphriccicurvature.readthedocs.io/en/latest/GraphRicciCurvature.html)

In [2]:
!pip install graphriccicurvature


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting graphriccicurvature
  Downloading GraphRicciCurvature-0.5.3.1-py3-none-any.whl (23 kB)
Collecting pot>=0.8.0
  Downloading POT-0.8.2-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (664 kB)
[K     |████████████████████████████████| 664 kB 6.3 MB/s 
[?25hCollecting networkit>=6.1
  Downloading networkit-10.0.tar.gz (5.7 MB)
[K     |████████████████████████████████| 5.7 MB 26.2 MB/s 
Building wheels for collected packages: networkit
  Building wheel for networkit (setup.py) ... [?25l[?25hdone
  Created wheel for networkit: filename=networkit-10.0-cp37-cp37m-linux_x86_64.whl size=8165434 sha256=a431da2f4dfa7a603ddee3d7eb721bfd87c6e292e0a86c8b1357873923de08f3
  Stored in directory: /root/.cache/pip/wheels/63/7a/29/82ad675434206a11470c7179c78d3f172ea2a6bc05cfb7391c
Successfully built networkit
Installing collected packages: pot, networkit, graphriccicurvature
Successf

## extract_data_graph

extract_data_graph.py permette di estrarre la zona labiale e calcolare la curvatura di Ricci tra i landmark d'interesse (mouth_intern o mouth_extern)
a partire da uno o più file .mp4

Carmine Ippolito

In [3]:
import os
from tqdm import tqdm
import dlib
import cv2
import numpy as np
import networkx as nx
from scipy.spatial import distance
from GraphRicciCurvature.OllivierRicci import OllivierRicci
import csv

# Directory that is walked recursively for .mp4 files
VIDEO_INPUT_PATH = ''
# You can get the predictor file from: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
PREDICTOR_PATH = ''
# Size the mouth ROI is resized to; index 0 scales x (width), index 1 scales y (height)
FRAME_SIZE = (300, 200)
# Extract the coordinates of mouth_intern [60:68] or mouth_extern [48:60]
MOUTH_COORDINATES = (60, 68)
# Use UPSAMPLE_NUM_TIMES = 1 for backward compatibility (will reduce performance)
UPSAMPLE_NUM_TIMES = 0
# Draw the landmarks on the ROI
DRAW_LANDMARKS = False
# Directory for the cropped .avi video; None disables the video export
VIDEO_OUTPUT_PATH = None
# Directory for the Ricci-curvature .csv; None disables the curvature computation
CSV_RICCI_OUTPUT_PATH = ''


def extract_data(root, filename, predictor_path, frame_size, mouth_coordinates, upsample_num_times=0, draw_landmarks=False, video_output_path=None, csv_ricci_output_path=None):
    """Crop the mouth region from every frame of a video and optionally export it.

    For each frame in which dlib detects a face, the 68 facial landmarks are
    predicted, the bounding box of the mouth landmarks [48:68] is padded by
    10 pixels (clipped to the frame) and the crop is resized to ``frame_size``.
    Optionally the selected mouth landmarks are drawn on the crop, and the
    Ollivier-Ricci curvature of the complete graph over those landmarks
    (edge weight = rounded Manhattan distance between the rescaled landmarks)
    is collected, one row per processed frame.

    Args:
        root: Directory that contains the video.
        filename: Video file name. Output files are named after the part
            before the first '.', with '_m' appended.
        predictor_path: Path to the dlib 68-landmark shape predictor file.
        frame_size: (width, height) of the resized mouth crop.
        mouth_coordinates: (start, stop) landmark index range to use,
            e.g. (60, 68) for mouth_intern or (48, 60) for mouth_extern.
        upsample_num_times: Upsampling factor handed to the dlib detector.
        draw_landmarks: If true, draw the selected landmarks on the crop.
        video_output_path: Directory for the '<name>_m.avi' file, or None to
            skip the video export. The frame rate is taken from the fifth
            '_'-separated field of the file name; if that field is missing or
            not an integer, the frame rate reported by the capture is used.
        csv_ricci_output_path: Directory for the '<name>_m.csv' file, or None
            to skip the curvature computation.

    Returns:
        None. Results are written to the requested output files.

    Notes:
        Frames in which no face is detected are skipped, so the outputs may
        contain fewer entries than the video has frames. The working directory
        of the process is left untouched.
    """
    # Initialize dlib's face detector (HOG-based) and then create the facial landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(predictor_path)

    save_video = video_output_path is not None
    save_ricci = csv_ricci_output_path is not None
    # Output files are named after the part of the file name before the first '.'
    base_name = filename.split('.')[0]
    # Padding (in pixels) added on every side of the mouth bounding box
    margin = 10
    first, last = mouth_coordinates[0], mouth_coordinates[1]
    # The graph is only needed when the curvature is requested
    G = nx.complete_graph(last - first) if save_ricci else None
    video_array = []
    curvature_ricci_matrix = []
    fps = None

    cap = cv2.VideoCapture(os.path.join(root, filename))
    try:
        if save_video:
            # Resolve the frame rate up front so a bad file name cannot discard
            # the work of a fully processed video.
            try:
                fps = int(base_name.split('_')[4])
            except (IndexError, ValueError):
                fps = cap.get(cv2.CAP_PROP_FPS)

        while cap.isOpened():
            ret, image = cap.read()
            # The frame is read correctly if ret is True
            if not ret:
                break

            # Use UPSAMPLE_NUM_TIMES = 1 for backward compatibility (will reduce performance)
            faces = detector(image, upsample_num_times)
            if len(faces) == 0:
                # No face in this frame: there is nothing to crop, skip it
                continue
            landmarks = predictor(image, faces[0])

            # Convert the 68 facial landmarks to a NumPy array of (x, y) coordinates
            coordinates = np.array(
                [(landmarks.part(i).x, landmarks.part(i).y) for i in range(68)], dtype='int')

            # Bounding box of the mouth landmarks
            (x, y, w, h) = cv2.boundingRect(np.array([coordinates[48:68]]))
            # Pad the box and clip it to the frame; a negative start index would
            # otherwise wrap around and yield an empty crop.
            frame_h, frame_w = image.shape[:2]
            left = max(x - margin, 0)
            top = max(y - margin, 0)
            right = min(x + w + margin, frame_w)
            bottom = min(y + h + margin, frame_h)
            crop = image[top:bottom, left:right]
            if crop.size == 0:
                # The mouth box lies completely outside the frame
                continue
            # Resize the ROI according to frame_size
            roi = cv2.resize(crop, dsize=(frame_size[0], frame_size[1]),
                             interpolation=cv2.INTER_CUBIC)

            if draw_landmarks or save_ricci:
                # Scale by the real crop size so landmarks stay aligned with the
                # resized ROI even when the padding was clipped.
                crop_w = right - left
                crop_h = bottom - top
                scaled_coordinates = []
                # Extract the coordinates of mouth_intern [60:68] or mouth_extern [48:60]
                for (xc, yc) in coordinates[first:last]:
                    scaled_xc = int(round(((xc - left) * frame_size[0]) / crop_w))
                    scaled_yc = int(round(((yc - top) * frame_size[1]) / crop_h))
                    scaled_coordinates.append((scaled_xc, scaled_yc))

                    if draw_landmarks:
                        # Draw the landmarks on the ROI
                        cv2.circle(roi, (scaled_xc, scaled_yc), 1, (0, 0, 255), -1)

                if save_ricci:
                    # Manhattan distance for every landmark pair (i, j > i); this
                    # relies on G.edges listing the pairs in the same order.
                    distance_array = [round(distance.cityblock([x1, y1], [x2, y2]))
                                      for i, (x1, y1) in enumerate(scaled_coordinates)
                                      for (x2, y2) in scaled_coordinates[i + 1:]]
                    for (u, v), weight in zip(G.edges, distance_array):
                        G.edges[u, v]['weight'] = weight

                    # Calculate Ricci curvature
                    orc = OllivierRicci(G, alpha=0.5)
                    orc.compute_ricci_curvature()
                    curvature_ricci_matrix.append(
                        [orc.G[n1][n2]['ricciCurvature'] for n1, n2 in G.edges])

            if save_video:
                # Frames are only kept when they will actually be written
                video_array.append(roi)
    finally:
        cap.release()

    if save_video:
        # Save the ROI in a .avi file
        video_path = os.path.join(video_output_path, base_name + '_m' + '.avi')
        outputVideo = cv2.VideoWriter(
            video_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, frame_size)
        try:
            for frame in video_array:
                outputVideo.write(frame)
        finally:
            outputVideo.release()

    if save_ricci:
        # Save the curvatures in a .csv file, one row per processed frame
        csv_path = os.path.join(csv_ricci_output_path, base_name + '_m' + '.csv')
        with open(csv_path, mode='w', newline='') as csvfile:
            csv.writer(csvfile).writerows(curvature_ricci_matrix)


if __name__ == '__main__':
    # Collect every .mp4 below VIDEO_INPUT_PATH as a (directory, file name) pair
    videos = [(root, filename)
              for root, _dirs, files in os.walk(VIDEO_INPUT_PATH)
              for filename in files
              if filename.endswith('.mp4')]
    # Number of videos found
    N = len(videos)

    print('Extracting data')
    # tqdm wraps the list so a progress bar is shown while the videos are processed
    for root, filename in tqdm(videos):
        extract_data(
            root,
            filename,
            PREDICTOR_PATH,
            FRAME_SIZE,
            MOUTH_COORDINATES,
            upsample_num_times=UPSAMPLE_NUM_TIMES,
            draw_landmarks=DRAW_LANDMARKS,
            video_output_path=VIDEO_OUTPUT_PATH,
            csv_ricci_output_path=CSV_RICCI_OUTPUT_PATH,
        )


Extracting data


100%|██████████| 1/1 [01:28<00:00, 89.00s/it]


In [4]:
import os
from tqdm import tqdm
import dlib
import cv2
import numpy as np
import networkx as nx
from scipy.spatial import distance
from GraphRicciCurvature.OllivierRicci import OllivierRicci
import csv

# Directory that is walked recursively for .mp4 files
VIDEO_INPUT_PATH = ''
# You can get the predictor file from: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
PREDICTOR_PATH = ''
# Size the mouth ROI is resized to; index 0 scales x (width), index 1 scales y (height)
FRAME_SIZE = (300, 200)
# Extract the coordinates of mouth_intern [60:68] or mouth_extern [48:60]
MOUTH_COORDINATES = (60, 68)
# Use UPSAMPLE_NUM_TIMES = 1 for backward compatibility (will reduce performance)
UPSAMPLE_NUM_TIMES = 0
# Draw the landmarks on the ROI
DRAW_LANDMARKS = False
# Directory for the cropped .avi video; None disables the video export
VIDEO_OUTPUT_PATH = None
# Directory for the Ricci-curvature .csv; None disables the curvature computation
CSV_RICCI_OUTPUT_PATH = ''


def extract_data(root, filename, predictor_path, frame_size, mouth_coordinates, upsample_num_times=0, draw_landmarks=False, video_output_path=None, csv_ricci_output_path=None):
    """Crop the mouth region from every frame of a video and optionally export it.

    Variant that measures the landmark distances on normalised coordinates:
    each selected landmark is rescaled to the resized crop and then divided by
    ``frame_size - 1`` per axis before the Manhattan distances are taken.

    For each frame in which dlib detects a face, the 68 facial landmarks are
    predicted, the bounding box of the mouth landmarks [48:68] is padded by
    10 pixels (clipped to the frame) and the crop is resized to ``frame_size``.
    Optionally the selected mouth landmarks are drawn on the crop, and the
    Ollivier-Ricci curvature of a weighted graph connecting every landmark
    pair is collected, one row per processed frame.

    Args:
        root: Directory that contains the video.
        filename: Video file name. Output files are named after the part
            before the first '.', with '_m' appended.
        predictor_path: Path to the dlib 68-landmark shape predictor file.
        frame_size: (width, height) of the resized mouth crop.
        mouth_coordinates: (start, stop) landmark index range to use,
            e.g. (60, 68) for mouth_intern or (48, 60) for mouth_extern.
        upsample_num_times: Upsampling factor handed to the dlib detector.
        draw_landmarks: If true, draw the selected landmarks on the crop.
        video_output_path: Directory for the '<name>_m.avi' file, or None to
            skip the video export. The frame rate is taken from the fifth
            '_'-separated field of the file name; if that field is missing or
            not an integer, the frame rate reported by the capture is used.
        csv_ricci_output_path: Directory for the '<name>_m.csv' file, or None
            to skip the curvature computation.

    Returns:
        None. Results are written to the requested output files.

    Notes:
        Frames in which no face is detected are skipped, so the outputs may
        contain fewer entries than the video has frames. The working directory
        of the process is left untouched.
    """
    # Initialize dlib's face detector (HOG-based) and then create the facial landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(predictor_path)

    save_video = video_output_path is not None
    save_ricci = csv_ricci_output_path is not None
    # Output files are named after the part of the file name before the first '.'
    base_name = filename.split('.')[0]
    # Padding (in pixels) added on every side of the mouth bounding box
    margin = 10
    first, last = mouth_coordinates[0], mouth_coordinates[1]
    video_array = []
    curvature_ricci_matrix = []
    fps = None

    cap = cv2.VideoCapture(os.path.join(root, filename))
    try:
        if save_video:
            # Resolve the frame rate up front so a bad file name cannot discard
            # the work of a fully processed video.
            try:
                fps = int(base_name.split('_')[4])
            except (IndexError, ValueError):
                fps = cap.get(cv2.CAP_PROP_FPS)

        while cap.isOpened():
            ret, image = cap.read()
            # The frame is read correctly if ret is True
            if not ret:
                break

            # Use UPSAMPLE_NUM_TIMES = 1 for backward compatibility (will reduce performance)
            faces = detector(image, upsample_num_times)
            if len(faces) == 0:
                # No face in this frame: there is nothing to crop, skip it
                continue
            landmarks = predictor(image, faces[0])

            # Convert the 68 facial landmarks to a NumPy array of (x, y) coordinates
            coordinates = np.array(
                [(landmarks.part(i).x, landmarks.part(i).y) for i in range(68)], dtype='int')

            # Bounding box of the mouth landmarks
            (x, y, w, h) = cv2.boundingRect(np.array([coordinates[48:68]]))
            # Pad the box and clip it to the frame; a negative start index would
            # otherwise wrap around and yield an empty crop.
            frame_h, frame_w = image.shape[:2]
            left = max(x - margin, 0)
            top = max(y - margin, 0)
            right = min(x + w + margin, frame_w)
            bottom = min(y + h + margin, frame_h)
            crop = image[top:bottom, left:right]
            if crop.size == 0:
                # The mouth box lies completely outside the frame
                continue
            # Resize the ROI according to frame_size
            roi = cv2.resize(crop, dsize=(frame_size[0], frame_size[1]),
                             interpolation=cv2.INTER_CUBIC)

            if draw_landmarks or save_ricci:
                # Scale by the real crop size so landmarks stay aligned with the
                # resized ROI even when the padding was clipped.
                crop_w = right - left
                crop_h = bottom - top
                scaled_coordinates = []
                # Extract the coordinates of mouth_intern [60:68] or mouth_extern [48:60]
                for (xc, yc) in coordinates[first:last]:
                    # Position of the landmark inside the resized ROI, in pixels
                    pixel_xc = ((xc - left) * frame_size[0]) / crop_w
                    pixel_yc = ((yc - top) * frame_size[1]) / crop_h
                    # Normalised position used for the distances
                    scaled_xc = pixel_xc / (frame_size[0] - 1)
                    scaled_yc = pixel_yc / (frame_size[1] - 1)
                    scaled_coordinates.append((scaled_xc, scaled_yc))

                    if draw_landmarks:
                        # cv2.circle needs integer pixel coordinates, not the
                        # normalised values stored above.
                        cv2.circle(roi, (int(round(pixel_xc)), int(round(pixel_yc))),
                                   1, (0, 0, 255), -1)

                if save_ricci:
                    # Weighted edge (i, j, Manhattan distance) for every landmark pair with j > i
                    distance_array = [[i, i + j + 1, distance.cityblock([x1, y1], [x2, y2])]
                                      for i, (x1, y1) in enumerate(scaled_coordinates)
                                      for j, (x2, y2) in enumerate(scaled_coordinates[i + 1:])]

                    # Calculate Ricci curvature
                    G = nx.Graph()
                    G.add_weighted_edges_from(distance_array)
                    orc = OllivierRicci(G, alpha=0.5)
                    orc.compute_ricci_curvature()
                    curvature_ricci_matrix.append(
                        [orc.G[n1][n2]['ricciCurvature'] for n1, n2 in G.edges])

            if save_video:
                # Frames are only kept when they will actually be written
                video_array.append(roi)
    finally:
        cap.release()

    if save_video:
        # Save the ROI in a .avi file
        video_path = os.path.join(video_output_path, base_name + '_m' + '.avi')
        outputVideo = cv2.VideoWriter(
            video_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, frame_size)
        try:
            for frame in video_array:
                outputVideo.write(frame)
        finally:
            outputVideo.release()

    if save_ricci:
        # Save the curvatures in a .csv file, one row per processed frame
        csv_path = os.path.join(csv_ricci_output_path, base_name + '_m' + '.csv')
        with open(csv_path, mode='w', newline='') as csvfile:
            csv.writer(csvfile).writerows(curvature_ricci_matrix)


if __name__ == '__main__':
    # Collect every .mp4 below VIDEO_INPUT_PATH as a (directory, file name) pair
    videos = [(root, filename)
              for root, _dirs, files in os.walk(VIDEO_INPUT_PATH)
              for filename in files
              if filename.endswith('.mp4')]
    # Number of videos found
    N = len(videos)

    print('Extracting data')
    # tqdm wraps the list so a progress bar is shown while the videos are processed
    for root, filename in tqdm(videos):
        extract_data(
            root,
            filename,
            PREDICTOR_PATH,
            FRAME_SIZE,
            MOUTH_COORDINATES,
            upsample_num_times=UPSAMPLE_NUM_TIMES,
            draw_landmarks=DRAW_LANDMARKS,
            video_output_path=VIDEO_OUTPUT_PATH,
            csv_ricci_output_path=CSV_RICCI_OUTPUT_PATH,
        )


Extracting data


100%|██████████| 1/1 [01:24<00:00, 84.27s/it]


In [6]:
# Colab-only: `drive` is the google.colab module imported by the mount cell,
# so that cell has to run first. Per the message printed below, the changes
# made in this session are expected to be visible in Drive after this call.
drive.flush_and_unmount()
print('All changes made in this colab session should now be visible in Drive.')


All changes made in this colab session should now be visible in Drive.


In [5]:
import networkx as nx
from GraphRicciCurvature.OllivierRicci import OllivierRicci

# One weight per edge of the 8-node complete graph, consumed in G.edges order
distance_array = [92,126,165,236,205,171,127,38,73,172,157,123,79,39,134,119,85,107,103,88,114,146,143,185,217,42,78,44]

G = nx.complete_graph(8)
# Pair every edge with its weight instead of indexing the list by position
for (u, v), weight in zip(G.edges, distance_array):
    G[u][v]['weight'] = weight

orc = OllivierRicci(G, alpha=0.5)
orc.compute_ricci_curvature()

# Report the curvature stored on each edge of the graph held by the OllivierRicci object
for n1, n2 in orc.G.edges:
    curvature = orc.G[n1][n2]['ricciCurvature']
    print('Ricci curvature of edge (%s,%s) is %f' % (n1, n2, curvature))


Ricci curvature of edge (0,1) is 0.571429
Ricci curvature of edge (0,2) is 0.571429
Ricci curvature of edge (0,3) is 0.571429
Ricci curvature of edge (0,4) is 0.571429
Ricci curvature of edge (0,5) is 0.571429
Ricci curvature of edge (0,6) is 0.571429
Ricci curvature of edge (0,7) is 0.571429
Ricci curvature of edge (1,2) is 0.571429
Ricci curvature of edge (1,3) is 0.571429
Ricci curvature of edge (1,4) is 0.571429
Ricci curvature of edge (1,5) is 0.571429
Ricci curvature of edge (1,6) is 0.571429
Ricci curvature of edge (1,7) is 0.571429
Ricci curvature of edge (2,3) is 0.571429
Ricci curvature of edge (2,4) is 0.571429
Ricci curvature of edge (2,5) is 0.571429
Ricci curvature of edge (2,6) is 0.571429
Ricci curvature of edge (2,7) is 0.571429
Ricci curvature of edge (3,4) is 0.571429
Ricci curvature of edge (3,5) is 0.571429
Ricci curvature of edge (3,6) is 0.571429
Ricci curvature of edge (3,7) is 0.571429
Ricci curvature of edge (4,5) is 0.571429
Ricci curvature of edge (4,6) is 0