In [1]:
from belly import *


In [2]:
def read_schema(path, project='trollhunters'):
    """Load a pickled Spark schema from Google Cloud Storage.

    Args:
        path: gs:// URI of the pickle file.
        project: GCP project passed to GCSFileSystem (defaults to the project
            this notebook previously hard-coded).

    Returns:
        The unpickled object (for the schema files written by this notebook,
        a StructType produced by create_schema).

    NOTE: pickle.load can execute arbitrary code while loading; only point this
    at files in a bucket you trust.
    """
    # Local imports make the dependencies explicit instead of relying on the
    # `from belly import *` star-import (the explicit imports live in a later cell).
    import pickle
    from gcsfs import GCSFileSystem

    fs = GCSFileSystem(project=project)
    with fs.open(path, 'rb') as f:
        return pickle.load(f)


def parse_dates(obj, key, fn):
    """Recursively apply `fn` to every value stored under `key`.

    Nested dicts and lists are walked and rebuilt as new containers. A value found
    under `key` is passed to `fn` as-is and is not descended into. Any other
    non-container value is returned unchanged.
    """
    if isinstance(obj, list):
        return [parse_dates(item, key, fn) for item in obj]
    if not isinstance(obj, dict):
        return obj

    rebuilt = {}
    for name, value in obj.items():
        if name == key:
            rebuilt[name] = fn(value)
        else:
            rebuilt[name] = parse_dates(value, key, fn)
    return rebuilt

def replace_timestamps_in_dat(dat, fn):
    """Return `dat` rebuilt with every 'created_at' value, at any nesting depth,
    replaced by `fn(value)`.

    Thin wrapper around parse_dates with the key fixed to 'created_at'.
    """
    return parse_dates(dat, 'created_at', fn)

In [8]:
from tweepy.utils import parse_datetime

def messages_to_df(spark, schema, messages):
    """Decode consumed messages into a Spark DataFrame with the given `schema`.

    Each message's `value()` is parsed as JSON, every 'created_at' value is run
    through `parse_datetime`, and 'coordinates' values are cast via cast_coords,
    before the rows are handed to `spark.createDataFrame`.
    """
    # Keep the three passes separate (decode all, then stamp all, then cast all).
    decoded = [json.loads(m.value()) for m in messages]
    with_dates = [replace_timestamps_in_dat(d, parse_datetime) for d in decoded]
    rows = list(map(cast_coords, with_dates))
    return spark.createDataFrame(rows, schema)

In [4]:
# Spark session, target schema, and one batch of raw messages from the consumer.
N_MESSAGES = 20000  # number of messages pulled from the consumer for this run

spark = build_spark()
schema = read_schema('gs://spain-tweets/schemas/tweet-clean.pickle')
c = get_consumer()

messages = consume(c, N_MESSAGES)

In [7]:
from copy import deepcopy

def _cast_doubles(a):
    if type(a) == list:
        return [_cast_doubles(i) for i in a]
    if type(a) == dict:
        return {k:_cast_doubles(v) for k,v in a.items()}
    try:
        return float(a) 
    except ValueError:
        return a
    except TypeError:
        return a


def cast_coords(tw):
    tw = deepcopy(tw)
    if type(tw) != dict:
        return tw

    for k,v in tw.items():
        if k == 'coordinates':
            tw[k] = _cast_doubles(v)
        else:
            tw[k] = cast_coords(v)
    
    return tw
            

In [9]:
# Reload the cleaned schema from GCS, then turn the consumed batch into a DataFrame.
schema = read_schema('gs://spain-tweets/schemas/tweet-clean.pickle')
df = messages_to_df(spark=spark, schema=schema, messages=messages)

In [50]:
from os import path


def dedup_data(d, week, year, inpath, session=None):
    """Return the rows of `d` for one (year, week) partition that are not stored yet.

    The rows are restricted to `week`/`year` and de-duplicated on `id` within the
    batch itself. They are then anti-joined against the ids already present in the
    same partition of the parquet dataset at `inpath`, so re-running an ingest
    does not append the same tweet twice.

    Args:
        d: DataFrame with `id`, `week` and `year` columns.
        week, year: partition to process; interpolated into SQL filter strings,
            so they are expected to be plain ints.
        inpath: root of the partitioned parquet dataset.
        session: SparkSession used to read `inpath`; defaults to the
            notebook-global `spark`.

    NOTE(review): presumably fails if `inpath` does not exist yet (first write);
    confirm and handle if this is ever run against an empty warehouse.
    """
    if session is None:
        session = spark  # notebook-global session, kept for backward compatibility

    partition = f'week = {week} and year = {year}'
    existing_ids = session.read.parquet(inpath).where(partition).select('id')
    # Drop repeated ids inside the batch too (e.g. redelivered messages); the
    # anti-join below only removes ids that are already in the warehouse.
    batch = d.where(partition).dropDuplicates(['id'])
    return batch.join(existing_ids, on='id', how='left_anti')

def write_out(d, week, year, outpath):
    """Append the `week`/`year` rows of `d` to `<outpath>/year=<year>/week=<week>`.

    The directory names follow the key=value partition layout that
    `spark.read.parquet(outpath)` discovers as `year`/`week` columns. The
    `week`/`year` columns are not dropped from `d`, so they are also stored
    inside the written files.
    """
    # Object-store URIs (gs://...) always use '/', so build the path with
    # posixpath rather than os.path, whose separator depends on the local OS.
    import posixpath

    target = posixpath.join(outpath, f'year={year}', f'week={week}')
    d.where(f'week = {week} and year = {year}').write.mode('append').parquet(target)

def indempotent_write(df, warehouse):
    """Append `df` to the partitioned parquet `warehouse`, skipping already-stored tweets.

    Adds `week`, `month` and `year` columns derived from `created_at`. Then, for
    every distinct (year, week) pair in the batch, it keeps only rows whose `id`
    is not yet in that partition (dedup_data) and appends them (write_out).

    NOTE(review): Spark's `weekofyear` is the ISO-8601 week number, but `year`
    is the calendar year. Days around New Year therefore get mismatched keys;
    e.g. 2019-12-30 lands in year=2019/week=1 alongside early January 2019.
    Switching to the ISO week-year changes the layout of existing data, so it
    is left as-is here.
    """
    # selectExpr replaces the deprecated registerTempTable + spark.sql round trip
    # and no longer overwrites a session-wide 'tweets' temp view.
    dd = df.selectExpr(
        '*',
        'weekofyear(created_at) as week',
        'month(created_at) as month',
        'year(created_at) as year',
    )
    combos = dd.select('year', 'week').distinct().collect()
    for combo in combos:
        week, year = combo.week, combo.year
        d = dedup_data(dd, week, year, warehouse)
        write_out(d, week, year, warehouse)

In [56]:
# Append the batch to the datalake, skipping tweet ids already stored per (year, week).
indempotent_write(df, warehouse='gs://spain-tweets/datalake')

In [57]:
# Number of rows stored in the year=2020 / week=3 partition of the datalake.
(
    spark.read.parquet('gs://spain-tweets/datalake')
    .where('year=2020 and week=3')
    .count()
)

3387

In [58]:
# Number of rows stored in the year=2020 / week=4 partition of the datalake.
(
    spark.read.parquet('gs://spain-tweets/datalake')
    .where('year=2020 and week=4')
    .count()
)

16613

In [21]:
# Number of rows stored in the year=2020 / week=3 partition of the datalake.
(
    spark.read.parquet('gs://spain-tweets/datalake')
    .where('year=2020 and week=3')
    .count()
)

3387

In [22]:
# Number of rows stored in the year=2020 / week=4 partition of the datalake.
(
    spark.read.parquet('gs://spain-tweets/datalake')
    .where('year=2020 and week=4')
    .count()
)

16613

In [505]:
from copy import deepcopy
from pyspark.sql.types import StructField, StructType, ArrayType

def replace_with_type(schema, target, NewType):
    """Return a copy of `schema` in which every field named `target` has type `NewType`.

    Recurses through struct fields and array element types at any depth. A
    matching field's type is replaced wholesale; its old type is not searched
    further. Any other type (including maps) is returned unchanged.

    Args:
        schema: a Spark DataType, typically a StructType.
        target: field name to replace.
        NewType: either a DataType class (e.g. TimestampType), which is
            instantiated, or a DataType instance (e.g. a StructType), which is
            deep-copied for every match.
    """
    if hasattr(schema, 'fields'):
        new_struct = StructType()
        for field in schema:
            if field.name == target:
                # Explicit class check instead of try/except TypeError, which
                # masked constructor errors and could insert a bare class.
                dt = NewType() if isinstance(NewType, type) else deepcopy(NewType)
            else:
                dt = replace_with_type(field.dataType, target, NewType)
            # Carry over nullability and metadata; add() would otherwise reset
            # them to nullable=True / no metadata.
            new_struct.add(field.name, dt, field.nullable, field.metadata)
        return new_struct
    elif hasattr(schema, 'elementType'):
        dt = replace_with_type(schema.elementType, target, NewType)
        return ArrayType(dt, schema.containsNull)
    else:
        return schema

def get_field(struct, name):
    """Return the dataType of the first field called `name` in `struct`.

    Raises:
        IndexError: if `struct` has no field with that name (same exception
            type as before, now with a descriptive message).
    """
    for f in struct.fields:
        if f.name == name:
            return f.dataType
    raise IndexError(f'no field named {name!r} in struct')

def set_field(struct, name, value):
    """Return a new StructType equal to `struct` with field `name` set to type `value`.

    Any existing field called `name` is removed, and the field is appended at
    the end with a deep copy of `value` as its type (nullable by default).
    `struct` itself is not modified.

    Because `value` is copied before the new field is added, passing `struct`
    itself as `value` nests a snapshot of the struct without the new field.
    """
    new_struct = StructType()
    for f in struct:
        if f.name != name:
            new_struct.add(f)

    # Previously this added to and returned `struct`, discarding `new_struct`:
    # the input was mutated and an existing `name` field was duplicated.
    new_struct.add(name, data_type=deepcopy(value))
    return new_struct


def filter_fields(struct, fields):
    """Return a new StructType with the fields of `struct` whose names are not in `fields`.

    Field order is preserved; `struct` itself is left unmodified.
    """
    kept = [f for f in struct if f.name not in fields]
    return StructType(kept)

def create_schema(source_path='gs://spain-tweets/schemas/tweet-3.pickle'):
    """Build the cleaned tweet schema from a previously inferred raw schema.

    Steps:
      * every `source_user` field takes the type of the top-level `user` field;
      * every `created_at` field, at any depth, becomes TimestampType;
      * `quoted_status` is re-added with a copy of the cleaned schema, and
        `retweeted_status` with a copy of the cleaned schema plus `quoted_status`;
      * `th_original` / `th_rehydrated` keep their types from the raw schema and
        are appended at the end.

    Args:
        source_path: gs:// URI of the pickled raw schema to start from.
    """
    # original schema inferred from a sample of tweets
    schema = read_schema(source_path)
    user = get_field(schema, 'user')

    s = deepcopy(schema)
    s = replace_with_type(s, 'source_user', user)
    s = replace_with_type(s, 'created_at', TimestampType)

    # Drop every field that is re-added below so none of them end up duplicated.
    s = filter_fields(s, ['th_original', 'th_rehydrated', 'retweeted_status', 'quoted_status'])

    # Each call nests a copy of the schema built so far, so retweeted_status
    # also contains quoted_status.
    s = set_field(s, 'quoted_status', s)
    s = set_field(s, 'retweeted_status', s)

    # remove if we no longer want UB original formats
    s = set_field(s, 'th_original', get_field(schema, 'th_original'))
    s = set_field(s, 'th_rehydrated', get_field(schema, 'th_rehydrated'))

    return s


In [507]:
# Build the cleaned schema in memory only; this cell does not write anything.
new_schema = create_schema()

In [513]:
import pickle
from gcsfs import GCSFileSystem

# Regenerate the cleaned schema and overwrite the pickle that read_schema() loads above.
# NOTE(review): pickled pyspark types are tied to the pyspark version; storing
# new_schema.json() would be more portable (read_schema would need to change too).
fs = GCSFileSystem(project='trollhunters')
new_schema = create_schema()

with fs.open(
    'gs://spain-tweets/schemas/tweet-clean.pickle',
    'wb',
) as f:
    pickle.dump(new_schema, f)