### Improving the `loader` module by doing more with Spark
#### Goals
1. Create full extracts at time of load
2. Improve performance of loader (if possible)
3. Use Spark to create user-defined extracts

#### Approach
1. Migrate from Spark RDD to Spark Dataframes to optimize load time.
2. Use the initial Spark DF to persist the full extract to disk.
2. Leverage Spark Streaming to create user-defined extracts with checkpoints (to make jobs more fault tolerant)

#### Preliminaries

We use `models.py` to create the index in Elasticsearch.

In [103]:
import sys
sys.path.append('./TweetSets')
from models import to_tweet
import models
from twarc import json2csv
import json
import math

This code is replicated from `loader.py`; it contains some logic that is employed in the RDD load to Elasticsearch (used here for testing/comparison).

In [4]:
def clean_tweet_dict(tweet_dict):
    new_tweet_dict = tweet_dict['_source']
    new_tweet_dict['created_at'] = tweet_dict['_source']['created_at'].isoformat()
    new_tweet_dict['tweet_id'] = tweet_dict['_id']
    return new_tweet_dict

The following `.jar` file is included in the `Spark-loader` Docker image.

In [6]:
path_to_jar = '/Users/dsmith/Documents/code/tweetsets-dev/Tweetsets-upgrade-sprint/elasticsearch-hadoop-7.9.2.jar'

The following environment variable is required for launching the job from the Jupyter notebook. I don't think we need it in production, since it's included in the shell command.

In [7]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars {path_to_jar} pyspark-shell'

This code is also Jupyter-specific; it points the Python kernel to the local Spark installation.

In [8]:
import findspark
findspark.init()

The `pyspark` imports contain the functions and classes we need to using the Dataframe API.

In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, explode
import pyspark.sql.functions as F

We connect to our local ES instance.

In [14]:
# Create the ES connection in order to create the index, etc.
# Running locally -- need to remove the options to sniff on start and retry on timeout
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=['localhost:9200'], timeout=90, maxsize=25)


<Elasticsearch([{'host': 'localhost', 'port': 9200}])>

In [265]:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import A, Search
es = Elasticsearch()

#### Data for testing

Our sample for testing contains about 40G worth of Tweets.

In [22]:
# Using a sample of about 40 GB
from pathlib import Path
path_to_datasets = Path('./datasets/brexit/sample')

Initialize the Spark sessionm and set the TZ to match our Twitter data.

In [10]:
spark = SparkSession.builder.appName('TweetSets').getOrCreate()

In [11]:
spark.conf.set('spark.sql.session.timeZone', 'UTC')

We create two separate indices in ES to compare the performance and result.

In [105]:
dataset_id_df = 'brexit-test'
dataset_id_rdd = 'brexit-test-rdd'
def to_tweet_dict(tweet_str):
    return clean_tweet_dict(to_tweet(json.loads(tweet_str), dataset_id, '', store_tweet=True).to_dict(include_meta=True))

#### Creating a schema

Unlike the Spark RDD, the Dataframe API requires a schema (Scala types). Spark can infer this directly from the `jsonl` documents, but it is more efficient to create the schema in advance and load it at time of processing. Here we create a schema based on a single file and save it for future use.

In [24]:
path_to_sample = path_to_datasets / '1e30ce0543954d99b169c12390599aea-20191016224305252-00000-mjsaekx2.json'
df = spark.read.json(str(path_to_sample))

In [25]:
schema = df.schema

In [None]:
with open('tweet_schema.json', 'w') as f:
    json.dump(schema.jsonValue(), f)

##### Schema Issues
1. We may need to a `full_text` manually to the schema, since it doesn't appear here. (Its presence may depend on how the tweets were harvested.)
2. The output for `created_at` is not quite the same as from the Python code. 
  - Python produces `2019-10-16T22:43:00+00:00`
  - Spark SQL produces `2019-10-16T22:43:00Z` where the zero is rendered as `Z`. Not sure if that is okay for Elasticsearch or will make a difference in indexing.

#### Replicating the TS `TweetDocument` Model

The following translates the Python code in the `models.to_tweet` function to a Spark SQL statement. Using Spark SQL syntax, some repetition is unavoidable. But it provides optimized performance relative to passing `to_tweet` as a UDF (user-defined function) to Spark for execution. 

Using syntax for [common table expressions](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-cte.html), we create a CTE with the `tweet_type` as a field, since these velues are used to derive the values in the `urls` and `text` fields. 

We include some complex columns in the initial SQL expression -- `quoted_status`, `retweeted_status` -- which can be dropped from the final DF.

In [379]:
sql_code = '''
     with cte as (
         select id_str as tweet_id,
            case when isnotnull(in_reply_to_status_id) then 'reply'
                when isnotnull(retweeted_status) then 'retweet'
                when isnotnull(quoted_status) then 'quote'
                else 'original'
            end as tweet_type,
            coalesce(extended_tweet.full_text, text) as text_str,
            quoted_status,
            retweeted_status,
            in_reply_to_user_id_str as in_reply_to_user_id,
            in_reply_to_screen_name,
            in_reply_to_status_id_str as in_reply_to_status_id,
            date_format(to_timestamp(created_at, 'EEE MMM dd HH:mm:ss ZZZZZ yyyy'),
                "yyyy-MM-dd'T'HH:mm:ssX") as created_at,
            user.id_str as user_id,
            user.screen_name as user_screen_name,
            user.followers_count as user_follower_count,
            user.verified as user_verified,
            user.lang as user_language,
            user.utc_offset as user_utc_offset,
            user.time_zone as user_time_zone,
            user.location as user_location,
            transform(coalesce(extended_tweet.entities.user_mentions,
                                entities.user_mentions), x -> x.id_str) as mention_user_ids,
            transform(coalesce(extended_tweet.entities.user_mentions,
                                entities.user_mentions), x -> x.screen_name) as mention_screen_names,
            transform(
                    coalesce(extended_tweet.entities.hashtags,
                        entities.hashtags), x -> lower(x.text)) as hashtags,
            favorite_count,
            retweet_count,
            lang as language,
            isnotnull(entities.media) or isnotnull(extended_tweet.entities.media) 
                    as has_media,
            transform(coalesce(extended_tweet.entities.urls,
                        entities.urls), x -> coalesce(x.expanded_url, x.url)) as tweet_urls,
            isnotnull(geo) or isnotnull(place) or isnotnull(coordinates) as has_geo,
            tweet
        from tweets)
        select 
            case when tweet_type = 'quote' then array(text_str, 
                                                    coalesce(quoted_status.extended_tweet.full_text,
                                                            quoted_status.text))
                when tweet_type = 'retweet' then array(coalesce(retweeted_status.extended_tweet.full_text,
                                                            retweeted_status.text))
                else array(text_str)
            end as text,
            case when tweet_type = 'quote' then quoted_status.user.id_str
                else retweeted_status.user.id_str
            end as retweeted_quoted_user_id,
            case when tweet_type = 'quote' then quoted_status.user.screen_name
                else retweeted_status.user.screen_name
            end as retweeted_quoted_screen_name,
            case when tweet_type = 'quote' then quoted_status.id_str
                else retweeted_status.id_str
            end as retweet_quoted_status_id,
            case when tweet_type = 'quote' then transform(filter(tweet_urls, x -> x not like 'https://twitter.com/%'),
                                                            x -> lower(replace(lower(x), 'https://', 'http://')))
                else transform(tweet_urls, x -> lower(replace(lower(x), 'https://', 'http://')))
            end as urls,
            *
        from cte

'''


Saving the SQL to disk for re-use later.

In [498]:
with open('tweet_sql.sql', 'w') as f:
    f.write(sql_code)

### Benchmarking ETL

The following benchmarks were done running Spark in local (not cluster) mode. 

Machine specs:
* MacBook Pro, running Mojave (10.14.6)
* 2.8 GHz Intel Core i7, 4 CPU cores
* 16 GB 2133 MHz LPDDR3 RAM
* 466 GB available of 1 TB storage

#### Dataframe vs RDD for local operations

Use the `count` method as a simple proxy for operations on large dataset.

Create an RDD, using the current methods in `loader.py`.

In [None]:
dataset_id = 'brexit-test-rdd'
tweets_str_rdd = spark.sparkContext.textFile('./datasets/brexit/sample/*.json')
tweets_rdd = tweets_str_rdd.map(to_tweet_dict).map(lambda row: (row['tweet_id'], row))

In [None]:
tweets_rdd.count()

Create a Dataframe, applying the transformations necessary for our ES index.

In [25]:
path_to_full = str(path_to_datasets)
df_full = spark.read.schema(schema).json(path_to_full)
df_full = df_full.withColumn("tweet", F.to_json(F.struct([df_full[x] for x in df_full.columns])))
df_full.createOrReplaceTempView("tweets")
df_full = spark.sql(sql_code)
cols_to_drop = ['tweet_urls','quoted_status', 'retweeted_status', 'text_str']
df_full = df_full.drop(*cols_to_drop)
df_full = df_full.withColumn('dataset_id', F.lit(dataset_id))
df_full.count()

7285533

| Method          | Time
________________________
| RDD             | 24 min

| Dataframe       | 50 sec

#### Benchmarking with Elasticsearch Hadoop 

The following benchmarks were established on a two-node Elasticsearch cluster running locally in Docker. One limitation is that only one node was used by this cluster for loading. Normally, the `elasticsearch-hadoop` library will delegate all available nodes to Spark for reading/writing, but in this case, on account of my configuration, I had to constrain Spark to the primary node. That no doubt introduced additional latency.


This function replicates the logic above for creating a TweetSet document via Spark Dataframe transformations.

In [106]:
def make_spark_df(path_to_datasets, schema, sql_code, dataset_id):
    path_to_datasets = str(path_to_datasets)
    # Load the JSON into a dataframe, using the supplied schema
    df = spark.read.schema(schema).json(path_to_datasets)
    # Create a column to hold the JSON representation of the entire document 
    df = df.withColumn("tweet", F.to_json(F.struct([df[x] for x in df.columns])))
    # Register a temp view to execute the SQL code against
    df.createOrReplaceTempView("tweets")
    # Execute SQL code
    df = spark.sql(sql_code)
    # Drop intermediate columns
    cols_to_drop = ['tweet_urls','quoted_status', 'retweeted_status', 'text_str']
    df = df.drop(*cols_to_drop)
    # Add dataset ID as a new column
    return df.withColumn('dataset_id', F.lit(dataset_id))

In creating the ES index, we need to register the number of shards. This code is replicated from `tweetset_loader.py`. 

In [16]:
def shard_count(tweet_count, store_tweet=True):
    # In testing, 500K tweets (storing tweet) = 615MB
    # Thus, 32.5 million tweets per shard to have a max shard size of 40GB
    # In testing, 500k tweets (not storing tweet) = 145MB
    # Thus, 138 million tweets per shard to have a max shard size of 40GB
    tweets_per_shard = 32500000 if store_tweet else 138000000
    return math.ceil(float(tweet_count) / tweets_per_shard) or 1
tweets_per_shard = 32500000
tweet_count = 7285533
shards = max(shard_count(tweet_count, store_tweet=True), 4)

The following code is also from `tweetset_loader.py`. It creates the index in Elasticsearch

In [403]:
def create_index(dataset_id):
    # Create the index in ES
    tweet_index = models.TweetIndex(dataset_id, shards=shards, replicas=0, refresh_interval=-1)
    tweet_index.create()

In [None]:
dataset_id_df = 'brexit-test'


In [404]:
create_index(dataset_id_df)

The ES-Hadoop configuration, taken from `tweetset_loader.py`. The entry for `es.nodes.wan.only` is used to restrict Spark to a single node. 

In [405]:
 es_conf = {"es.nodes": 'localhost',
                       "es.port": "9200",
                       "es.index.auto.create": "false",
                       "es.mapping.id": "tweet_id",
                       "es.resource": dataset_id_df,
                       'es.nodes.discovery': 'false',
                       'es.nodes.data.only': 'false',
                       "es.nodes.wan.only": "true"}

Creating the Spark Dataframe. (Won't actually execute until the `save` operation below.)

In [406]:
df_full = make_spark_df(path_to_datasets, schema, sql_code, dataset_id_df)

In [407]:
# Sanity check
df_full.count()

7285533

Writing the Spark DataFrame to ES.

In [None]:
df_full.write.format('org.elasticsearch.spark.sql').options(**es_conf).save()

Total time spent: 3.7 hours

![Job report from Spark Job monitor for DF: 3.7 hours](spark-es-df.png)
![Job stats from Spark Jobs for DF](spark-es-df-stats.png)

Now for comparison, we load the same dataset as a separate index, using the RDD API.

In [98]:
create_index(dataset_id_rdd)

In [100]:
dataset_id = dataset_id_rdd
tweets_str_rdd = spark.sparkContext.textFile('./datasets/brexit/sample/*.json')
tweets_rdd = tweets_str_rdd.map(to_tweet_dict).map(lambda row: (row['tweet_id'], row))

Same settings as above, except for the value of `es.resource` (the ES index name).

In [99]:
 es_conf = {"es.nodes": 'localhost',
                       "es.port": "9200",
                       "es.index.auto.create": "false",
                       "es.mapping.id": "tweet_id",
                       "es.resource": dataset_id_rdd,
                       'es.nodes.discovery': 'false',
                       'es.nodes.data.only': 'false',
                       "es.nodes.wan.only": "true"}

This code is verbatim from `tweetset_loader.py`.

In [101]:
tweets_rdd.saveAsNewAPIHadoopFile(
                path='-',
                outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
                keyClass="org.apache.hadoop.io.NullWritable",
                valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
                conf=es_conf)

Total time spent: 4.9 hours

![Job report from Spark Jobs for RDD: 4.9 hours](spark-es-rdd.png)
![Job stats from Spark Jobs for RDD](spark-es-rdd-stats.png)

##### Results

The Dataframe API performs about 30% better than the RDD API in this environment.

#### Testing ES schema 

Comparing the ES documents created via Spark SQL with those created by the current method.

Modified version of this TweetSets method that does not use the TweetSets index naming convention. See the commented line below:

```
    index = dataset_params.get('source_dataset')
    #index = get_tweets_index_name(source_dataset)
    search = Search(index=index).extra(track_total_hits=True)
```

In [115]:
from utils_mod import dataset_params_to_search

Compare aggregate results, using the date field

In [419]:
def compare_aggs(dataset_rdd, dataset_df):
    for d in (dataset_rdd, dataset_df):
        s = Search(using=es, index=dataset_id_df)
        s.aggs.bucket('dates', 'stats', field='created_at') 
        response = s.execute()
        print(f'Aggregation from {d}')
        print(response.aggregations.to_dict())

In [420]:
compare_aggs(dataset_id_rdd, dataset_id_df)

Aggregation from brexit-test-rdd
{'dates': {'count': 7284216, 'min': 1571265780000.0, 'max': 1572311576000.0, 'avg': 1571737589951.189, 'sum': 1.144887610052389e+19, 'min_as_string': '2019-10-16T22:43:00.000Z', 'max_as_string': '2019-10-29T01:12:56.000Z', 'avg_as_string': '2019-10-22T09:46:29.951Z', 'sum_as_string': '+292278994-08-17T07:12:55.807Z'}}
Aggregation from brexit-test
{'dates': {'count': 7284216, 'min': 1571265780000.0, 'max': 1572311576000.0, 'avg': 1571737589951.189, 'sum': 1.144887610052389e+19, 'min_as_string': '2019-10-16T22:43:00.000Z', 'max_as_string': '2019-10-29T01:12:56.000Z', 'avg_as_string': '2019-10-22T09:46:29.951Z', 'sum_as_string': '+292278994-08-17T07:12:55.807Z'}}


Function to retrieve documents from ES index up to certain limit. Returns both Tweet ID's and CSV representation of documents.

In [413]:
def retrieve_docs(dataset_params, n=1000):
    s = dataset_params_to_search(dataset_params, skip_aggs=True)
    results = [r for i, r in enumerate(s.scan()) if i < n]
    ids = [r.meta.id for r in results]
    csv = [json2csv.get_row(json.loads(hit.tweet), excel=True) for hit in results]
    return ids, csv

Function to compare the CSV representation of two sets of Tweet documents.

In [414]:
def compare_csvs(csv1, csv2):
    for i, (c1, c2) in enumerate(zip(sorted(csv1, key=lambda x: x[0]),
                                     sorted(csv2, key=lambda x: x[0]))):
        for r1, r2 in zip(c1, c2):
            try:
                assert r1 == r2
            except AssertionError:
                print(f'Row {i}, CSV1 has {r1}, CSV2 has {r2}')
                continue
    return

Function to execute the same search against two indices and compare results.

In [449]:
def do_tests(dataset_params, dataset_1, dataset_2, n=1000):
    dataset_params['source_dataset'] = dataset_1
    ids1, csv1 = retrieve_docs(dataset_params, n)
    dataset_params['source_dataset'] = dataset_2
    ids2, csv2 = retrieve_docs(dataset_params, n)
    print(f'Comparing the first {n} of {len(ids1)} Tweet IDs from {dataset_1} and {dataset_2}')
    try:
        assert set(ids1) - set(ids2) == set()
    except AssertionError:
        print(f'Tweet IDs returned do not match.')
    print(f'Comparing CSV results from {dataset_1} and {dataset_2}')
    compare_csvs(csv1, csv2)
        

Basic search by date range.

In [426]:
dataset_params_date_range = {
    'tweet_type_original': 'true',
    'created_at_from': '2019-10-16',
    'created_at_to': '2019-10-17'
}

In [450]:
do_tests(dataset_params_date_range, dataset_id_rdd, dataset_id_df)

Comparing the first 1000 of 104131 Tweet IDs from brexit-test-rdd and brexit-test
Comparing CSV results from brexit-test-rdd and brexit-test


Search by keyword

In [451]:
dataset_params_keyword = {
    'tweet_text_all': 'UK',
    'tweet_type_original': 'true'
}
do_tests(dataset_params_keyword, dataset_id_rdd, dataset_id_df)

Comparing the first 1000 of 62129 Tweet IDs from brexit-test-rdd and brexit-test
Comparing CSV results from brexit-test-rdd and brexit-test


##### Results

The ES schemas (RDD-derived vs. Dataframe-derived) yield identical results on keyword and date-range searches, as well as on aggregation by date. 


#### Possible issuse with JSON representation of Tweet

It is [a known issue](https://stackoverflow.com/questions/60008665/why-spark-to-json-not-populating-null-values) in Spark v.2 that the Dataframe API in serializing to JSON will drop null fields. This poses a problem for creating the `tweet` field in the Tweet document, which contains the full JSON representation of the Tweet.

On the initial load from disk, the null fields in the JSON are preserved. Note the `contributors` and `coordinates` fields.

In [454]:
df_all = spark.read.schema(schema).json(str(path_to_datasets))
id_str = '1188822287841034240'
df1 = df_all.filter(df_all.id_str == id_str)
df1.collect()

[Row(contributors=None, coordinates=None, created_at='Mon Oct 28 14:18:15 +0000 2019', display_text_range=None, entities=Row(hashtags=[Row(indices=[91, 95], text='Ads'), Row(indices=[96, 104], text='apology'), Row(indices=[105, 112], text='brexit')], media=None, symbols=[], urls=[Row(display_url='flyingeze.com/?p=15306', expanded_url='https://flyingeze.com/?p=15306', indices=[67, 90], url='https://t.co/e21kQGAYMM'), Row(display_url='twitter.com/i/web/status/1…', expanded_url='https://twitter.com/i/web/status/1188822287841034240', indices=[114, 137], url='https://t.co/x9Xb4OMeJq')], user_mentions=[Row(id=3081529131, id_str='3081529131', indices=[56, 66], name='Flying Eze', screen_name='flyingeze')]), extended_entities=None, extended_tweet=Row(display_text_range=[0, 147], entities=Row(hashtags=[Row(indices=[91, 95], text='Ads'), Row(indices=[96, 104], text='apology'), Row(indices=[105, 112], text='brexit'), Row(indices=[113, 121], text='Dickens'), Row(indices=[122, 131], text='facebook')

This is the operation proposed to render the Tweet as a valid JSON string in the `tweet` field. Note that the `contributor` and `coordinates` fields are missing completely.

In [457]:
df1.withColumn("tweet", F.to_json(F.struct([df1[x] for x in df1.columns]))).select('tweet').collect()

[Row(tweet='{"created_at":"Mon Oct 28 14:18:15 +0000 2019","entities":{"hashtags":[{"indices":[91,95],"text":"Ads"},{"indices":[96,104],"text":"apology"},{"indices":[105,112],"text":"brexit"}],"symbols":[],"urls":[{"display_url":"flyingeze.com/?p=15306","expanded_url":"https://flyingeze.com/?p=15306","indices":[67,90],"url":"https://t.co/e21kQGAYMM"},{"display_url":"twitter.com/i/web/status/1…","expanded_url":"https://twitter.com/i/web/status/1188822287841034240","indices":[114,137],"url":"https://t.co/x9Xb4OMeJq"}],"user_mentions":[{"id":3081529131,"id_str":"3081529131","indices":[56,66],"name":"Flying Eze","screen_name":"flyingeze"}]},"extended_tweet":{"display_text_range":[0,147],"entities":{"hashtags":[{"indices":[91,95],"text":"Ads"},{"indices":[96,104],"text":"apology"},{"indices":[105,112],"text":"brexit"},{"indices":[113,121],"text":"Dickens"},{"indices":[122,131],"text":"facebook"},{"indices":[132,141],"text":"politics"},{"indices":[142,147],"text":"News"}],"symbols":[],"urls"

##### Possible solutions
1. Upgrade to Spark v. 3. (I don't think there are any changes that would break the current/proposed implementation, but this would require testing.) It looks like the `elasticsearch-hadoop` library supports Spark 3 as of [version 7.12.0](https://www.elastic.co/guide/en/elasticsearch/hadoop/7.12/eshadoop-7.12.0.html). We are currently using 7.9.2 
2. Decide that null fields are not important; users can always account for these when processing the JSON extracts downstream.
3. Explore other ways to add the serialized representation to the Tweet document at time of load.

#### Creating extracts at time of load

Since we have loaded the Tweet `jsonl` documents into a Dataframe, we can write to CSV and JSON at load time in order to produce the full extract.

In [462]:
path_to_files = Path('./tweetset_data/full_datasets/brexit-test')

Here we write the Tweet ID's to CSV.

In [460]:
df_full.select('tweet_id').write.csv(str(path_to_ids / 'brexit-test-ids'), compression='gzip')

JSON serialization of the full Tweet documents.

In [463]:
df_full.select('tweet').write.json(str(path_to_files / 'brexit-test-json'),
                                  compression='gzip')

##### Porting `twarc.json2csv` to Spark SQL.

Mapping of columns in our implementation to those in json2csv

In [482]:
column_mapping = {
    'retweet_quoted_status_id': 'retweet_or_quote_id',
    'retweeted_quoted_screen_name': 'retweet_or_quote_screen_name',
    'tweet_id': 'id',
    'user_follower_count': 'user_followers_count',
    'language': 'lang',
    'retweeted_quoted_user_id': 'retweet_or_quote_user_id'
}
column_mapping.update({k: k for k in df_full.columns if k in json2csv.get_headings()})

For `text`, `hashtags` and `urls`, we need to convert from array to space-separated string

Additional transformations required.

- tweet_url 
```
"https://twitter.com/%s/status/%s" % (t["user"]["screen_name"], t["id_str"])
```
- parsed_created_at
```
date_parse(get("created_at"))
```
- coordinates
```
    "%f %f" % tuple(t["coordinates"]["coordinates"])
```
- media
```
if "extended_entities" in t and "media" in t["extended_entities"]:
        return " ".join([h["media_url_https"] for h in t["extended_entities"]["media"]])
elif "media" in t["entities"]:
        return " ".join([h["media_url_https"] for h in t["entities"]["media"]])
else:
     return None
```
- place
```
    if "place" in t and t["place"]:
        return t["place"]["full_name"]
```
- possible_sensitive
```
t.get('possibly_sensitive')
```
- source
```
t.get('source')
```
- user_created_at
```
 t.get("user").get('created_at')
```
- user_default_profile_image
```
t.get("user").get('default_profile_image')
```
- user_description
```
t.get("user").get('description').replace("\n", " ").replace("\r", "")
```
- user_favourites_count
```
t.get("user").get('favorites_count')
```
- user_friends_count
```
t.get("user").get('friends_count')
```
- user_listed_count
```
t.get("user").get('listed_count')
```
- user_name
```
t.get("user").get('name').replace("\n", " ").replace("\r", "")
```
- user_statuses_count
```
t.get("user").get('statuses_count')
```
- user_urls

**Question**: Does the `user` field still contain an `entities` element? It doesn't show up in the schema derived from the Brexit Tweets
```
 u = t.get("user")
    if not u:
        return None
    urls = []
    if "entities" in u and "url" in u["entities"] and "urls" in u["entities"]["url"]:
        for url in u["entities"]["url"]["urls"]:
            if url["expanded_url"]:
                urls.append(url["expanded_url"])
    return " ".join(urls)
```

Revised SQL query to include additional columns

In [544]:
with open('./tweet_sql_exp.sql') as f:
    sql_code_exp = f.read()
df_exp = make_spark_df(path_to_datasets, schema, sql_code_exp, dataset_id_df)

Columns for ES 

**Need to hard code these somewhere**

In [513]:
es_columns = df_full.columns

In [549]:
es_columns

['text',
 'retweeted_quoted_user_id',
 'retweeted_quoted_screen_name',
 'retweet_quoted_status_id',
 'urls',
 'tweet_id',
 'tweet_type',
 'in_reply_to_user_id',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'created_at',
 'user_id',
 'user_screen_name',
 'user_follower_count',
 'user_verified',
 'user_language',
 'user_utc_offset',
 'user_time_zone',
 'user_location',
 'mention_user_ids',
 'mention_screen_names',
 'hashtags',
 'favorite_count',
 'retweet_count',
 'language',
 'has_media',
 'has_geo',
 'tweet',
 'dataset_id']

Columns for CSV extract

In [517]:
csv_columns = json2csv.get_headings()

**Use the following expression when saving to ES** to limit to those columns that match our ES schema.

In [545]:
df_exp.select(es_columns)

DataFrame[text: array<string>, retweeted_quoted_user_id: string, retweeted_quoted_screen_name: string, retweet_quoted_status_id: string, urls: array<string>, tweet_id: string, tweet_type: string, in_reply_to_user_id: string, in_reply_to_screen_name: string, in_reply_to_status_id: string, created_at: string, user_id: string, user_screen_name: string, user_follower_count: bigint, user_verified: boolean, user_language: string, user_utc_offset: string, user_time_zone: string, user_location: string, mention_user_ids: array<string>, mention_screen_names: array<string>, hashtags: array<string>, favorite_count: bigint, retweet_count: bigint, language: string, has_media: boolean, has_geo: boolean, tweet: string, dataset_id: string]

Prepare Dataframe for CSV extract.

In [532]:
def make_csv(df, column_map):
    # Convert array columns to string representations
    for c in ['text', 'hashtags', 'urls']:
        df = df.withColumn(c, F.concat_ws(' ', df[c]))
    # Rename columns as necessary
    for k, v in column_map.items():
        if k != v:
            df = df.withColumnRenamed(k, v)
    return df

In [546]:
df_csv = make_csv(df_exp, column_mapping)

For now, ommitting the possibly superfluous `user_urls` column

In [537]:
csv_columns = [c for c in csv_columns if c != 'user_urls']

In [548]:
df_csv.select(csv_columns).write.csv(str(path_to_files / 'brexit-test-csv'),
                                  compression='gzip')

Prepare `mentions` (edges and nodes). We use the parsed Dataframe (the version we loaded to ES).

In [605]:
def get_mentions(df):
    # Create a temp table so that we can use SQL
    df.createOrReplaceTempView("tweets_parsed")
    # SQL for extracting the mention ids, screen_names, and user_ids
    mentions_sql = '''
    select mentions.*,
        user_id
    from (
        select 
            explode(arrays_zip(mention_user_ids, mention_screen_names)) as mentions,
            user_id
        from tweets_parsed
    )
    '''
    mentions_df = spark.sql(mentions_sql)
    mention_edges = mentions_df.select('mention_user_ids', 'mention_screen_names')\
                        .distinct()
    mention_nodes = mentions_df.select('mention_user_ids', 'user_id').distinct()
    return mention_edges, mention_nodes
    

In [606]:
edges, nodes = get_mentions(df_full)

In [609]:
edges.write.csv(str(path_to_files / 'brexit-test-mention-edges'),
                                  compression='gzip')

In [610]:
nodes.write.csv(str(path_to_files / 'brexit-test-mention-nodes'),
                                  compression='gzip')

Prepare count of mentions per mentioned user (aka `top mentions`)

In [615]:
def agg_mentions():
    # Reusing the temporary table already registered
    sql_agg = '''
    select count(distinct tweet_id) as number_mentions,
        mention_user_id as mentioned_user
    from (
        select 
            explode(mention_user_ids) as mention_user_id,
            tweet_id
        from tweets_parsed
    )
    group by mention_user_id
    '''
    ment_agg_df = spark.sql(sql_agg)
    return ment_agg_df

In [616]:
mentions_agg_df = agg_mentions()

In [617]:
mentions_agg_df.write.csv(str(path_to_files / 'brexit-test-mention-counts'),
                                  compression='gzip')

##### Results
- Count of Tweets: 7,285,533
- Size on disk: 49.6 GB
--------
1. Writing Tweet ID's to gzipped CSV
   - Time: 1.2 min
   - Size on disk: 58 MB
2. Writing full Tweets to gzipped JSON
   - Time: 8.1 min
   - Size on disk: 6.74 GB
3. Writing full Tweets to gzipped CSV
   - Time: 4.4 min
   - Size on disk: 2.16 GB
4. Writing mentions edges to gzipped CSV
   - Time: 1.7 min
   - Size on disk: 5 MB
5. Writing mentions nodes to gzipped CSV
   - Time: 1.9 min
   - Size on disk: 65.3 MB
6. Writing mention counts to gzipped CSV
   - Size on disk: 3.3 MB
   - Time: 1.9 min

##### Retrieval from ES via Spark SQL

In [86]:
# We need to pass as an option a list (string) of the fields with array type in ES
read_options = {"es.nodes": 'localhost',
                       "es.port": "9200",
                       "es.index.auto.create": "false",
                       "es.mapping.id": "tweet_id",
                       "es.resource": index_name,
                       'es.nodes.discovery': 'false',
                       'es.nodes.data.only': 'false',
                        'es.read.field.as.array.include': 'text,urls,mention_user_ids,mention_screen_names,hashtags',
                       "es.nodes.wan.only": "true"}

In [87]:
df_read = spark.read.format("org.elasticsearch.spark.sql")\
                        .options(**read_options)\
                        .load("brexit-test")