## AWS Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Athena is easy to use. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL. Most results are delivered within seconds.

Udacity lessons:
> https://classroom.udacity.com/nanodegrees/nd027-mena-connect/parts/0b05eac1-17a3-4f7e-a43d-9b722fa0f745/modules/96060264-d86d-4b69-9259-0726cc2a336f/lessons/f14bb167-fee8-4a4b-94d3-9ca7fcbabe77/concepts/104d698b-d7bd-4335-b9a4-75c69fd5ed70

> https://classroom.udacity.com/nanodegrees/nd027-mena-connect/parts/0b05eac1-17a3-4f7e-a43d-9b722fa0f745/modules/96060264-d86d-4b69-9259-0726cc2a336f/lessons/f14bb167-fee8-4a4b-94d3-9ca7fcbabe77/concepts/7bf7229e-1506-4e27-affd-d215d9e3e949

**Benefits**
* **Serverless**: You can quickly query your data without having to set up and manage any servers or data warehouses.
* **Runs standard SQL**: Athena is ideal for quick, ad-hoc querying, but it can also handle complex analysis, including large joins, window functions, and arrays.
* **Only pay for data scanned**: You pay only for the queries that you run. You are charged $5 per terabyte scanned by your queries. Athena queries data directly in Amazon S3, so there are no additional storage charges beyond S3.
* **Fast, Really Fast**: Amazon Athena automatically executes queries in parallel, so most results come back within seconds.
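
To make the pricing point concrete, here is a minimal sketch of the arithmetic. The helper name `estimate_query_cost` is hypothetical, the $5 per terabyte rate is the one quoted above, and treating 1 TB as 2^40 bytes is an assumption; check the current Athena pricing page for your region.

```python
# Hypothetical helper; the $5 per TB rate is taken from the text above.
BYTES_PER_TB = 1024 ** 4  # assumption: 1 TB is counted as 2**40 bytes


def estimate_query_cost(bytes_scanned, price_per_tb=5.0):
    """Return the approximate cost in USD of a query scanning `bytes_scanned` bytes."""
    return bytes_scanned / BYTES_PER_TB * price_per_tb


# Example: a query that scans 200 GB of raw JSON
print(round(estimate_query_cost(200 * 1024 ** 3), 4))  # prints 0.9766
```

Because the charge depends on bytes scanned, writing the transformed tables in a compressed, columnar format such as Parquet usually makes later queries cheaper.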

## The exercise

The goal is to build a data pipeline for our third project (Data Warehouse): we run the transformations in Athena and store the results in S3, where they can be queried later with Athena.

> We will use Airflow to handle the scheduling, through the Udacity Airflow workspace.

> S3 --> transform the data with Athena --> store the transformed data back on S3 (partitioned)

> Fact table to be updated on a daily basis.

> Dimension tables to be updated on a weekly basis.

### Step1: Create an Athena DB.

Name it:  `athena_tut`

### Step2: Create the tables needed for our raw data.

**On your Athena console, run these two queries**
```
CREATE EXTERNAL TABLE IF NOT EXISTS athena_tut.raw_song_data (
  `artist_id` string,
  `artist_latitude` float,
  `artist_longitude` float,
  `artist_location` string,
  `artist_name` string,
  `song_id` string,
  `title` string,
  `duration` float,
  `year` int 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://udacity-dend/song_data/'
TBLPROPERTIES ('has_encrypted_data'='false');
```

```
CREATE EXTERNAL TABLE IF NOT EXISTS athena_tut.raw_log_data (
  `artist` string,
  `auth` string,
  `firstname` string,
  `gender` string,
  `iteminsession` string,
  `length` string,
  `level` string,
  `location` string,
  `method` string,
  `page` string,
  `registration` string,
  `sessionid` string,
  `song` string,
  `status` string,
  `ts` string,
  `useragent` string,
  `userid` string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://udacity-dend/log_data/'
TBLPROPERTIES ('has_encrypted_data'='false');
```

**Test your tables by running queries on them**
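
If you prefer to test the tables from a script instead of the console, the following is a minimal sketch using the boto3 Athena client calls `start_query_execution` and `get_query_execution`. The function name `run_athena_query` and the result path in the usage comment are assumptions made for illustration.

```python
import time


def run_athena_query(client, sql, database, output_location, poll_seconds=2):
    """Start an Athena query, poll until it finishes, and return (query_id, state).

    `client` is expected to be a boto3 Athena client, e.g. boto3.client("athena").
    """
    started = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return query_id, state
        time.sleep(poll_seconds)


# Usage (needs AWS credentials; the bucket path is a placeholder):
# import boto3
# athena = boto3.client("athena")
# run_athena_query(athena, "SELECT count(*) FROM raw_log_data",
#                  "athena_tut", "s3://<your-bucket>/query_results/")
```

A state of `SUCCEEDED` means the table definition and the SerDe could read the files; `FAILED` usually points at a schema or location problem.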

### Step3: Add your AWS Keys to your Airflow workspace.

> Do not forget to run `/opt/airflow/start.sh` in your workspace terminal in order to run airflow.

Refer to the lesson to create the keys if you haven't done so before.
> https://classroom.udacity.com/nanodegrees/nd027-mena-connect/parts/6a3d5ccd-d0be-4632-ab7b-d20e3bae92d1/modules/445568fc-578d-4d3e-ab9c-2d186728ab22/lessons/21d59f40-6033-40b5-81a2-4a3211d9f46e/concepts/b2d6adbf-3324-45ab-b74b-b5141aa18ef3

Access your Airflow UI, and then paste your keys in:

`Menu -> Admin -> Connections -> aws_default`

### Step4: Create an S3 bucket for your work.
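
The bucket can be created in the S3 console. As an alternative, here is a minimal sketch with boto3; the function name `create_work_bucket` and the bucket name in the usage comment are hypothetical.

```python
def create_work_bucket(s3_client, bucket_name, region):
    """Create an S3 bucket in `region` using a boto3 S3 client."""
    params = {"Bucket": bucket_name}
    # us-east-1 is the default location and must not be sent as a LocationConstraint.
    if region != "us-east-1":
        params["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return s3_client.create_bucket(**params)


# Usage (needs AWS credentials; the bucket name is a placeholder):
# import boto3
# s3 = boto3.client("s3", region_name="us-west-2")
# create_work_bucket(s3, "my-athena-results-<unique-suffix>", "us-west-2")
```

Note that S3 bucket names must be globally unique and may not contain underscores, so `my_athena_results` in the queries below is best read as a placeholder for your own bucket name.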

### Step5: Create your pipeline with Airflow (fact  songplay_table)

We will be creating our tables using Athena CTAS syntax:
>https://docs.aws.amazon.com/athena/latest/ug/ctas-examples.html#ctas-example-specify-columns

EXAMPLE: creating the fact table with CTAS
```
CREATE TABLE songplay_table
WITH (
      -- 'CSV' is not a CTAS format; TEXTFILE with a comma delimiter writes CSV-style files
      format = 'TEXTFILE',
      field_delimiter = ',',
      external_location = 's3://my_athena_results/songplay_table/'
      )
      AS
    -- DISTINCT applies to the whole row, not only to start_time
    SELECT DISTINCT
        -- ts is declared as string in raw_log_data, so cast it before dividing
        cast(from_unixtime(cast(e.ts AS double) / 1000) AS timestamp) AS start_time,
        e.userId        AS user_id,
        e.level         AS level,
        s.song_id       AS song_id,
        s.artist_id     AS artist_id,
        e.sessionId     AS session_id,
        e.location      AS location,
        e.userAgent     AS user_agent
    FROM raw_log_data e
    JOIN raw_song_data s ON (e.song = s.title AND e.artist = s.artist_name)
    WHERE e.page = 'NextSong'
    ;
```
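
The pipeline outline mentions partitioned output. Athena CTAS supports this through the `partitioned_by` table property, with the partition columns listed last in the `SELECT`. The sketch below only assembles such a statement as a string; the helper `build_ctas`, the table name `songplay_by_level`, and the choice of `level` as partition column are assumptions for illustration, not part of the original exercise.

```python
def build_ctas(table, select_sql, external_location,
               file_format="PARQUET", partitioned_by=None):
    """Assemble an Athena CTAS statement as a string (hypothetical helper)."""
    props = [
        f"format = '{file_format}'",
        f"external_location = '{external_location}'",
    ]
    if partitioned_by:
        columns = ", ".join(f"'{column}'" for column in partitioned_by)
        props.append(f"partitioned_by = ARRAY[{columns}]")
    with_clause = ",\n  ".join(props)
    return f"CREATE TABLE {table}\nWITH (\n  {with_clause}\n)\nAS\n{select_sql}"


# Partition columns must come last in the SELECT list.
sql = build_ctas(
    table="songplay_by_level",
    select_sql="SELECT userid, song, level FROM raw_log_data WHERE page = 'NextSong'",
    external_location="s3://my_athena_results/songplay_by_level/",
    partitioned_by=["level"],
)
print(sql)
```

The resulting string can be pasted into the Athena console or passed as the `query` of an Airflow operator.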

Use the AWSAthenaOperator to schedule the query using Airflow (on a daily basis): 

EXAMPLE of such a DAG: 
```
from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from datetime import datetime

with DAG(dag_id='simple_athena_query',
         schedule_interval='@daily',
         start_date=datetime(2019, 5, 21)) as dag:

    # Note: CTAS fails if songplay_table already exists, so drop the table
    # (or use a per-run table name) before each scheduled run.
    run_query = AWSAthenaOperator(
        task_id='run_query',
        query='''
        CREATE TABLE songplay_table
        WITH (
              format = 'PARQUET',
              external_location = 's3://my_athena_results/songplay_table/{{ ds }}/'
              )
              AS
        SELECT DISTINCT
            cast(from_unixtime(cast(e.ts AS double) / 1000) AS timestamp) AS start_time,
            e.userId        AS user_id,
            e.level         AS level,
            s.song_id       AS song_id,
            s.artist_id     AS artist_id,
            e.sessionId     AS session_id,
            e.location      AS location,
            e.userAgent     AS user_agent
        FROM raw_log_data e
        JOIN raw_song_data s ON (e.song = s.title AND e.artist = s.artist_name)
        WHERE e.page = 'NextSong'
        ;''',
        database='athena_tut',
        # Required by the operator: where Athena writes its query result files (placeholder path).
        output_location='s3://my_athena_results/query_results/'
    )
```

### Step6: Do the same for the other tables (use the query example below).

Time dimension table (Athena SQL):
```
SELECT DISTINCT
       start_time         AS start_time,
       hour(start_time)   AS hour,
       day(start_time)    AS day,
       week(start_time)   AS week,
       month(start_time)  AS month,
       year(start_time)   AS year
FROM songplay_table;
```
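
To see what these date functions return, here is a small Python sketch that derives the same fields from a raw `ts` value in epoch milliseconds. The sample timestamp is an illustrative value, the function name `time_dimensions` is hypothetical, and the sketch assumes timestamps are interpreted in UTC. Athena's `week()` returns the ISO week of the year, which corresponds to `isocalendar()` in Python.

```python
from datetime import datetime, timezone


def time_dimensions(ts_millis):
    """Derive the time-dimension fields from an epoch timestamp in milliseconds."""
    start_time = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
    return {
        "start_time": start_time,
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],  # ISO week number
        "month": start_time.month,
        "year": start_time.year,
    }


row = time_dimensions(1541106106796)  # illustrative ts value
print(row["hour"], row["day"], row["week"], row["month"], row["year"])
# prints: 21 1 44 11 2018
```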

### Step7: Create a separate DAG for dimension tables (weekly run).

```
from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from datetime import datetime

# DAG ids must be unique, so this one differs from the daily DAG
# (the name used here is only an example).
with DAG(dag_id='weekly_dimension_tables',
         schedule_interval='@weekly',
         start_date=datetime(2019, 5, 21)) as dag:

    run_query = AWSAthenaOperator(
        task_id='run_query',
        query='''
        CREATE TABLE dim_time_table
        WITH (
              format = 'PARQUET',
              external_location = 's3://my_athena_results//'
              )
              AS
        SELECT DISTINCT
               start_time         AS start_time,
               hour(start_time)   AS hour,
               day(start_time)    AS day,
               week(start_time)   AS week,
               month(start_time)  AS month,
               year(start_time)   AS year
        FROM songplay_table
        ;''',
        database='athena_tut',
        # Required by the operator: where Athena writes its query result files (placeholder path).
        output_location='s3://my_athena_results/query_results/'
    )
```