# _Clean & Store in BigQuery_

In [27]:
import utils
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
import pandas_gbq
from pandas_gbq.gbq import TableCreationError, NotFoundException

%reload_ext autoreload
%autoreload 2
%matplotlib inline

### _May 29, 2020_

In [28]:
#day = '2020-05-24'
#bucket_path = f'gs://thepanacealab_covid19twitter/dailies/{day}/{day}_clean-dataset.json'

In [67]:
def load_from_gcs(bucket_path):
    '''
    Takes path to file in Google Cloud Storage Bucket
    and returns a Pandas DataFrame
    '''
    df = pd.read_json(
        bucket_path,
        lines=True,
        dtype={
            'id_str': str,
            'in_reply_to_status_id_str': str,
            'quoted_status_id_str': str
        }
    )
    return df

In [68]:
#%%time
#df = load_from_gcs(bucket_path)

In [69]:
#%%time
#df = utils.load_data('2020-03-22')

In [70]:
#df.info()

In [71]:
def clean_for_parquet(df):
    '''
    Takes Panda DataFrame, cleans data into appropriate form for BigQuery.
    '''
    cols_of_interest = [
        'created_at',
        'id_str',
        'user',
        'lang',
        'full_text'
    ]
    df2clean = df.loc[:, cols_of_interest]
    df2clean['user_id_str'] = df2clean['user'].apply(lambda user: str(user['id_str']))
    df2clean.drop(labels = 'user', axis = 1, inplace = True)
    dfclean = df2clean[['created_at', 'id_str', 'user_id_str', 'lang', 'full_text']]
    return dfclean

In [34]:
#%%time
#dfclean = clean_for_parquet(df)
#assert len(dfclean) == len(df)

In [35]:
#dfclean.info()

In [74]:
def parquet_to_gcs(df, day):
    df.to_parquet(
        f'gs://thepanacealab_covid19twitter/dailies/{day}/{day}_tweets.parquet'
    )
    print('Dataframe uploaded to bucket as parquet file.')

In [75]:
def data_prep_wrapper(day):
    bucket_path = (
        f'gs://thepanacealab_covid19twitter/dailies/'
        + f'{day}/{day}_clean-dataset.json'
    )
    print(f'Loading data for {day}...')
    df = load_from_gcs(bucket_path)
    print(f'Cleaning data for {day}...')
    dfclean = clean_for_parquet(df)
    assert len(dfclean) == len(df)
    print(f'Converting to parquet file & storing in {day} bucket.')
    parquet_to_gcs(dfclean, day)
    print(f'{day} successfully converted and stored in Storage.\n')

In [76]:
#day = '2020-05-24'

In [77]:
#%%time
#data_prep_wrapper(day)

In [78]:
def list_json_dates(bucket_name='thepanacealab_covid19twitter'):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix='dailies/')
    json_files = [
        str(i).split(',')[1].strip() for i in blobs 
        if str(i).split(',')[1].endswith('.json')
    ]
    json_dates = [i.split('/')[1] for i in json_files]
    return json_dates

In [79]:
def list_parquet_dates(bucket_name='thepanacealab_covid19twitter'):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix='dailies/')
    parquet_files = [
        str(i).split(',')[1].strip() for i in blobs 
        if str(i).split(',')[1].endswith('.parquet')
    ]
    parquet_dates = [
        i.split('/')[1] for i in parquet_files
    ]
    return parquet_dates

In [80]:
json_dates = list_json_dates()
parquet_dates = list_parquet_dates()
print(parquet_dates)

['2020-05-23', '2020-05-24', '2020-05-25']


In [81]:
print(json_dates)

['2020-03-22', '2020-03-23', '2020-03-24', '2020-03-25', '2020-03-26', '2020-03-27', '2020-03-28', '2020-03-29', '2020-03-30', '2020-03-31', '2020-04-01', '2020-04-02', '2020-04-03', '2020-04-04', '2020-04-05', '2020-04-06', '2020-04-07', '2020-04-08', '2020-04-09', '2020-04-10', '2020-04-11', '2020-04-12', '2020-04-13', '2020-04-14', '2020-04-15', '2020-04-16', '2020-04-17', '2020-04-18', '2020-04-19', '2020-04-20', '2020-04-21', '2020-04-22', '2020-04-23', '2020-04-24', '2020-04-25', '2020-04-26', '2020-04-27', '2020-04-28', '2020-04-29', '2020-04-30', '2020-05-01', '2020-05-02', '2020-05-03', '2020-05-04', '2020-05-05', '2020-05-06', '2020-05-07', '2020-05-08', '2020-05-09', '2020-05-10', '2020-05-11', '2020-05-12', '2020-05-13', '2020-05-14', '2020-05-15', '2020-05-16', '2020-05-17', '2020-05-18', '2020-05-19', '2020-05-20', '2020-05-21', '2020-05-22', '2020-05-23', '2020-05-24', '2020-05-25', '2020-05-27']


In [82]:
need_parquet = sorted(list(set(json_dates) - set(parquet_dates)))
print(need_parquet)

['2020-03-22', '2020-03-23', '2020-03-24', '2020-03-25', '2020-03-26', '2020-03-27', '2020-03-28', '2020-03-29', '2020-03-30', '2020-03-31', '2020-04-01', '2020-04-02', '2020-04-03', '2020-04-04', '2020-04-05', '2020-04-06', '2020-04-07', '2020-04-08', '2020-04-09', '2020-04-10', '2020-04-11', '2020-04-12', '2020-04-13', '2020-04-14', '2020-04-15', '2020-04-16', '2020-04-17', '2020-04-18', '2020-04-19', '2020-04-20', '2020-04-21', '2020-04-22', '2020-04-23', '2020-04-24', '2020-04-25', '2020-04-26', '2020-04-27', '2020-04-28', '2020-04-29', '2020-04-30', '2020-05-01', '2020-05-02', '2020-05-03', '2020-05-04', '2020-05-05', '2020-05-06', '2020-05-07', '2020-05-08', '2020-05-09', '2020-05-10', '2020-05-11', '2020-05-12', '2020-05-13', '2020-05-14', '2020-05-15', '2020-05-16', '2020-05-17', '2020-05-18', '2020-05-19', '2020-05-20', '2020-05-21', '2020-05-22', '2020-05-27']


In [84]:
%%time

# time with 3 most recent days
for day in need_parquet[-3:]:
    data_prep_wrapper(day)

Loading data for 2020-05-21...
Cleaning data for 2020-05-21...
Converting to parquet file & storing in 2020-05-21 bucket.
Dataframe uploaded to bucket as parquet file.
2020-05-21 successfully converted and stored in Storage.
Loading data for 2020-05-22...
Cleaning data for 2020-05-22...
Converting to parquet file & storing in 2020-05-22 bucket.
Dataframe uploaded to bucket as parquet file.
2020-05-22 successfully converted and stored in Storage.
Loading data for 2020-05-27...
Cleaning data for 2020-05-27...
Converting to parquet file & storing in 2020-05-27 bucket.
Dataframe uploaded to bucket as parquet file.
2020-05-27 successfully converted and stored in Storage.
CPU times: user 18min, sys: 1min 24s, total: 19min 25s
Wall time: 19min 45s


In [13]:
#df.info()

In [14]:
#def bq_create_table(df, day):
#    '''
#    Takes Pandas DataFrame, and it's associated date, and stores in BigQuery table
#   '''
#   bigquery_client = bigquery.Client()
#    dataset_ref = bigquery_client.dataset('twitter_dailies')
#    # BigQuery only accepts underscores for table names
#    bq_day = day.replace('-', '_')
#    # Prepares a reference to the table
#    table_ref = dataset_ref.table(bq_day)
#
#    try:
#        found = bigquery_client.get_table(table_ref)
#        return found
#    except Exception as e:
#        table_ref = dataset_ref.table(bq_day)
#        job = bigquery_client.load_table_from_dataframe(df, table_ref, location='US')
        # waits for table load to complete
#        job.result()
#        print(
#            f'Loaded dataframe from {day} with {job.output_rows} observations into',
#            + f' {table_ref.path}'
#        )

In [15]:
def bq_create_table(df, day):
    '''
    Takes Pandas DataFrame, and it's associated date, and stores in BigQuery table
    '''
    # BigQuery only accepts underscores for table names
    bq_day = day.replace('-', '_')
    # prepares reference to the table
    try:
        table_schema = [
            {'name': 'created_at', 'type': 'TIMESTAMP'},
            {'name': 'id_str', 'type': 'STRING'},
            {'name': 'user_id_str', 'type': 'STRING'},
            {'name': 'lang', 'type': 'STRING'},
            {'name': 'full_text', 'type': 'STRING'}
        ]
        df.to_gbq(
            project_id='covid-disinfo-detect',
            destination_table=f'twitter_dailies.{bq_day}',
            if_exists='fail'
        )
    except TableCreationError as e:
        pass

In [16]:
#def bq_create_table(df, day):
#    '''
#    Takes Pandas DataFrame, and it's associated date, and stores in BigQuery table
#    '''
#    client = bigquery.Client()
#    dataset_ref = client.dataset('twitter_dailies')
#    # BigQuery only accepts underscores for table names
#    bq_day = day.replace('-', '_')
#    # prepares reference to the table
#    table_ref = dataset_ref.table(bq_day)
#    try:
#        found = client.get_table(table_ref)
#        print(found)
#    except NotFoundException as e:
#        job_config = bigquery.LoadJobConfig(schema=[
#            bigquery.SchemaField('created_at', 'TIMESTAMP'),
#            bigquery.SchemaField('id_str', 'STRING'),
#            bigquery.SchemaField('user_id_str', 'STRING'),
#            bigquery.SchemaField('lang', 'STRING'),
#            bigquery.SchemaField('full_text', 'STRING')
#        ])
#        job = client.load_table_from_dataframe(
#            df, table_ref, job_config=job_config
#        )
#        job.result()
#        print("Loaded dataframe to {}".format(table_ref.path))

In [17]:
#%%time
#bq_create_table(dfclean, day)

In [18]:
def data_ingestion_bigquery(day):
    bucket_path = (
        f'gs://thepanacealab_covid19twitter/dailies/{day}/'
        + f'{day}_clean-dataset.json'
    )
    print(f'Loading data for {day}...')
    df = load_from_gcs(bucket_path)
    print(f'Loaded data for {day}...')
    dfclean = clean_for_bigquery(df)
    print(f'Cleaned data for BigQuery...')
    assert len(dfclean) == len(df)
    bq_create_table(dfclean, day)

In [19]:
day = '2020-05-25'
#day_list = ['2020-05-25', '2020-05-24', '2020-05-23']

In [20]:
#%%time
#for day in day_list:
#    print(day)

In [21]:
#%%time
#for day in day_list:
#    data_ingestion_bigquery(day)

In [22]:
%%time
data_ingestion_bigquery(day)

Loading data for 2020-05-25...
Loaded data for 2020-05-25...
Cleaned data for BigQuery...


1it [00:13, 13.10s/it]

GenericGBQException: Reason: 400 Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 194510; errors: 1. Please look into the errors[] collection for more details.

In [2]:
%%time
#utils.download_json('2020-05-19')

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 6.2 µs


In [3]:
sb_client = storage.Client()
buckets = sb_client.list_buckets()

print("Buckets in {}:".format(sb_client.project))
for item in buckets:
    print("\t" + item.name)

Buckets in covid-disinfo-detect:
	covid-disinfo-detect.appspot.com
	staging.covid-disinfo-detect.appspot.com
	thepanacealab_covid19twitter


In [None]:
from datetime import timedelta, date

def daterange(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)
        
start_dt = date(2015, 12, 20)
end_dt = date(2016, 1, 11)
for dt in daterange(start_dt, end_dt):
    print(dt.strftime("%Y-%m-%d"))

In [4]:
def blob_exists(day, bucket_name='thepanacealab_covid19twitter'):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(f'dailies/{day}/{day}_clean-dataset.json')
    return blob.exists()

In [7]:
def list_blobs_bucket(bucket_name='thepanacealab_covid19twitter'):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix='dailies/', suff))
    return blobs

In [8]:
list_blobs_bucket()

[]

In [4]:
blob_exists('2020-05-21')

True

In [19]:
def bigquery_tables_list():
    # gather list of tables in bigquery
    client = bigquery.Client()
    dataset = client.get_dataset('twitter_dailies')
    tables = list(client.list_tables(dataset))
    return [table.table_id for table in tables]

In [77]:
bq_tables = bigquery_tables_list()

In [81]:
def list_blobs_bucket(bucket_name='thepanacealab_covid19twitter'):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix='dailies/')
    json_files = [
        str(i).split(',')[1].strip() for i in blobs if str(i).split(',')[1].endswith('.json')
    ]
    storage_dates = [i.split('/')[1].replace('-', '_') for i in json_files]
    return json_files, storage_dates

In [82]:
json_files, storage_dates = list_blobs_bucket()

In [83]:
bigquery_tables_list()

['2020_05_19', '2020_05_20']

In [84]:
storage_dates

['2020_03_22',
 '2020_03_23',
 '2020_03_24',
 '2020_03_25',
 '2020_03_26',
 '2020_03_27',
 '2020_03_28',
 '2020_03_29',
 '2020_03_30',
 '2020_03_31',
 '2020_04_01',
 '2020_04_02',
 '2020_04_03',
 '2020_04_04',
 '2020_04_05',
 '2020_04_06',
 '2020_04_07',
 '2020_04_08',
 '2020_04_09',
 '2020_04_10',
 '2020_04_11',
 '2020_04_12',
 '2020_04_13',
 '2020_04_14',
 '2020_04_15',
 '2020_04_16',
 '2020_04_17',
 '2020_04_18',
 '2020_04_19',
 '2020_04_20',
 '2020_04_21',
 '2020_04_22',
 '2020_04_23',
 '2020_04_24',
 '2020_04_25',
 '2020_04_26',
 '2020_04_27',
 '2020_04_28',
 '2020_04_29',
 '2020_04_30',
 '2020_05_01',
 '2020_05_02',
 '2020_05_03',
 '2020_05_04',
 '2020_05_05',
 '2020_05_06',
 '2020_05_07',
 '2020_05_08',
 '2020_05_09',
 '2020_05_10',
 '2020_05_11',
 '2020_05_12',
 '2020_05_13',
 '2020_05_14',
 '2020_05_15',
 '2020_05_16',
 '2020_05_17',
 '2020_05_18',
 '2020_05_19',
 '2020_05_20',
 '2020_05_21',
 '2020_05_22',
 '2020_05_23']

In [86]:
notinbq = sorted(list(set(storage_dates) - set(bq_tables)))
notinbq

['2020_03_22',
 '2020_03_23',
 '2020_03_24',
 '2020_03_25',
 '2020_03_26',
 '2020_03_27',
 '2020_03_28',
 '2020_03_29',
 '2020_03_30',
 '2020_03_31',
 '2020_04_01',
 '2020_04_02',
 '2020_04_03',
 '2020_04_04',
 '2020_04_05',
 '2020_04_06',
 '2020_04_07',
 '2020_04_08',
 '2020_04_09',
 '2020_04_10',
 '2020_04_11',
 '2020_04_12',
 '2020_04_13',
 '2020_04_14',
 '2020_04_15',
 '2020_04_16',
 '2020_04_17',
 '2020_04_18',
 '2020_04_19',
 '2020_04_20',
 '2020_04_21',
 '2020_04_22',
 '2020_04_23',
 '2020_04_24',
 '2020_04_25',
 '2020_04_26',
 '2020_04_27',
 '2020_04_28',
 '2020_04_29',
 '2020_04_30',
 '2020_05_01',
 '2020_05_02',
 '2020_05_03',
 '2020_05_04',
 '2020_05_05',
 '2020_05_06',
 '2020_05_07',
 '2020_05_08',
 '2020_05_09',
 '2020_05_10',
 '2020_05_11',
 '2020_05_12',
 '2020_05_13',
 '2020_05_14',
 '2020_05_15',
 '2020_05_16',
 '2020_05_17',
 '2020_05_18',
 '2020_05_21',
 '2020_05_22',
 '2020_05_23']

In [None]:
for day in notinbq[:2]:
    day = day.replace('_', '-')
    utils.download_json(day)
    print(f'Downloaded data from {day}..')

Blob dailies/2020-03-22/2020-03-22_clean-dataset.json downloaded to /home/jupyter/covid_disinfo_detect/experiments/playground_data/2020-03-22_clean-dataset.json.
Downloaded data from 2020-03-22..


In [24]:
def load_data(date, chunksize=50000):
    cols_interest = [
        'created_at',
        'id_str',
        'user',
        'lang',
        'full_text'
    ]
    
    chunks = pd.read_json(
        f'playground_data/{date}_clean-dataset.json',
        lines=True,
        chunksize=chunksize,
        dtype={
            'id_str': str,
            'in_reply_to_status_id_str': str,
            'quoted_status_id_str': str
        }
    )
    
    df = pd.concat(chunk for chunk in chunks)
    print('Loaded data...\n')
    return df[cols_interest]


def clean_data(df):
    df2clean = df.copy()
    df2clean['user_id_str'] = df2clean['user'].apply(lambda user: str(user['id_str']))
    df2clean.drop(labels = 'user', axis = 1, inplace = True)
    dfclean = df2clean[['created_at', 'id_str', 'user_id_str', 'lang', 'full_text']]
    print('Cleaned data...\n')
    return dfclean


def data_setup(date):
    df = clean_data(load_data(date))
    return df


def load_bigquery(df, date):
    client = bigquery.Client(location='US')
    dataset = client.get_dataset('twitter_dailies')
    bq_date = date.replace('-', '_')    # need to slightly change date string
    table_ref = dataset.table(bq_date)
    job = client.load_table_from_dataframe(df, table_ref, location='US')
    job.result()
    print(f'Loaded {job.output_rows} rows from dataframe to {table_ref.path}\n')
    
    
def data_bq_wrapper():
    date = input('What is the date of the data that you would like to store in BigQuery?\n')
    df = data_setup(date)
    load_bigquery(df, date)
    print(f'Data from {date} successfully stored in BigQuery.\n')

In [25]:
%%time
data_bq_wrapper()

What is the date of the data that you would like to store in BigQuery?
 2020-05-19


Loaded data...

Cleaned data...

Loaded 1009123 rows from dataframe to /projects/covid-disinfo-detect/datasets/twitter_dailies/tables/2020_05_19

Data from 2020-05-19 successfully stored in BigQuery.

CPU times: user 4min 13s, sys: 8.08 s, total: 4min 21s
Wall time: 5min 25s
