# Batch ingestion
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one
  ~ or more contributor license agreements.  See the NOTICE file
  ~ distributed with this work for additional information
  ~ regarding copyright ownership.  The ASF licenses this file
  ~ to you under the Apache License, Version 2.0 (the
  ~ "License"); you may not use this file except in compliance
  ~ with the License.  You may obtain a copy of the License at
  ~
  ~   http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing,
  ~ software distributed under the License is distributed on an
  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  ~ KIND, either express or implied.  See the License for the
  ~ specific language governing permissions and limitations
  ~ under the License.
  -->
  
In this notebook we are focusing on [SQL based ingestion](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#sql-reference). 

Batch ingestion is the process that reads raw data from files or other external batch sources and transforms them into time partitioned and fully indexed [Druid segment files](https://druid.apache.org/docs/latest/design/segments). 

This notebook focuses on the basics of batch ingestion in Druid. You will:

- Ingest data from one or more external files
- Learn to use the context parameters to control the parallelism of ingestion
- Filter data during ingestion
- Apply transformations during ingestion
- Ingest nested JSON columns
- Enhance the data during ingestion using joins 

## Prerequisites

This tutorial works with Druid 29.0.0 or later.

Launch this tutorial and all prerequisites using the `druid-jupyter` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see [Learn Druid Project page](https://github.com/implydata/learn-druid/#readme).


## Initialization

In [None]:
import druidapi
import os

# Use the router of the Docker Compose environment unless DRUID_HOST is set
if os.environ.get('DRUID_HOST') is None:
    druid_host = "http://router:8888"
else:
    druid_host = f"http://{os.environ['DRUID_HOST']}:8888"

druid = druidapi.jupyter_client(druid_host)
display = druid.display
sql_client = druid.sql
status_client = druid.status

status_client.version

## SQL based ingestion

Run the following cell to load data from an external file into the "example-wikipedia-batch" table.

In [None]:
sql = '''
REPLACE INTO "example-wikipedia-batch" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
PARTITIONED BY DAY
'''
druid.display.run_task(sql)
druid.sql.wait_until_ready('example-wikipedia-batch')

REPLACE or INSERT at the beginning of the statement tells Druid to execute an ingestion task. INSERT is used when appending data, REPLACE when replacing data. Both methods work when adding data to a new or empty Druid datasource. The OVERWRITE ALL clause means that the whole datasource will be replaced with the result of this ingestion. 

```
REPLACE INTO "example-wikipedia-batch" OVERWRITE ALL
```

The WITH clause is used to declare one or more input sources. The same definition could also be placed directly in the FROM clause of the final SELECT, but a WITH clause is easier to read:

```
WITH "ext" AS 
(
    SELECT ...
      FROM TABLE (EXTERN ( ... ) ) EXTEND (...)
)
```

EXTERN supports many batch [input sources](https://druid.apache.org/docs/latest/ingestion/native-batch-input-sources.html) and [formats](https://druid.apache.org/docs/latest/ingestion/data-formats.html). In this case the SQL statement uses input source type "http" to access a set of "uris" that each contain a data file in the "json" data format. Note that compressed files are allowed and will automatically be decompressed.
```
FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
        '{"type":"json"}'
      )
```
The [EXTEND clause describes the input schema](https://druid.apache.org/docs/latest/multi-stage-query/reference#extern-function) using SQL data types:
```
EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR,  ...)
```

The final SELECT statement defines the transformations and schema of the resulting Druid table. A "__time" column is usually parsed from the source; this expression is mapped to Druid's primary time partitioning of segments. In this case we specified the "__time" column and ingested the rest of the columns as defined in the EXTEND clause using "*".

```
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
```

The final portion of this ingestion is the PARTITIONED BY DAY clause, which tells Druid to create a separate set of segments for each day. A PARTITIONED BY clause must be included in all INSERT/REPLACE statements.

##### Wait for Segment Availability:
The `wait_until_ready` function pauses until all the ingested data is available in the Historical caching layer, so that queries run afterward see the complete data.

#### Query the data
Let's take a look at the data that was loaded:

In [None]:
sql = """
SELECT channel,  count(*) num_events
FROM "example-wikipedia-batch" 
WHERE __time BETWEEN '2016-06-27' AND '2016-06-28'
GROUP BY 1 
ORDER BY 2 DESC 
LIMIT 10
"""
druid.display.sql(sql)

## Ingesting from multiple files

[Druid Input Sources](https://druid.apache.org/docs/latest/ingestion/native-batch.html#splittable-input-sources) allow you to specify multiple files as input to an ingestion job.
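
When the list of files is long or generated by another process, it can be convenient to build the input source JSON in Python instead of typing it by hand. The following is a minimal sketch under that assumption; the helper name `http_input_source` is hypothetical and not part of `druidapi`, and the URI list simply repeats the sample file used in this section.

```python
import json

def http_input_source(uris):
    """Return the JSON string for an "http" input source covering all given URIs."""
    return json.dumps({"type": "http", "uris": list(uris)})

# Hypothetical file list: the same sample file repeated three times
uris = ["https://druid.apache.org/data/wikipedia.json.gz"] * 3
input_source = http_input_source(uris)
input_format = json.dumps({"type": "json"})

# Embed both JSON strings as the two arguments of EXTERN
extern_clause = f"EXTERN(\n  '{input_source}',\n  '{input_format}'\n)"
print(extern_clause)
```

The generated text can then be spliced into the FROM TABLE( ... ) part of the statement. URIs that contain single quotes would need escaping before being embedded in a SQL string literal.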

In the following example we are using the same file three times simulating multiple source files with the same schema. Normally this would be a list of different files to load: 


In [None]:
sql = '''
REPLACE INTO "example-wikipedia-3-batch" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz"
                 ]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
PARTITIONED BY DAY
'''
druid.display.run_task(sql)
druid.sql.wait_until_ready('example-wikipedia-3-batch')


Let's look at the data now. The quantities are 3 times larger than before because we loaded the same file three times:

In [None]:
druid.display.sql("""
SELECT channel, count(*) num_events
FROM "example-wikipedia-3-batch" 
WHERE __time BETWEEN '2016-06-27' AND '2016-06-28'
GROUP BY 1 
ORDER BY 2 DESC 
LIMIT 10
""")

## Context parameters
Certain aspects of the ingestion can be controlled using [context parameters](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#context-parameters). This section discusses two of the commonly used parameters:

##### maxNumTasks
The Multi-stage Query Framework uses parallel workers to execute each stage of the ingestion process. Each stage creates output partitions that organize the data in preparation for the next stage. 

The input stage parallelism is limited by the input sources, because each file is processed by a single worker. Multiple input files are split evenly among parallel worker tasks, but a single large file cannot be parallelized at this stage. You can split very large files into smaller ones to improve parallelism for the input stage.

After the initial input stage, the level of parallelism of the job remains consistent and is controlled by the [context parameter](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#context-parameters) `maxNumTasks`.

If you are running Druid on your laptop, the default configuration only provides 4 worker slots on the Middle Manager, so the highest value you can use is `maxNumTasks=4`. Because `maxNumTasks` counts the controller task as well, this results in one controller and up to three workers. If you are using this notebook against a larger Druid cluster, feel free to experiment with higher values. Note that if `maxNumTasks` exceeds the available worker slots, the job will fail with a timeout error because it waits for all the worker tasks to be active.

##### rowsPerSegment
`rowsPerSegment` defaults to 3,000,000. You can adjust it to produce larger or smaller segments. 

This example shows how to set context parameters:

In [None]:
sql = '''
REPLACE INTO "example-wikipedia-4-batch" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz",
                   "https://druid.apache.org/data/wikipedia.json.gz"
                 ]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"
PARTITIONED BY DAY
'''
request = druid.sql.sql_request( sql)         # init request object
request.add_context( 'rowsPerSegment', 20000) # setting it low to produce many segments
request.add_context( 'maxNumTasks', 4)        # can't go any higher in learning environment

druid.display.run_task(request)
druid.sql.wait_until_ready('example-wikipedia-4-batch')

With a `rowsPerSegment` of only 20,000, the same ingestion as before produces more segments. Open the [Druid console in the Data Sources view](http://localhost:8888/unified-console.html#datasources) to see the difference in segments between "example-wikipedia-3-batch" and "example-wikipedia-4-batch".

Note that 20,000 is a very low value used to illustrate setting parameters. Normally this value is in the millions.
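
To get a feel for how `rowsPerSegment` affects the segment count, the arithmetic can be sketched as below. This is only a rough estimate under the assumption that each time chunk is split into about `rows / rowsPerSegment` segments; `rowsPerSegment` is a target, so the real count can differ. The function name and the row count are hypothetical.

```python
import math

def estimate_segments(rows_per_time_chunk, rows_per_segment=3_000_000):
    """Rough estimate: each time chunk yields ceil(rows / rowsPerSegment) segments."""
    return sum(
        math.ceil(rows / rows_per_segment)
        for rows in rows_per_time_chunk
        if rows > 0
    )

# Hypothetical datasource with a single day holding 70,000 rows
daily_rows = [70_000]

print(estimate_segments(daily_rows))          # default target -> 1
print(estimate_segments(daily_rows, 20_000))  # low target     -> 4
```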

## Filter data during ingestion

In situations where you need data cleansing or you're only interested in a subset of the data, the ingestion job can filter the data by simply adding a WHERE clause.

The example excludes all robotic Wikipedia updates:

In [None]:
sql = '''
REPLACE INTO "example-wikipedia-only-human" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz"]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT
  TIME_PARSE("timestamp") AS "__time",
  *
FROM "ext"

WHERE "isRobot"='false'

PARTITIONED BY DAY
'''

druid.display.run_task(sql)
druid.sql.wait_until_ready('example-wikipedia-only-human')

In [None]:
druid.display.sql("""
SELECT isRobot, channel, count(*) num_events
FROM "example-wikipedia-only-human" 
GROUP BY 1,2 
ORDER BY 3 DESC 
LIMIT 10
""")

## Transform data during ingestion

The SQL language provides a rich [set of functions](https://druid.apache.org/docs/latest/querying/sql-scalar.html) that can be applied to input columns to transform the data as it is being ingested. All scalar SQL functions are available for normal ingestion. Rollup ingestion is discussed in the [Rollup Notebook](05-rollup.ipynb) which includes the use of aggregate functions at ingestion time as well.

Here are some examples of such transformations:

##### Time manipulation
There are many [time parsing and manipulation functions](https://druid.apache.org/docs/latest/querying/sql-scalar.html#date-and-time-functions) available in Apache Druid. It is common to do some time cleansing/transformation at ingestion. Here are some examples of time manipulation functions:
```
  TIME_PARSE( "timestamp") AS "__time",   
  TIME_FLOOR( TIME_PARSE( "timestamp"), 'P1W') AS "week_start",
  TIMESTAMPDIFF( DAY,
                 TIME_FLOOR( TIME_PARSE( "timestamp"), 'P1W'),
                 TIME_PARSE( "timestamp")
               ) AS "days_since_week_start"
   
```
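
To make the semantics of these expressions concrete, here is a small Python sketch of the same calculation. It assumes that flooring to 'P1W' aligns to Monday 00:00 UTC (ISO weeks); the helper name `week_start` and the sample timestamp are illustrative only.

```python
from datetime import datetime, timedelta, timezone

def week_start(ts):
    """Floor a timestamp to Monday 00:00 of its week (assumed P1W alignment)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=ts.weekday())

# Hypothetical event time; 2019-08-25 is a Sunday
ts = datetime(2019, 8, 25, 14, 30, tzinfo=timezone.utc)

start = week_start(ts)
days_since_week_start = (ts - start).days  # whole days, like TIMESTAMPDIFF(DAY, ...)

print(start.isoformat())       # 2019-08-19T00:00:00+00:00
print(days_since_week_start)   # 6
```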

##### Use CASE statements to transform data
CASE statements can be used to resolve complex logic and prepare columns for certain query patterns. 
Examples:
```
  CASE
     WHEN UPPER("adblock_list")='NOADBLOCK' THEN 0
     ELSE 1
  END AS adblock_count,

  CASE
     WHEN UPPER("adblock_list")='EASYLIST' THEN 1
     ELSE 0
  END AS easylist_count
```
The two case statements above are examples of converting a categorical column like `adblock_list` into a numerical column that can be used as a meaningful metric when aggregated across different dimensions to get the count of events that were affected by an ad blocker.

##### String manipulation
Apache Druid has [string manipulation functions](https://druid.apache.org/docs/latest/querying/sql-scalar.html#string-functions) that can be very useful for transformation during ingestion. Some examples:
```
  REPLACE(REGEXP_EXTRACT("app_version", '[^\.]*\.'),'.','') AS major_version,
  STRING_TO_ARRAY("app_version",'\.') AS version_array,
  ARRAY_ORDINAL(STRING_TO_ARRAY("app_version",'\.'),3) AS patch_version
```
The above makes use of regex-based extraction, string replacement, string to array conversion and access to array elements as examples of the string transformation functions available.
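
For readers more familiar with Python, the following sketch mirrors what these three expressions compute for one value. The sample version string is hypothetical, and the Python calls are analogies rather than the functions Druid runs.

```python
import re

app_version = "1.9.6"  # hypothetical input value

# REGEXP_EXTRACT returns the first match of '[^\.]*\.', here "1.";
# REPLACE then removes the trailing dot
match = re.search(r"[^\.]*\.", app_version)
major_version = match.group(0).replace(".", "") if match else None

# STRING_TO_ARRAY splits on the regular expression '\.'
version_array = re.split(r"\.", app_version)

# ARRAY_ORDINAL is 1-based, so ordinal 3 corresponds to Python index 2
patch_version = version_array[3 - 1] if len(version_array) >= 3 else None

print(major_version, version_array, patch_version)
```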

##### Data Flattening functions
If you need to extract fields from nested structures in the input data, the JSON_VALUE function can be used to retrieve them and cast them to the desired data type:
```
  JSON_VALUE("event", '$.percentage' RETURNING BIGINT) as percent_cleared,
  JSON_VALUE("geo_ip", '$.city') AS city,
```

Here's a SQL based ingestion statement that uses all of these examples and a few more:

In [None]:
sql = '''
REPLACE INTO "example-kttm-transform-batch" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://static.imply.io/example-data/kttm-nested-v2/kttm-nested-v2-2019-08-25.json.gz"]}',
        '{"type":"json"}'
      )
    ) EXTEND ("timestamp" VARCHAR, "session" VARCHAR, "number" VARCHAR, "event" TYPE('COMPLEX<json>'), "agent" TYPE('COMPLEX<json>'), "client_ip" VARCHAR, "geo_ip" TYPE('COMPLEX<json>'), "language" VARCHAR, "adblock_list" VARCHAR, "app_version" VARCHAR, "path" VARCHAR, "loaded_image" VARCHAR, "referrer" VARCHAR, "referrer_host" VARCHAR, "server_ip" VARCHAR, "screen" VARCHAR, "window" VARCHAR, "session_length" BIGINT, "timezone" VARCHAR, "timezone_offset" VARCHAR)
)
SELECT
  session, 
  number,
  TIME_PARSE("timestamp") AS "__time",
  TIMESTAMPDIFF(DAY, TIME_FLOOR(TIME_PARSE("timestamp"), 'P1W'), TIME_PARSE("timestamp")) AS days_since_week_start,
  TIME_FLOOR(TIME_PARSE("timestamp"), 'P1W') AS week_start,
  TIME_CEIL(TIME_PARSE("timestamp"), 'P1W') AS week_end,
  TIME_SHIFT(TIME_FLOOR(TIME_PARSE("timestamp"), 'P1D'),'P1D', -1) AS start_of_yesterday,
  
  JSON_VALUE("event", '$.percentage' RETURNING BIGINT) as percent_cleared,
  JSON_VALUE("geo_ip", '$.city') AS city,
  
  CASE WHEN UPPER("adblock_list")='NOADBLOCK' THEN 0 ELSE 1 END AS adblock_count,
  CASE WHEN UPPER("adblock_list")='EASYLIST' THEN 1 ELSE 0 END AS easylist_count,
  
  REPLACE(REGEXP_EXTRACT("app_version", '[^\.]*\.'),'.','') AS major_version,
  ARRAY_ORDINAL(STRING_TO_ARRAY("app_version",'\.'),2) AS minor_version,
  ARRAY_ORDINAL(STRING_TO_ARRAY("app_version",'\.'),3) AS patch_version,
  session_length
FROM "ext"
PARTITIONED BY DAY
'''

druid.display.run_task(sql)
druid.sql.wait_until_ready('example-kttm-transform-batch')

In [None]:
# let's see what time of day shows the highest user activity
druid.display.sql("""
SELECT EXTRACT( HOUR FROM "__time") time_hour, city, count(distinct "session") session_count
FROM "example-kttm-transform-batch" 
WHERE "city" IS NOT NULL AND "city" <> ''
GROUP BY 1,2 
ORDER BY 3 DESC 
LIMIT 10
""")

## Nested columns

Apache Druid supports ingestion of [nested columns](https://druid.apache.org/docs/latest/querying/nested-columns.html). These are columns that contain nested structures with their own set of fields, which in turn either have literal values or are nested structures as well. Druid can automatically parse nested columns and index all internal fields into columnar form. This makes all fields in the JSON objects available for fast filtering and aggregation just as if they were top level columns. The schema of the nested columns is automatically discovered and access to the columns is through familiar JSON paths by using the JSON_VALUE function.

This example loads the Koalas to the Max sample dataset that includes multiple nested columns:

In [None]:
sql = '''
REPLACE INTO "example-kttm-nested-batch" OVERWRITE ALL
WITH "ext" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http","uris":["https://static.imply.io/example-data/kttm-nested-v2/kttm-nested-v2-2019-08-25.json.gz"]}',
        '{"type":"json"}'
      )
    ) EXTEND ( "timestamp" VARCHAR, "session" VARCHAR, "number" VARCHAR, 
               "event" TYPE('COMPLEX<json>'), 
               "agent" TYPE('COMPLEX<json>'), 
               "client_ip" VARCHAR, 
               "geo_ip" TYPE('COMPLEX<json>'), 
               "language" VARCHAR, "adblock_list" VARCHAR, "app_version" VARCHAR, 
               "path" VARCHAR, "loaded_image" VARCHAR, "referrer" VARCHAR, 
               "referrer_host" VARCHAR, "server_ip" VARCHAR, 
               "screen" VARCHAR, "window" VARCHAR, 
               "session_length" BIGINT, "timezone" VARCHAR, 
               "timezone_offset" VARCHAR)
)
SELECT
  TIME_PARSE("timestamp") AS "__time", *
FROM "ext"
PARTITIONED BY DAY
'''
druid.display.run_task(sql)
druid.sql.wait_until_ready('example-kttm-nested-batch')

As you can see, ingesting nested columns is straightforward. All you need to do is declare them as "TYPE('COMPLEX<json>')" and include the input field in the main SELECT clause ( * = all columns ), and you're done!
Take a look at the query example below, where we access these nested fields as dimensions we can group by, metrics we can aggregate and filters we can apply:

In [None]:
druid.display.sql("""
SELECT
  JSON_VALUE("agent", '$.browser') AS "browser",
  SUM( JSON_VALUE("event", '$.layer' RETURNING BIGINT) ) AS "sum_layers",
  COUNT( DISTINCT JSON_VALUE("geo_ip", '$.city') ) AS "unique_cities"

FROM "example-kttm-nested-batch"

WHERE JSON_VALUE("geo_ip", '$.continent') = 'South America'

GROUP BY 1 
ORDER BY 3 DESC
""")

Since nested columns could have different fields from row to row or as their schema changes over time, you can inspect the fields that have been discovered during ingestion using the JSON_PATHS function on nested columns:

In [None]:
druid.display.sql("""
SELECT 'agent' as nested_column, STRING_AGG( DISTINCT JSON_PATHS("agent"), ', ') paths FROM "example-kttm-nested-batch"
UNION ALL
SELECT 'event', STRING_AGG( DISTINCT JSON_PATHS("event"), ', ') paths FROM "example-kttm-nested-batch"
UNION ALL
SELECT 'geo_ip', STRING_AGG( DISTINCT JSON_PATHS("geo_ip"), ', ') paths FROM "example-kttm-nested-batch"
""")

<a id='system_fields'></a>
## System Fields for Batch Ingestion

When ingesting multiple files, it is generally helpful to know the specific source of each row. System fields allow you to do just that: they identify the input source and can be added to the ingestion job.

Each input source provides slightly different system fields. The example below uses the HTTP input source; [check the docs to see the fields that are available](https://druid.apache.org/docs/latest/ingestion/input-sources#http-input-source). 

To enable this functionality, add the "systemFields" property to the input source definition in the EXTERN clause:
```
FROM TABLE(
    EXTERN(
      '{
         "type":"http",
         "systemFields":["__file_uri","__file_path"],   <<<<<< list of system fields to capture
         "uris":[<list of file URIs>]
        }',
...
    )
```

and in the EXTEND clause add the fields so they are accessible to SELECT:
```
EXTEND ("__file_uri" VARCHAR,"__file_path" VARCHAR, ...)

```

This example ingests three files (20 million rows each) into a roll-up table to demonstrate the use of system fields. It takes a while, so be patient:

In [None]:
sql = '''
REPLACE INTO "example-taxi-trips-rollup" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
      '{"type":"http","systemFields":["__file_uri","__file_path"],"uris":["https://static.imply.io/example-data/trips/trips_xaa.csv.gz","https://static.imply.io/example-data/trips/trips_xab.csv.gz","https://static.imply.io/example-data/trips/trips_xac.csv.gz"]}',
      '{"type":"csv","findColumnsFromHeader":false,"columns":["trip_id","vendor_id","pickup_datetime","dropoff_datetime","store_and_fwd_flag","rate_code_id","pickup_longitude","pickup_latitude","dropoff_longitude","dropoff_latitude","passenger_count","trip_distance","fare_amount","extra","mta_tax","tip_amount","tolls_amount","ehail_fee","improvement_surcharge","total_amount","payment_type","trip_type","pickup","dropoff","cab_type","precipitation","snow_depth","snowfall","max_temperature","min_temperature","average_wind_speed","pickup_nyct2010_gid","pickup_ctlabel","pickup_borocode","pickup_boroname","pickup_ct2010","pickup_boroct2010","pickup_cdeligibil","pickup_ntacode","pickup_ntaname","pickup_puma","dropoff_nyct2010_gid","dropoff_ctlabel","dropoff_borocode","dropoff_boroname","dropoff_ct2010","dropoff_boroct2010","dropoff_cdeligibil","dropoff_ntacode","dropoff_ntaname","dropoff_puma"]}'
    )
  ) EXTEND ("__file_uri" VARCHAR,"__file_path" VARCHAR, "trip_id" BIGINT, "vendor_id" BIGINT, "pickup_datetime" VARCHAR, "dropoff_datetime" VARCHAR, "store_and_fwd_flag" VARCHAR, "rate_code_id" BIGINT, "pickup_longitude" DOUBLE, "pickup_latitude" DOUBLE, "dropoff_longitude" DOUBLE, "dropoff_latitude" DOUBLE, "passenger_count" BIGINT, "trip_distance" DOUBLE, "fare_amount" DOUBLE, "extra" DOUBLE, "mta_tax" DOUBLE, "tip_amount" DOUBLE, "tolls_amount" DOUBLE, "ehail_fee" VARCHAR, "improvement_surcharge" VARCHAR, "total_amount" DOUBLE, "payment_type" BIGINT, "trip_type" VARCHAR, "pickup" VARCHAR, "dropoff" VARCHAR, "cab_type" VARCHAR, "precipitation" DOUBLE, "snow_depth" BIGINT, "snowfall" BIGINT, "max_temperature" BIGINT, "min_temperature" BIGINT, "average_wind_speed" DOUBLE, "pickup_nyct2010_gid" BIGINT, "pickup_ctlabel" BIGINT, "pickup_borocode" BIGINT, "pickup_boroname" VARCHAR, "pickup_ct2010" BIGINT, "pickup_boroct2010" BIGINT, "pickup_cdeligibil" VARCHAR, "pickup_ntacode" VARCHAR, "pickup_ntaname" VARCHAR, "pickup_puma" BIGINT, "dropoff_nyct2010_gid" BIGINT, "dropoff_ctlabel" BIGINT, "dropoff_borocode" BIGINT, "dropoff_boroname" VARCHAR, "dropoff_ct2010" BIGINT, "dropoff_boroct2010" BIGINT, "dropoff_cdeligibil" VARCHAR, "dropoff_ntacode" VARCHAR, "dropoff_ntaname" VARCHAR, "dropoff_puma" BIGINT)
)
SELECT
  TIME_FLOOR( TIME_PARSE(TRIM("pickup_datetime")), 'P1M') AS "__time",
  "__file_uri",
  "__file_path",
  count(*) as "row_count"
FROM "ext"
WHERE "passenger_count" > 2   -- fewer rows to process for this example
GROUP BY 1,2,3
PARTITIONED BY ALL
'''
request = druid.sql.sql_request( sql)         # init request object
request.add_context( 'maxNumTasks', 4)        # can't go any higher in learning environment

druid.display.run_task(request)
druid.sql.wait_until_ready('example-taxi-trips-rollup')

Query the system fields that were ingested to see information about how each file was ingested:

In [None]:
sql='''
SELECT "__file_uri", "__file_path", 
    SUM("row_count") "total_file_rows" 
FROM "example-taxi-trips-rollup"
GROUP BY 1,2
'''

druid.display.sql(sql)

While the above examples are rather simple, this is a powerful tool to enhance data when the files are organized in folder structures where the path contains information about the data. It is common to see this kind of file system organization in cloud storage where the data has already been partitioned by time or other dimensions. Take this list of files as an example:
```
/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile1.csv
/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile2.csv
/data/activity_log/customer=376/hour=2024-01-01T11:00:00/datafile1.csv
...
```
With this example, the `__file_uri` or `__file_path` columns can be parsed at ingestion to create other fields, using functions like REGEXP_EXTRACT to extract `customer` and `hour`.
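
The parsing logic can be prototyped in Python before writing the ingestion expression. This is a sketch only: the pattern and the helper name `parse_partition_fields` are assumptions based on the sample paths above, and the same capture groups would still need to be expressed with REGEXP_EXTRACT in the ingestion SQL.

```python
import re

# One named capture group per path component of interest
PATH_PATTERN = re.compile(r"customer=(?P<customer>[^/]+)/hour=(?P<hour>[^/]+)/")

def parse_partition_fields(path):
    """Return the customer and hour encoded in a file path, or {} if absent."""
    match = PATH_PATTERN.search(path)
    return match.groupdict() if match else {}

path = "/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile1.csv"
fields = parse_partition_fields(path)
print(fields)
```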

## Enhancing data at ingestion

Adding dimensions and metrics to your data can enhance its analytic value. It's common, for example, to add product categorization, user demographics or additional location-based metrics to retail clickstream or POS data. In IoT scenarios, it is common to add information such as the metric type (temperature, pressure, flow, etc.) for a particular device. The device can also be associated with a specific industrial process and grouped into components and subcomponents of the overall system being monitored, which is very useful in determining subsystem anomalies. 

Lookups and joins can be used at query time to enhance data in this fashion, but lookups carry a performance penalty at query time and joins an even larger one. In the interest of achieving fast analytic queries, joins can instead be applied at ingestion time.



#### Broadcast joins - small lookup joins

SQL Based Ingestion can process joins efficiently during ingestion using either broadcast or sort merge joins. Broadcast is the default method, in which the right table of the join is broadcast in its entirety to all workers involved in the ingestion. The content of the lookup is kept within each worker's memory in order to process the join. You'll need to take care that the whole set of lookup tables joined in this fashion for a given ingestion will fit within the heap of each worker JVM.

Here's an example:

In [None]:
sql = '''
REPLACE INTO "example-kttm-enhanced-batch" OVERWRITE ALL
WITH
kttm_data AS (
  SELECT * 
  FROM TABLE(
    EXTERN(
      '{"type":"http","uris":["https://static.imply.io/example-data/kttm-v2/kttm-v2-2019-08-25.json.gz"]}',
      '{"type":"json"}'
    )
  ) EXTEND ("timestamp" VARCHAR, "agent_category" VARCHAR, "agent_type" VARCHAR, "browser" VARCHAR, "browser_version" VARCHAR, "city" VARCHAR, "continent" VARCHAR, "country" VARCHAR, "version" VARCHAR, "event_type" VARCHAR, "event_subtype" VARCHAR, "loaded_image" VARCHAR, "adblock_list" VARCHAR, "forwarded_for" VARCHAR, "language" VARCHAR, "number" VARCHAR, "os" VARCHAR, "path" VARCHAR, "platform" VARCHAR, "referrer" VARCHAR, "referrer_host" VARCHAR, "region" VARCHAR, "remote_address" VARCHAR, "screen" VARCHAR, "session" VARCHAR, "session_length" BIGINT, "timezone" VARCHAR, "timezone_offset" VARCHAR, "window" VARCHAR)
),
country_lookup AS (
  SELECT * 
  FROM TABLE(
    EXTERN(
      '{"type":"http","uris":["https://static.imply.io/example-data/lookup/countries.tsv"]}',
      '{"type":"tsv","findColumnsFromHeader":true}'
    )
  ) EXTEND ("Country" VARCHAR, "Capital" VARCHAR, "ISO3" VARCHAR, "ISO2" VARCHAR)
)

SELECT
  TIME_PARSE(kttm_data."timestamp") AS __time,
  kttm_data."session",
  kttm_data."agent_category",
  kttm_data."agent_type",
  kttm_data."browser",
  kttm_data."browser_version",
  kttm_data."language",
  kttm_data."os",
  kttm_data."city",
  kttm_data."country",
  country_lookup."Capital" AS "capital",
  country_lookup."ISO3" AS "iso3",
  kttm_data."forwarded_for" AS "ip_address",
  kttm_data."session_length",
  kttm_data."event_type"
FROM kttm_data
LEFT JOIN country_lookup ON country_lookup.Country = kttm_data.country
PARTITIONED BY DAY
'''
druid.display.run_task(sql)
druid.sql.wait_until_ready('example-kttm-enhanced-batch')

Data for both sources "kttm_data" and "country_lookup" are obtained from external sources:
```
WITH
kttm_data AS
(
  SELECT *
  FROM TABLE(
    EXTERN(
               '{"type":"http","uris":["https://static.imply.io/example-data/kttm-v2/kttm-v2-2019-08-25.json.gz"]}',
               '{"type":"json"}'
    )
  ) EXTEND ("timestamp" VARCHAR, "agent_category" VARCHAR, ...)
),
country_lookup AS
(
  SELECT *
  FROM TABLE(
    EXTERN(
      '{"type":"http","uris":["https://static.imply.io/example-data/lookup/countries.tsv"]}',
      '{"type":"tsv","findColumnsFromHeader":true}'
    )
  ) EXTEND ("Country" VARCHAR, "Capital" VARCHAR, "ISO3" VARCHAR, "ISO2" VARCHAR)
)
```

Columns from both tables can be used in the SELECT expressions using the alias "country_lookup" to reference any joined column:
```
  kttm_data."country",
  country_lookup."Capital" AS "capital",
  country_lookup."ISO3" AS "iso3"
```

The join is specified in the FROM clause:
```
FROM kttm_data
LEFT JOIN country_lookup ON country_lookup.Country = kttm_data.country
```
LEFT JOIN ensures that all the rows from the kttm_data source are ingested. An INNER JOIN would exclude rows from "kttm_data" if the value for "kttm_data.country" is not present in "country_lookup.Country". 
Since no context parameters were set, the join is processed as a broadcast join. The first table in the FROM clause is the distributed table and all other joined tables will be shipped to the workers to execute the join.

Take a look at the data:

In [None]:
druid.display.sql("""
SELECT
  "iso3" AS "country_code", 
  "capital",
  count( DISTINCT "ip_address" ) distinct_users, 
  MIN("session_length")/1000 fastest_session_ms,
  MAX("session_length")/1000 slowest_session_ms
FROM "example-kttm-enhanced-batch"
WHERE "event_type"='LayerClear'
GROUP BY 1,2
ORDER BY 3 DESC
LIMIT 10
""")

#### Shuffle joins - "Large lookup to fact" or "fact to fact" joins

See [Shuffle joins in SQL Based Ingestion](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#sort-merge).
A shuffle join makes it possible to join large tables to other large tables without fully loading either one into memory. Both sources involved in the join are scanned in parallel across all workers. The intermediate data for both sources is then redistributed among the workers based on the join column(s), so that rows from both sources with the same values end up in the same worker.

![](assets/shuffle-join.png)
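
The redistribution step can be modeled in a few lines of Python. This is a conceptual sketch with made-up rows and a hypothetical worker count: each row is routed to a worker by hashing its join key, and every worker then runs an inner sort-merge join on its own partitions. It is not Druid's implementation.

```python
import zlib
from collections import defaultdict

NUM_WORKERS = 3  # hypothetical worker count

def shuffle(rows, key, num_workers=NUM_WORKERS):
    """Route each row to a worker chosen by a stable hash of its join key."""
    partitions = defaultdict(list)
    for row in rows:
        worker = zlib.crc32(str(row[key]).encode()) % num_workers
        partitions[worker].append(row)
    return partitions

def merge_join(left, right, key):
    """Inner-join two lists by sorting both on the key and walking them in step."""
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Pair the current left row with every right row sharing the key
            k = j
            while k < len(right) and right[k][key] == lk:
                out.append({**left[i], **right[k]})
                k += 1
            i += 1
    return out

edits = [{"user": "alice", "page": "A"}, {"user": "bob", "page": "B"},
         {"user": "alice", "page": "C"}]
users = [{"user": "alice", "group": "Reviewers"}, {"user": "bob", "group": "Bots"},
         {"user": "carol", "group": "Patrollers"}]

left_parts, right_parts = shuffle(edits, "user"), shuffle(users, "user")
joined = []
for worker in range(NUM_WORKERS):
    joined.extend(merge_join(left_parts[worker], right_parts[worker], "user"))
print(joined)
```

Because both sides use the same hash on the join key, matching rows always land on the same worker, so no worker needs to hold a complete copy of either source.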

In order to use shuffle join the query context must include:
```
{
   "sqlJoinAlgorithm":"sortMerge"
}
```
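
As a hedged sketch of where this context ends up: Druid's SQL task API accepts a JSON body with a `query` string and a `context` object. The helper name `build_task_payload` and the placeholder statement below are illustrative assumptions. In this notebook the same effect comes from calling `request.add_context(...)` on the request object, as was done earlier for `maxNumTasks`.

```python
import json

def build_task_payload(sql, **context):
    """Return the JSON body for a SQL-based ingestion task with query context."""
    return json.dumps({"query": sql, "context": context})

payload = build_task_payload(
    'REPLACE INTO "example" OVERWRITE ALL SELECT ... PARTITIONED BY DAY',  # placeholder statement
    sqlJoinAlgorithm="sortMerge",  # switch from the default broadcast join
    maxNumTasks=2,
)
print(payload)
```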

Because this example is meant to run on the local Docker Compose deployment, joining two very large tables is not possible. Try it out with small sources and just pretend they are big. We'll use the "wikipedia" sample data and join it with "example-wiki-users-batch" profile data. But first, create the users table, because at the time of this writing there wasn't a matching "user" source handy:

In [None]:
# since we don't have a source for example-wiki-users-batch let's
# create one using in-database transformation that
# generates user profiles from the wikipedia data by
# - grouping on "user"
# - injecting variability into the "group" column using modulus of the __time of the event
# - "edits" represent the number of edits done by the user, so just count the number of events
# - calculate the registration time using the minimum __time from events and adjusting it some variable years back in time
# - determine the preferred language of the user based on their earliest channel edit

sql = '''
REPLACE INTO "example-wiki-users-batch" OVERWRITE ALL
SELECT 
  "user", 
  EARLIEST(
    CASE 
      WHEN  MOD(TIMESTAMP_TO_MILLIS(__time),5) > 3 THEN 'Reviewers' 
      WHEN  MOD(TIMESTAMP_TO_MILLIS(__time),17) > 13 THEN 'Patrollers' 
      WHEN  MOD(TIMESTAMP_TO_MILLIS(__time),23) > 21 THEN 'Bots'
      ELSE 'Autoconfirmed'
    END,
    1024
  ) AS "group",
  count(*) "edits",
  TIME_SHIFT(MIN(__time), 'P1Y', -1 * MOD(MIN(EXTRACT (MICROSECOND FROM __time)),20) ) AS "registered_at_ms",
  EARLIEST(SUBSTRING("channel", 2, 2), 1024) AS "language"
FROM "example-wikipedia-batch"
GROUP BY 1
PARTITIONED BY ALL
'''
request = druid.sql.sql_request( sql)              # init request object
request.add_context( 'finalizeAggregations', True) # EARLIEST functions will store a partial aggregation otherwise
request.add_context( 'maxNumTasks', 2)             # can't go any higher in test env

druid.display.run_task(request)
druid.sql.wait_until_ready('example-wiki-users-batch')
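
The CASE expression in the query above assigns each user a group from the epoch milliseconds of an event. As a rough sanity check of how the groups are spread, the following sketch mirrors that expression in plain Python. The function name `assign_group` is hypothetical, and the sketch assumes non-negative timestamps so that Python's `%` behaves like SQL `MOD`.

```python
from collections import Counter

def assign_group(epoch_ms):
    # Mirrors the CASE expression: branches are checked in order, first match wins.
    if epoch_ms % 5 > 3:
        return "Reviewers"
    if epoch_ms % 17 > 13:
        return "Patrollers"
    if epoch_ms % 23 > 21:
        return "Bots"
    return "Autoconfirmed"

# 5, 17 and 23 are coprime, so the pattern repeats every 5 * 17 * 23 milliseconds.
period = 5 * 17 * 23
counts = Counter(assign_group(ms) for ms in range(period))
for group, n in counts.most_common():
    print(f"{group}: {n / period:.1%}")
```

Most users end up as "Autoconfirmed", with progressively fewer in each of the other groups, which gives the joined data some variety without needing a real profile source.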

The next cell runs an ingestion using the sortMerge join:

In [None]:
sql = '''
REPLACE INTO "example-wiki-merge-batch" OVERWRITE ALL
WITH "wikidata" AS 
(
    SELECT *
    FROM TABLE(
      EXTERN(
        '{"type":"http",
          "uris":[ "https://druid.apache.org/data/wikipedia.json.gz"]
         }',
        '{"type":"json"}'
      )
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR))
SELECT 
  TIME_PARSE(d."timestamp") as "__time",
  d."isRobot", 
  d."channel" , 
  d."timestamp" , 
  d."flags" , 
  d."isUnpatrolled" , 
  d."page" , 
  d."diffUrl" , 
  d."added" , 
  d."comment" , 
  d."commentLength" , 
  d."isNew" , 
  d."isMinor" , 
  d."delta" , 
  d."isAnonymous" , 
  d."user" , 
  d."deltaBucket" , 
  d."deleted" , 
  d."namespace" , 
  d."cityName" , 
  d."countryName" , 
  d."regionIsoCode" , 
  d."metroCode" , 
  d."countryIsoCode" , 
  d."regionName", 
  u."group" AS "user_group",
  u."edits" AS "user_edits",
  u."registered_at_ms" AS "user_registration_epoch",
  u."language" AS "user_language"
FROM "wikidata" AS d 
  LEFT JOIN "example-wiki-users-batch" AS u ON u."user"=d."user"
PARTITIONED BY DAY
'''
request = druid.sql.sql_request( sql)                 # init request object
request.add_context( 'sqlJoinAlgorithm', 'sortMerge') # use sortMerge to join the sources
request.add_context( 'maxNumTasks', 2)                # use 2 tasks to run the ingestion

druid.display.run_task(request)
druid.sql.wait_until_ready('example-wiki-merge-batch')


Run the next cell to query the newly joined data:

In [None]:
druid.display.sql("""
SELECT "user_group",
  count( DISTINCT "user") "distinct_users",
  sum("user_edits") "total_activity"
FROM "example-wiki-merge-batch"
GROUP BY 1
ORDER BY 1, 3 DESC
""")

## Conclusion

Druid's [SQL-based ingestion](https://druid.apache.org/docs/latest/multi-stage-query/index.html) enables scalable batch ingestion from a wide variety of [data sources](https://druid.apache.org/docs/latest/ingestion/native-batch-input-sources.html) and [formats](https://druid.apache.org/docs/latest/ingestion/data-formats.html). The familiarity and expressiveness of SQL let users quickly transform, filter, and enhance data directly in the cluster.



## Cleanup

Run the following cell to remove all data sources created in this notebook.

In [None]:
druid.datasources.drop('example-wikipedia-batch', True)
druid.datasources.drop('example-wikipedia-3-batch', True)
druid.datasources.drop('example-wikipedia-4-batch', True)
druid.datasources.drop('example-wikipedia-only-human', True)
druid.datasources.drop('example-kttm-transform-batch', True)
druid.datasources.drop('example-kttm-nested-batch', True)
druid.datasources.drop('example-kttm-enhanced-batch', True)
druid.datasources.drop('example-wiki-users-batch', True)
druid.datasources.drop('example-wiki-merge-batch', True)
druid.datasources.drop('example-taxi-trips-rollup', True)
