# **Module `pepper.cython.ufuncs`**

# **`is_constant`** non Cython

## La fonction

In [1]:
import numpy as np
import pandas as pd

from typing import Iterable, Union

def is_constant(x: Iterable[Union[int, float, str]]) -> bool:
    """
    Check if all elements in the group are equal.

    Parameters
    ----------
    x : iterable
        The group of elements to be checked. ``pd.Series`` and
        ``np.ndarray`` inputs are compared in place; any other iterable
        (list, tuple, generator, range, set, ...) is first materialized
        into a NumPy array.

    Returns
    -------
    bool
        True if all elements are equal, if the group has one element, or
        if the group is empty; otherwise False. Always a plain Python
        ``bool`` (never ``np.bool_`` or ``None``).

    Notes
    -----
    Elements are compared to the first one with ``==``, so values that are
    not equal to themselves (e.g. ``NaN``) make the group non-constant.
    """
    if isinstance(x, pd.Series):
        # Positional access: the Series index may not contain the label 0.
        return bool(x.empty or x.eq(x.iloc[0]).all())
    if not isinstance(x, np.ndarray):
        # Don't do this: return len(set(x)) <= 1
        # One-shot iterables (generators, ...) must be materialized before
        # NumPy can build a 1-D array from them.
        x = np.array(x if isinstance(x, (list, tuple)) else list(x))
    return bool(len(x) < 1 or np.all(x == x[0]))

## Tests unitaires

In [90]:
# Test cases for `is_constant`.
# Each entry is (description, input group, expected result). The expected
# value is asserted, so a regression fails the cell instead of only
# changing the printed text.
test_cases = [
    ("All elements in the group are equal", [1, 1, 1, 1], True),
    ("Group has only one element", [2], True),
    ("Group has different elements", [3, 4, 5, 6], False),
    ("Empty group", [], True),
]

for case_number, (description, group, expected) in enumerate(test_cases, start=1):
    result = is_constant(group)
    # bool() normalizes NumPy booleans before comparing with the expectation.
    assert bool(result) == expected, (
        f"Case {case_number} ({description}): expected {expected}, got {result!r}"
    )
    print(f"Case {case_number}: {result}")

Case 1: True
Case 2: True
Case 3: False
Case 4: True


## Test sur un **`groupby`**

In [91]:
import pandas as pd

# Demo frame: category A holds equal values, B holds different values,
# and C is a single-row group.
data = {
    "Category": ["A", "A", "B", "B", "C"],
    "Value": [1, 1, 2, 3, 4],
}
df = pd.DataFrame(data)
display(df)

# Aggregate "Value" per category; `is_constant` is passed as a custom
# aggregation next to the built-in ones.
value_aggregations = ["sum", "size", tuple, is_constant]
grouped = df.groupby("Category")
aggregated = grouped.agg({"Value": value_aggregations})

print(aggregated)

Unnamed: 0,Category,Value
0,A,1
1,A,1
2,B,2
3,B,3
4,C,4


         Value                         
           sum size   tuple is_constant
Category                               
A            2    2  (1, 1)        True
B            5    2  (2, 3)       False
C            4    1    (4,)        True


## Alternatives

In [None]:
import pandas as pd
import numpy as np

s = pd.Series([1, 1, 1, 1])

# Four equivalent ways of asking "is every element equal to the first one?"
alternative_results = (
    s.eq(s.iloc[0]).all(),   # Series.eq + positional first element
    (s == s[0]).all(),       # == operator + label-based first element
    np.all(s == s[0]),       # NumPy reduction over the comparison
    len(set(s)) == 1,        # distinct-value count
)
for outcome in alternative_results:
    print(outcome)

True
True
True
True


## Benchmark **`timeit`**

In [82]:
import pandas as pd
import numpy as np
import timeit

N = 1_000_000
n_iter = 1_000

# Build one all-equal sequence and one that differs only in its last element
equal_data = [1] * N
unequal_data = [1] * (N - 1) + [2]

s_equal = pd.Series(equal_data)
s_unequal = pd.Series(unequal_data)

arr_equal = np.array(equal_data)
arr_unequal = np.array(unequal_data)


# Alternative implementations of the equality check.
# Each one takes the Series to test; called without an argument it falls back
# to `s_equal`, which was the only input these functions used to support.
def method1(s=None):
    """Compare every element to the first one via `Series.eq` (positional access)."""
    s = s_equal if s is None else s
    return s.eq(s.iloc[0]).all()


def method2(s=None):
    """Compare every element to the first one via `==` (label-based access)."""
    s = s_equal if s is None else s
    return (s == s[0]).all()


def method3(s=None):
    """Same comparison as `method2`, reduced with `np.all`."""
    s = s_equal if s is None else s
    return np.all(s == s[0])


# Very bad perfs !
def method4(s=None):
    """Count distinct values with a Python set (kept for reference, not timed)."""
    s = s_equal if s is None else s
    return len(set(s)) == 1


# Run the benchmark on the all-equal Series
print("Benchmark pour des séquences égales :")
print("Method 1:", timeit.timeit(lambda: method1(s_equal), number=n_iter))
print("Method 2:", timeit.timeit(lambda: method2(s_equal), number=n_iter))
print("Method 3:", timeit.timeit(lambda: method3(s_equal), number=n_iter))
# print("Method 4:", timeit.timeit(lambda: method4(s_equal), number=n_iter))

# Run the benchmark on the unequal Series. The Series must be passed
# explicitly: without an argument the methods test `s_equal`, and this
# section would simply repeat the previous measurement.
print("\nBenchmark pour des séquences inégales :")
print("Method 1:", timeit.timeit(lambda: method1(s_unequal), number=n_iter))
print("Method 2:", timeit.timeit(lambda: method2(s_unequal), number=n_iter))
print("Method 3:", timeit.timeit(lambda: method3(s_unequal), number=n_iter))
# print("Method 4:", timeit.timeit(lambda: method4(s_unequal), number=n_iter))


Benchmark pour des séquences égales :
Method 1: 1.035789699992165
Method 2: 1.0292290000070352
Method 3: 1.0553065999993123

Benchmark pour des séquences inégales :
Method 1: 1.0315671000280418
Method 2: 1.1662725000060163
Method 3: 1.0793781999964267


# **`is_constant_ufunc`** Cython

## Tests unitaires

In [2]:
from pepper.agg import is_constant

# Test cases for the `is_constant` imported at the top of this cell.
# Each entry is (description, input group, expected result). The expected
# value is asserted, so a regression fails the cell instead of only
# changing the printed text.
test_cases = [
    ("All elements in the group are equal", [1, 1, 1, 1], True),
    ("Group has only one element", [2], True),
    ("Group has different elements", [3, 4, 5, 6], False),
    ("Empty group", [], True),
]

for case_number, (description, group, expected) in enumerate(test_cases, start=1):
    result = is_constant(group)
    # bool() normalizes whatever truthy/falsy type the implementation returns.
    assert bool(result) == expected, (
        f"Case {case_number} ({description}): expected {expected}, got {result!r}"
    )
    print(f"Case {case_number}: {result}")

Case 1: True
Case 2: True
Case 3: False
Case 4: True


## Test sur un **`groupby`**

In [1]:
import pandas as pd
from pepper.agg import is_constant

# Demo frame: category A holds equal values, B holds different values,
# and C is a single-row group.
data = {
    "Category": ["A", "A", "B", "B", "C"],
    "Value": [1, 1, 2, 3, 4],
}
df = pd.DataFrame(data)
display(df)

# Aggregate "Value" per category; the imported `is_constant` is passed as a
# custom aggregation next to the built-in ones.
value_aggregations = ["sum", "size", tuple, is_constant]
grouped = df.groupby("Category")
aggregated = grouped.agg({"Value": value_aggregations})

print(aggregated)

Unnamed: 0,Category,Value
0,A,1
1,A,1
2,B,2
3,B,3
4,C,4


         Value                         
           sum size   tuple is_constant
Category                               
A            2    2  (1, 1)        True
B            5    2  (2, 3)       False
C            4    1    (4,)        True


## Comparaison des performances

Pas de gain significatif sur ce cas, mais ce fut l'occasion d'apprendre à faire du Cython.

Pour déboguer du Cython et notamment prolonger la pile d'appel en cas d'exception :

```python
import cython.debug
if __name__ == "__main__":
    cython.debug.embed()
    ...
```

In [9]:
import pandas as pd
import numpy as np
import timeit
from pepper.agg import is_constant as is_constant_cython

N = 1_000_000
n_iter = 1_000

# Build one all-equal sequence and one that differs only in its last element
equal_data = [1] * N
unequal_data = [1] * (N - 1) + [2]

arr_equal = np.array(equal_data)
arr_unequal = np.array(unequal_data)


def is_constant_numpy(x):
    """Pure NumPy baseline: True when `x` is empty or all elements equal `x[0]`.

    Defined locally because earlier cells run
    `from pepper.agg import is_constant`, which rebinds the bare name
    `is_constant` to the same function as `is_constant_cython`. Timing
    `is_constant` here would therefore compare that function with itself.
    """
    return len(x) < 1 or np.all(x == x[0])


# Run the benchmark on the all-equal array
# Method 1 = NumPy baseline, Method 2 = `pepper.agg.is_constant`
print("Benchmark pour des séquences égales :")
print("Method 1:", timeit.timeit(lambda: is_constant_numpy(arr_equal), number=n_iter))
print("Method 2:", timeit.timeit(lambda: is_constant_cython(arr_equal), number=n_iter))

# Run the benchmark on the unequal array
print("\nBenchmark pour des séquences inégales :")
print("Method 1:", timeit.timeit(lambda: is_constant_numpy(arr_unequal), number=n_iter))
print("Method 2:", timeit.timeit(lambda: is_constant_cython(arr_unequal), number=n_iter))

Benchmark pour des séquences égales :
Method 1: 0.730645600000571
Method 2: 0.7046755000001212

Benchmark pour des séquences inégales :
Method 1: 0.733071700000437
Method 2: 0.7308216999999786


In [13]:
x = [1, 10, 5, 8, 3, 2, 7, 6, 0]

n = len(x)
q = 0
# NOTE(review): the condition compares the LAST element of x with the loop
# index; with this data the last element is 0, so the condition never holds
# and q keeps its initial value. Intent is unclear - confirm.
last_value = x[n - 1]
for i, candidate in enumerate(x):
    if i < last_value:
        q = candidate
print(q)

0
