In [None]:
import pydriller
from pydriller import ModificationType
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import json
from datetime import date, datetime
import tqdm
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from typing import List, Optional



## Create the elasticsearch and pydriller clients

First we instantiate the elasticsearch client. Without specifying parameters, it will look for an elasticsearch instance on localhost, port 9200 (default)

We then specify the name of the index where we want to store our commit data. In this case, we look at the go-ipfs git repository, so we call the index "go-ipfs-commits-complete", but the name is completely arbitrary.
In case the index already exists and we want to wipe it to re-ingest the data, we can run the delete command on the index.

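Finally, we create a pydriller repository object by specifying the location on the local file system where we cloned the desired repository, in our case "C:\Projects\External-Examples\go-ipfs", and the pair of tags (`start_tag`, `end_tag`) that delimit the part of the history we want to traverse.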

In [64]:
# Elasticsearch client; created without arguments, so it uses the client's
# default connection settings.
esclient = Elasticsearch()

# Name of the index that receives the documents.
index_name = "go-ipfs-commits-complete"

# Tags passed to pydriller as `from_tag` / `to_tag` to delimit the history.
start_tag = 'v0.5.0'
end_tag = 'v0.10.0'

# Only modified files whose name ends with this suffix are indexed.
file_type = '.go'

# Raw string: the backslashes in this Windows path would otherwise form invalid
# escape sequences and trigger a warning. The resulting value is unchanged.
repo_path = r'C:\Projects\External-Examples\go-ipfs'
esrepo = pydriller.Repository(repo_path, from_tag=start_tag, to_tag=end_tag)

# in case we want to wipe the index and re-ingest the data
# esclient.indices.delete(index_name)

{'acknowledged': True}

## Define the elasticsearch schema

In case this is the first time we import the data, or if we deleted the index, the next step is to specify the schema of the elasticsearch index we use. Note that we can also import the data without specifying the schema, and elastic will infer the schema itself. However, elastic doesn't know how we want to analyze the data, so using the default schema inference is not recommended.

In the schema, besides the standard data type mappings such as integer, float, and datetime, which map 1:1 to Python types, we have text and keyword. keyword should be used for strings that we want to analyze as they are (for example author name or email, commit hash); text is for strings where we want to apply full-text search capabilities. In this case we can also specify which analyzer to use, to define which stemming rules to apply and which stop words to remove. We use the english analyzer for the commit text messages, which are the only human-written piece of information in the commit (excluding the actual diff). In the mapping for the commit message (field msg) we also set "fielddata" to be true, so that we can apply techniques normally available only for keyword fields to the commit message (for example to generate the tag cloud).

In [65]:
# Explicit mapping for the documents that get indexed: one document per modified
# file, with the commit-level fields at the top level and the file-level fields
# nested under "modified_files". Field names follow the `commit` and
# `modified_files` models exactly (including the "commiter_*" spelling), because
# a mapping entry under any other name never applies to the indexed data.
esclient.indices.create(
    index=index_name,  # keyword form; newer clients reject a positional index name
    body={
        "mappings": {
            "properties": {
                # --- commit-level fields ---
                "hash": {"type": "keyword"},
                # Full-text field for the commit message. fielddata allows
                # keyword-style aggregations on it (e.g. a tag cloud).
                "msg": {"type": "text", "analyzer": "english", "fielddata": True},
                "author_date": {"type": "date"},
                "author_name": {"type": "keyword"},
                "author_email": {"type": "keyword"},
                "commiter_name": {"type": "keyword"},
                "commiter_email": {"type": "keyword"},
                "deletions": {"type": "integer"},
                "insertions": {"type": "integer"},
                "lines": {"type": "integer"},
                "files": {"type": "integer"},

                # Not emitted by the current `commit` model; kept so they are
                # typed as intended if they are added to the documents later.
                "author_timezone": {"type": "integer"},
                "committer_date": {"type": "date"},
                "committer_timezone": {"type": "integer"},
                "parents": {"type": "keyword"},

                # --- file-level fields (one entry per document) ---
                "modified_files": {
                    "properties": {
                        "old_path": {"type": "keyword"},
                        "new_path": {"type": "keyword"},
                        "filename": {"type": "keyword"},
                        "change_type": {"type": "keyword"},
                        "diff": {"type": "text"},
                        "added_lines": {"type": "integer"},
                        "deleted_lines": {"type": "integer"},
                        "source_code": {"type": "text"},
                        "source_code_before": {"type": "text"},
                        "nloc": {"type": "integer"},
                        "complexity": {"type": "integer"},
                        "token_count": {"type": "integer"},
                        # "methods", "methods_before" and "changed_methods" are
                        # deliberately left to dynamic mapping.
                        # NOTE(review): they hold lists of pydriller method
                        # objects, which presumably serialize to JSON objects
                        # rather than strings; a "text" mapping here would then
                        # reject the documents. Confirm before typing them.
                    }
                },
            }
        }
    },
)

{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'go-ipfs-commits-complete'}

In [63]:

class modified_files(BaseModel):
    """Change data for a single file touched by a commit.

    The fields mirror the attributes that `yield_commits` copies from a
    pydriller modified-file object. Every field is nullable and defaults to
    ``None`` explicitly, so the model behaves the same on Pydantic v1 and v2
    (v2 treats an ``Optional[...]`` field without a default as required).
    """

    # Code metrics of the file.
    complexity: Optional[int] = None
    nloc: Optional[int] = None
    # File location before and after the change, and the kind of change.
    old_path: Optional[str] = None
    new_path: Optional[str] = None
    change_type: Optional[ModificationType] = None
    filename: Optional[str] = None
    # Textual diff and its line counts.
    diff: Optional[str] = None
    added_lines: Optional[int] = None
    deleted_lines: Optional[int] = None
    # File content after and before the change.
    source_code: Optional[str] = None
    source_code_before: Optional[str] = None
    # Method lists as provided by pydriller; element type is left unconstrained.
    methods: Optional[List] = None
    methods_before: Optional[List] = None
    changed_methods: Optional[List] = None
    token_count: Optional[int] = None
    


class commit(BaseModel):
    """Document indexed into Elasticsearch: commit metadata plus file changes.

    `yield_commits` builds one instance per modified file, so `modified_files`
    holds a single entry in the documents it produces. All fields are required.
    """

    # Commit identity and message.
    hash: str
    msg: str
    # Author information copied from the pydriller commit.
    author_date: datetime
    author_name: str
    author_email: str
    # File-level change data (see the `modified_files` model).
    modified_files: List[modified_files]
    # Committer information. NOTE: the field names are spelled "commiter"
    # (single "t"); callers pass them under exactly these names.
    commiter_name: str
    commiter_email: str
    # Commit-wide change statistics copied from the pydriller commit.
    deletions: int
    insertions: int
    lines: int
    files: int


## Importing the data in ElasticSearch

### Converting Pydantic Pydriller objects into json

Once we have an elasticsearch index ready to receive our data, we need a way to convert the Pydantic models that wrap the pydriller data into json format, the one expected by elasticsearch (we could also use other formats with elasticsearch pipelines and plugins, but json is the default, easiest, and preferred format).

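To convert python objects into json, the standard way is to use the json library with the `json.dumps` function. Because `json.dumps` cannot serialize Pydantic models directly, we first use FastAPI's `jsonable_encoder` to turn each model into JSON-compatible python data, and then pass the result to `json.dumps`.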

### Bulk importing the data

Now that we have the index, the schema, and a way to encode pydriller objects into json, we are ready to import the data in ElasticSearch. To do that, we use the bulk endpoints, which are way more efficient than the standard ingest endpoints to import large number of documents.

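We first define a function that, given a maximum number of documents to import (`limit`), traverses the git history and, for every modified file of the selected type (`file_type`) in each commit, builds a document combining the commit metadata with that file's change data, converts it into json and yields it back.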

We then use such a function in the `streaming_bulk` elasticsearch method together with the elasticsearch client and the index name.

In [66]:
def _commit_file_document(traverse_commit, m):
    """Serialize one (commit, modified file) pair into a JSON string."""
    file_model = modified_files(
        complexity=m.complexity,
        nloc=m.nloc,
        old_path=m.old_path,
        new_path=m.new_path,
        change_type=m.change_type,
        filename=m.filename,
        diff=m.diff,
        added_lines=m.added_lines,
        deleted_lines=m.deleted_lines,
        source_code=m.source_code,
        source_code_before=m.source_code_before,
        methods=m.methods,
        methods_before=m.methods_before,
        changed_methods=m.changed_methods,
        token_count=m.token_count,
    )
    commit_model = commit(
        hash=traverse_commit.hash,
        msg=traverse_commit.msg,
        author_date=traverse_commit.author_date,
        author_name=traverse_commit.author.name,
        author_email=traverse_commit.author.email,
        commiter_name=traverse_commit.committer.name,
        commiter_email=traverse_commit.committer.email,
        deletions=traverse_commit.deletions,
        insertions=traverse_commit.insertions,
        lines=traverse_commit.lines,
        files=traverse_commit.files,
        modified_files=[file_model],
    )
    # jsonable_encoder turns the model into JSON-compatible data first.
    return json.dumps(jsonable_encoder(commit_model))


def yield_commits(limit):
    """Yield JSON documents for the modified files found in the repository.

    Traverses `esrepo` and emits one document per modified file whose name ends
    with `file_type`. Each document carries the commit-level fields plus the data
    of that single file.

    Args:
        limit: Maximum number of documents to yield. A value of zero or less
            yields nothing.

    Yields:
        str: One JSON-encoded document per matching (commit, file) pair.
    """
    if limit <= 0:
        return

    emitted = 0
    for traverse_commit in esrepo.traverse_commits():
        for m in traverse_commit.modified_files:
            if not m.filename.endswith(file_type):
                continue
            yield _commit_file_document(traverse_commit, m)
            emitted += 1
            # Check after every document, not once per commit, so a commit with
            # several matching files cannot push the output past `limit`.
            if emitted >= limit:
                return

In [69]:
# Number of documents to index (one document per modified file).
# Increase the limit to ingest more of the history.
limit = 1000

successes = 0
# The context manager closes the progress bar even when the ingestion is
# interrupted or streaming_bulk raises.
with tqdm.tqdm(unit="docs", total=limit) as progress:
    for ok, _action in streaming_bulk(
        client=esclient,
        index=index_name,
        actions=yield_commits(limit),
        chunk_size=100,
    ):
        progress.update(1)
        successes += ok  # `ok` is a bool, so this counts the successful documents

print("Indexed %d/%d documents" % (successes, limit))

  8%|▊         | 779/10000 [13:37<2:41:20,  1.05s/docs]


KeyboardInterrupt: 

## We can now use Kibana for analysis

At this point we have imported the commit documents (together with all their data), in a number controlled by `limit`. We can now open localhost at port 5601 to analyze and visualize the data we imported with Kibana, the visualization engine of ElasticSearch. You may need to create an Index Pattern using the Kibana UI.