# Convert the fetched raw JSON follow graph to two-column Parquet, for upload to Athena and for further processing

In [1]:
import os
import orjson
import dask
import dask.bag
import dask.dataframe

import numpy as np
import datetime

In [2]:
# Project root under which the input splits and the Parquet output live.
# Override with the LEXICAL_FRAMING_DIR environment variable instead of editing
# this absolute path; the default is the original location.
base_dir = os.environ.get("LEXICAL_FRAMING_DIR", "/rapids/src/tribex/LexicalFraming")

### Data preparation notes

Hang ran this Athena query:

```
with t as (
select actor_id, actor_preferredusername, count(*), max(actor_statusescount) as max_tweetcount
from us_deca inner join usa_loc_litecoder_pq
on lower(ltrim(rtrim(us_deca.actor_location_displayname))) = usa_loc_litecoder_pq.location
where us_deca.actor_friendscount > 10
    AND us_deca.actor_followerscount > 10
    AND actor_favoritescount > 20
    AND state is not null
    AND actor_image is not null
    AND LENGTH(actor_summary) > 10
    AND LENGTH(state) > 1 AND LENGTH(city) > 1
group by actor_id, actor_preferredusername
having count(*) >= 10 and count(*) <= 5000
) select actor_id, actor_preferredusername, max_tweetcount from t
```

This yielded `2445242` users. The cached Athena results are here:

`s3://aws-athena-query-results-411833189048-us-east-1/Unsaved/2022/01/21/c0c2c5bb-7a4c-4bfb-9000-2bbdc635cb23.csv`

I downloaded and converted to JSON with this:

`cat c0c2c5bb-7a4c-4bfb-9000-2bbdc635cb23.csv | tail +2 | jq --slurp ".|sort_by(.|tonumber)"  - >usa_tweeters.json`

Then, I grabbed the friends of these users with Martin's `crawl_network` script, which yielded `usa_tweeters_friends.json.gz`


_After fetching the follow graph as one large, compressed JSON file, split it into multiple files before processing with Dask._ **NOTE: I now use the `--line-bytes` argument, which never breaks a line across files and puts at most the given number of bytes in each split.**

`zcat usa_tweeters_friends.json.gz | split --line-bytes=512M - --numeric-suffixes --filter='gzip > $FILE.gz' usa_tweeters_friends-splitfiles/part_`

_Note that 512M is the (uncompressed) size limit for each split before gzip; the gzipped files come out at roughly 200MB each._



# Set up Dask and convert follow graph to two column Parquet

In [3]:
from dask.distributed import Client, LocalCluster

# Dask's local scratch directory (used for spill-to-disk). Derived from
# base_dir so it always sits inside the project directory instead of
# duplicating the path by hand.
dask.config.set({"temporary-directory": os.path.join(base_dir, "tmp_dask_dir")})

# 4 worker processes x 2 threads each, dashboard on a fixed port.
cluster = LocalCluster(dashboard_address=':58701',
                       n_workers=4,
                       threads_per_worker=2)
client = Client(cluster)

## Convert to Parquet, and check whether any integer IDs would overflow when uploaded to Athena

In [4]:

# Athena's BIGINT is a signed 64-bit integer, so the largest ID it can hold is
# int64's maximum (2**63 - 1). Stored as np.uint64 so it compares directly
# against the uint64 ID columns.
MAX_ATHENA_ID = np.uint64(np.iinfo(np.int64).max)



In [11]:
%%time

# Naive local wall-clock timestamp; used for the "Compute duration" print at the end of this cell.
start_time = datetime.datetime.today()

def convert_follower_friendlist(record):
    """Turn one decoded crawl record into (follower ID, unique friend IDs).

    Parameters
    ----------
    record : sequence
        A decoded JSON record where ``record[0]`` is the follower's user ID and
        ``record[1]["result"]`` is an iterable of friend IDs. Every ID must be
        convertible by ``np.uint64``.

    Returns
    -------
    tuple of (np.uint64, np.ndarray)
        The follower ID, and the sorted, de-duplicated friend IDs as a
        ``uint64`` array.
    """
    follower_uid = np.uint64(record[0])
    # Give an explicit dtype so an empty friend list still yields a uint64 array;
    # np.unique([]) on its own would return float64.
    friend_uids = np.unique(
        np.array([np.uint64(x) for x in record[1]["result"]], dtype=np.uint64)
    )
    return follower_uid, friend_uids
    
# Read every gzip'd JSON-lines split and drop users whose friend fetch has no
# "result" or an empty one. Then emit one (follower, friend) row per edge and
# write it as snappy-compressed Parquet.
friends_json_glob = os.path.join(base_dir, "usa_tweeters_friends-splitfiles", "part_*.gz")
friends_parquet_dir = os.path.join(base_dir, "usa_tweeters_friends-parquet")

(
    dask.bag
    .read_text(friends_json_glob, compression='gzip')
    .map(orjson.loads)
    .filter(lambda record: ("result" in record[1]) and (len(record[1]["result"]) > 0))
    .map(convert_follower_friendlist)
    .to_dataframe(meta={"follower": np.uint64, "friends": object})
    .explode("friends")
    .astype({'friends': np.uint64})
    .rename(columns={'friends': 'friend'})
    .to_parquet(friends_parquet_dir,
                compression='snappy',
                write_index=False)
)

# Deliberately no set_index('follower') (with or without
# repartition(partition_size='128MB')): every variant tried caused too much
# shuffling. Instead, the input JSON was re-split into files of about the same
# size, so no repartitioning is done here.

finish_time = datetime.datetime.now()
print(f"Compute duration: {finish_time - start_time}")

Compute duration: 0:22:42.119701
CPU times: user 20min 34s, sys: 54.2 s, total: 21min 29s
Wall time: 22min 42s


## <font color="red">NOTE: next time, do not convert IDs to uint64; in the end they work better on Athena as strings</font>

In [18]:
%%time

# Flag any ID that would overflow Athena's signed BIGINT. The Parquet was
# written with write_index=False, so both IDs are ordinary columns and the
# index on read is a default one. Checking ddf.index would therefore never
# look at the follower IDs.
ddf = dask.dataframe.read_parquet(os.path.join(base_dir, "usa_tweeters_friends-parquet"))
((ddf.follower > MAX_ATHENA_ID) | (ddf.friend > MAX_ATHENA_ID)).any().compute()


CPU times: user 40.1 s, sys: 2 s, total: 42.1 s
Wall time: 42.1 s


False

In [19]:
# Reload the converted (follower, friend) edge list and show its first rows.
ddf = dask.dataframe.read_parquet(path=os.path.join(base_dir, "usa_tweeters_friends-parquet"))
ddf.head(n=5)

Unnamed: 0,follower,friend
0,13,12
1,13,14
2,13,15
3,13,16
4,13,17


In [22]:
# NOTE(review): namespace dump for interactive inspection only; consider removing before sharing.
%whos

Variable                      Type            Data/Info
-------------------------------------------------------
Client                        type            <class 'distributed.client.Client'>
LocalCluster                  type            <class 'distributed.deploy.local.LocalCluster'>
MAX_ATHENA_ID                 uint64          9223372036854775807
base_dir                      str             /rapids/src/tribex/LexicalFraming
client                        Client          <Client: 'tcp://127.0.0.1<...>ads=8, memory=125.80 GiB>
cluster                       LocalCluster    LocalCluster(d5712102, 't<...>ads=8, memory=125.80 GiB)
convert_follower_friendlist   function        <function convert_followe<...>ndlist at 0x7fbf2a3a7310>
dask                          module          <module 'dask' from '/opt<...>ckages/dask/__init__.py'>
datetime                      module          <module 'datetime' from '<...>b/python3.8/datetime.py'>
ddf                           DataFrame       Dask DataF