In [1]:
import re
import utils
import tiktoken
from collections import namedtuple

In [2]:
# Small sample document: one heading at each of levels 1-5 plus a second
# level-2 heading. The demo cells below pass it to the splitting functions.
markdown = """# Heading 1

This is some content under heading 1.

## Heading 2

This is some content under heading 2.

### Heading 3

This is some content under heading 3.

#### Heading 4

This is some content under heading 4.

##### Heading 5

This is some content under heading 5.

## Another Heading 2

This is some content under another heading 2.
"""

In [3]:
# Constants for heading-based splitting (recursive_split_by_heading / merge_short_paragraphs)
MAX_TOKEN = 512  # Blocks with more tokens than this are split further; also the default merge budget
MAX_HEADING_LEVEL = 6  # Deepest heading level tried before falling back to get_text_chunks

# Constants for the token-based splitter (get_text_chunks)
CHUNK_SIZE = 1024  # The target size of each text chunk in tokens
MIN_CHUNK_SIZE_CHARS = 256  # A chunk is cut back to its last punctuation mark/newline only if that mark lies past this character index
MIN_CHUNK_LENGTH_TO_EMBED = 5  # Chunks from the splitting loop that are not longer than this many characters are discarded
# EMBEDDINGS_BATCH_SIZE = 128  # The number of embeddings to request at a time
MAX_NUM_CHUNKS = 4096  # The maximum number of chunks to generate from a text


def get_text_chunks(text: str, chunk_token_size=CHUNK_SIZE):
    """
    Split a text into chunks of ~chunk_token_size tokens, based on punctuation and newline boundaries.

    Every returned chunk has its newlines replaced by spaces and is stripped of
    leading/trailing whitespace.

    Args:
        text: The text to split into chunks.
        chunk_token_size: The target size of each chunk in tokens, or None/0 to use the default CHUNK_SIZE.

    Returns:
        A list of text chunks. The list is empty for empty or whitespace-only text,
        and holds the whole text as a single chunk when the text has fewer tokens
        than the effective chunk size.
    """
    # Return an empty list if the text is empty or whitespace
    if not text or text.isspace():
        return []

    # Use the provided chunk token size or the default one
    chunk_size = chunk_token_size or CHUNK_SIZE

    # Tokenize the text. disallowed_special=() lets text that happens to contain
    # special-token markers (e.g. "<|endoftext|>") be encoded as ordinary text
    # instead of raising, and keeps this call consistent with the re-encoding
    # performed inside the loop below.
    tokenizer = tiktoken.encoding_for_model('gpt-3.5-turbo')
    tokens = tokenizer.encode(text, disallowed_special=())

    # Text with fewer tokens than the effective chunk size is not divided.
    # (Compare against chunk_size, not the global default, so that a caller-supplied
    # smaller chunk size is honoured.)
    if len(tokens) < chunk_size:
        return [text.replace("\n", " ").strip()]

    chunks = []
    num_chunks = 0

    # Loop until all tokens are consumed or the chunk limit is reached
    while tokens and num_chunks < MAX_NUM_CHUNKS:
        # Take the first chunk_size tokens as a chunk and decode them into text
        chunk = tokens[:chunk_size]
        chunk_text = tokenizer.decode(chunk)

        # Skip the chunk if it is empty or whitespace, dropping its tokens
        if not chunk_text or chunk_text.isspace():
            tokens = tokens[len(chunk):]
            continue

        # Find the last sentence-ending punctuation mark or newline in the chunk
        last_punctuation = max(
            chunk_text.rfind("."),
            chunk_text.rfind("?"),
            chunk_text.rfind("!"),
            chunk_text.rfind("\n"),
        )

        # Truncate at that mark only when it lies past MIN_CHUNK_SIZE_CHARS,
        # so that truncation never produces a very short chunk
        if last_punctuation != -1 and last_punctuation > MIN_CHUNK_SIZE_CHARS:
            chunk_text = chunk_text[: last_punctuation + 1]

        # Remove any newline characters and strip any leading or trailing whitespace
        chunk_text_to_append = chunk_text.replace("\n", " ").strip()

        if len(chunk_text_to_append) > MIN_CHUNK_LENGTH_TO_EMBED:
            chunks.append(chunk_text_to_append)

        # Drop as many tokens as the (possibly truncated) chunk text re-encodes to
        tokens = tokens[len(tokenizer.encode(chunk_text, disallowed_special=())):]

        num_chunks += 1

    # Tokens are only left here when the loop stopped at MAX_NUM_CHUNKS;
    # keep them as one final chunk
    if tokens:
        remaining_text = tokenizer.decode(tokens).replace("\n", " ").strip()
        if len(remaining_text) > MIN_CHUNK_LENGTH_TO_EMBED:
            chunks.append(remaining_text)

    return chunks

def get_chunks_by_heading(
    text: str,
    max_level: int
):
    """
    Split markdown text into chunks that each start at a heading of exactly `max_level`.

    A heading is a line that begins with exactly `max_level` '#' characters
    followed by whitespace. Any text before the first such heading becomes a
    chunk of its own. Deeper or shallower headings do not cause a split.

    Args:
        text: The markdown text to split.
        max_level: The heading level (number of leading '#') to split on.

    Returns:
        A list of non-blank chunks in document order. If the text has no heading
        of the requested level, the list holds the whole text (or is empty when
        the text is blank).
    """
    # Locate headings by position rather than by text substitution: replacing
    # the heading text would also hit the same text inside a deeper heading
    # (e.g. "## Title" inside "### Title") and would break on a sentinel string
    # that happens to occur in the document.
    heading_start = re.compile(fr'^#{{{max_level}}}\s', re.MULTILINE)
    cut_points = [match.start() for match in heading_start.finditer(text)]

    # Slice the text between consecutive cut points, keeping any leading preamble
    bounds = [0] + cut_points + [len(text)]
    pieces = [text[begin:end] for begin, end in zip(bounds, bounds[1:])]
    return [piece for piece in pieces if len(piece.strip()) > 0]

def merge_short_paragraphs(contents, max_token=MAX_TOKEN, direction="bottom-up"):
    """
    Greedily merge consecutive pieces of text while the merged text stays within a token budget.

    Walking front to back, each piece with fewer than `max_token` tokens absorbs
    the pieces that follow it for as long as the concatenation has at most
    `max_token` tokens. Pieces that are already at or over the budget are kept
    unchanged. The input list is not modified.

    Args:
        contents: List of text pieces in document order.
        max_token: Token budget for a merged piece.
        direction: Accepted for backward compatibility; currently ignored.

    Returns:
        A new list with the merged pieces, in the original order.
    """
    results = []
    total = len(contents)
    i = 0
    while i < total:
        # Accumulate into a local string so the caller's list is left untouched
        merged = contents[i]
        i += 1
        if utils.num_tokens_from_string(merged) < max_token:
            # Token counts are measured on the concatenation, not summed per piece
            while i < total and utils.num_tokens_from_string(merged + contents[i]) <= max_token:
                merged += contents[i]
                i += 1
        results.append(merged)
    return results

def recursive_split_by_heading(markdown, max_level=1):
    """
    Lazily split markdown into chunks by heading, descending one heading level at a time.

    The text is split at headings of `max_level`, short neighbouring blocks are
    merged back together, and any block that still exceeds MAX_TOKEN tokens is
    split again at the next deeper heading level. Once MAX_HEADING_LEVEL is
    exhausted, get_text_chunks is used as a fallback.

    Args:
        markdown: The markdown text to split.
        max_level: The heading level to split on in this call (1 = '#').

    Yields:
        Text chunks in document order.
    """
    # A document that already fits the token budget is emitted unchanged.
    # (`<=` matches the per-block check at the end of this function.)
    token_count = utils.num_tokens_from_string(markdown)
    if token_count <= MAX_TOKEN:
        yield markdown
        return

    # No heading level left to split on: fall back to token-based chunking
    if max_level > MAX_HEADING_LEVEL:
        # NOTE(review): this fallback is gated on CHUNK_SIZE and chunks at the
        # default CHUNK_SIZE, so text yielded here can exceed MAX_TOKEN.
        # Confirm whether that is intended.
        if token_count > CHUNK_SIZE:
            yield from get_text_chunks(markdown)
        else:
            yield markdown
        return

    # Split at the current heading level, then re-merge short neighbours
    blocks = get_chunks_by_heading(markdown, max_level)
    merged_blocks = merge_short_paragraphs(blocks)

    for block in merged_blocks:
        if utils.num_tokens_from_string(block) > MAX_TOKEN:
            # Still over the budget: split again one heading level deeper
            yield from recursive_split_by_heading(block, max_level + 1)
        else:
            yield block

In [4]:
# Preview the split at heading level 3: each chunk is followed by a 50-dash rule.
contents = get_chunks_by_heading(markdown, 3)
for content in contents:
    print(content, '-'*50, sep='\n')

# Heading 1

This is some content under heading 1.

## Heading 2

This is some content under heading 2.


--------------------------------------------------
### Heading 3

This is some content under heading 3.

#### Heading 4

This is some content under heading 4.

##### Heading 5

This is some content under heading 5.

## Another Heading 2

This is some content under another heading 2.

--------------------------------------------------


In [5]:
# Materialize the generator so the chunks can be iterated more than once.
contents = [*recursive_split_by_heading(markdown)]

In [6]:
# Print every chunk with its token count, then a 100-dash rule.
for content in contents:
    print(content)
    print('tokens:', utils.num_tokens_from_string(content), end='\n' + '-'*100 + '\n')

# Heading 1

This is some content under heading 1.

## Heading 2

This is some content under heading 2.

### Heading 3

This is some content under heading 3.

#### Heading 4

This is some content under heading 4.

##### Heading 5

This is some content under heading 5.

## Another Heading 2

This is some content under another heading 2.

tokens: 86
----------------------------------------------------------------------------------------------------


In [7]:
markdown = '''
# Working with Pact
## 1. Overview
### 1.1 Consumer Driven Contract Test
Contract tests assert that inter-application messages conform to a shared understanding that is documented in a contract. Without contract testing, the only way to ensure that applications will work correctly together is by using expensive and brittle integration tests. We use PACT, which is a code-first tool for testing HTTP and message integrations using contract tests. Contract tests should focus on the messages rather than the behavior. Please refer to [Contract Tests vs Functional Tests](https://docs.pact.io/consumer/contract_tests_not_functional_tests) for more details.

Please read the [PACT Introduction page](https://docs.pact.io/) for more details on what contract testing is, when to use it and how it works.



### 1.2 PACT Modification - Schema based contract testing
We make use of a modification that is based on schema based contract testing as proposed in blog 1 ([Atlassian](https://www.atlassian.com/blog/technology/))spec-first-api-development
 and blog 2 ([Pactflow](https://pactflow.io/blog/contract-testing-using-json-schemas-and-open-api-part-3/)). We verify PACT by comparing the API definition for both HTTP API and ASYNC API to the contract. As such, we mainly need to implement the test on the consumer side since we will use the API definition on the provider side. Please refer to the blog posts and the Contract Testing [presentation](https://sap.sharepoint.com/:p:/r/teams/S4HANALabs-Eureka/_layouts/15/Doc.aspx?sourcedoc=%7B6810054E-D74C-492C-9762-73483611D9B0%7D&file=consumer-driven-contract-test.pptx&wdLOR=cDA68E6C8-9776-9E4B-A0F4-25CBB6966638&action=edit&mobileredirect=true&cid=650753b6-1100-4e91-bc87-e24d55edb944) held by Cosine in the DevOps CoP (01/12/2021):

[![](../assets/images/nativepact.png)](../assets/images/nativepact.png)
[![](../assets/images/proposedpact.png)](../assets/images/proposedpact.png)

### 1.3 Writing the Consumer Test
Once you run the test project, the generated PACT file will be generated in your projects `$rootDir/Pacts`

References:

[PACT Consumer Test Best Practices](https://docs.pact.io/consumer)

PACT DSL Manual: [PACT Consumer General Information](https://docs.pact.io/implementation_guides/jvm/consumer) and [PACT Consumer JUnit 5 Guide](https://docs.pact.io/implementation_guides/jvm/consumer/junit5), [Lambda DSL for PACT](https://docs.pact.io/implementation_guides/jvm/consumer/java8)

The links above will explain the annotations seen in the sample PRs: @ExtendWith, @Pact, @PactTestFor

[Example Message Consumer Test](https://github.com/pact-foundation/pact-jvm/blob/master/consumer/junit/src/test/java/au/com/dius/pact/consumer/junit/v3/ExampleMessageConsumerTest.java) (pact-jvm repo)

More examples and source code can be found in the [pact-jvm](https://github.com/pact-foundation/pact-jvm) repository

### 1.4 Samples
#### 1.4.1. Consumer Tests
##### JAVA

Refer to [this](https://github.tools.sap/CIC/ar-open-item/pull/34/files) PR for HTTP API 

Refer to [this](https://github.tools.sap/CIC/claims-settlement/pull/22/files) PR for Async API 

##### JavaScript
TBD

##### Python
TBD

## Publishing Pact files to the Broker    
To publish pact files, add the following step to ci.yml
```yaml
- name: pact-publish
  type: eureka/dev-pact:v1
  env:
    STAGE: "PUBLISH"
    PACT_FOLDER: "./Pacts/*.json"

```

Link to PACT Broker : [https://pact-broker.eurekacloud.io/](https://pact-broker.eurekacloud.io/)

### Credentials to the Pact Broker

From the Eureka Developer Portal, you can get your user name and password for accessing Pact Broker.
Proceed as follows:

1. Select the `Secret` tab.
2. Switch to the `CREDENTIAL` tab.
3. You can then find `Pact Broker User Name` and `Pact Broker Password`
4. You can choose the `Copy` icon to copy the value.


### Make Sure that Async Pact files have Topic and Matching rules 



Check that "Topic" exists, similar to example below :
```json
"messages": [
    {
      "_id": "8e0f8ea65b8140ad2547150a24816e66f1da07a3",
      "description": "Attachment Notification Received Successfully",
      "metaData": {
        "Topic": "AttachmentNotification",
        "contentType": "application/json"
      },
      "contents": {
        "originalFileName": "string",
        "execFailureDesc": "string",

```
Check that Matching Rules exists, similar to example below :
```json
 "id": 100,
        "reasonCode": "string",
        "projectId": "string",
        "contentType": "string"
      },
      "matchingRules": {
        "body": {
          "$.id": {
            "matchers": [
              {
                "match": "integer"
              }
            ],
            "combine": "AND"
          },
          "$.projectId": {
            "matchers": [
              {
                "match": "type"
              }
            ],
```



[Example of pact with correct format](https://pact-broker.eurekacloud.io/pacts/provider/attachment-ng-async/consumer/document-library-async/latest)


[Example repo](https://github.wdf.sap.corp/Eureka/document-library/blob/develop/document-library-cdc/src/test/java/com/sap/s4hana/eureka/business/documentlibrary/cdc/async/AttachmentNotificationTest.java)


## Prerequisites for verification steps
### Add steps to export OpenAPI and AsyncAPI files.
### To add export for OpenAPI:
#### 1. Update frw version to 0.15.6
#### 2. Import dependency
You need to add this dependency for the module to generate the swagger file, the module is usually called **-api
For Gradle:
```
testCompile 'com.sap.s4hana.eureka.framework:app-frw-open-api-doc-starter
```
For Maven:

```
<dependency>
    <groupId>com.sap.s4hana.eureka.framework</groupId>
    <artifactId>app-frw-open-api-doc-starter</artifactId>
    <scope>test</scope>
</dependency>
```
#### 3. Config exporting
The exporter can be configured by environment variable, this step is optional when you want to use default config. Here is an example using gradle:
```
test{
    environment = [
            // optional, output swagger file directory, default current directory
            'swagger.jsongen.dir': "${project.buildDir.name}",
 
            //optional, scan packages, comma split, default com.sap.s4hana.eureka.business
            'swagger.jsongen.package': 'com.sap.s4hana.eureka.business',
 
            //optional, can disable json generate, default true
            'swagger.jsongen.enable': 'true',
 
            //optional, swagger file name, default project dir name
            'swagger.jsongen.file': "${project.rootDir.name}",
            "asyncapidoc.path": "../${project.buildDir.name}/asyncapidoc.json"
    ]
}
```
#### 4. Export json document
For Gradle:
```
./gradlew clean test
```
For Maven:
```
mvn clean test
```
using project asynapitest as example, this will generate file asynapitest-api/build/asynapitest.json 
#### 5. Code example pr : https://github.tools.sap/CIC/asynapitest/pull/1
#### 6. Attention please
Add swagger's @Api annotation on the controller class, otherwise the RequestMapping will category to default.
[![](../assets/images/pactOpenAPIannotation.png)](../assets/images/pactOpenAPIannotation.png)
### To add export AsyncAPI:
#### 1. Add com.sap.s4hana.eureka.framework:app-frw-async-api-doc-starter dependency to build.gradle.
#### 2. For each file "filenameeo", Add @ApiAsyncEvent(topic = "*filename*") annotation to filenameeo.
[![](../assets/images/exportasyncapi.png)](../assets/images/exportasyncapi.png)
#### 3. If this module has no test cases, add a test case
#### 4. run test will generate asyncapidoc.json in this module.
#### 5. You can change the default path in build.gradle by setting environment "asyncapidoc.path".
[![](../assets/images/pactasyncapipath.png)](../assets/images/pactasyncapipath.png)
#### 6. Example PR: https://github.tools.sap/CIC/async-api-doc-test/pull/1
#### 7. Configuration
```
test {
    environment = [
            // optional, output swagger file directory, default current directory
            'swagger.jsongen.dir': "../${project.buildDir.name}",
 
            //optional, scan packages, comma split, default com.sap.s4hana.eureka.business
            'swagger.jsongen.package': 'com.sap.s4hana.eureka.business',
 
            //optional, can disable json generate, default true
            'swagger.jsongen.enable': 'true',
 
            //optional, swagger file name, default project dir name
            'swagger.jsongen.file': 'swagger',
 
            "asyncapidoc.path": "../${project.buildDir.name}/asyncapidoc.json"
    ]
}
```

## Verification step 
In the verify step for http, the PATH_TO_SWAGGER should be set to swagger.json from step generated above.

Example with attachment-ng is shown below, note that the provider and tag must match with the Pact Broker. If PATH_TO_SWAGGER is omitted, it will default to GIT_REPO-api/build/swagger.json. If PATH_TO_ASYNC_API_FILE is omitted, it will default to GIT_REPO-application/asyncapi.doc. If PROVIDER is omitted, it will default to GIT_REPO. If TAG is omitted it will default to GIT_TARGET_BRANCH.
```yaml
- name: pact-verify
  type: eureka/dev-pact:v1
  env:
     STAGE: "VERIFY"
     PATH_TO_SWAGGER: "attachment-ng-api/build/workspace.json"             # Swagger spec auto-generated in build step (from prerequisite #1)
     PATH_TO_ASYNC_API_FILE: "attachment-ng-application/asyncapidoc.json"  # Async API spec auto-generated in build step (from prerequisite #1)
     PROVIDER: "attachment-ng" # Defaults to $GIT_REPO
     TAG: "develop" # Defaults to $GIT_TARGET_BRANCH
```

Note: In special cases where an api is split into seperate modules within the same project, two swagger files will be generated which will require the verify step to be run twice, once for each swagger. 
Example of this scenario with claims backend is shown below. 
```yaml
- name: pact-verify-1
  type: eureka/dev-pact:v1
  env:
    STAGE: "VERIFY"
    PATH_TO_ASYNC_API_FILE: "claims-backend-application/asyncapidoc.json"    # Swagger 1 
    PATH_TO_SWAGGER: "claims-backend-api/build/swagger.json"           
    PROVIDER: "claims-backend"                                               # Provider 1
- name: pact-verify-2
  type: type: eureka/dev-pact:v1
  env:
    STAGE: "VERIFY"
    PATH_TO_SWAGGER: "claims-backend-analytics-report/build/swagger.json"   # Swagger 2     
    PROVIDER: "claims-backend-analytics"                                    # Provider 2
```
## Understanding Results
Results will be displayed with the following format:


[![](../assets/images/pactResults.png)](../assets/images/pactResults.png)
   


#### Consumer 
  - Displays the number of consumer contracts published to the broker by the specified service. 
#### Provider
  - Displays two metrics ex. PASSED:PASSED, NA:FAILED, etc 
    - The first value indicates the services verification status for HTTP contract test
    - The second value indicates the services verification status for ASYNC contract test
    - If the value is listed as NA, it means there were no published consumer contracts to be verified for the given provider


'''



In [8]:
# Split by heading, then merge neighbouring chunks up to 80% of MAX_TOKEN.
contents = merge_short_paragraphs(
    list(recursive_split_by_heading(markdown)),
    max_token=int(MAX_TOKEN*0.8),
    direction="bottom-up",
)
# Print every chunk with its token count, then a 100-dash rule.
for content in contents:
    print(content)
    print('tokens:', utils.num_tokens_from_string(content), end='\n' + '-'*100 + '\n')

# Working with Pact

tokens: 5
----------------------------------------------------------------------------------------------------
## 1. Overview
### 1.1 Consumer Driven Contract Test
Contract tests assert that inter-application messages conform to a shared understanding that is documented in a contract. Without contract testing, the only way to ensure that applications will work correctly together is by using expensive and brittle integration tests. We use PACT, which is a code-first tool for testing HTTP and message integrations using contract tests. Contract tests should focus on the messages rather than the behavior. Please refer to [Contract Tests vs Functional Tests](https://docs.pact.io/consumer/contract_tests_not_functional_tests) for more details.

Please read the [PACT Introduction page](https://docs.pact.io/) for more details on what contract testing is, when to use it and how it works.



### 1.2 PACT Modification - Schema based contract testing
We make use of a modificati