# Climate extremes explorer (ODPS)

## 1. Log in to the Citybrain Platform

### 1.1 Install the Python library

[Package Homepage](https://github.com/citybrain-platform/python-library)

In [None]:
# Install (or upgrade, because of -U) the Citybrain Platform Python client.
# The leading `!` runs the line as a shell command from the notebook.
!pip3 install -U citybrain_platform

### 1.2 Get an API key from the [website](https://www.citybrain.org/#/settings) and configure the library with it

> You need an API key to use the library: log in to [Citybrain](https://www.citybrain.org/#/settings) and copy the key from the settings page.

In [None]:
import os

import citybrain_platform

# Configure the client with your API key.
# The key is read from the CITYBRAIN_API_KEY environment variable, so the secret
# does not have to be pasted into (and shared or committed with) the notebook.
# If the variable is not set, the placeholder below is used instead; replace it
# with your own key in that case.
citybrain_platform.api_key = os.environ.get("CITYBRAIN_API_KEY", "...")

### 1.3 List all tables the current user can see

> There are two types of tables on the computing platform:
>
> **public**: tables provided and maintained by the platform
>
> **own**: tables created by the user associated with the API key

In [None]:
tables = citybrain_platform.Computing.list_tables()

# Print every visible table: the platform's public tables first,
# then the tables owned by the current API key.
for table_group in (tables.public, tables.own):
    for table_entry in table_group:
        print(table_entry)

### 1.4 Show the schema of a table

In [None]:
schema = citybrain_platform.Computing.get_table_schema(name="cesm")

# One summary line per column, followed by the table's CREATE TABLE statement.
for column in schema.columns:
    col_name, col_type, col_comment = column.name, column.type, column.comment
    print(f"name: {col_name}, type: {col_type}, comment: {col_comment}")

print(schema.create_table_sql)

## 2. Create your new table and insert data

> Define the table columns before creating the table.
> 
> You can specify one or more columns as partition key columns to create a partitioned table. A partitioned table is organized like directories in a distributed file system: each partition corresponds to a directory, and the data in a partition corresponds to the data files in that directory.

### 2.1 Define table columns

In [None]:
from citybrain_platform.computing.data_types import Column, ColumnType

# Regular (non-partition) columns of the result table.
# The INSERT in section 3.1 fills these columns by position
# (`SELECT * EXCEPT(row_num) FROM result`), so keep this order in sync with
# the column order of the `result` CTE there.
columns = [
    Column(name='lat', type=ColumnType.DOUBLE, comment='latitude, unit: degree'),
    Column(name='lon', type=ColumnType.DOUBLE, comment='longitude, unit: degree'),
    Column(name='time_list', type=ColumnType.TIMESTAMP, comment='date'),
    Column(name='climate_index', type=ColumnType.DOUBLE, comment='index of climate (e.g., temperature, precipitation), unit: (e.g., C, M/S)'),
    Column(name='quant', type=ColumnType.DOUBLE, comment='quantile value of climate index at a specific percentile, unit: (e.g., C, M/S)'),
    Column(name='frequence', type=ColumnType.INT, comment='number of climate extreme events in the selected period, unit: counts'),
    Column(name='mean_climate_index', type=ColumnType.DOUBLE, comment='mean climate index over the climate extreme event days in the selected period, unit: (e.g., C, M/S)'),
    Column(name='group_id', type=ColumnType.DOUBLE, comment='identifier of the climate extreme event (run of consecutive extreme days) this record belongs to'),
    Column(name='single_event_dur', type=ColumnType.INT, comment='duration of the single event, unit: days'),
    Column(name='total_duration', type=ColumnType.INT, comment='total duration of climate extreme events in the selected period, unit: days'),
]

# Partition key columns. Each INSERT in section 3.1 writes one static partition
# identified by (percentile, event_kind, member_id).
partition_columns = [
    Column(name='percentile', type=ColumnType.INT, comment='the percentile for calculating the quantile'),
    Column(name='event_kind', type=ColumnType.STRING, comment='kind of climate extreme events'),
    Column(name='member_id', type=ColumnType.INT, comment='member id of cesm-lens1'),
]

# print columns detail in the output cell
columns, partition_columns

### 2.2 Create table

In [None]:
# Name and description of the partitioned result table; `table_name` is reused
# by the cells below.
table_name = "cesm_ass2_50year_0730"
comment = 'This table stored the extremes of heatwaves and prep of cesm-lens1 06-100, try_2023_07_30,first is prect, second is trefhtmx'

# Create the table from the column and partition definitions of section 2.1.
citybrain_platform.Computing.create_table(
    name=table_name,
    columns=columns,
    partition_columns=partition_columns,
    description=comment,
)

### 2.3 Show schema of the newly created table

In [None]:
# Read back the schema of the table just created and print its DDL
# (the individual column objects are available as `schema.columns`).
schema = citybrain_platform.Computing.get_table_schema(name=table_name)
print(schema.create_table_sql)

### 2.4 Truncate table & drop table

> **Truncate table** clears all records in the table; the table itself remains after truncation. If the table has partition keys, you must specify the partition to truncate.
> 
> **Drop table** deletes the table and all of its records.

In [None]:
# Destructive maintenance operations. They are left commented out so that
# running the whole notebook never deletes data; uncomment the one you need.

# drop table: deletes the table `table_name` and all of its records
# ok = citybrain_platform.Computing.drop_table(name=table_name)

# truncate table with partition key: a partitioned table needs the partition
# to clear. Replace {"k1": "v1"} with this table's partition values (partition
# columns: percentile, event_kind, member_id).
# NOTE(review): confirm the expected value format of `partition_key`.
# ok = citybrain_platform.Computing.truncate_table(name=table_name, partition_key={"k1": "v1"})

# print(ok)

## 3. Query a table and insert the results into another table

### 3.1 Query case
> Query data from table `cesm_trefhtmx` and insert the query results into the table created above
>
> Create a **job** to do this time-consuming work.

In [None]:
# Detect climate extreme events for one CESM-LENS1 ensemble member and write
# them into one static partition of the table created in section 2.2.
#
# Pipeline:
#   T0           daily values of `source_column` for `member_id`
#                (no time-window filter is applied)
#   T1           per-grid-cell (lon, lat) quantile at `percentile`
#   T2/T3        keep only days where the value is >= that quantile
#   T3_SHIFT     diff = 0 when the day directly follows the previous extreme day
#                of the same grid cell, else 1 (a new event starts)
#   T_fina_0     group_id = running sum of diff, i.e. one id per event;
#                duration = number of days in that event
#   T_final      keep events lasting at least `min_event_days` days
#   *_col        per-grid-cell event count, total event days, mean value
#   result       event days joined with the per-grid-cell metrics
#
# Run parameters. Each one is used both in the query and in the target
# partition spec, so editing them here keeps the filter and the partition
# consistent.
source_table = "cesm_trefhtmx"   # input table
source_column = "trefhtmx"       # climate variable column of `source_table`
member_id = 105                  # ensemble member to process (filter + partition)
percentile = 99                  # quantile threshold in percent (quantile + partition)
event_kind = "trefhtmx"          # value of the `event_kind` partition
min_event_days = 3               # minimum event length, in days
# The rows are inserted into `table_name` (defined in section 2.2).
#
# `duration` is computed with COUNT(*) OVER (PARTITION BY group_id) and no
# ORDER BY. With an ORDER BY the result depends on the engine's default window
# frame and can become a running count, which would make `duration` vary inside
# an event and let the `>= min_event_days` filter drop an event's first days.
sql = f"""WITH T0 AS (
    SELECT
        {source_column} AS climate_index,
        lat,
        lon,
        time AS time_list
    FROM {source_table}
    WHERE member_id = {member_id}
),
T1 AS (
    SELECT
        lon,
        lat,
        time_list,
        climate_index,
        percentile_approx(climate_index, {percentile / 100}) OVER (PARTITION BY lon, lat) AS quant
    FROM T0
),
T2 AS (
    SELECT
        lon,
        lat,
        time_list,
        climate_index,
        quant,
        CASE WHEN quant <= climate_index THEN 1 ELSE 0 END AS HW
    FROM T1
),
T3 AS (
    SELECT
        lon,
        lat,
        time_list,
        climate_index,
        quant,
        HW,
        ROW_NUMBER() OVER (ORDER BY lat, lon, time_list) AS row_num
    FROM T2
    WHERE HW = 1
),
T3_SHIFT AS (
    SELECT
        lon,
        lat,
        time_list,
        climate_index,
        quant,
        HW,
        CASE
            WHEN DATEDIFF(time_list, LAG(time_list, 1) OVER (ORDER BY row_num), 'dd') = 1
                AND lat = LAG(lat, 1) OVER (ORDER BY row_num)
                AND lon = LAG(lon, 1) OVER (ORDER BY row_num)
            THEN 0
            ELSE 1
        END AS diff,
        row_num
    FROM T3
),
T_fina_0 AS (
    SELECT
        lon,
        lat,
        time_list,
        climate_index,
        quant,
        HW,
        diff,
        COUNT(*) OVER (PARTITION BY group_id) AS duration,
        group_id,
        row_num
    FROM (
        SELECT
            lon,
            lat,
            time_list,
            climate_index,
            quant,
            HW,
            diff,
            SUM(diff) OVER (ORDER BY row_num) AS group_id,
            row_num
        FROM T3_SHIFT
    ) T_temp
),
T_final AS (
    SELECT
        lon,
        lat,
        time_list,
        climate_index,
        quant,
        HW,
        diff,
        duration,
        group_id,
        row_num
    FROM T_fina_0
    WHERE duration >= {min_event_days}
),
freq_col AS (
    SELECT
        lat,
        lon,
        COUNT(DISTINCT group_id) AS feq
    FROM T_final
    GROUP BY lat, lon
),
dura_col AS (
    SELECT
        lat,
        lon,
        SUM(avg_d) AS total_dur
    FROM (
        SELECT
            lat,
            lon,
            AVG(duration) AS avg_d
        FROM T_final
        GROUP BY lat, lon, group_id
    ) T_1
    GROUP BY lat, lon
),
intens_col AS (
    SELECT
        lat,
        lon,
        AVG(climate_index) AS mean_tmp
    FROM T_final
    GROUP BY lat, lon
),
Metrics_col AS (
    SELECT
        fc.feq AS frequence,
        ic.mean_tmp AS mean_t,
        dc.total_dur AS total_duration,
        fc.lat AS lat,
        fc.lon AS lon
    FROM freq_col fc
    JOIN dura_col dc ON fc.lat = dc.lat AND fc.lon = dc.lon
    JOIN intens_col ic ON fc.lat = ic.lat AND fc.lon = ic.lon
),
result AS (
    SELECT
        ft.lat,
        ft.lon,
        ft.time_list,
        ft.climate_index AS climate_index,
        ft.quant AS quant,
        mc.frequence,
        mc.mean_t AS mean_t,
        ft.group_id,
        ft.duration AS single_event_dur,
        mc.total_duration AS total_duration,
        ft.row_num
    FROM T_final ft
    JOIN Metrics_col mc ON ft.lat = mc.lat AND ft.lon = mc.lon
)
INSERT INTO TABLE {table_name} PARTITION (percentile = {percentile}, event_kind = '{event_kind}', member_id = {member_id})
SELECT * EXCEPT(row_num)
FROM result
ORDER BY lat, lon, time_list;
"""

# Submit the query as a platform job; poll its status with the id (section 3.2).
job_id = citybrain_platform.Computing.create_job(sql=sql)

print(job_id)

### 3.2 Get job status

> You can query the job status to see whether the job is still running or has terminated.

In [None]:
from citybrain_platform import JobStatus

# Query the job once and report its state.
status = citybrain_platform.Computing.get_job_status(job_id=job_id)
state = status.status
print(state)

if state == JobStatus.RUNNING:
    # Still running: list the progress entries of its sub-tasks.
    for task_progress in status.progress:
        print(task_progress)
elif state == JobStatus.TERMINATED:
    # Finished: print the job summary.
    print(status.summary)


### 3.3 Download job results

> Once a job has terminated, you can download its results directly if the job is a **select** query.
>
> For an **insert ... select** query, create a new **select** query job and download the results of that job instead.

In [None]:
import time
from citybrain_platform import JobStatus

# results save to local file
result_local_file = f"{table_name}.csv"

# The loop below only stops when the job reaches TERMINATED. Without a deadline,
# a job that never reaches that state would block the notebook forever.
max_wait_seconds = 30 * 60
poll_interval_seconds = 1

# create job to get 1000 records of the result table
sqlresult = f"select * from {table_name} limit 1000"
resultjob_id = citybrain_platform.Computing.create_job(sql=sqlresult)

# A monotonic clock is not affected by system clock changes while waiting.
deadline = time.monotonic() + max_wait_seconds
while True:
    status = citybrain_platform.Computing.get_job_status(job_id=resultjob_id)
    print(status.status)
    if status.status == JobStatus.TERMINATED:
        break
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"job {resultjob_id} did not terminate within {max_wait_seconds} s "
            f"(last status: {status.status})"
        )
    time.sleep(poll_interval_seconds)

print("downloading result")
citybrain_platform.Computing.get_job_results(job_id=resultjob_id, filepath=result_local_file)
