# How to Build Time Series Applications in CrateDB

This notebook walks through an example of importing and working with
time series data in CrateDB. The import uses Dask, a framework that
parallelizes operations on pandas DataFrames.

## Dataset
This notebook uses a daily weather dataset provided on kaggle.com. It offers
a collection of **daily weather readings from about 1,250 major cities around
the world**. Some locations provide historical data tracing back to 1833,
which makes it possible to explore **long-term weather patterns and their
evolution**.

Each record contains the following fields:

- Station ID
- City Name
- Timestamp (granularity: day)
- Season
- Average temperature in °C
- Minimum temperature in °C
- Maximum temperature in °C
- Precipitation in mm
- Snow depth in mm
- Average wind direction in degrees
- Average wind speed in km/h
- Peak wind gust in km/h
- Average sea level pressure in hPa
- Total sunshine in min

The dataset is available on Kaggle at [The Weather Dataset].

[The Weather Dataset]: https://www.kaggle.com/datasets/guillemservera/global-daily-climate-data

## Step 1: Install dependencies

In [None]:
#!pip install dask pandas==2.0.0 'crate[sqlalchemy]' cratedb-toolkit

## Step 2: Read and prepare the data

Download and prepare the data for importing into CrateDB.

The following files need to be processed:
- Daily weather data (daily_weather.parquet)
- Cities (cities.csv)
- Countries (countries.csv)

The subsequent code cell acquires the dataset directly from kaggle.com,
which requires a Kaggle account. After signing up, provide your credentials
by defining the `KAGGLE_USERNAME` and `KAGGLE_KEY` environment variables.
Alternatively, store them in the file `~/.kaggle/kaggle.json` in your home
folder, like this:
```json
{
  "username": "acme",
  "key": "2b1dac2af55caaf1f34df76236fada4a"
}
```
Alternatively, download the dataset files manually and extract them into a
folder called `DOWNLOAD`. In this case, comment out the two lines of code in
the following cell that load and acquire the dataset, to skip the automatic
acquisition.
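
Before running the acquisition cell, it can help to check which of the two credential variants is actually in place. The following is a minimal sketch using only the standard library. The helper name `kaggle_credentials_source` is hypothetical and not part of any Kaggle or CrateDB package. It only checks that the values are present, not that Kaggle accepts them.

```python
import json
import os
from pathlib import Path


def kaggle_credentials_source(environ=None, config_path=None):
    """Return "environment", "file", or None, depending on where credentials are found."""
    environ = os.environ if environ is None else environ
    if config_path is None:
        config_path = Path.home() / ".kaggle" / "kaggle.json"
    config_path = Path(config_path)

    # Variant 1: both environment variables are set and non-empty.
    if environ.get("KAGGLE_USERNAME") and environ.get("KAGGLE_KEY"):
        return "environment"

    # Variant 2: the JSON file exists and contains both entries.
    if config_path.is_file():
        try:
            config = json.loads(config_path.read_text())
        except (OSError, json.JSONDecodeError):
            return None
        if isinstance(config, dict) and config.get("username") and config.get("key"):
            return "file"

    return None


print(kaggle_credentials_source())
```

If the function returns `None`, neither variant is configured, and the manual download described above is the remaining option.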

In [None]:
from cratedb_toolkit.datasets import load_dataset

dataset = load_dataset("kaggle://guillemservera/global-daily-climate-data/daily_weather.parquet")
dataset.acquire()

In [88]:
from dask import dataframe as dd
from dask.diagnostics import ProgressBar

# Show a progress bar for dask activities
pbar = ProgressBar()
pbar.register()

In [56]:
# Load the parquet file. Please adjust the file path as needed.
df_kaggle = dd.read_parquet('DOWNLOAD/daily_weather.parquet')

# Show info about the data.
df_kaggle.info(verbose=True, memory_usage=True)

# Display the first rows.
df_kaggle.head()

[########################################] | 100% Completed | 6.26 s
[########################################] | 100% Completed | 6.37 s
[########################################] | 100% Completed | 6.47 s
[########################################] | 100% Completed | 6.47 s
<class 'dask.dataframe.core.DataFrame'>
Index: 27635763 entries, 0 to 24220
Data columns (total 14 columns):
 #   Column                  Non-Null Count  Dtype
---  ------                  --------------  -----
 0   station_id              27635763 non-null      category
 1   city_name               27621770 non-null      category
 2   date                    27635763 non-null      datetime64[ns]
 3   season                  27635763 non-null      category
 4   avg_temp_c              21404856 non-null      float64
 5   min_temp_c              21917534 non-null      float64
 6   max_temp_c              22096417 non-null      float64
 7   precipitation_mm        20993263 non-null      float64
 8   snow_depth_mm    

| | station_id | city_name | date | season | avg_temp_c | min_temp_c | max_temp_c | precipitation_mm | snow_depth_mm | avg_wind_dir_deg | avg_wind_speed_kmh | peak_wind_gust_kmh | avg_sea_level_pres_hpa | sunshine_total_min |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 41515 | Asadabad | 1957-07-01 | Summer | 27.0 | 21.1 | 35.6 | 0.0 | | | | | | |
| 1 | 41515 | Asadabad | 1957-07-02 | Summer | 22.8 | 18.9 | 32.2 | 0.0 | | | | | | |
| 2 | 41515 | Asadabad | 1957-07-03 | Summer | 24.3 | 16.7 | 35.6 | 1.0 | | | | | | |
| 3 | 41515 | Asadabad | 1957-07-04 | Summer | 26.6 | 16.1 | 37.8 | 4.1 | | | | | | |
| 4 | 41515 | Asadabad | 1957-07-05 | Summer | 30.8 | 20.0 | 41.7 | 0.0 | | | | | | |


In [68]:
# Read cities. Please adjust the file path as needed.
cities = dd.read_csv("DOWNLOAD/cities.csv", dtype={'station_id': 'object'})

# Combine longitude and latitude into a [longitude, latitude] array,
# which CrateDB can interpret directly as a GEO_POINT value.
def create_location_column(df):
    df['loc'] = df[['longitude', 'latitude']].values.tolist()
    return df

cities = cities.map_partitions(create_location_column)
cities = cities.drop(['longitude', 'latitude'], axis=1)

cities.head()

[########################################] | 100% Completed | 107.04 ms
[########################################] | 100% Completed | 211.77 ms
[########################################] | 100% Completed | 316.85 ms
[########################################] | 100% Completed | 421.17 ms


| | station_id | city_name | country | state | iso2 | iso3 | loc |
|---|---|---|---|---|---|---|---|
| 0 | 41515 | Asadabad | Afghanistan | Kunar | AF | AFG | [71.1500045859, 34.8660000397] |
| 1 | 38954 | Fayzabad | Afghanistan | Badakhshan | AF | AFG | [70.5792471913, 37.1297607616] |
| 2 | 41560 | Jalalabad | Afghanistan | Nangarhar | AF | AFG | [70.4361034738, 34.4415269155] |
| 3 | 38947 | Kunduz | Afghanistan | Kunduz | AF | AFG | [68.8725296619, 36.7279506623] |
| 4 | 38987 | Qala i Naw | Afghanistan | Badghis | AF | AFG | [63.1332996367, 34.983000131] |
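
The order of the two values in the `loc` array matters: when a `GEO_POINT` is given as an array, CrateDB expects longitude first and latitude second. The following sketch shows a small validation helper that could catch some accidental swaps before the import. The function name `to_geo_point` is hypothetical and only serves as an illustration.

```python
def to_geo_point(longitude, latitude):
    """Return [longitude, latitude] after validating the coordinate ranges."""
    if not -180.0 <= longitude <= 180.0:
        raise ValueError(f"longitude out of range: {longitude}")
    if not -90.0 <= latitude <= 90.0:
        raise ValueError(f"latitude out of range: {latitude}")
    # Longitude comes first, latitude second.
    return [longitude, latitude]


# Coordinates of Asadabad, taken from the cities output.
asadabad = to_geo_point(71.1500045859, 34.8660000397)
print(asadabad)
```

A range check like this only detects swapped values when the longitude lies outside the valid latitude range, so it complements a visual check of the data rather than replacing it.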


In [119]:
# Read countries. Please adjust the file path as needed.
countries = dd.read_csv("DOWNLOAD/countries.csv")

## Step 3: Import data into CrateDB

After acquiring and preparing the data files and data frames, they can be
imported into CrateDB. The tables for the weather data and the cities are
created manually, in order to define the correct data types and to use
features like fulltext indexes. Alternatively, when writing a dataframe to
CrateDB, the schema can be derived automatically.

### Connect to CrateDB

This code uses SQLAlchemy to connect to CrateDB.

In [102]:
import os
import sqlalchemy as sa
from crate.client.sqlalchemy.support import insert_bulk

# Define database address when using CrateDB Cloud.
# Please find these settings on your cluster overview page.
CONNECTION_STRING = os.environ.get(
    "CRATEDB_CONNECTION_STRING",
    "crate://<USER>:<PASSWORD>@<CRATEDB_HOST>/?ssl=true",
)

# Define database address when using CrateDB on localhost.
#CONNECTION_STRING = os.environ.get(
#    "CRATEDB_CONNECTION_STRING",
#    "crate://crate@localhost/",
#)

# Connect to CrateDB using SQLAlchemy.
engine = sa.create_engine(CONNECTION_STRING, echo=False)
connection = engine.connect()
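
When filling in the placeholders of the connection string, passwords containing characters such as `@`, `:`, or `/` need to be percent-encoded, otherwise the URL cannot be parsed correctly. The following is a minimal sketch of assembling the string with the standard library. The helper name `build_connection_string` and the host name `example.cratedb.net` are hypothetical.

```python
from urllib.parse import quote


def build_connection_string(host, user, password="", ssl=True, port=None):
    """Assemble a crate:// URL, percent-encoding user name and password."""
    credentials = quote(user, safe="")
    if password:
        credentials += ":" + quote(password, safe="")
    netloc = host if port is None else f"{host}:{port}"
    query = "?ssl=true" if ssl else ""
    return f"crate://{credentials}@{netloc}/{query}"


# Hypothetical cloud cluster, with a password containing reserved characters.
print(build_connection_string("example.cratedb.net", "admin", "p@ss:word/1"))

# Local instance without password and without SSL.
print(build_connection_string("localhost", "crate", ssl=False))
```

The result can be exported as `CRATEDB_CONNECTION_STRING`, which the cell above reads from the environment.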

#### Create tables

Now let's create the weather data table. To use fulltext search capabilities
on the city name, a corresponding fulltext index is defined on it.

In [121]:
connection.execute(sa.text("""
CREATE TABLE IF NOT EXISTS "weather_data" (
   "station_id" TEXT,
   "city_name" TEXT,
   "date" TIMESTAMP WITHOUT TIME ZONE,
   "season" TEXT,
   "avg_temp_c" REAL,
   "min_temp_c" REAL,
   "max_temp_c" REAL,
   "precipitation_mm" REAL,
   "snow_depth_mm" REAL,
   "avg_wind_dir_deg" REAL,
   "avg_wind_speed_kmh" REAL,
   "peak_wind_gust_kmh" REAL,
   "avg_sea_level_pres_hpa" REAL,
   "sunshine_total_min" REAL,
   INDEX city_name_ft using fulltext (city_name)
)
"""))

<sqlalchemy.engine.cursor.CursorResult at 0x3451f2740>
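
Once data has been imported, the named index `city_name_ft` can be queried with CrateDB's `MATCH` predicate. The following sketch is an assumption about how such a query could look, and it has not been verified against a live cluster here. Instead of the SQLAlchemy connection used in this notebook, it takes a plain DB-API cursor with `?` placeholders, as provided by the `crate` Python client. The function name `search_cities` is hypothetical.

```python
CITY_SEARCH_SQL = """
SELECT city_name, COUNT(*) AS readings
FROM "weather_data"
WHERE MATCH(city_name_ft, ?)
GROUP BY city_name
ORDER BY readings DESC
LIMIT ?
"""


def search_cities(cursor, term, limit=10):
    """Run the fulltext query on a DB-API cursor and return all result rows."""
    cursor.execute(CITY_SEARCH_SQL, (term, limit))
    return cursor.fetchall()


# Example usage, assuming CrateDB is reachable on localhost:
#
#   from crate import client
#   cursor = client.connect("http://localhost:4200").cursor()
#   print(search_cities(cursor, "york"))
```

Passing the search term as a parameter keeps user input out of the SQL text.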

Now, create the `cities` table using a `GEO_POINT` column to store location information.

In [105]:
connection.execute(sa.text("""
CREATE TABLE IF NOT EXISTS "cities" (
   "station_id" TEXT,
   "city_name" TEXT,
   "country" TEXT,
   "state" TEXT,
   "iso2" TEXT,
   "iso3" TEXT,
   "loc" GEO_POINT
)
"""))

<sqlalchemy.engine.cursor.CursorResult at 0x34522a040>

#### Import Weather Data

If you are using a **CrateDB Cloud cluster**, the easiest and fastest way to
import the data is to use the **import mechanism of CrateDB Cloud**. It avoids
transferring a lot of data across the network, as the Parquet file is uploaded
directly into a staging area and imported into CrateDB.

If you are running **CrateDB locally**, or do not want to use the GUI, we recommend
a parallelized import via Dask. Background: a plain pandas data frame would use
only one CPU to prepare the data, and would not fully utilize the database.
The import follows a few relevant ideas:

- Create additional partitions to parallelize the import into CrateDB.
  Dask processes and imports them in parallel automatically.
- Tune the concurrency and batch size parameters carefully, so as not to
  overload the database. A chunk size of 10,000 has shown good results on a single
  CrateDB node with 4 GB of assigned heap memory.
  Watch the logs during the insert operation: if the garbage collector consumes
  a lot of time, it indicates that not enough memory is assigned to
  CrateDB's heap.
- Instead of individual `INSERT` statements, the method outlined below uses the
  "bulk insert method" of CrateDB.
- The parallelization of the import procedure works on all partitions.
- Running CrateDB in a local Docker container with 5 assigned CPUs, 8 GB total memory,
  and 4 GB heap space led to about 80,000 inserts/second, including all the indexing.
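
To get a feeling for these numbers, the following back-of-the-envelope calculation combines the row count reported by `df_kaggle.info()`, the partition count and chunk size used in the import cell of this notebook, and the throughput quoted above. It is a rough estimate only, and actual figures depend on hardware and cluster configuration.

```python
import math

TOTAL_ROWS = 27_635_763      # row count reported by df_kaggle.info()
PARTITIONS = 26              # partition count used with repartition()
CHUNK_SIZE = 10_000          # rows per bulk request
INSERTS_PER_SECOND = 80_000  # throughput quoted for the local Docker setup

# Rows each partition holds if the data is spread evenly.
rows_per_partition = math.ceil(TOTAL_ROWS / PARTITIONS)

# Lower bound on the number of bulk requests across all partitions.
bulk_requests = math.ceil(TOTAL_ROWS / CHUNK_SIZE)

# Estimated duration of the whole import at the quoted throughput.
estimated_minutes = TOTAL_ROWS / INSERTS_PER_SECOND / 60

print(f"rows per partition: {rows_per_partition:,}")
print(f"bulk requests:      {bulk_requests:,}")
print(f"estimated duration: {estimated_minutes:.1f} min")
```

Under these assumptions, the full weather dataset would take a little under six minutes to import.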

You can find additional hints about importing large datasets into CrateDB
with Dask DataFrames at [SQLAlchemy: DataFrame operations].

[SQLAlchemy: DataFrame operations]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html
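
To illustrate the bulk insert idea mentioned in the list above: CrateDB's HTTP endpoint accepts a single statement together with many parameter rows in a `bulk_args` field, so one request can carry a whole chunk. The following sketch only builds such request bodies and does not send them. The `insert_bulk` helper used in this notebook takes care of batching on its own, and its internals are not shown here. The function name `bulk_payloads` is hypothetical.

```python
import json


def bulk_payloads(stmt, rows, chunk_size):
    """Yield one JSON request body per chunk of parameter rows."""
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        yield json.dumps({"stmt": stmt, "bulk_args": chunk})


# Three sample rows, taken from the cities output.
rows = [["41515", "Asadabad"], ["38954", "Fayzabad"], ["41560", "Jalalabad"]]
stmt = 'INSERT INTO "cities" ("station_id", "city_name") VALUES (?, ?)'

for body in bulk_payloads(stmt, rows, chunk_size=2):
    print(body)
```

With a chunk size of 2, the three rows are split into two request bodies, the first carrying two rows and the second carrying one.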

In [84]:
# Uncomment the following lines to process the actual weather data.
# They have been disabled in order to avoid long-running operations.
# df_kaggle = df_kaggle.repartition(26)
# df_kaggle.to_sql(name='weather_data', uri=CONNECTION_STRING, if_exists='append',
#                 index=False, chunksize=10000, parallel=True, method=insert_bulk)

#### Import Countries

Countries are imported as is; the schema is derived automatically by SQLAlchemy.

In [120]:
countries.to_sql('countries', CONNECTION_STRING, if_exists='append',
                 index=False, chunksize=1000, parallel=True, method=insert_bulk)

[########################################] | 100% Completed | 964.80 ms
[########################################] | 100% Completed | 1.06 s
[########################################] | 100% Completed | 1.16 s
[########################################] | 100% Completed | 1.17 s
[########################################] | 100% Completed | 1.27 s


#### Import Cities

Cities will be imported using the updated geolocation column.

In [106]:
cities.to_sql('cities', CONNECTION_STRING, if_exists='append',
              index=False, chunksize=1000, parallel=True, method=insert_bulk)

[########################################] | 100% Completed | 1.17 s
[########################################] | 100% Completed | 1.17 s
[########################################] | 100% Completed | 1.27 s
[########################################] | 100% Completed | 1.27 s
[########################################] | 100% Completed | 1.37 s
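
After an import, it is worth verifying that the expected number of rows has arrived. CrateDB makes newly written rows visible to queries after a table refresh, which happens periodically, so the sketch below issues `REFRESH TABLE` explicitly before counting. It takes a plain DB-API cursor, as provided by the `crate` Python client, rather than the SQLAlchemy connection used above. The function name `count_rows` is hypothetical, and the table name is interpolated into the statement, so it must come from a trusted source.

```python
def count_rows(cursor, table_name):
    """Refresh the table, then return its current number of rows."""
    # Make recently written rows visible to the following query.
    cursor.execute(f'REFRESH TABLE "{table_name}"')
    cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"')
    return cursor.fetchone()[0]


# Example usage, assuming CrateDB is reachable on localhost:
#
#   from crate import client
#   cursor = client.connect("http://localhost:4200").cursor()
#   for table in ("countries", "cities"):
#       print(table, count_rows(cursor, table))
```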
