<div style="display:flex;">
<img alt="New Relic" style="display:block;height:64px" src="https://gitlab.com/softbutterfly/open-source/open-source-office/-/raw/master/banners/borderless/brands/new_relic.png" />
<img alt="SoftButterfly" style="display:block;height:64px;margin-left:auto" src="https://gitlab.com/softbutterfly/open-source/open-source-office/-/raw/master/banners/borderless/softbutterfly.png" />
</div>

# New Relic Playground 6: Fetch Data

## Imports

Python Imports

In [None]:
import itertools
import multiprocessing
import os
import time
from datetime import datetime, timedelta
from queue import Queue
from threading import Thread

Third-party libraries

In [None]:
import dotenv
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from newrelic_sb_sdk.client import NewRelicGqlClient
from newrelic_sb_sdk.graphql import nerdgraph
from newrelic_sb_sdk.graphql.objects import RootMutationType, RootQueryType
from newrelic_sb_sdk.utils.response import get_response_data, print_response
from sgqlc.operation import Operation

## Client setup

To set up the client, we first need to load the credentials from environment variables.

In [None]:
env_file = "../.env"

dotenv.load_dotenv(env_file)

NEW_RELIC_USER_KEY = os.environ.get("NEW_RELIC_USER_KEY", None)

if NEW_RELIC_USER_KEY is None:
    raise ValueError(
        "Environment variable NEW_RELIC_USER_KEY is not set.",
    )

NEW_RELIC_ACCOUNT_ID = os.environ.get("NEW_RELIC_ACCOUNT_ID", None)

if NEW_RELIC_ACCOUNT_ID is None:
    raise ValueError(
        "Environment variable NEW_RELIC_ACCOUNT_ID is not set.",
    )
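The two checks above repeat the same pattern, so they could be folded into a small helper. The following is only a sketch of that idea: `require_env` is a hypothetical name, not part of `dotenv` or the SDK, and the demo uses a throwaway variable instead of real credentials.

```python
import os


def require_env(name):
    """Return the value of environment variable `name` or raise ValueError."""
    value = os.environ.get(name)
    if value is None:
        raise ValueError(f"Environment variable {name} is not set.")
    return value


# Throwaway variable so the sketch runs without real credentials.
os.environ["EXAMPLE_VARIABLE"] = "example-value"
print(require_env("EXAMPLE_VARIABLE"))
```

With such a helper, each credential could be loaded in a single line, for example `require_env("NEW_RELIC_USER_KEY")`.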

With the environment variables loaded, we can instantiate the client

In [None]:
newrelic = NewRelicGqlClient(new_relic_user_key=NEW_RELIC_USER_KEY)

## Client testing

To use and test the client, we need to configure `query_type` and `mutation_type` for the `nerdgraph` schema

In [None]:
nerdgraph.query_type = RootQueryType
nerdgraph.mutation_type = RootMutationType

For testing, we will use a simple GraphQL query to get the attributes of our user

```gql
query {
  actor {
    user {
      email
      id
      name
    }
  }
}
```

This query will be built from the `nerdgraph` schema.

In [None]:
# Declare an operation from the `query_type` in `nerdgraph`
operation = Operation(nerdgraph.query_type)

# Get the fields `id`, `email`, `name` from the `user` entity inside
# `actor`
operation.actor.user.__fields__("id", "email", "name")

This operation can be transformed into a GraphQL query

In [None]:
query = operation.__to_graphql__()
print(query)

And this query is the one we send to be executed by our client.

In [None]:
response = newrelic.execute(query)

The response data obtained is

In [None]:
print_response(response)

We can also use a raw query written by hand. In that case, it is advisable to pass it through the `build_query` method to get a clean query.

In [None]:
query = newrelic.build_query(
    """
        {
            actor {
                user {
                    email
                    name
                    id
                }
            }
        }
    """
)

We can then execute it directly, as before, with the same result

In [None]:
response = newrelic.execute(query)
print_response(response)

## Playground area

## Data extraction

### The case study and its parameters

Suppose we need to get logs from a specific application on a certain day. Our NRQL query will look like this

```sql
FROM
    Log
SELECT
    timestamp, message
WHERE
    `entity.name` = 'MyApplicationName'
LIMIT
    MAX 
SINCE
    '2023-01-01 00:00'
UNTIL
    '2023-01-02 00:00'
WITH TIMEZONE
    'America/Lima'
```

You can change the clause values to suit your needs, and then format them in the following way

In [None]:
EVENT_TYPE = "Log"
SELECT_FIELDS_TEMPLATE = "`timestamp`, `message`"
WHERE_TEMPLATE = "WHERE `entity.name` = 'MyApplicationName'"

Additionally, consider defining the following variables

In [None]:
SELECT_COUNT_TEMPLATE = "count(*)"
timeseries_template = "TIMESERIES 1 hour"

These variables allow us to transform the previous NRQL query into

```sql
FROM
    Log
SELECT
    count(*)
WHERE
    `entity.name` = 'MyApplicationName'
LIMIT
    MAX 
SINCE
    '2023-01-01 00:00'
UNTIL
    '2023-01-02 00:00'
WITH TIMEZONE
    'America/Lima'
TIMESERIES
    1 hour
```

to get the time distribution of our data.
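To see how the clause variables combine into a single NRQL string, the sketch below fills a template with plain Python `%` formatting. This is an assumption for illustration: the placeholder syntax matches the one used in this notebook, but `build_query` may do more than this. `NRQL_TEMPLATE` is a hypothetical name.

```python
from datetime import datetime

# Same clause values as in the cells above.
EVENT_TYPE = "Log"
SELECT_COUNT_TEMPLATE = "count(*)"
WHERE_TEMPLATE = "WHERE `entity.name` = 'MyApplicationName'"
timeseries_template = "TIMESERIES 1 hour"

# Hypothetical template; plain %-formatting is assumed here.
NRQL_TEMPLATE = (
    "FROM %(event_type)s SELECT %(select_template)s %(where_template)s "
    "SINCE '%(start_time)s' UNTIL '%(end_time)s' "
    "WITH TIMEZONE 'America/Lima' LIMIT MAX %(timeseries_template)s"
)

nrql = NRQL_TEMPLATE % {
    "event_type": EVENT_TYPE,
    "select_template": SELECT_COUNT_TEMPLATE,
    "where_template": WHERE_TEMPLATE,
    "start_time": datetime(2023, 1, 1).strftime("%Y-%m-%d %H:%M:%S"),
    "end_time": datetime(2023, 1, 2).strftime("%Y-%m-%d %H:%M:%S"),
    "timeseries_template": timeseries_template,
}
print(nrql)
```

Passing an empty string as `timeseries_template` yields the plain count query, and swapping `select_template` yields the field query, so one template can serve all three cases.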

Finally, we can define the time range for our data and a file description that identifies what we are downloading

In [None]:
START_TIME = datetime(2023, 1, 1, 0, 0, 0)  # From 1/1/2023 at 00:00
END_TIME = datetime(2023, 1, 2, 0, 0, 0)  # Until 2/1/2023 at 00:00

In [None]:
FILE_DESCRIPTION = "my_application_name_logs"

### The data fetch function

To fetch data we will use the following function

In [None]:
def fetch_data(*, template, params, start_time, end_time, key_path):
    query = newrelic.build_query(  # pylint: disable=redefined-outer-name
        template,
        params={
            **params,
            "start_time": start_time.strftime("%Y-%m-%d %H:%M:%S"),
            "end_time": end_time.strftime("%Y-%m-%d %H:%M:%S"),
        },
    )

    data = None  # pylint: disable=redefined-outer-name
    response = None  # pylint: disable=redefined-outer-name

    try:
        response = newrelic.execute(query)
        data = get_response_data(response, key_path=key_path)
    except Exception as e:  # pylint: disable=broad-except
        print(
            f"Error fetching data from New Relic since {start_time} "
            + f"to {end_time}: {e}"
        )
        if response is not None:
            print_response(response)

        print("Retry again in 1 second")
        time.sleep(1)

        return fetch_data(
            template=template,
            params=params,
            start_time=start_time,
            end_time=end_time,
            key_path=key_path,
        )

    return data
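Note that `fetch_data` retries by calling itself with no upper bound, so a failure that never clears (an invalid key, for instance) would keep recursing. A bounded loop with a growing delay is one possible alternative. The sketch below shows the idea in isolation: `retry_call`, `max_attempts` and `base_delay` are hypothetical names, and `flaky_request` is a stand-in for the real request.

```python
import time


def retry_call(func, *, max_attempts=5, base_delay=1.0):
    """Call `func` until it succeeds or `max_attempts` is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as error:  # pylint: disable=broad-except
            if attempt == max_attempts:
                raise  # give up and surface the last error
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({error}); retrying in {delay}s")
            time.sleep(delay)


# Stand-in for a request that fails twice before succeeding.
calls = {"count": 0}


def flaky_request():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return {"status": "ok"}


result = retry_call(flaky_request, base_delay=0.01)
print(result, "after", calls["count"], "calls")
```

The last error is re-raised once the attempts are exhausted, so the caller can decide whether to skip the interval or stop the whole download.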

### Counting data entries

Using the variables from the previous sections, we can define our query and get the number of Log entries in the time range of interest

In [None]:
query_template = """
    {
        actor {
            nrql(
                query: "FROM %(event_type)s SELECT %(select_template)s %(where_template)s SINCE '%(start_time)s' UNTIL '%(end_time)s' WITH TIMEZONE 'America/Lima' LIMIT MAX %(timeseries_template)s",
                accounts: %(account_id)s,
                timeout: 50,
            ) {
                results
            }
        }
    }
"""

query_params = {
    "event_type": EVENT_TYPE,
    "select_template": SELECT_COUNT_TEMPLATE,
    "where_template": WHERE_TEMPLATE,
    "timeseries_template": "",
    "account_id": NEW_RELIC_ACCOUNT_ID,
}

entries_count = fetch_data(
    template=query_template,
    params=query_params,
    start_time=START_TIME,
    end_time=END_TIME,
    key_path="nrql:results:0:count",
)

print(f"Entries count: {entries_count}")
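The `key_path` value reads like a colon-separated path into the response payload: `nrql:results:0:count` would be "the `count` of the first element of `results` under `nrql`". The sketch below illustrates that reading on a hand-written payload. It is not the SDK's `get_response_data`; `walk_key_path` is a hypothetical helper and the payload shape is an assumption.

```python
def walk_key_path(data, key_path):
    """Follow a colon-separated path through nested dicts and lists."""
    current = data
    for key in key_path.split(":"):
        if isinstance(current, list):
            current = current[int(key)]  # numeric segments index into lists
        else:
            current = current[key]
    return current


# Hand-written payload with the assumed shape of a count query result.
payload = {"nrql": {"results": [{"count": 1234}]}}

print(walk_key_path(payload, "nrql:results:0:count"))
print(walk_key_path(payload, "nrql:results"))
```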

### Data distribution over time

We can also look for data distribution over time

In [None]:
query_template = """
    {
        actor {
            nrql(
                query: "FROM %(event_type)s SELECT %(select_template)s %(where_template)s SINCE '%(start_time)s' UNTIL '%(end_time)s' WITH TIMEZONE 'America/Bogota' LIMIT MAX %(timeseries_template)s",
                accounts: %(account_id)s,
                timeout: 50,
            ) {
                results
            }
        }
    }
"""

query_params = {
    "event_type": EVENT_TYPE,
    "select_template": SELECT_COUNT_TEMPLATE,
    "where_template": WHERE_TEMPLATE,
    "timeseries_template": timeseries_template,
    "account_id": NEW_RELIC_ACCOUNT_ID,
}

results = fetch_data(
    template=query_template,
    params=query_params,
    start_time=START_TIME,
    end_time=END_TIME,
    key_path="nrql:results",
)

And plot it to see how it looks

In [None]:
%matplotlib inline

In [None]:
width = results[0]["endTimeSeconds"] - results[0]["beginTimeSeconds"]
data = [
    (
        datetime.fromtimestamp(
            (record["endTimeSeconds"] + record["beginTimeSeconds"]) // 2
        ),
        record["count"],
    )
    for record in results
]
x, y = zip(*data)
x = np.array(x)
y = np.array(y)

figure_handler = plt.figure(figsize=(15, 5))
axes_handler = figure_handler.add_subplot(111)
axes_handler.plot(x, y, "r-")
axes_handler.set_xlabel("Time")
axes_handler.set_ylabel("Count")
axes_handler.set_title(f"{EVENT_TYPE} count")
axes_handler.grid(axis="both", alpha=0.75)
figure_handler.tight_layout()
figure_handler.savefig(
    f"plot__{EVENT_TYPE}__{'_'.join(FILE_DESCRIPTION.split(' '))}"
    + f"__{START_TIME:%Y%m%dT%H%M%S}__{END_TIME:%Y%m%dT%H%M%S}.png",
)
plt.show()

### Data retrieval

Because New Relic limits the number of results a single query returns, if we have more than 2000 entries in our time range we need to break it into smaller parts and iterate over them until we have all the records. Keep in mind that this approach is subject to some race conditions.

In [None]:
def fetch_wide_range_data(
    *,
    worker,
    template,
    params,
    start_time,
    end_time,
    entries_count,  # pylint: disable=redefined-outer-name
):
    total_time = end_time - start_time
    remaining_time = end_time - start_time

    data = []  # pylint: disable=redefined-outer-name
    data_df = None  # pylint: disable=redefined-outer-name

    current_end_time = end_time

    while current_end_time > start_time:
        print(
            f"[Worker {worker.order}:{worker.job}] QUERY FROM "
            + f"[{start_time.strftime('%Y-%m-%d %H:%M:%S')}] TO "
            + f"[{current_end_time.strftime('%Y-%m-%d %H:%M:%S')}]"
        )

        data = fetch_data(
            template=template,
            params=params,
            start_time=start_time,
            end_time=current_end_time,
            key_path="nrql:results",
        )

        if data is None or len(data) == 0:
            current_end_time = start_time
        else:
            temp_df = pd.DataFrame(data)
            temp_df["timestamp"] = pd.to_datetime(
                temp_df["timestamp"],
                unit="ms",
            )
            temp_df = temp_df.sort_values(
                "timestamp",
                ascending=False,
            ).reset_index(
                drop=True,
            )

            if data_df is None:
                data_df = temp_df

            else:
                data_df = pd.concat([temp_df, data_df], axis=0)

            data_df = (
                data_df.sort_values("timestamp", ascending=False)
                .reset_index(drop=True)
                .drop_duplicates()
            )

            print(
                f"[Worker {worker.order}:{worker.job}] DATA FROM "
                + f"[{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')}] TO "
                + f"[{current_end_time.strftime('%Y-%m-%d %H:%M:%S.%f')}]: "
                + f"{data_df.shape[0]} records of {entries_count}"
            )

            if data_df.shape[0] == entries_count:
                current_end_time = start_time
            else:
                current_end_time = pd.to_datetime(
                    data_df["timestamp"].values[-1],
                )
                current_end_time = current_end_time + timedelta(
                    microseconds=1000000 - current_end_time.microsecond
                )
                remaining_time = current_end_time - start_time

            if remaining_time > total_time:
                print("loop detected")
                break

    return data_df
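The function above pages backwards in time: it assumes that a capped query returns the most recent records first, and it moves the end of the window to just after the oldest timestamp seen so far. The sketch below reproduces that idea on an in-memory list, with no New Relic calls. `fake_query`, `LIMIT` and the record layout are made up for illustration, and integer timestamps stand in for real ones.

```python
LIMIT = 3  # made-up cap standing in for the per-query result limit

# Fake data source: integer timestamps, some of them repeated.
TIMESTAMPS = [1, 2, 2, 3, 5, 5, 6, 8, 9]
RECORDS = [
    {"timestamp": timestamp, "message": f"message-{index}"}
    for index, timestamp in enumerate(TIMESTAMPS)
]


def fake_query(start, end):
    """Return at most LIMIT records with start <= timestamp < end, newest first."""
    matches = [r for r in RECORDS if start <= r["timestamp"] < end]
    matches.sort(key=lambda r: r["timestamp"], reverse=True)
    return matches[:LIMIT]


def fetch_all(start, end):
    """Page backwards from `end` to `start`, de-duplicating overlapping pages."""
    seen = {}
    current_end = end
    while current_end > start:
        page = fake_query(start, current_end)
        for record in page:
            seen[(record["timestamp"], record["message"])] = record
        if len(page) < LIMIT:
            break  # the window was not truncated, so nothing older remains
        oldest = min(record["timestamp"] for record in page)
        # Include the oldest timestamp again: the cap may have split it.
        next_end = oldest + 1
        if next_end >= current_end:
            # No progress would be made; skip past this timestamp instead.
            next_end = oldest
        current_end = next_end
    return sorted(seen.values(), key=lambda r: r["timestamp"])


all_records = fetch_all(0, 10)
print(len(all_records), "of", len(RECORDS), "records fetched")
```

Re-including the oldest timestamp means pages overlap, which is why de-duplication is needed. If a single timestamp held more records than the cap allows, the ones beyond the cap could not be reached this way; the sketch skips past that timestamp rather than looping forever.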

We usually have tens or hundreds of thousands of logs for long periods, so to speed up the download we can fetch several intervals simultaneously. For this we need to define a `Worker` class that will handle the download jobs

In [None]:
class Worker(Thread):
    def __init__(self, *, queue, order):
        Thread.__init__(self)
        self.results = []
        self.queue = queue
        self.order = order
        self.job = None

    def run(self):
        while True:
            job, (start_time, end_time) = self.queue.get()

            if job is None:
                break

            self.job = job

            print(f"[Worker {self.order}:{self.job}] Started")
            print(
                f"[Worker {self.order}:{self.job}] Getting data since "
                + f"{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')} until "
                + f"{end_time.strftime('%Y-%m-%d %H:%M:%S.%f')}"
            )

            query_template = ""  # pylint: disable=redefined-outer-name
            query_params = None  # pylint: disable=redefined-outer-name

            # -------------------------------------------------------------------------
            # Start User modifications
            # -------------------------------------------------------------------------
            # Modify your query here
            query_template = """
                {
                    actor {
                        nrql(
                            query: "FROM %(event_type)s SELECT %(select_template)s %(where_template)s SINCE '%(start_time)s' UNTIL '%(end_time)s' WITH TIMEZONE 'America/Bogota' LIMIT MAX %(timeseries_template)s",
                            accounts: %(account_id)s,
                            timeout: 50,
                        ) {
                            results
                        }
                    }
                }
            """

            query_params = {
                "event_type": EVENT_TYPE,
                "select_template": SELECT_COUNT_TEMPLATE,
                "where_template": WHERE_TEMPLATE,
                "timeseries_template": "",
                "account_id": NEW_RELIC_ACCOUNT_ID,
            }
            # -------------------------------------------------------------------------
            # End User modifications
            # -------------------------------------------------------------------------

            # Get entries count
            entries_count = fetch_data(  # pylint: disable=redefined-outer-name
                template=query_template,
                params=query_params,
                start_time=start_time,
                end_time=end_time,
                key_path="nrql:results:0:count",
            )
            print(
                f"[Worker {self.order}:{self.job}] Total entries count: {entries_count}"
            )

            # -------------------------------------------------------------------------
            # Start User modifications
            # -------------------------------------------------------------------------
            # Modify your query here
            query_template = """
                {
                    actor {
                        nrql(
                            query: "FROM %(event_type)s SELECT %(select_template)s %(where_template)s SINCE '%(start_time)s' UNTIL '%(end_time)s' WITH TIMEZONE 'America/Bogota' LIMIT MAX %(timeseries_template)s",
                            accounts: %(account_id)s,
                            timeout: 50,
                        ) {
                            results
                        }
                    }
                }
            """

            query_params = {
                "event_type": EVENT_TYPE,
                "select_template": SELECT_FIELDS_TEMPLATE,
                "where_template": WHERE_TEMPLATE,
                "timeseries_template": "",
                "account_id": NEW_RELIC_ACCOUNT_ID,
            }
            # -------------------------------------------------------------------------
            # End User modifications
            # -------------------------------------------------------------------------
            self.results.append(
                fetch_wide_range_data(
                    worker=self,
                    template=query_template,
                    params=query_params,
                    start_time=start_time,
                    end_time=end_time,
                    entries_count=entries_count,
                )
            )

        return self.results
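The `Worker` class follows a producer/consumer pattern: each thread pulls jobs from a shared `Queue` until it receives an empty job that tells it to stop. The sketch below isolates that pattern with a trivial job (squaring a number) in place of a download. `SquareWorker` and `STOP` are names made up for this illustration.

```python
from queue import Queue
from threading import Thread

STOP = None  # sentinel telling a worker to exit


class SquareWorker(Thread):
    def __init__(self, *, queue):
        super().__init__()
        self.queue = queue
        self.results = []

    def run(self):
        while True:
            job = self.queue.get()
            if job is STOP:
                break
            self.results.append(job * job)


queue = Queue()
for job in range(10):
    queue.put(job)

workers = [SquareWorker(queue=queue) for _ in range(3)]

# One sentinel per worker, queued after the real jobs.
for _ in workers:
    queue.put(STOP)

for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

squares = sorted(value for worker in workers for value in worker.results)
print(squares)
```

Each worker keeps its own `results` list, so no locking is needed when appending; the lists are only merged after every thread has been joined.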

We also need a function to handle the time splitting and the whole download process

In [None]:
def perform_parallel_data_fetch(*, start_time, end_time, interval_duration):
    timer = datetime.now()
    intervals = range(
        int(start_time.timestamp()),
        int(end_time.timestamp() + interval_duration.total_seconds()),
        int(interval_duration.total_seconds()),
    )
    intervals = [datetime.fromtimestamp(interval) for interval in intervals]
    intervals = list(zip(intervals[0:-1], intervals[1:]))

    queue = Queue()
    empty_job = (None, (None, None))

    workers = []
    workers_count = min(multiprocessing.cpu_count(), len(intervals))

    results = []  # pylint: disable=redefined-outer-name

    for job in enumerate(intervals):
        queue.put(job)

    for _ in range(workers_count):
        queue.put(empty_job)

    for order in range(workers_count):
        worker = Worker(order=order, queue=queue)
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    for worker in workers:
        results.append(worker.results)

    print("Total execution time: ", datetime.now() - timer)

    return results
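The interval list above steps from `start_time` in increments of `interval_duration`, so when the range is not an exact multiple of the step, the last interval extends past `end_time`. If that matters for your case, the last interval can be clamped. The sketch below is one way to do it with plain `datetime` arithmetic; `split_interval` is a hypothetical helper, not part of this notebook.

```python
from datetime import datetime, timedelta


def split_interval(start_time, end_time, interval_duration):
    """Split [start_time, end_time) into consecutive pieces of at most
    `interval_duration`, clamping the last piece to `end_time`."""
    intervals = []
    current = start_time
    while current < end_time:
        upper = min(current + interval_duration, end_time)
        intervals.append((current, upper))
        current = upper
    return intervals


parts = split_interval(
    datetime(2023, 1, 1, 0, 0),
    datetime(2023, 1, 1, 0, 25),
    timedelta(minutes=10),
)
for lower, upper in parts:
    print(lower, "->", upper)
```

Here a 25 minute range with a 10 minute step gives three pieces, the last one only 5 minutes long.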

Once the above utilities are defined we can use them to download our data in wide time intervals

In [None]:
WIDE_RANGE_START_TIME = datetime(2023, 1, 1, 0, 0, 0)  # From 1/1/2023 at 00:00
WIDE_RANGE_END_TIME = datetime(2023, 2, 1, 0, 0, 0)  # Until 1/2/2023 at 00:00

INTERVAL_DURATION = timedelta(seconds=600)

In [None]:
query_template = """
    {
        actor {
            nrql(
                query: "FROM %(event_type)s SELECT %(select_template)s %(where_template)s SINCE '%(start_time)s' UNTIL '%(end_time)s' WITH TIMEZONE 'America/Bogota' LIMIT MAX %(timeseries_template)s",
                accounts: %(account_id)s,
                timeout: 50,
            ) {
                results
            }
        }
    }
"""

query_params = {
    "event_type": EVENT_TYPE,
    "select_template": SELECT_COUNT_TEMPLATE,
    "where_template": WHERE_TEMPLATE,
    "timeseries_template": "",
    "account_id": NEW_RELIC_ACCOUNT_ID,
}

entries_count = fetch_data(
    template=query_template,
    params=query_params,
    start_time=WIDE_RANGE_START_TIME,
    end_time=WIDE_RANGE_END_TIME,
    key_path="nrql:results:0:count",
)

print(f"Entries count: {entries_count}")

In [None]:
results = perform_parallel_data_fetch(
    start_time=WIDE_RANGE_START_TIME,
    end_time=WIDE_RANGE_END_TIME,
    interval_duration=INTERVAL_DURATION,
)

### Data saving

Finally, we can save our results to a file

In [None]:
data_df = pd.concat(
    [result for result in itertools.chain(*results) if result is not None],
)

data_df = data_df.drop_duplicates()
data_df = data_df.sort_values(by="timestamp")
data_df = data_df.reset_index(drop=True)
data_df

In [None]:
data_df.to_csv(
    f"{EVENT_TYPE}__{'_'.join(FILE_DESCRIPTION.split(' '))}__"
    + f"{WIDE_RANGE_START_TIME:%Y%m%dT%H%M%S}__"
    + f"{WIDE_RANGE_END_TIME:%Y%m%dT%H%M%S}.csv",
    index=False,
)
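The file name above follows the pattern `<event type>__<description>__<start>__<end>.csv`, the same one used for the plot. As a sketch, that pattern could be wrapped in a helper so both places stay consistent; `build_file_name` is a hypothetical name and the values below are the ones used in this notebook.

```python
from datetime import datetime


def build_file_name(event_type, description, start_time, end_time, extension="csv"):
    """Build `<event type>__<description>__<start>__<end>.<extension>`."""
    slug = "_".join(description.split(" "))  # spaces become underscores
    return (
        f"{event_type}__{slug}__"
        f"{start_time:%Y%m%dT%H%M%S}__{end_time:%Y%m%dT%H%M%S}.{extension}"
    )


file_name = build_file_name(
    "Log",
    "my_application_name_logs",
    datetime(2023, 1, 1, 0, 0, 0),
    datetime(2023, 2, 1, 0, 0, 0),
)
print(file_name)
```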