# Batch Ingestion

In this notebook we are focusing on [SQL based ingestion](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#sql-reference). 

While [Native Batch Ingestion](https://druid.apache.org/docs/latest/ingestion/native-batch.html) is still available in Apache Druid 26.0.0, SQL based ingestion, powered by the Multi-stage Query Framework, is quickly becoming the norm thanks to its ease of use and improved performance.

Batch ingestion is the process of reading raw data from files or other external batch sources and transforming it into well-organized, fully indexed Druid segment files.

This notebook focuses on the basics of batch ingestion in Druid. 

## Prerequisites

This tutorial works with Druid 25.0.0 or later.

Launch this tutorial and all prerequisites using the `all-services` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see [Docker for Jupyter Notebook tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-docker.html).


<details><summary>    
<b>Run without Docker Compose</b>    
</summary>

In order to run this notebook you will need:

<b>Required Services</b>
* A running Apache Druid instance (25.0.0 or later) whose Router is reachable on port 8888

<b>Python packages</b>
* druidapi, a [Python client for Apache Druid](https://github.com/apache/druid/blob/master/examples/quickstart/jupyter-notebooks/druidapi/README.md)
</details>

### Initialization

In [2]:
import druidapi
import os

# Fall back to the Docker Compose service name when DRUID_HOST is not set
if os.environ.get('DRUID_HOST') is None:
    druid_host = "http://router:8888"
else:
    druid_host = f"http://{os.environ['DRUID_HOST']}:8888"

druid = druidapi.jupyter_client(druid_host)

## SQL Based Ingestion

Let's start with something simple. This first example loads all the data from an external file:


In [2]:
sql = '''
REPLACE INTO "wikipedia_events" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
PARTITIONED BY DAY
'''
druid.sql.run_task(sql)

Let's look at the different parts:

The initial `REPLACE` or `INSERT` statement tells Druid to execute an ingestion task. `INSERT` is used when appending data, `REPLACE` when replacing data. Both methods work to add data to a new or empty Druid datasource.

```
REPLACE INTO "wikipedia_events" OVERWRITE ALL
```

The `WITH` clause declares one or more input sources. The same definition could be placed directly in the `FROM` clause of the final `SELECT`, but the `WITH` form is easier to read:

```
WITH "ext" AS 
(
    SELECT ...
      FROM TABLE (EXTERN ( ... ) ) EXTEND (...)
)
```

`EXTERN` supports many batch [input sources](https://druid.apache.org/docs/latest/ingestion/native-batch-input-sources.html) and [formats](https://druid.apache.org/docs/latest/ingestion/data-formats.html). In this case we are using input source type `http` to access a set of `uris` that each contain a data file in the `json` data format. Note that compressed files are allowed and are decompressed automatically.
```
FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
        '{"type":"json"}'
      )
```
The `EXTEND` clause describes the input schema using SQL data types:
```
EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR,  ...)
```
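The input schema in the examples is long, so it can help to generate the `EXTERN` and `EXTEND` fragment from Python data instead of typing it by hand. The following is a minimal sketch, not part of `druidapi`: the helper name `extern_clause` is hypothetical, and it assumes an `http` input source and URIs that contain no single quotes.

```python
import json


def extern_clause(uris, columns, input_format="json"):
    """Build a TABLE(EXTERN(...)) EXTEND (...) fragment (hypothetical helper)."""
    # json.dumps produces the JSON specs that EXTERN expects as SQL string literals
    input_source = json.dumps({"type": "http", "uris": list(uris)})
    format_spec = json.dumps({"type": input_format})
    # Each EXTEND entry is a quoted column name followed by its SQL type
    extend = ", ".join(f'"{name}" {sql_type}' for name, sql_type in columns.items())
    return (
        "TABLE(\n"
        f"  EXTERN(\n    '{input_source}',\n    '{format_spec}'\n  )\n"
        f") EXTEND ({extend})"
    )


# A shortened column list for illustration; the real file has more columns
columns = {"isRobot": "VARCHAR", "channel": "VARCHAR", "timestamp": "VARCHAR", "added": "BIGINT"}
fragment = extern_clause(["https://druid.apache.org/data/wikipedia.json.gz"], columns)
print(fragment)
```

The printed fragment can be pasted after `FROM` in the `WITH` clause shown above.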

The final `SELECT` statement defines the transformations and schema of the resulting Druid table. A `__time` column is usually parsed from the source; Druid uses it as the primary timestamp for time partitioning of segments. In this case we specified the `__time` column and ingested the rest of the columns as is using `*`.

```
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
```

The final portion of this ingestion is the `PARTITIONED BY DAY` clause, which tells Druid to create a separate set of segments for each day. A `PARTITIONED BY` clause must be included in all `INSERT`/`REPLACE` statements. The [Time Partitioning notebook](03-time-partitioning.ipynb) reviews this option in more detail.
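To build intuition for what `PARTITIONED BY DAY` does, here is a small pure-Python sketch that groups timestamps into day buckets. It only mimics the idea and runs no Druid code; the sample timestamps are made up, and UTC day boundaries are an assumption.

```python
from collections import Counter
from datetime import datetime, timezone


def day_bucket(iso_timestamp):
    """Return the UTC day a timestamp falls into, similar in spirit to a DAY time chunk."""
    parsed = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return parsed.astimezone(timezone.utc).date().isoformat()


# Made-up sample timestamps for illustration only
sample = ["2016-06-27T00:00:11Z", "2016-06-27T23:59:59Z", "2016-06-28T00:00:01Z"]
rows_per_day = Counter(day_bucket(ts) for ts in sample)
print(rows_per_day)
```

Rows that land in the same bucket would end up in the same set of segments; rows from different days never share a segment.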

#### Wait for Segment Availability
The next cell uses the `wait_until_ready` function to pause until all the ingested data is available in the Historical caching layer before executing any queries:

In [3]:
druid.sql.wait_until_ready('wikipedia_events')

#### Query the Data
Let's take a look at the data that was loaded:

In [5]:
druid.display.sql("""
SELECT channel, count(*) 
FROM "wikipedia_events" 
GROUP BY 1 
ORDER BY 2 DESC 
LIMIT 10
""")

| channel | EXPR$1 |
| --- | --- |
| #en.wikipedia | 6650 |
| #sh.wikipedia | 3969 |
| #sv.wikipedia | 1867 |
| #ceb.wikipedia | 1808 |
| #de.wikipedia | 1357 |
| #fr.wikipedia | 1328 |
| #ru.wikipedia | 996 |
| #it.wikipedia | 916 |
| #es.wikipedia | 708 |
| #ja.wikipedia | 472 |


### Ingesting from Multiple Files

[Druid Input Sources](https://druid.apache.org/docs/latest/ingestion/native-batch.html#splittable-input-sources) allow you to specify multiple files as input to an ingestion job.

In the following example we are using the same file three times as an example of multiple sources. Normally this would be a list of different files to load: 


In [18]:
sql = '''
REPLACE INTO "wikipedia_events_3" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz"
                 ]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
PARTITIONED BY DAY
'''
druid.sql.run_task(sql)
druid.sql.wait_until_ready('wikipedia_events_3')


Let's look at the data now. The counts are three times higher, which is expected:

In [9]:
druid.display.sql("""
SELECT channel, count(*) 
FROM "wikipedia_events_3" 
GROUP BY 1 
ORDER BY 2 DESC 
LIMIT 10
""")

| channel | EXPR$1 |
| --- | --- |
| #en.wikipedia | 19950 |
| #sh.wikipedia | 11907 |
| #sv.wikipedia | 5601 |
| #ceb.wikipedia | 5424 |
| #de.wikipedia | 4071 |
| #fr.wikipedia | 3984 |
| #ru.wikipedia | 2988 |
| #it.wikipedia | 2748 |
| #es.wikipedia | 2124 |
| #ja.wikipedia | 1416 |


#### Context Parameters
Certain aspects of the ingestion can be controlled using [context parameters](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#context-parameters). This section discusses two commonly used parameters:

##### maxNumTasks
The Multi-stage Query Framework uses parallel workers to execute each stage of the ingestion process. Each stage creates output partitions that organize the data in preparation for the next stage. 

Input stage parallelism is limited by the input sources because each file is processed by exactly one worker. Multiple input files are split evenly among the parallel worker tasks, but a single large file cannot be parallelized at this stage. Consider splitting a single large file into multiple files to improve parallelism at this stage.

After the initial input stage, the level of parallelism of the job remains constant and is controlled by the [context parameter](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#context-parameters) `maxNumTasks`.

If you are running Druid on your laptop, the default configuration only provides 2 worker slots on the Middle Manager, so you can only run with `maxNumTasks=2`, resulting in one controller and one worker. If you are using this notebook against a larger Druid cluster, feel free to experiment with higher values. Note that if `maxNumTasks` exceeds the available worker slots, the job will fail with a timeout error because it waits for all the worker tasks to be active.
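The relationship between `maxNumTasks`, the number of workers, and the input files can be sketched in plain Python. This is an illustration only: the function `plan_input_stage` and the file names are hypothetical, and round-robin assignment is an assumption standing in for however Druid actually divides files among workers.

```python
def plan_input_stage(files, max_num_tasks):
    """Assign input files to workers; one task is always reserved for the controller."""
    if max_num_tasks < 2:
        raise ValueError("maxNumTasks must be at least 2: one controller plus one worker")
    workers = max_num_tasks - 1
    # A worker without a file has nothing to read, so never plan more workers than files
    assignment = {worker: [] for worker in range(min(workers, len(files)))}
    for index, name in enumerate(files):
        assignment[index % len(assignment)].append(name)
    return assignment


files = ["part-0.json.gz", "part-1.json.gz", "part-2.json.gz"]  # hypothetical file names
print(plan_input_stage(files, max_num_tasks=2))      # one worker reads all three files
print(plan_input_stage(files, max_num_tasks=4))      # three workers, one file each
print(plan_input_stage(files[:1], max_num_tasks=4))  # a single file keeps only one worker busy
```

The last call shows why splitting a large file helps: extra workers cannot share a single file during the input stage.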

##### rowsPerSegment
`rowsPerSegment` defaults to 3,000,000. You can adjust it to produce larger or smaller segments. 

This example shows how to set context parameters:

In [16]:
sql = '''
REPLACE INTO "wikipedia_events_4" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz"
                 ]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
PARTITIONED BY DAY
'''
request = druid.sql.sql_request( sql)         # init request object
request.add_context( 'rowsPerSegment', 20000) # setting it low to produce many segments
request.add_context( 'maxNumTasks', 2)        # can't go any higher in test env

druid.sql.run_task(request)
druid.sql.wait_until_ready('wikipedia_events_4')

With a `rowsPerSegment` of only 20,000, the same ingestion as before produces more segments. Open the [Druid console in the Data Sources view](http://localhost:8888/unified-console.html#datasources) to see the difference in segments between `wikipedia_events_3` and `wikipedia_events_4`.

Note that 20,000 is a very low value used to illustrate setting parameters. Normally this value is in the millions.
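As a rough way to reason about the effect of `rowsPerSegment`, the sketch below estimates a segment count per time chunk. It assumes each time chunk is split into about `ceil(rows / rowsPerSegment)` segments, which is a simplification because the parameter is a target rather than an exact size. The function name and the row count are hypothetical.

```python
import math


def estimate_segments(rows_per_time_chunk, rows_per_segment=3_000_000):
    """Estimate the total segment count, assuming ceil(rows / target) segments per time chunk."""
    return sum(
        math.ceil(rows / rows_per_segment)
        for rows in rows_per_time_chunk.values()
        if rows > 0
    )


rows = {"2016-06-27": 70_000}  # hypothetical row count for a single day
print(estimate_segments(rows))          # with the default target of 3,000,000 rows
print(estimate_segments(rows, 20_000))  # with the low target used in this notebook
```

Compare the estimate with the segment counts shown in the Druid console; the actual numbers may differ.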

#### Filter Data During Ingestion

In situations where you need data cleansing or you're only interested in a subset of the data, the ingestion job can filter the data by simply adding a `WHERE` clause.

The following example excludes all Wikipedia updates made by robots:

In [20]:
sql = '''
REPLACE INTO "wikipedia_events_only_human" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz"]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"

WHERE "isRobot"='false'

PARTITIONED BY DAY
'''

druid.sql.run_task(sql)
druid.sql.wait_until_ready('wikipedia_events_only_human')

In [21]:
druid.display.sql("""
SELECT isRobot, channel, count(*) 
FROM "wikipedia_events_only_human" 
GROUP BY 1,2 
ORDER BY 3 DESC 
LIMIT 10
""")

| isRobot | channel | EXPR$2 |
| --- | --- | --- |
| False | #en.wikipedia | 6114 |
| False | #de.wikipedia | 1171 |
| False | #fr.wikipedia | 1148 |
| False | #ru.wikipedia | 930 |
| False | #es.wikipedia | 658 |
| False | #it.wikipedia | 494 |
| False | #ja.wikipedia | 467 |
| False | #zh.wikipedia | 382 |
| False | #pt.wikipedia | 348 |
| False | #nl.wikipedia | 299 |


#### Transform Data During Ingestion

The SQL language provides a rich [set of functions](https://druid.apache.org/docs/latest/querying/sql-scalar.html) that can be applied to input columns to transform the data as it is being ingested. All scalar SQL functions are available for normal ingestion. Rollup ingestion, which also uses aggregate functions at ingestion time, is discussed in the [Rollup Notebook](05-rollup.ipynb).

Here are some examples of such transformations:

##### Time manipulation
There are many [time parsing and manipulation functions](https://druid.apache.org/docs/latest/querying/sql-scalar.html#date-and-time-functions) available in Apache Druid. It is common to do some time cleansing/transformation at ingestion. Here are some examples of time manipulation functions:
```
  TIME_PARSE( "timestamp") AS "__time",   
  TIME_FLOOR( TIME_PARSE( "timestamp"), 'P1W') AS "week_start",
  TIMESTAMPDIFF( DAY,
                 TIME_FLOOR( TIME_PARSE( "timestamp"), 'P1W'),
                 TIME_PARSE( "timestamp")
               ) AS "days_since_week_start"
   
```
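To check what these expressions should return for a given row, it can help to reproduce them in plain Python. The sketch below assumes that flooring to `'P1W'` aligns to Monday 00:00 UTC; treat that as an assumption to verify against your cluster. The function name `week_start` and the sample timestamp are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def week_start(ts):
    """Floor a UTC timestamp to the Monday 00:00 that begins its week."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=ts.weekday())


ts = datetime(2019, 8, 25, 14, 30, tzinfo=timezone.utc)  # made-up sample; a Sunday
start = week_start(ts)
# Whole days elapsed since the start of the week, like TIMESTAMPDIFF(DAY, ...)
days_since_week_start = (ts - start).days
print(start.isoformat(), days_since_week_start)
```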

##### Use CASE statements to transform data
CASE statements can be used to resolve complex logic and prepare columns for certain query patterns. 
Examples:
```
  CASE
     WHEN UPPER("adblock_list")='NOADBLOCK' THEN 0
     ELSE 1
  END AS adblock_count,

  CASE
     WHEN UPPER("adblock_list")='EASYLIST' THEN 1
     ELSE 0
  END AS easylist_count
```
The two case statements above are examples of converting a categorical column like `adblock_list` into a numerical column that can be used as a meaningful metric when aggregated across different dimensions to get the count of events that were affected by an ad blocker.

##### String manipulation
Apache Druid has [string manipulation functions](https://druid.apache.org/docs/latest/querying/sql-scalar.html#string-functions) that can be very useful for transformation during ingestion. Some examples:
```
  REPLACE(REGEXP_EXTRACT("app_version", '[^\.]*\.'),'.','') AS major_version,
  STRING_TO_ARRAY("app_version",'\.') AS version_array,
  ARRAY_ORDINAL(STRING_TO_ARRAY("app_version",'\.'),3) AS patch_version
```
The above makes use of regex-based extraction, string replacement, string to array conversion and access to array elements as examples of the string transformation functions available.
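The same string logic can be tried out in Python before running an ingestion. This is a sketch that mirrors the SQL expressions with the `re` module rather than calling Druid; the helper names and the sample version string `1.9.6` are hypothetical.

```python
import re


def major_version(app_version):
    """Take everything up to the first dot, then strip the dot."""
    match = re.search(r"[^\.]*\.", app_version)
    return match.group(0).replace(".", "") if match else None


def version_part(app_version, ordinal):
    """Split on dots and return the element at a 1-based position, or None if out of range."""
    parts = re.split(r"\.", app_version)
    return parts[ordinal - 1] if 1 <= ordinal <= len(parts) else None


sample = "1.9.6"  # made-up version string
print(major_version(sample), version_part(sample, 2), version_part(sample, 3))
```

Note that `version_part` uses 1-based positions to match `ARRAY_ORDINAL`.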

##### Data Flattening functions
If you need to extract fields from nested structures in the input data, use the `JSON_VALUE` function to retrieve them and cast them to the desired data type:
```
  JSON_VALUE("event", '$.percentage' RETURNING BIGINT) as percent_cleared,
  JSON_VALUE("geo_ip", '$.city') AS city,
```

Here's a SQL based ingestion statement that uses all of these examples and a few more:

In [1]:
sql = '''
REPLACE INTO "kttm_transformation" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://static.imply.io/example-data/kttm-nested-v2/kttm-nested-v2-2019-08-25.json.gz"]}',
        '{"type":"json"}'
      )
    ) EXTEND ("timestamp" VARCHAR, "session" VARCHAR, "number" VARCHAR, "event" TYPE('COMPLEX<json>'), "agent" TYPE('COMPLEX<json>'), "client_ip" VARCHAR, "geo_ip" TYPE('COMPLEX<json>'), "language" VARCHAR, "adblock_list" VARCHAR, "app_version" VARCHAR, "path" VARCHAR, "loaded_image" VARCHAR, "referrer" VARCHAR, "referrer_host" VARCHAR, "server_ip" VARCHAR, "screen" VARCHAR, "window" VARCHAR, "session_length" BIGINT, "timezone" VARCHAR, "timezone_offset" VARCHAR)
)
SELECT
  session, 
  number,
  TIME_PARSE("timestamp") AS "__time",
  TIMESTAMPDIFF(DAY, TIME_FLOOR(TIME_PARSE("timestamp"), 'P1W'), TIME_PARSE("timestamp")) AS days_since_week_start,
  TIME_FLOOR(TIME_PARSE("timestamp"), 'P1W') AS week_start,
  TIME_CEIL(TIME_PARSE("timestamp"), 'P1W') AS week_end,
  TIME_SHIFT(TIME_FLOOR(TIME_PARSE("timestamp"), 'P1D'),'P1D', -1) AS start_of_yesterday,
  
  JSON_VALUE("event", '$.percentage' RETURNING BIGINT) as percent_cleared,
  JSON_VALUE("geo_ip", '$.city') AS city,
  
  CASE WHEN UPPER("adblock_list")='NOADBLOCK' THEN 0 ELSE 1 END AS adblock_count,
  CASE WHEN UPPER("adblock_list")='EASYLIST' THEN 1 ELSE 0 END AS easylist_count,
  
  REPLACE(REGEXP_EXTRACT("app_version", '[^\.]*\.'),'.','') AS major_version
  -- ,
  -- ARRAY_ORDINAL(STRING_TO_ARRAY("app_version",'\.'),2) AS minor_version,
  -- ARRAY_ORDINAL(STRING_TO_ARRAY("app_version",'\.'),3) AS patch_version
FROM "ext"
PARTITIONED BY DAY
'''

druid.sql.run_task(sql)
druid.sql.wait_until_ready('kttm_transformation')


In [None]:
druid.display.sql("""
SELECT session, count(distinct "week_start")
FROM "kttm_transformation" 
GROUP BY 1 
ORDER BY 2 DESC 
LIMIT 10
""")

#### Nested Columns

Apache Druid supports ingestion of [nested columns](https://druid.apache.org/docs/latest/querying/nested-columns.html). These are columns whose values contain nested structures, each with its own set of fields that are in turn either literal values or further nested structures. Druid can automatically parse nested columns and index all internal fields in columnar form, making them available for fast filtering and aggregation just as if they were top-level columns. The schema of the nested columns is discovered automatically, and you access the fields through familiar JSON paths using the `JSON_VALUE` function.

This example loads the Koalas to the Max sample dataset, which includes multiple nested columns:


In [34]:
sql = '''
REPLACE INTO "kttm_nested" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://static.imply.io/example-data/kttm-nested-v2/kttm-nested-v2-2019-08-25.json.gz"]}',
        '{"type":"json"}'
      )
    ) EXTEND ( "timestamp" VARCHAR, "session" VARCHAR, "number" VARCHAR, 
               "event" TYPE('COMPLEX<json>'), 
               "agent" TYPE('COMPLEX<json>'), 
               "client_ip" VARCHAR, 
               "geo_ip" TYPE('COMPLEX<json>'), 
               "language" VARCHAR, "adblock_list" VARCHAR, "app_version" VARCHAR, 
               "path" VARCHAR, "loaded_image" VARCHAR, "referrer" VARCHAR, 
               "referrer_host" VARCHAR, "server_ip" VARCHAR, 
               "screen" VARCHAR, "window" VARCHAR, 
               "session_length" BIGINT, "timezone" VARCHAR, 
               "timezone_offset" VARCHAR)
)
SELECT
  TIME_PARSE("timestamp") AS "__time", *
FROM "ext"
PARTITIONED BY DAY
'''
druid.sql.run_task(sql)
druid.sql.wait_until_ready('kttm_nested')


As you can see, ingesting nested columns is very easy. All you need to do is declare them as `TYPE('COMPLEX<json>')` and include the input field in the main `SELECT` clause (`*` = all columns), and you're done!
Take a look at the query example below where we access these nested fields as dimensions, metrics and filters:
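A query along these lines is sketched here. It uses only the two JSON paths that appear earlier in this notebook (`$.city` in `geo_ip` and `$.percentage` in `event`): the city is a dimension and a filter, and the percentage is a metric. The column aliases are illustrative, and the guard around the call is only there so the cell also runs when the `druid` client from the Initialization cell is not defined.

```python
nested_query = """
SELECT
  JSON_VALUE("geo_ip", '$.city') AS "city",
  COUNT(*) AS "events",
  AVG(JSON_VALUE("event", '$.percentage' RETURNING BIGINT)) AS "avg_percent_cleared"
FROM "kttm_nested"
WHERE JSON_VALUE("geo_ip", '$.city') IS NOT NULL
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""

# `druid` is the client created in the Initialization cell; skip the call when it is absent
if "druid" in globals():
    druid.display.sql(nested_query)
```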

#### Enhancing Data at Ingestion

- Lookups - Broadcast Joins
- Fact-to-Fact - Shuffle Joins

### Cleanup

Run the following cell to remove all data sources created in this notebook.

In [17]:
druid.datasources.drop('wikipedia_events', True)
druid.datasources.drop('wikipedia_events_3', True)
druid.datasources.drop('wikipedia_events_4', True)
druid.datasources.drop('wikipedia_events_only_human', True)
druid.datasources.drop('kttm_transformation', True)
druid.datasources.drop('kttm_nested', True)
