# Querying Streaming Twitter Data Results in S3 with Athena

This notebook assumes you already have a running streaming application that writes raw and analyzed Twitter data to S3.
The notebook shows how to use the serverless interactive query service [Amazon Athena](https://aws.amazon.com/athena/) to run standard SQL queries against the data directly in S3, without moving or transforming it into a database.

While we could do the following steps in the Athena console or with CLI commands, here we show how to use the [PyAthena](https://pypi.org/project/PyAthena/) library to connect to Amazon Athena and run SQL queries directly from your Jupyter notebook. We will also load the results into pandas DataFrames, a common representation for further analysis of the data.

### Install the PyAthena library

In [None]:
!pip install "PyAthena[Pandas]"

## Import libraries we will be using, and set some configurations

*Note: It is important to update the **bucket** variable with the name of the bucket that was created in the previous demo steps using CloudFormation*

In [None]:
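from pyathena import connect
import pandas as pd
# In current PyAthena releases PandasCursor lives in pyathena.pandas.cursor
# (older releases used pyathena.pandas_cursor); it needs the extra: pip install "PyAthena[Pandas]"
from pyathena.pandas.cursor import PandasCursor

# Replace with the name of the bucket created by the CloudFormation stack
bucket = '[YOUR BUCKET HERE]'

# Athena writes query results to the staging directory.
# Pass region_name=... to connect() if no default region is set in your AWS configuration.
conn = connect(s3_staging_dir='s3://{}/athena-staging/'.format(bucket))
cursor = conn.cursor()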
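Because the connection silently targets whatever `bucket` holds, it can help to fail fast when the placeholder was never replaced. The helper below, `athena_staging_dir`, is a hypothetical name of our own (not part of PyAthena); it builds the same staging URI as the cell above and rejects the placeholder or a pasted `s3://` prefix. It is a minimal sketch that catches a few obvious mistakes, not a full S3 bucket-name validator.

```python
PLACEHOLDER = "[YOUR BUCKET HERE]"


def athena_staging_dir(bucket: str, prefix: str = "athena-staging") -> str:
    """Return the s3:// URI used as PyAthena's s3_staging_dir (hypothetical helper)."""
    name = bucket.strip()
    if not name or name == PLACEHOLDER:
        raise ValueError("Set `bucket` to the bucket created by the CloudFormation stack")
    if name.startswith("s3://"):
        raise ValueError("Pass the bare bucket name, without the s3:// scheme")
    # Athena expects a folder-style location, so always end with a slash
    return "s3://{}/{}/".format(name, prefix.strip("/"))


# Usage, mirroring the cell above:
# conn = connect(s3_staging_dir=athena_staging_dir(bucket))
print(athena_staging_dir("my-demo-bucket"))  # s3://my-demo-bucket/athena-staging/
```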

## Create the Athena tables

We are going to create Amazon Athena tables for our Twitter data using the PyAthena library. In a data lake architecture, this is also a good place to use AWS Glue crawlers: they automatically discover the format and data types of datasets that live in Amazon S3 (as well as in relational databases and data warehouses) and register them as tables. More details can be found in the AWS Glue documentation on crawlers.
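To make "discovering data types" concrete, the sketch below maps one decoded JSON record to the nested `STRUCT`/`ARRAY` column types used in the DDL that follows. It is a simplified illustration, not AWS Glue's actual classifier logic: it assumes arrays are homogeneous (it inspects only the first element) and falls back to `STRING` for nulls and empty arrays, whereas a real crawler samples many records.

```python
import json


def infer_hive_type(value) -> str:
    """Map a decoded JSON value to an Athena/Hive column type (simplified sketch)."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, list):
        # Assumes a homogeneous array; an empty array gives no evidence, so use STRING
        inner = infer_hive_type(value[0]) if value else "STRING"
        return "ARRAY<{}>".format(inner)
    if isinstance(value, dict):
        fields = ", ".join("{}: {}".format(k, infer_hive_type(v)) for k, v in value.items())
        return "STRUCT<{}>".format(fields)
    return "STRING"  # None or anything unexpected


record = json.loads(
    '{"id": 1, "retweeted": false, '
    '"coordinates": {"type": "Point", "coordinates": [-122.3, 47.6]}}'
)
for column, value in record.items():
    print(column, infer_hive_type(value))
```

The last line printed, `coordinates STRUCT<type: STRING, coordinates: ARRAY<DOUBLE>>`, has the same shape as the `coordinates` column declared in the tweets table.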

We will run the following queries to create the Athena database and tables:

## Create the database

In [None]:
db_create_query = "CREATE DATABASE socialanalytics;"

cursor.execute(db_create_query)
print('Created the socialanalytics database')

## Create an external table for the raw tweets

In [None]:
tweets_table = "CREATE EXTERNAL TABLE socialanalytics.tweets ( \
    coordinates STRUCT< \
        type: STRING, \
        coordinates: ARRAY< \
            DOUBLE \
        > \
    >, \
    retweeted BOOLEAN, \
    source STRING, \
    entities STRUCT< \
        hashtags: ARRAY< \
            STRUCT< \
                text: STRING, \
                indices: ARRAY< \
                    BIGINT \
                > \
            > \
        >, \
        urls: ARRAY< \
            STRUCT< \
                url: STRING, \
                expanded_url: STRING, \
                display_url: STRING, \
                indices: ARRAY< \
                    BIGINT \
                > \
            > \
        > \
    >, \
    reply_count BIGINT, \
    favorite_count BIGINT, \
    geo STRUCT< \
        type: STRING, \
        coordinates: ARRAY< \
            DOUBLE \
        > \
    >, \
    id_str STRING, \
    timestamp_ms BIGINT, \
    truncated BOOLEAN, \
    text STRING, \
    retweet_count BIGINT, \
    id BIGINT, \
    possibly_sensitive BOOLEAN, \
    filter_level STRING, \
    created_at STRING, \
    place STRUCT< \
        id: STRING, \
        url: STRING, \
        place_type: STRING, \
        name: STRING, \
        full_name: STRING, \
        country_code: STRING, \
        country: STRING, \
        bounding_box: STRUCT< \
            type: STRING, \
            coordinates: ARRAY< \
                ARRAY< \
                    ARRAY< \
                        FLOAT \
                    > \
                > \
            > \
        > \
    >, \
    favorited BOOLEAN, \
    lang STRING, \
    in_reply_to_screen_name STRING, \
    is_quote_status BOOLEAN, \
    in_reply_to_user_id_str STRING, \
    user STRUCT< \
        id: BIGINT, \
        id_str: STRING, \
        name: STRING, \
        screen_name: STRING, \
        location: STRING, \
        url: STRING, \
        description: STRING, \
        translator_type: STRING, \
        protected: BOOLEAN, \
        verified: BOOLEAN, \
        followers_count: BIGINT, \
        friends_count: BIGINT, \
        listed_count: BIGINT, \
        favourites_count: BIGINT, \
        statuses_count: BIGINT, \
        created_at: STRING, \
        utc_offset: BIGINT, \
        time_zone: STRING, \
        geo_enabled: BOOLEAN, \
        lang: STRING, \
        contributors_enabled: BOOLEAN, \
        is_translator: BOOLEAN, \
        profile_background_color: STRING, \
        profile_background_image_url: STRING, \
        profile_background_image_url_https: STRING, \
        profile_background_tile: BOOLEAN, \
        profile_link_color: STRING, \
        profile_sidebar_border_color: STRING, \
        profile_sidebar_fill_color: STRING, \
        profile_text_color: STRING, \
        profile_use_background_image: BOOLEAN, \
        profile_image_url: STRING, \
        profile_image_url_https: STRING, \
        profile_banner_url: STRING, \
        default_profile: BOOLEAN, \
        default_profile_image: BOOLEAN \
    >, \
    quote_count BIGINT \
) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \
LOCATION 's3://{}/raw/';".format(bucket)

cursor.execute(tweets_table)
print('Created the socialanalytics.tweets table')

## Query the tweets table

In [None]:
query = "SELECT * from socialanalytics.tweets limit 500"

df = pd.read_sql(query, conn)
df.head()
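Many columns in the tweets table are nested `STRUCT` and `ARRAY` types. In Athena you can reach struct fields with dot notation and measure an array with `cardinality()`. The query below is a sketch; it assumes `place` is populated only for geotagged tweets, which is why it filters out nulls. It also uses adjacent string literals instead of backslash continuations, which keeps the SQL free of stray indentation.

```python
# Dot notation reads fields inside a STRUCT; cardinality() returns an ARRAY's length.
nested_query = (
    "SELECT id, place.country, place.full_name, "
    "cardinality(entities.hashtags) AS hashtag_count "
    "FROM socialanalytics.tweets "
    "WHERE place IS NOT NULL "
    "LIMIT 20"
)
print(nested_query)

# In the notebook, run it like the other queries:
# df = pd.read_sql(nested_query, conn)
```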

## Entities and Sentiment Tables

Now we will create tables for the entities and sentiment data that is also stored in S3. 

In [None]:
entities_table = "CREATE EXTERNAL TABLE socialanalytics.tweet_entities ( \
    tweetid BIGINT, \
    entity STRING, \
    type STRING, \
    score DOUBLE \
) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \
LOCATION 's3://{}/entities/';".format(bucket)

conn.cursor().execute(entities_table)
print('Created the Entities table')

In [None]:
sentiment_table = "CREATE EXTERNAL TABLE socialanalytics.tweet_sentiments ( \
    tweetid BIGINT, \
    text STRING, \
    originalText STRING, \
    sentiment STRING, \
    sentimentPosScore DOUBLE, \
    sentimentNegScore DOUBLE, \
    sentimentNeuScore DOUBLE, \
    sentimentMixedScore DOUBLE \
) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' \
LOCATION 's3://{}/sentiment/';".format(bucket)

conn.cursor().execute(sentiment_table)
print('Created the Sentiment table')
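The two cells above repeat the same DDL shape: a list of columns, the OpenX JSON SerDe, and an S3 location. The hypothetical helper `json_table_ddl` below (our own name, not a PyAthena function) generates that statement from `(name, type)` pairs. It is a sketch that assumes the column names need no quoting; names that are reserved words in Athena DDL would need backticks.

```python
def json_table_ddl(database, table, columns, location):
    """Build a CREATE EXTERNAL TABLE statement for JSON data read with the OpenX JsonSerDe.

    columns: list of (name, type) pairs, e.g. [("tweetid", "BIGINT")]
    location: S3 prefix; a trailing slash is appended if missing.
    """
    if not location.endswith("/"):
        location += "/"
    cols = ",\n    ".join("{} {}".format(name, ctype) for name, ctype in columns)
    return (
        "CREATE EXTERNAL TABLE {}.{} (\n    {}\n) "
        "ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' "
        "LOCATION '{}'"
    ).format(database, table, cols, location)


entity_columns = [("tweetid", "BIGINT"), ("entity", "STRING"), ("type", "STRING"), ("score", "DOUBLE")]
print(json_table_ddl("socialanalytics", "tweet_entities", entity_columns, "s3://my-demo-bucket/entities"))

# In the notebook: cursor.execute(json_table_ddl("socialanalytics", "tweet_entities",
#                                                entity_columns, "s3://{}/entities".format(bucket)))
```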

## Querying the data

Now that we have created our tables, we can run SQL queries against them to begin exploring the data.

### Run the following query to view a sample of the sentiments table:

In [None]:
query = "SELECT * from socialanalytics.tweet_sentiments limit 20;"

df = pd.read_sql(query, conn)
df.head()

### Pull the top entity types:


In [None]:
query = "select type, count(*) cnt from socialanalytics.tweet_entities group by type order by cnt desc"

df = pd.read_sql(query, conn)
df.head()
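To see what this `GROUP BY` query computes without touching AWS, here is a local sketch that uses Python's built-in `sqlite3` as a stand-in for Athena. Attaching an in-memory database named `socialanalytics` lets the notebook's query text run unchanged. The rows are hypothetical sample data for illustration only, not results from the pipeline.

```python
import sqlite3

# In-memory stand-in for Athena; ATTACH creates a schema named socialanalytics
# so that "socialanalytics.tweet_entities" resolves exactly as in the notebook.
local = sqlite3.connect(":memory:")
local.execute("ATTACH DATABASE ':memory:' AS socialanalytics")
local.execute(
    "CREATE TABLE socialanalytics.tweet_entities "
    "(tweetid INTEGER, entity TEXT, type TEXT, score REAL)"
)

# Hypothetical sample rows, for illustration only
rows = [
    (1, "Kindle", "COMMERCIAL_ITEM", 0.98),
    (1, "Seattle", "LOCATION", 0.95),
    (2, "AWS", "ORGANIZATION", 0.99),
    (3, "Kindle", "COMMERCIAL_ITEM", 0.91),
    (3, "Amazon", "ORGANIZATION", 0.97),
    (4, "Echo", "COMMERCIAL_ITEM", 0.88),
]
local.executemany("INSERT INTO socialanalytics.tweet_entities VALUES (?, ?, ?, ?)", rows)

# Same query text as the cell above: one output row per entity type, most frequent first
type_query = "select type, count(*) cnt from socialanalytics.tweet_entities group by type order by cnt desc"
for entity_type, cnt in local.execute(type_query):
    print(entity_type, cnt)
```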

### Let’s now pull 20 positive tweets and see their scores from sentiment analysis:


In [None]:
query = "select * from socialanalytics.tweet_sentiments where sentiment = 'POSITIVE' limit 20;"

df = pd.read_sql(query, conn)
df.head()
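The query above returns 20 arbitrary positive tweets. To see the most confidently positive ones instead, you can filter and sort on the positive score. Athena stores column names in lower case, so `sentimentPosScore` from the DDL is referenced as `sentimentposscore`. The `0.9` threshold is a hypothetical value to tune for your data.

```python
min_score = 0.9  # hypothetical threshold; adjust for your data

# Highest-confidence positive tweets first
positive_query = (
    "select tweetid, text, sentimentposscore "
    "from socialanalytics.tweet_sentiments "
    "where sentiment = 'POSITIVE' and sentimentposscore >= {:.2f} "
    "order by sentimentposscore desc limit 20"
).format(float(min_score))
print(positive_query)

# df = pd.read_sql(positive_query, conn)
```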

### Look at the distribution of languages across tweets

In [None]:
query = "select lang, count(*) cnt \
        from socialanalytics.tweets \
        group by lang \
        order by cnt desc"

df = pd.read_sql(query, conn)
df.head()
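Raw counts are easier to compare as shares of the total. The sketch below adds a percentage to `(lang, cnt)` rows shaped like this query's result; the sample counts are hypothetical. Note that `df.head()` displays only the top five languages, but the share should be computed over all rows.

```python
def with_share(rows):
    """Return (lang, cnt, pct) tuples, where pct is each row's share of all tweets."""
    total = sum(cnt for _, cnt in rows)
    if total == 0:
        return [(lang, cnt, 0.0) for lang, cnt in rows]
    return [(lang, cnt, round(100.0 * cnt / total, 1)) for lang, cnt in rows]


# Hypothetical counts, shaped like the (lang, cnt) columns of the query above
sample = [("en", 700), ("es", 200), ("de", 100)]
for lang, cnt, pct in with_share(sample):
    print(lang, cnt, pct)
```

With the DataFrame from the query, the pandas equivalent is `df["pct"] = 100 * df["cnt"] / df["cnt"].sum()`.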

### Now we can pull the top 20 commercial items:

In [None]:
query = "select entity, type, count(*) cnt \
        from socialanalytics.tweet_entities \
        where type = 'COMMERCIAL_ITEM' \
        group by entity, type \
        order by cnt desc limit 20;"

df = pd.read_sql(query, conn)
df
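To pull the top entities of another type, it is tempting to paste a value into the SQL string with `format()`. The hypothetical helper below only accepts the entity types that Amazon Comprehend's entity detection returns, so arbitrary text never reaches the query. It assumes the pipeline stores Comprehend's type labels unchanged, as the `'COMMERCIAL_ITEM'` filter above suggests. PyAthena also supports pyformat-style parameters (`%(name)s`) in `cursor.execute`, which is another option for values that cannot be allow-listed.

```python
# Entity types returned by Amazon Comprehend entity detection
COMPREHEND_ENTITY_TYPES = {
    "COMMERCIAL_ITEM", "DATE", "EVENT", "LOCATION", "ORGANIZATION",
    "OTHER", "PERSON", "QUANTITY", "TITLE",
}


def top_entities_query(entity_type: str, limit: int = 20) -> str:
    """Build the 'top N entities of one type' query, refusing unknown types (hypothetical helper)."""
    entity_type = entity_type.upper()
    if entity_type not in COMPREHEND_ENTITY_TYPES:
        raise ValueError("Unknown entity type: {!r}".format(entity_type))
    return (
        "select entity, type, count(*) cnt "
        "from socialanalytics.tweet_entities "
        "where type = '{}' "
        "group by entity, type "
        "order by cnt desc limit {:d}"
    ).format(entity_type, int(limit))


print(top_entities_query("person", limit=10))
# df = pd.read_sql(top_entities_query("ORGANIZATION"), conn)
```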

### You can also query the translation details. Even if you don’t know the German word for shoe, you can find German tweets about shoes with the following query:


In [None]:
query = "select ts.text, ts.originaltext \
        from socialanalytics.tweet_sentiments ts \
        join socialanalytics.tweets t on (ts.tweetid = t.id) \
        where t.lang = 'de' and ts.text like '%shoe%'"

df = pd.read_sql(query, conn)
df.head()
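This query works because the pipeline keeps both texts: `text` holds the English translation and `originaltext` the tweet as written. The local `sqlite3` sketch below reproduces the join and the `LIKE` filter on hypothetical rows, so you can see which tweets qualify. One difference to keep in mind: SQLite's `LIKE` ignores ASCII case, while Athena's `LIKE` is case-sensitive, so in Athena wrap the column in `lower()` if you want case-insensitive matching.

```python
import sqlite3

local = sqlite3.connect(":memory:")
local.execute("ATTACH DATABASE ':memory:' AS socialanalytics")
local.execute("CREATE TABLE socialanalytics.tweets (id INTEGER, lang TEXT)")
local.execute(
    "CREATE TABLE socialanalytics.tweet_sentiments "
    "(tweetid INTEGER, text TEXT, originaltext TEXT)"
)

# Hypothetical rows: English translation in `text`, source-language text in `originaltext`
local.executemany(
    "INSERT INTO socialanalytics.tweets VALUES (?, ?)",
    [(1, "de"), (2, "de"), (3, "en")],
)
local.executemany(
    "INSERT INTO socialanalytics.tweet_sentiments VALUES (?, ?, ?)",
    [
        (1, "My new shoes are great", "Meine neuen Schuhe sind toll"),
        (2, "Good morning", "Guten Morgen"),
        (3, "I need new shoes", "I need new shoes"),
    ],
)

# Join each sentiment row to its tweet, keep German tweets whose translation mentions shoes
shoe_query = (
    "select ts.text, ts.originaltext "
    "from socialanalytics.tweet_sentiments ts "
    "join socialanalytics.tweets t on (ts.tweetid = t.id) "
    "where t.lang = 'de' and ts.text like '%shoe%'"
)
print(local.execute(shoe_query).fetchall())
```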

### The results should include tweets about shoes, matched on their translated text. Let’s also look at the non-English tweets in which NLP extracted the entity AWS:


In [None]:
query = "select lang, ts.text, ts.originaltext \
            from socialanalytics.tweet_sentiments ts \
            join socialanalytics.tweets t on (ts.tweetid = t.id) \
            where lang != 'en' and ts.tweetid in \
                (select distinct tweetid from socialanalytics.tweet_entities where entity = 'AWS')"

df = pd.read_sql(query, conn)
df.head()
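The query above filters with `IN (select distinct tweetid ...)` rather than joining the entities table directly. A plausible reason is that a tweet can mention the same entity more than once, and a plain join would then return that tweet once per mention. The local `sqlite3` sketch below compares the two forms on hypothetical rows in which tweet 1 has two `AWS` entity rows.

```python
import sqlite3

local = sqlite3.connect(":memory:")
local.execute("ATTACH DATABASE ':memory:' AS socialanalytics")
local.execute("CREATE TABLE socialanalytics.tweets (id INTEGER, lang TEXT)")
local.execute("CREATE TABLE socialanalytics.tweet_sentiments (tweetid INTEGER, text TEXT, originaltext TEXT)")
local.execute("CREATE TABLE socialanalytics.tweet_entities (tweetid INTEGER, entity TEXT)")

# Hypothetical rows: tweet 1 (Spanish) mentions AWS twice; tweet 3 is English
local.executemany("INSERT INTO socialanalytics.tweets VALUES (?, ?)", [(1, "es"), (2, "fr"), (3, "en")])
local.executemany(
    "INSERT INTO socialanalytics.tweet_sentiments VALUES (?, ?, ?)",
    [
        (1, "AWS and more AWS", "AWS y más AWS"),
        (2, "Hello", "Bonjour"),
        (3, "I like AWS", "I like AWS"),
    ],
)
local.executemany(
    "INSERT INTO socialanalytics.tweet_entities VALUES (?, ?)",
    [(1, "AWS"), (1, "AWS"), (3, "AWS")],
)

# Semi-join, as in the notebook: each matching tweet appears once
semi_join = (
    "select t.lang, ts.text, ts.originaltext "
    "from socialanalytics.tweet_sentiments ts "
    "join socialanalytics.tweets t on (ts.tweetid = t.id) "
    "where t.lang != 'en' and ts.tweetid in "
    "(select distinct tweetid from socialanalytics.tweet_entities where entity = 'AWS')"
)
# Plain join: one output row per matching entity row
plain_join = (
    "select t.lang, ts.text, ts.originaltext "
    "from socialanalytics.tweet_sentiments ts "
    "join socialanalytics.tweets t on (ts.tweetid = t.id) "
    "join socialanalytics.tweet_entities e on (e.tweetid = ts.tweetid) "
    "where t.lang != 'en' and e.entity = 'AWS'"
)
print(len(local.execute(semi_join).fetchall()))   # 1
print(len(local.execute(plain_join).fetchall()))  # 2
```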

## Cleanup

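When you are done with the demo, uncomment and run the following cell to drop the database and all of the tables we created. Because these are external tables, dropping them removes only the table definitions; the tweet data in S3 is not deleted.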

In [None]:
#db_delete_query = "DROP DATABASE socialanalytics CASCADE;"
#conn.cursor().execute(db_delete_query)